package org.apache.spark.shuffle.rss;

import com.aliyun.emr.rss.client.ShuffleClient;
import com.aliyun.emr.rss.common.RssConf;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import org.apache.spark.Aggregator;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.annotation.Private;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.PartitionIdPassthrough;
import org.apache.spark.sql.execution.UnsafeRowSerializer;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.unsafe.Platform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Product2;
import scala.collection.Iterator;
import scala.reflect.ClassTag$;

@Private
/* loaded from: input_file:org/apache/spark/shuffle/rss/SortBasedShuffleWriter.class */
public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
    private static final Logger logger;
    private static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1048576;
    private final ShuffleDependency<K, V, C> dep;
    private final Partitioner partitioner;
    private final ShuffleWriteMetricsReporter writeMetrics;
    private final String appId;
    private final int shuffleId;
    private final int mapId;
    private final TaskContext taskContext;
    private final ShuffleClient rssShuffleClient;
    private final int numMappers;
    private final int numPartitions;
    private final long pushBufferSize;
    private SortBasedPusher sortBasedPusher;
    private final MyByteArrayOutputStream serBuffer;
    private final SerializationStream serOutputStream;
    private final LongAdder[] mapStatusLengths;
    private final long[] mapStatusRecords;
    private final long[] tmpRecords;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Nullable
    private long peakMemoryUsedBytes = 0;
    private volatile boolean stopping = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/shuffle/rss/SortBasedShuffleWriter$MyByteArrayOutputStream.class */
    public static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
        MyByteArrayOutputStream(int i) {
            super(i);
        }

        public byte[] getBuf() {
            return this.buf;
        }
    }

    public SortBasedShuffleWriter(ShuffleDependency<K, V, C> shuffleDependency, String str, int i, TaskContext taskContext, RssConf rssConf, ShuffleClient shuffleClient, ShuffleWriteMetricsReporter shuffleWriteMetricsReporter) throws IOException {
        this.mapId = taskContext.partitionId();
        this.dep = shuffleDependency;
        this.appId = str;
        this.shuffleId = shuffleDependency.shuffleId();
        SerializerInstance newInstance = shuffleDependency.serializer().newInstance();
        this.partitioner = shuffleDependency.partitioner();
        this.writeMetrics = shuffleWriteMetricsReporter;
        this.taskContext = taskContext;
        this.numMappers = i;
        this.numPartitions = shuffleDependency.partitioner().numPartitions();
        this.rssShuffleClient = shuffleClient;
        this.serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
        this.serOutputStream = newInstance.serializeStream(this.serBuffer);
        this.mapStatusLengths = new LongAdder[this.numPartitions];
        this.mapStatusRecords = new long[this.numPartitions];
        for (int i2 = 0; i2 < this.numPartitions; i2++) {
            this.mapStatusLengths[i2] = new LongAdder();
        }
        this.tmpRecords = new long[this.numPartitions];
        this.pushBufferSize = RssConf.pushDataBufferSize(rssConf);
        TaskMemoryManager taskMemoryManager = taskContext.taskMemoryManager();
        ShuffleClient shuffleClient2 = this.rssShuffleClient;
        int i3 = this.shuffleId;
        int i4 = this.mapId;
        int attemptNumber = taskContext.attemptNumber();
        long taskAttemptId = taskContext.taskAttemptId();
        int i5 = this.numPartitions;
        ShuffleWriteMetricsReporter shuffleWriteMetricsReporter2 = this.writeMetrics;
        shuffleWriteMetricsReporter2.getClass();
        this.sortBasedPusher = new SortBasedPusher(taskMemoryManager, shuffleClient2, str, i3, i4, attemptNumber, taskAttemptId, i, i5, rssConf, (v1) -> {
            r13.incBytesWritten(v1);
        }, this.mapStatusLengths);
    }

    public void write(Iterator<Product2<K, V>> iterator) throws IOException {
        if (canUseFastWrite()) {
            fastWrite0(iterator);
        } else if (!this.dep.mapSideCombine()) {
            write0(iterator);
        } else {
            if (this.dep.aggregator().isEmpty()) {
                throw new UnsupportedOperationException("When using map side combine, an aggregator must be specified.");
            }
            write0(((Aggregator) this.dep.aggregator().get()).combineValuesByKey(iterator, this.taskContext));
        }
        close();
    }

    @VisibleForTesting
    boolean canUseFastWrite() {
        return (this.dep.serializer() instanceof UnsafeRowSerializer) && (this.partitioner instanceof PartitionIdPassthrough);
    }

    private void fastWrite0(Iterator iterator) throws IOException {
        SQLMetric unsafeRowSerializerDataSizeMetric = SparkUtils.getUnsafeRowSerializerDataSizeMetric(this.dep.serializer());
        while (iterator.hasNext()) {
            Product2 product2 = (Product2) iterator.next();
            int intValue = ((Integer) product2._1()).intValue();
            UnsafeRow unsafeRow = (UnsafeRow) product2._2();
            int sizeInBytes = unsafeRow.getSizeInBytes();
            int i = 4 + sizeInBytes;
            if (unsafeRowSerializerDataSizeMetric != null) {
                unsafeRowSerializerDataSizeMetric.add(i);
            }
            if (i > this.pushBufferSize) {
                byte[] bArr = new byte[i];
                Platform.putInt(bArr, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(sizeInBytes));
                Platform.copyMemory(unsafeRow.getBaseObject(), unsafeRow.getBaseOffset(), bArr, Platform.BYTE_ARRAY_OFFSET + 4, sizeInBytes);
                pushGiantRecord(intValue, bArr, i);
            } else {
                long nanoTime = System.nanoTime();
                this.sortBasedPusher.insertRecord(unsafeRow.getBaseObject(), unsafeRow.getBaseOffset(), sizeInBytes, intValue, true);
                this.writeMetrics.incWriteTime(System.nanoTime() - nanoTime);
            }
            long[] jArr = this.tmpRecords;
            jArr[intValue] = jArr[intValue] + 1;
        }
    }

    private void write0(Iterator iterator) throws IOException {
        while (iterator.hasNext()) {
            Product2 product2 = (Product2) iterator.next();
            Object _1 = product2._1();
            int partition = this.partitioner.getPartition(_1);
            this.serBuffer.reset();
            this.serOutputStream.writeKey(_1, ClassTag$.MODULE$.apply(_1.getClass()));
            this.serOutputStream.writeValue(product2._2(), ClassTag$.MODULE$.apply(product2._2().getClass()));
            this.serOutputStream.flush();
            int size = this.serBuffer.size();
            if (!$assertionsDisabled && size <= 0) {
                throw new AssertionError();
            }
            if (size > this.pushBufferSize) {
                pushGiantRecord(partition, this.serBuffer.getBuf(), size);
            } else {
                long nanoTime = System.nanoTime();
                this.sortBasedPusher.insertRecord(this.serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, size, partition, false);
                this.writeMetrics.incWriteTime(System.nanoTime() - nanoTime);
            }
            long[] jArr = this.tmpRecords;
            jArr[partition] = jArr[partition] + 1;
        }
    }

    private void pushGiantRecord(int i, byte[] bArr, int i2) throws IOException {
        logger.info("Push giant record, size {}.", Integer.valueOf(i2));
        long nanoTime = System.nanoTime();
        int pushData = this.rssShuffleClient.pushData(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber(), i, bArr, 0, i2, this.numMappers, this.numPartitions);
        this.mapStatusLengths[i].add(pushData);
        this.writeMetrics.incBytesWritten(pushData);
        this.writeMetrics.incWriteTime(System.nanoTime() - nanoTime);
    }

    private void close() throws IOException {
        logger.info("Pushdata in close, memory used " + this.sortBasedPusher.getUsed());
        long nanoTime = System.nanoTime();
        this.sortBasedPusher.pushData();
        this.sortBasedPusher.close();
        this.writeMetrics.incWriteTime(System.nanoTime() - nanoTime);
        this.rssShuffleClient.pushMergedData(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber());
        updateMapStatus();
        long nanoTime2 = System.nanoTime();
        this.rssShuffleClient.mapperEnd(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber(), this.numMappers);
        this.writeMetrics.incWriteTime(System.nanoTime() - nanoTime2);
    }

    private void updateMapStatus() {
        long j = 0;
        for (int i = 0; i < this.partitioner.numPartitions(); i++) {
            long[] jArr = this.mapStatusRecords;
            int i2 = i;
            jArr[i2] = jArr[i2] + this.tmpRecords[i];
            j += this.tmpRecords[i];
            this.tmpRecords[i] = 0;
        }
        this.writeMetrics.incRecordsWritten(j);
    }

    public Option<MapStatus> stop(boolean z) {
        try {
            this.taskContext.taskMetrics().incPeakExecutionMemory(this.peakMemoryUsedBytes);
            if (this.stopping) {
                Option<MapStatus> apply = Option.apply((Object) null);
                this.rssShuffleClient.cleanup(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber());
                return apply;
            }
            this.stopping = true;
            if (!z) {
                Option<MapStatus> apply2 = Option.apply((Object) null);
                this.rssShuffleClient.cleanup(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber());
                return apply2;
            }
            MapStatus createMapStatus = SparkUtils.createMapStatus(SparkEnv.get().blockManager().shuffleServerId(), SparkUtils.unwrap(this.mapStatusLengths), this.taskContext.taskAttemptId());
            if (createMapStatus == null) {
                throw new IllegalStateException("Cannot call stop(true) without having called write()");
            }
            Option<MapStatus> apply3 = Option.apply(createMapStatus);
            this.rssShuffleClient.cleanup(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber());
            return apply3;
        } catch (Throwable th) {
            this.rssShuffleClient.cleanup(this.appId, this.shuffleId, this.mapId, this.taskContext.attemptNumber());
            throw th;
        }
    }

    public long[] getPartitionLengths() {
        throw new UnsupportedOperationException("RSS is not compatible with Spark push mode, please set spark.shuffle.push.enabled to false");
    }

    static {
        $assertionsDisabled = !SortBasedShuffleWriter.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SortBasedShuffleWriter.class);
    }
}
