package reactor.netty.incubator.quic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.incubator.codec.quic.DefaultQuicStreamFrame;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.incubator.codec.quic.QuicStreamFrame;
import io.netty.incubator.codec.quic.QuicStreamType;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:BOOT-INF/lib/reactor-netty-incubator-quic-0.1.20.jar:reactor/netty/incubator/quic/QuicStreamOperations.class */
class QuicStreamOperations extends ChannelOperations<QuicInbound, QuicOutbound> implements QuicInbound, QuicOutbound {
    volatile int finSent;
    static final String INBOUND_CANCEL_LOG = "Quic inbound stream cancelled, sending WRITE_FIN.";
    static final AtomicIntegerFieldUpdater<QuicStreamOperations> FIN_SENT = AtomicIntegerFieldUpdater.newUpdater(QuicStreamOperations.class, "finSent");
    static final Logger log = Loggers.getLogger((Class<?>) QuicStreamOperations.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public QuicStreamOperations(Connection connection, ConnectionObserver connectionObserver) {
        super(connection, connectionObserver);
        markPersistent(false);
    }

    @Override // reactor.netty.channel.ChannelOperations, reactor.netty.ChannelOperationsId
    public String asLongText() {
        return asShortText() + ", " + channel().localAddress();
    }

    @Override // reactor.netty.incubator.quic.QuicStreamInfo
    public boolean isLocalStream() {
        return ((QuicStreamChannel) connection().channel()).isLocalCreated();
    }

    @Override // reactor.netty.channel.ChannelOperations, reactor.netty.NettyOutbound
    public NettyOutbound send(Publisher<? extends ByteBuf> publisher, Predicate<ByteBuf> predicate) {
        Objects.requireNonNull(predicate, "predicate");
        return !channel().isActive() ? then(Mono.error(AbortedException.beforeSend())) : publisher instanceof Mono ? then(((Mono) publisher).flatMap(byteBuf -> {
            return markFinSent() ? FutureMono.from(channel().writeAndFlush(new DefaultQuicStreamFrame(byteBuf, true))) : FutureMono.from(channel().writeAndFlush(byteBuf));
        }).doOnDiscard(ByteBuf.class, (v0) -> {
            v0.release();
        })) : super.send(publisher, predicate);
    }

    @Override // reactor.netty.channel.ChannelOperations, reactor.netty.NettyOutbound
    public NettyOutbound sendObject(Object obj) {
        if (!channel().isActive()) {
            ReactorNetty.safeRelease(obj);
            return then(Mono.error(AbortedException.beforeSend()));
        }
        if (!(obj instanceof ByteBuf)) {
            return super.sendObject(obj);
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        return then(FutureMono.deferFuture(() -> {
            return markFinSent() ? connection().channel().writeAndFlush(new DefaultQuicStreamFrame(byteBuf, true)) : connection().channel().writeAndFlush(byteBuf);
        }), () -> {
            ReactorNetty.safeRelease(byteBuf);
        });
    }

    @Override // reactor.netty.incubator.quic.QuicStreamInfo
    public long streamId() {
        return ((QuicStreamChannel) connection().channel()).streamId();
    }

    @Override // reactor.netty.incubator.quic.QuicStreamInfo
    public QuicStreamType streamType() {
        return ((QuicStreamChannel) connection().channel()).type();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.netty.channel.ChannelOperations
    public void onInboundCancel() {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(channel(), INBOUND_CANCEL_LOG));
        }
        sendFinNow(channelFuture -> {
            terminate();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.netty.channel.ChannelOperations
    public void onOutboundError(Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(channel(), "Outbound error happened. Sending WRITE_FIN."), th);
        }
        sendFinNow(channelFuture -> {
            terminate();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.netty.channel.ChannelOperations
    public final void onInboundComplete() {
        super.onInboundComplete();
    }

    final boolean markFinSent() {
        return FIN_SENT.compareAndSet(this, 0, 1);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void sendFinNow() {
        sendFinNow(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void sendFinNow(@Nullable ChannelFutureListener channelFutureListener) {
        if (markFinSent()) {
            ChannelFuture writeAndFlush = channel().writeAndFlush(QuicStreamFrame.EMPTY_FIN);
            if (channelFutureListener != null) {
                writeAndFlush.addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFutureListener);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void callTerminate(Channel channel) {
        ChannelOperations<?, ?> channelOperations = get(channel);
        if (channelOperations == null) {
            return;
        }
        ((QuicStreamOperations) channelOperations).terminate();
    }
}
