package com.aliyun.emr.rss.common.haclient;

import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.protocol.RpcNameConstants;
import com.aliyun.emr.rss.common.protocol.message.ControlMessages$OneWayMessageResponse$;
import com.aliyun.emr.rss.common.protocol.message.MasterRequestMessage;
import com.aliyun.emr.rss.common.protocol.message.Message;
import com.aliyun.emr.rss.common.rpc.RpcAddress;
import com.aliyun.emr.rss.common.rpc.RpcEndpointRef;
import com.aliyun.emr.rss.common.rpc.RpcEnv;
import com.aliyun.emr.rss.common.rpc.RpcTimeout;
import com.aliyun.emr.rss.common.util.RpcUtils;
import com.aliyun.emr.rss.common.util.ThreadUtils;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.concurrent.duration.Duration;
import scala.reflect.ClassTag$;

/* loaded from: input_file:com/aliyun/emr/rss/common/haclient/RssHARetryClient.class */
public class RssHARetryClient {
    private final RpcEnv rpcEnv;
    private final int masterPort;
    private final List<String> masterHosts;
    private final int maxTries;
    private final RpcTimeout rpcTimeout;
    private final AtomicReference<RpcEndpointRef> rpcEndpointRef = new AtomicReference<>();
    private final ExecutorService oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender");
    private static final String SPLITER = "#";
    private static final Logger LOG = LoggerFactory.getLogger(RssHARetryClient.class);
    private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();

    public RssHARetryClient(RpcEnv rpcEnv, RssConf rssConf) {
        this.rpcEnv = rpcEnv;
        this.masterPort = RssConf.masterPort(rssConf);
        this.masterHosts = Arrays.asList(RssConf.haMasterHosts(rssConf).split(","));
        this.maxTries = Math.max(this.masterHosts.size(), RssConf.haClientMaxTries(rssConf));
        this.rpcTimeout = RpcUtils.askRpcTimeout(rssConf);
    }

    static long nextCallId() {
        return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
    }

    public static Tuple2<String, Long> decodeRequestId(String str) {
        if (str.contains(SPLITER)) {
            return new Tuple2<>(str.split(SPLITER)[0], Long.valueOf(str.split(SPLITER)[1]));
        }
        return null;
    }

    public static String encodeRequestId(String str, long j) {
        return String.format("%s%s%d", str, SPLITER, Long.valueOf(j));
    }

    public static String genRequestId() {
        return encodeRequestId(UUID.randomUUID().toString(), nextCallId());
    }

    public void send(Message message) throws Throwable {
        this.oneWayMessageSender.submit(() -> {
            try {
                sendMessageInner(message, ControlMessages$OneWayMessageResponse$.class);
            } catch (Throwable th) {
                LOG.warn("Exception occurs while send one-way message.", th);
            }
        });
        LOG.debug("Send one-way message {}.", message);
    }

    public <T> T askSync(Message message, Class<T> cls) throws Throwable {
        return (T) sendMessageInner(message, cls);
    }

    public void close() {
        ThreadUtils.shutdown(this.oneWayMessageSender, Duration.apply("800ms"));
    }

    private <T> T sendMessageInner(Message message, Class<T> cls) throws Throwable {
        Throwable th = null;
        int i = 0;
        boolean z = true;
        if (message instanceof MasterRequestMessage) {
            ((MasterRequestMessage) message).requestId_(encodeRequestId(UUID.randomUUID().toString(), nextCallId()));
        }
        LOG.debug("Send rpc message " + message);
        RpcEndpointRef rpcEndpointRef = null;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        while (i < this.maxTries && z) {
            try {
                rpcEndpointRef = getOrSetupRpcEndpointRef(atomicInteger);
                return (T) this.rpcTimeout.awaitResult(rpcEndpointRef.ask(message, this.rpcTimeout, ClassTag$.MODULE$.apply(cls)));
            } catch (Throwable th2) {
                th = th2;
                z = shouldRetry(rpcEndpointRef, th);
                if (z) {
                    i++;
                    Uninterruptibles.sleepUninterruptibly(Math.min(i * 100, 2000L), TimeUnit.MILLISECONDS);
                }
            }
        }
        LOG.error("Send rpc with failure, has tried {}, max try {}!", new Object[]{Integer.valueOf(i), Integer.valueOf(this.maxTries), th});
        throw th;
    }

    private boolean shouldRetry(@Nullable RpcEndpointRef rpcEndpointRef, Throwable th) {
        if (!(th.getCause() instanceof MasterNotLeaderException)) {
            if (!(th.getCause() instanceof IOException)) {
                return false;
            }
            resetRpcEndpointRef(rpcEndpointRef);
            return true;
        }
        String suggestedLeaderAddress = ((MasterNotLeaderException) th.getCause()).getSuggestedLeaderAddress();
        if (suggestedLeaderAddress.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) {
            LOG.warn("Master leader is not present currently, please check masters' status!");
            return true;
        }
        setRpcEndpointRef(suggestedLeaderAddress);
        return true;
    }

    private void setRpcEndpointRef(String str) {
        this.rpcEndpointRef.set(setupEndpointRef(str));
        LOG.info("Fail over to master {}.", str);
    }

    private void resetRpcEndpointRef(@Nullable RpcEndpointRef rpcEndpointRef) {
        if (this.rpcEndpointRef.compareAndSet(rpcEndpointRef, null)) {
            LOG.debug("Reset the connection to master {}.", rpcEndpointRef != null ? rpcEndpointRef.address() : "null");
        }
    }

    private RpcEndpointRef getOrSetupRpcEndpointRef(AtomicInteger atomicInteger) {
        RpcEndpointRef rpcEndpointRef = this.rpcEndpointRef.get();
        if (rpcEndpointRef == null) {
            int i = atomicInteger.get();
            do {
                if (this.rpcEndpointRef.compareAndSet(null, setupEndpointRef(this.masterHosts.get(i)))) {
                    i = (i + 1) % this.masterHosts.size();
                }
                rpcEndpointRef = this.rpcEndpointRef.get();
                if (rpcEndpointRef != null) {
                    break;
                }
            } while (i != atomicInteger.get());
            atomicInteger.set(i);
            if (rpcEndpointRef == null) {
                throw new IllegalStateException("After trying all the available Master Addresses, an usable link still couldn't be created.");
            }
            LOG.info("connect to master {}.", rpcEndpointRef.address());
        }
        return rpcEndpointRef;
    }

    private RpcEndpointRef setupEndpointRef(String str) {
        RpcEndpointRef rpcEndpointRef = null;
        try {
            rpcEndpointRef = this.rpcEnv.setupEndpointRef(new RpcAddress(str, this.masterPort), RpcNameConstants.MASTER_EP);
        } catch (Exception e) {
            LOG.warn("Connect to {}:{} failed.", new Object[]{str, Integer.valueOf(this.masterPort), e});
        }
        return rpcEndpointRef;
    }
}
