package org.apache.pulsar.io.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/apache/pulsar/io/canal/CanalAbstractSource.class */
public abstract class CanalAbstractSource<V> extends PushSource<V> {
    /** Class-wide logger; the nested record type logs through it as well. */
    private static final Logger log = LoggerFactory.getLogger(CanalAbstractSource.class);

    /** Canal client built in {@code open}; polled by the worker thread and handed to every emitted record. */
    private CanalConnector connector;

    /** Connector settings parsed from the configuration map passed to {@code open}. */
    private CanalSourceConfig canalSourceConfig;

    /**
     * Holds the same text that {@code process} and {@code close} use as the MDC key.
     * NOTE(review): those methods spell the literal out instead of referencing this constant.
     */
    private static final String DESTINATION = "destination";

    /** Worker thread created by {@code start}; stays {@code null} until then. */
    protected Thread thread;

    /** Loop flag for the worker: set by {@code start}, cleared by {@code close}. */
    protected volatile boolean running;

    /** Logs, together with the thread name, any throwable that escapes the worker thread. */
    protected final Thread.UncaughtExceptionHandler handler = (failedThread, cause) ->
            log.error("[{}] parse events has an error", failedThread.getName(), cause);

    /* loaded from: input_file:org/apache/pulsar/io/canal/CanalAbstractSource$CanalRecord.class */
    /**
     * A single Canal batch exposed as a Pulsar {@link Record}.
     *
     * <p>The batch id serves both as the record key (decimal text) and as the record sequence.
     * Acknowledging the record forwards that id to {@code connector.ack(id)} on the connector
     * currently held by this record.
     *
     * @param <V> type of the payload carried by the record
     */
    private static class CanalRecord<V> implements Record<V> {
        /** Payload returned by {@link #getValue()}. */
        private V record;
        /** Canal batch id; {@code null} until {@link #setId(Long)} is called. */
        private Long id;
        /** Connector that receives the acknowledgement. */
        private CanalConnector connector;

        /**
         * @param canalConnector connector that {@link #ack()} will call
         */
        public CanalRecord(CanalConnector canalConnector) {
            this.connector = canalConnector;
        }

        /**
         * @return the batch id rendered as decimal text, or empty when no id has been set yet
         */
        @Override
        public Optional<String> getKey() {
            // ofNullable instead of of(): the id is a nullable boxed field, so querying the key
            // before setId() must yield "no key" rather than a NullPointerException.
            return Optional.ofNullable(this.id).map(batchId -> Long.toString(batchId));
        }

        /**
         * @return the payload previously stored with {@link #setRecord(Object)}
         */
        @Override
        public V getValue() {
            return this.record;
        }

        /**
         * @return the batch id, or empty when no id has been set yet
         */
        @Override
        public Optional<Long> getRecordSequence() {
            return Optional.ofNullable(this.id);
        }

        /**
         * Logs the batch id and acknowledges it on the connector.
         *
         * @throws NullPointerException if no id has been set; there is nothing to acknowledge then
         */
        @Override
        public void ack() {
            final Long batchId = Objects.requireNonNull(this.id, "CanalRecord id must be set before ack()");
            CanalAbstractSource.log.info("CanalRecord ack id is {}", batchId);
            this.connector.ack(batchId.longValue());
        }

        /** @return the payload, same value as {@link #getValue()} */
        public V getRecord() {
            return this.record;
        }

        /** @return the batch id, possibly {@code null} */
        public Long getId() {
            return this.id;
        }

        /** @return the connector used for acknowledgements */
        public CanalConnector getConnector() {
            return this.connector;
        }

        /** @param v payload to expose through {@link #getValue()} */
        public void setRecord(V v) {
            this.record = v;
        }

        /** @param l batch id to use as key, sequence and acknowledgement id */
        public void setId(Long l) {
            this.id = l;
        }

        /** @param canalConnector connector that {@link #ack()} should call from now on */
        public void setConnector(CanalConnector canalConnector) {
            this.connector = canalConnector;
        }
    }

    /**
     * Parses the connector configuration, builds the Canal client and starts the worker thread.
     *
     * <p>The {@code cluster} setting selects between a cluster connector (built from the
     * configured ZooKeeper servers) and a single-server connector (built from the configured
     * host name and port). Destination, user name and password are passed in both modes.
     *
     * @param map           raw configuration handed to {@code CanalSourceConfig.load}
     * @param sourceContext source context supplied by the caller; not used by this method
     * @throws Exception if loading the configuration or creating the connector fails
     */
    @Override
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        final CanalSourceConfig config = CanalSourceConfig.load(map);
        this.canalSourceConfig = config;

        if (!config.getCluster()) {
            final InetSocketAddress serverAddress =
                    new InetSocketAddress(config.getSingleHostname(), config.getSinglePort());
            this.connector = CanalConnectors.newSingleConnector(
                    serverAddress,
                    config.getDestination(),
                    config.getUsername(),
                    config.getPassword());
            log.info("Start canal connect in standalone mode, canal server info {}:{}",
                    config.getSingleHostname(), config.getSinglePort());
        } else {
            this.connector = CanalConnectors.newClusterConnector(
                    config.getZkServers(),
                    config.getDestination(),
                    config.getUsername(),
                    config.getPassword());
            log.info("Start canal connect in cluster mode, canal cluster info {}", config.getZkServers());
        }

        log.info("canal source destination {}", config.getDestination());
        start();
    }

    /**
     * Creates and launches the worker thread that runs {@link #process()}.
     *
     * <p>The thread is named, wired to {@link #handler}, and {@link #running} is raised before
     * the thread is started so the worker's loop condition is already true on entry.
     *
     * @throws NullPointerException if no connector has been created yet
     */
    protected void start() {
        Objects.requireNonNull(this.connector, "connector is null");

        final Thread worker = new Thread(this::process);
        worker.setName("canal source thread");
        worker.setUncaughtExceptionHandler(this.handler);

        this.thread = worker;
        this.running = true;
        worker.start();
    }

    /**
     * Stops the worker thread and disconnects the Canal client.
     *
     * <p>Does nothing beyond logging when the source is not running. Otherwise the running flag
     * is cleared first, then the worker is interrupted and joined, the connector is disconnected
     * and the {@code destination} MDC entry is removed from the calling thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *                              worker to finish; the disconnect step is skipped in that case
     */
    @Override
    public void close() throws InterruptedException {
        log.info("close canal source");
        if (!this.running) {
            return;
        }

        // Clear the flag before interrupting so the worker sees the stop request as soon as it wakes.
        this.running = false;

        final Thread worker = this.thread;
        if (worker != null) {
            worker.interrupt();
            worker.join();
        }

        final CanalConnector activeConnector = this.connector;
        if (activeConnector != null) {
            activeConnector.disconnect();
        }
        MDC.remove("destination");
    }

    /** Pause between polls when Canal returned no deliverable batch. */
    private static final long IDLE_POLL_DELAY_MS = 1000L;

    /** Pause before reconnecting after a failed session, so a dead server is not hammered. */
    private static final long RECONNECT_DELAY_MS = 1000L;

    /**
     * Worker loop: keeps a Canal session open and publishes every non-empty batch as a record.
     *
     * <p>Each pass of the outer loop is one session: tag the thread's MDC with the destination,
     * connect, subscribe, then poll until {@link #running} is cleared or something throws.
     * Within a session, a batch whose id is {@code -1} or that has no entries is skipped after a
     * short sleep. Any other batch, provided the converter produced a non-null list, is wrapped
     * in a {@code CanalRecord} carrying the batch id and the extracted value, and handed to
     * {@code consume}.
     *
     * <p>An {@link Exception} ends the session and is logged. The connector is disconnected and
     * the MDC entry removed however the session ended. After a failed session the loop waits
     * {@value #RECONNECT_DELAY_MS} ms before reconnecting, unless shutdown has been requested.
     * Throwables that are not {@code Exception}s still go through the cleanup and then
     * propagate out of this method.
     */
    protected void process() {
        while (this.running) {
            boolean sessionFailed = false;
            try {
                MDC.put("destination", this.canalSourceConfig.getDestination());
                this.connector.connect();
                log.info("start canal process");
                this.connector.subscribe();

                while (this.running) {
                    final Message message = this.connector.getWithoutAck(this.canalSourceConfig.getBatchSize());
                    message.setRaw(false);
                    final List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
                    final long batchId = getMessageId(message).longValue();
                    final int entryCount = message.getEntries().size();

                    if (batchId == -1 || entryCount == 0) {
                        // Nothing to deliver right now; wait before polling again.
                        pauseQuietly(IDLE_POLL_DELAY_MS);
                        continue;
                    }
                    if (flatMessages != null) {
                        final CanalRecord<V> canalRecord = new CanalRecord<>(this.connector);
                        canalRecord.setId(batchId);
                        canalRecord.setRecord(extractValue(flatMessages));
                        consume(canalRecord);
                    }
                }
            } catch (Exception e) {
                sessionFailed = true;
                log.error("process error!", e);
            } finally {
                this.connector.disconnect();
                MDC.remove("destination");
            }

            // Without this pause a persistently failing connect() would spin the thread and
            // flood the log. Skipped on shutdown so close() is not delayed.
            if (sessionFailed && this.running) {
                pauseQuietly(RECONNECT_DELAY_MS);
            }
        }
    }

    /**
     * Sleeps for the given time and returns early, without error, when interrupted.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void pauseQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Deliberately not re-asserting the interrupt: close() clears `running` before it
            // interrupts, so the calling loop stops on its next check. Re-asserting while
            // `running` is still true would make every later sleep return at once and turn the
            // poll loop into a busy loop.
        }
    }

    /**
     * Returns the batch id of a message fetched from Canal.
     *
     * <p>The worker loop calls this once per fetched message and unboxes the result right away,
     * so implementations must not return {@code null}. A value of {@code -1} makes the loop
     * treat the message as "nothing to deliver". Any other value becomes the id of the emitted
     * record, which uses it as key, as sequence, and as the id passed to the connector on ack.
     *
     * @param message message obtained from the connector's {@code getWithoutAck} call
     * @return the batch id, or {@code -1} when the message should be skipped
     */
    public abstract Long getMessageId(Message message);

    /**
     * Converts the flattened form of a Canal batch into the value carried by the emitted record.
     *
     * <p>The worker loop calls this only for batches that have a valid id, at least one entry,
     * and a non-null result from {@code MessageUtils.messageConverter}. Whatever is returned is
     * stored as the record payload without further processing.
     *
     * @param list flat messages produced by {@code MessageUtils.messageConverter}; never
     *             {@code null} when called from the worker loop
     * @return the payload for the record
     */
    public abstract V extractValue(List<FlatMessage> list);
}
