package io.leoplatform.sdk.oracle;

import io.leoplatform.schema.ChangeEvent;
import io.leoplatform.schema.Field;
import io.leoplatform.schema.FieldType;
import io.leoplatform.schema.Op;
import io.leoplatform.schema.Source;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.changes.SchemaChangeQueue;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.TableChangeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/leoplatform/sdk/oracle/OracleChangeWriter.class */
public final class OracleChangeWriter implements DatabaseChangeListener {
    private static final Logger log = LoggerFactory.getLogger(OracleChangeWriter.class);
    private final SchemaChangeQueue changeQueue;
    private final BlockingQueue<DatabaseChangeEvent> payloads = new LinkedBlockingQueue();
    private final Lock lock = new ReentrantLock();
    private final Condition changedRows = this.lock.newCondition();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.leoplatform.sdk.oracle.OracleChangeWriter$1, reason: invalid class name */
    /* loaded from: input_file:io/leoplatform/sdk/oracle/OracleChangeWriter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$oracle$jdbc$dcn$RowChangeDescription$RowOperation = new int[RowChangeDescription.RowOperation.values().length];

        static {
            try {
                $SwitchMap$oracle$jdbc$dcn$RowChangeDescription$RowOperation[RowChangeDescription.RowOperation.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$oracle$jdbc$dcn$RowChangeDescription$RowOperation[RowChangeDescription.RowOperation.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$oracle$jdbc$dcn$RowChangeDescription$RowOperation[RowChangeDescription.RowOperation.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Inject
    public OracleChangeWriter(SchemaChangeQueue schemaChangeQueue, ExecutorManager executorManager) {
        this.changeQueue = schemaChangeQueue;
        CompletableFuture.runAsync(this::asyncWriter, executorManager.get());
    }

    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        log.info("Received database notification {}", databaseChangeEvent);
        if (this.running.get()) {
            this.lock.lock();
            try {
                this.payloads.put(databaseChangeEvent);
            } catch (InterruptedException e) {
                log.warn("Batch writer stopped unexpectedly");
                this.running.set(false);
            } finally {
                this.lock.unlock();
            }
            signalAll();
        }
    }

    private void asyncWriter() {
        while (this.running.get()) {
            LinkedList linkedList = new LinkedList();
            this.lock.lock();
            try {
                this.changedRows.await();
                this.payloads.drainTo(linkedList);
            } catch (InterruptedException e) {
                log.warn("Oracle batch change writer stopped unexpectedly");
                this.running.set(false);
            } finally {
                this.lock.unlock();
            }
            if (!linkedList.isEmpty()) {
                sendToChangeQueue(linkedList);
            }
        }
    }

    private void sendToChangeQueue(Queue<DatabaseChangeEvent> queue) {
        toChangeEvents(queue).forEach(changeEvent -> {
            List list = (List) changeEvent.getFields().stream().map((v0) -> {
                return v0.getValue();
            }).collect(Collectors.toList());
            log.info("Sending notification for {} {} changes", Integer.valueOf(list.size()), changeEvent.getName());
            log.debug("ROWIDs changed: {}", String.join(",", list));
            this.changeQueue.add(changeEvent);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void end() {
        this.running.set(false);
        signalAll();
        drainBuffer();
        this.changeQueue.end();
    }

    private void signalAll() {
        this.lock.lock();
        try {
            this.changedRows.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    private Collection<ChangeEvent> toChangeEvents(Queue<DatabaseChangeEvent> queue) {
        return ((ConcurrentMap) queue.parallelStream().flatMap(this::tableChanges).map(this::rowChanges).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toConcurrentMap((v0) -> {
            return v0.getName();
        }, Function.identity(), this::combineEvents))).values();
    }

    private ChangeEvent combineEvents(ChangeEvent changeEvent, ChangeEvent changeEvent2) {
        return new ChangeEvent(changeEvent.getSource(), changeEvent.getOp(), changeEvent.getName(), (List) Stream.of((Object[]) new List[]{changeEvent.getFields(), changeEvent2.getFields()}).flatMap((v0) -> {
            return v0.stream();
        }).distinct().collect(Collectors.toList()));
    }

    private Stream<TableChangeDescription> tableChanges(DatabaseChangeEvent databaseChangeEvent) {
        return (Stream) Optional.of(databaseChangeEvent).map((v0) -> {
            return v0.getTableChangeDescription();
        }).map((v0) -> {
            return Stream.of(v0);
        }).orElse(Stream.empty());
    }

    private Set<ChangeEvent> rowChanges(TableChangeDescription tableChangeDescription) {
        String tableName = tableName(tableChangeDescription);
        return (Set) rowChangeDescription(tableChangeDescription).stream().map(rowChangeDescription -> {
            return new AbstractMap.SimpleImmutableEntry(getOp(rowChangeDescription), new Field("ROWID", FieldType.STRING, rowChangeDescription.getRowid().stringValue()));
        }).map(simpleImmutableEntry -> {
            return new ChangeEvent(Source.ORACLE, (Op) simpleImmutableEntry.getKey(), tableName, Collections.singletonList(simpleImmutableEntry.getValue()));
        }).collect(Collectors.toSet());
    }

    private Op getOp(RowChangeDescription rowChangeDescription) {
        switch (AnonymousClass1.$SwitchMap$oracle$jdbc$dcn$RowChangeDescription$RowOperation[((RowChangeDescription.RowOperation) Optional.of(rowChangeDescription).map((v0) -> {
            return v0.getRowOperation();
        }).orElse(RowChangeDescription.RowOperation.UPDATE)).ordinal()]) {
            case 1:
                return Op.INSERT;
            case 2:
                return Op.UPDATE;
            case 3:
                return Op.DELETE;
            default:
                return Op.UPDATE;
        }
    }

    private String tableName(TableChangeDescription tableChangeDescription) {
        return (String) Optional.ofNullable(tableChangeDescription).map((v0) -> {
            return v0.getTableName();
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Missing table name in description " + tableChangeDescription);
        });
    }

    private void drainBuffer() {
        LinkedList linkedList = new LinkedList();
        this.lock.lock();
        try {
            this.payloads.drainTo(linkedList);
            if (linkedList.isEmpty()) {
                return;
            }
            sendToChangeQueue(linkedList);
        } finally {
            this.lock.unlock();
        }
    }

    private List<RowChangeDescription> rowChangeDescription(TableChangeDescription tableChangeDescription) {
        return (List) Optional.ofNullable(tableChangeDescription).map((v0) -> {
            return v0.getRowChangeDescription();
        }).map((v0) -> {
            return Arrays.asList(v0);
        }).orElse(Collections.emptyList());
    }
}
