package com.aliyun.emr.rss.client.read;

import com.aliyun.emr.rss.client.compress.RssLz4Decompressor;
import com.aliyun.emr.rss.client.compress.RssLz4Trait;
import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
import com.aliyun.emr.rss.common.network.buffer.NettyManagedBuffer;
import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
import com.aliyun.emr.rss.common.unsafe.Platform;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/emr/rss/client/read/RssInputStream.class */
public abstract class RssInputStream extends InputStream {
    /** Logger shared by this class, the nested stream implementation and its readers. */
    private static final Logger logger = LoggerFactory.getLogger(RssInputStream.class);

    /**
     * Shared stream that is permanently at end-of-stream: both {@code read} overloads return
     * {@code -1} and the metrics callback is ignored. Returned by {@code create} when no
     * partition locations are supplied, and by {@code empty()}.
     */
    private static final RssInputStream emptyInputStream = new RssInputStream() {
        @Override
        public void setCallback(MetricsCallback metricsCallback) {
            // Nothing is ever read, so there are no metrics to report.
        }

        @Override
        public int read(byte[] bArr, int i, int i2) throws IOException {
            return -1;
        }

        @Override
        public int read() throws IOException {
            return -1;
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/emr/rss/client/read/RssInputStream$RssInputStreamImpl.class */
    public static final class RssInputStreamImpl extends RssInputStream {
        /** Configuration handed to every chunk client created by this stream. */
        private final RssConf conf;
        /** Factory handed to every chunk client created by this stream. */
        private final TransportClientFactory clientFactory;
        /** Shuffle identifier handed to every chunk client created by this stream. */
        private final String shuffleKey;
        /** Partition locations to read, in the randomized order chosen by the constructor. */
        private final PartitionLocation[] locations;
        /** Indexed by map id: a batch is consumed only if its attempt id equals the entry. */
        private final int[] attempts;
        /** Attempt number of the reading side; odd values make readers prefer a location's peer. */
        private final int attemptNumber;
        /** Map index range forwarded to the chunk client. */
        private final int startMapIndex;
        private final int endMapIndex;
        /** Upper bound used when deciding how many chunk fetches to issue ahead of consumption. */
        private final int maxInFlight;
        /** Scratch buffer for one compressed batch; replaced by a larger array on demand. */
        private byte[] compressedBuf;
        /** Holds the current decompressed batch; valid bytes are {@code [position, limit)}. */
        private byte[] decompressedBuf;
        private final RssLz4Decompressor decompressor;
        /** Chunk currently being parsed, or {@code null} once the stream is exhausted or closed. */
        private ByteBuf currentChunk;
        /** Reader supplying {@link #currentChunk}, or {@code null} when there is none. */
        private PartitionReader currentReader;
        /** Index into {@link #locations} of the next location to open. */
        private int fileIndex;
        /** Read cursor inside {@link #decompressedBuf}. */
        private int position;
        /** End (exclusive) of the valid data inside {@link #decompressedBuf}. */
        private int limit;
        /** Optional metrics sink; may be {@code null}. */
        private MetricsCallback callback;
        /** Map id -> batch ids already consumed, used to drop duplicated batches. */
        private final Map<Integer, Set<Integer>> batchesRead = new HashMap<>();
        /** Size in bytes of a batch header: four ints (map id, attempt id, batch id, payload size). */
        private static final int BATCH_HEADER_SIZE = 16;
        /** Scratch buffer that receives one batch header. */
        private final byte[] sizeBuf = new byte[BATCH_HEADER_SIZE];

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/aliyun/emr/rss/client/read/RssInputStream$RssInputStreamImpl$PartitionReader.class */
        public final class PartitionReader {
            private final RetryingChunkClient client;
            private final int numChunks;
            private int returnedChunks;
            private int chunkIndex;
            private final ChunkReceivedCallback callback;
            private final AtomicReference<IOException> exception = new AtomicReference<>();
            private boolean closed = false;
            private final LinkedBlockingQueue<ByteBuf> results = new LinkedBlockingQueue<>();

            PartitionReader(PartitionLocation partitionLocation) throws IOException {
                this.callback = new ChunkReceivedCallback() { // from class: com.aliyun.emr.rss.client.read.RssInputStream.RssInputStreamImpl.PartitionReader.1
                    @Override // com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback
                    public void onSuccess(int i, ManagedBuffer managedBuffer) {
                        synchronized (PartitionReader.this) {
                            ByteBuf buf = ((NettyManagedBuffer) managedBuffer).getBuf();
                            if (!PartitionReader.this.closed) {
                                buf.retain();
                                PartitionReader.this.results.add(buf);
                            }
                        }
                    }

                    @Override // com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback
                    public void onFailure(int i, Throwable th) {
                        String str = "Fetch chunk " + i + " failed.";
                        RssInputStream.logger.error(str, th);
                        PartitionReader.this.exception.set(new IOException(str, th));
                    }
                };
                this.client = new RetryingChunkClient(RssInputStreamImpl.this.conf, RssInputStreamImpl.this.shuffleKey, partitionLocation, this.callback, RssInputStreamImpl.this.clientFactory, RssInputStreamImpl.this.startMapIndex, RssInputStreamImpl.this.endMapIndex);
                this.numChunks = this.client.openChunks();
            }

            boolean hasNext() {
                return this.returnedChunks < this.numChunks;
            }

            ByteBuf next() throws IOException {
                checkException();
                if (this.chunkIndex < this.numChunks) {
                    fetchChunks();
                }
                ByteBuf byteBuf = null;
                while (byteBuf == null) {
                    try {
                        checkException();
                        byteBuf = this.results.poll(500L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        IOException iOException = new IOException(e);
                        this.exception.set(iOException);
                        throw iOException;
                    }
                }
                this.returnedChunks++;
                return byteBuf;
            }

            void close() {
                synchronized (this) {
                    this.closed = true;
                }
                if (this.results.size() > 0) {
                    this.results.forEach(byteBuf -> {
                        byteBuf.release();
                    });
                }
                this.results.clear();
            }

            private void fetchChunks() {
                int i = this.chunkIndex - this.returnedChunks;
                if (i < RssInputStreamImpl.this.maxInFlight) {
                    int min = Math.min((RssInputStreamImpl.this.maxInFlight - i) + 1, this.numChunks - this.chunkIndex);
                    for (int i2 = 0; i2 < min; i2++) {
                        RetryingChunkClient retryingChunkClient = this.client;
                        int i3 = this.chunkIndex;
                        this.chunkIndex = i3 + 1;
                        retryingChunkClient.fetchChunk(i3);
                    }
                }
            }

            private void checkException() throws IOException {
                IOException iOException = this.exception.get();
                if (iOException != null) {
                    throw iOException;
                }
            }
        }

        /**
         * Creates the stream and immediately positions it on the first location that has chunks.
         *
         * @param rssConf configuration; also supplies the in-flight limit and the initial buffer size
         * @param transportClientFactory factory passed to the chunk clients
         * @param str shuffle key passed to the chunk clients
         * @param partitionLocationArr locations to read; copied and shuffled, the argument itself
         *     is not modified
         * @param iArr per-map-id attempt ids used to filter batches (stored, not copied)
         * @param i attempt number of the reading side
         * @param i2 start map index forwarded to the chunk clients
         * @param i3 end map index forwarded to the chunk clients
         * @throws IOException if opening the first reader or fetching its first chunk fails
         */
        RssInputStreamImpl(RssConf rssConf, TransportClientFactory transportClientFactory, String str, final PartitionLocation[] partitionLocationArr, int[] iArr, int i, int i2, int i3) throws IOException {
            this.conf = rssConf;
            this.clientFactory = transportClientFactory;
            this.shuffleKey = str;
            // Shuffle a typed private copy; Arrays.asList alone would write through to the caller's array.
            final ArrayList<PartitionLocation> shuffled = new ArrayList<>(Arrays.asList(partitionLocationArr));
            Collections.shuffle(shuffled);
            this.locations = shuffled.toArray(new PartitionLocation[0]);
            this.attempts = iArr;
            this.attemptNumber = i;
            this.startMapIndex = i2;
            this.endMapIndex = i3;
            this.maxInFlight = RssConf.fetchChunkMaxReqsInFlight(rssConf);
            // Initial size only: both buffers are replaced by larger arrays when a batch needs it.
            final int bufferSize = RssConf.pushDataBufferSize(rssConf) + RssLz4Trait.HEADER_LENGTH;
            this.compressedBuf = new byte[bufferSize];
            this.decompressedBuf = new byte[bufferSize];
            this.decompressor = new RssLz4Decompressor();
            moveToNextReader();
        }

        /**
         * Closes the current reader (if any) and opens readers for the following locations until
         * one reports at least one chunk or the last location has been tried.
         *
         * <p>On success the first chunk of the new reader becomes {@code currentChunk} and
         * {@code fileIndex} is advanced past that location. If even the last location has no
         * chunks, its reader is closed and {@code currentReader} is set to {@code null}.
         *
         * @throws IOException if a reader cannot be created or its first chunk cannot be fetched
         */
        private void moveToNextReader() throws IOException {
            final PartitionReader previous = this.currentReader;
            if (previous != null) {
                previous.close();
            }
            this.currentReader = openReaderAtFileIndex();
            final int lastIndex = this.locations.length - 1;
            // Skip over locations that hold no chunks, but never walk past the last one.
            while (this.currentReader.numChunks < 1 && this.fileIndex < lastIndex) {
                this.fileIndex++;
                this.currentReader.close();
                this.currentReader = openReaderAtFileIndex();
            }
            if (this.currentReader.numChunks <= 0) {
                this.currentReader.close();
                this.currentReader = null;
                return;
            }
            this.currentChunk = this.currentReader.next();
            this.fileIndex++;
        }

        /** Creates the reader for {@code locations[fileIndex]} and logs the move. */
        private PartitionReader openReaderAtFileIndex() throws IOException {
            final PartitionLocation location = this.locations[this.fileIndex];
            final PartitionReader reader = createReader(location);
            RssInputStream.logger.info("Moved to next partition {},startMapIndex {} endMapIndex {} , {}/{} read , get chunks size {}", location, this.startMapIndex, this.endMapIndex, this.fileIndex, this.locations.length, reader.numChunks);
            return reader;
        }

        /**
         * Creates a reader for {@code partitionLocation}, or for its peer when a peer exists
         * and {@code attemptNumber % 2 == 1}.
         *
         * @param partitionLocation the location selected from {@code locations}
         * @return a reader for the location actually chosen
         * @throws IOException if the reader cannot be created
         */
        private PartitionReader createReader(PartitionLocation partitionLocation) throws IOException {
            // NOTE(review): getPeer() is read once here; assumes it is a plain getter.
            final PartitionLocation peer = partitionLocation.getPeer();
            if (peer == null) {
                RssInputStream.logger.debug("Partition {} has only one partition replica.", partitionLocation);
                return new PartitionReader(partitionLocation);
            }
            if (this.attemptNumber % 2 != 1) {
                return new PartitionReader(partitionLocation);
            }
            RssInputStream.logger.debug("Read peer {} for attempt {}.", peer, this.attemptNumber);
            return new PartitionReader(peer);
        }

        /**
         * Stores the metrics sink notified by {@code fillBuffer}; {@code null} disables reporting.
         *
         * @param metricsCallback the sink to notify, or {@code null}
         */
        @Override
        public void setCallback(MetricsCallback metricsCallback) {
            callback = metricsCallback;
        }

        /**
         * Reads one byte, refilling the decompressed buffer as often as needed.
         *
         * @return the next byte as a value in {@code 0..255}, or {@code -1} at end of stream
         * @throws IOException if refilling the buffer fails
         */
        @Override
        public int read() throws IOException {
            // A refill may leave the buffer empty, so keep refilling until data is
            // available or the source is exhausted.
            while (this.position >= this.limit) {
                if (!fillBuffer()) {
                    return -1;
                }
            }
            final int value = this.decompressedBuf[this.position] & 0xFF;
            this.position++;
            return value;
        }

        /**
         * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}, refilling
         * the decompressed buffer until the request is satisfied or the source is exhausted.
         *
         * @param bArr destination array
         * @param i offset in {@code bArr} of the first byte written
         * @param i2 maximum number of bytes to read
         * @return the number of bytes copied; {@code 0} if {@code i2} is zero; {@code -1} if the
         *     stream ended before any byte was copied
         * @throws NullPointerException if {@code bArr} is {@code null}
         * @throws IndexOutOfBoundsException if {@code i} or {@code i2} is negative, or
         *     {@code i2 > bArr.length - i}
         * @throws IOException if refilling the buffer fails
         */
        @Override
        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (bArr == null) {
                throw new NullPointerException();
            }
            if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
                throw new IndexOutOfBoundsException();
            }
            if (i2 == 0) {
                return 0;
            }
            int copied = 0;
            while (copied < i2) {
                while (this.position >= this.limit) {
                    if (!fillBuffer()) {
                        // Source exhausted: report what was copied so far, or end of stream.
                        return copied > 0 ? copied : -1;
                    }
                }
                final int available = this.limit - this.position;
                final int count = Math.min(available, i2 - copied);
                System.arraycopy(this.decompressedBuf, this.position, bArr, i + copied, count);
                this.position += count;
                copied += count;
            }
            return copied;
        }

        /**
         * Releases the chunk currently held and closes the current reader, clearing both
         * references. Calling it again afterwards does nothing.
         */
        @Override
        public void close() {
            final ByteBuf chunk = this.currentChunk;
            if (chunk != null) {
                RssInputStream.logger.debug("Release chunk {}!", chunk);
                chunk.release();
                this.currentChunk = null;
            }
            final PartitionReader reader = this.currentReader;
            if (reader != null) {
                RssInputStream.logger.debug("Closing reader");
                reader.close();
                this.currentReader = null;
            }
        }

        /**
         * Releases the current chunk and advances to the next one: first from the current reader,
         * otherwise from the reader of the next location.
         *
         * @return {@code true} if {@code currentChunk} now refers to a new chunk; {@code false}
         *     if no data is left, in which case {@code currentChunk} and {@code currentReader}
         *     are both {@code null}
         * @throws IOException if fetching the next chunk or opening the next reader fails
         */
        private boolean moveToNextChunk() throws IOException {
            if (this.currentChunk != null) {
                this.currentChunk.release();
            }
            this.currentChunk = null;
            if (this.currentReader.hasNext()) {
                this.currentChunk = this.currentReader.next();
                return true;
            }
            if (this.fileIndex < this.locations.length) {
                moveToNextReader();
                return this.currentReader != null;
            }
            // Last reader exhausted. Close it before dropping the reference, as every other
            // path does, so that its closed flag stops the fetch callback from retaining a
            // buffer that nobody would release.
            this.currentReader.close();
            this.currentReader = null;
            return false;
        }

        /**
         * Loads the next usable batch into {@code decompressedBuf}.
         *
         * <p>Each batch starts with a 16-byte header of four ints (map id, attempt id, batch id,
         * payload size) followed by the compressed payload. A batch is skipped when its attempt
         * id differs from {@code attempts[mapId]} or when the same (map id, batch id) pair has
         * already been consumed. The method stops at the first accepted batch so that the caller
         * can drain it before the next one overwrites the buffer.
         *
         * @return {@code true} if a batch was decompressed ({@code position} is reset to 0 and
         *     {@code limit} set from the decompressor's return value); {@code false} if the
         *     source is exhausted
         * @throws IOException if moving to the next chunk fails
         */
        private boolean fillBuffer() throws IOException {
            if (this.currentChunk == null) {
                return false;
            }
            final long startTime = System.currentTimeMillis();
            boolean hasData = false;
            while (this.currentChunk.isReadable() || moveToNextChunk()) {
                this.currentChunk.readBytes(this.sizeBuf);
                final int mapId = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET);
                final int attemptId = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET + 4);
                final int batchId = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET + 8);
                final int size = Platform.getInt(this.sizeBuf, Platform.BYTE_ARRAY_OFFSET + 12);
                if (size > this.compressedBuf.length) {
                    this.compressedBuf = new byte[size];
                }
                // The payload must be consumed even when the batch is skipped, to keep the
                // chunk positioned on the next header.
                this.currentChunk.readBytes(this.compressedBuf, 0, size);
                if (attemptId != this.attempts[mapId]) {
                    continue;
                }
                final Set<Integer> seenBatches = this.batchesRead.computeIfAbsent(mapId, key -> new HashSet<>());
                if (!seenBatches.add(batchId)) {
                    RssInputStream.logger.debug("Skip duplicated batch: mapId {}, attemptId {}, batchId {}.", mapId, attemptId, batchId);
                    continue;
                }
                if (this.callback != null) {
                    this.callback.incBytesRead(BATCH_HEADER_SIZE + size);
                }
                final int originalLen = this.decompressor.getOriginalLen(this.compressedBuf);
                if (this.decompressedBuf.length < originalLen) {
                    this.decompressedBuf = new byte[originalLen];
                }
                this.limit = this.decompressor.decompress(this.compressedBuf, this.decompressedBuf, 0);
                this.position = 0;
                hasData = true;
                // Stop here: continuing would overwrite this batch before it is read.
                break;
            }
            if (this.callback != null) {
                this.callback.incReadTime(System.currentTimeMillis() - startTime);
            }
            return hasData;
        }
    }

    /**
     * Creates a stream over the given partition locations.
     *
     * @param rssConf configuration for the stream and its chunk clients
     * @param transportClientFactory factory passed to the chunk clients
     * @param str shuffle key
     * @param partitionLocationArr locations to read; may be {@code null} or empty
     * @param iArr per-map-id attempt ids used to filter batches
     * @param i attempt number of the reading side
     * @param i2 start map index
     * @param i3 end map index
     * @return the shared empty stream when {@code partitionLocationArr} is {@code null} or
     *     empty, otherwise a new stream over those locations
     * @throws IOException if the new stream fails to open its first reader
     */
    public static RssInputStream create(RssConf rssConf, TransportClientFactory transportClientFactory, String str, PartitionLocation[] partitionLocationArr, int[] iArr, int i, int i2, int i3) throws IOException {
        if (partitionLocationArr == null || partitionLocationArr.length == 0) {
            return emptyInputStream;
        }
        return new RssInputStreamImpl(rssConf, transportClientFactory, str, partitionLocationArr, iArr, i, i2, i3);
    }

    /**
     * Returns the shared stream that is always at end-of-stream.
     *
     * @return the same instance on every call
     */
    public static RssInputStream empty() {
        return RssInputStream.emptyInputStream;
    }

    /**
     * Sets the metrics sink for this stream.
     *
     * <p>The chunk-backed implementation in this file reports bytes read and read time to it
     * while refilling its buffer; the empty stream ignores it.
     *
     * @param metricsCallback the sink to notify
     */
    public abstract void setCallback(MetricsCallback metricsCallback);
}
