package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.class */
/**
 * Singleton that owns a fixed pool of byte buffers and a set of daemon
 * prefetch threads used to read ahead on behalf of {@link AbfsInputStream}s.
 *
 * <p>A read-ahead request moves through three collections:
 * {@code readAheadQueue} (waiting for a worker), {@code inProgressList}
 * (a worker is filling the buffer) and {@code completedReadList} (finished,
 * successfully or not). {@code freeList} holds the indices of pool buffers
 * that are not attached to any request.
 *
 * <p>All four collections are guarded by this object's monitor.
 */
final class ReadBufferManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);

    private static final int ONE_KB = 1024;
    private static final int ONE_MB = ONE_KB * ONE_KB;
    private static final int NUM_BUFFERS = 16;
    private static final int NUM_THREADS = 8;
    private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000;

    /** Size in bytes of every pool buffer; only read when a manager is initialised. */
    private static int blockSize = 4 * ONE_MB;
    /** Age after which an unconsumed completed buffer becomes eligible for eviction. */
    private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
    /** Serialises creation of the singleton. */
    private static final ReentrantLock LOCK = new ReentrantLock();
    /** Volatile so that the unlocked fast-path read in {@link #getBufferManager()} is safe. */
    private static volatile ReadBufferManager bufferManager;

    private byte[][] buffers;
    private final Thread[] threads = new Thread[NUM_THREADS];
    private final Stack<Integer> freeList = new Stack<>();
    private final Queue<ReadBuffer> readAheadQueue = new LinkedList<>();
    private final LinkedList<ReadBuffer> inProgressList = new LinkedList<>();
    private final LinkedList<ReadBuffer> completedReadList = new LinkedList<>();

    /**
     * Returns the singleton, creating and initialising it on first use.
     *
     * <p>The instance is published only after {@link #init()} has finished, so
     * no caller can observe a manager whose buffer pool is not yet allocated.
     * NOTE(review): this assumes the {@code ReadBufferWorker} constructor does
     * not itself call {@code getBufferManager()} - confirm.
     *
     * @return the shared manager
     */
    public static ReadBufferManager getBufferManager() {
        ReadBufferManager manager = bufferManager;
        if (manager == null) {
            LOCK.lock();
            try {
                manager = bufferManager;
                if (manager == null) {
                    manager = new ReadBufferManager();
                    manager.init();
                    bufferManager = manager;
                }
            } finally {
                LOCK.unlock();
            }
        }
        return manager;
    }

    /**
     * Overrides the read-ahead block size. Has no effect once the singleton exists.
     *
     * @param readAheadBlockSize buffer size in bytes to use when the pool is allocated
     */
    public static void setReadBufferManagerConfigs(int readAheadBlockSize) {
        if (bufferManager == null) {
            LOGGER.debug("ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}", readAheadBlockSize);
            blockSize = readAheadBlockSize;
        }
    }

    /** Allocates the buffer pool, starts the daemon prefetch threads and releases them. */
    private void init() {
        this.buffers = new byte[NUM_BUFFERS][];
        for (int i = 0; i < NUM_BUFFERS; i++) {
            this.buffers[i] = new byte[blockSize];
            this.freeList.add(i);
        }
        for (int i = 0; i < NUM_THREADS; i++) {
            Thread worker = new Thread(new ReadBufferWorker(i));
            worker.setDaemon(true);
            this.threads[i] = worker;
            worker.setName("ABFS-prefetch-" + i);
            worker.start();
        }
        ReadBufferWorker.UNLEASH_WORKERS.countDown();
    }

    private ReadBufferManager() {
    }

    /**
     * Queues a read-ahead for the given stream range if it is not already
     * queued, in progress or completed, and a pool buffer can be obtained
     * (from the free list or by eviction). Otherwise the request is dropped.
     *
     * @param stream          stream the read-ahead is for
     * @param requestedOffset file offset at which to start reading
     * @param requestedLength number of bytes requested
     * @param tracingContext  tracing context attached to the queued request
     */
    public void queueReadAhead(AbfsInputStream stream, long requestedOffset, int requestedLength, TracingContext tracingContext) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", stream.getPath(), requestedOffset, requestedLength);
        }
        synchronized (this) {
            if (isAlreadyQueued(stream, requestedOffset)) {
                return;
            }
            if (this.freeList.isEmpty() && !tryEvict()) {
                return;
            }
            if (this.freeList.isEmpty()) {
                // Eviction reported success without releasing a pool index;
                // skip the read-ahead rather than pop an empty stack.
                return;
            }
            ReadBuffer buffer = new ReadBuffer();
            buffer.setStream(stream);
            buffer.setOffset(requestedOffset);
            buffer.setLength(0);
            buffer.setRequestedLength(requestedLength);
            buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
            buffer.setLatch(new CountDownLatch(1));
            buffer.setTracingContext(tracingContext);
            int bufferIndex = this.freeList.pop();
            buffer.setBuffer(this.buffers[bufferIndex]);
            buffer.setBufferindex(bufferIndex);
            this.readAheadQueue.add(buffer);
            notifyAll();
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", stream.getPath(), requestedOffset, buffer.getBufferindex());
            }
        }
    }

    /**
     * Copies already-prefetched data for the given position into {@code buffer}.
     * If a matching request is still queued it is cancelled; if one is in
     * progress the call blocks until the worker finishes.
     *
     * @param stream   stream being read
     * @param position file offset requested
     * @param length   maximum number of bytes to copy
     * @param buffer   destination; bytes are written starting at index 0
     * @return number of bytes copied, or 0 if nothing usable was cached
     * @throws IOException the failure recorded by a recent failed read-ahead covering {@code position}
     */
    public int getBlock(AbfsInputStream stream, long position, int length, byte[] buffer) throws IOException {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("getBlock for file {}  position {}  thread {}", stream.getPath(), position, Thread.currentThread().getName());
        }
        waitForProcess(stream, position);
        int bytesRead;
        synchronized (this) {
            bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
        }
        if (bytesRead <= 0) {
            return 0;
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Done read from Cache for {} position {} length {}", stream.getPath(), position, bytesRead);
        }
        return bytesRead;
    }

    /**
     * Cancels a still-queued request covering {@code position}, then, if a
     * request covering it is in progress, waits on that request's latch.
     * The wait happens outside the monitor so workers can make progress.
     */
    private void waitForProcess(AbfsInputStream stream, long position) {
        ReadBuffer inProgress;
        synchronized (this) {
            clearFromReadAheadQueue(stream, position);
            inProgress = getFromList(this.inProgressList, stream, position);
        }
        if (inProgress == null) {
            return;
        }
        try {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", stream.getPath(), inProgress.getOffset(), inProgress.getBufferindex());
            }
            inProgress.getLatch().await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller and stop waiting.
            Thread.currentThread().interrupt();
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("latch done for file {} buffer idx {} length {}", stream.getPath(), inProgress.getBufferindex(), inProgress.getLength());
        }
    }

    /**
     * Tries to evict one completed buffer, in order of preference:
     * fully consumed (first and last byte read), then partially consumed,
     * then the oldest unconsumed buffer older than the threshold age.
     * Index-less entries (bufferindex == -1) older than the threshold are
     * removed as a side effect of the last pass but do not count as success.
     *
     * @return true if a buffer was evicted by one of the three rules above
     */
    private synchronized boolean tryEvict() {
        if (this.completedReadList.isEmpty()) {
            return false;
        }
        long now = currentTimeMillis();

        for (ReadBuffer candidate : this.completedReadList) {
            if (candidate.isFirstByteConsumed() && candidate.isLastByteConsumed()) {
                return evict(candidate);
            }
        }

        for (ReadBuffer candidate : this.completedReadList) {
            if (candidate.isAnyByteConsumed()) {
                return evict(candidate);
            }
        }

        ReadBuffer oldest = null;
        long oldestTimeStamp = Long.MAX_VALUE;
        List<ReadBuffer> expiredWithoutBuffer = new ArrayList<>();
        for (ReadBuffer candidate : this.completedReadList) {
            if (candidate.getBufferindex() != -1) {
                if (candidate.getTimeStamp() < oldestTimeStamp) {
                    oldest = candidate;
                    oldestTimeStamp = candidate.getTimeStamp();
                }
            } else if (now - candidate.getTimeStamp() > thresholdAgeMilliseconds) {
                expiredWithoutBuffer.add(candidate);
            }
        }
        // Removal is deferred until after the scan so the list is not
        // modified while it is being iterated.
        for (ReadBuffer expired : expiredWithoutBuffer) {
            evict(expired);
        }
        if (oldest != null && now - oldestTimeStamp > thresholdAgeMilliseconds) {
            return evict(oldest);
        }
        LOGGER.trace("No buffer eligible for eviction");
        return false;
    }

    /**
     * Removes {@code readBuffer} from the completed list and, if it still
     * holds a pool index, returns that index to the free list.
     *
     * @return always true
     */
    private boolean evict(ReadBuffer readBuffer) {
        if (readBuffer.getBufferindex() != -1) {
            this.freeList.push(readBuffer.getBufferindex());
        }
        this.completedReadList.remove(readBuffer);
        readBuffer.setTracingContext(null);
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", readBuffer.getBufferindex(), readBuffer.getStream().getPath(), readBuffer.getOffset(), readBuffer.getLength());
        }
        return true;
    }

    /** Returns true if any of the three tracking collections has an entry covering the offset. */
    private boolean isAlreadyQueued(AbfsInputStream stream, long requestedOffset) {
        return isInList(this.readAheadQueue, stream, requestedOffset)
                || isInList(this.inProgressList, stream, requestedOffset)
                || isInList(this.completedReadList, stream, requestedOffset);
    }

    private boolean isInList(Collection<ReadBuffer> list, AbfsInputStream stream, long requestedOffset) {
        return getFromList(list, stream, requestedOffset) != null;
    }

    /**
     * Finds the first entry in {@code list} that belongs to {@code stream} and
     * whose range contains {@code requestedOffset}. The range is the bytes
     * actually read for an AVAILABLE buffer, or otherwise (and as a fallback)
     * the requested length.
     *
     * @return the matching entry, or null if none
     */
    private ReadBuffer getFromList(Collection<ReadBuffer> list, AbfsInputStream stream, long requestedOffset) {
        for (ReadBuffer candidate : list) {
            if (candidate.getStream() != stream || requestedOffset < candidate.getOffset()) {
                continue;
            }
            boolean withinReadData = candidate.getStatus() == ReadBufferStatus.AVAILABLE
                    && requestedOffset < candidate.getOffset() + candidate.getLength();
            if (withinReadData || requestedOffset < candidate.getOffset() + candidate.getRequestedLength()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds the first completed entry for {@code stream} whose read or
     * requested range contains {@code requestedOffset}, regardless of status.
     *
     * @return the matching entry, or null if none
     */
    private ReadBuffer getBufferFromCompletedQueue(AbfsInputStream stream, long requestedOffset) {
        for (ReadBuffer candidate : this.completedReadList) {
            if (candidate.getStream() != stream || requestedOffset < candidate.getOffset()) {
                continue;
            }
            if (requestedOffset < candidate.getOffset() + candidate.getLength()
                    || requestedOffset < candidate.getOffset() + candidate.getRequestedLength()) {
                return candidate;
            }
        }
        return null;
    }

    /** Cancels a still-queued request covering the offset and returns its pool index to the free list. */
    private void clearFromReadAheadQueue(AbfsInputStream stream, long requestedOffset) {
        ReadBuffer queued = getFromList(this.readAheadQueue, stream, requestedOffset);
        if (queued != null) {
            this.readAheadQueue.remove(queued);
            notifyAll();
            this.freeList.push(queued.getBufferindex());
        }
    }

    /**
     * Copies data from a completed buffer covering {@code position} into
     * {@code buffer} and records which parts of the cached block were consumed.
     *
     * @return bytes copied; 0 if there is no usable completed buffer
     * @throws IOException the stored failure, if the covering read failed
     *         less than the threshold age ago
     */
    private int getBlockFromCompletedQueue(AbfsInputStream stream, long position, int length, byte[] buffer) throws IOException {
        ReadBuffer completed = getBufferFromCompletedQueue(stream, position);
        if (completed == null) {
            return 0;
        }
        if (completed.getStatus() == ReadBufferStatus.READ_FAILED) {
            // Only surface recent failures; for older ones report a cache miss.
            if (currentTimeMillis() - completed.getTimeStamp() < thresholdAgeMilliseconds) {
                throw completed.getErrException();
            }
            return 0;
        }
        if (completed.getStatus() != ReadBufferStatus.AVAILABLE
                || position >= completed.getOffset() + completed.getLength()) {
            return 0;
        }
        int cursor = (int) (position - completed.getOffset());
        int bytesToCopy = Math.min(length, completed.getLength() - cursor);
        System.arraycopy(completed.getBuffer(), cursor, buffer, 0, bytesToCopy);
        if (cursor == 0) {
            completed.setFirstByteConsumed(true);
        }
        if (cursor + bytesToCopy == completed.getLength()) {
            completed.setLastByteConsumed(true);
        }
        completed.setAnyByteConsumed(true);
        return bytesToCopy;
    }

    /**
     * Blocks until a request is queued, then moves it to the in-progress list
     * and hands it to the calling worker.
     *
     * @return the request to process, or null if the dequeued element was null
     * @throws InterruptedException if interrupted while waiting for work
     */
    public ReadBuffer getNextBlockToRead() throws InterruptedException {
        synchronized (this) {
            while (this.readAheadQueue.isEmpty()) {
                wait();
            }
            ReadBuffer next = this.readAheadQueue.remove();
            notifyAll();
            if (next == null) {
                return null;
            }
            next.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
            this.inProgressList.add(next);
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("ReadBufferWorker picked file {} for offset {}", next.getStream().getPath(), next.getOffset());
            }
            return next;
        }
    }

    /**
     * Called by a worker when it has finished (or failed) a read. Moves the
     * request from the in-progress list to the completed list and releases
     * anyone waiting on its latch.
     *
     * <p>If the read did not produce data, the pool buffer is returned to the
     * free list and detached from the request (index set to -1, array
     * reference cleared) so that a later eviction cannot release the same
     * index a second time.
     *
     * @param readBuffer       request the worker was processing
     * @param readBufferStatus outcome of the read
     * @param bytesActuallyRead number of bytes the worker read
     */
    public void doneReading(ReadBuffer readBuffer, ReadBufferStatus readBufferStatus, int bytesActuallyRead) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", readBuffer.getStream().getPath(), readBuffer.getOffset(), bytesActuallyRead);
        }
        synchronized (this) {
            // remove(Object) reports whether the request was still tracked.
            if (this.inProgressList.remove(readBuffer)) {
                if (readBufferStatus == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
                    readBuffer.setLength(bytesActuallyRead);
                } else {
                    this.freeList.push(readBuffer.getBufferindex());
                    readBuffer.setBufferindex(-1);
                    readBuffer.setBuffer(null);
                }
                // Failed reads are kept in the completed list too, so the
                // error can be reported to a reader asking for that range.
                readBuffer.setStatus(readBufferStatus);
                readBuffer.setTimeStamp(currentTimeMillis());
                this.completedReadList.add(readBuffer);
            }
        }
        readBuffer.getLatch().countDown();
    }

    /** Millisecond clock derived from {@link System#nanoTime()}; only differences are meaningful. */
    private long currentTimeMillis() {
        return System.nanoTime() / 1000 / 1000;
    }

    @VisibleForTesting
    int getThresholdAgeMilliseconds() {
        return thresholdAgeMilliseconds;
    }

    @VisibleForTesting
    static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
        thresholdAgeMilliseconds = thresholdAgeMs;
    }

    @VisibleForTesting
    synchronized int getCompletedReadListSize() {
        return this.completedReadList.size();
    }

    @VisibleForTesting
    public synchronized List<ReadBuffer> getCompletedReadListCopy() {
        return new ArrayList<>(this.completedReadList);
    }

    @VisibleForTesting
    public synchronized List<Integer> getFreeListCopy() {
        return new ArrayList<>(this.freeList);
    }

    @VisibleForTesting
    public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
        return new ArrayList<>(this.readAheadQueue);
    }

    @VisibleForTesting
    public synchronized List<ReadBuffer> getInProgressCopiedList() {
        return new ArrayList<>(this.inProgressList);
    }

    @VisibleForTesting
    void callTryEvict() {
        tryEvict();
    }

    /**
     * Drops the queued and completed requests that belong to {@code stream}
     * and returns their pool buffers to the free list.
     *
     * <p>Requests that are in progress are deliberately left alone: a worker
     * is still writing into their buffer, so releasing the index here would
     * let a new read-ahead share the same array. They are moved to the
     * completed list by {@link #doneReading} and removed later by eviction.
     *
     * @param stream stream whose buffers should be purged
     */
    public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
        LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
        purgeList(stream, this.readAheadQueue);
        purgeList(stream, this.completedReadList);
    }

    /** Removes every entry of {@code list} owned by {@code stream}, releasing any pool index it holds. */
    private void purgeList(AbfsInputStream stream, Collection<ReadBuffer> list) {
        Iterator<ReadBuffer> it = list.iterator();
        while (it.hasNext()) {
            ReadBuffer candidate = it.next();
            if (candidate.getStream() == stream) {
                it.remove();
                if (candidate.getBufferindex() != -1) {
                    this.freeList.push(candidate.getBufferindex());
                }
            }
        }
    }

    /**
     * Test hook: empties every collection, drops the buffer pool and clears
     * the singleton reference so the next {@link #getBufferManager()} call
     * builds a fresh manager. The prefetch threads are not stopped here.
     */
    @VisibleForTesting
    void testResetReadBufferManager() {
        synchronized (this) {
            // Snapshot first: evict() removes from completedReadList.
            List<ReadBuffer> toEvict = new ArrayList<>();
            for (ReadBuffer completed : this.completedReadList) {
                if (completed != null) {
                    toEvict.add(completed);
                }
            }
            for (ReadBuffer completed : toEvict) {
                evict(completed);
            }
            this.readAheadQueue.clear();
            this.inProgressList.clear();
            this.completedReadList.clear();
            this.freeList.clear();
            if (this.buffers != null) {
                for (int i = 0; i < NUM_BUFFERS; i++) {
                    this.buffers[i] = null;
                }
                this.buffers = null;
            }
            resetBufferManager();
        }
    }

    @VisibleForTesting
    static void resetBufferManager() {
        bufferManager = null;
    }

    @VisibleForTesting
    void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMs) {
        setBlockSize(readAheadBlockSize);
        setThresholdAgeMilliseconds(thresholdAgeMs);
        testResetReadBufferManager();
    }

    @VisibleForTesting
    static void setBlockSize(int readAheadBlockSize) {
        blockSize = readAheadBlockSize;
    }

    @VisibleForTesting
    int getReadAheadBlockSize() {
        return blockSize;
    }

    /** Test hook: empties the free list and adds {@code readBuffer} to the completed list. */
    @VisibleForTesting
    synchronized void testMimicFullUseAndAddFailedBuffer(ReadBuffer readBuffer) {
        this.freeList.clear();
        this.completedReadList.add(readBuffer);
    }
}
