package io.trino.aws.proxy.server;

import com.google.common.io.Resources;
import com.google.inject.Inject;
import io.trino.aws.proxy.server.testing.TestingTrinoAwsProxyServer;
import io.trino.aws.proxy.server.testing.containers.DockerAttachUtil;
import io.trino.aws.proxy.server.testing.containers.PySparkContainer;
import io.trino.aws.proxy.server.testing.harness.BuilderFilter;
import io.trino.aws.proxy.server.testing.harness.TrinoAwsProxyTest;
import java.nio.file.Path;
import java.util.Objects;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.S3Client;

@TrinoAwsProxyTest(filters = {Filter.class})
/* loaded from: input_file:io/trino/aws/proxy/server/TestPySparkSql.class */
public class TestPySparkSql {
    public static final String DATABASE_NAME = "db";
    public static final String TABLE_NAME = "people";
    private final S3Client s3Client;
    private final PySparkContainer pySparkContainer;

    /* loaded from: input_file:io/trino/aws/proxy/server/TestPySparkSql$Filter.class */
    public static class Filter implements BuilderFilter {
        @Override // io.trino.aws.proxy.server.testing.harness.BuilderFilter
        public TestingTrinoAwsProxyServer.Builder filter(TestingTrinoAwsProxyServer.Builder builder) {
            return builder.withV3PySparkContainer();
        }
    }

    @Inject
    public TestPySparkSql(S3Client s3Client, PySparkContainer pySparkContainer) {
        this.s3Client = (S3Client) Objects.requireNonNull(s3Client, "s3Client is null");
        this.pySparkContainer = (PySparkContainer) Objects.requireNonNull(pySparkContainer, "pySparkContainer is null");
    }

    @Test
    public void testSql() throws Exception {
        createDatabaseAndTable(this.s3Client, this.pySparkContainer);
        DockerAttachUtil.clearInputStreamAndClose(DockerAttachUtil.inputToContainerStdin(this.pySparkContainer.containerId(), "spark.sql(\"select * from %s.%s\").show()".formatted("db", "people")), str -> {
            return str.equals("|    John Galt| 28|");
        });
        DockerAttachUtil.clearInputStreamAndClose(DockerAttachUtil.inputToContainerStdin(this.pySparkContainer.containerId(), "columns = ['name', 'age']\nvals = [('a', 10), ('b', 20), ('c', 30)]\n\ndf = spark.createDataFrame(vals, columns)\ndf.write.insertInto('%s.%s')\n".formatted("db", "people")), str2 -> {
            return str2.contains("Stage 1:");
        });
        DockerAttachUtil.clearInputStreamAndClose(DockerAttachUtil.inputToContainerStdin(this.pySparkContainer.containerId(), "spark.sql(\"select * from %s.%s\").show()".formatted("db", "people")), str3 -> {
            return str3.equals("|            c| 30|");
        });
    }

    public static void createDatabaseAndTable(S3Client s3Client, PySparkContainer pySparkContainer) throws Exception {
        s3Client.createBucket(builder -> {
            builder.bucket("test");
        });
        s3Client.putObject(builder2 -> {
            builder2.bucket("test").key("table/file.csv");
        }, Path.of(Resources.getResource("test.csv").toURI()));
        DockerAttachUtil.clearInputStreamAndClose(DockerAttachUtil.inputToContainerStdin(pySparkContainer.containerId(), "spark.sql(\"create database %s\")".formatted("db")), str -> {
            return str.equals("DataFrame[]");
        });
        DockerAttachUtil.clearInputStreamAndClose(DockerAttachUtil.inputToContainerStdin(pySparkContainer.containerId(), "spark.sql(\"\"\"\n  CREATE TABLE IF NOT EXISTS %s.%s(name STRING, age INT)\n  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'\n  LOCATION 's3a://test/table/'\n  TBLPROPERTIES (\"s3select.format\" = \"csv\");\n  \"\"\")\n".formatted("db", "people")), str2 -> {
            return str2.equals("DataFrame[]");
        });
    }
}
