package com.aliyun.emr.rss.client.write;

import com.aliyun.emr.rss.client.ShuffleClient;
import com.aliyun.emr.rss.common.RssConf;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/* loaded from: input_file:com/aliyun/emr/rss/client/write/DataPusher.class */
public class DataPusher {
    private final LinkedBlockingQueue<PushTask> idleQueue;
    private final LinkedBlockingQueue<PushTask> workingQueue;
    private final String appId;
    private final int shuffleId;
    private final int mapId;
    private final int attemptId;
    private final int numMappers;
    private final int numPartitions;
    private final ShuffleClient client;
    private final Consumer<Integer> afterPush;
    private volatile boolean terminated;
    private LongAdder[] mapStatusLengths;
    private final long WAIT_TIME_NANOS = TimeUnit.MILLISECONDS.toNanos(500);
    private final ReentrantLock idleLock = new ReentrantLock();
    private final Condition idleFull = this.idleLock.newCondition();
    private final AtomicReference<IOException> exception = new AtomicReference<>();

    /* JADX WARN: Type inference failed for: r0v22, types: [com.aliyun.emr.rss.client.write.DataPusher$1] */
    public DataPusher(String str, int i, int i2, int i3, long j, int i4, int i5, RssConf rssConf, ShuffleClient shuffleClient, Consumer<Integer> consumer, LongAdder[] longAdderArr) throws IOException {
        int pushDataQueueCapacity = RssConf.pushDataQueueCapacity(rssConf);
        int pushDataBufferSize = RssConf.pushDataBufferSize(rssConf);
        this.idleQueue = new LinkedBlockingQueue<>(pushDataQueueCapacity);
        this.workingQueue = new LinkedBlockingQueue<>(pushDataQueueCapacity);
        for (int i6 = 0; i6 < pushDataQueueCapacity; i6++) {
            try {
                this.idleQueue.put(new PushTask(pushDataBufferSize));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
        this.appId = str;
        this.shuffleId = i;
        this.mapId = i2;
        this.attemptId = i3;
        this.numMappers = i4;
        this.numPartitions = i5;
        this.client = shuffleClient;
        this.afterPush = consumer;
        this.mapStatusLengths = longAdderArr;
        new Thread("DataPusher-" + j) { // from class: com.aliyun.emr.rss.client.write.DataPusher.1
            private void reclaimTask(PushTask pushTask) throws InterruptedException {
                DataPusher.this.idleLock.lockInterruptibly();
                try {
                    DataPusher.this.idleQueue.put(pushTask);
                    if (DataPusher.this.idleQueue.remainingCapacity() == 0) {
                        DataPusher.this.idleFull.signal();
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    DataPusher.this.exception.set(new IOException(e2));
                } finally {
                    DataPusher.this.idleLock.unlock();
                }
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!DataPusher.this.terminated && DataPusher.this.exception.get() == null) {
                    try {
                        PushTask pushTask = (PushTask) DataPusher.this.workingQueue.poll(DataPusher.this.WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
                        if (pushTask != null) {
                            DataPusher.this.pushData(pushTask);
                            reclaimTask(pushTask);
                        }
                    } catch (IOException e2) {
                        DataPusher.this.exception.set(e2);
                    } catch (InterruptedException e3) {
                        DataPusher.this.exception.set(new IOException(e3));
                    }
                }
            }
        }.start();
    }

    public void addTask(int i, byte[] bArr, int i2) throws IOException {
        PushTask pushTask = null;
        while (pushTask == null) {
            try {
                checkException();
                pushTask = this.idleQueue.poll(this.WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                IOException iOException = new IOException(e);
                this.exception.set(iOException);
                throw iOException;
            }
        }
        pushTask.setSize(i2);
        pushTask.setPartitionId(i);
        System.arraycopy(bArr, 0, pushTask.getBuffer(), 0, i2);
        while (!this.workingQueue.offer(pushTask, this.WAIT_TIME_NANOS, TimeUnit.NANOSECONDS)) {
            checkException();
        }
    }

    public void waitOnTermination() throws IOException {
        try {
            this.idleLock.lockInterruptibly();
            waitIdleQueueFullWithLock();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.exception.set(new IOException(e));
        }
        this.terminated = true;
        this.idleQueue.clear();
        this.workingQueue.clear();
        checkException();
    }

    private void checkException() throws IOException {
        if (this.exception.get() != null) {
            throw this.exception.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void pushData(PushTask pushTask) throws IOException {
        int pushData = this.client.pushData(this.appId, this.shuffleId, this.mapId, this.attemptId, pushTask.getPartitionId(), pushTask.getBuffer(), 0, pushTask.getSize(), this.numMappers, this.numPartitions);
        this.afterPush.accept(Integer.valueOf(pushData));
        this.mapStatusLengths[pushTask.getPartitionId()].add(pushData);
    }

    private void waitIdleQueueFullWithLock() {
        while (this.idleQueue.remainingCapacity() > 0 && this.exception.get() == null) {
            try {
                this.idleFull.await(this.WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.exception.set(new IOException(e));
                return;
            } finally {
                this.idleLock.unlock();
            }
        }
    }
}
