package org.nhindirect.monitor.aggregator.repository;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.hawtdb.HawtDBCamelCodec;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.fusesource.hawtbuf.Buffer;
import org.nhindirect.monitor.dao.AggregationDAO;
import org.nhindirect.monitor.dao.entity.Aggregation;
import org.nhindirect.monitor.dao.entity.AggregationCompleted;

/* loaded from: input_file:WEB-INF/lib/direct-msg-monitor-1.1.7.jar:org/nhindirect/monitor/aggregator/repository/ConcurrentJPAAggregationRepository.class */
public class ConcurrentJPAAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
    private static final Log LOGGER = LogFactory.getFactory().getInstance(ConcurrentJPAAggregationRepository.class);
    protected static final String AGGREGATION_ENTITY_VERSON = "AGGREGATION_ENTITY_VERSON";
    protected static final String AGGREGATION_COMPLETE_ENTITY_VERSON = "AGGREGATION_COMPLETE_ENTITY_VERSON";
    protected AggregationDAO dao;
    protected int maximumRedeliveries;
    protected String deadLetterUri;
    protected HawtDBCamelCodec codec = new HawtDBCamelCodec();
    protected long recoveryInterval = 5000;
    protected boolean useRecovery = true;

    public ConcurrentJPAAggregationRepository() {
    }

    public ConcurrentJPAAggregationRepository(AggregationDAO aggregationDAO) {
        this.dao = aggregationDAO;
    }

    public void setAggreationDAO(AggregationDAO aggregationDAO) {
        this.dao = aggregationDAO;
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public Exchange add(CamelContext camelContext, String str, Exchange exchange) {
        try {
            byte[] data = this.codec.marshallExchange(camelContext, exchange).getData();
            Integer num = (Integer) exchange.getProperty(AGGREGATION_ENTITY_VERSON);
            Aggregation aggregation = new Aggregation();
            aggregation.setExchangeBlob(data);
            aggregation.setId(str);
            aggregation.setVersion(num == null ? 0 : num.intValue());
            this.dao.addUpdateAggregation(aggregation);
            exchange.setProperty(AGGREGATION_ENTITY_VERSON, Integer.valueOf(aggregation.getVersion()));
            return null;
        } catch (Exception e) {
            throw new RuntimeException("Error adding to repository aggregation with key " + str, e);
        }
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public Exchange get(CamelContext camelContext, String str) {
        try {
            Aggregation aggregation = this.dao.getAggregation(str);
            if (aggregation == null) {
                return null;
            }
            Exchange unmarshallExchange = this.codec.unmarshallExchange(camelContext, new Buffer(aggregation.getExchangeBlob()));
            unmarshallExchange.setProperty(AGGREGATION_ENTITY_VERSON, Integer.valueOf(aggregation.getVersion()));
            return unmarshallExchange;
        } catch (Exception e) {
            throw new RuntimeException("Error retrieving from repository aggregation with key " + str, e);
        }
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public void remove(CamelContext camelContext, String str, Exchange exchange) {
        try {
            Integer num = (Integer) exchange.getProperty(AGGREGATION_ENTITY_VERSON);
            byte[] data = this.codec.marshallExchange(camelContext, exchange).getData();
            Aggregation aggregation = new Aggregation();
            aggregation.setExchangeBlob(data);
            aggregation.setId(str);
            aggregation.setVersion(num == null ? 0 : num.intValue());
            this.dao.removeAggregation(aggregation, exchange.getExchangeId());
        } catch (Exception e) {
            throw new RuntimeException("Error removing from repository aggregation with key " + str, e);
        }
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public void confirm(CamelContext camelContext, String str) {
        try {
            this.dao.confirmAggregation(str);
        } catch (Exception e) {
            throw new RuntimeException("Error confirming aggregation with key " + str, e);
        }
    }

    @Override // org.apache.camel.spi.AggregationRepository
    public Set<String> getKeys() {
        try {
            List<String> aggregationKeys = this.dao.getAggregationKeys();
            return (aggregationKeys == null || aggregationKeys.isEmpty()) ? Collections.emptySet() : new LinkedHashSet(aggregationKeys);
        } catch (Exception e) {
            throw new RuntimeException("Error retriving aggregation keys", e);
        }
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public Set<String> scan(CamelContext camelContext) {
        try {
            List<String> aggregationCompletedKeys = this.dao.getAggregationCompletedKeys();
            return (aggregationCompletedKeys == null || aggregationCompletedKeys.isEmpty()) ? Collections.emptySet() : new LinkedHashSet(aggregationCompletedKeys);
        } catch (Exception e) {
            throw new RuntimeException("Error retriving aggregation completed keys", e);
        }
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public Exchange recover(CamelContext camelContext, String str) {
        try {
            AggregationCompleted aggregationCompleted = this.dao.getAggregationCompleted(str, true);
            if (aggregationCompleted == null) {
                return null;
            }
            Exchange unmarshallExchange = this.codec.unmarshallExchange(camelContext, new Buffer(aggregationCompleted.getExchangeBlob()));
            unmarshallExchange.setProperty(AGGREGATION_COMPLETE_ENTITY_VERSON, Integer.valueOf(aggregationCompleted.getVersion()));
            return unmarshallExchange;
        } catch (Exception e) {
            throw new RuntimeException("Error recovering exchange from repository with exchangeId " + str, e);
        }
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setRecoveryInterval(long j, TimeUnit timeUnit) {
        this.recoveryInterval = timeUnit.toMillis(j);
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setRecoveryInterval(long j) {
        this.recoveryInterval = j;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public long getRecoveryIntervalInMillis() {
        return this.recoveryInterval;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setUseRecovery(boolean z) {
        this.useRecovery = z;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public boolean isUseRecovery() {
        return this.useRecovery;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setDeadLetterUri(String str) {
        this.deadLetterUri = str;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public String getDeadLetterUri() {
        return this.deadLetterUri;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public void setMaximumRedeliveries(int i) {
        this.maximumRedeliveries = i;
    }

    @Override // org.apache.camel.spi.RecoverableAggregationRepository
    public int getMaximumRedeliveries() {
        return this.maximumRedeliveries;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        if (this.dao == null) {
            throw new IllegalStateException("Aggregation respository DAO cannot be null");
        }
        int size = getKeys().size();
        int size2 = scan(null).size();
        if (size > 0) {
            LOGGER.info("On startup there are " + size + " aggregate exchanges (not completed) in repository");
        } else {
            LOGGER.info("On startup there are no existing aggregate exchanges (not completed) in repository");
        }
        if (size2 > 0) {
            LOGGER.warn("On startup there are " + size2 + " completed exchanges to be recovered in repository");
        } else {
            LOGGER.info("On startup there are no completed exchanges to be recovered in repository");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
    }
}
