package com.aliyun.emr.rss.client.read;

import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
import com.aliyun.emr.rss.common.network.util.NettyUtils;
import com.aliyun.emr.rss.common.network.util.TransportConf;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
import com.aliyun.emr.rss.common.protocol.TransportModuleConstants;
import com.aliyun.emr.rss.common.util.Utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/emr/rss/client/read/RetryingChunkClient.class */
/**
 * Opens and fetches chunks for a partition, retrying failed attempts across the
 * available replicas.
 *
 * <p>The client holds one {@link Replica} for the given partition location and, when the
 * location has a peer, a second one for that peer. The replica used for an attempt is
 * selected as {@code numTries % replicas.size()}, so each retry moves on to the next replica.
 * The total attempt budget is {@code (maxIORetries + 1) * replicas.size()}.
 *
 * <p>{@code numTries} is a volatile counter; every update to it happens while holding this
 * instance's monitor.
 */
public class RetryingChunkClient {
    private static final Logger logger = LoggerFactory.getLogger(RetryingChunkClient.class);
    /** Shared pool on which delayed chunk-fetch retries are executed. */
    private static final ExecutorService executorService = Executors.newCachedThreadPool(NettyUtils.createThreadFactory("Chunk Fetch Retry"));
    /** User callback that receives the final success or failure of each chunk fetch. */
    private final ChunkReceivedCallback callback;
    /** Replicas to try, in rotation order: the primary location first, then its peer if any. */
    private final List<Replica> replicas;
    /** Delay applied before each retried chunk fetch, in milliseconds. */
    private final long retryWaitMs;
    /** Upper bound for {@link #numTries}; no retry is started once it is reached. */
    private final int maxTries;
    /** Number of failed attempts so far; also drives replica selection. */
    private volatile int numTries;

    /**
     * Per-attempt callback handed to the transport layer. It remembers the value of
     * {@code numTries} at the time the attempt was started so that several failures of the
     * same attempt generation advance the retry counter only once.
     */
    public class RetryingChunkReceiveCallback implements ChunkReceivedCallback {
        /** Value of the enclosing client's {@code numTries} when this attempt was issued. */
        final int currentNumTries;

        RetryingChunkReceiveCallback(int currentNumTries) {
            this.currentNumTries = currentNumTries;
        }

        /** Forwards the fetched chunk straight to the user callback. */
        @Override
        public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
            RetryingChunkClient.this.callback.onSuccess(chunkIndex, buffer);
        }

        /**
         * Schedules a retry when the failure is retriable and attempts remain; otherwise
         * logs the failure and reports it to the user callback.
         */
        @Override
        public void onFailure(int chunkIndex, Throwable cause) {
            if (shouldRetry(cause)) {
                initiateRetry(chunkIndex, currentNumTries);
                return;
            }
            logger.error("Failed to fetch chunk {}, and will not retry({} tries).", chunkIndex, currentNumTries);
            RetryingChunkClient.this.callback.onFailure(chunkIndex, cause);
        }
    }

    /**
     * Creates a client with the trailing two range arguments defaulted to {@code 0} and
     * {@link Integer#MAX_VALUE}.
     *
     * @see #RetryingChunkClient(RssConf, String, PartitionLocation, ChunkReceivedCallback, TransportClientFactory, int, int)
     */
    public RetryingChunkClient(RssConf rssConf, String str, PartitionLocation partitionLocation, ChunkReceivedCallback chunkReceivedCallback, TransportClientFactory transportClientFactory) {
        this(rssConf, str, partitionLocation, chunkReceivedCallback, transportClientFactory, 0, Integer.MAX_VALUE);
    }

    /**
     * Creates a client for the given partition location and, if present, its peer.
     *
     * @param rssConf configuration supplying the retry wait, max retries and fetch timeout
     * @param str forwarded unchanged to each {@link Replica}
     *     (NOTE(review): looks like a shuffle identifier; confirm against {@code Replica})
     * @param partitionLocation primary location; its peer, when non-null, becomes the second replica
     * @param chunkReceivedCallback callback notified of the final outcome of each chunk fetch
     * @param transportClientFactory forwarded unchanged to each {@link Replica}
     * @param i forwarded unchanged to each {@link Replica}
     *     (NOTE(review): presumably the start of an index range; confirm)
     * @param i2 forwarded unchanged to each {@link Replica}
     *     (NOTE(review): presumably the end of an index range; confirm)
     * @throws IllegalArgumentException if {@code partitionLocation} is {@code null}
     */
    public RetryingChunkClient(RssConf rssConf, String str, PartitionLocation partitionLocation, ChunkReceivedCallback chunkReceivedCallback, TransportClientFactory transportClientFactory, int i, int i2) {
        this.numTries = 0;
        TransportConf transportConf = Utils.fromRssConf(rssConf, TransportModuleConstants.DATA_MODULE, 0);
        this.replicas = new ArrayList<>(2);
        this.callback = chunkReceivedCallback;
        this.retryWaitMs = transportConf.ioRetryWaitTimeMs();
        long fetchChunkTimeoutMs = RssConf.fetchChunkTimeoutMs(rssConf);
        if (partitionLocation != null) {
            this.replicas.add(new Replica(fetchChunkTimeoutMs, str, partitionLocation, transportClientFactory, i, i2));
            PartitionLocation peer = partitionLocation.getPeer();
            if (peer != null) {
                this.replicas.add(new Replica(fetchChunkTimeoutMs, str, peer, transportClientFactory, i, i2));
            }
        }
        if (this.replicas.isEmpty()) {
            throw new IllegalArgumentException("Must contain at least one available PartitionLocation.");
        }
        this.maxTries = (transportConf.maxIORetries() + 1) * this.replicas.size();
    }

    /**
     * Opens the stream on the current replica and returns its chunk count, moving on to the
     * next replica after each retriable failure until the attempt budget is exhausted.
     *
     * @return the number of chunks reported by the replica whose stream was opened
     * @throws IOException if the calling thread is interrupted (the interrupt flag is
     *     restored first), or if no replica could be opened; in the latter case the most
     *     recent failure, if any, is attached as the cause
     */
    public int openChunks() throws IOException {
        int numChunks = -1;
        Exception lastFailure = null;
        while (numChunks == -1 && hasRemainingRetries()) {
            Replica replica = getCurrentReplica();
            try {
                replica.getOrOpenStream();
                numChunks = replica.getNumChunks();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
                logger.warn("Exception raised while sending open chunks message to {}.", replica, e);
                lastFailure = e;
                numChunks = -1;
                boolean retry;
                // Check the budget and bump the counter under the same monitor that guards
                // every other numTries update; a bare ++ on a volatile is not atomic.
                synchronized (this) {
                    retry = shouldRetry(e);
                    if (retry) {
                        this.numTries++;
                    }
                }
                if (!retry) {
                    break;
                }
            }
        }
        if (numChunks == -1) {
            // Keep the last underlying failure as the cause so callers can diagnose it.
            throw new IOException(String.format("Could not open chunks after %d tries.", this.numTries), lastFailure);
        }
        return numChunks;
    }

    /**
     * Starts an asynchronous fetch of one chunk from the current replica. The outcome is
     * delivered through the callback given at construction time; failures that are
     * retriable are retried after {@code retryWaitMs} on the shared retry pool.
     *
     * @param chunkIndex index of the chunk to fetch
     */
    public void fetchChunk(int chunkIndex) {
        Replica replica;
        RetryingChunkReceiveCallback attemptCallback;
        // Snapshot the replica and the attempt generation together so they are consistent.
        synchronized (this) {
            replica = getCurrentReplica();
            attemptCallback = new RetryingChunkReceiveCallback(this.numTries);
        }
        try {
            replica.getOrOpenStream().fetchChunk(replica.getStreamId(), chunkIndex, attemptCallback);
        } catch (Exception e) {
            int tries = this.numTries;
            String retriesNote = tries > 0 ? "(after " + tries + " retries)" : "";
            // The trailing exception is not bound to a placeholder; SLF4J logs it as the throwable.
            logger.error("Exception raised while beginning fetch chunk {} {}.", chunkIndex, retriesNote, e);
            if (shouldRetry(e)) {
                initiateRetry(chunkIndex, attemptCallback.currentNumTries);
            } else {
                attemptCallback.onFailure(chunkIndex, e);
            }
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption; restore the flag once the failure has
                // been dispatched, matching what openChunks does.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns the replica selected by the current attempt count (round-robin). */
    @VisibleForTesting
    Replica getCurrentReplica() {
        return this.replicas.get(this.numTries % this.replicas.size());
    }

    /** Returns the number of failed attempts recorded so far. */
    @VisibleForTesting
    int getNumTries() {
        return this.numTries;
    }

    /** Returns {@code true} while the attempt counter is below the attempt budget. */
    private boolean hasRemainingRetries() {
        return this.numTries < this.maxTries;
    }

    /**
     * Decides whether a failure may be retried: the throwable or its direct cause must be an
     * {@link IOException} or a {@link TimeoutException}, and attempts must remain.
     *
     * @param th the failure to classify; must not be {@code null}
     * @return {@code true} if another attempt should be made
     */
    public synchronized boolean shouldRetry(Throwable th) {
        boolean retriable = isIoOrTimeout(th) || isIoOrTimeout(th.getCause());
        return retriable && hasRemainingRetries();
    }

    /** Returns {@code true} for a non-null {@link IOException} or {@link TimeoutException}. */
    private static boolean isIoOrTimeout(Throwable t) {
        return (t instanceof IOException) || (t instanceof TimeoutException);
    }

    /**
     * Advances the attempt counter past the failed attempt and schedules a delayed re-fetch
     * of the chunk on the shared retry pool.
     *
     * @param i index of the chunk to fetch again
     * @param i2 the attempt count captured when the failed attempt was issued; taking the
     *     max with the current counter means several failures from the same attempt
     *     generation advance the counter only once
     */
    public synchronized void initiateRetry(int i, int i2) {
        this.numTries = Math.max(this.numTries, i2 + 1);
        logger.info("Retrying fetch ({}/{}) for chunk {} from {} after {} ms.", i2, this.maxTries, i, getCurrentReplica(), this.retryWaitMs);
        executorService.submit(() -> {
            // Sleep on the pool thread so the caller (often a transport thread) is not blocked.
            Uninterruptibles.sleepUninterruptibly(this.retryWaitMs, TimeUnit.MILLISECONDS);
            fetchChunk(i);
        });
    }
}
