package io.reactivesocket.lease;

import io.reactivesocket.Frame;
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.Cancellable;
import io.reactivesocket.reactivestreams.extensions.internal.CancellableImpl;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/reactivesocket/lease/FairLeaseDistributor.class */
/**
 * A {@link DefaultLeaseEnforcingSocket.LeaseDistributor} that splits the capacity reported by an
 * {@link IntSupplier} as evenly as possible between all currently registered lease consumers.
 *
 * <p>Distribution happens every time the supplied tick publisher emits and, when
 * {@code redistributeOnConnect} is enabled, also whenever a socket registers after the ticks have
 * already been started. The tick publisher is subscribed to lazily, on the first call to
 * {@link #registerSocket(Consumer)}.
 *
 * <p>Recipients are kept in a {@link LinkedBlockingQueue}; registration, cancellation and
 * distribution may therefore overlap. A distribution round works on a size snapshot, so the shares
 * handed out in a round that races with a registration or cancellation are only approximately fair.
 */
public final class FairLeaseDistributor implements DefaultLeaseEnforcingSocket.LeaseDistributor {
    /** Consumers that receive a lease on every distribution round. */
    private final LinkedBlockingQueue<Consumer<Lease>> activeRecipients;
    /**
     * Subscription to the tick publisher; {@code null} until the ticks have been subscribed to.
     * Volatile because it is assigned from the subscribe callback and read from {@link #shutdown()}.
     */
    private volatile Subscription ticksSubscription;
    /** Set once the tick publisher has been subscribed to; guarded by {@code this} on write. */
    private volatile boolean startTicks;
    private final IntSupplier capacitySupplier;
    /** Time-to-live put on every lease; already stretched by the constructor. */
    private final int leaseTTLMillis;
    private final Publisher<Long> leaseDistributionTicks;
    private final boolean redistributeOnConnect;

    /**
     * Creates a distributor.
     *
     * @param capacitySupplier supplies the total number of permits to split on each distribution
     * @param leaseTTLMillis base lease time-to-live in milliseconds; the leases actually issued
     *     carry this value multiplied by 1.1 (presumably so that a lease does not expire before
     *     the next tick replaces it - confirm against the tick period used by callers)
     * @param leaseDistributionTicks publisher whose every emission triggers a distribution round
     * @param redistributeOnConnect whether a socket registering after the ticks were started
     *     triggers an immediate distribution round
     */
    public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTLMillis,
                                Publisher<Long> leaseDistributionTicks, boolean redistributeOnConnect) {
        this.capacitySupplier = capacitySupplier;
        this.leaseTTLMillis = (int) (leaseTTLMillis * 1.1d);
        this.leaseDistributionTicks = leaseDistributionTicks;
        this.redistributeOnConnect = redistributeOnConnect;
        this.activeRecipients = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a distributor that redistributes leases whenever a new socket connects.
     *
     * @param capacitySupplier supplies the total number of permits to split on each distribution
     * @param leaseTTLMillis base lease time-to-live in milliseconds (stretched by a factor of 1.1)
     * @param leaseDistributionTicks publisher whose every emission triggers a distribution round
     */
    public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTLMillis,
                                Publisher<Long> leaseDistributionTicks) {
        this(capacitySupplier, leaseTTLMillis, leaseDistributionTicks, true);
    }

    /**
     * Cancels the subscription to the tick publisher, if one has been established.
     *
     * <p>Calling this before any socket has been registered is a no-op.
     */
    @Override // io.reactivesocket.lease.DefaultLeaseEnforcingSocket.LeaseDistributor
    public void shutdown() {
        // Read the field once: it stays null until the first registration subscribes to the ticks.
        Subscription subscription = this.ticksSubscription;
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /**
     * Adds a lease consumer to the set of recipients.
     *
     * <p>The first registration subscribes to the tick publisher and does not distribute by
     * itself. Later registrations trigger an immediate distribution round when
     * {@code redistributeOnConnect} is enabled.
     *
     * @param consumer receiver of the leases issued for the registering socket
     * @return a handle whose cancellation removes {@code consumer} from the recipients
     */
    @Override // io.reactivesocket.lease.DefaultLeaseEnforcingSocket.LeaseDistributor
    public Cancellable registerSocket(final Consumer<Lease> consumer) {
        this.activeRecipients.add(consumer);

        boolean alreadyStarted;
        synchronized (this) {
            alreadyStarted = this.startTicks;
            if (!alreadyStarted) {
                startTicks();
                this.startTicks = true;
            }
        }

        if (alreadyStarted && this.redistributeOnConnect) {
            distribute(this.capacitySupplier.getAsInt());
        }

        return new CancellableImpl() { // from class: io.reactivesocket.lease.FairLeaseDistributor.1
            protected void onCancel() {
                FairLeaseDistributor.this.activeRecipients.remove(consumer);
            }
        };
    }

    /**
     * Hands out {@code capacity} permits to the active recipients: everybody gets
     * {@code capacity / n} and the first {@code capacity % n} recipients get one extra permit.
     *
     * @param capacity total number of permits to split
     */
    private void distribute(int capacity) {
        // Take a single snapshot of the size. Checking isEmpty() and then calling size() would
        // allow a concurrent cancellation to empty the queue in between and cause a division by zero.
        int recipientCount = this.activeRecipients.size();
        if (recipientCount == 0) {
            return;
        }

        int baseShare = capacity / recipientCount;
        int extraPermits = capacity % recipientCount;

        // Lease shared by every recipient that does not receive an extra permit.
        LeaseImpl baseLease = new LeaseImpl(baseShare, this.leaseTTLMillis, Frame.NULL_BYTEBUFFER);
        for (Consumer<Lease> recipient : this.activeRecipients) {
            if (extraPermits > 0) {
                extraPermits--;
                recipient.accept(new LeaseImpl(baseShare + 1, this.leaseTTLMillis, Frame.NULL_BYTEBUFFER));
            } else {
                recipient.accept(baseLease);
            }
        }
    }

    /**
     * Subscribes to the tick publisher, remembering the subscription for {@link #shutdown()} and
     * running a distribution round on every emission.
     */
    private void startTicks() {
        Px.from(this.leaseDistributionTicks).doOnSubscribe(subscription -> {
            this.ticksSubscription = subscription;
        }).doOnNext(tick -> {
            distribute(this.capacitySupplier.getAsInt());
        }).ignore().subscribe();
    }
}
