package io.sermant.implement.service.xds.handler;

import com.alibaba.nacos.api.common.Constants;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.utils.FileUtils;
import io.sermant.core.utils.NetworkUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.implement.service.xds.client.XdsClient;
import io.sermant.implement.service.xds.constants.XdsEnvConstant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:io/sermant/implement/service/xds/handler/XdsHandler.class */
public abstract class XdsHandler<T> implements XdsServiceAction {
    private static final Logger LOGGER = LoggerFactory.getLogger();
    private static final int DELAY_TIME = 3000;
    private static final int THREAD_CORE_SIZE = 1;
    private static final int THREAD_MAX_SIZE = 3;
    private static final long THREAD_KEEP_ALIVE_TIME = 60;
    private static final int THREAD_QUEUE_CAPACITY = 20;
    private static volatile ThreadPoolExecutor executor;
    protected final XdsClient client;
    protected String resourceType;
    protected Node node;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/sermant/implement/service/xds/handler/XdsHandler$NamedThreadFactory.class */
    public static class NamedThreadFactory implements ThreadFactory {
        private static final String THREAD_NAME_PREFIX = "xds-reconnection-thread-";
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        NamedThreadFactory() {
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, THREAD_NAME_PREFIX + this.threadNumber.getAndIncrement());
        }
    }

    public XdsHandler(XdsClient xdsClient) {
        this.client = xdsClient;
        createNode();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DiscoveryRequest buildDiscoveryRequest(String str, String str2, String str3, Set<String> set) {
        DiscoveryRequest.Builder typeUrl = DiscoveryRequest.newBuilder().setNode(this.node).setTypeUrl(str);
        if (str2 != null) {
            typeUrl.setVersionInfo(str2);
        }
        if (str3 != null) {
            typeUrl.setResponseNonce(str3);
        }
        typeUrl.addAllResourceNames(set);
        return typeUrl.build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<T> decodeResources(DiscoveryResponse discoveryResponse, Class cls) {
        ArrayList arrayList = new ArrayList();
        for (Any any : discoveryResponse.getResourcesList()) {
            if (any != null) {
                try {
                    arrayList.add(any.unpack(cls));
                } catch (InvalidProtocolBufferException e) {
                    LOGGER.log(Level.SEVERE, "Decode {0} resource failed, the error message is {1}", new Object[]{cls.getSimpleName(), e});
                }
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DiscoveryRequest builtAckDiscoveryRequest(DiscoveryResponse discoveryResponse, Set<String> set) {
        return buildDiscoveryRequest(discoveryResponse.getTypeUrl(), discoveryResponse.getVersionInfo(), discoveryResponse.getNonce(), set);
    }

    private void createNode() {
        String readFileToString = FileUtils.readFileToString(XdsEnvConstant.K8S_POD_NAMESPACE_PATH);
        String str = StringUtils.isEmpty(readFileToString) ? "default" : readFileToString;
        StringBuilder sb = new StringBuilder();
        sb.append(XdsEnvConstant.SIDECAR).append("~").append(NetworkUtils.getMachineIp()).append("~").append(System.getenv("HOSTNAME")).append(".").append(str).append("~").append(str).append(".").append(XdsEnvConstant.HOST_SUFFIX);
        this.node = Node.newBuilder().setId(sb.toString()).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamObserver<DiscoveryResponse> getResponseStreamObserver(final String str, final CountDownLatch countDownLatch) {
        return new StreamObserver<DiscoveryResponse>() { // from class: io.sermant.implement.service.xds.handler.XdsHandler.1
            @Override // io.grpc.stub.StreamObserver
            public void onNext(DiscoveryResponse discoveryResponse) {
                XdsHandler.this.handleResponse(str, discoveryResponse);
                XdsHandler.this.countDown(countDownLatch);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                XdsHandler.this.countDown(countDownLatch);
                XdsHandler.this.handleError(th, str);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                XdsHandler.this.countDown(countDownLatch);
                XdsHandler.this.handleCompletion(str);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void countDown(CountDownLatch countDownLatch) {
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(Throwable th, String str) {
        initExecutor();
        executor.submit(() -> {
            try {
                Thread.sleep(Constants.DEFAULT_REDO_DELAY_TIME);
            } catch (InterruptedException e) {
                LOGGER.log(Level.WARNING, "An error occurred in thread sleeping.", (Throwable) e);
            }
            this.client.updateChannel();
            subscribe(str, null);
        });
        LOGGER.log(Level.SEVERE, "An error occurred in Xds communication with istiod.", th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleCompletion(String str) {
        subscribe(str, null);
        LOGGER.log(Level.WARNING, "Xds stream is closed, new stream has been created for communication.");
    }

    protected abstract void handleResponse(String str, DiscoveryResponse discoveryResponse);

    private static void initExecutor() {
        if (executor == null) {
            synchronized (XdsHandler.class) {
                if (executor == null) {
                    executor = new ThreadPoolExecutor(1, 3, THREAD_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(20), new NamedThreadFactory());
                    executor.allowCoreThreadTimeOut(true);
                }
            }
        }
    }
}
