package alluxio.master.table.transform;

import alluxio.client.job.JobMasterClient;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.heartbeat.HeartbeatScheduler;
import alluxio.heartbeat.ManuallyScheduleHeartbeat;
import alluxio.job.JobConfig;
import alluxio.job.wire.PlanInfo;
import alluxio.job.wire.Status;
import alluxio.master.CoreMasterContext;
import alluxio.master.MasterTestUtils;
import alluxio.master.PortRegistry;
import alluxio.master.journal.JournalSystem;
import alluxio.master.journal.JournalTestUtils;
import alluxio.master.table.DefaultTableMaster;
import alluxio.master.table.Partition;
import alluxio.master.table.TableMaster;
import alluxio.master.table.TestDatabase;
import alluxio.master.table.TestUdbFactory;
import alluxio.table.common.Layout;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:alluxio/master/table/transform/TransformManagerTest.class */
public class TransformManagerTest {
    private static final int NUM_TABLES = 3;
    private static final String DB = "test_udb_name";
    private static final String EMPTY_DEFINITION = "";
    private static final String DEFINITION1 = "write(hive).option(hive.file.count.max, 1)";
    private static final String DEFINITION2 = "write(hive).option(hive.file.count.max, 2)";
    private JournalSystem mJournalSystem;
    private TableMaster mTableMaster;
    private JobMasterClient mMockJobMasterClient;

    @Rule
    public TemporaryFolder mTemporaryFolder = new TemporaryFolder();

    @Rule
    public ExpectedException mException = ExpectedException.none();

    @Rule
    public ManuallyScheduleHeartbeat mManualScheduler = new ManuallyScheduleHeartbeat(new String[]{"Master Table Transformation Monitor"});
    private static final String TABLE1 = TestDatabase.getTableName(0);
    private static final String TABLE2 = TestDatabase.getTableName(1);
    private static final int NUM_PARTITIONS = 2;
    private static final String TABLE3 = TestDatabase.getTableName(NUM_PARTITIONS);

    @Before
    public void before() throws Exception {
        ServerConfiguration.set(PropertyKey.MASTER_HOSTNAME, "localhost");
        ServerConfiguration.set(PropertyKey.MASTER_RPC_PORT, Integer.valueOf(PortRegistry.getFreePort()));
        ServerConfiguration.set(PropertyKey.MASTER_JOURNAL_TYPE, "UFS");
        ServerConfiguration.set(PropertyKey.TABLE_TRANSFORM_MANAGER_JOB_HISTORY_RETENTION_TIME, "1h");
        this.mJournalSystem = JournalTestUtils.createJournalSystem(this.mTemporaryFolder);
        this.mJournalSystem.format();
        CoreMasterContext testMasterContext = MasterTestUtils.testMasterContext(this.mJournalSystem);
        this.mMockJobMasterClient = (JobMasterClient) Mockito.mock(JobMasterClient.class);
        this.mTableMaster = new DefaultTableMaster(testMasterContext, this.mMockJobMasterClient);
        start();
        TestDatabase.genTable(NUM_TABLES, NUM_PARTITIONS, false);
        this.mTableMaster.attachDatabase(TestUdbFactory.TYPE, "connect", "test_udb_name", "test_udb_name", Collections.emptyMap(), false);
    }

    @After
    public void after() throws Exception {
        stop();
    }

    @Test
    public void noConcurrentJobOnSameTable() throws Exception {
        expectException(IOException.class, ExceptionMessage.TABLE_BEING_TRANSFORMED.getMessage(new Object[]{Long.toString(transform(TABLE1, DEFINITION1)), TABLE1, "test_udb_name"}));
        transform(TABLE1, DEFINITION2);
    }

    @Test
    public void noRepeatedJobOnSameTable() throws Exception {
        mockJobStatus(transform(TABLE1, DEFINITION1), Status.COMPLETED, null);
        heartbeat();
        expectException(IOException.class, ExceptionMessage.TABLE_ALREADY_TRANSFORMED.getMessage(new Object[]{"test_udb_name", TABLE1, DEFINITION1}));
        transform(TABLE1, DEFINITION1);
    }

    @Test
    public void getInfoForNonExistingTransformJob() throws Exception {
        Assert.assertTrue(this.mTableMaster.getAllTransformJobInfo().isEmpty());
        expectException(IOException.class, ExceptionMessage.TRANSFORM_JOB_DOES_NOT_EXIST.getMessage(new Object[]{-1L}));
        this.mTableMaster.getTransformJobInfo(-1L);
    }

    @Test
    public void defaultJob() throws Exception {
        Assert.assertTrue(this.mTableMaster.getAllTransformJobInfo().isEmpty());
        long transform = transform(TABLE1, EMPTY_DEFINITION);
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform), TABLE1, "write(hive)", transform, Status.RUNNING, null);
        mockJobStatus(transform, Status.COMPLETED, null);
        heartbeat();
        Assert.assertEquals(1L, this.mTableMaster.getAllTransformJobInfo().size());
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform), TABLE1, "write(hive)", transform, Status.COMPLETED, null);
    }

    @Test
    public void jobHistory() throws Exception {
        Assert.assertTrue(this.mTableMaster.getAllTransformJobInfo().isEmpty());
        long transform = transform(TABLE1, DEFINITION1);
        long transform2 = transform(TABLE2, DEFINITION2);
        long transform3 = transform(TABLE3, DEFINITION1);
        Assert.assertEquals(3L, this.mTableMaster.getAllTransformJobInfo().size());
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform), TABLE1, DEFINITION1, transform, Status.RUNNING, null);
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform2), TABLE2, DEFINITION2, transform2, Status.RUNNING, null);
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform3), TABLE3, DEFINITION1, transform3, Status.RUNNING, null);
        mockJobStatus(transform, Status.COMPLETED, null);
        mockJobStatus(transform2, Status.FAILED, "error");
        mockJobStatus(transform3, Status.RUNNING, null);
        heartbeat();
        Assert.assertEquals(3L, this.mTableMaster.getAllTransformJobInfo().size());
        TransformJobInfo transformJobInfo = this.mTableMaster.getTransformJobInfo(transform);
        checkTransformJobInfo(transformJobInfo, TABLE1, DEFINITION1, transform, Status.COMPLETED, null);
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform2), TABLE2, DEFINITION2, transform2, Status.FAILED, "error");
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform3), TABLE3, DEFINITION1, transform3, Status.RUNNING, null);
        restart();
        checkLayout(transformJobInfo, TABLE1);
        Assert.assertEquals(1L, this.mTableMaster.getAllTransformJobInfo().size());
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform3), TABLE3, DEFINITION1, transform3, Status.RUNNING, null);
        mockJobStatus(transform3, Status.COMPLETED, null);
        heartbeat();
        Assert.assertEquals(1L, this.mTableMaster.getAllTransformJobInfo().size());
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform3), TABLE3, DEFINITION1, transform3, Status.COMPLETED, null);
    }

    @Test
    public void jobMasterRestart() throws Exception {
        long transform = transform(TABLE1, DEFINITION1);
        Mockito.when(this.mMockJobMasterClient.getJobStatus(transform)).thenThrow(new Throwable[]{new NotFoundException("none")});
        heartbeat();
        Assert.assertEquals(1L, this.mTableMaster.getAllTransformJobInfo().size());
        checkTransformJobInfo(this.mTableMaster.getTransformJobInfo(transform), TABLE1, DEFINITION1, transform, Status.FAILED, ExceptionMessage.TRANSFORM_JOB_ID_NOT_FOUND_IN_JOB_SERVICE.getMessage(new Object[]{Long.valueOf(transform), "test_udb_name", TABLE1, "none"}));
    }

    private void start() throws Exception {
        this.mJournalSystem.start();
        this.mJournalSystem.gainPrimacy();
        this.mTableMaster.start(true);
    }

    private void stop() throws Exception {
        this.mTableMaster.stop();
        this.mJournalSystem.stop();
    }

    private void restart() throws Exception {
        stop();
        start();
    }

    private long getRandomJobId() {
        return new Random().nextLong();
    }

    private long transform(String str, String str2) throws Exception {
        Mockito.when(Long.valueOf(this.mMockJobMasterClient.run((JobConfig) Matchers.any(JobConfig.class)))).thenReturn(Long.valueOf(getRandomJobId()));
        return this.mTableMaster.transformTable("test_udb_name", str, str2);
    }

    private void expectException(Class<? extends Throwable> cls, String str) {
        this.mException.expect(cls);
        this.mException.expectMessage(str);
    }

    private void checkTransformJobInfo(TransformJobInfo transformJobInfo, String str, String str2, long j, Status status, @Nullable String str3) throws Exception {
        Assert.assertEquals("test_udb_name", transformJobInfo.getDb());
        Assert.assertEquals(str, transformJobInfo.getTable());
        Assert.assertEquals(str2, transformJobInfo.getDefinition());
        Assert.assertEquals(j, transformJobInfo.getJobId());
        Assert.assertEquals(status, transformJobInfo.getJobStatus());
        if (str3 != null) {
            Assert.assertEquals(str3, transformJobInfo.getJobErrorMessage());
        } else {
            Assert.assertEquals(EMPTY_DEFINITION, transformJobInfo.getJobErrorMessage());
        }
        if (status == Status.COMPLETED) {
            checkLayout(transformJobInfo, str);
        }
    }

    private void checkLayout(TransformJobInfo transformJobInfo, String str) throws IOException {
        for (Map.Entry entry : transformJobInfo.getTransformedLayouts().entrySet()) {
            String str2 = (String) entry.getKey();
            Layout layout = (Layout) entry.getValue();
            Partition partition = this.mTableMaster.getTable("test_udb_name", str).getPartition(str2);
            Assert.assertTrue(partition.isTransformed(transformJobInfo.getDefinition()));
            Assert.assertEquals(layout, partition.getLayout());
        }
    }

    private void mockJobStatus(long j, Status status, @Nullable String str) throws Exception {
        Mockito.when(this.mMockJobMasterClient.getJobStatus(j)).thenReturn(new PlanInfo(j, "test", status, 0L, str));
    }

    private void heartbeat() throws Exception {
        HeartbeatScheduler.schedule("Master Table Transformation Monitor");
        HeartbeatScheduler.await("Master Table Transformation Monitor");
    }
}
