package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CopyData;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CopyDone;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.replication.LogSequenceNumber;
import io.r2dbc.postgresql.replication.ReplicationRequest;
import io.r2dbc.postgresql.replication.ReplicationStream;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-0.8.8.RELEASE.jar:io/r2dbc/postgresql/PostgresReplicationStream.class */
public final class PostgresReplicationStream implements ReplicationStream {
    public static final long POSTGRES_EPOCH_2000_01_01 = 946684800000L;
    private static final char KEEP_ALIVE = 'k';
    private static final char X_LOG_DATA = 'w';
    private final EmitterProcessor<FrontendMessage> requestProcessor;
    private final ByteBufAllocator allocator;
    private final ReplicationRequest replicationRequest;
    private final EmitterProcessor<CopyData> responseProcessor = EmitterProcessor.create(false);
    private final AtomicReference<Disposable> subscription = new AtomicReference<>();
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    private volatile LogSequenceNumber lastServerLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastReceiveLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastAppliedLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastFlushedLSN = LogSequenceNumber.INVALID_LSN;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PostgresReplicationStream(ByteBufAllocator byteBufAllocator, ReplicationRequest replicationRequest, EmitterProcessor<FrontendMessage> emitterProcessor, Flux<BackendMessage> flux) {
        this.allocator = byteBufAllocator;
        this.replicationRequest = replicationRequest;
        this.requestProcessor = emitterProcessor;
        Class<ReadyForQuery> cls = ReadyForQuery.class;
        ReadyForQuery.class.getClass();
        flux.takeUntil((v1) -> {
            return r1.isInstance(v1);
        }).doOnError(th -> {
            close().subscribe();
            this.closeFuture.complete(null);
        }).doOnComplete(() -> {
            this.closeFuture.complete(null);
        }).ofType(CopyData.class).handle((copyData, synchronousSink) -> {
            try {
                byte readByte = copyData.getData().readByte();
                switch (readByte) {
                    case 107:
                        if (processKeepAliveMessage(copyData.getData()) || this.replicationRequest.getStatusInterval().isZero()) {
                            sendStatusUpdate();
                        }
                        return;
                    case 119:
                        processXLogData(copyData.getData());
                        copyData.retain();
                        synchronousSink.next(copyData);
                        ReferenceCountUtil.release(copyData);
                        return;
                    default:
                        synchronousSink.error(new R2dbcNonTransientResourceException(String.format("Unexpected packet type during replication: %s", Integer.toString(readByte))));
                        ReferenceCountUtil.release(copyData);
                        return;
                }
            } finally {
                ReferenceCountUtil.release(copyData);
            }
        }).subscribeWith(this.responseProcessor);
        Disposable disposable = () -> {
        };
        Duration statusInterval = replicationRequest.getStatusInterval();
        if (!statusInterval.isZero()) {
            Scheduler.Worker createWorker = Schedulers.parallel().createWorker();
            createWorker.schedulePeriodically(this::sendStatusUpdate, statusInterval.toMillis(), statusInterval.toMillis(), TimeUnit.MILLISECONDS);
            disposable = createWorker;
        }
        this.subscription.set(disposable);
    }

    private boolean processKeepAliveMessage(ByteBuf byteBuf) {
        this.lastServerLSN = LogSequenceNumber.valueOf(byteBuf.readLong());
        if (this.lastServerLSN.asLong() > this.lastReceiveLSN.asLong()) {
            this.lastReceiveLSN = this.lastServerLSN;
        }
        byteBuf.readLong();
        return byteBuf.readByte() != 0;
    }

    private void processXLogData(ByteBuf byteBuf) {
        long readLong = byteBuf.readLong();
        this.lastServerLSN = LogSequenceNumber.valueOf(byteBuf.readLong());
        byteBuf.readLong();
        switch (this.replicationRequest.getReplicationType()) {
            case LOGICAL:
                this.lastReceiveLSN = LogSequenceNumber.valueOf(readLong);
                return;
            case PHYSICAL:
                this.lastReceiveLSN = LogSequenceNumber.valueOf(readLong + (byteBuf.readableBytes() - byteBuf.readerIndex()));
                return;
            default:
                return;
        }
    }

    private void sendStatusUpdate() {
        this.requestProcessor.onNext(new io.r2dbc.postgresql.message.frontend.CopyData(prepareUpdateStatus(this.lastReceiveLSN, this.lastFlushedLSN, this.lastAppliedLSN, false)));
    }

    private ByteBuf prepareUpdateStatus(LogSequenceNumber logSequenceNumber, LogSequenceNumber logSequenceNumber2, LogSequenceNumber logSequenceNumber3, boolean z) {
        return new KeepAliveMessage(logSequenceNumber, logSequenceNumber2, logSequenceNumber3, TimeUnit.MICROSECONDS.convert(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - 946684800000L, TimeUnit.MICROSECONDS), z).encode(this.allocator);
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream, io.r2dbc.spi.Closeable
    public Mono<Void> close() {
        Disposable disposable = this.subscription.get();
        if (disposable == null || !this.subscription.compareAndSet(disposable, null)) {
            return Mono.fromCompletionStage(this.closeFuture);
        }
        disposable.dispose();
        this.requestProcessor.onNext(CopyDone.INSTANCE);
        this.requestProcessor.onComplete();
        return this.responseProcessor.ignoreElements().then(Mono.fromCompletionStage(this.closeFuture));
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public boolean isClosed() {
        return this.subscription.get() == null;
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public <T> Flux<T> map(Function<ByteBuf, ? extends T> function) {
        Assert.requireNonNull(function, "mappingFunction must not be null");
        return (Flux<T>) this.responseProcessor.map(copyData -> {
            try {
                return function.apply(copyData.getData());
            } finally {
                ReferenceCountUtil.release(copyData);
            }
        });
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public LogSequenceNumber getLastReceiveLSN() {
        return this.lastReceiveLSN;
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public LogSequenceNumber getLastFlushedLSN() {
        return this.lastFlushedLSN;
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public LogSequenceNumber getLastAppliedLSN() {
        return this.lastAppliedLSN;
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public void setFlushedLSN(LogSequenceNumber logSequenceNumber) {
        this.lastFlushedLSN = logSequenceNumber;
    }

    @Override // io.r2dbc.postgresql.replication.ReplicationStream
    public void setAppliedLSN(LogSequenceNumber logSequenceNumber) {
        this.lastAppliedLSN = logSequenceNumber;
    }
}
