栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

Java自定义线程池任务拒绝策略,完成任务重新加入线程池

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java自定义线程池任务拒绝策略,完成任务重新加入线程池

Java中默认的线程池拒绝策略,在任务拒绝策略被触发时会将当前任务丢弃,并打断主线程的执行,其他3种拒绝策略不能完全满足代码实现的要求,所以需要重新自定义一个任务拒绝策略。

实现目的:
  1. 控制线程池等待队列的无限制增长
  2. 当触发拒绝策略时当前任务不能丢弃,需要重新加入线程池
  3. 当所有线程都在执行中且等待队列已满时,主线程要暂停创建新的任务,暂停往队列中添加
ThreadPoolExecutor中提供的4个拒绝策略
  • java.util.concurrent.ThreadPoolExecutor.AbortPolicy(线程池默认的拒绝策略)
    • 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。
    • 必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
  • java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy 
    • 当触发拒绝策略,并且线程池没有关闭时,则使用父线程直接运行任务
    • 这会阻塞父进程继续往线程池中添加新的任务。个人认为仅仅适用于比较特殊的场景
  • java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
    • 直接丢弃,不抛出任何一场,适用于比较特殊的场景
  • java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy 
    • 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
Demo业务场景:

1亿+的数据,需要读取到内存中,程序处理后在更新或插入到数据库

Demo实现:

因为每行数据之间在业务场景中是独立的,不用考虑数据之间的并发问题,所以使用多线程来处理

Demo设计:
  1. 使用ThreadPoolExecutor初始化固定容量的线程池
  2. 初始化固定容量的线程池等待队列(LinkedBlockingQueue),防止单线程处理较慢时,躲过的数据被加载到内存中,会存在内存溢出的风险
  3. 核心 :实现java.util.concurrent.RejectedExecutionHandler接口,自定义线程任务拒绝策略来实现任务的重新加入
Demo设计分析:
  1. ThreadPoolExecutor中使用的等待队列是BlockingQueue,在Executors中提供的几个默认线程池都是使用的BlockingQueue的实现类:LinkedBlockingQueue,LinkedBlockingQueue在初始化时如果没有提供容量参数,那么就会使用Integer.MAX_VALUE作为队列的初始化容量,这是一个大于20亿的值,可以相当于无限制队列
  2. 如果依旧使用默认的队列初始化方式,那么基本不会出现任务被拒绝的情况,所以我们需要将等待队列设置为固定的容量(基于JVM参数的设置来确定),主要的目的还是限制队列的无限制增长,带来内存溢出的风险的发生
Demo代码:
package com.test.example.threadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.test.utils.Print;

public class ThreadPoolRejectedTest {

	public static void main(String[] args) {
		try {
			// 自定义的线程池任务拒绝策略
			RejectedExecutionHandler rejected = (Runnable r, ThreadPoolExecutor executor) -> {
				if (r instanceof RunnableImpl) {
					RunnableImpl rm = (RunnableImpl) r;
					Print.outln("线程任务被拒绝 : " + rm.id);
					try {
						// 等待1.5秒后,尝试将当前被拒绝的任务重新加入线程队列
						// 此时主线程是会被阻塞的
						Thread.sleep(1500);
						Print.outln("尝试重新加入 : " + rm.id);
						executor.execute(r);
					} catch (Exception e) {
					}
				}
			};
			// 将线程池队列设置为有界队列
			LinkedBlockingQueue queue = new LinkedBlockingQueue(1);
			// 初始化线程池
			ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue, rejected);
			// 模拟一个迭代器
			ListIterator iterator = list(105).listIterator();
			// 假设每个线程每次处理20条数据
			List list = new ArrayList(20);
			int i = 1;
			while (iterator.hasNext()) {
				list.add(iterator.next());
				if (list.size() >= 20 || !iterator.hasNext()) {
					executor.execute(new RunnableImpl(i++, list));
					list.clear();
				}
			}

			Print.outln("--------------------任务添加完成");
			executor.shutdown();
			while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
			}
			Print.outln("--------------------线程池执行结束");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static List list(int size) {
		ArrayList list = new ArrayList();
		for (int i = 1; i <= size; i++) {
			list.add(i + "");
		}
		return list;
	}
}

class RunnableImpl implements Runnable {

	public Integer id = -1;

	public RunnableImpl(int id, List list) {
		this.id = id;
		Print.outln("初始化完成 : " + id);
	}

	@Override
	public void run() {
		try {
			Thread.sleep(3000); // 假设线程每次处理需要3秒钟
			Print.outln("-----------------------------------线程执行结束 : " + id);
		} catch (Exception e) {
		}
	}
}
分析:

package com.test.utils;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Print {

	private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

	private static String time() {
		return "[" + sdf.format(new Date()) + "] ";
	}

	public static void outln(String s) {
		System.out.println(time() + s);
	}
}
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1039157.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号