package io.mantisrx.runtime.core.sources;

import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Subscription;

/* loaded from: input_file:io/mantisrx/runtime/core/sources/ObservableSourceImpl.class */
/**
 * Adapts an Rx-based {@link Source} to the pull-style {@link SourceFunction} contract.
 *
 * <p>{@link #init()} subscribes to the flattened output of the wrapped source and stores every
 * emitted element in a single-slot holder; {@link #next()} returns whatever that holder currently
 * contains. Only the most recently emitted element is retained: earlier elements are overwritten,
 * and the same element is returned by successive {@code next()} calls until a newer one arrives.
 *
 * @param <R> type of element produced by the wrapped source
 */
public class ObservableSourceImpl<R> implements SourceFunction<R> {
    private final Source<R> source;
    /** Holds the latest element delivered by the subscription; {@code null} until the first one. */
    private final AtomicReference<R> elemContainer = new AtomicReference<>();
    /** Active subscription created by {@link #init()}, or {@code null} when not subscribed. */
    private Subscription subscription;

    /**
     * Creates an adapter around the given source. Nothing is subscribed until {@link #init()}.
     *
     * @param source the source whose emissions are exposed through {@link #next()}
     */
    public ObservableSourceImpl(Source<R> source) {
        this.source = source;
    }

    /**
     * Initializes the wrapped source with a fresh {@link Context} and an {@code Index(0, 0)}, then
     * subscribes to its flattened output so that each element overwrites the holder read by
     * {@link #next()}.
     *
     * <p>If a subscription from an earlier {@code init()} is still held, it is unsubscribed first
     * so that it is not leaked.
     */
    @Override // io.mantisrx.runtime.core.functions.MantisFunction
    public void init() {
        unsubscribeCurrent();
        Context context = new Context();
        Index index = new Index(0, 0);
        this.source.init(context, index);
        // The cast restores the element type in case call(...) is declared with an erased return
        // type; it is a no-op if the declared type already matches.
        @SuppressWarnings("unchecked")
        Observable<Observable<R>> nested =
                (Observable<Observable<R>>) this.source.call(context, index);
        // NOTE(review): no onError handler is registered, matching the original behavior;
        // confirm how upstream errors are expected to surface.
        this.subscription = nested.flatMap(inner -> inner).subscribe(this.elemContainer::set);
    }

    /**
     * Unsubscribes from the wrapped source's output, if subscribed. Calling this more than once,
     * or before {@link #init()}, has no effect.
     *
     * @throws IOException never thrown by this implementation; kept for interface compatibility
     */
    @Override // io.mantisrx.runtime.core.functions.MantisFunction, java.lang.AutoCloseable
    public void close() throws IOException {
        unsubscribeCurrent();
    }

    /**
     * Returns the element most recently delivered by the subscription.
     *
     * @return the latest element, or {@code null} if none has been delivered yet
     */
    @Override // io.mantisrx.runtime.core.sources.SourceFunction
    public R next() {
        return this.elemContainer.get();
    }

    /**
     * Returns the wrapped source.
     *
     * @return the source passed to the constructor
     */
    public Source<R> getSource() {
        return this.source;
    }

    /** Unsubscribes and forgets the current subscription, if there is one. */
    private void unsubscribeCurrent() {
        Subscription current = this.subscription;
        if (current != null) {
            this.subscription = null;
            current.unsubscribe();
        }
    }
}
