package io.bigdime.core.sink;

import io.bigdime.alert.Logger;
import io.bigdime.alert.LoggerFactory;
import io.bigdime.core.ActionEvent;
import io.bigdime.core.AdaptorConfigurationException;
import io.bigdime.core.Handler;
import io.bigdime.core.HandlerException;
import io.bigdime.core.Sink;
import io.bigdime.core.commons.AdaptorLogger;
import io.bigdime.core.config.AdaptorConfig;
import io.bigdime.core.constants.AdaptorConstants;
import io.bigdime.core.handler.HandlerManager;
import io.bigdime.core.handler.IllegalHandlerStateException;
import java.util.LinkedHashSet;
import java.util.Observable;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.apache.flume.lifecycle.LifecycleState;

/* loaded from: input_file:io/bigdime/core/sink/DataSink.class */
public class DataSink extends Observable implements Sink {
    private static final AdaptorLogger logger = new AdaptorLogger(LoggerFactory.getLogger(DataSink.class));
    private final HandlerManager handlerManager;
    private LinkedHashSet<Handler> handlers;
    private String name;
    private String description;
    private boolean interrupted;
    private FutureTask<Object> futureTask;
    private ExecutorService executorService;
    private long sleepForMillis = AdaptorConstants.SLEEP_WHILE_WAITING_FOR_DATA;
    private LifecycleState lifecycleState = LifecycleState.IDLE;
    private final String id = UUID.randomUUID().toString();
    private int errorCount = 0;
    private int errorThreshold = 3;

    public DataSink(LinkedHashSet<Handler> linkedHashSet, String str) throws AdaptorConfigurationException {
        if (linkedHashSet == null || linkedHashSet.isEmpty()) {
            logger.alert(Logger.ALERT_TYPE.ADAPTOR_FAILED_TO_START, Logger.ALERT_CAUSE.INVALID_ADAPTOR_CONFIGURATION, Logger.ALERT_SEVERITY.BLOCKER, "handlers not configured");
            throw new AdaptorConfigurationException("null or empty collection not allowed for handlers");
        }
        this.handlers = linkedHashSet;
        this.handlerManager = new HandlerManager(linkedHashSet, str);
        this.name = str;
    }

    public String toString() {
        return "DataSink [handlers=" + this.handlers + ", name=" + this.name + ", description=" + this.description + "]";
    }

    @Override // io.bigdime.core.HasHandlers
    public LinkedHashSet<Handler> getHandlers() {
        return this.handlers;
    }

    @Override // io.bigdime.core.HasHandlers
    public void setHandlers(LinkedHashSet<Handler> linkedHashSet) {
        this.handlers = linkedHashSet;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    public void setDescription(String str) {
        this.description = str;
    }

    public LifecycleState getLifecycleState() {
        return this.lifecycleState;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [io.bigdime.core.sink.DataSink$1] */
    private void startHealthcheckThread() {
        new Thread() { // from class: io.bigdime.core.sink.DataSink.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Thread.currentThread().setName("healthcheck for " + DataSink.this.getName());
                    DataSink.logger.debug("heathcheck thread for sink", "sink_name=\"{}\" handlerManager=\"{}\"", DataSink.this.getName(), DataSink.this.handlerManager);
                    DataSink.this.futureTask.get();
                    DataSink.logger.info("heathcheck thread for sink, future task completed", "sink_name=\"{}\" handlerManager=\"{}\" futureTask.isDone=\"{}\"", DataSink.this.getName(), DataSink.this.handlerManager, Boolean.valueOf(DataSink.this.futureTask.isDone()));
                    DataSink.this.lifecycleState = LifecycleState.STOP;
                } catch (CancellationException e) {
                    DataSink.logger.info("heathcheck thread for sink, future task cancelled", "sink_name=\"{}\" handlerManager=\"{}\" futureTask.isDone=\"{}\"", DataSink.this.getName(), DataSink.this.handlerManager, Boolean.valueOf(DataSink.this.futureTask.isDone()));
                    DataSink.this.lifecycleState = LifecycleState.STOP;
                } catch (Exception e2) {
                    DataSink.logger.debug("notifying observer", "observer_count=\"{}\"", Integer.valueOf(DataSink.this.countObservers()));
                    DataSink.this.lifecycleState = LifecycleState.ERROR;
                    DataSink.this.setChanged();
                    DataSink.this.notifyObservers(e2);
                    DataSink.logger.alert(Logger.ALERT_TYPE.INGESTION_FAILED, Logger.ALERT_CAUSE.APPLICATION_INTERNAL_ERROR, Logger.ALERT_SEVERITY.BLOCKER, "\"task completed with an exception\"", e2);
                }
            }
        }.start();
        logger.info("started heathcheck thread for sink", "sink_name=\"{}\" handlerManager=\"{}\"", getName(), this.handlerManager);
    }

    public void start() {
        logger.debug("starting sink", "sink_name=\"{}\" handlerManager=\"{}\"", getName(), this.handlerManager);
        this.lifecycleState = LifecycleState.START;
        this.executorService = Executors.newSingleThreadExecutor();
        this.futureTask = new FutureTask<>(new Callable<Object>() { // from class: io.bigdime.core.sink.DataSink.2
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                Thread.currentThread().setName(DataSink.this.getName());
                DataSink.logger.debug("starting sink thread", "sink_name=\"{}\" adaptor_type=\"{}\"", DataSink.this.getName(), AdaptorConfig.getInstance().getType());
                DataSink.this.runForever();
                return null;
            }
        });
        startHealthcheckThread();
        this.executorService.execute(this.futureTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void runForever() throws HandlerException {
        while (!this.interrupted) {
            try {
                logger.debug("starting sink thread", "sink_name=\"{}\"", getName());
                postHandlerChain(executeHandlerChain());
            } catch (HandlerException e) {
                this.errorCount++;
                logger.alert(Logger.ALERT_TYPE.INGESTION_FAILED, Logger.ALERT_CAUSE.APPLICATION_INTERNAL_ERROR, Logger.ALERT_SEVERITY.BLOCKER, "handler chain threw an exception, error_count=\"{}\" error_threshold=\"{}\"", Integer.valueOf(this.errorCount), Integer.valueOf(this.errorThreshold), e);
                if (this.errorCount > this.errorThreshold) {
                    throw new HandlerException("unable to continue the adaptor, error count exceeded threshold");
                }
            }
        }
    }

    private ActionEvent.Status executeHandlerChain() throws HandlerException {
        return this.handlerManager.execute();
    }

    private void postHandlerChain(ActionEvent.Status status) {
        try {
            if (status == ActionEvent.Status.BACKOFF) {
                Thread.sleep(this.sleepForMillis);
            }
        } catch (InterruptedException e) {
            logger.warn("data sink running", "thread interrupted while sleeping");
        }
    }

    public void stop() {
        logger.warn("data sink stopping", "setting interrupted to true");
        this.interrupted = true;
        try {
            this.handlerManager.shutdown();
        } catch (IllegalHandlerStateException e) {
        }
        this.futureTask.cancel(true);
        this.executorService.shutdown();
        this.lifecycleState = LifecycleState.STOP;
    }

    public String getId() {
        return this.id;
    }

    public int hashCode() {
        return (31 * 1) + (this.name == null ? 0 : this.name.hashCode());
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        DataSink dataSink = (DataSink) obj;
        return this.name == null ? dataSink.name == null : this.name.equals(dataSink.name);
    }
}
