package org.apache.spark.shuffle.rss;

import com.aliyun.emr.rss.client.ShuffleClient;
import com.aliyun.emr.rss.client.read.MetricsCallback;
import com.aliyun.emr.rss.client.read.RssInputStream;
import com.aliyun.emr.rss.common.RssConf;
import org.apache.spark.Aggregator;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.util.CompletionIterator$;
import org.apache.spark.util.collection.ExternalSorter;
import org.apache.spark.util.collection.ExternalSorter$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Product2;
import scala.Some;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.immutable.IndexedSeq$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: RssShuffleReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005d\u0001\u0002\u000b\u0016\u0001\u0001B\u0001\u0002\u0011\u0001\u0003\u0002\u0003\u0006I!\u0011\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AQ\n\u0001B\u0001B\u0003%!\n\u0003\u0005O\u0001\t\u0005\t\u0015!\u0003K\u0011!y\u0005A!A!\u0002\u0013Q\u0005\u0002\u0003)\u0001\u0005\u0003\u0005\u000b\u0011B)\t\u0011U\u0003!\u0011!Q\u0001\nYC\u0001b\u0019\u0001\u0003\u0002\u0003\u0006I\u0001\u001a\u0005\u0006O\u0002!\t\u0001\u001b\u0005\bm\u0002\u0011\r\u0011\"\u0003x\u0011\u0019q\b\u0001)A\u0005q\"Aq\u0010\u0001b\u0001\n\u0013\t\t\u0001\u0003\u0005\u0002\u0010\u0001\u0001\u000b\u0011BA\u0002\u0011\u001d\t\t\u0002\u0001C!\u0003'9\u0011\"a\r\u0016\u0003\u0003E\t!!\u000e\u0007\u0011Q)\u0012\u0011!E\u0001\u0003oAaa\u001a\t\u0005\u0002\u0005e\u0002\"CA\u001e!E\u0005I\u0011AA\u001f\u0011%\tI\u0006EI\u0001\n\u0003\tYF\u0001\tSgN\u001c\u0006.\u001e4gY\u0016\u0014V-\u00193fe*\u0011acF\u0001\u0004eN\u001c(B\u0001\r\u001a\u0003\u001d\u0019\b.\u001e4gY\u0016T!AG\u000e\u0002\u000bM\u0004\u0018M]6\u000b\u0005qi\u0012AB1qC\u000eDWMC\u0001\u001f\u0003\ry'oZ\u0002\u0001+\r\tc\u0006O\n\u0005\u0001\tB#\b\u0005\u0002$M5\tAEC\u0001&\u0003\u0015\u00198-\u00197b\u0013\t9CE\u0001\u0004B]f\u0014VM\u001a\t\u0005S)bs'D\u0001\u0018\u0013\tYsCA\u0007TQV4g\r\\3SK\u0006$WM\u001d\t\u0003[9b\u0001\u0001B\u00030\u0001\t\u0007\u0001GA\u0001L#\t\tD\u0007\u0005\u0002$e%\u00111\u0007\n\u0002\b\u001d>$\b.\u001b8h!\t\u0019S'\u0003\u00027I\t\u0019\u0011I\\=\u0011\u00055BD!B\u001d\u0001\u0005\u0004\u0001$!A\"\u0011\u0005mrT\"\u0001\u001f\u000b\u0005uJ\u0012\u0001C5oi\u0016\u0014h.\u00197\n\u0005}b$a\u0002'pO\u001eLgnZ\u0001\u0007Q\u0006tG\r\\31\u0005\t;\u0005#B\"EY\u0019;T\"A\u000b\n\u0005\u0015+\"\u0001\u0005*tgNCWO\u001a4mK\"\u000bg\u000e\u001a7f!\tis\tB\u0005I\u0003\u0005\u0005\t\u0011!B\u0001a\t\u0019q\fJ\u0019\u0002\u001dM$\u0018M\u001d;QCJ$\u0018\u000e^5p]B\u00111eS\u0005\u0003\u0019\u0012\u00121!\u00138u\u00031)g\u000e\u001a)beRLG/[8o\u00035\u0019H/\u0019:u\
u001b\u0006\u0004\u0018J\u001c3fq\u0006YQM\u001c3NCBLe\u000eZ3y\u0003\u001d\u0019wN\u001c;fqR\u0004\"AU*\u000e\u0003eI!\u0001V\r\u0003\u0017Q\u000b7o[\"p]R,\u0007\u0010^\u0001\u0005G>tg\r\u0005\u0002XC6\t\u0001L\u0003\u0002Z5\u000611m\\7n_:T!AF.\u000b\u0005qk\u0016aA3ne*\u0011alX\u0001\u0007C2L\u00170\u001e8\u000b\u0003\u0001\f1aY8n\u0013\t\u0011\u0007LA\u0004SgN\u001cuN\u001c4\u0002\u000f5,GO]5dgB\u0011\u0011&Z\u0005\u0003M^\u0011!d\u00155vM\u001adWMU3bI6+GO]5dgJ+\u0007o\u001c:uKJ\fa\u0001P5oSRtD#C5k_B\f(o\u001d;v!\u0011\u0019\u0005\u0001L\u001c\t\u000b\u0001K\u0001\u0019A61\u00051t\u0007#B\"EY5<\u0004CA\u0017o\t%A%.!A\u0001\u0002\u000b\u0005\u0001\u0007C\u0003J\u0013\u0001\u0007!\nC\u0003N\u0013\u0001\u0007!\nC\u0004O\u0013A\u0005\t\u0019\u0001&\t\u000f=K\u0001\u0013!a\u0001\u0015\")\u0001+\u0003a\u0001#\")Q+\u0003a\u0001-\")1-\u0003a\u0001I\u0006\u0019A-\u001a9\u0016\u0003a\u0004$!_?\u0011\u000bISH\u0006`\u001c\n\u0005mL\"!E*ik\u001a4G.\u001a#fa\u0016tG-\u001a8dsB\u0011Q& \u0003\n\u0011\u0006\t\t\u0011!A\u0003\u0002A\nA\u0001Z3qA\u0005\u0001\"o]:TQV4g\r\\3DY&,g\u000e^\u000b\u0003\u0003\u0007\u0001B!!\u0002\u0002\f5\u0011\u0011q\u0001\u0006\u0004\u0003\u0013Q\u0016AB2mS\u0016tG/\u0003\u0003\u0002\u000e\u0005\u001d!!D*ik\u001a4G.Z\"mS\u0016tG/A\tsgN\u001c\u0006.\u001e4gY\u0016\u001cE.[3oi\u0002\nAA]3bIR\u0011\u0011Q\u0003\t\u0007\u0003/\t9#!\f\u000f\t\u0005e\u00111\u0005\b\u0005\u00037\t\t#\u0004\u0002\u0002\u001e)\u0019\u0011qD\u0010\u0002\rq\u0012xn\u001c;?\u0013\u0005)\u0013bAA\u0013I\u00059\u0001/Y2lC\u001e,\u0017\u0002BA\u0015\u0003W\u0011\u0001\"\u0013;fe\u0006$xN\u001d\u0006\u0004\u0003K!\u0003#B\u0012\u000201:\u0014bAA\u0019I\tA\u0001K]8ek\u000e$('\u0001\tSgN\u001c\u0006.\u001e4gY\u0016\u0014V-\u00193feB\u00111\tE\n\u0003!\t\"\"!!\u000e\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00135+\u0019\ty$!\u0016\u0002XU\u0011\u0011\u0011\t\u0016\u0004\u0015\u0006\r3FAA#!\u0011\t9%!\u0015\u000e\u0005\u0005%#\u0002BA&\u0003\u001b\n\u0011\"\u001
e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005=C%\u0001\u0006b]:|G/\u0019;j_:LA!a\u0015\u0002J\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0005\u000b=\u0012\"\u0019\u0001\u0019\u0005\u000be\u0012\"\u0019\u0001\u0019\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00136+\u0019\ty$!\u0018\u0002`\u0011)qf\u0005b\u0001a\u0011)\u0011h\u0005b\u0001a\u0001")
/* loaded from: input_file:org/apache/spark/shuffle/rss/RssShuffleReader.class */
/*
 * Reduce-side shuffle reader that pulls partition data through the RSS ShuffleClient.
 *
 * For every partition in [startPartition, endPartition) a RssInputStream is opened via
 * ShuffleClient.readPartition, deserialized into key/value records, optionally aggregated
 * with the dependency's Aggregator and optionally sorted with an ExternalSorter when the
 * dependency declares a key ordering.
 *
 * NOTE(review): this file is decompiler output of RssShuffleReader.scala; the Logging
 * forwarders and the name-mangled members mirror what scalac generates and are kept so that
 * the class shape stays compatible with the compiled original.
 */
public class RssShuffleReader<K, C> implements ShuffleReader<K, C>, Logging {
    /** Shuffle handle; supplies the dependency, app id, shuffle id and mapper count. */
    private final RssShuffleHandle<K, ?, C> handle;
    /** First partition to read (inclusive). */
    private final int startPartition;
    /** End of the partition range (exclusive, see the {@code until} call in {@link #read()}). */
    private final int endPartition;
    /** Passed through unchanged to {@code ShuffleClient.readPartition}. */
    private final int startMapIndex;
    /** Passed through unchanged to {@code ShuffleClient.readPartition}. */
    private final int endMapIndex;
    /** Task context used for interruption, task metrics and completion listeners. */
    private final TaskContext context;
    /** Read-metrics sink; public with a mangled name because scalac exposes it to closures. */
    public final ShuffleReadMetricsReporter org$apache$spark$shuffle$rss$RssShuffleReader$$metrics;
    /** Dependency taken from {@link #handle} at construction time. */
    private final ShuffleDependency<K, ?, C> dep;
    /** Client obtained from {@code ShuffleClient.get} at construction time. */
    private final ShuffleClient rssShuffleClient;
    /** Backing field for the {@link Logging} trait's logger slot. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /** Forwards to the {@code Logging.logName$} trait implementation. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Forwards to the {@code Logging.log$} trait implementation. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Forwards to the {@code Logging.logInfo$} trait implementation. */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /** Forwards to the {@code Logging.logDebug$} trait implementation. */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /** Forwards to the {@code Logging.logTrace$} trait implementation. */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /** Forwards to the {@code Logging.logWarning$} trait implementation. */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /** Forwards to the {@code Logging.logError$} trait implementation. */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /** Forwards to the {@code Logging.logInfo$} trait implementation (with throwable). */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logDebug$} trait implementation (with throwable). */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logTrace$} trait implementation (with throwable). */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logWarning$} trait implementation (with throwable). */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logError$} trait implementation (with throwable). */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Forwards to the {@code Logging.isTraceEnabled$} trait implementation. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards to the one-argument {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /** Forwards to the two-argument {@code Logging.initializeLogIfNecessary$}. */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /** Forwards to the trait's default value for the second initialization argument. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Forwards to the {@code Logging.initializeForcefully$} trait implementation. */
    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the logger slot used by the {@link Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger slot used by the {@link Logging} trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private ShuffleDependency<K, ?, C> dep() {
        return this.dep;
    }

    private ShuffleClient rssShuffleClient() {
        return this.rssShuffleClient;
    }

    /**
     * Reads the configured partition range and returns the resulting records.
     *
     * <p>Steps, in order: open one stream per partition, deserialize each stream into a
     * key/value iterator, count every record on the metrics reporter, wrap the result so that
     * shuffle read metrics are merged on completion, apply the dependency's aggregator when one
     * is defined, and sort through an {@link ExternalSorter} when a key ordering is defined.
     *
     * @return an iterator over the records; always an {@link InterruptibleIterator}
     * @throws MatchError if the dependency's key ordering is neither {@code None} nor a
     *     {@code Some} holding a non-null ordering
     */
    // Raw Scala collection types are unavoidable here: the Scala signatures (CanBuildFrom,
    // by-name parameters, Tuple2 vs Product2 variance) do not map onto Java generics.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Iterator<Product2<K, C>> read() {
        final SerializerInstance serializerInstance = dep().serializer().newInstance();
        final ShuffleReadMetricsReporter metrics = this.org$apache$spark$shuffle$rss$RssShuffleReader$$metrics;

        // Capture the metrics reporter directly; the callback needs nothing else from the reader.
        final MetricsCallback metricsCallback = new MetricsCallback() { // from class: org.apache.spark.shuffle.rss.RssShuffleReader$$anon$1
            @Override // com.aliyun.emr.rss.client.read.MetricsCallback
            public void incBytesRead(long j) {
                metrics.incRemoteBytesRead(j);
                // Every byte-count notification is also counted as one fetched block.
                metrics.incRemoteBlocksFetched(1L);
            }

            @Override // com.aliyun.emr.rss.client.read.MetricsCallback
            public void incReadTime(long j) {
                metrics.incFetchWaitTime(j);
            }
        };

        // One input stream per partition in [startPartition, endPartition), flattened into a
        // single record iterator; each record bumps the records-read counter.
        Iterator recordIterator = ((IterableLike) RichInt$.MODULE$
                .until$extension0(Predef$.MODULE$.intWrapper(this.startPartition), this.endPartition)
                .map(obj -> {
                    return $anonfun$read$1(this, metricsCallback, BoxesRunTime.unboxToInt(obj));
                }, IndexedSeq$.MODULE$.canBuildFrom()))
                .toIterator()
                .flatMap(stream -> {
                    return serializerInstance.deserializeStream((RssInputStream) stream).asKeyValueIterator();
                })
                .map(record -> {
                    metrics.incRecordsRead(1L);
                    return record;
                });

        // Merge shuffle read metrics into the task metrics once the records are exhausted.
        Iterator interruptibleIterator = new InterruptibleIterator(this.context,
                CompletionIterator$.MODULE$.apply(recordIterator, () -> {
                    this.context.taskMetrics().mergeShuffleReadMetrics();
                    return BoxedUnit.UNIT;
                }));

        // Aggregate only when the dependency defines an aggregator; map-side combined input
        // already holds combiners, otherwise raw values have to be combined.
        Iterator aggregatedIterator;
        if (dep().aggregator().isDefined()) {
            Aggregator aggregator = (Aggregator) dep().aggregator().get();
            aggregatedIterator = dep().mapSideCombine()
                    ? aggregator.combineCombinersByKey(interruptibleIterator, this.context)
                    : aggregator.combineValuesByKey(interruptibleIterator, this.context);
        } else {
            aggregatedIterator = interruptibleIterator;
        }

        // keyOrdering() is an Option; typing it as Some would make the None branch unreachable.
        scala.Option keyOrdering = dep().keyOrdering();
        Ordering ordering;
        Iterator resultIterator;
        if ((keyOrdering instanceof Some) && (ordering = (Ordering) ((Some) keyOrdering).value()) != null) {
            final ExternalSorter externalSorter = new ExternalSorter(
                    this.context,
                    ExternalSorter$.MODULE$.$lessinit$greater$default$2(),
                    ExternalSorter$.MODULE$.$lessinit$greater$default$3(),
                    new Some(ordering),
                    dep().serializer());
            externalSorter.insertAll(aggregatedIterator);
            this.context.taskMetrics().incMemoryBytesSpilled(externalSorter.memoryBytesSpilled());
            this.context.taskMetrics().incDiskBytesSpilled(externalSorter.diskBytesSpilled());
            this.context.taskMetrics().incPeakExecutionMemory(externalSorter.peakMemoryUsedBytes());
            // Stop the sorter both when the task completes and when the sorted output is
            // fully consumed, whichever happens first.
            this.context.addTaskCompletionListener(taskContext -> {
                externalSorter.stop();
                return BoxedUnit.UNIT;
            });
            resultIterator = CompletionIterator$.MODULE$.apply(externalSorter.iterator(), () -> {
                externalSorter.stop();
                return BoxedUnit.UNIT;
            });
        } else {
            if (!None$.MODULE$.equals(keyOrdering)) {
                throw new MatchError(keyOrdering);
            }
            resultIterator = aggregatedIterator;
        }

        // Avoid double-wrapping when the result is already interruptible.
        return resultIterator instanceof InterruptibleIterator
                ? resultIterator
                : new InterruptibleIterator(this.context, resultIterator);
    }

    /**
     * Opens the input stream for a single partition (synthetic closure body of {@link #read()}).
     *
     * <p>When the handle reports no mappers an empty stream is returned without contacting the
     * shuffle client. Otherwise the wall-clock milliseconds spent in {@code readPartition} are
     * reported through {@code metricsCallback.incReadTime} and the callback is attached to the
     * returned stream.
     *
     * @param rssShuffleReader reader whose handle, client, context and map-index range are used
     * @param metricsCallback callback that receives the read time and is set on the stream
     * @param i partition id to read
     * @return the stream for partition {@code i}, or {@code RssInputStream.empty()}
     */
    public static final /* synthetic */ RssInputStream $anonfun$read$1(RssShuffleReader rssShuffleReader, MetricsCallback metricsCallback, int i) {
        if (rssShuffleReader.handle.numMappers() <= 0) {
            return RssInputStream.empty();
        }
        long startMillis = System.currentTimeMillis();
        RssInputStream partitionStream = rssShuffleReader.rssShuffleClient().readPartition(
                rssShuffleReader.handle.newAppId(),
                rssShuffleReader.handle.shuffleId(),
                i,
                rssShuffleReader.context.attemptNumber(),
                rssShuffleReader.startMapIndex,
                rssShuffleReader.endMapIndex);
        metricsCallback.incReadTime(System.currentTimeMillis() - startMillis);
        partitionStream.setCallback(metricsCallback);
        return partitionStream;
    }

    /**
     * Creates a reader for the given handle and partition / map-index ranges.
     *
     * @param rssShuffleHandle handle providing the dependency and the meta-service host/port
     * @param i start partition (inclusive)
     * @param i2 end partition (exclusive)
     * @param i3 start map index, forwarded to {@code readPartition}
     * @param i4 end map index, forwarded to {@code readPartition}
     * @param taskContext context of the running task
     * @param rssConf configuration handed to {@code ShuffleClient.get}
     * @param shuffleReadMetricsReporter sink for shuffle read metrics
     */
    public RssShuffleReader(RssShuffleHandle<K, ?, C> rssShuffleHandle, int i, int i2, int i3, int i4, TaskContext taskContext, RssConf rssConf, ShuffleReadMetricsReporter shuffleReadMetricsReporter) {
        this.handle = rssShuffleHandle;
        this.startPartition = i;
        this.endPartition = i2;
        this.startMapIndex = i3;
        this.endMapIndex = i4;
        this.context = taskContext;
        this.org$apache$spark$shuffle$rss$RssShuffleReader$$metrics = shuffleReadMetricsReporter;
        Logging.$init$(this);
        this.dep = rssShuffleHandle.dependency();
        this.rssShuffleClient = ShuffleClient.get(rssShuffleHandle.essMetaServiceHost(), rssShuffleHandle.essMetaServicePort(), rssConf);
    }
}
