package com.netflix.conductor.cassandra.dao;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.cassandra.dao.CassandraBaseDAO;
import com.netflix.conductor.cassandra.util.Constants;
import com.netflix.conductor.cassandra.util.Statements;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Trace
/* loaded from: input_file:com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.class */
public class CassandraExecutionDAO extends CassandraBaseDAO implements ExecutionDAO, ConcurrentExecutionLimitDAO {
    // Class-wide logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraExecutionDAO.class);
    // Simple class name; passed to Monitors.error(...) to tag errors raised from this DAO.
    private static final String CLASS_NAME = CassandraExecutionDAO.class.getSimpleName();
    // Prepared statements below are created once in the constructor from the Statements helper.
    // Insert statements (prepared with the configured write consistency level).
    protected final PreparedStatement insertWorkflowStatement;
    protected final PreparedStatement insertTaskStatement;
    protected final PreparedStatement insertEventExecutionStatement;
    // Select statements (prepared with the configured read consistency level).
    protected final PreparedStatement selectTotalStatement;
    protected final PreparedStatement selectTaskStatement;
    protected final PreparedStatement selectWorkflowStatement;
    protected final PreparedStatement selectWorkflowWithTasksStatement;
    protected final PreparedStatement selectTaskLookupStatement;
    protected final PreparedStatement selectTasksFromTaskDefLimitStatement;
    protected final PreparedStatement selectEventExecutionsStatement;
    // Update statements (prepared with the configured write consistency level).
    protected final PreparedStatement updateWorkflowStatement;
    protected final PreparedStatement updateTotalTasksStatement;
    protected final PreparedStatement updateTotalPartitionsStatement;
    protected final PreparedStatement updateTaskLookupStatement;
    protected final PreparedStatement updateTaskDefLimitStatement;
    protected final PreparedStatement updateEventExecutionStatement;
    // Delete statements (prepared with the configured write consistency level).
    protected final PreparedStatement deleteWorkflowStatement;
    protected final PreparedStatement deleteTaskStatement;
    protected final PreparedStatement deleteTaskLookupStatement;
    protected final PreparedStatement deleteTaskDefLimitStatement;
    protected final PreparedStatement deleteEventExecutionStatement;
    // Event-execution TTL in whole seconds, taken from CassandraProperties and narrowed to int.
    protected final int eventExecutionsTTL;

    /**
     * Creates the DAO and prepares every CQL statement it uses.
     *
     * <p>Insert, update and delete statements are prepared with the write consistency level from
     * {@code cassandraProperties}; select statements use the read consistency level.
     *
     * @param session Cassandra session used to prepare and later execute the statements
     * @param objectMapper mapper handed to the base DAO
     * @param cassandraProperties source of the consistency levels and the event execution TTL
     * @param statements supplier of the CQL text for each prepared statement
     */
    public CassandraExecutionDAO(
            Session session,
            ObjectMapper objectMapper,
            CassandraProperties cassandraProperties,
            Statements statements) {
        super(session, objectMapper, cassandraProperties);

        this.eventExecutionsTTL =
                (int) cassandraProperties.getEventExecutionPersistenceTtl().getSeconds();

        // Inserts.
        this.insertWorkflowStatement =
                session.prepare(statements.getInsertWorkflowStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.insertTaskStatement =
                session.prepare(statements.getInsertTaskStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.insertEventExecutionStatement =
                session.prepare(statements.getInsertEventExecutionStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());

        // Selects.
        this.selectTotalStatement =
                session.prepare(statements.getSelectTotalStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());
        this.selectTaskStatement =
                session.prepare(statements.getSelectTaskStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());
        this.selectWorkflowStatement =
                session.prepare(statements.getSelectWorkflowStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());
        this.selectWorkflowWithTasksStatement =
                session.prepare(statements.getSelectWorkflowWithTasksStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());
        this.selectTaskLookupStatement =
                session.prepare(statements.getSelectTaskFromLookupTableStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());
        this.selectTasksFromTaskDefLimitStatement =
                session.prepare(statements.getSelectTasksFromTaskDefLimitStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());
        this.selectEventExecutionsStatement =
                session.prepare(
                                statements
                                        .getSelectAllEventExecutionsForMessageFromEventExecutionsStatement())
                        .setConsistencyLevel(cassandraProperties.getReadConsistencyLevel());

        // Updates.
        this.updateWorkflowStatement =
                session.prepare(statements.getUpdateWorkflowStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.updateTotalTasksStatement =
                session.prepare(statements.getUpdateTotalTasksStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.updateTotalPartitionsStatement =
                session.prepare(statements.getUpdateTotalPartitionsStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.updateTaskLookupStatement =
                session.prepare(statements.getUpdateTaskLookupStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.updateTaskDefLimitStatement =
                session.prepare(statements.getUpdateTaskDefLimitStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.updateEventExecutionStatement =
                session.prepare(statements.getUpdateEventExecutionStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());

        // Deletes.
        this.deleteWorkflowStatement =
                session.prepare(statements.getDeleteWorkflowStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.deleteTaskStatement =
                session.prepare(statements.getDeleteTaskStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.deleteTaskLookupStatement =
                session.prepare(statements.getDeleteTaskLookupStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.deleteTaskDefLimitStatement =
                session.prepare(statements.getDeleteTaskDefLimitStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
        this.deleteEventExecutionStatement =
                session.prepare(statements.getDeleteEventExecutionsStatement())
                        .setConsistencyLevel(cassandraProperties.getWriteConsistencyLevel());
    }

    /**
     * Returns the tasks of one workflow that have the given task type and are in progress.
     *
     * @param str task type each task's {@code getTaskType()} is compared against
     * @param str2 id of the workflow whose tasks are loaded via {@link #getTasksForWorkflow}
     * @return tasks of that type whose status is {@code IN_PROGRESS}
     */
    public List<TaskModel> getPendingTasksByWorkflow(String str, String str2) {
        List<TaskModel> workflowTasks = getTasksForWorkflow(str2);
        return workflowTasks.stream()
                .filter(
                        task ->
                                str.equals(task.getTaskType())
                                        && TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
                .collect(Collectors.toList());
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public List<TaskModel> getTasks(String str, String str2, int i) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Persists a list of tasks that all belong to the same workflow.
     *
     * <p>Steps, in order: validate the list, read the workflow's current task total, write one
     * task-lookup row per task (stamping a scheduled time on tasks that have none), then insert
     * all task payloads together with the new task total in a single batch, and finally update
     * the partition totals.
     *
     * @param list tasks to create; checked by {@link #validateTasks(List)}
     * @return the same list that was passed in
     * @throws TransientException if the Cassandra driver reports an error
     */
    public List<TaskModel> createTasks(List<TaskModel> list) {
        validateTasks(list);
        String workflowId = list.get(0).getWorkflowInstanceId();
        UUID workflowUuid = toUUID(workflowId, "Invalid workflow id");
        try {
            CassandraBaseDAO.WorkflowMetadata metadata = getWorkflowMetadata(workflowId);
            int updatedTotal = metadata.getTotalTasks() + list.size();

            // Lookup rows are written one at a time, before the payload batch.
            for (TaskModel task : list) {
                if (task.getScheduledTime() == 0) {
                    task.setScheduledTime(System.currentTimeMillis());
                }
                UUID taskUuid = toUUID(task.getTaskId(), "Invalid task id");
                this.session.execute(this.updateTaskLookupStatement.bind(workflowUuid, taskUuid));
            }

            // Task payloads and the new total go into one batch.
            BatchStatement batch = new BatchStatement();
            for (TaskModel task : list) {
                String payload = toJson(task);
                batch.add(
                        this.insertTaskStatement.bind(workflowUuid, 1, task.getTaskId(), payload));
                recordCassandraDaoRequests(
                        "createTask", task.getTaskType(), task.getWorkflowType());
                recordCassandraDaoPayloadSize(
                        "createTask",
                        payload.length(),
                        task.getTaskType(),
                        task.getWorkflowType());
            }
            batch.add(this.updateTotalTasksStatement.bind(updatedTotal, workflowUuid, 1));
            this.session.execute(batch);

            this.session.execute(
                    this.updateTotalPartitionsStatement.bind(1, updatedTotal, workflowUuid));
            return list;
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "createTasks");
            String errorMessage =
                    String.format(
                            "Error creating %d tasks for workflow: %s", list.size(), workflowId);
            LOGGER.error(errorMessage, e);
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Writes the current state of a task and keeps the task-definition limit table in step.
     *
     * <p>The task payload is written with the insert statement. When the task definition has a
     * positive concurrency limit, a terminal task is removed from the limit table and an
     * {@code IN_PROGRESS} task is added to it.
     *
     * @param taskModel task to persist
     * @throws TransientException if the Cassandra driver reports an error
     */
    public void updateTask(TaskModel taskModel) {
        try {
            String payload = toJson(taskModel);
            recordCassandraDaoRequests(
                    "updateTask", taskModel.getTaskType(), taskModel.getWorkflowType());
            recordCassandraDaoPayloadSize(
                    "updateTask",
                    payload.length(),
                    taskModel.getTaskType(),
                    taskModel.getWorkflowType());

            UUID workflowUuid = UUID.fromString(taskModel.getWorkflowInstanceId());
            this.session.execute(
                    this.insertTaskStatement.bind(
                            workflowUuid, 1, taskModel.getTaskId(), payload));

            Optional<TaskDef> definition = taskModel.getTaskDefinition();
            boolean hasConcurrencyLimit =
                    definition.isPresent() && definition.get().concurrencyLimit() > 0;
            if (!hasConcurrencyLimit) {
                return;
            }

            TaskModel.Status status = taskModel.getStatus();
            if (status.isTerminal()) {
                removeTaskFromLimit(taskModel);
            } else if (status == TaskModel.Status.IN_PROGRESS) {
                addTaskToLimit(taskModel);
            }
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "updateTask");
            String errorMessage =
                    String.format(
                            "Error updating task: %s in workflow: %s",
                            taskModel.getTaskId(), taskModel.getWorkflowInstanceId());
            LOGGER.error(errorMessage, e);
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Checks whether the task is blocked by the concurrency limit of its task definition.
     *
     * <p>Returns {@code false} when the task has no definition or the definition's concurrency
     * limit is not positive. Otherwise the task ids recorded in the limit table for the task's
     * definition name are read; the task is not limited when it is already among them or when
     * fewer than {@code concurrencyLimit} ids are recorded.
     *
     * @param taskModel task to check
     * @return {@code true} if the task must wait because the limit is reached
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public boolean exceedsLimit(TaskModel taskModel) {
        Optional<TaskDef> taskDefinition = taskModel.getTaskDefinition();
        if (taskDefinition.isEmpty()) {
            return false;
        }
        int concurrencyLimit = taskDefinition.get().concurrencyLimit();
        if (concurrencyLimit <= 0) {
            return false;
        }

        try {
            recordCassandraDaoRequests(
                    "selectTaskDefLimit", taskModel.getTaskType(), taskModel.getWorkflowType());
            List<String> limitedTaskIds =
                    this.session
                            .execute(
                                    this.selectTasksFromTaskDefLimitStatement.bind(
                                            taskModel.getTaskDefName()))
                            .all()
                            .stream()
                            .map(row -> row.getUUID(Constants.TASK_ID_KEY).toString())
                            .collect(Collectors.toList());
            long current = limitedTaskIds.size();

            // A task that already holds a slot is never limited by its own entry.
            if (limitedTaskIds.contains(taskModel.getTaskId()) || current < concurrencyLimit) {
                return false;
            }

            LOGGER.info(
                    "Task execution count limited. task - {}:{}, limit: {}, current: {}",
                    taskModel.getTaskId(),
                    taskModel.getTaskDefName(),
                    concurrencyLimit,
                    current);
            Monitors.recordTaskConcurrentExecutionLimited(
                    taskModel.getTaskDefName(), concurrencyLimit);
            return true;
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "exceedsLimit");
            String errorMessage =
                    String.format(
                            "Failed to get in progress limit - %s:%s in workflow :%s",
                            taskModel.getTaskDefName(),
                            taskModel.getTaskId(),
                            taskModel.getWorkflowInstanceId());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Removes the task with the given id, if it exists.
     *
     * @param str id of the task to remove
     * @return the result of {@link #removeTask(TaskModel)} for the found task, or {@code false}
     *     (after logging a warning) when {@link #getTask(String)} returns {@code null}
     */
    public boolean removeTask(String str) {
        TaskModel existing = getTask(str);
        if (existing == null) {
            LOGGER.warn("No such task found by id {}", str);
            return false;
        }
        return removeTask(existing);
    }

    /**
     * Loads a single task by id.
     *
     * <p>The owning workflow id is resolved through the task lookup table first; the task row is
     * then read from that workflow.
     *
     * @param str id of the task
     * @return the task, or {@code null} when either the lookup entry or the task row is missing
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public TaskModel getTask(String str) {
        try {
            String workflowId = lookupWorkflowIdFromTaskId(str);
            if (workflowId == null) {
                return null;
            }

            Row row =
                    this.session
                            .execute(
                                    this.selectTaskStatement.bind(
                                            UUID.fromString(workflowId), 1, str))
                            .one();
            if (row == null) {
                return null;
            }

            String payload = row.getString(Constants.PAYLOAD_KEY);
            TaskModel task = readValue(payload, TaskModel.class);
            recordCassandraDaoRequests("getTask", task.getTaskType(), task.getWorkflowType());
            recordCassandraDaoPayloadSize(
                    "getTask", payload.length(), task.getTaskType(), task.getWorkflowType());
            return task;
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "getTask");
            String errorMessage = String.format("Error getting task by id: %s", str);
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Loads the tasks with the given ids.
     *
     * <p>The owning workflow is resolved from the FIRST id only; the result is every task of
     * that workflow whose id appears in {@code list}. Ids belonging to other workflows are
     * therefore not returned.
     *
     * @param list non-null, non-empty list of task ids
     * @return the matching tasks, or {@code null} when the owning workflow cannot be resolved
     *     (no lookup entry for the first id, or the workflow itself is not found)
     */
    public List<TaskModel> getTasks(List<String> list) {
        Preconditions.checkNotNull(list);
        Preconditions.checkArgument(list.size() > 0, "Task ids list cannot be empty");

        String workflowId = lookupWorkflowIdFromTaskId(list.get(0));
        if (workflowId == null) {
            return null;
        }

        // getWorkflow returns null when the workflow rows are gone (e.g. a stale lookup entry);
        // report that the same way as a missing lookup entry instead of failing with an NPE.
        WorkflowModel workflow = getWorkflow(workflowId, true);
        if (workflow == null) {
            return null;
        }

        return workflow.getTasks().stream()
                .filter(task -> list.contains(task.getTaskId()))
                .collect(Collectors.toList());
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public List<TaskModel> getPendingTasksForTaskType(String str) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Returns all tasks of a workflow.
     *
     * @param str id of the workflow
     * @return the workflow's tasks, or a new empty list when the workflow is not found
     */
    public List<TaskModel> getTasksForWorkflow(String str) {
        WorkflowModel workflow = getWorkflow(str, true);
        if (workflow == null) {
            // getWorkflow returns null for an unknown workflow; avoid the NPE on getTasks().
            return new ArrayList<>();
        }
        return workflow.getTasks();
    }

    /**
     * Inserts a new workflow row.
     *
     * <p>The workflow is serialized without its tasks: the task list is swapped for an empty
     * list while the payload is built and written, and is put back afterwards, including when
     * serialization or the write fails.
     *
     * @param workflowModel workflow to create
     * @return the workflow id
     * @throws TransientException if the Cassandra driver reports an error
     */
    public String createWorkflow(WorkflowModel workflowModel) {
        List<TaskModel> tasks = workflowModel.getTasks();
        try {
            workflowModel.setTasks(new LinkedList<>());
            String payload = toJson(workflowModel);
            recordCassandraDaoRequests("createWorkflow", "n/a", workflowModel.getWorkflowName());
            recordCassandraDaoPayloadSize(
                    "createWorkflow", payload.length(), "n/a", workflowModel.getWorkflowName());
            this.session.execute(
                    this.insertWorkflowStatement.bind(
                            UUID.fromString(workflowModel.getWorkflowId()), 1, "", payload, 0, 1));
            return workflowModel.getWorkflowId();
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "createWorkflow");
            String errorMessage =
                    String.format("Error creating workflow: %s", workflowModel.getWorkflowId());
            LOGGER.error(errorMessage, e);
            throw new TransientException(errorMessage, e);
        } finally {
            // Always restore the caller's task list, even on failure; otherwise the in-memory
            // model would be left with an empty task list.
            workflowModel.setTasks(tasks);
        }
    }

    /**
     * Updates the stored payload of an existing workflow.
     *
     * <p>The workflow is serialized without its tasks: the task list is swapped for an empty
     * list while the payload is built and written, and is put back afterwards, including when
     * serialization or the write fails.
     *
     * @param workflowModel workflow to update
     * @return the workflow id
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public String updateWorkflow(WorkflowModel workflowModel) {
        List<TaskModel> tasks = workflowModel.getTasks();
        try {
            workflowModel.setTasks(new LinkedList<>());
            String payload = toJson(workflowModel);
            recordCassandraDaoRequests("updateWorkflow", "n/a", workflowModel.getWorkflowName());
            recordCassandraDaoPayloadSize(
                    "updateWorkflow", payload.length(), "n/a", workflowModel.getWorkflowName());
            this.session.execute(
                    this.updateWorkflowStatement.bind(
                            payload, UUID.fromString(workflowModel.getWorkflowId())));
            return workflowModel.getWorkflowId();
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "updateWorkflow");
            String errorMessage =
                    String.format("Failed to update workflow: %s", workflowModel.getWorkflowId());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        } finally {
            // Always restore the caller's task list, even on failure; otherwise the in-memory
            // model would be left with an empty task list.
            workflowModel.setTasks(tasks);
        }
    }

    /**
     * Deletes a workflow and the lookup entries of its tasks.
     *
     * @param str id of the workflow to remove
     * @return {@code false} when the workflow is not found; otherwise whether the delete
     *     statement reports that it was applied
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public boolean removeWorkflow(String str) {
        WorkflowModel workflow = getWorkflow(str, true);
        if (workflow == null) {
            return false;
        }

        try {
            recordCassandraDaoRequests("removeWorkflow", "n/a", workflow.getWorkflowName());
            boolean removed =
                    this.session
                            .execute(this.deleteWorkflowStatement.bind(UUID.fromString(str), 1))
                            .wasApplied();
            // Lookup rows are cleaned up after the workflow rows have been deleted.
            workflow.getTasks().forEach(this::removeTaskLookup);
            return removed;
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "removeWorkflow");
            String errorMessage = String.format("Failed to remove workflow: %s", str);
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean removeWorkflowWithExpiry(String str, int i) {
        final String message =
                "This method is not currently implemented in CassandraExecutionDAO. Please use RedisDAO mode instead now for using TTLs.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public void removeFromPendingWorkflow(String str, String str2) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Loads a workflow together with its tasks.
     *
     * @param str id of the workflow
     * @return the result of {@link #getWorkflow(String, boolean)} with tasks included
     */
    public WorkflowModel getWorkflow(String str) {
        final boolean includeTasks = true;
        return getWorkflow(str, includeTasks);
    }

    /**
     * Loads a workflow, optionally with its tasks.
     *
     * <p>With tasks, all rows of the workflow are read; each row is either the workflow payload
     * or a task payload, and the tasks are attached to the workflow sorted by sequence number.
     * Without tasks, only the workflow row is read.
     *
     * @param str id of the workflow
     * @param z {@code true} to load and attach the workflow's tasks
     * @return the workflow, or {@code null} when no matching row (or no workflow row) exists
     * @throws NonTransientException if a row carries an entity key that is neither the workflow
     *     nor the task marker
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public WorkflowModel getWorkflow(String str, boolean z) {
        UUID workflowUuid = toUUID(str, "Invalid workflow id");
        try {
            if (!z) {
                Row row =
                        this.session
                                .execute(this.selectWorkflowStatement.bind(workflowUuid))
                                .one();
                if (row == null) {
                    return null;
                }
                WorkflowModel workflowOnly =
                        readValue(row.getString(Constants.PAYLOAD_KEY), WorkflowModel.class);
                recordCassandraDaoRequests("getWorkflow", "n/a", workflowOnly.getWorkflowName());
                return workflowOnly;
            }

            List<Row> rows =
                    this.session
                            .execute(this.selectWorkflowWithTasksStatement.bind(workflowUuid, 1))
                            .all();
            if (rows.isEmpty()) {
                LOGGER.info("Workflow {} not found in datastore", str);
                return null;
            }

            WorkflowModel workflow = null;
            List<TaskModel> tasks = new ArrayList<>();
            for (Row row : rows) {
                String entityKey = row.getString(Constants.ENTITY_KEY);
                if (Constants.ENTITY_TYPE_WORKFLOW.equals(entityKey)) {
                    workflow =
                            readValue(row.getString(Constants.PAYLOAD_KEY), WorkflowModel.class);
                } else if (Constants.ENTITY_TYPE_TASK.equals(entityKey)) {
                    tasks.add(readValue(row.getString(Constants.PAYLOAD_KEY), TaskModel.class));
                } else {
                    throw new NonTransientException(
                            String.format(
                                    "Invalid row with entityKey: %s found in datastore for workflow: %s",
                                    entityKey, str));
                }
            }

            if (workflow != null) {
                recordCassandraDaoRequests("getWorkflow", "n/a", workflow.getWorkflowName());
                tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
                workflow.setTasks(tasks);
            }
            return workflow;
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "getWorkflow");
            String errorMessage = String.format("Failed to get workflow: %s", str);
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public List<String> getRunningWorkflowIds(String str, int i) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public List<WorkflowModel> getPendingWorkflowsByType(String str, int i) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public long getPendingWorkflowCount(String str) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public long getInProgressTaskCount(String str) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public List<WorkflowModel> getWorkflowsByType(String str, Long l, Long l2) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not supported by this DAO.
     *
     * @throws UnsupportedOperationException always
     */
    public List<WorkflowModel> getWorkflowsByCorrelationId(String str, String str2, boolean z) {
        final String message =
                "This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Reports whether this DAO can search across workflows.
     *
     * @return always {@code false}
     */
    public boolean canSearchAcrossWorkflows() {
        final boolean crossWorkflowSearchSupported = false;
        return crossWorkflowSearchSupported;
    }

    /**
     * Stores a new event execution.
     *
     * @param eventExecution event execution to insert; its message id, name and id are bound
     *     together with the JSON payload
     * @return whether the insert statement reports that it was applied
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public boolean addEventExecution(EventExecution eventExecution) {
        try {
            String payload = toJson(eventExecution);
            recordCassandraDaoEventRequests("addEventExecution", eventExecution.getEvent());
            recordCassandraDaoPayloadSize(
                    "addEventExecution", payload.length(), eventExecution.getEvent(), "n/a");
            return this.session
                    .execute(
                            this.insertEventExecutionStatement.bind(
                                    eventExecution.getMessageId(),
                                    eventExecution.getName(),
                                    eventExecution.getId(),
                                    payload))
                    .wasApplied();
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "addEventExecution");
            String errorMessage =
                    String.format(
                            "Failed to add event execution for event: %s, handler: %s",
                            eventExecution.getEvent(), eventExecution.getName());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Overwrites the payload of an existing event execution.
     *
     * <p>The configured event execution TTL (in seconds) is bound as the first statement value.
     *
     * @param eventExecution event execution to update
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public void updateEventExecution(EventExecution eventExecution) {
        try {
            String payload = toJson(eventExecution);
            recordCassandraDaoEventRequests("updateEventExecution", eventExecution.getEvent());
            recordCassandraDaoPayloadSize(
                    "updateEventExecution", payload.length(), eventExecution.getEvent(), "n/a");
            this.session.execute(
                    this.updateEventExecutionStatement.bind(
                            this.eventExecutionsTTL,
                            payload,
                            eventExecution.getMessageId(),
                            eventExecution.getName(),
                            eventExecution.getId()));
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "updateEventExecution");
            String errorMessage =
                    String.format(
                            "Failed to update event execution for event: %s, handler: %s",
                            eventExecution.getEvent(), eventExecution.getName());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Deletes an event execution identified by its message id, name and id.
     *
     * @param eventExecution event execution to delete
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    public void removeEventExecution(EventExecution eventExecution) {
        try {
            recordCassandraDaoEventRequests("removeEventExecution", eventExecution.getEvent());
            this.session.execute(
                    this.deleteEventExecutionStatement.bind(
                            eventExecution.getMessageId(),
                            eventExecution.getName(),
                            eventExecution.getId()));
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "removeEventExecution");
            String errorMessage =
                    String.format(
                            "Failed to remove event execution for event: %s, handler: %s",
                            eventExecution.getEvent(), eventExecution.getName());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Reads stored event executions; rows whose payload column is null are skipped.
     *
     * @param str handler name; bound as the second statement value and used in the error message
     * @param str2 event name; only used in the error message
     * @param str3 first statement value (presumably the message id; confirm against the select
     *     statement)
     * @return the deserialized event executions
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    @VisibleForTesting
    List<EventExecution> getEventExecutions(String str, String str2, String str3) {
        try {
            return this.session
                    .execute(this.selectEventExecutionsStatement.bind(str3, str))
                    .all()
                    .stream()
                    .filter(row -> !row.isNull(Constants.PAYLOAD_KEY))
                    .map(row -> readValue(row.getString(Constants.PAYLOAD_KEY), EventExecution.class))
                    .collect(Collectors.toList());
        } catch (DriverException e) {
            // Count the failure like every other driver error handled in this class.
            Monitors.error(CLASS_NAME, "getEventExecutions");
            String errorMessage =
                    String.format(
                            "Failed to fetch event executions for event: %s, handler: %s",
                            str2, str);
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Records the task in the task-definition limit table.
     *
     * @param taskModel task whose workflow id, definition name and task id are written
     * @throws TransientException if the Cassandra driver reports an error
     */
    public void addTaskToLimit(TaskModel taskModel) {
        try {
            recordCassandraDaoRequests(
                    "addTaskToLimit", taskModel.getTaskType(), taskModel.getWorkflowType());
            UUID workflowUuid = UUID.fromString(taskModel.getWorkflowInstanceId());
            String taskDefName = taskModel.getTaskDefName();
            UUID taskUuid = UUID.fromString(taskModel.getTaskId());
            this.session.execute(
                    this.updateTaskDefLimitStatement.bind(workflowUuid, taskDefName, taskUuid));
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "addTaskToLimit");
            String errorMessage =
                    String.format(
                            "Error updating taskDefLimit for task - %s:%s in workflow: %s",
                            taskModel.getTaskDefName(),
                            taskModel.getTaskId(),
                            taskModel.getWorkflowInstanceId());
            LOGGER.error(errorMessage, e);
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Deletes the task's entry from the task-definition limit table.
     *
     * @param taskModel task whose definition name and task id identify the entry
     * @throws TransientException if the Cassandra driver reports an error
     */
    public void removeTaskFromLimit(TaskModel taskModel) {
        try {
            recordCassandraDaoRequests(
                    "removeTaskFromLimit", taskModel.getTaskType(), taskModel.getWorkflowType());
            String taskDefName = taskModel.getTaskDefName();
            UUID taskUuid = UUID.fromString(taskModel.getTaskId());
            this.session.execute(this.deleteTaskDefLimitStatement.bind(taskDefName, taskUuid));
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "removeTaskFromLimit");
            String errorMessage =
                    String.format(
                            "Error updating taskDefLimit for task - %s:%s in workflow: %s",
                            taskModel.getTaskDefName(),
                            taskModel.getTaskId(),
                            taskModel.getWorkflowInstanceId());
            LOGGER.error(errorMessage, e);
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Deletes a task and decrements the task total of its workflow.
     *
     * <p>The lookup entry is removed first; the task row delete and the decremented total are
     * then executed as one batch. For task definitions with a positive concurrency limit the
     * task is also removed from the limit table.
     *
     * @param taskModel task to remove
     * @return whether the batch reports that it was applied
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    protected boolean removeTask(TaskModel taskModel) {
        try {
            int totalTasks = getWorkflowMetadata(taskModel.getWorkflowInstanceId()).getTotalTasks();
            removeTaskLookup(taskModel);
            recordCassandraDaoRequests(
                    "removeTask", taskModel.getTaskType(), taskModel.getWorkflowType());

            UUID workflowUuid = UUID.fromString(taskModel.getWorkflowInstanceId());
            BatchStatement batch = new BatchStatement();
            batch.add(this.deleteTaskStatement.bind(workflowUuid, 1, taskModel.getTaskId()));
            batch.add(this.updateTotalTasksStatement.bind(totalTasks - 1, workflowUuid, 1));
            ResultSet resultSet = this.session.execute(batch);

            Optional<TaskDef> definition = taskModel.getTaskDefinition();
            if (definition.isPresent() && definition.get().concurrencyLimit() > 0) {
                removeTaskFromLimit(taskModel);
            }
            return resultSet.wasApplied();
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "removeTask");
            String errorMessage =
                    String.format("Failed to remove task: %s", taskModel.getTaskId());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Deletes the task's entry from the task lookup table.
     *
     * <p>For task definitions with a positive concurrency limit the task is removed from the
     * limit table first.
     *
     * @param taskModel task whose lookup entry is removed
     * @throws TransientException if the Cassandra driver reports an error; the driver exception
     *     is attached as the cause
     */
    protected void removeTaskLookup(TaskModel taskModel) {
        try {
            recordCassandraDaoRequests(
                    "removeTaskLookup", taskModel.getTaskType(), taskModel.getWorkflowType());

            Optional<TaskDef> definition = taskModel.getTaskDefinition();
            if (definition.isPresent() && definition.get().concurrencyLimit() > 0) {
                removeTaskFromLimit(taskModel);
            }

            this.session.execute(
                    this.deleteTaskLookupStatement.bind(UUID.fromString(taskModel.getTaskId())));
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "removeTaskLookup");
            String errorMessage =
                    String.format("Failed to remove task lookup: %s", taskModel.getTaskId());
            LOGGER.error(errorMessage, e);
            // Keep the driver exception as the cause so callers can see the root failure.
            throw new TransientException(errorMessage, e);
        }
    }

    /**
     * Validates a task list before it is written.
     *
     * <p>The list must be non-null and non-empty; every task must be non-null and carry a task
     * id, a workflow instance id and a reference task name; and all tasks must share the
     * workflow instance id of the first task.
     *
     * @param list tasks to validate
     * @throws NullPointerException if the list, a task, or one of the required fields is null
     * @throws IllegalArgumentException if the list is empty
     * @throws NonTransientException if the tasks belong to more than one workflow
     */
    @VisibleForTesting
    void validateTasks(List<TaskModel> list) {
        Preconditions.checkNotNull(list, "Tasks object cannot be null");
        Preconditions.checkArgument(!list.isEmpty(), "Tasks object cannot be empty");

        for (TaskModel task : list) {
            Preconditions.checkNotNull(task, "task object cannot be null");
            Preconditions.checkNotNull(task.getTaskId(), "Task id cannot be null");
            Preconditions.checkNotNull(
                    task.getWorkflowInstanceId(), "Workflow instance id cannot be null");
            Preconditions.checkNotNull(
                    task.getReferenceTaskName(), "Task reference name cannot be null");
        }

        String expectedWorkflowId = list.get(0).getWorkflowInstanceId();
        boolean spansSeveralWorkflows =
                list.stream()
                        .anyMatch(task -> !expectedWorkflowId.equals(task.getWorkflowInstanceId()));
        if (spansSeveralWorkflows) {
            throw new NonTransientException(
                    "Tasks of multiple workflows cannot be created/updated simultaneously");
        }
    }

    /**
     * Reads the task and partition totals stored for a workflow.
     *
     * @param str id of the workflow
     * @return metadata holding the total number of tasks and partitions
     * @throws NotFoundException if no row exists for the workflow
     */
    @VisibleForTesting
    CassandraBaseDAO.WorkflowMetadata getWorkflowMetadata(String str) {
        ResultSet resultSet =
                this.session.execute(this.selectTotalStatement.bind(UUID.fromString(str)));
        recordCassandraDaoRequests("getWorkflowMetadata");

        Row row = resultSet.one();
        if (row == null) {
            throw new NotFoundException("Workflow with id: %s not found in data store", str);
        }

        CassandraBaseDAO.WorkflowMetadata metadata = new CassandraBaseDAO.WorkflowMetadata();
        metadata.setTotalTasks(row.getInt(Constants.TOTAL_TASKS_KEY));
        metadata.setTotalPartitions(row.getInt(Constants.TOTAL_PARTITIONS_KEY));
        return metadata;
    }

    /**
     * Resolves the id of the workflow that owns a task, using the task lookup table.
     *
     * @param str id of the task
     * @return the workflow id as a string, or {@code null} when no lookup row exists
     * @throws TransientException if the Cassandra driver reports an error
     */
    @VisibleForTesting
    String lookupWorkflowIdFromTaskId(String str) {
        try {
            UUID taskUuid = toUUID(str, "Invalid task id");
            Row row = this.session.execute(this.selectTaskLookupStatement.bind(taskUuid)).one();
            if (row == null) {
                return null;
            }
            return row.getUUID(Constants.WORKFLOW_ID_KEY).toString();
        } catch (DriverException e) {
            Monitors.error(CLASS_NAME, "lookupWorkflowIdFromTaskId");
            String errorMessage =
                    String.format("Failed to lookup workflowId from taskId: %s", str);
            LOGGER.error(errorMessage, e);
            throw new TransientException(errorMessage, e);
        }
    }
}
