Java中默认的线程池拒绝策略,在任务拒绝策略被触发时会将当前任务丢弃,并打断主线程的执行,其他3种拒绝策略不能完全满足代码实现的要求,所以需要重新自定义一个任务拒绝策略。
实现目的:- 控制线程池等待队列的无限制增长
- 当触发拒绝策略时当前任务不能丢弃,需要重新加入线程池
- 当所有线程都在执行中且等待队列已满时,主线程要暂停创建新的任务,暂停往队列中添加
- java.util.concurrent.ThreadPoolExecutor.AbortPolicy(线程池默认的拒绝策略)
- 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。
- 必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
- java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
- 当触发拒绝策略,并且线程池没有关闭时,则使用父线程直接运行任务
- 这会阻塞父进程继续往线程池中添加新的任务。个人认为仅仅适用于比较特殊的场景
- java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
- 直接丢弃,不抛出任何一场,适用于比较特殊的场景
- java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
- 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
1亿+的数据,需要读取到内存中,程序处理后在更新或插入到数据库
Demo实现:因为每行数据之间在业务场景中是独立的,不用考虑数据之间的并发问题,所以使用多线程来处理
Demo设计:- 使用ThreadPoolExecutor初始化固定容量的线程池
- 初始化固定容量的线程池等待队列(LinkedBlockingQueue),防止单线程处理较慢时,躲过的数据被加载到内存中,会存在内存溢出的风险
- 核心 :实现java.util.concurrent.RejectedExecutionHandler接口,自定义线程任务拒绝策略来实现任务的重新加入
- ThreadPoolExecutor中使用的等待队列是BlockingQueue,在Executors中提供的几个默认线程池都是使用的BlockingQueue的实现类:LinkedBlockingQueue,LinkedBlockingQueue在初始化时如果没有提供容量参数,那么就会使用Integer.MAX_VALUE作为队列的初始化容量,这是一个大于20亿的值,可以相当于无限制队列
- 如果依旧使用默认的队列初始化方式,那么基本不会出现任务被拒绝的情况,所以我们需要将等待队列设置为固定的容量(基于JVM参数的设置来确定),主要的目的还是限制队列的无限制增长,带来内存溢出的风险的发生
package com.test.example.threadPool; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.test.utils.Print; public class ThreadPoolRejectedTest { public static void main(String[] args) { try { // 自定义的线程池任务拒绝策略 RejectedExecutionHandler rejected = (Runnable r, ThreadPoolExecutor executor) -> { if (r instanceof RunnableImpl) { RunnableImpl rm = (RunnableImpl) r; Print.outln("线程任务被拒绝 : " + rm.id); try { // 等待1.5秒后,尝试将当前被拒绝的任务重新加入线程队列 // 此时主线程是会被阻塞的 Thread.sleep(1500); Print.outln("尝试重新加入 : " + rm.id); executor.execute(r); } catch (Exception e) { } } }; // 将线程池队列设置为有界队列 LinkedBlockingQueue分析:queue = new LinkedBlockingQueue (1); // 初始化线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue, rejected); // 模拟一个迭代器 ListIterator iterator = list(105).listIterator(); // 假设每个线程每次处理20条数据 List list = new ArrayList (20); int i = 1; while (iterator.hasNext()) { list.add(iterator.next()); if (list.size() >= 20 || !iterator.hasNext()) { executor.execute(new RunnableImpl(i++, list)); list.clear(); } } Print.outln("--------------------任务添加完成"); executor.shutdown(); while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { } Print.outln("--------------------线程池执行结束"); } catch (Exception e) { e.printStackTrace(); } } public static List list(int size) { ArrayList list = new ArrayList (); for (int i = 1; i <= size; i++) { list.add(i + ""); } return list; } } class RunnableImpl implements Runnable { public Integer id = -1; public RunnableImpl(int id, List list) { this.id = id; Print.outln("初始化完成 : " + id); } @Override public void run() { try { Thread.sleep(3000); // 假设线程每次处理需要3秒钟 Print.outln("-----------------------------------线程执行结束 : " + id); } catch (Exception e) { } } }
package com.test.utils; import java.text.SimpleDateFormat; import java.util.Date; public class Print { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); private static String time() { return "[" + sdf.format(new Date()) + "] "; } public static void outln(String s) { System.out.println(time() + s); } }