package io.cloudslang.engine.queue.repositories;

import com.google.common.collect.Iterables;
import io.cloudslang.engine.data.IdentityGenerator;
import io.cloudslang.engine.queue.entities.ExecStatus;
import io.cloudslang.engine.queue.entities.ExecutionMessage;
import io.cloudslang.engine.queue.entities.Payload;
import io.cloudslang.engine.queue.entities.StartNewBranchPayload;
import io.cloudslang.engine.queue.services.StatementAwareJdbcTemplateWrapper;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.SingleColumnRowMapper;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.MetaDataAccessException;

/* loaded from: input_file:io/cloudslang/engine/queue/repositories/ExecutionQueueRepositoryImpl.class */
public class ExecutionQueueRepositoryImpl implements ExecutionQueueRepository {
    /** Chunk size used by getExecutionIdsForExecutionStateIds when splitting an ID set into IN (...) lists. */
    private static final int PARTITION_SIZE = 250;
    // Not referenced anywhere in the visible part of this class.
    private static final String MYSQL = "mysql";
    /** Substring looked up (case-insensitively) in the database product name by isMssql(). */
    private static final String MSSQL = "Microsoft";
    /** Payload lookup by state ID; ":IDS" is replaced with a list of "?" placeholders before execution. */
    private static final String QUERY_PAYLOAD_BY_EXECUTION_IDS = "SELECT ID, PAYLOAD FROM OO_EXECUTION_STATES WHERE ID IN (:IDS)";
    /**
     * Selects ASSIGNED/PENDING queue rows of every execution state whose newest queue row
     * (highest MSG_SEQ_ID) is ASSIGNED and was created before the bound CREATE_TIME value.
     */
    private static final String FIND_OLD_STATES = "SELECT q.EXEC_STATE_ID, CREATE_TIME, MSG_SEQ_ID, ASSIGNED_WORKER, EXEC_GROUP, STATUS FROM OO_EXECUTION_QUEUES q,   (SELECT EXEC_STATE_ID FROM OO_EXECUTION_QUEUES qt WHERE (CREATE_TIME < ?) AND      (STATUS = " + ExecStatus.ASSIGNED.getNumber() + ") AND           (NOT EXISTS (SELECT qq.MSG_SEQ_ID               FROM OO_EXECUTION_QUEUES qq               WHERE (qq.EXEC_STATE_ID = qt.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > qt.MSG_SEQ_ID))   ) t WHERE (STATUS IN (" + ExecStatus.ASSIGNED.getNumber() + ", " + ExecStatus.PENDING.getNumber() + ")) AND q.EXEC_STATE_ID = t.EXEC_STATE_ID";
    /** Distinct MSG_ID lookup by state ID; ":IDS" is replaced with a list of "?" placeholders before execution. */
    private static final String FIND_EXEC_IDS = "SELECT DISTINCT MSG_ID FROM OO_EXECUTION_STATES WHERE ID IN (:IDS)";
    // One JDBC template per query family; all of them are created in init() from the injected data source.
    // The StatementAwareJdbcTemplateWrapper instances additionally receive the field name as a label.
    private StatementAwareJdbcTemplateWrapper pollJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper pollForRecoveryJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper pollMessagesWithoutAckJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper getFinishedExecStateIdsJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper countMessagesWithoutAckForWorkerJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper findByStatusesJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper findLargeJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper findExecIDsJdbcTemplate;
    private StatementAwareJdbcTemplateWrapper getFirstPendingBranchJdbcTemplate;
    private JdbcTemplate insertExecutionJdbcTemplate;
    private JdbcTemplate deleteFinishedStepsJdbcTemplate;
    private JdbcTemplate findPayloadByExecutionIdsJdbcTemplate;
    private JdbcTemplate getBusyWorkersJdbcTemplate;
    private JdbcTemplate updateExecutionStateStatusJdbcTemplate;
    private JdbcTemplate deletePendingExecutionStateJdbcTemplate;

    /** Source of new row IDs (used for state IDs, queue row IDs and mapping row IDs). */
    @Autowired
    private IdentityGenerator idGen;

    /** Data source backing every JDBC template of this repository. */
    @Autowired
    private DataSource dataSource;
    /** Poll query selected in init(); still contains the ":status" placeholder. */
    private String workerQuery;
    private Logger logger = Logger.getLogger(getClass());
    // Throughout the queries below, the "NOT EXISTS (... qq.MSG_SEQ_ID > q.MSG_SEQ_ID)" sub-select
    // restricts the result to the queue row with the highest MSG_SEQ_ID of each EXEC_STATE_ID.
    // ":status" / ":ids" are textual placeholders that get replaced by a "?,?,..." list before execution.

    /** Distinct state IDs having a queue row in status TERMINATED, FAILED or FINISHED. */
    private final String SELECT_FINISHED_STEPS_IDS = " SELECT DISTINCT EXEC_STATE_ID FROM OO_EXECUTION_QUEUES  WHERE         (STATUS = " + ExecStatus.TERMINATED.getNumber() + ") OR         (STATUS = " + ExecStatus.FAILED.getNumber() + ") OR         (STATUS = " + ExecStatus.FINISHED.getNumber() + ") ";
    /** Deletes queue rows by state ID. */
    private final String QUERY_DELETE_FINISHED_STEPS_FROM_QUEUES = "DELETE FROM OO_EXECUTION_QUEUES  WHERE EXEC_STATE_ID in (:ids)";
    /** Deletes state rows by ID. */
    private final String QUERY_DELETE_FINISHED_STEPS_FROM_STATES = "DELETE FROM OO_EXECUTION_STATES  WHERE ID in (:ids)";
    /** Latest queue rows in a given status whose MSG_VERSION is below the bound version. */
    private final String QUERY_MESSAGES_WITHOUT_ACK_SQL = "SELECT EXEC_STATE_ID,             ASSIGNED_WORKER,             EXEC_GROUP ,              STATUS,              MSG_SEQ_ID,         CREATE_TIME   FROM  OO_EXECUTION_QUEUES q    WHERE       (q.STATUS  = ? ) AND      (NOT EXISTS (SELECT qq.MSG_SEQ_ID                   FROM OO_EXECUTION_QUEUES qq                   WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND                         qq.MSG_SEQ_ID > q.MSG_SEQ_ID                 )      ) AND       (q.MSG_VERSION < ?) ";
    /** Same filter as QUERY_MESSAGES_WITHOUT_ACK_SQL, restricted to one worker and returning only the count. */
    private final String QUERY_COUNT_MESSAGES_WITHOUT_ACK_FOR_WORKER_SQL = "SELECT COUNT(*)    FROM  OO_EXECUTION_QUEUES  q    WHERE       (q.ASSIGNED_WORKER  = ? ) AND       (q.STATUS  = ? ) AND      (NOT EXISTS (SELECT qq.MSG_SEQ_ID                   FROM OO_EXECUTION_QUEUES qq                   WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND                         qq.MSG_SEQ_ID > q.MSG_SEQ_ID                  )      ) AND       (q.MSG_VERSION < ?)  ";
    /** Worker poll query without any payload-size filtering; ordered by queue CREATE_TIME. */
    private final String QUERY_WORKER_LEGACY_MEMORY_HANDLING_SQL = "SELECT EXEC_STATE_ID,             ASSIGNED_WORKER,             EXEC_GROUP ,              STATUS,              PAYLOAD,              MSG_SEQ_ID ,             MSG_ID,       q.CREATE_TIME  FROM  OO_EXECUTION_QUEUES q,        OO_EXECUTION_STATES s    WHERE        (q.ASSIGNED_WORKER =  ?)  AND       (q.STATUS IN (:status)) AND  \t   (s.ACTIVE = 1) AND  (q.EXEC_STATE_ID = s.ID) AND  (NOT EXISTS (SELECT qq.MSG_SEQ_ID               FROM OO_EXECUTION_QUEUES qq               WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID))  ORDER BY q.CREATE_TIME  ";
    /**
     * Worker poll query that skips rows with PAYLOAD_SIZE at or above the first bound limit and keeps
     * only rows whose running SUM(PAYLOAD_SIZE), ordered by CREATE_TIME, stays below the second bound limit.
     */
    private final String QUERY_WORKER_SQL = "SELECT EXEC_STATE_ID,     ASSIGNED_WORKER,     EXEC_GROUP,     STATUS,     PAYLOAD,     MSG_SEQ_ID,     MSG_ID,     CREATE_TIME FROM (   SELECT EXEC_STATE_ID,        ASSIGNED_WORKER,        EXEC_GROUP,        STATUS,        PAYLOAD,        MSG_SEQ_ID,        MSG_ID,        q.CREATE_TIME,        SUM(PAYLOAD_SIZE) OVER (ORDER BY q.CREATE_TIME ASC) AS total    FROM OO_EXECUTION_QUEUES q,        OO_EXECUTION_STATES s    WHERE (q.ASSIGNED_WORKER = ?)  AND        (q.STATUS IN (:status)) AND        (s.PAYLOAD_SIZE < ?) AND  \t    (s.ACTIVE = 1) AND        (q.EXEC_STATE_ID = s.ID) AND        (NOT EXISTS (SELECT qq.MSG_SEQ_ID                  FROM  OO_EXECUTION_QUEUES qq                  WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID))    ORDER BY q.CREATE_TIME) e WHERE total < ? ";
    /** Variant of QUERY_WORKER_SQL whose inner select has no ORDER BY clause. */
    private final String QUERY_WORKER_SQL_MSSQL = "SELECT EXEC_STATE_ID,     ASSIGNED_WORKER,     EXEC_GROUP,     STATUS,     PAYLOAD,     MSG_SEQ_ID,     MSG_ID,     CREATE_TIME FROM (   SELECT EXEC_STATE_ID,        ASSIGNED_WORKER,        EXEC_GROUP,        STATUS,        PAYLOAD,        MSG_SEQ_ID,        MSG_ID,        q.CREATE_TIME,        SUM(PAYLOAD_SIZE) OVER (ORDER BY q.CREATE_TIME ASC) AS total    FROM OO_EXECUTION_QUEUES q,        OO_EXECUTION_STATES s    WHERE (q.ASSIGNED_WORKER = ?)  AND        (q.STATUS IN (:status)) AND        (s.PAYLOAD_SIZE < ?) AND  \t    (s.ACTIVE = 1) AND        (q.EXEC_STATE_ID = s.ID) AND        (NOT EXISTS (SELECT qq.MSG_SEQ_ID                  FROM  OO_EXECUTION_QUEUES qq                  WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID)) ) e WHERE total < ? ";
    /** Recovery poll query: like the legacy worker query but without the ACTIVE filter and without ordering. */
    private final String QUERY_WORKER_RECOVERY_SQL = "SELECT         EXEC_STATE_ID,             ASSIGNED_WORKER,             EXEC_GROUP,              STATUS,              PAYLOAD,              MSG_SEQ_ID,             MSG_ID,       q.CREATE_TIME  FROM  OO_EXECUTION_QUEUES q,         OO_EXECUTION_STATES s1    WHERE        (q.ASSIGNED_WORKER =  ?)  AND       (q.STATUS IN (:status)) AND  q.EXEC_STATE_ID = s1.ID AND (NOT EXISTS (SELECT qq.MSG_SEQ_ID               FROM OO_EXECUTION_QUEUES qq               WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID)) ";
    /** Latest queue rows (no payload columns) whose status is in the given list. */
    private final String QUERY_MESSAGES_BY_STATUSES = "SELECT EXEC_STATE_ID,   ASSIGNED_WORKER,   EXEC_GROUP ,   STATUS,   MSG_SEQ_ID,   CREATE_TIME FROM  OO_EXECUTION_QUEUES q  WHERE STATUS IN (:status) AND   NOT EXISTS (     SELECT qq.MSG_SEQ_ID      FROM OO_EXECUTION_QUEUES qq      WHERE         qq.EXEC_STATE_ID = q.EXEC_STATE_ID         AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID  )";
    /** Workers that own at least one latest queue row in one of the given statuses. */
    private final String BUSY_WORKERS_SQL = "SELECT ASSIGNED_WORKER       FROM  OO_EXECUTION_QUEUES q   WHERE        (q.STATUS IN (:status)) AND  (NOT EXISTS (SELECT qq.MSG_SEQ_ID               FROM OO_EXECUTION_QUEUES qq               WHERE (qq.EXEC_STATE_ID = q.EXEC_STATE_ID) AND qq.MSG_SEQ_ID > q.MSG_SEQ_ID))  GROUP BY ASSIGNED_WORKER";
    /** Insert into OO_EXECUTION_STATES; CREATE_TIME is supplied by the database (CURRENT_TIMESTAMP). */
    private final String INSERT_EXEC_STATE = "INSERT INTO OO_EXECUTION_STATES  (ID, MSG_ID,  PAYLOAD, PAYLOAD_SIZE, CREATE_TIME, ACTIVE) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?)";
    /** Insert into OO_EXECUTION_QUEUES; all eight columns are bound by the caller. */
    private final String INSERT_QUEUE = "INSERT INTO OO_EXECUTION_QUEUES (ID, EXEC_STATE_ID, ASSIGNED_WORKER, EXEC_GROUP, STATUS,MSG_SEQ_ID, CREATE_TIME,MSG_VERSION) VALUES (?, ?, ?, ?, ?, ?,?,?)";
    /** Insert into OO_EXECS_STATES_EXECS_MAPPINGS (state ID to execution ID mapping). */
    private final String INSERT_EXECUTION_STATE_MAPPING = "INSERT INTO OO_EXECS_STATES_EXECS_MAPPINGS (ID, EXEC_STATE_ID, EXEC_ID) VALUES (?, ?, ?)";
    /** Whether poll() uses the payload-size-aware query; overwritten in init() from a system property and a probe. */
    private boolean useLargeMessageQuery = true;

    /* loaded from: input_file:io/cloudslang/engine/queue/repositories/ExecutionQueueRepositoryImpl$BusyWorkerRowMapper.class */
    private class BusyWorkerRowMapper implements RowMapper<String> {
        private BusyWorkerRowMapper() {
        }

        /* renamed from: mapRow, reason: merged with bridge method [inline-methods] */
        public String m1mapRow(ResultSet resultSet, int i) throws SQLException {
            return resultSet.getString("ASSIGNED_WORKER");
        }
    }

    /* loaded from: input_file:io/cloudslang/engine/queue/repositories/ExecutionQueueRepositoryImpl$ExecutionMessageRowMapper.class */
    private class ExecutionMessageRowMapper implements RowMapper<ExecutionMessage> {
        private ExecutionMessageRowMapper() {
        }

        /* renamed from: mapRow, reason: merged with bridge method [inline-methods] */
        public ExecutionMessage m2mapRow(ResultSet resultSet, int i) throws SQLException {
            return new ExecutionMessage(resultSet.getLong("EXEC_STATE_ID"), resultSet.getString("ASSIGNED_WORKER"), resultSet.getString("EXEC_GROUP"), resultSet.getString("MSG_ID"), ExecStatus.find(resultSet.getInt("STATUS")), new Payload(resultSet.getBytes("PAYLOAD")), resultSet.getInt("MSG_SEQ_ID"), Long.valueOf(resultSet.getLong("CREATE_TIME")));
        }
    }

    /* loaded from: input_file:io/cloudslang/engine/queue/repositories/ExecutionQueueRepositoryImpl$ExecutionMessageWithoutPayloadRowMapper.class */
    private class ExecutionMessageWithoutPayloadRowMapper implements RowMapper<ExecutionMessage> {
        private ExecutionMessageWithoutPayloadRowMapper() {
        }

        /* renamed from: mapRow, reason: merged with bridge method [inline-methods] */
        public ExecutionMessage m3mapRow(ResultSet resultSet, int i) throws SQLException {
            return new ExecutionMessage(resultSet.getLong("EXEC_STATE_ID"), resultSet.getString("ASSIGNED_WORKER"), resultSet.getString("EXEC_GROUP"), "-1", ExecStatus.find(resultSet.getInt("STATUS")), (Payload) null, resultSet.getInt("MSG_SEQ_ID"), Long.valueOf(resultSet.getLong("CREATE_TIME")));
        }
    }

    /**
     * Builds all JDBC templates from the injected data source and selects the worker poll query.
     *
     * <p>The payload-size-aware query is used when the system property
     * {@code score.poll.use.large.message.query} is {@code true} (the default) and a probe poll
     * succeeds; if the probe throws a {@link RuntimeException}, the legacy query is used instead.
     */
    @PostConstruct
    public void init() {
        this.pollJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "pollJdbcTemplate");
        this.pollForRecoveryJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "pollForRecoveryJdbcTemplate");
        this.pollMessagesWithoutAckJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "pollMessagesWithoutAckJdbcTemplate");
        this.getFinishedExecStateIdsJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "getFinishedExecStateIdsJdbcTemplate");
        this.countMessagesWithoutAckForWorkerJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "countMessagesWithoutAckForWorkerJdbcTemplate");
        this.findByStatusesJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "findByStatusesJdbcTemplate");
        this.findLargeJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "findLargeJdbcTemplate");
        this.findExecIDsJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "findExecIDsJdbcTemplate");
        this.getFirstPendingBranchJdbcTemplate = new StatementAwareJdbcTemplateWrapper(this.dataSource, "getFirstPendingBranchJdbcTemplate");
        this.insertExecutionJdbcTemplate = new JdbcTemplate(this.dataSource);
        this.deleteFinishedStepsJdbcTemplate = new JdbcTemplate(this.dataSource);
        this.findPayloadByExecutionIdsJdbcTemplate = new JdbcTemplate(this.dataSource);
        this.getBusyWorkersJdbcTemplate = new JdbcTemplate(this.dataSource);
        this.updateExecutionStateStatusJdbcTemplate = new JdbcTemplate(this.dataSource);
        this.deletePendingExecutionStateJdbcTemplate = new JdbcTemplate(this.dataSource);

        this.useLargeMessageQuery = Boolean.parseBoolean(System.getProperty("score.poll.use.large.message.query", "true"));
        // Reference the SQL fields instead of repeating their text so the query exists in one place only.
        this.workerQuery = isMssql() ? this.QUERY_WORKER_SQL_MSSQL : this.QUERY_WORKER_SQL;
        if (this.useLargeMessageQuery) {
            try {
                // Probe: run the size-aware query once; only whether it throws matters, not its result.
                poll("worker1", 1, 1L, ExecStatus.ASSIGNED);
            } catch (RuntimeException e) {
                // The probe failed, so fall back to the query without payload-size handling.
                this.useLargeMessageQuery = false;
                this.workerQuery = this.QUERY_WORKER_LEGACY_MEMORY_HANDLING_SQL;
                this.logger.info("Large message poll query failed: " + e.getMessage());
            }
        }
        this.logger.info("Poll using large message query: " + this.useLargeMessageQuery);
    }

    /**
     * Obtains the next identifier from the identity generator.
     *
     * @return the generated identifier as a primitive {@code long}
     */
    public long generateExecStateId() {
        final long nextId = this.idGen.next().longValue();
        return nextId;
    }

    /**
     * Batch-inserts one OO_EXECUTION_STATES row per message.
     *
     * @param list messages to persist; each supplies state ID, message ID, payload bytes,
     *             payload size and the active flag (stored as 1 or 0)
     */
    public void insertExecutionStates(final List<ExecutionMessage> list) {
        final BatchPreparedStatementSetter stateBinder = new BatchPreparedStatementSetter() {
            @Override
            public int getBatchSize() {
                return list.size();
            }

            @Override
            public void setValues(PreparedStatement statement, int rowIndex) throws SQLException {
                final ExecutionMessage message = list.get(rowIndex);
                final int activeFlag = message.isActive() ? 1 : 0;
                statement.setLong(1, message.getExecStateId());
                statement.setString(2, message.getMsgId());
                statement.setBytes(3, message.getPayload().getData());
                statement.setLong(4, message.getPayloadSize());
                statement.setInt(5, activeFlag);
            }
        };
        this.insertExecutionJdbcTemplate.batchUpdate(this.INSERT_EXEC_STATE, stateBinder);
    }

    /**
     * Batch-inserts one OO_EXECUTION_QUEUES row per message.
     *
     * <p>Each row gets a freshly generated ID and the current epoch-millisecond clock value as
     * CREATE_TIME, read at the moment the row is bound.
     *
     * @param list messages to enqueue
     * @param j    value stored in the MSG_VERSION column of every inserted row
     */
    public void insertExecutionQueue(final List<ExecutionMessage> list, final long j) {
        final long startedAt = System.currentTimeMillis();
        this.insertExecutionJdbcTemplate.batchUpdate(this.INSERT_QUEUE, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
                final ExecutionMessage message = list.get(i);
                preparedStatement.setLong(1, ExecutionQueueRepositoryImpl.this.idGen.next().longValue());
                preparedStatement.setLong(2, message.getExecStateId());
                preparedStatement.setString(3, message.getWorkerId());
                preparedStatement.setString(4, message.getWorkerGroup());
                preparedStatement.setInt(5, message.getStatus().getNumber());
                preparedStatement.setInt(6, message.getMsgSeqId());
                // Same epoch-millis value Calendar.getInstance().getTimeInMillis() yields, without the allocation.
                preparedStatement.setLong(7, System.currentTimeMillis());
                preparedStatement.setLong(8, j);
            }

            @Override
            public int getBatchSize() {
                return list.size();
            }
        });
        if (this.logger.isDebugEnabled()) {
            final long elapsedMs = System.currentTimeMillis() - startedAt;
            this.logger.debug("Insert to queue: " + list.size() + "/" + elapsedMs + " messages/ms");
        }
    }

    /**
     * Batch-inserts one OO_EXECS_STATES_EXECS_MAPPINGS row per message, linking the message's
     * state ID to its message ID parsed as a {@code long}.
     *
     * @param list messages to map; each message ID must be parseable by {@link Long#parseLong(String)}
     */
    public void insertNotActiveExecutionsQueues(final List<ExecutionMessage> list) {
        final BatchPreparedStatementSetter mappingBinder = new BatchPreparedStatementSetter() {
            @Override
            public int getBatchSize() {
                return list.size();
            }

            @Override
            public void setValues(PreparedStatement statement, int rowIndex) throws SQLException {
                final ExecutionMessage message = list.get(rowIndex);
                statement.setLong(1, ExecutionQueueRepositoryImpl.this.idGen.next().longValue());
                statement.setLong(2, message.getExecStateId());
                statement.setLong(3, Long.parseLong(message.getMsgId()));
            }
        };
        this.insertExecutionJdbcTemplate.batchUpdate(this.INSERT_EXECUTION_STATE_MAPPING, mappingBinder);
    }

    /**
     * Looks up the pending-branch mapping row stored for an execution.
     *
     * @param j execution ID matched against the EXEC_ID column
     * @return a payload holding the row's EXEC_STATE_ID and ID, or {@code null} when no row exists
     */
    public StartNewBranchPayload getFirstPendingBranch(long j) {
        this.getFirstPendingBranchJdbcTemplate.setStatementBatchSize(1);
        try {
            return this.getFirstPendingBranchJdbcTemplate.queryForObject(
                    "SELECT ID, EXEC_STATE_ID FROM OO_EXECS_STATES_EXECS_MAPPINGS WHERE EXEC_ID = ?",
                    new Object[]{Long.valueOf(j)},
                    (resultSet, i) -> new StartNewBranchPayload(resultSet.getLong("EXEC_STATE_ID"), resultSet.getLong("ID")));
        } catch (EmptyResultDataAccessException ignored) {
            // No mapping row for this execution is an expected outcome, reported to the caller as null.
            return null;
        } finally {
            // Reset the batch-size setting, as every other method using a statement-aware template does.
            this.getFirstPendingBranchJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Sets ACTIVE = 1 on the OO_EXECUTION_STATES row with the given ID.
     *
     * @param j ID of the execution state row to activate
     */
    public void activatePendingExecutionStateForAnExecution(long j) {
        final String sql = "UPDATE OO_EXECUTION_STATES SET ACTIVE = 1 WHERE ID = ?";
        final Object[] args = {Long.valueOf(j)};
        this.updateExecutionStateStatusJdbcTemplate.update(sql, args);
    }

    /**
     * Deletes the OO_EXECS_STATES_EXECS_MAPPINGS row with the given ID.
     *
     * @param j ID of the mapping row to delete
     */
    public void deletePendingExecutionState(long j) {
        final String sql = "DELETE FROM OO_EXECS_STATES_EXECS_MAPPINGS WHERE ID = ?";
        final Object[] args = {Long.valueOf(j)};
        this.deletePendingExecutionStateJdbcTemplate.update(sql, args);
    }

    /**
     * Runs the recovery poll query for one worker.
     *
     * @param str           worker identifier bound to ASSIGNED_WORKER
     * @param i             value passed to the template as statement batch size for this call
     * @param execStatusArr statuses bound into the STATUS IN (...) list
     * @return the messages (with payload) returned by the query
     */
    public List<ExecutionMessage> pollRecovery(String str, int i, ExecStatus... execStatusArr) {
        this.pollForRecoveryJdbcTemplate.setStatementBatchSize(i);
        try {
            final int statusCount = execStatusArr.length;
            final String sql = this.QUERY_WORKER_RECOVERY_SQL.replaceAll(":status", StringUtils.repeat("?", ",", statusCount));

            // Bind order: worker first, then one value per status placeholder.
            final Object[] args = new Object[statusCount + 1];
            args[0] = str;
            for (int idx = 0; idx < statusCount; idx++) {
                args[idx + 1] = Integer.valueOf(execStatusArr[idx].getNumber());
            }
            return doSelectWithTemplate(this.pollForRecoveryJdbcTemplate, sql, new ExecutionMessageRowMapper(), args);
        } finally {
            this.pollForRecoveryJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Polls messages for a worker using the query selected in {@code init()}.
     *
     * @param str           worker identifier
     * @param i             value passed to the template as statement batch size for this call
     * @param j             size limit bound twice when the large-message query is active; ignored otherwise
     * @param execStatusArr statuses bound into the STATUS IN (...) list
     * @return the messages returned by the query
     */
    public List<ExecutionMessage> poll(String str, int i, long j, ExecStatus... execStatusArr) {
        final String placeholders = StringUtils.repeat("?", ",", execStatusArr.length);
        final String sql = this.workerQuery.replaceAll(":status", placeholders);
        final Object[] args;
        if (this.useLargeMessageQuery) {
            args = preparePollArgs(str, j, execStatusArr);
        } else {
            args = prepareStdPollArgs(str, execStatusArr);
        }
        return executePoll(i, sql, args);
    }

    /**
     * Reports whether the database product name contains "Microsoft" (case-insensitive).
     *
     * @return {@code true} on a match; {@code false} on no match or when the metadata cannot be read
     */
    private boolean isMssql() {
        final String productName;
        try {
            productName = (String) JdbcUtils.extractDatabaseMetaData(this.dataSource, "getDatabaseProductName");
        } catch (MetaDataAccessException e) {
            this.logger.warn("Database type could not be determined!", e);
            return false;
        }
        this.logger.info("Database product name: " + productName);
        return StringUtils.containsIgnoreCase(productName, MSSQL);
    }

    /**
     * Executes a prepared poll query and maps every row, including its payload, to a message.
     *
     * @param i      value passed to the template as statement batch size for this call
     * @param str    SQL text with all placeholders already expanded to "?"
     * @param objArr bind values in placeholder order
     * @return the mapped messages
     */
    private List<ExecutionMessage> executePoll(int i, String str, Object[] objArr) {
        final RowMapper<ExecutionMessage> messageMapper = (row, rowNum) -> new ExecutionMessage(
                row.getLong("EXEC_STATE_ID"),
                row.getString("ASSIGNED_WORKER"),
                row.getString("EXEC_GROUP"),
                row.getString("MSG_ID"),
                ExecStatus.find(row.getInt("STATUS")),
                new Payload(row.getBytes("PAYLOAD")),
                row.getInt("MSG_SEQ_ID"),
                Long.valueOf(row.getLong("CREATE_TIME")));

        this.pollJdbcTemplate.setStatementBatchSize(i);
        try {
            return doSelectWithTemplate(this.pollJdbcTemplate, str, messageMapper, objArr);
        } finally {
            this.pollJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Builds the bind values for a poll query without size limits.
     *
     * @param str           worker identifier, placed first
     * @param execStatusArr statuses whose numbers follow in the given order
     * @return array of length {@code execStatusArr.length + 1}
     */
    private Object[] prepareStdPollArgs(String str, ExecStatus[] execStatusArr) {
        final int statusCount = execStatusArr.length;
        final Object[] args = new Object[statusCount + 1];
        args[0] = str;
        for (int idx = 0; idx < statusCount; idx++) {
            args[idx + 1] = Integer.valueOf(execStatusArr[idx].getNumber());
        }
        return args;
    }

    /**
     * Builds the bind values for the size-aware poll query.
     *
     * @param str           worker identifier, placed first
     * @param j             size limit, appended twice after the status numbers
     * @param execStatusArr statuses whose numbers follow the worker in the given order
     * @return array of length {@code execStatusArr.length + 3}
     */
    private Object[] preparePollArgs(String str, long j, ExecStatus[] execStatusArr) {
        final int statusCount = execStatusArr.length;
        final Object[] args = new Object[statusCount + 3];
        args[0] = str;
        for (int idx = 0; idx < statusCount; idx++) {
            args[idx + 1] = Integer.valueOf(execStatusArr[idx].getNumber());
        }
        // The query has two trailing "?" placeholders and both receive the same limit.
        args[statusCount + 1] = Long.valueOf(j);
        args[statusCount + 2] = Long.valueOf(j);
        return args;
    }

    /**
     * Deletes the given state IDs from OO_EXECUTION_STATES and then from OO_EXECUTION_QUEUES.
     *
     * @param set state IDs to delete; a {@code null} or empty set makes this a no-op
     */
    public void deleteFinishedSteps(Set<Long> set) {
        if (set == null || set.isEmpty()) {
            return;
        }
        final int idCount = set.size();
        final Object[] ids = set.toArray(new Object[idCount]);

        final String statesSql = this.QUERY_DELETE_FINISHED_STEPS_FROM_STATES.replaceAll(":ids", StringUtils.repeat("?", ",", idCount));
        logSQL(statesSql, ids);
        final int deletedStates = this.deleteFinishedStepsJdbcTemplate.update(statesSql, ids);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Deleted " + deletedStates + " rows of finished steps from OO_EXECUTION_STATES table.");
        }

        final String queuesSql = this.QUERY_DELETE_FINISHED_STEPS_FROM_QUEUES.replaceAll(":ids", StringUtils.repeat("?", ",", idCount));
        logSQL(queuesSql, ids);
        final int deletedQueues = this.deleteFinishedStepsJdbcTemplate.update(queuesSql, ids);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Deleted " + deletedQueues + " rows of finished steps from OO_EXECUTION_QUEUES table.");
        }
    }

    /**
     * Collects the distinct state IDs returned by the finished-steps query.
     *
     * @return a new mutable set of state IDs
     */
    public Set<Long> getFinishedExecStateIds() {
        this.getFinishedExecStateIdsJdbcTemplate.setStatementBatchSize(1000000);
        try {
            final List<Long> finishedIds = doSelectWithTemplate(
                    this.getFinishedExecStateIdsJdbcTemplate,
                    this.SELECT_FINISHED_STEPS_IDS,
                    new SingleColumnRowMapper<>(Long.class));
            return new HashSet<>(finishedIds);
        } finally {
            this.getFinishedExecStateIdsJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Fetches the latest queue rows in status SENT whose MSG_VERSION is below the given version.
     *
     * @param i value passed to the template as statement batch size for this call
     * @param j version bound to the {@code MSG_VERSION < ?} condition
     * @return payload-less messages; a warning is logged when the list is not empty
     */
    public List<ExecutionMessage> pollMessagesWithoutAck(int i, long j) {
        final String sql = this.QUERY_MESSAGES_WITHOUT_ACK_SQL;
        this.pollMessagesWithoutAckJdbcTemplate.setStatementBatchSize(i);
        try {
            final Object[] args = {Integer.valueOf(ExecStatus.SENT.getNumber()), Long.valueOf(j)};
            final long startedAt = System.currentTimeMillis();
            final List<ExecutionMessage> unacked = this.pollMessagesWithoutAckJdbcTemplate.query(sql, args, new ExecutionMessageWithoutPayloadRowMapper());

            if (!unacked.isEmpty()) {
                this.logger.warn("Pool " + unacked.size() + " messages without ack, version = " + j);
                if (this.logger.isDebugEnabled()) {
                    for (ExecutionMessage message : unacked) {
                        this.logger.debug("Recovery msg [" + message.getExecStateId() + "," + message.getStatus() + "," + message.getCreateDate() + "]");
                    }
                }
            }
            if (this.logger.isTraceEnabled()) {
                final long elapsedMs = System.currentTimeMillis() - startedAt;
                this.logger.trace("Query [" + sql + "] took " + elapsedMs + " ms");
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Got msg without ack :" + unacked.size() + ",for version:" + j);
            }
            return unacked;
        } finally {
            this.pollMessagesWithoutAckJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Counts the latest queue rows of one worker that are in status SENT with MSG_VERSION below
     * the given version.
     *
     * @param i   value passed to the template as statement batch size for this call
     * @param j   version bound to the {@code MSG_VERSION < ?} condition
     * @param str worker identifier bound to ASSIGNED_WORKER
     * @return the COUNT(*) result of the query
     */
    public Integer countMessagesWithoutAckForWorker(int i, long j, String str) {
        final String sql = this.QUERY_COUNT_MESSAGES_WITHOUT_ACK_FOR_WORKER_SQL;
        this.countMessagesWithoutAckForWorkerJdbcTemplate.setStatementBatchSize(i);
        try {
            final Object[] args = {str, Integer.valueOf(ExecStatus.SENT.getNumber()), Long.valueOf(j)};
            final long startedAt = System.currentTimeMillis();
            final Integer unackedCount = this.countMessagesWithoutAckForWorkerJdbcTemplate.queryForObject(sql, args, Integer.class);

            if (this.logger.isTraceEnabled()) {
                final long elapsedMs = System.currentTimeMillis() - startedAt;
                this.logger.trace("Query [" + sql + "] took " + elapsedMs + " ms");
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Got msg without ack :" + unackedCount + ",for version:" + j + ",for worker:" + str);
            }
            return unackedCount;
        } finally {
            this.countMessagesWithoutAckForWorkerJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Loads the payloads of the given execution state rows.
     *
     * @param lArr IDs of OO_EXECUTION_STATES rows; {@code null} or empty yields an empty map
     *             without touching the database (an empty list would expand to the invalid SQL {@code IN ()})
     * @return a new mutable map from row ID to its payload; IDs without a row are absent
     */
    public Map<Long, Payload> findPayloadByExecutionIds(Long... lArr) {
        final Map<Long, Payload> payloadsById = new HashMap<>();
        if (lArr == null || lArr.length == 0) {
            return payloadsById;
        }
        final String sql = QUERY_PAYLOAD_BY_EXECUTION_IDS.replace(":IDS", StringUtils.repeat("?", ",", lArr.length));
        // Kept as an anonymous class: a lambda here would be ambiguous between the
        // RowCallbackHandler and ResultSetExtractor overloads of query(...).
        this.findPayloadByExecutionIdsJdbcTemplate.query(sql, lArr, new RowCallbackHandler() {
            @Override
            public void processRow(ResultSet resultSet) throws SQLException {
                payloadsById.put(Long.valueOf(resultSet.getLong(1)), new Payload(resultSet.getBytes("payload")));
            }
        });
        return payloadsById;
    }

    /**
     * Fetches the latest queue rows whose status is one of the given statuses.
     *
     * @param i             value passed to the template as statement batch size for this call
     * @param execStatusArr statuses bound into the STATUS IN (...) list
     * @return payload-less messages returned by the query
     * @throws RuntimeException whatever the query throws, after logging the SQL text
     */
    public List<ExecutionMessage> findByStatuses(int i, ExecStatus... execStatusArr) {
        // Prepare SQL and bind values before touching the template, so a failure here leaves no batch size behind.
        final String sql = this.QUERY_MESSAGES_BY_STATUSES.replaceAll(":status", StringUtils.repeat("?", ",", execStatusArr.length));
        final Object[] args = new Object[execStatusArr.length];
        for (int idx = 0; idx < execStatusArr.length; idx++) {
            args[idx] = Integer.valueOf(execStatusArr[idx].getNumber());
        }

        this.findByStatusesJdbcTemplate.setStatementBatchSize(i);
        try {
            return doSelectWithTemplate(this.findByStatusesJdbcTemplate, sql, new ExecutionMessageWithoutPayloadRowMapper(), args);
        } catch (RuntimeException e) {
            this.logger.error(sql, e);
            throw e;
        } finally {
            // Cleared on success and on failure; previously a failing query skipped this reset.
            this.findByStatusesJdbcTemplate.clearStatementBatchSize();
        }
    }

    /**
     * Lists the workers that own at least one latest queue row in one of the given statuses.
     *
     * @param execStatusArr statuses bound into the STATUS IN (...) list
     * @return worker identifiers, one per ASSIGNED_WORKER group
     */
    public List<String> getBusyWorkers(ExecStatus... execStatusArr) {
        final String sql = this.BUSY_WORKERS_SQL.replaceAll(":status", StringUtils.repeat("?", ",", execStatusArr.length));
        final Object[] args = new Object[execStatusArr.length];
        // One bind value per placeholder; writing every status to index 0 left the others null.
        for (int idx = 0; idx < execStatusArr.length; idx++) {
            args[idx] = Integer.valueOf(execStatusArr[idx].getNumber());
        }
        return doSelectWithTemplate(this.getBusyWorkersJdbcTemplate, sql, new BusyWorkerRowMapper(), args);
    }

    /**
     * Runs the FIND_OLD_STATES query with the given creation-time bound.
     *
     * @param j value bound to the {@code CREATE_TIME < ?} condition
     * @return messages without message ID and without payload
     */
    public List<ExecutionMessage> findOldMessages(long j) {
        final Object[] args = {Long.valueOf(j)};
        return this.findLargeJdbcTemplate.query(FIND_OLD_STATES, args, (row, rowNum) -> new ExecutionMessage(
                row.getLong("EXEC_STATE_ID"),
                row.getString("ASSIGNED_WORKER"),
                row.getString("EXEC_GROUP"),
                (String) null,
                ExecStatus.find(row.getInt("STATUS")),
                (Payload) null,
                row.getInt("MSG_SEQ_ID"),
                Long.valueOf(row.getLong("CREATE_TIME"))));
    }

    /**
     * Resolves the distinct MSG_ID values of the given state rows, querying in chunks of
     * {@code PARTITION_SIZE} IDs.
     *
     * @param set IDs of OO_EXECUTION_STATES rows
     * @return a new mutable set with the MSG_ID values read as {@code long}
     */
    public Set<Long> getExecutionIdsForExecutionStateIds(Set<Long> set) {
        final Set<Long> executionIds = new HashSet<>();
        for (List<Long> chunk : Iterables.partition(set, PARTITION_SIZE)) {
            final String sql = FIND_EXEC_IDS.replace(":IDS", StringUtils.repeat("?", ",", chunk.size()));
            final List<Long> found = this.findExecIDsJdbcTemplate.query(
                    sql,
                    chunk.toArray(),
                    (row, rowNum) -> Long.valueOf(row.getLong("MSG_ID")));
            executionIds.addAll(found);
        }
        return executionIds;
    }

    /**
     * Runs a SELECT through the given template with debug logging of the SQL, row count and duration.
     *
     * @param jdbcTemplate template to execute on
     * @param str          SQL text
     * @param rowMapper    mapper applied to every row
     * @param objArr       bind values in placeholder order
     * @param <T>          element type produced by the mapper
     * @return the mapped rows
     * @throws RuntimeException whatever the template throws, after logging it at error level
     */
    private <T> List<T> doSelectWithTemplate(JdbcTemplate jdbcTemplate, String str, RowMapper<T> rowMapper, Object... objArr) {
        logSQL(str, objArr);
        final List<T> rows;
        final long startedAt;
        try {
            startedAt = System.currentTimeMillis();
            rows = jdbcTemplate.query(str, objArr, rowMapper);
            if (this.logger.isDebugEnabled()) {
                final long elapsedMs = System.currentTimeMillis() - startedAt;
                this.logger.debug("Fetched result: " + rows.size() + '/' + elapsedMs + " rows/ms");
            }
        } catch (RuntimeException failure) {
            this.logger.error("Failed to execute query: " + str, failure);
            throw failure;
        }
        return rows;
    }

    /**
     * Logs a statement and its bind values at debug level; does nothing when debug is disabled.
     *
     * @param str    SQL text
     * @param objArr bind values; the parameter line is omitted only when there are none
     */
    private void logSQL(String str, Object... objArr) {
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        this.logger.debug("Execute SQL: " + str);
        // Log whenever at least one value is bound; the former "length <= 1" check hid single-parameter calls.
        if (objArr != null && objArr.length > 0) {
            this.logger.debug("Parameters : " + Arrays.toString(objArr));
        }
    }
}
