package io.continual.services.processor.engine.library.sources;

import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.SimpleMessageAndRouting;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.util.time.Clock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/engine/library/sources/BasicSource.class */
/**
 * Skeleton {@link Source} implementation that handles the bookkeeping shared by
 * concrete sources: a default pipeline name, a FIFO list of requeued messages,
 * an end-of-stream flag, and a timed polling loop with backoff.
 *
 * <p>Subclasses supply messages through {@link #internalGetNextMessage} and call
 * {@link #noteEndOfStream()} once no further messages will be produced.
 *
 * <p>State held by this class (the requeue list and the end-of-stream flag) is
 * guarded by the instance monitor.
 */
public abstract class BasicSource implements Source {
    /** Default sleep intervals, in milliseconds, used between unsuccessful polls (Fibonacci steps). */
    private static final long[] skStdBackoffTimes = {1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987};
    private static final Logger log = LoggerFactory.getLogger(BasicSource.class);

    /** Pipeline name attached to messages built by {@link #makeDefRoutingMessage}. */
    private final String fDefPipeline;
    /** Messages handed back via {@link #requeue}; served oldest-first before polling the subclass. */
    private final ArrayList<MessageAndRouting> fRequeued;
    /** Set once by {@link #noteEndOfStream()}; never cleared. */
    private boolean fEof;

    /**
     * Creates a source that routes to the given pipeline.
     *
     * @param str the default pipeline name, or {@code null} to use {@link Program#kDefaultPipeline}
     */
    public BasicSource(String str) {
        this.fEof = false;
        this.fDefPipeline = str == null ? Program.kDefaultPipeline : str;
        this.fRequeued = new ArrayList<>();
    }

    /**
     * Creates a source from a JSON configuration, reading the default pipeline
     * name from its {@code "pipeline"} key.
     *
     * <p>NOTE(review): {@code getString} raises when the key is absent, so the
     * {@code null} fallback of {@link #BasicSource(String)} is never reached from
     * here - confirm whether a missing key should be tolerated.
     *
     * @param jSONObject the configuration object; must contain a {@code "pipeline"} string
     */
    public BasicSource(JSONObject jSONObject) {
        this(jSONObject.getString("pipeline"));
    }

    /** Creates a source that routes to {@link Program#kDefaultPipeline}. */
    public BasicSource() {
        this((String) null);
    }

    /**
     * Reports whether this source is exhausted.
     *
     * @return {@code true} only when end-of-stream has been noted and no requeued
     *         messages remain to be delivered
     */
    @Override
    public synchronized boolean isEof() {
        return this.fRequeued.isEmpty() && this.fEof;
    }

    /**
     * Marks the stream as ended. Requeued messages that are still pending remain
     * available through {@link #getNextMessage}.
     *
     * @throws IOException declared for subclasses; this implementation does not throw it
     */
    @Override
    public synchronized void close() throws IOException {
        noteEndOfStream();
    }

    /**
     * Returns a message to this source so that a later {@link #getNextMessage}
     * call delivers it again. Requeued messages are delivered in the order they
     * were requeued and ahead of anything from {@link #internalGetNextMessage}.
     *
     * @param messageAndRouting the message to redeliver
     */
    @Override
    public synchronized void requeue(MessageAndRouting messageAndRouting) {
        this.fRequeued.add(messageAndRouting);
    }

    /**
     * Fetches the next message, waiting up to the given time for one to appear.
     *
     * <p>Each attempt first serves a requeued message if there is one, then
     * returns {@code null} if {@link #isEof()} holds, and otherwise polls
     * {@link #internalGetNextMessage}. At least one attempt is always made, even
     * with a zero or negative wait. Between unsuccessful attempts the calling
     * thread sleeps for the next interval from {@link #getBackoffTimes()}, capped
     * at the time remaining; the interval index wraps to the start after the last
     * entry.
     *
     * <p>The instance monitor is held only while checking state and polling, not
     * while sleeping, so {@link #requeue}, {@link #close} and {@link #isEof} on
     * other threads are not stalled for the duration of a backoff.
     *
     * @param streamProcessingContext context passed through to {@link #internalGetNextMessage}
     * @param j the maximum time to wait, expressed in {@code timeUnit}
     * @param timeUnit the unit of {@code j}
     * @return the next message, or {@code null} on end of stream or when the wait expires
     * @throws IOException if the subclass poll fails
     * @throws InterruptedException if the thread is interrupted while polling or backing off
     */
    @Override
    public final MessageAndRouting getNextMessage(StreamProcessingContext streamProcessingContext, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        final long[] backoffTimes = usableBackoffTimes();
        final long deadlineMs = saturatingAdd(Clock.now(), TimeUnit.MILLISECONDS.convert(j, timeUnit));
        int backoffIndex = 0;

        do {
            synchronized (this) {
                if (!this.fRequeued.isEmpty()) {
                    return this.fRequeued.remove(0);
                }
                if (isEof()) {
                    return null;
                }
                final MessageAndRouting polled = internalGetNextMessage(streamProcessingContext);
                if (polled != null) {
                    return polled;
                }
            }

            // Back off outside the monitor: Thread.sleep does not release locks.
            final long nowMs = Clock.now();
            if (nowMs < deadlineMs) {
                // Compare first, subtract second, so the difference is known to be positive.
                final long remainingMs = deadlineMs - nowMs;
                final long stepMs = backoffTimes[backoffIndex];
                backoffIndex = (backoffIndex + 1) % backoffTimes.length;

                // Clamp so a negative table entry cannot make Thread.sleep throw.
                final long sleepMs = Math.max(0L, Math.min(remainingMs, stepMs));
                log.debug("... backing off {} ms", sleepMs);
                Thread.sleep(sleepMs);
            }
        } while (Clock.now() < deadlineMs);

        return null;
    }

    /**
     * Acknowledges that a message has been fully processed. This base
     * implementation does nothing.
     *
     * @param streamProcessingContext the context the message was processed in
     * @param messageAndRouting the completed message
     */
    @Override
    public synchronized void markComplete(StreamProcessingContext streamProcessingContext, MessageAndRouting messageAndRouting) {
    }

    /**
     * Polls the underlying source once for a message. Called by
     * {@link #getNextMessage} while holding this instance's monitor.
     *
     * @param streamProcessingContext the current processing context
     * @return the next message, or {@code null} if none is available right now
     * @throws IOException if reading from the underlying source fails
     * @throws InterruptedException if the thread is interrupted while polling
     */
    protected abstract MessageAndRouting internalGetNextMessage(StreamProcessingContext streamProcessingContext) throws IOException, InterruptedException;

    /**
     * Supplies the sleep intervals, in milliseconds, that {@link #getNextMessage}
     * cycles through between unsuccessful polls. Subclasses may override this; a
     * {@code null} or empty result makes {@link #getNextMessage} use the standard
     * table instead.
     *
     * @return the backoff intervals; this implementation returns the shared standard table
     */
    protected long[] getBackoffTimes() {
        return skStdBackoffTimes;
    }

    /**
     * Wraps a message with this source's default pipeline name.
     *
     * @param message the message to route
     * @return the message paired with {@link #getDefaultPipelineName()}
     */
    public MessageAndRouting makeDefRoutingMessage(Message message) {
        return new SimpleMessageAndRouting(message, getDefaultPipelineName());
    }

    /** Records that the underlying stream has ended; see {@link #isEof()}. */
    public synchronized void noteEndOfStream() {
        this.fEof = true;
    }

    /**
     * @return the pipeline name given at construction, or
     *         {@link Program#kDefaultPipeline} when none was given
     */
    protected String getDefaultPipelineName() {
        return this.fDefPipeline;
    }

    /** Returns the subclass backoff table, or the standard one when it is null or empty. */
    private long[] usableBackoffTimes() {
        final long[] configured = getBackoffTimes();
        return (configured == null || configured.length == 0) ? skStdBackoffTimes : configured;
    }

    /** Adds two longs, pinning the result to the long range limits instead of wrapping on overflow. */
    private static long saturatingAdd(long a, long b) {
        final long sum = a + b;
        // Overflow occurred iff both operands have a sign different from the sum's.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return b > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
        return sum;
    }
}
