package io.confluent.connect.cdc;

import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import io.confluent.connect.cdc.CDCSourceConnectorConfig;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/cdc/BaseServiceTask.class */
public abstract class BaseServiceTask<CONF extends CDCSourceConnectorConfig> extends CDCSourceTask<CONF> {
    private static final Logger log = LoggerFactory.getLogger(BaseServiceTask.class);
    ServiceManager serviceManager;

    protected abstract Service service(ChangeWriter changeWriter, OffsetStorageReader offsetStorageReader);

    @Override // io.confluent.connect.cdc.CDCSourceTask
    public void start(Map<String, String> map) {
        super.start(map);
        this.serviceManager = new ServiceManager(Arrays.asList(service(this, this.context.offsetStorageReader())));
        log.info("Starting Services");
        this.serviceManager.startAsync();
        try {
            this.serviceManager.awaitHealthy(60L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new ConnectException("Timeout while starting service.", e);
        }
    }

    public void stop() {
        log.info("Stopping Services");
        this.serviceManager.stopAsync();
        try {
            this.serviceManager.awaitStopped(60L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new ConnectException("Timeout while stopping service.", e);
        }
    }
}
