package poussecafe.spring.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import poussecafe.jackson.JacksonMessageAdapter;
import poussecafe.messaging.MessageReceiver;
import poussecafe.messaging.MessageSender;
import poussecafe.processing.MessageBroker;
import poussecafe.processing.ReceivedMessage;
import poussecafe.runtime.OriginalAndMarshaledMessage;
import poussecafe.spring.kafka.KafkaMessageReceiver;
import poussecafe.spring.kafka.KafkaMessageSender;

@Component
/* loaded from: input_file:poussecafe/spring/kafka/MessageSenderAndReceiverFactory.class */
public class MessageSenderAndReceiverFactory implements InitializingBean, MessageListener<String, String> {

    @Autowired
    private KafkaMessageListenerContainer<String, String> listenerContainer;

    @Autowired
    private KafkaTemplate<String, String> template;
    private KafkaMessageReceiver kafkaReceiver;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private JacksonMessageAdapter messageAdapter = new JacksonMessageAdapter();

    public void afterPropertiesSet() throws Exception {
        this.logger.info("Configuring Spring Kafka messaging");
        SpringKafkaMessaging.setFactory(this);
        this.listenerContainer.setupMessageListener(this);
    }

    public MessageSender buildMessageSender() {
        return new KafkaMessageSender.Builder().kafkaTemplate(this.template).topic(this.listenerContainer.getContainerProperties().getTopics()[0]).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startListenerContainer() {
        this.listenerContainer.start();
    }

    public MessageReceiver buildMessageReceiver(MessageBroker messageBroker) {
        return new KafkaMessageReceiver.Builder().messageBroker(messageBroker).messageSenderAndReceiverFactory(this).build();
    }

    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        String str = (String) consumerRecord.value();
        KafkaMessageReceiver kafkaMessageReceiver = this.kafkaReceiver;
        ReceivedMessage.Builder payload = new ReceivedMessage.Builder().payload(new OriginalAndMarshaledMessage.Builder().marshaled(str).original(this.messageAdapter.adaptSerializedMessage(str)).build());
        acknowledgment.getClass();
        ReceivedMessage.Builder acker = payload.acker(acknowledgment::acknowledge);
        KafkaMessageReceiver kafkaMessageReceiver2 = this.kafkaReceiver;
        kafkaMessageReceiver2.getClass();
        kafkaMessageReceiver.consume(acker.interrupter(kafkaMessageReceiver2::stopReceiving).build());
    }

    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        throw new UnsupportedOperationException("Acknowledgment is required");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void registerReceiver(KafkaMessageReceiver kafkaMessageReceiver) {
        this.kafkaReceiver = kafkaMessageReceiver;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void deregisterReceiver(KafkaMessageReceiver kafkaMessageReceiver) {
        if (this.kafkaReceiver == kafkaMessageReceiver) {
            this.kafkaReceiver = null;
            this.listenerContainer.stop();
        }
    }
}
