package org.apache.spark.shuffle.rss;

import com.aliyun.emr.rss.client.ShuffleClient;
import com.aliyun.emr.rss.client.write.DataPusher;
import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.util.Utils;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.shuffle.rss.ShuffleInMemorySorter;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/shuffle/rss/SortBasedPusher.class */
public class SortBasedPusher extends MemoryConsumer {
    private static final Logger logger;
    private ShuffleInMemorySorter inMemSorter;
    private final LinkedList<MemoryBlock> allocatedPages;
    private MemoryBlock currentPage;
    private long pageCursor;
    private final ShuffleClient rssShuffleClient;
    private final DataPusher dataPusher;
    private final int pushBufferSize;
    private final long PushThreshold;
    final int uaoSize;
    String appId;
    int shuffleId;
    int mapId;
    int attemptNumber;
    long taskAttemptId;
    int numMappers;
    int numPartitions;
    RssConf conf;
    Consumer<Integer> afterPush;
    LongAdder[] mapStatusLengths;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SortBasedPusher(TaskMemoryManager taskMemoryManager, ShuffleClient shuffleClient, String str, int i, int i2, int i3, long j, int i4, int i5, RssConf rssConf, Consumer<Integer> consumer, LongAdder[] longAdderArr) throws IOException {
        super(taskMemoryManager, (int) Math.min(134217728L, taskMemoryManager.pageSizeBytes()), taskMemoryManager.getTungstenMemoryMode());
        this.allocatedPages = new LinkedList<>();
        this.currentPage = null;
        this.pageCursor = -1L;
        this.uaoSize = UnsafeAlignedOffset.getUaoSize();
        this.rssShuffleClient = shuffleClient;
        this.appId = str;
        this.shuffleId = i;
        this.mapId = i2;
        this.attemptNumber = i3;
        this.taskAttemptId = j;
        this.numMappers = i4;
        this.numPartitions = i5;
        this.conf = rssConf;
        this.afterPush = consumer;
        this.mapStatusLengths = longAdderArr;
        this.dataPusher = new DataPusher(str, i, i2, i3, j, i4, i5, rssConf, shuffleClient, consumer, longAdderArr);
        this.pushBufferSize = RssConf.pushDataBufferSize(rssConf);
        this.PushThreshold = RssConf.sortPushThreshold(rssConf);
        this.inMemSorter = new ShuffleInMemorySorter(this, 4194304);
    }

    public long pushData() throws IOException {
        ShuffleInMemorySorter.ShuffleSorterIterator sortedIterator = this.inMemSorter.getSortedIterator();
        byte[] bArr = new byte[this.pushBufferSize];
        int i = 0;
        int i2 = -1;
        while (sortedIterator.hasNext()) {
            sortedIterator.loadNext();
            int partitionId = sortedIterator.packedRecordPointer.getPartitionId();
            if (!$assertionsDisabled && partitionId < i2) {
                throw new AssertionError();
            }
            if (partitionId != i2) {
                if (i2 == -1) {
                    i2 = partitionId;
                } else {
                    int mergeData = this.rssShuffleClient.mergeData(this.appId, this.shuffleId, this.mapId, this.attemptNumber, i2, bArr, 0, i, this.numMappers, this.numPartitions);
                    this.mapStatusLengths[i2].add(mergeData);
                    this.afterPush.accept(Integer.valueOf(mergeData));
                    i2 = partitionId;
                    i = 0;
                }
            }
            long recordPointer = sortedIterator.packedRecordPointer.getRecordPointer();
            Object page = this.taskMemoryManager.getPage(recordPointer);
            long offsetInPage = this.taskMemoryManager.getOffsetInPage(recordPointer);
            int size = UnsafeAlignedOffset.getSize(page, offsetInPage);
            if (i + size > bArr.length) {
                this.dataPusher.addTask(partitionId, bArr, i);
                i = 0;
            }
            Platform.copyMemory(page, offsetInPage + this.uaoSize, bArr, Platform.BYTE_ARRAY_OFFSET + i, size);
            i += size;
        }
        if (i > 0) {
            this.dataPusher.addTask(i2, bArr, i);
        }
        long freeMemory = freeMemory();
        this.inMemSorter.reset();
        return freeMemory;
    }

    public void insertRecord(Object obj, long j, int i, int i2, boolean z) throws IOException {
        if (getUsed() > this.PushThreshold && this.pageCursor + Utils.byteStringAsBytes("8k") > this.currentPage.getBaseOffset() + this.currentPage.size()) {
            logger.info("Memory Used across threshold, trigger push. Memory: " + getUsed() + ", currentPage size: " + this.currentPage.size());
            pushData();
        }
        growPointerArrayIfNecessary();
        int uaoSize = UnsafeAlignedOffset.getUaoSize();
        acquireNewPageIfNecessary(z ? i + 4 + uaoSize : i + uaoSize);
        if (!$assertionsDisabled && this.currentPage == null) {
            throw new AssertionError();
        }
        Object baseObject = this.currentPage.getBaseObject();
        long encodePageNumberAndOffset = this.taskMemoryManager.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
        if (z) {
            UnsafeAlignedOffset.putSize(baseObject, this.pageCursor, i + 4);
            this.pageCursor += uaoSize;
            Platform.putInt(baseObject, this.pageCursor, Integer.reverseBytes(i));
            this.pageCursor += 4;
            Platform.copyMemory(obj, j, baseObject, this.pageCursor, i);
            this.pageCursor += i;
        } else {
            UnsafeAlignedOffset.putSize(baseObject, this.pageCursor, i);
            this.pageCursor += uaoSize;
            Platform.copyMemory(obj, j, baseObject, this.pageCursor, i);
            this.pageCursor += i;
        }
        this.inMemSorter.insertRecord(encodePageNumberAndOffset, i2);
    }

    private void growPointerArrayIfNecessary() throws IOException {
        if (!$assertionsDisabled && this.inMemSorter == null) {
            throw new AssertionError();
        }
        if (this.inMemSorter.hasSpaceForAnotherRecord()) {
            return;
        }
        try {
            LongArray allocateArray = allocateArray((this.inMemSorter.getMemoryUsage() / 8) * 2);
            if (this.inMemSorter.hasSpaceForAnotherRecord()) {
                freeArray(allocateArray);
            } else {
                this.inMemSorter.expandPointerArray(allocateArray);
            }
        } catch (SparkOutOfMemoryError e) {
            if (this.inMemSorter.hasSpaceForAnotherRecord()) {
                return;
            }
            logger.error("Unable to grow the pointer array");
            throw e;
        } catch (TooLargePageException e2) {
            logger.info("Pushdata in growPointerArrayIfNecessary, memory used " + getUsed());
            pushData();
        }
    }

    private void acquireNewPageIfNecessary(int i) {
        if (this.currentPage == null || this.pageCursor + i > this.currentPage.getBaseOffset() + this.currentPage.size()) {
            this.currentPage = allocatePage(i);
            this.pageCursor = this.currentPage.getBaseOffset();
            this.allocatedPages.add(this.currentPage);
        }
    }

    public long spill(long j, MemoryConsumer memoryConsumer) throws IOException {
        logger.info("Pushdata in spill, memory used " + getUsed());
        if (getUsed() == 0) {
            return 0L;
        }
        logger.info("Pushdata is not empty , do push.");
        return pushData();
    }

    private long freeMemory() {
        long j = 0;
        Iterator<MemoryBlock> it = this.allocatedPages.iterator();
        while (it.hasNext()) {
            MemoryBlock next = it.next();
            j += next.size();
            freePage(next);
        }
        this.allocatedPages.clear();
        this.currentPage = null;
        this.pageCursor = 0L;
        return j;
    }

    public void cleanupResources() {
        freeMemory();
        if (this.inMemSorter != null) {
            this.inMemSorter.free();
            this.inMemSorter = null;
        }
    }

    public void close() throws IOException {
        cleanupResources();
        this.dataPusher.waitOnTermination();
    }

    public long getUsed() {
        return super.getUsed();
    }

    static {
        $assertionsDisabled = !SortBasedPusher.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SortBasedPusher.class);
    }
}
