package org.schemarepo.zookeeper;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;
import org.eclipse.jetty.util.URIUtil;
import org.schemarepo.InMemorySubjectCache;
import org.schemarepo.Repository;
import org.schemarepo.RepositoryUtil;
import org.schemarepo.SchemaEntry;
import org.schemarepo.SchemaValidationException;
import org.schemarepo.Subject;
import org.schemarepo.SubjectConfig;
import org.schemarepo.ValidatorFactory;
import org.schemarepo.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/schemarepo/zookeeper/ZooKeeperRepository.class */
public class ZooKeeperRepository implements Repository, Closeable {
    private final ValidatorFactory validators;
    private static final String LOCKFILE = ".repo.lock";
    private static final String SUBJECT_PROPERTIES = "subject.properties";
    private static final String SCHEMA_IDS = "schema_ids";
    private static final String SCHEMA_POSTFIX = ".schema";
    CuratorFramework zkClient;
    InterProcessSemaphoreMutex zkLock;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final InMemorySubjectCache localSubjectsCache = new InMemorySubjectCache();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/schemarepo/zookeeper/ZooKeeperRepository$ZooKeeperSubject.class */
    public class ZooKeeperSubject extends Subject {
        private final String endOfLine;

        protected ZooKeeperSubject(String str) {
            super(str);
            this.endOfLine = System.getProperty("line.separator");
            try {
                if (ZooKeeperRepository.this.zkClient.checkExists().forPath(str) == null) {
                    throw new RuntimeException("The Subject does not exist in ZK!");
                }
                Set<String> schemaFiles = getSchemaFiles();
                HashSet hashSet = new HashSet();
                for (Integer num : getSchemaIds()) {
                    if (!hashSet.add(num)) {
                        throw new RuntimeException("Corrupt id file, id '" + num + "' duplicated in " + getSchemaIdsFilePath());
                    }
                    schemaFiles.remove(getSchemaFileName(num.intValue()));
                }
                if (schemaFiles.size() > 0) {
                    throw new RuntimeException("Schema files found in subject directory " + getSubjectPath() + " that are not referenced in the " + ZooKeeperRepository.SCHEMA_IDS + " file: " + schemaFiles.toString());
                }
            } catch (IOException e) {
                throw new RuntimeException("An IOException occurred while reading the properties at: " + getConfigFilePath(), e);
            } catch (Exception e2) {
                throw new RuntimeException("An exception occurred while accessing ZK!", e2);
            }
        }

        private String getSchemaFileName(String str) {
            return str + ZooKeeperRepository.SCHEMA_POSTFIX;
        }

        private String getSchemaFileName(int i) {
            return getSchemaFileName(String.valueOf(i));
        }

        private String getSubjectPath() {
            return URIUtil.SLASH + getName();
        }

        private String getConfigFilePath() {
            return getSubjectPath() + URIUtil.SLASH + ZooKeeperRepository.SUBJECT_PROPERTIES;
        }

        private String getSchemaIdsFilePath() {
            return getSubjectPath() + URIUtil.SLASH + ZooKeeperRepository.SCHEMA_IDS;
        }

        private String getSchemaFilePath(String str) {
            return getSubjectPath() + URIUtil.SLASH + getSchemaFileName(str);
        }

        private Set<String> getSchemaFiles() {
            try {
                List<String> forPath = ZooKeeperRepository.this.zkClient.getChildren().forPath(getSubjectPath());
                HashSet hashSet = new HashSet();
                for (String str : forPath) {
                    if (str.endsWith(ZooKeeperRepository.SCHEMA_POSTFIX)) {
                        hashSet.add(str);
                    }
                }
                return hashSet;
            } catch (Exception e) {
                throw new RuntimeException("An exception occurred while accessing ZK!", e);
            }
        }

        private List<Integer> getSchemaIds() {
            try {
                byte[] forPath = ZooKeeperRepository.this.zkClient.getData().forPath(getSchemaIdsFilePath());
                ArrayList arrayList = new ArrayList();
                Scanner scanner = new Scanner(new ByteArrayInputStream(forPath));
                while (scanner.hasNext()) {
                    String nextLine = scanner.nextLine();
                    try {
                        arrayList.add(Integer.valueOf(Integer.parseInt(nextLine)));
                    } catch (NumberFormatException e) {
                        ZooKeeperRepository.this.logger.error("Got an invalid ID ({}) in {} !", nextLine, getSchemaIdsFilePath(), e);
                    }
                }
                return arrayList;
            } catch (Exception e2) {
                throw new RuntimeException("An exception occurred while accessing ZK!", e2);
            }
        }

        private Integer getLatestSchemaId(List<Integer> list) {
            Integer num = -1;
            for (Integer num2 : list) {
                if (num2.intValue() > num.intValue()) {
                    num = num2;
                }
            }
            return num;
        }

        private Integer getLatestSchemaId() {
            return getLatestSchemaId(getSchemaIds());
        }

        private String readSchemaForId(String str) {
            try {
                byte[] forPath = ZooKeeperRepository.this.zkClient.getData().forPath(getSchemaFilePath(str));
                if (forPath == null || forPath.length == 0) {
                    return null;
                }
                return new String(forPath);
            } catch (KeeperException.NoNodeException e) {
                return null;
            } catch (Exception e2) {
                throw new RuntimeException("An exception occurred while accessing ZK!", e2);
            }
        }

        private String serializeSchemaIds(List<Integer> list) {
            StringBuilder sb = new StringBuilder();
            boolean z = true;
            for (Integer num : list) {
                if (z) {
                    z = false;
                } else {
                    sb.append(this.endOfLine);
                }
                sb.append(num.toString());
            }
            return sb.toString();
        }

        private synchronized SchemaEntry createNewSchema(String str) {
            try {
                List<Integer> schemaIds = getSchemaIds();
                Integer valueOf = Integer.valueOf(getLatestSchemaId(schemaIds).intValue() + 1);
                schemaIds.add(valueOf);
                byte[] bytes = str.getBytes();
                ZooKeeperRepository.this.zkClient.inTransaction().create().forPath(getSchemaFilePath(valueOf.toString()), bytes).and().setData().forPath(getSchemaIdsFilePath(), serializeSchemaIds(schemaIds).getBytes()).and().commit();
                return new SchemaEntry(String.valueOf(valueOf), str);
            } catch (Exception e) {
                throw new RuntimeException("An exception occurred while accessing ZK!", e);
            }
        }

        @Override // org.schemarepo.Subject
        public SubjectConfig getConfig() {
            try {
                Properties properties = new Properties();
                properties.load(new ByteArrayInputStream(ZooKeeperRepository.this.zkClient.getData().forPath(getConfigFilePath())));
                return RepositoryUtil.configFromProperties(properties);
            } catch (Exception e) {
                throw new RuntimeException("An exception occurred while accessing ZK!", e);
            }
        }

        @Override // org.schemarepo.Subject
        public boolean integralKeys() {
            return true;
        }

        @Override // org.schemarepo.Subject
        public SchemaEntry register(String str) throws SchemaValidationException {
            RepositoryUtil.validateSchemaOrSubject(str);
            if (0 != 0) {
                return null;
            }
            ZooKeeperRepository.this.acquireLock();
            SchemaEntry lookupBySchema = lookupBySchema(str);
            if (lookupBySchema == null) {
                lookupBySchema = createNewSchema(str);
            }
            ZooKeeperRepository.this.releaseLock();
            return lookupBySchema;
        }

        @Override // org.schemarepo.Subject
        public SchemaEntry registerIfLatest(String str, SchemaEntry schemaEntry) throws SchemaValidationException {
            SchemaEntry latest = latest();
            if (schemaEntry == latest || (schemaEntry != null && schemaEntry.equals(latest))) {
                return register(str);
            }
            return null;
        }

        @Override // org.schemarepo.Subject
        public SchemaEntry lookupBySchema(String str) {
            RepositoryUtil.validateSchemaOrSubject(str);
            Iterator<Integer> it = getSchemaIds().iterator();
            while (it.hasNext()) {
                String num = it.next().toString();
                if (str.equals(readSchemaForId(num))) {
                    return new SchemaEntry(num, str);
                }
            }
            return null;
        }

        @Override // org.schemarepo.Subject
        public SchemaEntry lookupById(String str) {
            String readSchemaForId;
            if (0 == 0 && (readSchemaForId = readSchemaForId(str)) != null) {
                return new SchemaEntry(str, readSchemaForId);
            }
            return null;
        }

        @Override // org.schemarepo.Subject
        public SchemaEntry latest() {
            String readSchemaForId;
            Integer latestSchemaId = getLatestSchemaId();
            if (0 == 0 && (readSchemaForId = readSchemaForId(latestSchemaId.toString())) != null) {
                return new SchemaEntry(latestSchemaId.toString(), readSchemaForId);
            }
            return null;
        }

        @Override // org.schemarepo.Subject
        public Iterable<SchemaEntry> allEntries() {
            ArrayList arrayList = new ArrayList();
            Iterator<Integer> it = getSchemaIds().iterator();
            while (it.hasNext()) {
                String num = it.next().toString();
                arrayList.add(new SchemaEntry(num, readSchemaForId(num)));
            }
            Collections.reverse(arrayList);
            return arrayList;
        }
    }

    @Inject
    public ZooKeeperRepository(@Named("schema-repo.zookeeper.ensemble") String str, @Named("schema-repo.zookeeper.path-prefix") String str2, @Named("schema-repo.zookeeper.session-timeout") Integer num, @Named("schema-repo.zookeeper.connection-timeout") Integer num2, @Named("schema-repo.zookeeper.curator.sleep-time-between-retries") Integer num3, @Named("schema-repo.zookeeper.curator.number-of-retries") Integer num4, ValidatorFactory validatorFactory) {
        this.validators = validatorFactory;
        if (str == null || str.isEmpty()) {
            this.logger.error("The '{}' config is missing. Exiting.", Config.ZK_ENSEMBLE);
            System.exit(1);
        }
        this.logger.info("Starting ZookeeperRepository with the following parameters:\nschema-repo.zookeeper.ensemble: " + str + "\n" + Config.ZK_PATH_PREFIX + ": " + str2 + "\n" + Config.ZK_SESSION_TIMEOUT + ": " + num + "\n" + Config.ZK_CONNECTION_TIMEOUT + ": " + num2 + "\n" + Config.ZK_CURATOR_SLEEP_TIME_BETWEEN_RETRIES + ": " + num3 + "\n" + Config.ZK_CURATOR_NUMBER_OF_RETRIES + ": " + num4);
        CuratorFrameworkFactory.Builder defaultData = CuratorFrameworkFactory.builder().connectString(str).sessionTimeoutMs(num.intValue()).connectionTimeoutMs(num2.intValue()).retryPolicy(new RetryNTimes(num3.intValue(), num4.intValue())).defaultData(new byte[0]);
        CuratorFramework build = defaultData.build();
        build.start();
        try {
            build.blockUntilConnected();
            build.create().creatingParentsIfNeeded().forPath(str2);
            this.logger.info("The ZK Path Prefix ({}) was created in ZK.", str2);
        } catch (IllegalArgumentException e) {
            this.logger.error("Got an IllegalArgumentException while attempting to create the ZK Path Prefix (" + str2 + "). Exiting.", (Throwable) e);
            System.exit(1);
        } catch (KeeperException.NodeExistsException e2) {
            this.logger.info("The ZK Path Prefix ({}) was found in ZK.", str2);
        } catch (Exception e3) {
            this.logger.error("There was an unrecoverable exception during the ZooKeeperRepository startup. Exiting.", (Throwable) e3);
            System.exit(1);
        }
        this.zkClient = defaultData.namespace(str2.substring(1)).build();
        this.zkClient.start();
        try {
            this.zkClient.blockUntilConnected();
            this.zkLock = new InterProcessSemaphoreMutex(this.zkClient, LOCKFILE);
            this.logger.info("ZooKeeperRepository startup finished!");
        } catch (Exception e4) {
            this.logger.error("There was an unrecoverable exception during the ZooKeeperRepository startup. Exiting.", (Throwable) e4);
            System.exit(1);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void acquireLock() {
        try {
            this.zkLock.acquire();
        } catch (Exception e) {
            this.logger.error("An exception occurred while trying to get the ZK lock!", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void releaseLock() {
        try {
            this.zkLock.release();
        } catch (Exception e) {
            this.logger.error("An exception occurred while trying to release the ZK lock!", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.schemarepo.Repository
    public Subject register(String str, SubjectConfig subjectConfig) {
        Subject lookup = this.localSubjectsCache.lookup(str);
        if (null == lookup) {
            acquireLock();
            try {
                try {
                    this.zkClient.create().forPath(str);
                    createNewSubject(str, subjectConfig);
                    releaseLock();
                } catch (KeeperException.NodeExistsException e) {
                    releaseLock();
                } catch (Exception e2) {
                    this.logger.error("An exception occurred while accessing ZK!", (Throwable) e2);
                    throw new RuntimeException(e2);
                }
                lookup = fetchAndCache(str);
            } catch (Throwable th) {
                releaseLock();
                throw th;
            }
        }
        return lookup;
    }

    private void createNewSubject(String str, SubjectConfig subjectConfig) throws Exception {
        this.zkClient.create().forPath(str + URIUtil.SLASH + SCHEMA_IDS);
        Properties properties = new Properties();
        properties.putAll(RepositoryUtil.safeConfig(subjectConfig).asMap());
        StringWriter stringWriter = new StringWriter();
        properties.store(stringWriter, "Schema Repository Subject Properties");
        this.zkClient.create().forPath(str + URIUtil.SLASH + SUBJECT_PROPERTIES, stringWriter.toString().getBytes());
    }

    private Subject fetchAndCache(String str) {
        return this.localSubjectsCache.add(Subject.validatingSubject(new ZooKeeperSubject(str), this.validators));
    }

    @Override // org.schemarepo.Repository
    public Subject lookup(String str) {
        Subject lookup = this.localSubjectsCache.lookup(str);
        if (lookup == null) {
            try {
                if (this.zkClient.checkExists().forPath(str) != null) {
                    lookup = fetchAndCache(str);
                }
            } catch (Exception e) {
                this.logger.error("An exception occurred while accessing ZK!", (Throwable) e);
                throw new RuntimeException(e);
            }
        }
        return lookup;
    }

    @Override // org.schemarepo.Repository
    public Iterable<Subject> subjects() {
        try {
            for (String str : this.zkClient.getChildren().forPath("")) {
                if (!str.equals(LOCKFILE) && this.localSubjectsCache.lookup(str) == null) {
                    fetchAndCache(str);
                }
            }
            return this.localSubjectsCache.values();
        } catch (Exception e) {
            this.logger.error("An exception occurred while accessing ZK!", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Integer num = 100;
        while (this.zkLock.isAcquiredInThisProcess()) {
            try {
                this.logger.info("ZooKeeperRepository's close() called while lock is acquired. Waiting " + num + " ms before trying again.");
                wait(num.intValue());
            } catch (InterruptedException e) {
                this.logger.warn("Interrupted while waiting", (Throwable) e);
            }
        }
        this.zkClient.close();
    }
}
