package com.aliyun.emr.rss.client;

import com.aliyun.emr.rss.client.compress.RssLz4Compressor;
import com.aliyun.emr.rss.client.read.RssInputStream;
import com.aliyun.emr.rss.client.write.DataBatches;
import com.aliyun.emr.rss.client.write.PushState;
import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.network.TransportContext;
import com.aliyun.emr.rss.common.network.buffer.NettyManagedBuffer;
import com.aliyun.emr.rss.common.network.client.RpcResponseCallback;
import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
import com.aliyun.emr.rss.common.network.protocol.PushData;
import com.aliyun.emr.rss.common.network.protocol.PushMergedData;
import com.aliyun.emr.rss.common.network.server.NoOpRpcHandler;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
import com.aliyun.emr.rss.common.protocol.RpcNameConstants;
import com.aliyun.emr.rss.common.protocol.TransportModuleConstants;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages;
import com.aliyun.emr.rss.common.protocol.message.StatusCode;
import com.aliyun.emr.rss.common.rpc.RpcAddress;
import com.aliyun.emr.rss.common.rpc.RpcEndpointRef;
import com.aliyun.emr.rss.common.rpc.RpcEnv;
import com.aliyun.emr.rss.common.unsafe.Platform;
import com.aliyun.emr.rss.common.util.ThreadUtils;
import com.aliyun.emr.rss.common.util.Utils;
import com.google.common.collect.Lists;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.ConcurrentSet;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.reflect.ClassTag$;

/* loaded from: input_file:com/aliyun/emr/rss/client/ShuffleClientImpl.class */
public class ShuffleClientImpl extends ShuffleClient {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(ShuffleClientImpl.class);
    // Mode byte of PartitionLocation.Mode.Master; placed in every PushData / PushMergedData request built by this class.
    private static final byte MASTER_MODE = PartitionLocation.Mode.Master.mode();
    // Shared random source: drives the jittered wait in waitRevivedLocation and the random entry pick in pushMergedData.
    private static final Random rand = new Random();
    // Configuration handed to the constructor; also used to create PushState instances and the per-thread compressor.
    private final RssConf conf;
    // Number of attempts registerShuffle makes before giving up.
    private final int registerShuffleMaxRetries;
    // Pause between registerShuffle attempts; passed to TimeUnit.SECONDS.sleep.
    private final long registerShuffleRetryWait;
    // In-flight batch limit handed to limitMaxInFlight before a push.
    private final int maxInFlight;
    // Size passed to DataBatches.requireBatches when draining merged data in pushMergedData.
    private final int pushBufferSize;
    // RPC environment created in the constructor under the name "ShuffleClient".
    private final RpcEnv rpcEnv;
    // Endpoint used for RegisterShuffle / Revive / MapperEnd / UnregisterShuffle messages.
    // NOTE(review): not assigned in the visible part of the file — presumably set by a setup method further down; confirm.
    private RpcEndpointRef driverRssMetaService;
    // Factory for the transport clients that carry pushData / pushMergedData requests.
    protected TransportClientFactory dataClientFactory;
    // Executor on which failed or hard-split pushes are retried.
    private final ExecutorService pushDataRetryPool;
    // Executor handed to ShuffleClientHelper.sendShuffleSplitAsync for partition split requests.
    private final ExecutorService partitionSplitPool;
    // NOTE(review): not referenced in the visible part of the file — purpose cannot be determined from here.
    private InetAddress ia = null;
    // shuffle id -> (reduce id -> current partition location).
    private final Map<Integer, ConcurrentHashMap<Integer, PartitionLocation>> reducePartitionMap = new ConcurrentHashMap();
    // shuffle id -> map keys (Utils.makeMapKey) of mappers known to have ended.
    private final ConcurrentHashMap<Integer, ConcurrentSet<String>> mapperEndMap = new ConcurrentHashMap<>();
    // map key (Utils.makeMapKey) -> push bookkeeping for that map attempt.
    private final Map<String, PushState> pushStates = new ConcurrentHashMap();
    // shuffle id -> reduce ids for which a split request has been issued by splitPartition.
    private final Map<Integer, Set<Integer>> splitting = new ConcurrentHashMap();
    // Per-thread LZ4 compressor, lazily created with the configured push-data buffer size.
    ThreadLocal<RssLz4Compressor> lz4CompressorThreadLocal = new ThreadLocal<RssLz4Compressor>() { // from class: com.aliyun.emr.rss.client.ShuffleClientImpl.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        // Builds the compressor for the calling thread on first access.
        @Override // java.lang.ThreadLocal
        public RssLz4Compressor initialValue() {
            return new RssLz4Compressor(RssConf.pushDataBufferSize(ShuffleClientImpl.this.conf));
        }
    };
    // Keyed by shuffle id (entries are removed by shuffle id in unregisterShuffle).
    // NOTE(review): population and reads happen outside the visible part of the file.
    private final Map<Integer, ReduceFileGroups> reduceFileGroupsMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/emr/rss/client/ShuffleClientImpl$ReduceFileGroups.class */
    /**
     * Plain holder that pairs a two-dimensional array of partition locations with an
     * array of map attempt numbers. Both references are stored as given (no copy).
     */
    public static class ReduceFileGroups {
        /** Partition location groups, stored by reference. */
        final PartitionLocation[][] partitionGroups;
        /** Map attempt numbers, stored by reference. */
        final int[] mapAttempts;

        /**
         * @param groups the partition location groups to hold
         * @param attempts the map attempt numbers to hold
         */
        ReduceFileGroups(PartitionLocation[][] groups, int[] attempts) {
            partitionGroups = groups;
            mapAttempts = attempts;
        }
    }

    public ShuffleClientImpl(RssConf rssConf) {
        this.conf = rssConf;
        this.registerShuffleMaxRetries = RssConf.registerShuffleMaxRetry(rssConf);
        this.registerShuffleRetryWait = RssConf.registerShuffleRetryWait(rssConf);
        this.maxInFlight = RssConf.pushDataMaxReqsInFlight(rssConf);
        this.pushBufferSize = RssConf.pushDataBufferSize(rssConf);
        this.rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, rssConf);
        this.dataClientFactory = new TransportContext(Utils.fromRssConf(rssConf, TransportModuleConstants.DATA_MODULE, rssConf.getInt("rss.data.io.threads", 8)), new NoOpRpcHandler(), true).createClientFactory(Lists.newArrayList());
        this.pushDataRetryPool = ThreadUtils.newDaemonCachedThreadPool("Retry-Sender", RssConf.pushDataRetryThreadNum(rssConf), 60);
        this.partitionSplitPool = ThreadUtils.newDaemonCachedThreadPool("Shuffle-Split", RssConf.clientSplitPoolSize(rssConf), 60);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Retries a single-batch push after the original attempt failed or was hard-split.
     *
     * <p>First revives the partition. If that fails the callback is failed with
     * "Revive Failed". If the mapper has ended in the meantime the batch is simply
     * dropped from the in-flight set. Otherwise the batch is re-sent to the location
     * currently recorded for the reduce id.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @param bArr the already framed batch bytes to re-send
     * @param i4 batch id
     * @param partitionLocation the location the failed push was addressed to
     * @param rpcResponseCallback callback notified of the retry outcome
     * @param pushState push bookkeeping of the map attempt
     * @param statusCode cause reported to the revive request
     */
    public void submitRetryPushData(String str, int i, int i2, int i3, byte[] bArr, int i4, PartitionLocation partitionLocation, RpcResponseCallback rpcResponseCallback, PushState pushState, StatusCode statusCode) {
        final int reduce = partitionLocation.getReduceId();
        boolean revived = revive(str, i, i2, i3, reduce, partitionLocation.getEpoch(), partitionLocation, statusCode);
        if (!revived) {
            rpcResponseCallback.onFailure(new IOException("Revive Failed"));
        } else if (mapperEnded(i, i2, i3)) {
            // Nothing left to send: the mapper finished elsewhere, so just release the slot.
            logger.debug("Retrying push data, but the mapper(map {} attempt {}) has ended.", i2, i3);
            pushState.inFlightBatches.remove(Integer.valueOf(i4));
        } else {
            PartitionLocation revivedLoc = this.reducePartitionMap.get(i).get(reduce);
            logger.info("Revive success, new location for reduce {} is {}.", reduce, revivedLoc);
            try {
                pushState.addFuture(
                        i4,
                        this.dataClientFactory
                                .createClient(revivedLoc.getHost(), revivedLoc.getPushPort(), reduce)
                                .pushData(
                                        new PushData(
                                                MASTER_MODE,
                                                Utils.makeShuffleKey(str, i),
                                                revivedLoc.getUniqueId(),
                                                new NettyManagedBuffer(Unpooled.wrappedBuffer(bArr))),
                                        rpcResponseCallback));
            } catch (Exception pushError) {
                logger.warn("Exception raised while pushing data for shuffle {} map {} attempt {} batch {}.", new Object[]{i, i2, i3, i4, pushError});
                rpcResponseCallback.onFailure(pushError);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Retries a failed merged push.
     *
     * <p>Each batch's partition is revived; batches are then regrouped by the address
     * pair of their current location and each group is pushed again. A revive failure
     * records an exception on the push state and aborts the whole retry. Batches whose
     * mapper has ended are skipped.
     *
     * @param pushState push bookkeeping of the map attempt
     * @param str application id
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @param arrayList the batches of the failed merged push
     * @param z forwarded to doPushMergedData as its "revived" flag
     * @param statusCode cause reported to the revive requests
     */
    public void submitRetryPushMergedData(PushState pushState, String str, int i, int i2, int i3, ArrayList<DataBatches.DataBatch> arrayList, boolean z, StatusCode statusCode) {
        Map<String, DataBatches> regrouped = new HashMap<>();
        for (DataBatches.DataBatch batch : arrayList) {
            int reduce = batch.loc.getReduceId();
            if (!revive(str, i, i2, i3, reduce, batch.loc.getEpoch(), batch.loc, statusCode)) {
                pushState.exception.compareAndSet(null, new IOException("Revive Failed in retry push merged data for location: " + batch.loc));
                return;
            }
            if (mapperEnded(i, i2, i3)) {
                logger.debug("Retrying push data, but the mapper(map {} attempt {}) has ended.", i2, i3);
                continue;
            }
            PartitionLocation revivedLoc = this.reducePartitionMap.get(i).get(reduce);
            logger.info("Revive success, new location for reduce {} is {}.", reduce, revivedLoc);
            DataBatches group = regrouped.computeIfAbsent(genAddressPair(revivedLoc), addressPair -> new DataBatches());
            group.addDataBatch(revivedLoc, batch.batchId, batch.body);
        }
        for (Map.Entry<String, DataBatches> group : regrouped.entrySet()) {
            // The part before '-' is the master's host:pushPort.
            String masterAddress = group.getKey().split("-")[0];
            doPushMergedData(masterAddress, str, i, i2, i3, group.getValue().requireBatches(), pushState, z);
        }
    }

    /**
     * Builds the grouping key for a partition location.
     *
     * @param partitionLocation the location to describe
     * @return {@code hostAndPushPort()} of the location, followed by {@code "-"} and the
     *     peer's {@code hostAndPushPort()} when the location has a peer
     */
    private String genAddressPair(PartitionLocation partitionLocation) {
        if (partitionLocation.getPeer() == null) {
            return partitionLocation.hostAndPushPort();
        }
        return partitionLocation.hostAndPushPort() + "-" + partitionLocation.getPeer().hostAndPushPort();
    }

    /**
     * Registers a shuffle with the driver-side meta service, retrying on non-success replies.
     *
     * <p>At most {@code registerShuffleMaxRetries} requests are sent, pausing
     * {@code registerShuffleRetryWait} seconds between attempts. There is no pause after
     * the final attempt, since nothing follows it.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 number of mappers
     * @param i3 number of partitions
     * @return reduce id to partition location map from the first successful reply, or
     *     {@code null} when every attempt was rejected, the request threw, or the thread
     *     was interrupted while waiting (the interrupt flag is left set in that case)
     */
    private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(String str, int i, int i2, int i3) {
        for (int attemptsLeft = this.registerShuffleMaxRetries; attemptsLeft > 0; attemptsLeft--) {
            try {
                ControlMessages.RegisterShuffleResponse response = (ControlMessages.RegisterShuffleResponse) this.driverRssMetaService.askSync(new ControlMessages.RegisterShuffle(str, i, i2, i3), ClassTag$.MODULE$.apply(ControlMessages.RegisterShuffleResponse.class));
                if (response.status().equals(StatusCode.Success)) {
                    ConcurrentHashMap<Integer, PartitionLocation> locations = new ConcurrentHashMap<>();
                    for (int idx = 0; idx < response.partitionLocations().size(); idx++) {
                        PartitionLocation location = response.partitionLocations().get(idx);
                        locations.put(Integer.valueOf(location.getReduceId()), location);
                    }
                    return locations;
                }
            } catch (Exception e) {
                logger.error("Exception raised while registering shuffle {} with {} mapper and {} partitions.", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), e});
                return null;
            }
            if (attemptsLeft > 1) {
                try {
                    TimeUnit.SECONDS.sleep(this.registerShuffleRetryWait);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller instead of silently consuming it.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }

    /**
     * Blocks until the number of in-flight batches of a map attempt is at most {@code i}.
     *
     * <p>The in-flight set is polled every {@code limitInFlightSleepDeltaMs} for up to
     * {@code limitInFlightTimeoutMs}. An exception already recorded on the push state is
     * rethrown before, during and after the wait.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and an
     * {@link IOException} wrapping the interruption is recorded on the push state and thrown.
     * A timeout is reported only when the poll budget is used up <em>and</em> the in-flight
     * count is still above the limit, so batches that drain during the last sleep do not
     * cause a spurious failure.
     *
     * @param str map key, used only in log and exception messages
     * @param pushState push bookkeeping of the map attempt
     * @param i maximum number of in-flight batches allowed when this method returns
     * @throws IOException if the push state carries an exception, the wait was interrupted,
     *     or the limit was not reached within the timeout
     */
    private void limitMaxInFlight(String str, PushState pushState, int i) throws IOException {
        if (pushState.exception.get() != null) {
            throw pushState.exception.get();
        }
        ConcurrentSet<Integer> inFlight = pushState.inFlightBatches;
        long timeoutMs = RssConf.limitInFlightTimeoutMs(this.conf);
        long sleepDeltaMs = RssConf.limitInFlightSleepDeltaMs(this.conf);
        long pollsLeft = timeoutMs / sleepDeltaMs;
        try {
            while (pollsLeft > 0 && inFlight.size() > i) {
                if (pushState.exception.get() != null) {
                    throw pushState.exception.get();
                }
                Thread.sleep(sleepDeltaMs);
                pollsLeft--;
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            pushState.exception.set(new IOException(e));
        }
        // Re-check the size: the last sleep may have been enough for the batches to drain.
        if (pollsLeft <= 0 && inFlight.size() > i) {
            logger.error("After waiting for {} ms, there are still {} batches in flight for map {}, which exceeds the limit {}.", new Object[]{Long.valueOf(timeoutMs), Integer.valueOf(inFlight.size()), str, Integer.valueOf(i)});
            logger.error("Map: {} in flight batches: {}", str, inFlight);
            throw new IOException("wait timeout for task " + str, pushState.exception.get());
        }
        if (pushState.exception.get() != null) {
            throw pushState.exception.get();
        }
    }

    /**
     * Checks whether a reduce partition already has a location newer than the given epoch,
     * optionally pausing briefly before a second look.
     *
     * <p>When the first look fails, a random value in [0, 50) is drawn; only values above
     * 30 lead to a sleep of that many milliseconds. The map is then consulted once more.
     * An interruption during the sleep is logged and the interrupt flag is restored.
     *
     * @param concurrentHashMap reduce id to location map of the shuffle
     * @param i reduce id
     * @param i2 epoch that the recorded location must exceed
     * @return {@code true} if a location with a greater epoch is recorded
     */
    private boolean waitRevivedLocation(ConcurrentHashMap<Integer, PartitionLocation> concurrentHashMap, int i, int i2) {
        PartitionLocation current = concurrentHashMap.get(i);
        boolean revived = current != null && current.getEpoch() > i2;
        if (!revived) {
            long pauseMs = rand.nextInt(50);
            if (pauseMs > 30) {
                try {
                    TimeUnit.MILLISECONDS.sleep(pauseMs);
                } catch (InterruptedException interrupted) {
                    logger.warn("Wait revived location interrupted", interrupted);
                    Thread.currentThread().interrupt();
                }
            }
            PartitionLocation refreshed = concurrentHashMap.get(i);
            revived = refreshed != null && refreshed.getEpoch() > i2;
        }
        return revived;
    }

    /**
     * Asks the driver-side meta service for a new location of a reduce partition.
     *
     * <p>Returns early with {@code true} when a newer location is already recorded or when
     * the mapper has already ended. On a {@code Success} reply the new location is stored;
     * on a {@code MapEnded} reply the map key is added to the ended-mapper set. Any other
     * reply, or an exception from the request, yields {@code false}.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @param i4 reduce id
     * @param i5 epoch of the location being replaced
     * @param partitionLocation the location being replaced, forwarded in the request
     * @param statusCode cause forwarded in the request
     * @return whether the caller may proceed as if the revive succeeded
     */
    private boolean revive(String str, int i, int i2, int i3, int i4, int i5, PartitionLocation partitionLocation, StatusCode statusCode) {
        ConcurrentHashMap<Integer, PartitionLocation> locations = this.reducePartitionMap.get(i);
        if (waitRevivedLocation(locations, i4, i5)) {
            logger.debug("Has already revived for shuffle {} map {} reduce {} epoch {}, just return(Assume revive successfully).", new Object[]{i, i2, i4, i5});
            return true;
        }

        String mapKey = Utils.makeMapKey(i, i2, i3);
        if (mapperEnded(i, i2, i3)) {
            logger.debug("The mapper(shuffle {} map {}) has already ended, just return(Assume revive successfully).", i, i2);
            return true;
        }

        boolean proceed;
        try {
            ControlMessages.ChangeLocationResponse reply = (ControlMessages.ChangeLocationResponse) this.driverRssMetaService.askSync(
                    new ControlMessages.Revive(str, i, i2, i3, i4, i5, partitionLocation, statusCode),
                    ClassTag$.MODULE$.apply(ControlMessages.ChangeLocationResponse.class));
            if (reply.status().equals(StatusCode.Success)) {
                locations.put(i4, reply.partition());
                proceed = true;
            } else if (reply.status().equals(StatusCode.MapEnded)) {
                // Remember that this mapper is done so later pushes short-circuit.
                this.mapperEndMap.computeIfAbsent(i, shuffle -> new ConcurrentSet()).add(mapKey);
                proceed = true;
            } else {
                proceed = false;
            }
        } catch (Exception reviveError) {
            logger.error("Exception raised while reviving for shuffle {} reduce {} epoch {}.", new Object[]{i, i4, i5, reviveError});
            proceed = false;
        }
        return proceed;
    }

    /**
     * Compresses one batch of records and either pushes it immediately or stores it for a
     * later merged push.
     *
     * <p>Steps: bail out if the mapper already ended (cancelling its pending futures);
     * register the shuffle on first use; revive the partition if no location is known for
     * the reduce id; LZ4-compress the input and prepend a 16-byte header (map id, attempt
     * id, batch id, compressed size); then push or merge depending on {@code z}.
     *
     * <p>Push mode wraps the completion callback: an empty reply or any status other than
     * SoftSplit/HardSplit is handed to the inner callback unchanged (rewound), SoftSplit
     * additionally requests a partition split, and HardSplit re-submits the batch through
     * the retry pool. Forwarding the "other status" case is what lets a StageEnded reply
     * release the in-flight slot and mark the mapper as ended.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @param i4 reduce id
     * @param bArr buffer holding the data to send
     * @param i5 second argument to the compressor — presumably the offset into {@code bArr}; confirm
     * @param i6 third argument to the compressor — presumably the length to compress; confirm
     * @param i7 number of mappers, used when the shuffle has to be registered
     * @param i8 number of partitions, used when the shuffle has to be registered
     * @param z {@code true} to push right away, {@code false} to merge with other batches
     * @return size in bytes of the framed batch (header + compressed data), or 0 when the
     *     mapper has already ended
     * @throws IOException if registration or revive fails, no location is known for the
     *     reduce id, or waiting for in-flight batches fails
     */
    public int pushOrMergeData(final String str, final int i, final int i2, final int i3, final int i4, byte[] bArr, int i5, int i6, int i7, int i8, boolean z) throws IOException {
        final String mapKey = Utils.makeMapKey(i, i2, i3);
        String shuffleKey = Utils.makeShuffleKey(str, i);
        if (mapperEnded(i, i2, i3)) {
            logger.debug("The mapper(shuffle {} map {} attempt {}) has already ended while pushing data.", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
            PushState endedState = this.pushStates.get(mapKey);
            if (endedState != null) {
                endedState.cancelFutures();
            }
            return 0;
        }
        ConcurrentHashMap<Integer, PartitionLocation> locations = this.reducePartitionMap.computeIfAbsent(Integer.valueOf(i), shuffleId -> {
            return registerShuffle(str, i, i7, i8);
        });
        if (locations == null) {
            throw new IOException("Register shuffle failed for shuffle " + shuffleKey);
        }
        if (!locations.containsKey(Integer.valueOf(i4)) && !revive(str, i, i2, i3, i4, 0, null, StatusCode.PushDataFailNonCriticalCause)) {
            throw new IOException("Revive for shuffle " + shuffleKey + " reduceId " + i4 + " failed.");
        }
        // The revive above may have discovered that the mapper ended; check again.
        if (mapperEnded(i, i2, i3)) {
            logger.debug("The mapper(shuffle {} map {} attempt {}) has already ended while pushing data.", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
            PushState endedState = this.pushStates.get(mapKey);
            if (endedState != null) {
                endedState.cancelFutures();
            }
            return 0;
        }
        final PartitionLocation loc = locations.get(Integer.valueOf(i4));
        if (loc == null) {
            throw new IOException("Partition location for shuffle " + shuffleKey + " reduceId " + i4 + " is NULL!");
        }
        final PushState pushState = this.pushStates.computeIfAbsent(mapKey, key -> {
            return new PushState(this.conf);
        });
        final int batchId = pushState.batchId.addAndGet(1);

        // Compress, then frame as: [mapId][attemptId][batchId][compressedSize][compressed bytes].
        RssLz4Compressor compressor = this.lz4CompressorThreadLocal.get();
        compressor.compress(bArr, i5, i6);
        int compressedSize = compressor.getCompressedTotalSize();
        final byte[] body = new byte[16 + compressedSize];
        Platform.putInt(body, Platform.BYTE_ARRAY_OFFSET, i2);
        Platform.putInt(body, Platform.BYTE_ARRAY_OFFSET + 4, i3);
        Platform.putInt(body, Platform.BYTE_ARRAY_OFFSET + 8, batchId);
        Platform.putInt(body, Platform.BYTE_ARRAY_OFFSET + 12, compressedSize);
        System.arraycopy(compressor.getCompressedBuffer(), 0, body, 16, compressedSize);

        if (z) {
            logger.debug("Do push data for app {} shuffle {} map {} attempt {} reduce {} batch {}.", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i4), Integer.valueOf(batchId)});
            limitMaxInFlight(mapKey, pushState, this.maxInFlight);
            pushState.inFlightBatches.add(Integer.valueOf(batchId));
            PushData pushData = new PushData(MASTER_MODE, shuffleKey, loc.getUniqueId(), new NettyManagedBuffer(Unpooled.wrappedBuffer(body)));

            // Terminal callback: releases the in-flight slot / future and records the outcome.
            final RpcResponseCallback callback = new RpcResponseCallback() {
                @Override
                public void onSuccess(ByteBuffer byteBuffer) {
                    pushState.inFlightBatches.remove(Integer.valueOf(batchId));
                    if (byteBuffer.remaining() > 0 && byteBuffer.get() == StatusCode.StageEnded.getValue()) {
                        ((ConcurrentSet) ShuffleClientImpl.this.mapperEndMap.computeIfAbsent(Integer.valueOf(i), endedShuffle -> {
                            return new ConcurrentSet();
                        })).add(mapKey);
                    }
                    pushState.removeFuture(batchId);
                    ShuffleClientImpl.logger.debug("Push data success for map {} attempt {} batch {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(batchId)});
                }

                @Override
                public void onFailure(Throwable th) {
                    pushState.exception.compareAndSet(null, new IOException("Revived PushData failed!", th));
                    pushState.removeFuture(batchId);
                    ShuffleClientImpl.logger.debug("Push data failed for map {} attempt {} batch {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(batchId)});
                }
            };

            // First-attempt callback: interprets split statuses and schedules retries.
            RpcResponseCallback wrappedCallback = new RpcResponseCallback() {
                @Override
                public void onSuccess(ByteBuffer byteBuffer) {
                    if (byteBuffer.remaining() <= 0) {
                        byteBuffer.rewind();
                        callback.onSuccess(byteBuffer);
                        return;
                    }
                    byte reason = byteBuffer.get();
                    if (reason == StatusCode.SoftSplit.getValue()) {
                        ShuffleClientImpl.logger.debug("Push data split required for map {} attempt {} batch {}", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(batchId)});
                        ShuffleClientImpl.this.splitPartition(i, i4, str, loc);
                        callback.onSuccess(byteBuffer);
                    } else if (reason == StatusCode.HardSplit.getValue()) {
                        ShuffleClientImpl.logger.debug("Push data split for map {} attempt {} batch {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(batchId)});
                        // Retry with this wrapper so a further split reply is handled the same way.
                        ShuffleClientImpl.this.pushDataRetryPool.submit(() -> {
                            ShuffleClientImpl.this.submitRetryPushData(str, i, i2, i3, body, batchId, loc, this, pushState, StatusCode.HardSplit);
                        });
                    } else {
                        // Any other status (e.g. StageEnded) must still reach the terminal callback,
                        // otherwise the batch would stay in flight forever. Rewind so it can read the status.
                        byteBuffer.rewind();
                        callback.onSuccess(byteBuffer);
                    }
                }

                @Override
                public void onFailure(Throwable th) {
                    if (pushState.exception.get() != null) {
                        return;
                    }
                    if (ShuffleClientImpl.this.mapperEnded(i, i2, i3)) {
                        pushState.inFlightBatches.remove(Integer.valueOf(batchId));
                        ShuffleClientImpl.logger.info("Mapper shuffleId:{} mapId:{} attempt:{} already ended, remove batchId:{} .", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(batchId)});
                        return;
                    }
                    ShuffleClientImpl.this.pushDataRetryPool.submit(() -> {
                        ShuffleClientImpl.this.submitRetryPushData(str, i, i2, i3, body, batchId, loc, callback, pushState, ShuffleClientImpl.this.getPushDataFailCause(th.getMessage()));
                    });
                }
            };
            try {
                pushState.addFuture(batchId, this.dataClientFactory.createClient(loc.getHost(), loc.getPushPort(), i4).pushData(pushData, wrappedCallback));
            } catch (Exception e) {
                logger.warn("PushData failed", e);
                wrappedCallback.onFailure(new Exception(getPushDataFailCause(e.getMessage()).toString(), e));
            }
        } else {
            logger.debug("Merge batch {}.", Integer.valueOf(batchId));
            String addressPair = genAddressPair(loc);
            // addBatchData returning true triggers an immediate merged push for this address pair.
            if (pushState.addBatchData(addressPair, loc, batchId, body)) {
                limitMaxInFlight(mapKey, pushState, this.maxInFlight);
                doPushMergedData(addressPair.split("-")[0], str, i, i2, i3, pushState.takeDataBaches(addressPair).requireBatches(), pushState, false);
            }
        }
        return body.length;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Requests a split of a reduce partition unless one has already been requested.
     *
     * <p>The per-shuffle set of reduce ids is used as the monitor, so the
     * check-then-add-then-send sequence is not interleaved with another call for the
     * same shuffle.
     *
     * @param i shuffle id
     * @param i2 reduce id
     * @param str application id
     * @param partitionLocation current location of the partition to split
     */
    public void splitPartition(int i, int i2, String str, PartitionLocation partitionLocation) {
        Set<Integer> inProgress = this.splitting.computeIfAbsent(i, shuffle -> ConcurrentHashMap.newKeySet());
        synchronized (inProgress) {
            if (!inProgress.contains(i2)) {
                inProgress.add(i2);
                ShuffleClientHelper.sendShuffleSplitAsync(
                        this.driverRssMetaService,
                        new ControlMessages.PartitionSplit(str, i, i2, partitionLocation.getEpoch(), partitionLocation),
                        this.partitionSplitPool,
                        inProgress,
                        i2,
                        i,
                        this.reducePartitionMap.get(i));
            } else {
                logger.debug("shuffle {} reduceId {} is splitting, skip split request ", i, i2);
            }
        }
    }

    /**
     * Pushes one batch immediately; delegates to {@code pushOrMergeData} in push mode.
     *
     * @return whatever {@code pushOrMergeData} returns for the batch
     * @throws IOException propagated from {@code pushOrMergeData}
     */
    @Override
    public int pushData(String str, int i, int i2, int i3, int i4, byte[] bArr, int i5, int i6, int i7, int i8) throws IOException {
        final boolean pushNow = true;
        return pushOrMergeData(str, i, i2, i3, i4, bArr, i5, i6, i7, i8, pushNow);
    }

    /**
     * Waits until the map attempt has no batches in flight. Does nothing when no push
     * state exists for the attempt.
     *
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @throws IOException propagated from the in-flight wait
     */
    @Override
    public void prepareForMergeData(int i, int i2, int i3) throws IOException {
        final String mapKey = Utils.makeMapKey(i, i2, i3);
        final PushState state = this.pushStates.get(mapKey);
        if (state == null) {
            return;
        }
        limitMaxInFlight(mapKey, state, 0);
    }

    /**
     * Stores one batch for a later merged push; delegates to {@code pushOrMergeData} in
     * merge mode.
     *
     * @return whatever {@code pushOrMergeData} returns for the batch
     * @throws IOException propagated from {@code pushOrMergeData}
     */
    @Override
    public int mergeData(String str, int i, int i2, int i3, int i4, byte[] bArr, int i5, int i6, int i7, int i8) throws IOException {
        final boolean pushNow = false;
        return pushOrMergeData(str, i, i2, i3, i4, bArr, i5, i6, i7, i8, pushNow);
    }

    /**
     * Drains every batch that was merged for a map attempt.
     *
     * <p>Works on a snapshot of the push state's batch map entries. Each round waits for
     * in-flight capacity, picks a random remaining entry, takes up to
     * {@code pushBufferSize} worth of batches from it and pushes them; an entry is dropped
     * from the snapshot once its total size reaches zero.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @throws IOException propagated from the in-flight wait
     */
    @Override
    public void pushMergedData(String str, int i, int i2, int i3) throws IOException {
        final String mapKey = Utils.makeMapKey(i, i2, i3);
        final PushState state = this.pushStates.get(mapKey);
        if (state != null) {
            ArrayList pending = new ArrayList(state.batchesMap.entrySet());
            while (!pending.isEmpty()) {
                limitMaxInFlight(mapKey, state, this.maxInFlight);
                int pick = rand.nextInt(pending.size());
                Map.Entry chosen = (Map.Entry) pending.get(pick);
                DataBatches merged = (DataBatches) chosen.getValue();
                ArrayList<DataBatches.DataBatch> toPush = merged.requireBatches(this.pushBufferSize);
                if (merged.getTotalSize() == 0) {
                    pending.remove(chosen);
                }
                // The key is an address pair; the part before '-' is the push target.
                String target = ((String) chosen.getKey()).split("-")[0];
                doPushMergedData(target, str, i, i2, i3, toPush, state, false);
            }
        }
    }

    /**
     * Sends a group of batches to one worker as a single merged push.
     *
     * <p>The group gets its own batch id, which is tracked in the in-flight set. The batch
     * bodies are concatenated into one composite buffer, with the unique id and the byte
     * offset of every batch sent alongside.
     *
     * <p>On failure of a first attempt ({@code z == false}) the in-flight slot is released
     * and, unless the mapper has ended, the batches are handed to the retry pool. On
     * failure of a retry ({@code z == true}) the error is recorded on the push state.
     *
     * @param str target address in {@code host:port} form
     * @param str2 application id
     * @param i shuffle id
     * @param i2 map id
     * @param i3 map attempt id
     * @param arrayList batches to send together
     * @param pushState push bookkeeping of the map attempt
     * @param z whether this call is itself a retry
     */
    private void doPushMergedData(String str, final String str2, final int i, final int i2, final int i3, final ArrayList<DataBatches.DataBatch> arrayList, final PushState pushState, final boolean z) {
        String[] hostAndPort = str.split(":");
        String host = hostAndPort[0];
        int port = Integer.parseInt(hostAndPort[1]);

        final int groupedBatchId = pushState.batchId.addAndGet(1);
        pushState.inFlightBatches.add(Integer.valueOf(groupedBatchId));

        int batchCount = arrayList.size();
        String[] partitionUniqueIds = new String[batchCount];
        int[] offsets = new int[batchCount];
        final int[] batchIds = new int[batchCount];
        int writtenBytes = 0;
        CompositeByteBuf merged = Unpooled.compositeBuffer();
        for (int idx = 0; idx < batchCount; idx++) {
            DataBatches.DataBatch batch = arrayList.get(idx);
            partitionUniqueIds[idx] = batch.loc.getUniqueId();
            offsets[idx] = writtenBytes;
            batchIds[idx] = batch.batchId;
            writtenBytes += batch.body.length;
            merged.addComponent(true, Unpooled.wrappedBuffer(batch.body));
        }
        PushMergedData request = new PushMergedData(MASTER_MODE, Utils.makeShuffleKey(str2, i), partitionUniqueIds, offsets, new NettyManagedBuffer(merged));

        // Terminal callback: records the final outcome of the grouped batch.
        final RpcResponseCallback terminal = new RpcResponseCallback() {
            @Override
            public void onSuccess(ByteBuffer byteBuffer) {
                ShuffleClientImpl.logger.debug("Push data success for map {} attempt {} grouped batch {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(groupedBatchId)});
                pushState.inFlightBatches.remove(Integer.valueOf(groupedBatchId));
                if (byteBuffer.remaining() > 0 && byteBuffer.get() == StatusCode.StageEnded.getValue()) {
                    ((ConcurrentSet) ShuffleClientImpl.this.mapperEndMap.computeIfAbsent(Integer.valueOf(i), endedShuffle -> {
                        return new ConcurrentSet();
                    })).add(Utils.makeMapKey(i, i2, i3));
                }
            }

            @Override
            public void onFailure(Throwable th) {
                String prefix = z ? "Revived push" : "Push";
                pushState.exception.compareAndSet(null, new IOException(prefix + " merged data failed!", th));
                if (!ShuffleClientImpl.logger.isDebugEnabled()) {
                    return;
                }
                for (int failedBatchId : batchIds) {
                    ShuffleClientImpl.logger.debug("Push data failed for map {} attempt {} batch {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(failedBatchId)});
                }
            }
        };

        // First-line callback: decides between giving up and scheduling a retry.
        RpcResponseCallback wrapped = new RpcResponseCallback() {
            @Override
            public void onSuccess(ByteBuffer byteBuffer) {
                terminal.onSuccess(byteBuffer);
            }

            @Override
            public void onFailure(Throwable th) {
                if (pushState.exception.get() != null) {
                    return;
                }
                if (z) {
                    terminal.onFailure(th);
                    return;
                }
                pushState.inFlightBatches.remove(Integer.valueOf(groupedBatchId));
                if (ShuffleClientImpl.this.mapperEnded(i, i2, i3)) {
                    return;
                }
                ShuffleClientImpl.this.pushDataRetryPool.submit(() -> {
                    ShuffleClientImpl.this.submitRetryPushMergedData(pushState, str2, i, i2, i3, arrayList, true, ShuffleClientImpl.this.getPushDataFailCause(th.getMessage()));
                });
            }
        };

        try {
            this.dataClientFactory.createClient(host, port).pushMergedData(request, wrapped);
        } catch (Exception sendError) {
            logger.warn("PushMergeData failed", sendError);
            wrapped.onFailure(new Exception(getPushDataFailCause(sendError.getMessage()).toString(), sendError));
        }
    }

    /**
     * Tells the driver meta service that a map attempt has finished.
     *
     * <p>The push state for the attempt is looked up (and created when absent), handed to
     * {@code limitMaxInFlight} with a limit of 0, and then a synchronous {@code MapperEnd}
     * request is sent to the driver. The push state entry is removed from {@code pushStates}
     * on every exit path, successful or not.
     *
     * @param str forwarded as the first field of the {@code MapperEnd} message
     * @param i first component of the map key (used as the shuffle id elsewhere in this class)
     * @param i2 second component of the map key
     * @param i3 third component of the map key
     * @param i4 forwarded as the last field of the {@code MapperEnd} message
     * @throws IOException if the driver answers with a status other than {@code StatusCode.Success}
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public void mapperEnd(String str, int i, int i2, int i3, int i4) throws IOException {
        final String mapKey = Utils.makeMapKey(i, i2, i3);
        try {
            PushState state = this.pushStates.computeIfAbsent(mapKey, key -> new PushState(this.conf));
            limitMaxInFlight(mapKey, state, 0);

            ControlMessages.MapperEnd request = new ControlMessages.MapperEnd(str, i, i2, i3, i4);
            ControlMessages.MapperEndResponse response = (ControlMessages.MapperEndResponse) this.driverRssMetaService.askSync(request, ClassTag$.MODULE$.apply(ControlMessages.MapperEndResponse.class));
            if (response.status() != StatusCode.Success) {
                throw new IOException("MapperEnd failed! StatusCode: " + response.status());
            }
        } finally {
            // The attempt is over either way; drop its push state.
            this.pushStates.remove(mapKey);
        }
    }

    /**
     * Discards the push state of one map attempt.
     *
     * <p>If a state is registered under the map key, it is removed, its {@code exception} slot is
     * set to an {@code IOException("Cleaned Up")} unless an exception is already recorded, and
     * its futures are cancelled. Nothing happens when no state is registered.
     *
     * @param str not used by this method
     * @param i first component of the map key
     * @param i2 second component of the map key
     * @param i3 third component of the map key
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public void cleanup(String str, int i, int i2, int i3) {
        final String mapKey = Utils.makeMapKey(i, i2, i3);
        final PushState evicted = this.pushStates.remove(mapKey);
        if (evicted == null) {
            return;
        }
        // Keep an earlier failure if there is one; otherwise mark the state as cleaned up.
        evicted.exception.compareAndSet(null, new IOException("Cleaned Up"));
        evicted.cancelFutures();
    }

    /**
     * Forgets all client-side bookkeeping for a shuffle and optionally notifies the driver.
     *
     * @param str forwarded as the first field of the {@code UnregisterShuffle} message
     * @param i shuffle id whose entries are removed from the local maps
     * @param z when {@code true}, an {@code UnregisterShuffle} message is sent to the driver
     *     first; a failure to send is logged and otherwise ignored
     * @return always {@code true}
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public boolean unregisterShuffle(String str, int i, boolean z) {
        final Integer shuffleId = Integer.valueOf(i);

        if (z) {
            try {
                this.driverRssMetaService.send(new ControlMessages.UnregisterShuffle(str, i, ControlMessages.ZERO_UUID()));
            } catch (Exception e) {
                // Best effort only: local cleanup below must still happen.
                logger.warn("Send UnregisterShuffle failed, ignore.", e);
            }
        }

        this.reducePartitionMap.remove(shuffleId);
        this.reduceFileGroupsMap.remove(shuffleId);
        this.mapperEndMap.remove(shuffleId);
        this.splitting.remove(shuffleId);

        logger.info("Unregistered shuffle {}.", shuffleId);
        return true;
    }

    /**
     * Convenience overload of the six-argument {@code readPartition}.
     *
     * <p>Delegates with {@code 0} and {@code Integer.MAX_VALUE} as the two trailing arguments.
     *
     * @throws IOException propagated from the six-argument overload
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public RssInputStream readPartition(String str, int i, int i2, int i3) throws IOException {
        // Defaults for the two trailing int arguments of the full overload.
        final int lowest = 0;
        final int highest = Integer.MAX_VALUE;
        return readPartition(str, i, i2, i3, lowest, highest);
    }

    /**
     * Opens an input stream over the data of one reduce partition.
     *
     * <p>The file groups of the shuffle are taken from {@code reduceFileGroupsMap}; when they are
     * not cached yet they are requested from the driver with a synchronous
     * {@code GetReducerFileGroup} call. A missing driver endpoint, a null or unsuccessful reply,
     * or any exception during the request makes the mapping function return {@code null}, so
     * nothing is cached for the shuffle in that case.
     *
     * @param str combined with {@code i} into the shuffle key and sent in the driver request
     * @param i shuffle id
     * @param i2 reduce id; used as the index into the partition groups
     * @param i3 passed through to {@code RssInputStream.create}
     * @param i4 passed through to {@code RssInputStream.create}
     * @param i5 passed through to {@code RssInputStream.create}
     * @return an empty stream when the shuffle has no partition groups, otherwise the stream
     *     built by {@code RssInputStream.create}
     * @throws IOException if no file groups could be obtained for the shuffle
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public RssInputStream readPartition(String str, int i, int i2, int i3, int i4, int i5) throws IOException {
        final ReduceFileGroups fileGroups = this.reduceFileGroupsMap.computeIfAbsent(Integer.valueOf(i), ignoredKey -> {
            try {
                if (this.driverRssMetaService == null) {
                    logger.warn("Driver endpoint is null!");
                    return null;
                }
                ControlMessages.GetReducerFileGroupResponse reply = (ControlMessages.GetReducerFileGroupResponse) this.driverRssMetaService.askSync(new ControlMessages.GetReducerFileGroup(str, i), ClassTag$.MODULE$.apply(ControlMessages.GetReducerFileGroupResponse.class));
                boolean succeeded = reply != null && reply.status() == StatusCode.Success;
                return succeeded ? new ReduceFileGroups(reply.fileGroup(), reply.attempts()) : null;
            } catch (Exception e) {
                logger.warn("Exception raised while getting reduce file groups.", e);
                return null;
            }
        });

        if (fileGroups == null) {
            final String message = "Shuffle data lost for shuffle " + i + " reduce " + i2 + "!";
            logger.error(message);
            throw new IOException(message);
        }

        if (fileGroups.partitionGroups == null) {
            logger.warn("Shuffle data is empty for shuffle {} reduce {}.", i, i2);
            return RssInputStream.empty();
        }

        final String shuffleKey = Utils.makeShuffleKey(str, i);
        return RssInputStream.create(this.conf, this.dataClientFactory, shuffleKey, fileGroups.partitionGroups[i2], fileGroups.mapAttempts, i3, i4, i5);
    }

    /**
     * Releases the resources held by this client.
     *
     * <p>In order: shuts down the RPC environment, closes the data client factory, shuts down the
     * push-retry and partition-split pools, and clears the driver endpoint reference. Each
     * resource is skipped when it is {@code null}. There is no exception handling here, so an
     * exception from one step prevents the remaining steps from running.
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public void shutDown() {
        if (this.rpcEnv != null) {
            this.rpcEnv.shutdown();
        }

        if (this.dataClientFactory != null) {
            this.dataClientFactory.close();
        }

        if (this.pushDataRetryPool != null) {
            this.pushDataRetryPool.shutdown();
        }

        if (this.partitionSplitPool != null) {
            this.partitionSplitPool.shutdown();
        }

        // Clearing an already-null reference is a no-op, so no guard is needed.
        this.driverRssMetaService = null;

        logger.warn("Shuffle client has been shutdown!");
    }

    /**
     * Resolves the driver meta service endpoint from an address and stores it in
     * {@code driverRssMetaService}.
     *
     * @param str first argument of the {@code RpcAddress} (presumably the driver host)
     * @param i second argument of the {@code RpcAddress} (presumably the driver port)
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public void setupMetaServiceRef(String str, int i) {
        final RpcAddress driverAddress = new RpcAddress(str, i);
        this.driverRssMetaService = this.rpcEnv.setupEndpointRef(driverAddress, RpcNameConstants.RSS_METASERVICE_EP);
    }

    /**
     * Stores an already-resolved endpoint reference as the driver meta service.
     *
     * <p>The reference is assigned as is; no validation is performed, so passing {@code null}
     * clears the current reference.
     *
     * @param rpcEndpointRef endpoint reference to use for subsequent driver calls
     */
    @Override // com.aliyun.emr.rss.client.ShuffleClient
    public void setupMetaServiceRef(RpcEndpointRef rpcEndpointRef) {
        this.driverRssMetaService = rpcEndpointRef;
    }

    /**
     * Returns the host name of the local machine.
     *
     * <p>The {@code InetAddress} is resolved on first use and kept in {@code ia}; later calls
     * reuse it. A failed resolution is logged and not cached, so the next call tries again.
     *
     * @return the local host name, or {@code null} when the local host cannot be resolved
     */
    private synchronized String getLocalHost() {
        if (this.ia != null) {
            return this.ia.getHostName();
        }
        try {
            this.ia = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            logger.error("Unknown host", e);
            return null;
        }
        return this.ia.getHostName();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether a map attempt is recorded as ended in {@code mapperEndMap}.
     *
     * @param i shuffle id; key into {@code mapperEndMap}
     * @param i2 second component of the map key
     * @param i3 third component of the map key
     * @return {@code true} if an entry exists for the shuffle and it contains the map key built
     *     from the three arguments; {@code false} otherwise, including when the shuffle has no
     *     entry
     */
    public boolean mapperEnded(int i, int i2, int i3) {
        // Look the entry up once. A containsKey() followed by a separate get() can see the entry
        // disappear in between (unregisterShuffle removes it, and this method is called from RPC
        // callbacks), in which case get() returns null and the chained contains() would throw.
        final Set<String> endedMapKeys = this.mapperEndMap.get(Integer.valueOf(i));
        return endedMapKeys != null && endedMapKeys.contains(Utils.makeMapKey(i, i2, i3));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a push failure message to a {@code StatusCode}.
     *
     * <p>The message is logged at info level and then classified in this order:
     * <ol>
     *   <li>equal to the message of {@code PushDataFailSlave}: {@code PushDataFailSlave};</li>
     *   <li>equal to the message of {@code PushDataFailMain}, or recognised by
     *       {@code connectFail}: {@code PushDataFailMain};</li>
     *   <li>anything else: {@code PushDataFailNonCriticalCause}.</li>
     * </ol>
     *
     * @param str failure message to classify; handed to {@code connectFail} unchanged
     * @return the status code matching the message
     */
    public StatusCode getPushDataFailCause(String str) {
        logger.info("[getPushDataFailCause] message: " + str);

        if (StatusCode.PushDataFailSlave.getMessage().equals(str)) {
            return StatusCode.PushDataFailSlave;
        }
        if (StatusCode.PushDataFailMain.getMessage().equals(str) || connectFail(str)) {
            return StatusCode.PushDataFailMain;
        }
        return StatusCode.PushDataFailNonCriticalCause;
    }

    /**
     * Tells whether a failure message describes a broken or unusable connection.
     *
     * <p>A message matches when it starts with {@code "Connection from "} and ends with
     * {@code " closed"}, equals {@code "Connection reset by peer"}, or starts with
     * {@code "Failed to send RPC "}.
     *
     * @param str failure message, typically {@code Throwable.getMessage()}; may be {@code null}
     * @return {@code true} if the message matches one of the patterns above; {@code false}
     *     otherwise, including for a {@code null} message
     */
    private boolean connectFail(String str) {
        // Throwable.getMessage() is allowed to return null; a missing message cannot match any
        // pattern and must not blow up the failure-handling path with an NPE.
        if (str == null) {
            return false;
        }
        return (str.startsWith("Connection from ") && str.endsWith(" closed")) || str.equals("Connection reset by peer") || str.startsWith("Failed to send RPC ");
    }
}
