package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.WritableEndpoint;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/SafeWriter.class */
public class SafeWriter {
    static final Logger logger = LoggerFactory.getLogger(SafeWriter.class);
    private static final AtomicLong checkIsOpenCounter = new AtomicLong();
    private static final int CHECK_IS_OPEN_INTERVAL = 1000;

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> boolean safeWrite(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, List<RemoteRxEvent> list, MutableReference<Subscription> mutableReference, Action0 action0, Action1<Throwable> action1, SlottingStrategy<T> slottingStrategy, WritableEndpoint<T> writableEndpoint) {
        boolean checkWriteableAndWrite;
        if (checkIsOpenCounter.getAndIncrement() % 1000 != 0) {
            checkWriteableAndWrite = checkWriteableAndWrite(observableConnection, list, action0, action1);
        } else if (observableConnection.isCloseIssued() || !observableConnection.getChannel().isActive()) {
            checkWriteableAndWrite = false;
            logger.warn("Detected closed or inactive client connection, force unsubscribe.");
            mutableReference.getValue().unsubscribe();
            if (slottingStrategy != null) {
                logger.info("Removing slot for endpoint: " + writableEndpoint);
                if (!slottingStrategy.removeConnection(writableEndpoint)) {
                    logger.error("Failed to remove endpoint from slot,  endpoint: " + writableEndpoint);
                }
            }
        } else {
            checkWriteableAndWrite = checkWriteableAndWrite(observableConnection, list, action0, action1);
        }
        return checkWriteableAndWrite;
    }

    private boolean checkWriteableAndWrite(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, List<RemoteRxEvent> list, Action0 action0, Action1<Throwable> action1) {
        boolean z = true;
        if (observableConnection.getChannel().isWritable()) {
            observableConnection.writeAndFlush(list).doOnError(action1).doOnCompleted(action0).subscribe();
        } else {
            z = false;
        }
        return z;
    }
}
