package tech.mlsql.plugins.execsql;

import com.alibaba.druid.util.JdbcUtils;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.commons.compress.utils.CharsetNames;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.crypto.keytools.HadoopFSKeyMaterialStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import shadeio.azure.com.fasterxml.jackson.databind.ObjectMapper;
import shadeio.azure.com.fasterxml.jackson.databind.SerializationFeature;
import streaming.core.datasource.JDBCUtils$;
import tech.mlsql.common.utils.cache.Cache;
import tech.mlsql.common.utils.cache.CacheBuilder;
import tech.mlsql.common.utils.cache.RemovalListener;
import tech.mlsql.common.utils.cache.RemovalNotification;
import tech.mlsql.common.utils.log.Logging;
import tech.mlsql.tool.HDFSOperatorV2$;

/* compiled from: JobUtils.scala */
/* loaded from: input_file:tech/mlsql/plugins/execsql/JobUtils$.class */
public final class JobUtils$ implements Logging {
    /** Singleton instance; assigned as the first statement of the private constructor. */
    public static JobUtils$ MODULE$;
    /** Lazily initialised holder of the cache directory path; guarded by {@link #bitmap$0}. */
    private AtomicReference<String> tech$mlsql$plugins$execsql$JobUtils$$cacheDir;
    /** Open JDBC connections keyed by the connection name passed to {@code newConnection}. */
    private final ConcurrentHashMap<String, ConnectionHolder> connectionPool;
    /** Cache-file paths written for each connection name; evicted entries have their files deleted. */
    private final Cache<String, List<String>> cacheFiles;
    /** Single-thread scheduler that periodically invokes {@code cleanOldFiles} on the cache directory. */
    private final ScheduledExecutorService cleanThread;
    /** Backing field for the logger accessors required by the {@code Logging} trait. */
    private transient Logger tech$mlsql$common$utils$log$Logging$$log_;
    /** Set to {@code true} once the cache directory holder has been computed. */
    private volatile boolean bitmap$0;

    // Instantiating the class here is what populates MODULE$ (the constructor stores `this` in it).
    static {
        new JobUtils$();
    }

    // ---------------------------------------------------------------------
    // Forwarders for the Logging trait: every method below simply delegates
    // to the corresponding static helper on Logging, passing this instance.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    // Message-only variants; the message is supplied as a Function0.
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    // Variants that additionally forward a Throwable to the logger.
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    // Getter/setter pair for the logger field that the Logging trait stores on this instance.
    public Logger tech$mlsql$common$utils$log$Logging$$log_() {
        return this.tech$mlsql$common$utils$log$Logging$$log_;
    }

    public void tech$mlsql$common$utils$log$Logging$$log__$eq(Logger logger) {
        this.tech$mlsql$common$utils$log$Logging$$log_ = logger;
    }

    /** Returns the map of pooled connections, keyed by connection name. */
    private ConcurrentHashMap<String, ConnectionHolder> connectionPool() {
        return this.connectionPool;
    }

    /** Returns the cache that tracks, per connection name, the cache files written for it. */
    private Cache<String, List<String>> cacheFiles() {
        return this.cacheFiles;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [tech.mlsql.plugins.execsql.JobUtils$] */
    private AtomicReference<String> cacheDir$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                ObjectRef create = ObjectRef.create(ExecSQLApp$.MODULE$.getTmpPath());
                if (((String) create.elem) == null || ((String) create.elem).isEmpty()) {
                    logInfo(() -> {
                        return "hadoop.tmp.dir is not set";
                    });
                    create.elem = "/tmp";
                }
                Path path = new Path((String) create.elem, "execsql_cache");
                FileSystem fileSystem = FileSystem.get(HDFSOperatorV2$.MODULE$.hadoopConfiguration());
                if (fileSystem.exists(path)) {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    BoxesRunTime.boxToBoolean(fileSystem.mkdirs(path));
                }
                logInfo(() -> {
                    return new StringBuilder(23).append("cache data in ").append(path.toString()).append(" tmpPath:").append((String) create.elem).toString();
                });
                AtomicReference<String> atomicReference = new AtomicReference<>();
                atomicReference.set(path.toString());
                this.tech$mlsql$plugins$execsql$JobUtils$$cacheDir = atomicReference;
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.tech$mlsql$plugins$execsql$JobUtils$$cacheDir;
    }

    /**
     * Returns the cache directory holder, computing it on the first call.
     *
     * @return the holder containing the cache directory path
     */
    public AtomicReference<String> tech$mlsql$plugins$execsql$JobUtils$$cacheDir() {
        if (this.bitmap$0) {
            // Fast path: already initialised, no lock needed.
            return this.tech$mlsql$plugins$execsql$JobUtils$$cacheDir;
        }
        return cacheDir$lzycompute();
    }

    /** Returns the scheduler used for the periodic cache-directory cleanup task. */
    private ScheduledExecutorService cleanThread() {
        return this.cleanThread;
    }

    /**
     * Executes a SQL statement on the named pooled connection and discards any result.
     *
     * <p>Auto-commit is switched on for the connection before the statement runs.
     *
     * @param sparkSession unused by this method; kept for signature compatibility
     * @param str          name of the pooled connection
     * @param str2         SQL text to execute
     * @throws RuntimeException if no connection is available under {@code str}
     */
    public void executeQueryInDriverWithoutResult(SparkSession sparkSession, String str, String str2) {
        ConnectionHolder holder = fetchConnection(str);
        // Check before touching the holder: the original dereferenced it first, making the check dead code.
        if (holder == null) {
            throw new RuntimeException(new StringBuilder(21).append("connection ").append(str).append(" no found!").toString());
        }
        Connection connection = holder.connection();
        connection.setAutoCommit(true);
        // try-with-resources closes the statement even when execute() throws.
        try (PreparedStatement statement = connection.prepareStatement(str2)) {
            statement.execute();
        }
    }

    /**
     * Runs the given action and logs, rather than propagates, any {@link Exception} it throws.
     * Used for best-effort cleanup of resources.
     *
     * @param function0 the action to run
     */
    public void try_close(Function0<BoxedUnit> function0) {
        try {
            function0.apply$mcV$sp();
        } catch (Exception failure) {
            logError(() -> "execute func failed", failure);
        }
    }

    /**
     * Deletes the checksum sidecar file that sits next to the given file.
     *
     * <p>The sidecar is looked up as {@code <dir>/.<fileName>.crc}. The previous implementation
     * appended the <em>whole path</em> after the dot instead of just the file name, producing a
     * path that never matched the real sidecar.
     *
     * @param fileSystem file system to delete from
     * @param str        path of the data file, using {@code /} as separator
     * @return the result of {@code FileSystem.delete} for the sidecar path
     */
    public boolean deleteCRCFile(FileSystem fileSystem, String str) {
        int lastSlash = str.lastIndexOf('/');
        // Keep the trailing slash in the directory part so plain concatenation yields a valid path.
        String directory = str.substring(0, lastSlash + 1);
        String fileName = str.substring(lastSlash + 1);
        return fileSystem.delete(new Path(directory + "." + fileName + ".crc"), true);
    }

    /**
     * Runs a query on the named pooled connection, collects all rows in the driver, and turns
     * them into a Dataset by serialising each row to JSON and letting Spark infer the schema.
     *
     * @param sparkSession session used to build the resulting Dataset
     * @param str          name of the pooled connection
     * @param str2         SQL query text
     * @return a Dataset built from the JSON-encoded rows
     * @throws RuntimeException if no connection is available under {@code str}
     */
    public Dataset<Row> executeQueryInDriver(SparkSession sparkSession, String str, String str2) {
        ConnectionHolder holder = fetchConnection(str);
        if (holder == null) {
            throw new RuntimeException(new StringBuilder(21).append("connection ").append(str).append(" no found!").toString());
        }
        Seq rows;
        // Both resources are closed even if the query or row materialisation throws;
        // previously the statement leaked on any failure before close().
        try (PreparedStatement statement = holder.connection().prepareStatement(str2);
             ResultSet resultSet = statement.executeQuery()) {
            rows = JDBCUtils$.MODULE$.rsToMaps(resultSet);
        }
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(DefaultScalaModule$.MODULE$);
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return sparkSession.read().json(sparkSession.sparkContext().parallelize((Seq) rows.map(map -> {
            return objectMapper.writeValueAsString(JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava());
        }, Seq$.MODULE$.canBuildFrom()), sparkSession.sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)));
    }

    /**
     * Looks up the pooled connection registered under the given name, reconnecting with the
     * holder's stored options when the existing connection is closed or fails a 5-second
     * validity check.
     *
     * @param str name of the pooled connection
     * @return the holder currently stored in the pool under {@code str}
     * @throws RuntimeException if nothing is registered under {@code str}
     */
    private ConnectionHolder fetchConnection(String str) {
        ConnectionHolder pooled = connectionPool().get(str);
        if (pooled == null) {
            throw new RuntimeException("connection " + str + " no found!");
        }

        Connection jdbc = pooled.connection();
        boolean usable = !jdbc.isClosed() && jdbc.isValid(5);
        if (usable) {
            return pooled;
        }

        logInfo(() -> "connection " + str + " is closed or invalid, try to reconnect");
        removeConnection(str);
        newConnection(str, pooled.options());
        return connectionPool().get(str);
    }

    /**
     * Streams the result of a query into a newline-delimited JSON file in the cache directory
     * and returns a Dataset that reads that file.
     *
     * <p>Auto-commit is turned off on the connection. For MySQL drivers the fetch size is set to
     * {@code Integer.MIN_VALUE}; for the PostgreSQL driver a positive fetch size is set. The file
     * path is recorded in the per-connection cache-file list once writing has finished.
     *
     * @param sparkSession session used to read the cache file
     * @param str          name of the pooled connection
     * @param str2         SQL query text
     * @return a Dataset backed by the written JSON file
     * @throws RuntimeException if no connection is available under {@code str}
     */
    public Dataset<Row> executeQueryWithDiskCache(SparkSession sparkSession, String str, String str2) {
        ConnectionHolder holder = fetchConnection(str);
        // Validate before use: the original called setAutoCommit() ahead of its null check.
        if (holder == null || holder.connection() == null) {
            throw new RuntimeException(new StringBuilder(21).append("connection ").append(str).append(" no found!").toString());
        }
        Connection connection = holder.connection();
        connection.setAutoCommit(false);
        String driver = (String) holder.options().apply("driver");

        Path cachePath;
        PreparedStatement statement = connection.prepareStatement(str2);
        // Nested try/finally blocks guarantee that the stream, result set and statement are
        // released (in that order) no matter which step fails, including executeQuery(),
        // file creation and ResultSet.next(), which were previously outside the guarded region.
        try {
            if (JdbcUtils.isMySqlDriver(driver)) {
                statement.setFetchSize(Integer.MIN_VALUE);
            }
            if (driver.equals("org.postgresql.Driver")) {
                // NOTE(review): the decompiler resolved the fetch-size literal to this Parquet
                // constant; presumably a plain numeric fetch size in the original source.
                statement.setFetchSize(ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
                connection.setAutoCommit(false);
            }
            ResultSet resultSet = statement.executeQuery();
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.registerModule(DefaultScalaModule$.MODULE$);
                objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).registerModule(new JavaTimeModule());
                // File name is <uuid>-<yyyyMMddHHmmss><suffix>; cleanOldFiles parses the timestamp back out.
                // NOTE(review): the suffix constant is a decompiler substitution, presumably ".json".
                String fileName = new StringBuilder(6).append(UUID.randomUUID().toString()).append("-").append(DateTime.now().toString("yyyyMMddHHmmss")).append(HadoopFSKeyMaterialStore.KEY_MATERIAL_FILE_SUFFFIX).toString();
                cachePath = new Path(tech$mlsql$plugins$execsql$JobUtils$$cacheDir().get(), fileName);
                FSDataOutputStream output = FileSystem.get(HDFSOperatorV2$.MODULE$.hadoopConfiguration()).create(cachePath, true);
                try {
                    while (resultSet.next()) {
                        output.write(new StringBuilder(1).append(objectMapper.writeValueAsString(JavaConverters$.MODULE$.mapAsJavaMapConverter(JDBCUtils$.MODULE$.rsToMap(resultSet, JDBCUtils$.MODULE$.getRsCloumns(resultSet))).asJava())).append("\n").toString().getBytes(CharsetNames.UTF_8));
                    }
                } finally {
                    try_close(() -> {
                        output.close();
                    });
                }
            } finally {
                try_close(() -> {
                    resultSet.close();
                });
            }
        } finally {
            try_close(() -> {
                statement.close();
            });
        }

        // Remember the file under the connection name so it is deleted when the entry is evicted.
        ((List) cacheFiles().get(str, new Callable<List<String>>() {
            @Override
            public List<String> call() {
                return new LinkedList<>();
            }
        })).add(cachePath.toString());
        return sparkSession.read().json(cachePath.toString());
    }

    /**
     * Streams the result of a query into a Snappy-compressed Parquet file in the cache directory
     * and returns a Dataset that reads that file.
     *
     * <p>The Avro schema is derived from the result-set metadata, one field per column label.
     * For MySQL drivers the fetch size is set to {@code Integer.MIN_VALUE}; for the PostgreSQL
     * driver a positive fetch size is set and auto-commit is turned off. The file path is
     * recorded in the per-connection cache-file list once writing has finished.
     *
     * @param sparkSession session used to read the cache file
     * @param str          name of the pooled connection
     * @param str2         SQL query text
     * @return a Dataset backed by the written Parquet file
     * @throws RuntimeException if no connection is available under {@code str}
     */
    public Dataset<Row> executeQueryWithDiskCacheParquet(SparkSession sparkSession, String str, String str2) {
        ConnectionHolder holder = fetchConnection(str);
        // Validate the holder before reading its options (the original read them first).
        if (holder == null || holder.connection() == null) {
            throw new RuntimeException(new StringBuilder(21).append("connection ").append(str).append(" no found!").toString());
        }
        Connection connection = holder.connection();
        String driver = (String) holder.options().apply("driver");

        Path cachePath;
        PreparedStatement statement = connection.prepareStatement(str2);
        // Nested try/finally blocks release writer, result set and statement (in that order) on
        // every exit path; previously only failures inside the row-copy body were covered.
        try {
            if (JdbcUtils.isMySqlDriver(driver)) {
                statement.setFetchSize(Integer.MIN_VALUE);
            }
            if (driver.equals("org.postgresql.Driver")) {
                // NOTE(review): the decompiler resolved the fetch-size literal to this Parquet
                // constant; presumably a plain numeric fetch size in the original source.
                statement.setFetchSize(ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
                connection.setAutoCommit(false);
            }
            ResultSet resultSet = statement.executeQuery();
            try {
                ResultSetMetaData metaData = resultSet.getMetaData();
                int columnCount = metaData.getColumnCount();

                // Build the record schema; the helper appends one field per column to `fields`.
                SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record("record").fields();
                for (int column = 1; column <= columnCount; column++) {
                    $anonfun$executeQueryWithDiskCacheParquet$1(metaData, fields, column);
                }
                Schema schema = fields.endRecord();

                // File name is <uuid>-<yyyyMMddHHmmss>.parquet; cleanOldFiles parses the timestamp back out.
                String fileName = new StringBuilder(9).append(UUID.randomUUID().toString()).append("-").append(DateTime.now().toString("yyyyMMddHHmmss")).append(".parquet").toString();
                cachePath = new Path(tech$mlsql$plugins$execsql$JobUtils$$cacheDir().get(), fileName);
                ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(cachePath)
                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        .withSchema(schema)
                        .build();
                try {
                    // A single record instance is refilled for every row and written immediately.
                    GenericData.Record record = new GenericData.Record(schema);
                    while (resultSet.next()) {
                        for (int column = 1; column <= columnCount; column++) {
                            // NOTE(review): values are stored exactly as the driver returns them; the
                            // schema declares non-nullable primitive/string fields, so NULLs or types
                            // such as DATE/DECIMAL may not match - confirm against real data.
                            record.put(metaData.getColumnLabel(column), resultSet.getObject(column));
                        }
                        writer.write(record);
                    }
                } finally {
                    try_close(() -> {
                        writer.close();
                    });
                }
            } finally {
                try_close(() -> {
                    resultSet.close();
                });
            }
        } finally {
            try_close(() -> {
                statement.close();
            });
        }

        // Remember the file under the connection name so it is deleted when the entry is evicted.
        ((List) cacheFiles().get(str, new Callable<List<String>>() {
            @Override
            public List<String> call() {
                return new LinkedList<>();
            }
        })).add(cachePath.toString());
        return sparkSession.read().parquet(cachePath.toString());
    }

    /**
     * Deletes cache files in the given directory whose embedded timestamp is more than two days old.
     *
     * <p>Only entries whose name ends with the JSON cache suffix or {@code .parquet} are
     * considered. The timestamp is the text after the last {@code -} in the name (minus the
     * suffix), in {@code yyyyMMddHHmmss} format. Entries whose timestamp cannot be parsed are
     * logged and skipped, so that one stray file no longer aborts the whole sweep.
     *
     * @param path directory to scan
     */
    public void cleanOldFiles(Path path) {
        FileSystem fileSystem = FileSystem.get(HDFSOperatorV2$.MODULE$.hadoopConfiguration());
        DateTime now = DateTime.now();
        // Same value as the original literal 172800000.
        final long maxAgeMillis = TimeUnit.DAYS.toMillis(2L);
        // NOTE(review): decompiler substitution for the JSON cache-file suffix, presumably ".json".
        final String jsonSuffix = HadoopFSKeyMaterialStore.KEY_MATERIAL_FILE_SUFFFIX;

        for (FileStatus fileStatus : fileSystem.listStatus(path)) {
            String name = fileStatus.getPath().getName();
            String suffix;
            if (name.endsWith(jsonSuffix)) {
                suffix = jsonSuffix;
            } else if (name.endsWith(".parquet")) {
                suffix = ".parquet";
            } else {
                continue;
            }

            String lastSegment = name.substring(name.lastIndexOf('-') + 1);
            String stamp = lastSegment.endsWith(suffix)
                    ? lastSegment.substring(0, lastSegment.length() - suffix.length())
                    : lastSegment;

            long createdMillis;
            try {
                createdMillis = DateTime.parse(stamp, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getMillis();
            } catch (IllegalArgumentException e) {
                // Not one of our cache files (or a mangled name): leave it alone and keep going.
                logWarning(() -> {
                    return "skip file with unparseable timestamp " + fileStatus.getPath().toString();
                }, e);
                continue;
            }
            if (now.getMillis() - createdMillis <= maxAgeMillis) {
                continue;
            }

            logInfo(() -> {
                return new StringBuilder(11).append("clean file ").append(fileStatus.getPath().toString()).toString();
            });
            fileSystem.delete(fileStatus.getPath(), true);
            deleteCRCFile(fileSystem, fileStatus.getPath().toString());
        }
    }

    /**
     * Opens a JDBC connection from the given options and registers it in the pool under the
     * given name, replacing (and closing) any connection already registered there.
     *
     * @param str name to register the connection under
     * @param map connection options; {@code driver} and {@code url} are read directly
     * @return the holder now stored in the pool under {@code str}
     */
    public synchronized ConnectionHolder newConnection(String str, Map<String, String> map) {
        String driverClass = (String) map.apply("driver");
        String jdbcUrl = (String) map.apply("url");
        logInfo(() -> "create connection " + str + " " + jdbcUrl);

        // Load the driver class so that DriverManager can find it.
        Class.forName(driverClass);
        Connection opened = DriverManager.getConnection(jdbcUrl, JDBCUtils$.MODULE$.formatOptions(map));

        boolean alreadyRegistered = connectionPool().containsKey(str);
        if (alreadyRegistered) {
            removeConnection(str);
        }
        ConnectionHolder holder = new ConnectionHolder(str, map, opened);
        connectionPool().put(str, holder);
        return connectionPool().get(str);
    }

    /**
     * Closes and unregisters the connection stored under the given name, and invalidates the
     * cache-file entry for that name. Does nothing when the name is not registered.
     *
     * @param str name of the pooled connection
     */
    public synchronized void removeConnection(String str) {
        if (!connectionPool().containsKey(str)) {
            return;
        }
        ConnectionHolder registered = connectionPool().get(str);
        Connection jdbc = registered.connection();
        // Best-effort close: a failure is logged by try_close and does not stop the removal.
        try_close(() -> {
            jdbc.close();
        });
        connectionPool().remove(str);
        cacheFiles().invalidate(str);
    }

    /**
     * Appends one field to the Avro record schema for the given result-set column.
     *
     * <p>The field is named after the column label and has no default. Its Avro type is chosen
     * from the column's JDBC type code; any code not listed falls back to string.
     *
     * @param resultSetMetaData metadata of the result set being exported
     * @param fieldAssembler    schema field assembler to append to
     * @param i                 1-based column index
     * @return the assembler returned by {@code noDefault()}
     */
    public static final /* synthetic */ SchemaBuilder.FieldAssembler $anonfun$executeQueryWithDiskCacheParquet$1(ResultSetMetaData resultSetMetaData, SchemaBuilder.FieldAssembler fieldAssembler, int i) {
        String fieldName = resultSetMetaData.getColumnLabel(i);
        int jdbcType = resultSetMetaData.getColumnType(i);

        Schema fieldType;
        switch (jdbcType) {
            case java.sql.Types.BIGINT:
                fieldType = SchemaBuilder.builder().longType();
                break;
            case java.sql.Types.BINARY:
                fieldType = SchemaBuilder.builder().bytesType();
                break;
            case java.sql.Types.INTEGER:
            case java.sql.Types.DATE:
                fieldType = SchemaBuilder.builder().intType();
                break;
            case java.sql.Types.FLOAT:
                fieldType = SchemaBuilder.builder().floatType();
                break;
            case java.sql.Types.DOUBLE:
                fieldType = SchemaBuilder.builder().doubleType();
                break;
            case java.sql.Types.BOOLEAN:
                fieldType = SchemaBuilder.builder().booleanType();
                break;
            default:
                // CHAR, VARCHAR and every unlisted type are stored as strings.
                fieldType = SchemaBuilder.builder().stringType();
                break;
        }
        return fieldAssembler.name(fieldName).type(fieldType).noDefault();
    }

    /**
     * Builds the singleton: publishes it through {@code MODULE$}, initialises logging, creates the
     * connection pool and the cache-file registry, and schedules the periodic cleanup task.
     */
    private JobUtils$() {
        // Must come first: the anonymous classes below reach the instance through MODULE$.
        MODULE$ = this;
        Logging.$init$(this);
        this.connectionPool = new ConcurrentHashMap<>();
        // Registry of cache files per connection name: at most 100000 entries, each expiring
        // 2 days after it was written. The removal listener deletes the files of a removed entry.
        this.cacheFiles = CacheBuilder.newBuilder().maximumSize(100000L).removalListener(new RemovalListener<String, List<String>>() { // from class: tech.mlsql.plugins.execsql.JobUtils$$anon$1
            public void onRemoval(RemovalNotification<String, List<String>> removalNotification) {
                List list = (List) removalNotification.getValue();
                if (list != null) {
                    FileSystem fileSystem = FileSystem.get(HDFSOperatorV2$.MODULE$.hadoopConfiguration());
                    ((IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(list).asScala()).foreach(str -> {
                        // Each file is handled in its own try/catch so one failure does not
                        // prevent the remaining files from being deleted.
                        try {
                            JobUtils$.MODULE$.logInfo(() -> {
                                return new StringBuilder(18).append("remove cache file ").append(str).toString();
                            });
                            fileSystem.delete(new Path(str), true);
                            return BoxesRunTime.boxToBoolean(JobUtils$.MODULE$.deleteCRCFile(fileSystem, str));
                        } catch (Exception e) {
                            JobUtils$.MODULE$.logError(() -> {
                                return new StringBuilder(25).append("remove cache file ").append(str).append(" failed").toString();
                            }, e);
                            return BoxedUnit.UNIT;
                        }
                    });
                }
            }
        }).expireAfterWrite(2L, TimeUnit.DAYS).build();
        this.cleanThread = Executors.newSingleThreadScheduledExecutor();
        // Sweep the cache directory every 30 minutes, starting 30 minutes after construction.
        cleanThread().scheduleWithFixedDelay(new Runnable() { // from class: tech.mlsql.plugins.execsql.JobUtils$$anon$2
            @Override // java.lang.Runnable
            public void run() {
                // Exceptions are caught and logged here rather than propagated out of the task.
                try {
                    JobUtils$.MODULE$.logInfo(() -> {
                        return "try to clean old files...";
                    });
                    if (JobUtils$.MODULE$.tech$mlsql$plugins$execsql$JobUtils$$cacheDir().get() != null) {
                        JobUtils$.MODULE$.cleanOldFiles(new Path(JobUtils$.MODULE$.tech$mlsql$plugins$execsql$JobUtils$$cacheDir().get()));
                    }
                } catch (Exception e) {
                    JobUtils$.MODULE$.logError(() -> {
                        return "clean old files failed";
                    }, e);
                }
            }
        }, 30L, 30L, TimeUnit.MINUTES);
    }
}
