package org.ikasan.rest.client;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.ikasan.spec.module.client.LogStreamingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.env.Environment;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/ikasan/rest/client/LogStreamingServiceRestImpl.class */
public class LogStreamingServiceRestImpl extends ModuleRestService implements LogStreamingService<ServerSentEvent<String>> {
    private static final Logger LOG = LoggerFactory.getLogger(LogStreamingServiceRestImpl.class);
    private static final String FULL_FILE_PATH = "fullFilePath";
    private Flux<ServerSentEvent<String>> eventStream;
    private AtomicBoolean runFlag;

    public LogStreamingServiceRestImpl(Environment environment, HttpComponentsClientHttpRequestFactory httpComponentsClientHttpRequestFactory) {
        super(environment, httpComponentsClientHttpRequestFactory);
        this.runFlag = new AtomicBoolean(true);
    }

    public void streamLogFile(String str, String str2, String str3, Consumer<ServerSentEvent<String>> consumer, Consumer<Throwable> consumer2, Runnable runnable) throws InterruptedException {
        try {
            WebClient webClient = getWebClient(str);
            this.eventStream = webClient.get().uri(str2 + "?fullFilePath=" + str3, new Object[0]).retrieve().bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() { // from class: org.ikasan.rest.client.LogStreamingServiceRestImpl.1
            });
            this.eventStream.subscribe(consumer, consumer2, runnable);
            while (this.runFlag.get()) {
                TimeUnit.SECONDS.sleep(1L);
            }
        } catch (Exception e) {
            this.runFlag.set(false);
            LOG.error(e.getMessage());
            throw e;
        }
    }

    private WebClient getWebClient(String str) {
        List list = super.createHttpHeaders().get("Authorization");
        return list != null ? WebClient.builder().baseUrl(str).defaultHeader("Authorization", (String[]) list.toArray(new String[0])).build() : WebClient.builder().baseUrl(str).build();
    }
}
