package org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl;

import org.apache.pulsar.jetcd.shaded.io.vertx.core.AsyncResult;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Closeable;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Future;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Handler;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Promise;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.Message;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.ReplyException;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.ReplyFailure;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.ContextInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.future.PromiseInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.tracing.SpanKind;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.tracing.TagExtractor;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.spi.tracing.VertxTracer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.tracing.TracingPolicy;

/* loaded from: input_file:META-INF/bundled-dependencies/jetcd-core-shaded-3.0.10.6.jar:org/apache/pulsar/jetcd/shaded/io/vertx/core/eventbus/impl/HandlerRegistration.class */
/**
 * Base class for a handler registered on the event bus under a single {@link #address}.
 *
 * <p>An instance receives messages through {@link #receive(MessageImpl)}, hands them to the
 * subclass via {@link #doReceive(Message)} on the registration's context executor, and reports
 * scheduling, delivery, discard and (un)registration events to the bus metrics when
 * {@code bus.metrics} is non-null.
 *
 * <p>The {@code registered} holder is read and written only while holding this object's
 * monitor ({@link #register}, {@link #isRegistered()} and {@link #unregister()}).
 *
 * @param <T> the message body type
 */
public abstract class HandlerRegistration<T> implements Closeable {
    /** Context whose executor runs message reception and which creates the unregister promise. */
    public final ContextInternal context;
    /** Event bus this registration is added to and removed from. */
    public final EventBusImpl bus;
    /** Address this registration is registered under. */
    public final String address;
    /** When {@code true}, inbound deliveries skip the tracer even if one is configured. */
    public final boolean src;
    /** Holder returned by the bus on registration; {@code null} while not registered. */
    private HandlerHolder<T> registered;
    /** Handle returned by {@code bus.metrics.handlerRegistered}; passed back on every metrics call. */
    private Object metric;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/jetcd-core-shaded-3.0.10.6.jar:org/apache/pulsar/jetcd/shaded/io/vertx/core/eventbus/impl/HandlerRegistration$InboundDeliveryContext.class */
    /**
     * Delivery context for one inbound message: built with the bus inbound interceptors and,
     * once executed, delivers the message to the handler with optional metrics and tracing.
     */
    public class InboundDeliveryContext extends DeliveryContextBase<T> {
        private final Handler<Message<T>> handler;

        private InboundDeliveryContext(MessageImpl<?, T> messageImpl, Handler<Message<T>> handler, ContextInternal contextInternal) {
            super(messageImpl, messageImpl.bus.inboundInterceptors(), contextInternal);
            this.handler = handler;
        }

        /**
         * Delivers the message to the handler.
         *
         * <p>Reports the delivery to the bus metrics when enabled. When the context has a tracer
         * and the registration is not flagged {@code src}, a receive-request span is opened before
         * dispatching; it is closed right after dispatch only when the message has no reply
         * address and a trace object is still present.
         */
        @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.DeliveryContextBase
        protected void execute() {
            ContextInternal ctx = this.context;
            Object handlerMetric = HandlerRegistration.this.metric;
            VertxTracer tracer = ctx.tracer();
            if (HandlerRegistration.this.bus.metrics != null) {
                HandlerRegistration.this.bus.metrics.messageDelivered(handlerMetric, this.message.isLocal());
            }
            if (tracer == null || HandlerRegistration.this.src) {
                // No tracing for this delivery: plain dispatch.
                HandlerRegistration.this.dispatch(this.message, ctx, this.handler);
                return;
            }
            this.message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, this.message, this.message.isSend() ? "send" : "publish", this.message.headers(), MessageTagExtractor.INSTANCE);
            HandlerRegistration.this.dispatch(this.message, ctx, this.handler);
            // Re-read the trace after dispatch; the field is mutable on the message.
            Object trace = this.message.trace;
            if (this.message.replyAddress == null && trace != null) {
                tracer.sendResponse(this.context, null, trace, null, TagExtractor.empty());
            }
        }

        /** @return the result of {@code isSend()} on the message being delivered */
        @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.DeliveryContext
        public boolean send() {
            return this.message.isSend();
        }

        /** @return the {@code receivedBody} of the message being delivered */
        @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.DeliveryContext
        public Object body() {
            return this.message.receivedBody;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an unregistered handler registration.
     *
     * @param contextInternal context stored in {@link #context}
     * @param eventBusImpl    bus stored in {@link #bus}
     * @param str             address stored in {@link #address}
     * @param z               value stored in {@link #src}
     */
    public HandlerRegistration(ContextInternal contextInternal, EventBusImpl eventBusImpl, String str, boolean z) {
        this.context = contextInternal;
        this.bus = eventBusImpl;
        this.src = z;
        this.address = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Accepts a message for this registration.
     *
     * <p>Reports the scheduling to the bus metrics when enabled, then submits a task to the
     * context executor that calls {@link #doReceive(Message)} and falls back to
     * {@link #discard(Message)} when the message was not accepted.
     *
     * @param messageImpl the message to receive
     */
    // The raw MessageImpl parameter is kept so the signature callers compile against is unchanged.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void receive(MessageImpl messageImpl) {
        if (this.bus.metrics != null) {
            this.bus.metrics.scheduleMessage(this.metric, messageImpl.isLocal());
        }
        this.context.executor().execute(() -> {
            if (!doReceive(messageImpl)) {
                discard(messageImpl);
            }
        });
    }

    /**
     * Processes a received message.
     *
     * @param message the received message
     * @return {@code false} to have the message discarded by {@link #receive(MessageImpl)}
     */
    protected abstract boolean doReceive(Message<T> message);

    /**
     * Performs the actual delivery of {@code message} to {@code handler}; invoked from
     * {@link InboundDeliveryContext#execute()}.
     *
     * @param message         the message to deliver
     * @param contextInternal the context associated with the delivery
     * @param handler         the handler that should receive the message
     */
    protected abstract void dispatch(Message<T> message, ContextInternal contextInternal, Handler<Message<T>> handler);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds this registration to the bus and records it with the bus metrics when enabled.
     *
     * @param str     forwarded to {@code bus.metrics.handlerRegistered}; its non-nullness is
     *                forwarded to {@code bus.addRegistration} (presumably the replied address
     *                of a reply handler; confirm against callers)
     * @param z       flag forwarded unchanged to {@code bus.addRegistration}
     * @param promise promise forwarded to {@code bus.addRegistration}
     * @throws IllegalStateException if this registration is already registered
     */
    public synchronized void register(String str, boolean z, Promise<Void> promise) {
        if (this.registered != null) {
            throw new IllegalStateException();
        }
        this.registered = this.bus.addRegistration(this.address, this, str != null, z, promise);
        if (this.bus.metrics != null) {
            this.metric = this.bus.metrics.handlerRegistered(this.address, str);
        }
    }

    /** @return {@code true} while a registration holder is held, i.e. between register and unregister */
    public synchronized boolean isRegistered() {
        return this.registered != null;
    }

    /**
     * Removes this registration from the bus if it is currently registered.
     *
     * <p>When registered, the promise is handed to {@code bus.removeRegistration}, the holder is
     * cleared and the bus metrics are notified when enabled. When not registered, the promise is
     * completed immediately.
     *
     * @return the future of the promise created for this unregistration
     */
    public Future<Void> unregister() {
        // Must be a Void promise: it is handed to the bus as the unregistration completion
        // and its future is returned as Future<Void>.
        PromiseInternal<Void> promise = this.context.promise();
        synchronized (this) {
            if (this.registered != null) {
                this.bus.removeRegistration(this.registered, promise);
                this.registered = null;
                if (this.bus.metrics != null) {
                    this.bus.metrics.handlerUnregistered(this.metric);
                }
            } else {
                promise.complete();
            }
        }
        return promise.future();
    }

    /**
     * Callback flavour of {@link #unregister()}.
     *
     * @param handler notified with the unregistration result; ignored when {@code null}
     */
    public void unregister(Handler<AsyncResult<Void>> handler) {
        Future<Void> unregistration = unregister();
        if (handler != null) {
            unregistration.onComplete(handler);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps the message in a new {@link InboundDeliveryContext} and starts its dispatch.
     *
     * @param handler         the handler that should eventually receive the message
     * @param message         the message to deliver; cast to {@code MessageImpl}
     * @param contextInternal the context to associate with the delivery
     */
    public void dispatch(Handler<Message<T>> handler, Message<T> message, ContextInternal contextInternal) {
        new InboundDeliveryContext((MessageImpl) message, handler, contextInternal).dispatch();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Discards a message that was not accepted.
     *
     * <p>Reports the discard to the bus metrics when enabled and, when the message carries a
     * reply address, replies with a {@link ReplyException} of type {@link ReplyFailure#TIMEOUT}.
     *
     * @param message the message to discard; cast to {@code MessageImpl} for the metrics call
     */
    public void discard(Message<T> message) {
        if (this.bus.metrics != null) {
            this.bus.metrics.discardMessage(this.metric, ((MessageImpl) message).isLocal(), message);
        }
        String replyAddress = message.replyAddress();
        if (replyAddress != null) {
            message.reply(new ReplyException(ReplyFailure.TIMEOUT, "Discarded the request. address: " + replyAddress + ", repliedAddress: " + message.address()));
        }
    }

    /**
     * Closes this registration by unregistering it, using {@code promise} as the completion
     * handler.
     *
     * @param promise notified with the unregistration result
     */
    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.Closeable
    public void close(Promise<Void> promise) {
        unregister(promise);
    }
}
