package com.walker.tcp.support;

import com.walker.infrastructure.ApplicationRuntimeException;
import com.walker.infrastructure.core.ApplicationBeanInitialized;
import com.walker.queue.AbstractQueueManager;
import com.walker.queue.QueueException;
import com.walker.tcp.ActionCallException;
import com.walker.tcp.ActionCallable;
import com.walker.tcp.ActionCallablePostProcessor;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Context;
import com.walker.tcp.Filter;
import com.walker.tcp.Request;
import com.walker.tcp.Response;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/* loaded from: input_file:com/walker/tcp/support/MemoryQueueManager.class */
public class MemoryQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
    private final BlockingQueue<Request<?>> dataQueue = new ArrayBlockingQueue(2048);
    private final List<Filter> filterList = new ArrayList();
    private int filterCount = 0;
    private ExecutorService service = Executors.newFixedThreadPool(this.maxWorkerThread);
    private Runnable consumer = new DefaultConsumer();
    private ConnectionManager connectionManager;

    /* loaded from: input_file:com/walker/tcp/support/MemoryQueueManager$ActionInvokeTask.class */
    private class ActionInvokeTask implements Runnable {
        private Context context;

        public ActionInvokeTask(Context context) {
            this.context = context;
        }

        @Override // java.lang.Runnable
        public void run() {
            Request<?> currentData = this.context.getCurrentData();
            ActionCallable action = ActionCallablePostProcessor.getAction(currentData.getProtocolNum());
            if (action == null) {
                throw new ApplicationRuntimeException("action未定义，protocol = " + currentData.getProtocolNum());
            }
            try {
                if (currentData.isRequireResponse()) {
                    Response<?> action2 = action.action(currentData);
                    action2.setName(currentData.getName());
                    Connection connectionByName = MemoryQueueManager.this.connectionManager.getConnectionByName(currentData.getName());
                    if (connectionByName == null || !connectionByName.isConnected()) {
                        MemoryQueueManager.this.processFailed(currentData);
                    } else {
                        connectionByName.write(action2);
                        MemoryQueueManager.this.processSuccess(currentData);
                    }
                } else {
                    action.action(currentData);
                    MemoryQueueManager.this.logger.debug("该请求不需要响应：" + currentData);
                }
            } catch (ActionCallException e) {
                MemoryQueueManager.this.processFailed(currentData);
                MemoryQueueManager.this.logger.error("执行action错误：" + e.getMessage(), e);
            }
        }
    }

    /* loaded from: input_file:com/walker/tcp/support/MemoryQueueManager$DefaultConsumer.class */
    private class DefaultConsumer implements Runnable {
        private DefaultConsumer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            MemoryQueueManager.this.logger.info("处理websocket线程启动：" + getClass().getName());
            while (true) {
                try {
                    Request<?> take = MemoryQueueManager.this.dataQueue.take();
                    SimpleContext simpleContext = new SimpleContext(take, MemoryQueueManager.this.dataQueue.peek());
                    MemoryQueueManager.this.logger.debug("消费者拿到一个数据：" + take);
                    if (MemoryQueueManager.this.filterCount > 0) {
                        boolean z = true;
                        Iterator<Filter> it = MemoryQueueManager.this.filterList.iterator();
                        while (true) {
                            if (!it.hasNext()) {
                                break;
                            }
                            Filter next = it.next();
                            z = next.doFilter(simpleContext);
                            if (!z) {
                                MemoryQueueManager.this.logger.debug("过滤器终止操作：" + next.getName());
                                break;
                            }
                        }
                        if (!z) {
                            return;
                        }
                    }
                    MemoryQueueManager.this.service.execute(new ActionInvokeTask(simpleContext));
                } catch (InterruptedException e) {
                    MemoryQueueManager.this.logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
                } catch (Exception e2) {
                    MemoryQueueManager.this.logger.error("执行任务出现错误", e2);
                }
            }
        }
    }

    public void addFilter(Filter filter) {
        Iterator<Filter> it = this.filterList.iterator();
        while (it.hasNext()) {
            if (filter.getName().equalsIgnoreCase(it.next().getName())) {
                throw new IllegalArgumentException("已经存在该过滤器，不能重复添加");
            }
        }
        this.filterList.add(filter);
        this.filterCount++;
    }

    public void push(String str, Object obj, Object obj2) throws QueueException {
        if (obj == null) {
            throw new QueueException("request is required!");
        }
        if (!Request.class.isAssignableFrom(obj.getClass())) {
            throw new QueueException("请求对象必须是: " + Request.class.getName());
        }
        try {
            this.dataQueue.put((Request) obj);
            this.logger.debug("加入一个数据到队列：" + obj);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    protected void processFailed(Request<?> request) {
        this.logger.error("tcp消息处理失败，可能连接不存在或者已关闭，记录日志：" + request.toString());
    }

    protected void processSuccess(Request<?> request) {
        this.logger.debug("tcp消息处理成功：" + request.getProtocolNum());
    }

    public void startup() {
        if (this.connectionManager == null) {
            throw new IllegalArgumentException("connectionManager未配置");
        }
        this.logger.info("初始化TcpAction消费者任务：" + this.consumer.getClass().getName());
        this.service.execute(this.consumer);
    }

    public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }
}
