package io.delta.dynamodbcommitcoordinator;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import io.delta.storage.CloseableIterator;
import io.delta.storage.LogStore;
import io.delta.storage.commit.Commit;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import io.delta.storage.commit.CommitResponse;
import io.delta.storage.commit.CoordinatedCommitsUtils;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.TableDescriptor;
import io.delta.storage.commit.TableIdentifier;
import io.delta.storage.commit.UpdatedActions;
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/dynamodbcommitcoordinator/DynamoDBCommitCoordinatorClient.class */
public class DynamoDBCommitCoordinatorClient implements CommitCoordinatorClient {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBCommitCoordinatorClient.class);
    final String coordinatedCommitsTableName;
    final AmazonDynamoDB client;
    final String endpoint;
    final long writeCapacityUnits;
    final long readCapacityUnits;
    public final long backfillBatchSize;
    final boolean skipPathCheck;
    static final String TABLE_CONF_TABLE_ID_KEY = "tableId";
    final int CLIENT_VERSION = 1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/delta/dynamodbcommitcoordinator/DynamoDBCommitCoordinatorClient$GetCommitsResultInternal.class */
    public static class GetCommitsResultInternal {
        final GetCommitsResponse response;
        final boolean hasAcceptedCommits;

        GetCommitsResultInternal(GetCommitsResponse getCommitsResponse, boolean z) {
            this.response = getCommitsResponse;
            this.hasAcceptedCommits = z;
        }
    }

    public DynamoDBCommitCoordinatorClient(String str, String str2, AmazonDynamoDB amazonDynamoDB, long j) throws IOException {
        this(str, str2, amazonDynamoDB, j, 5L, 5L, false);
    }

    public DynamoDBCommitCoordinatorClient(String str, String str2, AmazonDynamoDB amazonDynamoDB, long j, long j2, long j3, boolean z) throws IOException {
        this.CLIENT_VERSION = 1;
        this.coordinatedCommitsTableName = str;
        this.endpoint = str2;
        this.client = amazonDynamoDB;
        this.backfillBatchSize = j;
        this.readCapacityUnits = j2;
        this.writeCapacityUnits = j3;
        this.skipPathCheck = z;
        tryEnsureTableExists();
    }

    private String getTableId(Map<String, String> map) {
        if (map.containsKey("tableId")) {
            return map.get("tableId");
        }
        throw new RuntimeException("tableId not found");
    }

    private GetItemResult getEntryFromCommitCoordinator(Map<String, String> map, String... strArr) {
        return this.client.getItem(new GetItemRequest().withTableName(this.coordinatedCommitsTableName).addKeyEntry("tableId", new AttributeValue().withS(getTableId(map))).withAttributesToGet(strArr));
    }

    protected CommitResponse commitToCoordinator(Path path, Map<String, String> map, long j, FileStatus fileStatus, long j2, boolean z) throws CommitFailedException {
        HashMap hashMap = new HashMap();
        hashMap.put(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, new ExpectedAttributeValue().withValue(new AttributeValue().withN(Long.toString(j - 1))));
        hashMap.put(DynamoDBTableEntryConstants.ACCEPTING_COMMITS, new ExpectedAttributeValue().withValue(new AttributeValue().withBOOL(true)));
        if (!this.skipPathCheck) {
            hashMap.put(DynamoDBTableEntryConstants.TABLE_PATH, new ExpectedAttributeValue().withValue(new AttributeValue().withS(path.getParent().toString())));
        }
        hashMap.put(DynamoDBTableEntryConstants.SCHEMA_VERSION, new ExpectedAttributeValue().withValue(new AttributeValue().withN(Integer.toString(1))));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(DynamoDBTableEntryConstants.COMMIT_VERSION, new AttributeValue().withN(Long.toString(j)));
        hashMap2.put(DynamoDBTableEntryConstants.COMMIT_TIMESTAMP, new AttributeValue().withN(Long.toString(j2)));
        hashMap2.put(DynamoDBTableEntryConstants.COMMIT_FILE_NAME, new AttributeValue().withS(fileStatus.getPath().getName()));
        hashMap2.put(DynamoDBTableEntryConstants.COMMIT_FILE_LENGTH, new AttributeValue().withN(Long.toString(fileStatus.getLen())));
        hashMap2.put(DynamoDBTableEntryConstants.COMMIT_FILE_MODIFICATION_TIMESTAMP, new AttributeValue().withN(Long.toString(fileStatus.getModificationTime())));
        UpdateItemRequest withExpected = new UpdateItemRequest().withTableName(this.coordinatedCommitsTableName).addKeyEntry("tableId", new AttributeValue().withS(getTableId(map))).addAttributeUpdatesEntry(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, new AttributeValueUpdate().withValue(new AttributeValue().withN(Long.toString(j))).withAction(AttributeAction.PUT)).addAttributeUpdatesEntry(DynamoDBTableEntryConstants.HAS_ACCEPTED_COMMITS, new AttributeValueUpdate().withValue(new AttributeValue().withBOOL(true)).withAction(AttributeAction.PUT)).addAttributeUpdatesEntry(DynamoDBTableEntryConstants.TABLE_LATEST_TIMESTAMP, new AttributeValueUpdate().withValue(new AttributeValue().withN(Long.toString(j2))).withAction(AttributeAction.PUT)).addAttributeUpdatesEntry(DynamoDBTableEntryConstants.COMMITS, new AttributeValueUpdate().withAction(AttributeAction.ADD).withValue(new AttributeValue().withL(new AttributeValue[]{new AttributeValue().withM(hashMap2)}))).withExpected(hashMap);
        if (z) {
            withExpected = withExpected.addAttributeUpdatesEntry(DynamoDBTableEntryConstants.ACCEPTING_COMMITS, new AttributeValueUpdate().withValue(new AttributeValue().withBOOL(false)).withAction(AttributeAction.PUT));
        }
        try {
            this.client.updateItem(withExpected);
        } catch (ConditionalCheckFailedException e) {
            GetItemResult entryFromCommitCoordinator = getEntryFromCommitCoordinator(map, DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, DynamoDBTableEntryConstants.ACCEPTING_COMMITS, DynamoDBTableEntryConstants.TABLE_PATH, DynamoDBTableEntryConstants.SCHEMA_VERSION);
            int parseInt = Integer.parseInt(((AttributeValue) entryFromCommitCoordinator.getItem().get(DynamoDBTableEntryConstants.SCHEMA_VERSION)).getN());
            if (parseInt != 1) {
                throw new CommitFailedException(false, false, "The schema version of the commit coordinator does not match the currentDynamoDBCommitCoordinatorClient version. The data schema version is  " + parseInt + " while the client version is 1. Make sure that the correct client is being used to access this table.");
            }
            long parseLong = Long.parseLong(((AttributeValue) entryFromCommitCoordinator.getItem().get(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION)).getN());
            if (!this.skipPathCheck && !((AttributeValue) entryFromCommitCoordinator.getItem().get(DynamoDBTableEntryConstants.TABLE_PATH)).getS().equals(path.getParent().toString())) {
                throw new CommitFailedException(false, false, "This commit was attempted from path " + path.getParent() + " while the table is registered at " + ((AttributeValue) entryFromCommitCoordinator.getItem().get(DynamoDBTableEntryConstants.TABLE_PATH)).getS() + ".");
            }
            if (!((AttributeValue) entryFromCommitCoordinator.getItem().get(DynamoDBTableEntryConstants.ACCEPTING_COMMITS)).getBOOL().booleanValue()) {
                throw new CommitFailedException(false, false, "The commit coordinator is not accepting any new commits for this table.");
            }
            if (parseLong != j - 1) {
                boolean z2 = parseLong > j - 1;
                throw new CommitFailedException(z2, z2, "Commit version " + j + " is not valid. Expected version: " + (parseLong + 1) + ".");
            }
        }
        return new CommitResponse(new Commit(j, fileStatus, j2));
    }

    public CommitResponse commit(LogStore logStore, Configuration configuration, TableDescriptor tableDescriptor, long j, Iterator<String> it, UpdatedActions updatedActions) throws CommitFailedException {
        Path logPath = tableDescriptor.getLogPath();
        if (j == 0) {
            throw new CommitFailedException(false, false, "Commit version 0 must go via filesystem.");
        }
        try {
            FileStatus writeUnbackfilledCommitFile = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(logStore, configuration, logPath.toString(), j, it, UUID.randomUUID().toString());
            long commitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp();
            boolean isCoordinatedCommitsToFSConversion = CoordinatedCommitsUtils.isCoordinatedCommitsToFSConversion(Long.valueOf(j), updatedActions);
            LOG.info("Committing version {} with UUID delta file {} to DynamoDB.", Long.valueOf(j), writeUnbackfilledCommitFile.getPath());
            CommitResponse commitToCoordinator = commitToCoordinator(logPath, tableDescriptor.getTableConf(), j, writeUnbackfilledCommitFile, commitTimestamp, isCoordinatedCommitsToFSConversion);
            LOG.info("Commit {} was successful.", Long.valueOf(j));
            if (((this.backfillBatchSize > 1L ? 1 : (this.backfillBatchSize == 1L ? 0 : -1)) <= 0) || (((j % this.backfillBatchSize) > 0L ? 1 : ((j % this.backfillBatchSize) == 0L ? 0 : -1)) == 0) || isCoordinatedCommitsToFSConversion) {
                backfillToVersion(logStore, configuration, tableDescriptor, j, null);
            }
            return commitToCoordinator;
        } catch (IOException e) {
            throw new CommitFailedException(false, false, e.getMessage(), e);
        }
    }

    private GetCommitsResultInternal getCommitsImpl(Path path, Map<String, String> map, Long l, Long l2) throws IOException {
        Map item = getEntryFromCommitCoordinator(map, DynamoDBTableEntryConstants.COMMITS, DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, DynamoDBTableEntryConstants.HAS_ACCEPTED_COMMITS).getItem();
        long parseLong = Long.parseLong(((AttributeValue) item.get(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION)).getN());
        AttributeValue attributeValue = (AttributeValue) item.get(DynamoDBTableEntryConstants.COMMITS);
        ArrayList arrayList = new ArrayList();
        Path commitDirPath = CoordinatedCommitsUtils.commitDirPath(path);
        Iterator it = attributeValue.getL().iterator();
        while (it.hasNext()) {
            Map m = ((AttributeValue) it.next()).getM();
            long parseLong2 = Long.parseLong(((AttributeValue) m.get(DynamoDBTableEntryConstants.COMMIT_VERSION)).getN());
            if ((l == null || parseLong2 >= l.longValue()) && (l2 == null || l2.longValue() >= parseLong2)) {
                arrayList.add(new Commit(parseLong2, new FileStatus(Long.parseLong(((AttributeValue) m.get(DynamoDBTableEntryConstants.COMMIT_FILE_LENGTH)).getN()), false, 0, 0L, Long.parseLong(((AttributeValue) m.get(DynamoDBTableEntryConstants.COMMIT_FILE_MODIFICATION_TIMESTAMP)).getN()), new Path(commitDirPath, ((AttributeValue) m.get(DynamoDBTableEntryConstants.COMMIT_FILE_NAME)).getS())), Long.parseLong(((AttributeValue) m.get(DynamoDBTableEntryConstants.COMMIT_TIMESTAMP)).getN())));
            }
        }
        return new GetCommitsResultInternal(new GetCommitsResponse(new ArrayList(arrayList), parseLong), ((AttributeValue) item.get(DynamoDBTableEntryConstants.HAS_ACCEPTED_COMMITS)).getBOOL().booleanValue());
    }

    public GetCommitsResponse getCommits(TableDescriptor tableDescriptor, Long l, Long l2) {
        try {
            GetCommitsResultInternal commitsImpl = getCommitsImpl(tableDescriptor.getLogPath(), tableDescriptor.getTableConf(), l, l2);
            long latestTableVersion = commitsImpl.response.getLatestTableVersion();
            if (!commitsImpl.hasAcceptedCommits) {
                latestTableVersion = -1;
            }
            return new GetCommitsResponse(commitsImpl.response.getCommits(), latestTableVersion);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void writeActionsToBackfilledFile(LogStore logStore, Path path, long j, Iterator<String> it, Configuration configuration, boolean z) throws IOException {
        logStore.write(CoordinatedCommitsUtils.getBackfilledDeltaFilePath(path, Long.valueOf(j)), it, Boolean.valueOf(z), configuration);
    }

    private void validateBackfilledFileExists(Path path, Configuration configuration, Long l) {
        if (l == null) {
            return;
        }
        try {
            Path backfilledDeltaFilePath = CoordinatedCommitsUtils.getBackfilledDeltaFilePath(path, l);
            if (path.getFileSystem(configuration).exists(backfilledDeltaFilePath)) {
            } else {
                throw new IllegalArgumentException("Expected backfilled file at " + backfilledDeltaFilePath + " does not exist.");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void backfillToVersion(LogStore logStore, Configuration configuration, TableDescriptor tableDescriptor, long j, Long l) throws IOException {
        LOG.info("Backfilling all unbackfilled commits.");
        final Path logPath = tableDescriptor.getLogPath();
        try {
            final GetCommitsResponse getCommitsResponse = getCommitsImpl(logPath, tableDescriptor.getTableConf(), l, null).response;
            validateBackfilledFileExists(logPath, configuration, l);
            if (j > getCommitsResponse.getLatestTableVersion()) {
                throw new IllegalArgumentException("The requested backfill version " + j + " is greater than the latest version " + getCommitsResponse.getLatestTableVersion() + " for the table.");
            }
            boolean z = !logStore.isPartialWriteVisible(logPath, configuration).booleanValue();
            for (Commit commit : getCommitsResponse.getCommits()) {
                CloseableIterator read = logStore.read(commit.getFileStatus().getPath(), configuration);
                try {
                    try {
                        writeActionsToBackfilledFile(logStore, logPath, commit.getVersion(), read, configuration, z);
                        read.close();
                    } catch (Throwable th) {
                        read.close();
                        throw th;
                    }
                } catch (FileAlreadyExistsException e) {
                    LOG.info("File {} already exists. Skipping backfill for this file.", commit.getFileStatus().getPath());
                    read.close();
                }
            }
            try {
                this.client.updateItem(new UpdateItemRequest().withTableName(this.coordinatedCommitsTableName).addKeyEntry("tableId", new AttributeValue().withS(getTableId(tableDescriptor.getTableConf()))).addAttributeUpdatesEntry(DynamoDBTableEntryConstants.COMMITS, new AttributeValueUpdate().withAction(AttributeAction.PUT).withValue(new AttributeValue().withL(new AttributeValue[0]))).withExpected(new HashMap<String, ExpectedAttributeValue>() { // from class: io.delta.dynamodbcommitcoordinator.DynamoDBCommitCoordinatorClient.1
                    {
                        put(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, new ExpectedAttributeValue().withValue(new AttributeValue().withN(Long.toString(getCommitsResponse.getLatestTableVersion()))));
                        put(DynamoDBTableEntryConstants.TABLE_PATH, new ExpectedAttributeValue().withValue(new AttributeValue().withS(logPath.getParent().toString())));
                        put(DynamoDBTableEntryConstants.SCHEMA_VERSION, new ExpectedAttributeValue().withValue(new AttributeValue().withN(Integer.toString(1))));
                    }
                }));
            } catch (ConditionalCheckFailedException e2) {
                LOG.warn("Backfill succeeded but the update to the commit coordinator failed. This is probably due to a concurrent update to the commit coordinator. This is not a critical error and  should rectify itself.");
            }
        } catch (IOException e3) {
            throw new UncheckedIOException(e3);
        }
    }

    public Map<String, String> registerTable(Path path, Optional<TableIdentifier> optional, long j, AbstractMetadata abstractMetadata, AbstractProtocol abstractProtocol) {
        HashMap hashMap = new HashMap();
        String uuid = UUID.randomUUID().toString();
        hashMap.put("tableId", new AttributeValue().withS(uuid));
        hashMap.put(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION, new AttributeValue().withN(Long.toString(j + 1)));
        hashMap.put(DynamoDBTableEntryConstants.HAS_ACCEPTED_COMMITS, new AttributeValue().withBOOL(false));
        hashMap.put(DynamoDBTableEntryConstants.TABLE_PATH, new AttributeValue().withS(path.getParent().toString()));
        hashMap.put(DynamoDBTableEntryConstants.COMMITS, new AttributeValue().withL(new AttributeValue[0]));
        hashMap.put(DynamoDBTableEntryConstants.ACCEPTING_COMMITS, new AttributeValue().withBOOL(true));
        hashMap.put(DynamoDBTableEntryConstants.SCHEMA_VERSION, new AttributeValue().withN(Integer.toString(1)));
        this.client.putItem(new PutItemRequest().withTableName(this.coordinatedCommitsTableName).withItem(hashMap).withConditionExpression(String.format("attribute_not_exists(%s)", "tableId")));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("tableId", uuid);
        return hashMap2;
    }

    private void tryEnsureTableExists() throws IOException {
        int i = 0;
        boolean z = false;
        while (i < 20) {
            String str = "CREATING";
            try {
                str = this.client.describeTable(this.coordinatedCommitsTableName).getTable().getTableStatus();
            } catch (ResourceNotFoundException e) {
                LOG.info("DynamoDB table `{}` for endpoint `{}` does not exist. Creating it now with provisioned throughput of {} RCUs and {} WCUs.", new Object[]{this.coordinatedCommitsTableName, this.endpoint, Long.valueOf(this.readCapacityUnits), Long.valueOf(this.writeCapacityUnits)});
                try {
                    this.client.createTable(Collections.singletonList(new AttributeDefinition("tableId", ScalarAttributeType.S)), this.coordinatedCommitsTableName, Collections.singletonList(new KeySchemaElement("tableId", KeyType.HASH)), new ProvisionedThroughput(Long.valueOf(this.readCapacityUnits), Long.valueOf(this.writeCapacityUnits)));
                    z = true;
                } catch (ResourceInUseException e2) {
                }
            }
            if (str.equals("ACTIVE")) {
                if (z) {
                    LOG.info("Successfully created DynamoDB table `{}`", this.coordinatedCommitsTableName);
                    return;
                } else {
                    LOG.info("Table `{}` already exists", this.coordinatedCommitsTableName);
                    return;
                }
            }
            if (!str.equals("CREATING")) {
                LOG.error("table `{}` status: {}", this.coordinatedCommitsTableName, str);
                throw new RuntimeException("DynamoDBCommitCoordinatorCliet: Unable to create table with name " + this.coordinatedCommitsTableName + " for endpoint " + this.endpoint + ". Ensure that the credentials provided have the necessary permissions to create tables in DynamoDB. If the table already exists, ensure that the table is in the ACTIVE state.");
            }
            i++;
            LOG.info("Waiting for `{}` table creation", this.coordinatedCommitsTableName);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e3) {
                throw new InterruptedIOException(e3.getMessage());
            }
        }
    }

    public boolean semanticEquals(CommitCoordinatorClient commitCoordinatorClient) {
        if (!(commitCoordinatorClient instanceof DynamoDBCommitCoordinatorClient)) {
            return false;
        }
        DynamoDBCommitCoordinatorClient dynamoDBCommitCoordinatorClient = (DynamoDBCommitCoordinatorClient) commitCoordinatorClient;
        return this.coordinatedCommitsTableName.equals(dynamoDBCommitCoordinatorClient.coordinatedCommitsTableName) && this.endpoint.equals(dynamoDBCommitCoordinatorClient.endpoint);
    }
}
