我的解决方案非常简单:自己写一个类继承 java.util.concurrent.ThreadPoolExecutor,然后重写其中的几个方法,代码如下:
public class ThreadPoolExecute extends ThreadPoolExecutor { public ThreadPoolExecute(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ThreadPoolExecute(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,handler); } public ThreadPoolExecute(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public void execute(Runnable command) { Runnable wrap = wrap(command); super.execute(wrap); } @Override public Future<?> submit(Runnable task) { Runnable wrap = wrap(task); return super.submit(wrap); } @Override public <T> Future<T> submit(Runnable task, T result) { Runnable warp = wrap(task); return super.submit(warp, result); } @Override public <T> Future<T> submit(Callable<T> task) { Callable wrap = wrap(task); return super.submit(wrap); } private <T> Callable<T> wrap(final Callable<T> task) { return new Callable<T>() { @Override public T call() throws Exception { try { return task.call(); } catch (Exception e) { log.error("线程池运行错误", e); throw e; } } }; } private Runnable wrap(final Runnable task) { return new Runnable() { @Override public void run() { try { task.run(); } catch (Throwable t) { log.error("线程池运行错误", t); } } }; } }
使用起来也很简单。下面是我使用的一个例子:用这个 executorService 提交一个任务,任务内部是一个死循环,不断从阻塞队列中 take 元素,取到内容后执行我的业务逻辑(具体业务内容可以忽略不看)。
/**
 * Pool used to run the log-consuming task: 4 core threads, at most 8 threads,
 * a 5-second keep-alive, a bounded queue of 1024 pending tasks, and the
 * caller-runs policy for tasks that cannot be accepted.
 */
private final ThreadPoolExecute executorService =
        new ThreadPoolExecute(
                4,
                8,
                5,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadPoolExecutor.CallerRunsPolicy());
// Long-running consumer: takes elements from the queue and records or updates the
// matching event log until the executing thread is interrupted.
//
// This is a plain Runnable rather than a Thread object. The pool runs the task on one
// of its own worker threads, so a Thread created here would never be started and its
// name ("esa-log-th" in the earlier version) would never be applied.
Runnable logTask = () -> {
    while (true) {
        try {
            Object ele = queue.take();
            if (ele instanceof SpreadingParamVo) {
                log.debug("有日志要插入...");
                logService.recordEventLog((SpreadingParamVo) ele);
            } else if (ele instanceof AskResp) {
                log.debug("有日志要更新...");
                logService.updateEventLog((AskResp) ele);
            } else {
                log.error("不支持的队列类型!" + ele.getClass().getName());
            }
        } catch (InterruptedException e) {
            // Interruption is the pool's shutdown signal (e.g. shutdownNow()): restore the
            // flag and leave the loop instead of treating it as an ordinary failure.
            Thread.currentThread().interrupt();
            log.info("日志消费线程被中断,退出消费循环");
            return;
        } catch (Exception e) {
            // A failure on one element must not stop the consumer; log it and keep going.
            log.error("日志消费发生异常:" + e.getMessage(), e);
            log.info("日志消费发生异常:" + e.getMessage());
        }
    }
};
executorService.submit(logTask);