package io.delta.storage;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
import com.amazonaws.services.dynamodbv2.model.Condition;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.QueryRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import io.delta.storage.utils.ReflectionUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/storage/S3DynamoDBLogStore.class */
/**
 * {@link BaseExternalLogStore} implementation that keeps its external commit entries in an
 * Amazon DynamoDB table.
 *
 * <p>Every entry is written as a single item with the string attributes {@code tablePath},
 * {@code fileName}, {@code tempPath} and {@code complete}, plus an optional numeric
 * {@code expireTime}. When this class has to create the table itself it uses {@code tablePath}
 * as the hash key and {@code fileName} as the range key.
 *
 * <p>All settings are looked up in the Hadoop {@link Configuration} under both
 * {@link #SPARK_CONF_PREFIX} and {@link #BASE_CONF_PREFIX}; see
 * {@link #getParam(Configuration, String, String)} for the resolution rules.
 */
public class S3DynamoDBLogStore extends BaseExternalLogStore {
    private static final Logger LOG = LoggerFactory.getLogger(S3DynamoDBLogStore.class);

    /** Configuration key prefix checked first by {@link #getParam}. */
    public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore";
    /** Alternative configuration key prefix checked by {@link #getParam}. */
    public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore";

    /** Key suffix: retry count handed to the iterator returned by {@link #read}. */
    public static final String READ_RETRIES = "read.retries";
    /** Key suffix: name of the DynamoDB table (default {@code delta_log}). */
    public static final String DDB_CLIENT_TABLE = "ddb.tableName";
    /** Key suffix: AWS region name of the DynamoDB table (default {@code us-east-1}). */
    public static final String DDB_CLIENT_REGION = "ddb.region";
    /** Key suffix: class name of the AWS credentials provider to instantiate. */
    public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
    /** Key suffix: read capacity units used if the table has to be created (default 5). */
    public static final String DDB_CREATE_TABLE_RCU = "provisionedThroughput.rcu";
    /** Key suffix: write capacity units used if the table has to be created (default 5). */
    public static final String DDB_CREATE_TABLE_WCU = "provisionedThroughput.wcu";
    /** Key suffix: expiration delay, in seconds, reported by {@link #getExpirationDelaySeconds}. */
    public static final String TTL_SECONDS = "ddb.ttl";

    private static final String ATTR_TABLE_PATH = "tablePath";
    private static final String ATTR_FILE_NAME = "fileName";
    private static final String ATTR_TEMP_PATH = "tempPath";
    private static final String ATTR_COMPLETE = "complete";
    private static final String ATTR_EXPIRE_TIME = "expireTime";

    /** Retry count used by {@link #read} when {@link #READ_RETRIES} is not configured. */
    private static final int DEFAULT_READ_RETRIES = 3;
    /** Maximum number of describe/create polling rounds in {@link #tryEnsureTableExists}. */
    private static final int TABLE_CREATION_MAX_ATTEMPTS = 20;
    /** Pause between two polling rounds in {@link #tryEnsureTableExists}. */
    private static final long TABLE_CREATION_POLL_INTERVAL_MILLIS = 1000L;

    private static final String TABLE_STATUS_ACTIVE = "ACTIVE";
    private static final String TABLE_STATUS_CREATING = "CREATING";

    private final AmazonDynamoDBClient client;
    private final String tableName;
    private final String credentialsProviderName;
    private final String regionName;
    private final long expirationDelaySeconds;

    /**
     * Reads the DynamoDB settings from {@code configuration}, builds the client and makes a
     * best-effort attempt to ensure the backing table exists.
     *
     * @param configuration Hadoop configuration holding the optional settings
     * @throws IOException if the credentials provider cannot be instantiated or the wait for
     *     table creation is interrupted
     * @throws IllegalArgumentException if the configured TTL is negative, or a setting is given
     *     under both prefixes with different values
     */
    public S3DynamoDBLogStore(Configuration configuration) throws IOException {
        super(configuration);
        this.tableName = getParam(configuration, DDB_CLIENT_TABLE, "delta_log");
        this.credentialsProviderName = getParam(configuration, DDB_CLIENT_CREDENTIALS_PROVIDER, "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        this.regionName = getParam(configuration, DDB_CLIENT_REGION, "us-east-1");

        final String ttlSetting = getParam(configuration, TTL_SECONDS, null);
        this.expirationDelaySeconds = ttlSetting == null ? BaseExternalLogStore.DEFAULT_EXTERNAL_ENTRY_EXPIRATION_DELAY_SECONDS : Long.parseLong(ttlSetting);
        if (this.expirationDelaySeconds < 0) {
            throw new IllegalArgumentException(String.format("Can't use negative `%s` value of %s", TTL_SECONDS, this.expirationDelaySeconds));
        }

        LOG.info("using tableName {}", this.tableName);
        LOG.info("using credentialsProviderName {}", this.credentialsProviderName);
        LOG.info("using regionName {}", this.regionName);
        LOG.info("using ttl (seconds) {}", this.expirationDelaySeconds);

        this.client = getClient();
        tryEnsureTableExists(configuration);
    }

    /**
     * Reads {@code path} through the superclass, wrapped in a {@code RetryableCloseableIterator}.
     *
     * @param path file to read
     * @param configuration Hadoop configuration; {@link #READ_RETRIES} is looked up in it
     * @return an iterator that obtains the underlying superclass iterator on demand
     * @throws IOException if the read fails
     */
    @Override
    public CloseableIterator<String> read(Path path, Configuration configuration) throws IOException {
        final int maxRetries = Integer.parseInt(getParam(configuration, READ_RETRIES, Integer.toString(DEFAULT_READ_RETRIES)));
        return new RetryableCloseableIterator(() -> super.read(path, configuration), maxRetries);
    }

    /** Returns the expiration delay, in seconds, resolved from {@link #TTL_SECONDS} at construction. */
    @Override
    protected long getExpirationDelaySeconds() {
        return this.expirationDelaySeconds;
    }

    /**
     * Writes {@code externalCommitEntry} to the DynamoDB table.
     *
     * @param externalCommitEntry entry to store
     * @param overwrite when {@code false}, the put is conditional on the item not yet having a
     *     {@code fileName} attribute
     * @throws FileAlreadyExistsException if DynamoDB rejects the conditional put
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void putExternalEntry(ExternalCommitEntry externalCommitEntry, boolean overwrite) throws IOException {
        try {
            LOG.debug("putItem {}, overwrite: {}", externalCommitEntry, overwrite);
            this.client.putItem(createPutItemRequest(externalCommitEntry, overwrite));
        } catch (ConditionalCheckFailedException e) {
            LOG.debug(e.toString());
            final FileAlreadyExistsException alreadyExists = new FileAlreadyExistsException(externalCommitEntry.absoluteFilePath().toString());
            // Keep the DynamoDB failure attached so it is not lost when debug logging is off.
            alreadyExists.initCause(e);
            throw alreadyExists;
        }
    }

    /**
     * Fetches the entry stored under the given key using a consistent read.
     *
     * @param tablePath value of the {@code tablePath} key attribute
     * @param fileName value of the {@code fileName} key attribute
     * @return the stored entry, or empty if DynamoDB returned no item
     */
    @Override
    protected Optional<ExternalCommitEntry> getExternalEntry(String tablePath, String fileName) {
        final Map<String, AttributeValue> key = new ConcurrentHashMap<>();
        key.put(ATTR_TABLE_PATH, new AttributeValue(tablePath));
        key.put(ATTR_FILE_NAME, new AttributeValue(fileName));

        final GetItemRequest request = new GetItemRequest(this.tableName, key).withConsistentRead(true);
        final Map<String, AttributeValue> item = this.client.getItem(request).getItem();
        return Optional.ofNullable(item).map(this::dbResultToCommitEntry);
    }

    /**
     * Queries for a single entry whose {@code tablePath} equals {@code path}, scanning the index
     * backwards with a limit of one (i.e. the last item in range-key order).
     *
     * @param path table path to look up
     * @return the first item of the query result, or empty if the query returned nothing
     */
    @Override
    protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path path) {
        final Map<String, Condition> keyConditions = new ConcurrentHashMap<>();
        keyConditions.put(ATTR_TABLE_PATH, new Condition().withComparisonOperator(ComparisonOperator.EQ).withAttributeValueList(new AttributeValue(path.toString())));

        final QueryRequest request = new QueryRequest(this.tableName)
                .withConsistentRead(true)
                .withScanIndexForward(false)
                .withLimit(1)
                .withKeyConditions(keyConditions);
        final List<Map<String, AttributeValue>> items = this.client.query(request).getItems();
        return items.isEmpty() ? Optional.empty() : Optional.of(dbResultToCommitEntry(items.get(0)));
    }

    /**
     * Converts a DynamoDB item into an {@link ExternalCommitEntry}.
     *
     * <p>{@code tablePath}, {@code fileName}, {@code tempPath} and {@code complete} must be
     * present; {@code expireTime} is optional and maps to {@code null} when absent.
     */
    private ExternalCommitEntry dbResultToCommitEntry(Map<String, AttributeValue> item) {
        final AttributeValue expireTimeAttr = item.get(ATTR_EXPIRE_TIME);
        final Long expireTime = expireTimeAttr != null ? Long.valueOf(Long.parseLong(expireTimeAttr.getN())) : null;
        return new ExternalCommitEntry(
                new Path(item.get(ATTR_TABLE_PATH).getS()),
                item.get(ATTR_FILE_NAME).getS(),
                item.get(ATTR_TEMP_PATH).getS(),
                "true".equals(item.get(ATTR_COMPLETE).getS()),
                expireTime);
    }

    /**
     * Builds the put request for {@code externalCommitEntry}.
     *
     * @param externalCommitEntry entry whose fields become the item attributes
     * @param overwrite when {@code false}, an expectation is added that the {@code fileName}
     *     attribute does not exist yet, turning the put into a conditional write
     */
    private PutItemRequest createPutItemRequest(ExternalCommitEntry externalCommitEntry, boolean overwrite) {
        final Map<String, AttributeValue> attributes = new ConcurrentHashMap<>();
        attributes.put(ATTR_TABLE_PATH, new AttributeValue(externalCommitEntry.tablePath.toString()));
        attributes.put(ATTR_FILE_NAME, new AttributeValue(externalCommitEntry.fileName));
        attributes.put(ATTR_TEMP_PATH, new AttributeValue(externalCommitEntry.tempPath));
        attributes.put(ATTR_COMPLETE, new AttributeValue().withS(Boolean.toString(externalCommitEntry.complete)));
        if (externalCommitEntry.expireTime != null) {
            attributes.put(ATTR_EXPIRE_TIME, new AttributeValue().withN(externalCommitEntry.expireTime.toString()));
        }

        final PutItemRequest request = new PutItemRequest(this.tableName, attributes);
        if (!overwrite) {
            final Map<String, ExpectedAttributeValue> expected = new ConcurrentHashMap<>();
            expected.put(ATTR_FILE_NAME, new ExpectedAttributeValue(false));
            request.withExpected(expected);
        }
        return request;
    }

    /**
     * Best-effort check that the DynamoDB table exists, creating it if it is missing.
     *
     * <p>Polls the table status up to {@link #TABLE_CREATION_MAX_ATTEMPTS} times, sleeping
     * {@link #TABLE_CREATION_POLL_INTERVAL_MILLIS} between rounds. The method returns normally
     * (after logging) both when the table is in an unexpected state and when the attempts are
     * exhausted; it never fails construction for those reasons.
     *
     * @param configuration source of the provisioned throughput settings used on creation
     * @throws IOException as {@link InterruptedIOException} if the wait is interrupted
     */
    private void tryEnsureTableExists(Configuration configuration) throws IOException {
        boolean created = false;
        for (int attempt = 0; attempt < TABLE_CREATION_MAX_ATTEMPTS; attempt++) {
            // Stays CREATING when describeTable fails, so a create attempt is followed by polling.
            String status = TABLE_STATUS_CREATING;
            try {
                status = this.client.describeTable(this.tableName).getTable().getTableStatus();
            } catch (ResourceNotFoundException notFound) {
                final long rcu = Long.parseLong(getParam(configuration, DDB_CREATE_TABLE_RCU, "5"));
                final long wcu = Long.parseLong(getParam(configuration, DDB_CREATE_TABLE_WCU, "5"));
                LOG.info("DynamoDB table `{}` in region `{}` does not exist. Creating it now with provisioned throughput of {} RCUs and {} WCUs.", this.tableName, this.regionName, rcu, wcu);
                try {
                    this.client.createTable(
                            Arrays.asList(new AttributeDefinition(ATTR_TABLE_PATH, ScalarAttributeType.S), new AttributeDefinition(ATTR_FILE_NAME, ScalarAttributeType.S)),
                            this.tableName,
                            Arrays.asList(new KeySchemaElement(ATTR_TABLE_PATH, KeyType.HASH), new KeySchemaElement(ATTR_FILE_NAME, KeyType.RANGE)),
                            new ProvisionedThroughput(rcu, wcu));
                    created = true;
                } catch (ResourceInUseException ignored) {
                    // Deliberately ignored: presumably the table was created concurrently between
                    // our describe and create calls. Keep polling until it becomes ACTIVE.
                }
            }

            if (TABLE_STATUS_ACTIVE.equals(status)) {
                if (created) {
                    LOG.info("Successfully created DynamoDB table `{}`", this.tableName);
                } else {
                    LOG.info("Table `{}` already exists", this.tableName);
                }
                return;
            }
            if (!TABLE_STATUS_CREATING.equals(status)) {
                LOG.error("table `{}` status: {}", this.tableName, status);
                return;
            }

            LOG.info("Waiting for `{}` table creation", this.tableName);
            try {
                Thread.sleep(TABLE_CREATION_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                final InterruptedIOException interrupted = new InterruptedIOException(e.getMessage());
                interrupted.initCause(e);
                throw interrupted;
            }
        }
        LOG.warn("DynamoDB table `{}` did not become ACTIVE after {} attempts; continuing without confirmation", this.tableName, TABLE_CREATION_MAX_ATTEMPTS);
    }

    /**
     * Creates the DynamoDB client from the configured credentials provider and region.
     *
     * @throws IOException if the credentials provider cannot be created reflectively
     */
    private AmazonDynamoDBClient getClient() throws IOException {
        try {
            final AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(ReflectionUtils.createAwsCredentialsProvider(this.credentialsProviderName, initHadoopConf()));
            ddbClient.setRegion(Region.getRegion(Regions.fromName(this.regionName)));
            return ddbClient;
        } catch (ReflectiveOperationException e) {
            throw new IOException(e);
        }
    }

    /**
     * Resolves a setting that may be given under either configuration prefix.
     *
     * @param configuration configuration to read from
     * @param name key suffix, appended to each prefix with a {@code .} separator
     * @param defaultValue value returned when neither key is set; may be {@code null}
     * @return the {@link #SPARK_CONF_PREFIX} value if set, otherwise the
     *     {@link #BASE_CONF_PREFIX} value if set, otherwise {@code defaultValue}
     * @throws IllegalArgumentException if both keys are set to different values
     */
    protected static String getParam(Configuration configuration, String name, String defaultValue) {
        final String sparkKey = String.format("%s.%s", SPARK_CONF_PREFIX, name);
        final String baseKey = String.format("%s.%s", BASE_CONF_PREFIX, name);
        final String sparkValue = configuration.get(sparkKey);
        final String baseValue = configuration.get(baseKey);

        if (sparkValue != null && baseValue != null && !sparkValue.equals(baseValue)) {
            throw new IllegalArgumentException(String.format("Configuration properties `%s=%s` and `%s=%s` have different values. Please set only one.", sparkKey, sparkValue, baseKey, baseValue));
        }
        if (sparkValue != null) {
            return sparkValue;
        }
        if (baseValue != null) {
            return baseValue;
        }
        return defaultValue;
    }
}
