package io.datarouter.storage.config.schema;

import io.datarouter.instrumentation.changelog.ChangelogRecorder;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.config.executor.DatarouterStorageExecutors;
import io.datarouter.storage.config.properties.AdminEmail;
import io.datarouter.storage.config.properties.EnvironmentName;
import io.datarouter.storage.config.properties.ServerName;
import io.datarouter.storage.config.storage.clusterschemaupdatelock.ClusterSchemaUpdateLock;
import io.datarouter.storage.config.storage.clusterschemaupdatelock.DatarouterClusterSchemaUpdateLockDao;
import io.datarouter.storage.node.type.physical.PhysicalNode;
import io.datarouter.util.mutable.MutableString;
import io.datarouter.util.singletonsupplier.SingletonSupplier;
import jakarta.inject.Provider;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/storage/config/schema/BaseSchemaUpdateService.class */
/**
 * Base class that runs schema-update callables for physical nodes on a scheduler, periodically gathers
 * their results, and reports the generated DDL by email and through the changelog.
 *
 * <p>Subclasses provide the callable that produces a {@link SchemaUpdateResult}, the lookup of existing
 * table names for a client, and the email transport.
 *
 * <p>NOTE(review): the constructor schedules {@code gatherSchemaUpdates} with an initial delay of zero,
 * so the first run can start before a subclass constructor has finished. Subclass implementations of
 * {@link #sendEmail(String, String)} should not rely on subclass fields being initialized at that point.
 */
public abstract class BaseSchemaUpdateService {
    private static final Logger logger = LoggerFactory.getLogger(BaseSchemaUpdateService.class);
    private static final long THROTTLING_DELAY_SECONDS = 10;
    private static final String BUILD_NUMBER = System.getenv("BUILD_NUMBER");

    private final ServerName serverName;
    private final EnvironmentName environmentName;
    protected final AdminEmail adminEmail;
    private final DatarouterStorageExecutors.DatarouterSchemaUpdateScheduler executor;
    private final Provider<DatarouterClusterSchemaUpdateLockDao> schemaUpdateLockDao;
    private final Provider<ChangelogRecorder> changelogRecorder;
    private final String buildId;

    /** Submitted schema-update tasks whose results have not been consumed yet. */
    private final List<Future<Optional<SchemaUpdateResult>>> futures = Collections.synchronizedList(
            new ArrayList<>());
    /** One lazily evaluated table-name lookup per client, shared by all nodes of that client. */
    private final Map<ClientId, Supplier<List<String>>> existingTableNamesByClient = new ConcurrentHashMap<>();

    /**
     * Stores the collaborators and schedules the periodic gathering of schema-update results
     * (fixed delay of {@value #THROTTLING_DELAY_SECONDS} seconds, starting immediately).
     *
     * @param serverName name of this server, recorded on the cluster schema update lock
     * @param environmentName environment name, used in the email subject
     * @param adminEmail address recorded as the author of changelog entries
     * @param datarouterSchemaUpdateScheduler scheduler that runs both the schema-update callables and the
     *        periodic gathering
     * @param provider provider of the DAO used to acquire the cluster schema update lock
     * @param provider2 provider of the changelog recorder
     * @param str build id; may be {@code null} or the unresolved placeholder {@code ${env.BUILD_NUMBER}},
     *        in which case the {@code BUILD_NUMBER} environment variable or the current epoch second is used
     */
    public BaseSchemaUpdateService(ServerName serverName, EnvironmentName environmentName, AdminEmail adminEmail, DatarouterStorageExecutors.DatarouterSchemaUpdateScheduler datarouterSchemaUpdateScheduler, Provider<DatarouterClusterSchemaUpdateLockDao> provider, Provider<ChangelogRecorder> provider2, String str) {
        this.serverName = serverName;
        this.environmentName = environmentName;
        this.adminEmail = adminEmail;
        this.executor = datarouterSchemaUpdateScheduler;
        this.schemaUpdateLockDao = provider;
        this.changelogRecorder = provider2;
        this.buildId = str;
        datarouterSchemaUpdateScheduler.scheduleWithFixedDelay(this::gatherSchemaUpdates, 0L, THROTTLING_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Submits a schema-update task for the given node and remembers its future so that a later
     * {@link #gatherSchemaUpdates(boolean)} call can collect the result.
     *
     * @param clientId client the node belongs to
     * @param physicalNode node whose schema should be checked
     * @return the future of the submitted task
     */
    public Future<Optional<SchemaUpdateResult>> queueNodeForSchemaUpdate(ClientId clientId, PhysicalNode<?, ?, ?> physicalNode) {
        Supplier<List<String>> existingTableNames = this.existingTableNamesByClient.computeIfAbsent(
                clientId, this::lazyFetchExistingTables);
        Future<Optional<SchemaUpdateResult>> future = this.executor.submit(
                makeSchemaUpdateCallable(clientId, existingTableNames, physicalNode));
        this.futures.add(future);
        return future;
    }

    /**
     * Builds the task that computes the schema update for one node.
     *
     * @param clientId client the node belongs to
     * @param supplier lazy supplier of the table names that already exist for the client
     * @param physicalNode node whose schema should be checked
     * @return callable producing the update result, or an empty optional when there is nothing to report
     */
    protected abstract Callable<Optional<SchemaUpdateResult>> makeSchemaUpdateCallable(ClientId clientId, Supplier<List<String>> supplier, PhysicalNode<?, ?, ?> physicalNode);

    /** Scheduled entry point: gathers only the results that are already available. */
    private void gatherSchemaUpdates() {
        gatherSchemaUpdates(false);
    }

    /**
     * Collects the results of queued schema-update tasks and, once no queued task is still pending and the
     * cluster lock was acquired, sends the DDL by email and records it in the changelog.
     *
     * @param z when {@code true}, blocks on every queued task; when {@code false}, only consumes tasks that
     *        are already done
     * @throws SchemaUpdateBlockException if any collected result carries a startup block reason
     * @throws RuntimeException if a task failed or the calling thread was interrupted while waiting
     */
    public synchronized void gatherSchemaUpdates(boolean z) {
        final boolean waitForAll = z;
        // Iterate over a snapshot: queueNodeForSchemaUpdate adds to the live list from other threads and
        // only holds the list's own lock, not this object's monitor.
        List<Future<Optional<SchemaUpdateResult>>> snapshot;
        synchronized (this.futures) {
            snapshot = new ArrayList<>(this.futures);
        }

        boolean nothingPending = true;
        String startupBlockReason = "";
        Map<ClientId, List<String>> ddlByClient = new HashMap<>();
        List<Future<Optional<SchemaUpdateResult>>> consumed = new ArrayList<>();
        try {
            for (Future<Optional<SchemaUpdateResult>> future : snapshot) {
                if (!waitForAll && !future.isDone()) {
                    nothingPending = false;
                    continue;
                }
                Optional<SchemaUpdateResult> optionalResult;
                try {
                    optionalResult = future.get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    logger.error("", e);
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    logger.error("", e);
                    throw new RuntimeException(e);
                }
                // The result has been read, so the future can be dropped even when it is empty.
                consumed.add(future);
                if (optionalResult.isEmpty()) {
                    continue;
                }
                SchemaUpdateResult result = optionalResult.get();
                ddlByClient.computeIfAbsent(result.clientId, key -> new ArrayList<>()).add(result.ddl);
                // The last block reason encountered wins.
                startupBlockReason = result.startupBlockReason.orElse(startupBlockReason);
            }
        } finally {
            this.futures.removeAll(consumed);
        }

        boolean blocking = !startupBlockReason.isEmpty();
        if (nothingPending && acquireSchemaUpdateLock(ddlByClient)) {
            sendEmail(ddlByClient, blocking);
            recordChangelog(ddlByClient);
        }
        if (!blocking) {
            return;
        }
        logger.error(startupBlockReason);
        throw new SchemaUpdateBlockException(startupBlockReason);
    }

    /**
     * Sends one email per client containing all DDL statements collected for it.
     *
     * @param ddlByClient DDL statements grouped by client; nothing is sent when empty
     * @param blocking whether a startup block reason was reported; adds " - blocking" to the subject
     */
    private void sendEmail(Map<ClientId, List<String>> ddlByClient, boolean blocking) {
        if (ddlByClient.isEmpty()) {
            return;
        }
        ddlByClient.forEach((clientId, ddls) -> {
            String subject = clientId.getName() + " - " + this.environmentName.get() + (blocking ? " - blocking" : "") + " - schema update";
            String body = joinStatements(ddls);
            logger.warn("Sending schema update email for client={}, schemaupdate={}", clientId.getName(), body);
            sendEmail(subject, body);
        });
    }

    /**
     * Delivers a schema-update email.
     *
     * @param str subject of the email
     * @param str2 body of the email (the DDL statements)
     */
    protected abstract void sendEmail(String str, String str2);

    /** Wraps {@link #fetchExistingTables(ClientId)} in a {@code SingletonSupplier} so it is only invoked on demand. */
    private Supplier<List<String>> lazyFetchExistingTables(ClientId clientId) {
        return SingletonSupplier.of(() -> fetchExistingTables(clientId));
    }

    /**
     * Looks up the names of the tables that currently exist for the given client.
     *
     * @param clientId client to inspect
     * @return names of the existing tables
     */
    protected abstract List<String> fetchExistingTables(ClientId clientId);

    /**
     * Tries to acquire the cluster schema update lock for the collected statements.
     *
     * <p>Only the statements of the first map entry are used as the lock statement. The build id comes from
     * the constructor argument, then the {@code BUILD_NUMBER} environment variable, then the current epoch
     * second. NOTE(review): a non-numeric build id makes {@code Integer.valueOf} throw
     * {@link NumberFormatException} here - confirm that build ids are always numeric.
     *
     * @param ddlByClient DDL statements grouped by client
     * @return {@code true} if the lock was acquired; {@code false} if the map is empty or
     *         {@code putAndAcquire} threw
     */
    private boolean acquireSchemaUpdateLock(Map<ClientId, List<String>> ddlByClient) {
        if (ddlByClient.isEmpty()) {
            return false;
        }
        String statement = ddlByClient.entrySet().stream()
                .findFirst()
                .map(entry -> String.join("\n\n", entry.getValue()))
                .orElseThrow();
        Instant now = Instant.now();
        Integer build = Optional.ofNullable(this.buildId)
                .filter(id -> !"${env.BUILD_NUMBER}".equals(id))
                .or(() -> Optional.ofNullable(BUILD_NUMBER))
                .map(Integer::valueOf)
                .orElseGet(() -> (int) now.getEpochSecond());
        ClusterSchemaUpdateLock lock = new ClusterSchemaUpdateLock(build, statement, this.serverName.get(), now);
        try {
            this.schemaUpdateLockDao.get().putAndAcquire(lock);
            logger.warn("Acquired schema update lock for hash={}, build={}, statement={}",
                    lock.getKey().getStatementHash(),
                    lock.getKey().getBuildId(),
                    lock.getStatement());
            return true;
        } catch (Exception e) {
            // Any failure of putAndAcquire is treated as "lock not acquired"; the exception itself is not logged.
            logger.warn("Didn't acquire schema update lock for hash={}", lock.getKey().getStatementHash());
            return false;
        }
    }

    /**
     * Records one changelog entry per client with the DDL statements collected for it.
     *
     * @param ddlByClient DDL statements grouped by client
     */
    private void recordChangelog(Map<ClientId, List<String>> ddlByClient) {
        ddlByClient.forEach((clientId, ddls) -> this.changelogRecorder.get().record(
                new ChangelogRecorder.DatarouterChangelogDtoBuilder(
                        "SchemaUpdate",
                        "clientId: " + clientId.getName(),
                        "SchemaUpdate request",
                        this.adminEmail.get())
                        .withComment(joinStatements(ddls))
                        .build()));
    }

    /** Concatenates the statements, appending a blank line after each one (including the last). */
    private static String joinStatements(List<String> statements) {
        StringBuilder joined = new StringBuilder();
        for (String statement : statements) {
            joined.append(statement).append("\n\n");
        }
        return joined.toString();
    }
}
