package com.pushtechnology.diffusion.client.internal.services;

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.IncompatibleDatatypeException;
import com.pushtechnology.diffusion.client.features.Messaging;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.command.ErrorReasonException;
import com.pushtechnology.diffusion.command.commands.control.client.CountOrParserErrors;
import com.pushtechnology.diffusion.command.commands.control.client.FilterResponse;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingClientFilterSendRequest;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingResponse;
import com.pushtechnology.diffusion.command.receiver.AbstractCommandService;
import com.pushtechnology.diffusion.command.receiver.CommandService;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.command.services.definition.CommonServices;
import com.pushtechnology.diffusion.conversation.ConversationId;
import com.pushtechnology.diffusion.conversation.ConversationSet;
import com.pushtechnology.diffusion.conversation.NoSuchConversationException;
import com.pushtechnology.diffusion.conversation.ResponseHandler;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.datatype.InvalidDataException;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.session.impl.InternalSessionId;
import java8.util.concurrent.CompletableFuture;
import net.jcip.annotations.Immutable;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;

@Immutable
/* loaded from: input_file:com/pushtechnology/diffusion/client/internal/services/FilterResponseListenerImpl.class */
public final class FilterResponseListenerImpl extends AbstractRegistration implements FilterResponseListener {
    private final InternalSession session;
    private final DataTypes dataTypes;
    private static final Logger LOG = I18nLogger.getLogger((Class<?>) FilterResponseListenerImpl.class);

    @NotThreadSafe
    /* loaded from: input_file:com/pushtechnology/diffusion/client/internal/services/FilterResponseListenerImpl$FilterResponseHandler.class */
    private static final class FilterResponseHandler<R> implements ResponseHandler {
        private final Messaging.FilteredRequestCallback<? super R> callback;
        private final Class<R> responseType;
        private int expected;
        private int counter;
        static final /* synthetic */ boolean $assertionsDisabled;

        private FilterResponseHandler(Messaging.FilteredRequestCallback<? super R> filteredRequestCallback, Class<R> cls) {
            this.expected = -1;
            this.callback = filteredRequestCallback;
            this.responseType = cls;
        }

        @Override // com.pushtechnology.diffusion.conversation.ResponseHandler
        public boolean onResponse(ConversationId conversationId, Object obj) {
            if (obj == null) {
                return true;
            }
            if (obj instanceof FilterResponse) {
                this.counter++;
                return onFilterResponse((FilterResponse) obj);
            }
            if ($assertionsDisabled || (obj instanceof CountOrParserErrors)) {
                return onCountOrParserErrorsResponse((CountOrParserErrors) obj);
            }
            throw new AssertionError();
        }

        @Override // com.pushtechnology.diffusion.conversation.ResponseHandler
        public void onDiscard(ConversationId conversationId, Throwable th) {
        }

        private boolean onCountOrParserErrorsResponse(CountOrParserErrors countOrParserErrors) {
            if (!countOrParserErrors.getErrors().isEmpty()) {
                return true;
            }
            this.expected = countOrParserErrors.getCount();
            return isDone();
        }

        private boolean onFilterResponse(FilterResponse filterResponse) {
            InternalSessionId sessionId = filterResponse.getSessionId();
            MessagingResponse response = filterResponse.getResponse();
            ErrorReason errorReason = filterResponse.getErrorReason();
            if (errorReason != null) {
                this.callback.onResponseError(sessionId, ErrorReasonException.toApiException(errorReason, errorReason.getDescription()));
            } else {
                DataType<?> dataType = response.getDataType();
                if (dataType.canReadAs(this.responseType)) {
                    try {
                        try {
                            this.callback.onResponse(sessionId, dataType.readAs(this.responseType, response.getResponse()));
                        } catch (Exception e) {
                            FilterResponseListenerImpl.LOG.error("MESSAGING_FILTER_EXCEPTION", this.callback, e);
                        }
                    } catch (InvalidDataException e2) {
                        this.callback.onResponseError(sessionId, e2);
                        return isDone();
                    }
                } else {
                    this.callback.onResponseError(sessionId, new IncompatibleDatatypeException("The server sent a " + dataType + " which cannot be read as a " + this.responseType));
                }
            }
            return isDone();
        }

        private boolean isDone() {
            return this.counter >= this.expected && this.expected != -1;
        }

        static {
            $assertionsDisabled = !FilterResponseListenerImpl.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/pushtechnology/diffusion/client/internal/services/FilterResponseListenerImpl$FilterResponseResultReceiver.class */
    private static final class FilterResponseResultReceiver extends AbstractCommandService<FilterResponse, Void, InternalSession> {
        private FilterResponseResultReceiver() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.pushtechnology.diffusion.command.receiver.AbstractCommandService
        public void safeOnRequest(InternalSession internalSession, FilterResponse filterResponse, CommandService.ServiceCallback<Void> serviceCallback) throws NoSuchConversationException {
            internalSession.getConversations().respond(filterResponse.getContext(), filterResponse);
        }
    }

    public FilterResponseListenerImpl(InternalSession internalSession, DataTypes dataTypes, MutableServiceRegistry mutableServiceRegistry) {
        super(internalSession);
        mutableServiceRegistry.add(CommonServices.FILTER_RESPONSE, new FilterResponseResultReceiver());
        this.session = internalSession;
        this.dataTypes = dataTypes;
    }

    @Override // com.pushtechnology.diffusion.client.internal.services.FilterResponseListener
    public <T, R> CompletableFuture<CountOrParserErrors> sendFilterRequest(String str, String str2, T t, Class<T> cls, Class<R> cls2, Messaging.FilteredRequestCallback<? super R> filteredRequestCallback) {
        DataType byClass = this.dataTypes.getByClass(cls);
        ConversationSet conversations = this.session.getConversations();
        ServiceReference obtainService = this.session.getServiceLocator().obtainService(CommonServices.MESSAGING_FILTER_SENDER);
        ConversationId newConversation = conversations.newConversation(new FilterResponseHandler(filteredRequestCallback, cls2));
        return obtainService.sendCommand(new MessagingClientFilterSendRequest(newConversation, str, str2, byClass, IBytes.toIBytes(byClass.toBytes(t)))).whenComplete((countOrParserErrors, th) -> {
            conversations.respondIfPresent(newConversation, th == null ? countOrParserErrors : null);
        });
    }
}
