package org.joyqueue.server.retry.db;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.joyqueue.datasource.DataSourceConfig;
import org.joyqueue.datasource.DataSourceFactory;
import org.joyqueue.domain.ConsumeRetry;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.api.RetryPolicyProvider;
import org.joyqueue.server.retry.db.config.DbRetryConfigKey;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.server.retry.model.RetryStatus;
import org.joyqueue.server.retry.util.RetryUtil;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.db.DaoUtil;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.retry.RetryPolicy;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/server/retry/db/DBMessageRetry.class */
public class DBMessageRetry implements MessageRetry<Long> {
    /** Inserts one row into message_retry; the 13 placeholders are bound in {@code insertConsumeRetry}. */
    private static final String CREATE_SQL = "insert into message_retry (message_id, business_id, topic, app, send_time, expire_time, retry_time, retry_count, status, data, exception, create_time, update_time) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    /** Loads id, create_time and retry_count of one row by id and topic; used by {@code getNextRetryTime}. */
    private static final String QUERY_ENTITY_SQL = "select id, create_time, retry_count from message_retry where id = ? and topic = ?";
    /** Data source used for inserts, updates and the lookup in {@code getNextRetryTime}; assigned in {@code start()}, cleared in {@code stop()}. */
    private DataSource writeDataSource;
    /** Data source used by the query methods ({@code getRetry}, {@code countRetry}, ...); assigned in {@code start()}, cleared in {@code stop()}. */
    private DataSource readDataSource;
    /** Supplies the per topic/app retry policy; injected through {@code setRetryPolicyProvider}. */
    private RetryPolicyProvider retryPolicyProvider;
    private static final Logger logger = LoggerFactory.getLogger(DBMessageRetry.class);
    /** Selects RETRY_ING rows of a topic/app with retry_time <= the bound time; the two limit placeholders are bound from the last two arguments of {@code getRetry}. */
    private static final String QUERY_SQL_NOID = "select id, business_id, topic, app, data, exception, send_time from message_retry where status = " + ((int) RetryStatus.RETRY_ING.getValue()) + " and topic = ? and app = ? and retry_time<=? limit ?, ?";
    /** Counts RETRY_ING rows of a topic/app. */
    private static final String QUERY_COUNT_SQL = "select count(1) from message_retry where status = " + ((int) RetryStatus.RETRY_ING.getValue()) + " and topic = ? and app = ?";
    /** Records a failed attempt: sets retry_time, update_time and status and increments retry_count, only while the row is still RETRY_ING. */
    private static final String ERROR_UPDATE_SQL = "update message_retry set retry_time = ?, retry_count = retry_count + 1, update_time = ?, status = ? where topic = ? and app = ? and id = ? and status = " + ((int) RetryStatus.RETRY_ING.getValue());
    /** Moves a RETRY_ING row to RETRY_EXPIRE and refreshes update_time. */
    private static final String EXPIRE_UPDATE_SQL = "update message_retry set status = " + ((int) RetryStatus.RETRY_EXPIRE.getValue()) + ", update_time = ? where topic = ? and app = ? and id = ? and status = " + ((int) RetryStatus.RETRY_ING.getValue());
    /** Moves a RETRY_ING row to RETRY_SUCCESS, increments retry_count and refreshes update_time. */
    private static final String SUCCESS_UPDATE_SQL = "update message_retry set status = " + ((int) RetryStatus.RETRY_SUCCESS.getValue()) + ", retry_count = retry_count + 1, update_time = ? where topic = ? and app = ? and id = ? and status = " + ((int) RetryStatus.RETRY_ING.getValue());
    /** Selects a single RETRY_ING row by topic, app and id whose retry_time <= the bound time; used by {@code getMessageById}. */
    private static final String GET_SQL = "select id, business_id, topic, app, data, exception, send_time from message_retry where status = " + ((int) RetryStatus.RETRY_ING.getValue()) + " and topic = ? and app = ? and retry_time <= ? and id = ?";
    /** Selects id and retry_time of RETRY_ING rows whose retry_time is before the database's NOW(), capped by a limit; used by {@code queryIdAndRetryTime}. */
    private static final String QUERY_ID_RETRY_TIME_SQL = "select id, retry_time from message_retry where status = " + ((int) RetryStatus.RETRY_ING.getValue()) + " and topic = ? and app = ? and retry_time < NOW() limit ?";
    /** Set to true at the end of {@code start()} and back to false in {@code stop()}. */
    private boolean isStartFlag = false;
    /** Connection settings for the write data source; populated by {@code setSupplier}. */
    private DataSourceConfig writeDataSourceConfig = null;
    /** Connection settings for the read data source; populated by {@code setSupplier}. */
    private DataSourceConfig readDataSourceConfig = null;
    /** Built in {@code setSupplier} from RETRY_DELAY and MAX_RETRY_TIMES; not read by any method visible in this class. */
    private RetryPolicy retryPolicy = null;

    /**
     * Exposes the data source used for write operations.
     *
     * @return the write data source; {@code null} before {@link #start()} and after {@link #stop()}
     */
    public DataSource getDataSource() {
        return writeDataSource;
    }

    /**
     * Exposes the data source used for read-only queries.
     *
     * @return the read data source; {@code null} before {@link #start()} and after {@link #stop()}
     */
    public DataSource getReadDataSource() {
        return readDataSource;
    }

    /**
     * Builds the write and read data sources from their configurations and marks
     * this component as started.
     */
    public void start() {
        // Write side first, then read side; the flag flips only after both builds returned.
        writeDataSource = DataSourceFactory.build(writeDataSourceConfig);
        readDataSource = DataSourceFactory.build(readDataSourceConfig);
        isStartFlag = true;
        logger.info("db retry manager is started");
    }

    /**
     * Reports whether {@link #start()} has completed without a subsequent {@link #stop()}.
     *
     * @return the current value of the started flag
     */
    public boolean isStarted() {
        return isStartFlag;
    }

    /**
     * Closes whichever data sources are currently set, clears the references and
     * marks this component as stopped.
     */
    public void stop() {
        final DataSource write = writeDataSource;
        if (write != null) {
            Close.close(write);
            writeDataSource = null;
        }

        final DataSource read = readDataSource;
        if (read != null) {
            Close.close(read);
            readDataSource = null;
        }

        isStartFlag = false;
        logger.info("db retry manager is stopped");
    }

    /**
     * Injects the provider that is asked for the retry policy of a topic/app pair.
     *
     * @param retryPolicyProvider provider stored for later policy lookups
     */
    public void setRetryPolicyProvider(RetryPolicyProvider retryPolicyProvider) {
        // "this." is required here: the parameter shadows the field.
        this.retryPolicyProvider = retryPolicyProvider;
    }

    /**
     * Converts the given retry messages into retry records and stores them.
     *
     * <p>A {@code null} or empty list is a no-op; without this guard the insert
     * path would fail on {@code list.get(0)}.
     *
     * @param list messages that should be retried later; may be {@code null} or empty
     * @throws JoyQueueException if building or inserting the records fails
     */
    public void addRetry(List<RetryMessageModel> list) throws JoyQueueException {
        if (list == null || list.isEmpty()) {
            return;
        }
        insertConsumeRetry(generateConsumeRetry(list));
    }

    /**
     * Builds one {@link ConsumeRetry} record per retry message.
     *
     * <p>Each record starts in status RETRY_ING with a retry count of 0. Its expire
     * time and first retry time come from the policy that the provider returns for
     * the message's topic and app. All timestamps of a record are derived from a
     * single clock reading, so create/update/expire/retry times are consistent.
     *
     * @param list source messages; {@code null} is treated as empty
     * @return a new mutable list with one record per input message, in input order
     * @throws JoyQueueException declared for callers; propagated from collaborators if they raise it
     */
    public List<ConsumeRetry> generateConsumeRetry(List<RetryMessageModel> list) throws JoyQueueException {
        if (list == null || list.isEmpty()) {
            return new ArrayList<>();
        }
        List<ConsumeRetry> retries = new ArrayList<>(list.size());
        for (RetryMessageModel message : list) {
            // One clock reading per record keeps its timestamps mutually consistent.
            final long now = SystemClock.now();
            RetryPolicy policy = this.retryPolicyProvider.getPolicy(TopicName.parse(message.getTopic()), message.getApp());

            ConsumeRetry consumeRetry = new ConsumeRetry();
            consumeRetry.setMessageId(RetryUtil.generateMessageId(message.getTopic(), message.getPartition(), message.getIndex(), message.getSendTime()));
            consumeRetry.setBusinessId(message.getBusinessId());
            consumeRetry.setTopic(message.getTopic());
            consumeRetry.setApp(message.getApp());
            consumeRetry.setSendTime(message.getSendTime());
            consumeRetry.setExpireTime(getExpireTime(policy, now));
            // Attempt number 1: this schedules the first retry.
            consumeRetry.setRetryTime(getRetryTime(policy, now, 1));
            consumeRetry.setRetryCount(0);
            consumeRetry.setData(message.getBrokerMessage());
            consumeRetry.setException(message.getException());
            consumeRetry.setCreateTime(now);
            consumeRetry.setUpdateTime(now);
            consumeRetry.setStatus(RetryStatus.RETRY_ING.getValue());
            retries.add(consumeRetry);
        }
        return retries;
    }

    /**
     * Inserts the given retry records through the write data source.
     *
     * @param list records to insert; {@code null} or empty yields an empty map without touching the database
     * @return map from the first column of the result set handed to the insert callback
     *         (presumably the generated row id — confirm against {@code DaoUtil.insert}) to the inserted record
     * @throws JoyQueueException with the CN_DB_ERROR code, carrying topic and app of the first record, if the insert fails
     */
    public Map<Long, ConsumeRetry> insertConsumeRetry(List<ConsumeRetry> list) throws JoyQueueException {
        final Map<Long, ConsumeRetry> inserted = new HashMap<>();
        if (list == null || list.isEmpty()) {
            // Nothing to insert; also avoids list.get(0) failing on an empty list.
            return inserted;
        }
        // Topic/app of the first record are only used to enrich the error message.
        final String topic = list.get(0).getTopic();
        final String app = list.get(0).getApp();
        try {
            DaoUtil.insert(this.writeDataSource, list, CREATE_SQL, new DaoUtil.InsertCallback<ConsumeRetry>() {
                @Override
                public void after(ResultSet resultSet, ConsumeRetry consumeRetry) throws Exception {
                    inserted.put(resultSet.getLong(1), consumeRetry);
                }

                @Override
                public void before(PreparedStatement preparedStatement, ConsumeRetry consumeRetry) throws Exception {
                    preparedStatement.setString(1, consumeRetry.getMessageId());
                    preparedStatement.setString(2, consumeRetry.getBusinessId());
                    // Over-long business ids are only reported here; the value is still bound as-is.
                    if (consumeRetry.getBusinessId() != null && consumeRetry.getBusinessId().length() > 100) {
                        logger.error("businessId to long,topic:{},app:{},businessId:{}.", consumeRetry.getTopic(), consumeRetry.getApp(), consumeRetry.getBusinessId());
                    }
                    preparedStatement.setString(3, consumeRetry.getTopic());
                    preparedStatement.setString(4, consumeRetry.getApp());
                    preparedStatement.setTimestamp(5, new Timestamp(consumeRetry.getSendTime()));
                    preparedStatement.setTimestamp(6, new Timestamp(consumeRetry.getExpireTime()));
                    preparedStatement.setTimestamp(7, new Timestamp(consumeRetry.getRetryTime()));
                    preparedStatement.setInt(8, consumeRetry.getRetryCount());
                    preparedStatement.setShort(9, consumeRetry.getStatus());
                    preparedStatement.setBytes(10, consumeRetry.getData());
                    preparedStatement.setBytes(11, consumeRetry.getException());
                    preparedStatement.setTimestamp(12, new Timestamp(consumeRetry.getCreateTime()));
                    preparedStatement.setTimestamp(13, new Timestamp(consumeRetry.getUpdateTime()));
                }
            });
            return inserted;
        } catch (Exception e) {
            logger.error("insertConsumeRetry error.", e);
            throw new JoyQueueException(JoyQueueCode.CN_DB_ERROR.getMessage(new Object[0]) + ",topic:" + topic + ",app:" + app, e, JoyQueueCode.CN_DB_ERROR.getCode());
        }
    }

    /**
     * Computes the absolute expire time of a retry record.
     *
     * <p>A positive expire time on the policy is added to {@code j}; otherwise
     * 2592000000 ms (30 days) is added. The result is never lower than 1000.
     *
     * @param retryPolicy policy whose expire time is consulted
     * @param j           base time in milliseconds
     * @return {@code j} plus the effective window, floored at 1000
     */
    public long getExpireTime(RetryPolicy retryPolicy, long j) {
        long configuredWindow = 0L;
        if (retryPolicy.getExpireTime() != null) {
            configuredWindow = retryPolicy.getExpireTime().intValue();
        }
        // Fall back to 30 days (in milliseconds) when the policy has no positive window.
        final long window = configuredWindow > 0 ? configuredWindow : 2592000000L;
        return Math.max(j + window, 1000L);
    }

    /**
     * Asks the policy for the time of a retry attempt, passing {@code j} both as
     * the first and as the third argument of {@code getTime}.
     *
     * @param retryPolicy policy that computes the schedule
     * @param j           time in milliseconds, forwarded twice to the policy
     * @param i           attempt number forwarded to the policy
     * @return whatever {@code retryPolicy.getTime(j, i, j)} yields
     */
    private long getRetryTime(RetryPolicy retryPolicy, long j, int i) {
        final long scheduledAt = retryPolicy.getTime(j, i, j);
        return scheduledAt;
    }

    /**
     * Marks the given retry records as successfully retried.
     *
     * <p>Does nothing when topic, app or the id array is null/empty. Null and
     * non-positive ids are skipped. All rows receive the same update timestamp.
     *
     * @param str  topic
     * @param str2 app
     * @param lArr ids of the retry records; null elements are ignored
     * @throws JoyQueueException with CN_DB_ERROR if the update fails
     */
    public void retrySuccess(String str, String str2, Long[] lArr) throws JoyQueueException {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty() || lArr == null || lArr.length == 0) {
            return;
        }
        final List<Long> ids = new ArrayList<>(lArr.length);
        for (Long candidate : lArr) {
            // Guard against null elements, which previously caused a NullPointerException.
            if (candidate != null && candidate > 0) {
                ids.add(candidate);
            }
        }
        try {
            final Timestamp updateTime = new Timestamp(SystemClock.now());
            DaoUtil.update(this.writeDataSource, ids, SUCCESS_UPDATE_SQL, (preparedStatement, id) -> {
                preparedStatement.setTimestamp(1, updateTime);
                preparedStatement.setString(2, str);
                preparedStatement.setString(3, str2);
                preparedStatement.setLong(4, id);
            });
        } catch (Exception e) {
            logger.error("retrySuccess error.", e);
            throw new JoyQueueException(JoyQueueCode.CN_DB_ERROR, e, new Object[0]);
        }
    }

    /**
     * Records a failed retry attempt for each given record inside one transaction.
     *
     * <p>For every id the next retry time is obtained from {@link #getNextRetryTime}.
     * Ids for which that lookup returns {@code null} are skipped. A next retry time
     * of zero or less moves the row to RETRY_EXPIRE; otherwise the row stays
     * RETRY_ING with the new retry time. The update SQL increments retry_count in
     * both cases.
     *
     * <p>If anything fails before the commit, the transaction is rolled back
     * before the connection is closed.
     *
     * @param str  topic
     * @param str2 app
     * @param lArr ids of the retry records; null elements are ignored
     * @throws JoyQueueException with CN_DB_ERROR on SQL failures, or as raised by the next-retry-time lookup
     */
    public void retryError(String str, String str2, Long[] lArr) throws JoyQueueException {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty() || lArr == null || lArr.length == 0) {
            return;
        }
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        boolean committed = false;
        try {
            connection = this.writeDataSource.getConnection();
            connection.setAutoCommit(false);
            preparedStatement = connection.prepareStatement(ERROR_UPDATE_SQL);
            final Timestamp updateTime = new Timestamp(SystemClock.now());
            for (Long id : lArr) {
                if (id == null) {
                    continue;
                }
                Long nextRetryTime = getNextRetryTime(id, str, str2);
                if (nextRetryTime == null) {
                    // Invalid id or no matching row: nothing to update.
                    continue;
                }
                if (nextRetryTime <= 0) {
                    // No further attempt is scheduled: expire the record.
                    preparedStatement.setTimestamp(1, updateTime);
                    preparedStatement.setTimestamp(2, updateTime);
                    preparedStatement.setInt(3, RetryStatus.RETRY_EXPIRE.getValue());
                } else {
                    preparedStatement.setTimestamp(1, new Timestamp(nextRetryTime));
                    preparedStatement.setTimestamp(2, updateTime);
                    preparedStatement.setInt(3, RetryStatus.RETRY_ING.getValue());
                }
                preparedStatement.setString(4, str);
                preparedStatement.setString(5, str2);
                preparedStatement.setLong(6, id);
                preparedStatement.executeUpdate();
            }
            connection.commit();
            committed = true;
        } catch (SQLException e) {
            logger.error("retryError error.", e);
            throw new JoyQueueException(JoyQueueCode.CN_DB_ERROR, e, new Object[0]);
        } finally {
            // Roll back while the connection is still open; this also covers
            // non-SQL failures such as a JoyQueueException from the lookup.
            if (!committed && connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException rollbackFailure) {
                    logger.warn("retryError rollback failed.", rollbackFailure);
                }
            }
            Close.close(connection, preparedStatement, (ResultSet) null);
        }
    }

    /**
     * Looks up a retry record and asks the topic/app retry policy for its next retry time.
     *
     * <p>The lookup goes through the write data source.
     * NOTE(review): presumably to read the latest state rather than a lagging
     * replica — confirm before switching to the read data source.
     *
     * @param j    id of the retry record
     * @param str  topic
     * @param str2 app
     * @return {@code null} when {@code j} is not positive or no row matches; otherwise the
     *         value of the policy's {@code getTime(now, retryCount, createTime)}
     *         (the caller {@code retryError} treats values of zero or less as "expire")
     * @throws JoyQueueException with CN_DB_ERROR wrapping any failure during the lookup
     */
    protected Long getNextRetryTime(long j, String str, String str2) throws JoyQueueException {
        // Reject invalid ids before borrowing a connection.
        if (j <= 0) {
            return null;
        }
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            connection = this.writeDataSource.getConnection();
            preparedStatement = connection.prepareStatement(QUERY_ENTITY_SQL);
            preparedStatement.setLong(1, j);
            preparedStatement.setString(2, str);
            resultSet = preparedStatement.executeQuery();
            if (!resultSet.next()) {
                return null;
            }
            // Columns: 1 = id, 2 = create_time, 3 = retry_count.
            final int retryCount = resultSet.getInt(3);
            final long createTime = resultSet.getTimestamp(2).getTime();
            RetryPolicy policy = this.retryPolicyProvider.getPolicy(TopicName.parse(str), str2);
            return Long.valueOf(policy.getTime(SystemClock.now(), retryCount, createTime));
        } catch (Exception e) {
            throw new JoyQueueException(JoyQueueCode.CN_DB_ERROR, e, new Object[0]);
        } finally {
            Close.close(connection, preparedStatement, resultSet);
        }
    }

    /**
     * Marks the given retry records as expired.
     *
     * <p>Does nothing when topic, app or the id array is null/empty. Null and
     * non-positive ids are skipped. All rows receive the same update timestamp.
     *
     * @param str  topic
     * @param str2 app
     * @param lArr ids of the retry records; null elements are ignored
     * @throws JoyQueueException with CN_DB_ERROR if the update fails
     */
    public void retryExpire(String str, String str2, Long[] lArr) throws JoyQueueException {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty() || lArr == null || lArr.length == 0) {
            return;
        }
        final List<Long> ids = new ArrayList<>(lArr.length);
        for (Long candidate : lArr) {
            // Guard against null elements, which previously caused a NullPointerException.
            if (candidate != null && candidate > 0) {
                ids.add(candidate);
            }
        }
        try {
            // One timestamp for the whole batch, as retrySuccess does.
            final Timestamp updateTime = new Timestamp(SystemClock.now());
            DaoUtil.update(this.writeDataSource, ids, EXPIRE_UPDATE_SQL, (preparedStatement, id) -> {
                preparedStatement.setTimestamp(1, updateTime);
                preparedStatement.setString(2, str);
                preparedStatement.setString(3, str2);
                preparedStatement.setLong(4, id);
            });
        } catch (Exception e) {
            logger.error("retryExpire error", e);
            throw new JoyQueueException(JoyQueueCode.CN_DB_ERROR, e, new Object[0]);
        }
    }

    /**
     * Fetches RETRY_ING records of a topic/app whose retry time is not later than now.
     *
     * <p>The callback's mapping method is named {@code map} again; the decompiler
     * had renamed it, which left the callback interface unimplemented.
     *
     * @param str  topic
     * @param str2 app
     * @param s    bound to the second {@code limit} placeholder; must be positive
     * @param j    bound to the first {@code limit} placeholder
     * @return the mapped records; an empty list when topic or app is null/empty or {@code s <= 0}
     * @throws JoyQueueException with the CN_DB_ERROR code if the query fails
     */
    public List<RetryMessageModel> getRetry(final String str, final String str2, final short s, final long j) throws JoyQueueException {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty() || s <= 0) {
            return new ArrayList<>(0);
        }
        final long now = SystemClock.now();
        try {
            return DaoUtil.queryList(this.readDataSource, QUERY_SQL_NOID, new DaoUtil.QueryCallback<RetryMessageModel>() {
                @Override
                public RetryMessageModel map(ResultSet resultSet) throws Exception {
                    RetryMessageModel retryMessageModel = new RetryMessageModel();
                    // NOTE(review): the row id is read as int here but as long elsewhere in this class — confirm.
                    retryMessageModel.setIndex(resultSet.getInt(1));
                    retryMessageModel.setBusinessId(resultSet.getString(2));
                    retryMessageModel.setTopic(resultSet.getString(3));
                    retryMessageModel.setApp(resultSet.getString(4));
                    retryMessageModel.setPartition(Short.MAX_VALUE);
                    retryMessageModel.setBrokerMessage(resultSet.getBytes(5));
                    retryMessageModel.setException(resultSet.getBytes(6));
                    // NOTE(review): send_time is written with setTimestamp on insert but read with getLong — verify driver behaviour.
                    retryMessageModel.setSendTime(resultSet.getLong(7));
                    return retryMessageModel;
                }

                @Override
                public void before(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setString(1, str);
                    preparedStatement.setString(2, str2);
                    preparedStatement.setTimestamp(3, new Timestamp(now));
                    preparedStatement.setLong(4, j);
                    preparedStatement.setInt(5, s);
                }
            });
        } catch (Exception e) {
            logger.error("getRetry error.", e);
            throw new JoyQueueException(String.format("%s topic:%s,app:%s,count:%d", JoyQueueCode.CN_DB_ERROR.getMessage(new Object[0]), str, str2, Short.valueOf(s)), e, JoyQueueCode.CN_DB_ERROR.getCode());
        }
    }

    /**
     * Counts the RETRY_ING records of a topic/app.
     *
     * <p>The elapsed time is written to the debug log whether the query succeeds or fails.
     *
     * @param str  topic
     * @param str2 app
     * @return the count; 0 when topic or app is null/empty or the query yields no value
     * @throws JoyQueueException with CN_DB_ERROR if the query fails
     */
    public int countRetry(final String str, final String str2) throws JoyQueueException {
        if (str == null || str.isEmpty() || str2 == null || str2.isEmpty()) {
            return 0;
        }
        final long startTime = SystemClock.now();
        try {
            Integer count = (Integer) DaoUtil.queryObject(this.readDataSource, QUERY_COUNT_SQL, new DaoUtil.QueryCallback<Integer>() {
                @Override
                public Integer map(ResultSet resultSet) throws Exception {
                    return Integer.valueOf(resultSet.getInt(1));
                }

                @Override
                public void before(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setString(1, str);
                    preparedStatement.setString(2, str2);
                }
            });
            // Avoid a NullPointerException on unboxing should the helper return no value.
            return count == null ? 0 : count.intValue();
        } catch (Exception e) {
            logger.error("countRetry error.", e);
            throw new JoyQueueException(JoyQueueCode.CN_DB_ERROR, e, new Object[0]);
        } finally {
            // Timing statistics for the count query (the message text is kept verbatim).
            if (logger.isDebugEnabled()) {
                logger.debug("从数据库获取重试记录总数耗时统计,topic=" + str + ",app=" + str2 + ",time=" + (SystemClock.now() - startTime));
            }
        }
    }

    /**
     * Loads a single RETRY_ING record by topic, app and id, provided its retry time
     * is not later than now.
     *
     * <p>The callback's mapping method is named {@code map} again; the decompiler
     * had renamed it, which left the callback interface unimplemented.
     *
     * @param str  topic
     * @param str2 app
     * @param j    id of the retry record
     * @return the value returned by {@code DaoUtil.queryObject} for the mapped row
     *         (presumably {@code null} when no row matches — confirm against {@code DaoUtil})
     * @throws JoyQueueException with the CN_DB_ERROR code if the query fails
     */
    public RetryMessageModel getMessageById(final String str, final String str2, final long j) throws JoyQueueException {
        final long now = SystemClock.now();
        try {
            return (RetryMessageModel) DaoUtil.queryObject(this.readDataSource, GET_SQL, new DaoUtil.QueryCallback<RetryMessageModel>() {
                @Override
                public void before(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setString(1, str);
                    preparedStatement.setString(2, str2);
                    preparedStatement.setTimestamp(3, new Timestamp(now));
                    preparedStatement.setLong(4, j);
                }

                @Override
                public RetryMessageModel map(ResultSet resultSet) throws Exception {
                    RetryMessageModel retryMessageModel = new RetryMessageModel();
                    // NOTE(review): the row id is read as int here but as long elsewhere in this class — confirm.
                    retryMessageModel.setIndex(resultSet.getInt(1));
                    retryMessageModel.setBusinessId(resultSet.getString(2));
                    retryMessageModel.setTopic(resultSet.getString(3));
                    retryMessageModel.setApp(resultSet.getString(4));
                    retryMessageModel.setPartition(Short.MAX_VALUE);
                    retryMessageModel.setBrokerMessage(resultSet.getBytes(5));
                    retryMessageModel.setException(resultSet.getBytes(6));
                    // NOTE(review): send_time is written with setTimestamp on insert but read with getLong — verify driver behaviour.
                    retryMessageModel.setSendTime(resultSet.getLong(7));
                    return retryMessageModel;
                }
            });
        } catch (Exception e) {
            logger.error("getMessageById error", e);
            throw new JoyQueueException(String.format("%s topic:%s,app:%s,id:%d", JoyQueueCode.CN_DB_ERROR.getMessage(new Object[0]), str, str2, Long.valueOf(j)), e, JoyQueueCode.CN_DB_ERROR.getCode());
        }
    }

    /**
     * Lists id and retry time of RETRY_ING records of a topic/app whose retry time
     * is before the database's {@code NOW()}.
     *
     * <p>The callback's mapping method is named {@code map} again; the decompiler
     * had renamed it, which left the callback interface unimplemented. Failures are
     * now logged before being rethrown, like the other query methods of this class.
     *
     * @param str  topic
     * @param str2 app
     * @param i    bound to the {@code limit} placeholder
     * @return one two-element array per row: {@code [id, retry time in epoch milliseconds]}
     * @throws JoyQueueException with the CN_DB_ERROR code if the query fails
     */
    public List<long[]> queryIdAndRetryTime(final String str, final String str2, final int i) throws JoyQueueException {
        try {
            return DaoUtil.queryList(this.readDataSource, QUERY_ID_RETRY_TIME_SQL, new DaoUtil.QueryCallback<long[]>() {
                @Override
                public void before(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setString(1, str);
                    preparedStatement.setString(2, str2);
                    preparedStatement.setInt(3, i);
                }

                @Override
                public long[] map(ResultSet resultSet) throws Exception {
                    return new long[]{resultSet.getLong(1), resultSet.getTimestamp(2).getTime()};
                }
            });
        } catch (Exception e) {
            logger.error("queryIdAndRetryTime error.", e);
            throw new JoyQueueException(String.format("%s topic:%s,app:%s,count:%d", JoyQueueCode.CN_DB_ERROR.getMessage(new Object[0]), str, str2, Integer.valueOf(i)), e, JoyQueueCode.CN_DB_ERROR.getCode());
        }
    }

    /**
     * Reads the database retry settings from the supplier and prepares the write
     * and read data source configurations plus the default retry policy.
     *
     * <p>Both configurations use the same DRIVER key but their own URL, user name
     * and password keys. No data source is built here; that happens in {@link #start()}.
     *
     * @param propertySupplier source of the {@code DbRetryConfigKey} values
     */
    public void setSupplier(PropertySupplier propertySupplier) {
        // Write side: publish the config object first, then fill it in.
        final DataSourceConfig writeConfig = new DataSourceConfig();
        this.writeDataSourceConfig = writeConfig;
        writeConfig.setDriver((String) propertySupplier.getValue(DbRetryConfigKey.DRIVER));
        writeConfig.setUrl((String) propertySupplier.getValue(DbRetryConfigKey.WRITE_URL));
        writeConfig.setUser((String) propertySupplier.getValue(DbRetryConfigKey.WRITE_USER_NAME));
        writeConfig.setPassword((String) propertySupplier.getValue(DbRetryConfigKey.WRITE_PASSWORD));

        // Read side, same pattern.
        final DataSourceConfig readConfig = new DataSourceConfig();
        this.readDataSourceConfig = readConfig;
        readConfig.setDriver((String) propertySupplier.getValue(DbRetryConfigKey.DRIVER));
        readConfig.setUrl((String) propertySupplier.getValue(DbRetryConfigKey.READ_URL));
        readConfig.setUser((String) propertySupplier.getValue(DbRetryConfigKey.READ_USER_NAME));
        readConfig.setPassword((String) propertySupplier.getValue(DbRetryConfigKey.READ_PASSWORD));

        final Integer retryDelay = (Integer) propertySupplier.getValue(DbRetryConfigKey.RETRY_DELAY);
        final Integer maxRetryTimes = (Integer) propertySupplier.getValue(DbRetryConfigKey.MAX_RETRY_TIMES);
        this.retryPolicy = new RetryPolicy(retryDelay, maxRetryTimes);
    }
}
