package io.vlingo.http.resource;

import io.vlingo.actors.Actor;
import io.vlingo.actors.CompletesEventually;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduled;
import io.vlingo.http.Request;
import io.vlingo.http.RequestHeader;
import io.vlingo.http.Response;
import io.vlingo.http.ResponseHeader;
import io.vlingo.http.ResponseParser;
import io.vlingo.http.resource.Client;
import io.vlingo.http.resource.ClientConsumer;
import io.vlingo.wire.channel.ResponseChannelConsumer;
import io.vlingo.wire.message.ByteBufferAllocator;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.Converters;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/* loaded from: input_file:io/vlingo/http/resource/ClientCorrelatingRequesterConsumerActor.class */
public class ClientCorrelatingRequesterConsumerActor extends Actor implements ClientConsumer {
    private final Map<String, CompletesEventually> completables = new HashMap();
    private final ClientConsumer.State state;

    public ClientCorrelatingRequesterConsumerActor(Client.Configuration configuration) throws Exception {
        this.state = new ClientConsumer.State(configuration, ClientConsumerCommons.clientChannel(configuration, (ResponseChannelConsumer) selfAs(ResponseChannelConsumer.class), logger()), null, stage().scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, 1L, configuration.probeInterval), ByteBufferAllocator.allocate(configuration.writeBufferSize));
    }

    public void consume(ConsumerByteBuffer consumerByteBuffer) {
        if (this.state.parser == null) {
            this.state.parser = ResponseParser.parserFor(consumerByteBuffer.asByteBuffer());
        } else {
            this.state.parser.parseNext(consumerByteBuffer.asByteBuffer());
        }
        consumerByteBuffer.release();
        while (this.state.parser.hasFullResponse()) {
            Response fullResponse = this.state.parser.fullResponse();
            ResponseHeader headerOf = fullResponse.headers.headerOf("X-Correlation-ID");
            if (headerOf == null) {
                logger().warn("Client Consumer: Cannot complete response because no correlation id.");
                this.state.configuration.consumerOfUnknownResponses.consume(fullResponse);
            } else {
                CompletesEventually remove = this.state.configuration.keepAlive ? this.completables.get(headerOf.value) : this.completables.remove(headerOf.value);
                if (remove == null) {
                    this.state.configuration.stage.world().defaultLogger().warn("Client Consumer: Cannot complete response because mismatched correlation id: " + headerOf.value);
                    this.state.configuration.consumerOfUnknownResponses.consume(fullResponse);
                } else {
                    remove.with(fullResponse);
                }
            }
        }
    }

    @Override // io.vlingo.http.resource.ClientConsumer
    public void intervalSignal(Scheduled<Object> scheduled, Object obj) {
        this.state.channel.probeChannel();
    }

    @Override // io.vlingo.http.resource.ClientConsumer
    public Completes<Response> requestWith(Request request, Completes<Response> completes) {
        Request request2;
        RequestHeader headerOf = request.headers.headerOf("X-Correlation-ID");
        if (headerOf == null) {
            headerOf = RequestHeader.of("X-Correlation-ID", UUID.randomUUID().toString());
            request2 = request.and(headerOf);
        } else {
            request2 = request;
        }
        this.completables.put(headerOf.value, stage().world().completesFor(completes));
        this.state.buffer.clear();
        this.state.buffer.put(Converters.textToBytes(request2.toString()));
        this.state.buffer.flip();
        this.state.channel.requestWith(this.state.buffer);
        return completes;
    }

    public void stop() {
        this.state.channel.close();
        this.state.probe.cancel();
    }
}
