package org.apache.pulsar.io.twitter;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.common.DelimitedStreamReader;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.twitter.data.TweetData;
import org.apache.pulsar.io.twitter.data.TwitterRecord;
import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "twitter", type = IOType.SOURCE, help = "A simple connector moving tweets from Twitter FireHose to Pulsar", configClass = TwitterFireHoseConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/twitter/TwitterFireHose.class */
public class TwitterFireHose extends PushSource<TweetData> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TwitterFireHose.class);
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TwitterFireHose.class);
    private Object waitObject;
    private final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Override // org.apache.pulsar.io.core.PushSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws IOException {
        TwitterFireHoseConfig twitterFireHoseConfig = (TwitterFireHoseConfig) IOConfigUtils.loadWithSecrets(map, TwitterFireHoseConfig.class, sourceContext);
        twitterFireHoseConfig.validate();
        this.waitObject = new Object();
        startThread(twitterFireHoseConfig);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stopThread();
    }

    private void startThread(final TwitterFireHoseConfig twitterFireHoseConfig) {
        BasicClient build = new ClientBuilder().name(twitterFireHoseConfig.getClientName()).hosts(twitterFireHoseConfig.getClientHosts()).endpoint(getEndpoint(twitterFireHoseConfig)).authentication(getAuthentication(twitterFireHoseConfig)).processor(new HosebirdMessageProcessor() { // from class: org.apache.pulsar.io.twitter.TwitterFireHose.1
            public DelimitedStreamReader reader;

            @Override // com.twitter.hbc.core.processor.HosebirdMessageProcessor
            public void setup(InputStream inputStream) {
                this.reader = new DelimitedStreamReader(inputStream, Constants.DEFAULT_CHARSET, twitterFireHoseConfig.getClientBufferSize());
            }

            @Override // com.twitter.hbc.core.processor.HosebirdMessageProcessor
            public boolean process() throws IOException, InterruptedException {
                try {
                    TwitterFireHose.this.consume(new TwitterRecord((TweetData) TwitterFireHose.this.mapper.readValue(this.reader.readLine(), TweetData.class), twitterFireHoseConfig.getGuestimateTweetTime().booleanValue()));
                    return true;
                } catch (Exception e) {
                    TwitterFireHose.LOG.error("Exception thrown", (Throwable) e);
                    return true;
                }
            }
        }).build();
        Thread thread = new Thread(() -> {
            LOG.info("Started the Twitter FireHose Runner Thread");
            build.connect();
            LOG.info("Twitter Streaming API connection established successfully");
            try {
                synchronized (this.waitObject) {
                    this.waitObject.wait();
                }
            } catch (Exception e) {
                LOG.info("Got a exception in waitObject");
            }
            LOG.debug("Closing Twitter Streaming API connection");
            build.stop();
            LOG.info("Twitter Streaming API connection closed");
            LOG.info("Twitter FireHose Runner Thread ending");
        });
        thread.setName("TwitterFireHoseRunner");
        thread.start();
    }

    private void stopThread() {
        LOG.info("Source closed");
        synchronized (this.waitObject) {
            this.waitObject.notify();
        }
    }

    private Authentication getAuthentication(TwitterFireHoseConfig twitterFireHoseConfig) {
        return new OAuth1(twitterFireHoseConfig.getConsumerKey(), twitterFireHoseConfig.getConsumerSecret(), twitterFireHoseConfig.getToken(), twitterFireHoseConfig.getTokenSecret());
    }

    private StreamingEndpoint getEndpoint(TwitterFireHoseConfig twitterFireHoseConfig) {
        List<Long> followings = twitterFireHoseConfig.getFollowings();
        List<String> trackTerms = twitterFireHoseConfig.getTrackTerms();
        if (CollectionUtils.isEmpty(followings) && CollectionUtils.isEmpty(trackTerms)) {
            return new SampleStatusesEndpoint().createEndpoint();
        }
        StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
        if (CollectionUtils.isNotEmpty(followings)) {
            statusesFilterEndpoint.followings(followings);
        }
        if (CollectionUtils.isNotEmpty(trackTerms)) {
            statusesFilterEndpoint.trackTerms(trackTerms);
        }
        return statusesFilterEndpoint;
    }
}
