package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import java.util.EnumSet;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/* loaded from: input_file:io/datakernel/stream/StreamConsumers.class */
public final class StreamConsumers {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX INFO: Add missing generic type declarations: [T] */
    /* renamed from: io.datakernel.stream.StreamConsumers$1, reason: invalid class name */
    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$1.class */
    public static class AnonymousClass1<T> extends ForwardingStreamConsumer<T> {
        final SettableStage endOfStream;
        final /* synthetic */ StreamConsumer val$consumer;
        final /* synthetic */ Decorator val$decorator;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass1(StreamConsumer streamConsumer, StreamConsumer streamConsumer2, Decorator decorator) {
            super(streamConsumer);
            this.val$consumer = streamConsumer2;
            this.val$decorator = decorator;
            this.endOfStream = SettableStage.create();
            Stage<Void> endOfStream = this.val$consumer.getEndOfStream();
            SettableStage settableStage = this.endOfStream;
            settableStage.getClass();
            endOfStream.whenComplete((v1, v2) -> {
                r1.trySet(v1, v2);
            });
        }

        @Override // io.datakernel.stream.ForwardingStreamConsumer, io.datakernel.stream.StreamConsumer
        public void setProducer(StreamProducer<T> streamProducer) {
            super.setProducer(new ForwardingStreamProducer<T>(streamProducer) { // from class: io.datakernel.stream.StreamConsumers.1.1
                @Override // io.datakernel.stream.ForwardingStreamProducer, io.datakernel.stream.StreamProducer
                public void produce(StreamDataReceiver<T> streamDataReceiver) {
                    final StreamDataReceiver<T>[] streamDataReceiverArr = {AnonymousClass1.this.val$decorator.decorate(new Decorator.Context() { // from class: io.datakernel.stream.StreamConsumers.1.1.1
                        final Eventloop eventloop = Eventloop.getCurrentEventloop();

                        @Override // io.datakernel.stream.StreamConsumers.Decorator.Context
                        public void suspend() {
                            C00001.this.producer.suspend();
                        }

                        @Override // io.datakernel.stream.StreamConsumers.Decorator.Context
                        public void resume() {
                            Eventloop eventloop = this.eventloop;
                            StreamDataReceiver[] streamDataReceiverArr2 = streamDataReceiverArr;
                            eventloop.post(() -> {
                                C00001.this.producer.produce(streamDataReceiverArr2[0]);
                            });
                        }

                        @Override // io.datakernel.stream.StreamConsumers.Decorator.Context
                        public void closeWithError(Throwable th) {
                            AnonymousClass1.this.endOfStream.trySetException(th);
                        }
                    }, streamDataReceiver)};
                    super.produce(streamDataReceiverArr[0]);
                }
            });
        }

        @Override // io.datakernel.stream.ForwardingStreamConsumer, io.datakernel.stream.StreamConsumer
        public Stage<Void> getEndOfStream() {
            return this.endOfStream;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$ClosingWithErrorImpl.class */
    public static final class ClosingWithErrorImpl<T> implements StreamConsumer<T> {
        private final Throwable exception;
        private final SettableStage<Void> endOfStream = SettableStage.create();

        /* JADX INFO: Access modifiers changed from: package-private */
        public ClosingWithErrorImpl(Throwable th) {
            this.exception = th;
        }

        @Override // io.datakernel.stream.StreamConsumer
        public void setProducer(StreamProducer<T> streamProducer) {
            Eventloop.getCurrentEventloop().post(() -> {
                this.endOfStream.setException(this.exception);
            });
        }

        @Override // io.datakernel.stream.StreamConsumer
        public Stage<Void> getEndOfStream() {
            return this.endOfStream;
        }

        @Override // io.datakernel.stream.StreamConsumer
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$Decorator.class */
    public interface Decorator<T> {

        /* loaded from: input_file:io/datakernel/stream/StreamConsumers$Decorator$Context.class */
        public interface Context {
            void suspend();

            void resume();

            void closeWithError(Throwable th);
        }

        StreamDataReceiver<T> decorate(Context context, StreamDataReceiver<T> streamDataReceiver);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$IdleImpl.class */
    public static final class IdleImpl<T> implements StreamConsumer<T> {
        private final SettableStage<Void> endOfStream = SettableStage.create();

        @Override // io.datakernel.stream.StreamConsumer
        public void setProducer(StreamProducer<T> streamProducer) {
            Stage<Void> endOfStream = streamProducer.getEndOfStream();
            SettableStage<Void> settableStage = this.endOfStream;
            settableStage.getClass();
            endOfStream.whenComplete((v1, v2) -> {
                r1.set(v1, v2);
            });
            streamProducer.produce(obj -> {
            });
        }

        @Override // io.datakernel.stream.StreamConsumer
        public Stage<Void> getEndOfStream() {
            return this.endOfStream;
        }

        @Override // io.datakernel.stream.StreamConsumer
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$OfConsumerImpl.class */
    static final class OfConsumerImpl<T> extends AbstractStreamConsumer<T> {
        private final Consumer<T> consumer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public OfConsumerImpl(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            StreamProducer<T> producer = getProducer();
            Consumer<T> consumer = this.consumer;
            consumer.getClass();
            producer.produce(consumer::accept);
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onEndOfStream() {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    private StreamConsumers() {
    }

    public static <T> StreamConsumerModifier<T, T> decorator(Decorator<T> decorator) {
        return streamConsumer -> {
            return new AnonymousClass1(streamConsumer, streamConsumer, decorator);
        };
    }

    public static <T> StreamConsumerModifier<T, T> errorDecorator(Function<T, Throwable> function) {
        return decorator((context, streamDataReceiver) -> {
            return obj -> {
                Throwable th = (Throwable) function.apply(obj);
                if (th == null) {
                    streamDataReceiver.onData(obj);
                } else {
                    context.closeWithError(th);
                }
            };
        });
    }

    public static <T> StreamConsumerModifier<T, T> suspendDecorator(Predicate<T> predicate, Consumer<Decorator.Context> consumer) {
        return decorator((context, streamDataReceiver) -> {
            return obj -> {
                streamDataReceiver.onData(obj);
                if (predicate.test(obj)) {
                    context.suspend();
                    consumer.accept(context);
                }
            };
        });
    }

    public static <T> StreamConsumerModifier<T, T> suspendDecorator(Predicate<T> predicate) {
        return suspendDecorator(predicate, (v0) -> {
            v0.resume();
        });
    }

    public static <T> StreamConsumerModifier<T, T> oneByOne() {
        return suspendDecorator(obj -> {
            return true;
        });
    }

    public static <T> StreamConsumerModifier<T, T> randomlySuspending(Random random, double d) {
        return suspendDecorator(obj -> {
            return random.nextDouble() < d;
        });
    }

    public static <T> StreamConsumerModifier<T, T> randomlySuspending(double d) {
        return randomlySuspending(new Random(), d);
    }

    public static <T> StreamConsumerModifier<T, T> randomlySuspending() {
        return randomlySuspending(0.5d);
    }
}
