前言
hello,大家好,我是霸气侧漏的秀总,时隔多日,今天秀总给大家带来什么呢?
大家都知道,此前我带来过一篇关于多线程的文章,如果你项目中多线程遇到不多,没必要使用线程池,那么今天给大家带来多线程一些常见的使用方法
一、理解
多线程用于堆积处理,就像一个大土堆,一个推土机很慢,那么10个推土机一起来处理,当然速度就快了,不过由于位置的限制,如果20个推土机,那么推土机之间会产生相互的避让,相互摩擦,相互拥挤,反而不如10个处理的好,所以,多线程处理,线程数要开的恰当,就可以提高效率。
二、场景
- 1、后台任务,例如:定时向大量(100w以上)的用户发送邮件;
- 2、异步处理,例如:发微博、记录日志等;
- 3、分布式计算
- 4、 数据库用到的多线程
- 5、 tomcat,tomcat内部采用多线程,上百个客户端访问同一个WEB应用,tomcat接入后就是把后续的处理扔给一个新的线程来处理,这个新的线程最后调用我们的servlet程序,比如doGet或者dpPost方法
- 6、 后台任务:如定时向大量(100W以上)的用户发送邮件;定期更新配置文件、任务调度(如quartz),一些监控用于定期信息采集
- 7、 自动作业处理:比如定期备份日志、定期备份数据库
- 8、 页面异步处理:比如大批量数据的核对工作(有10万个手机号码,核对哪些是已有用户)
- 9、 数据库的数据分析(待分析的数据太多),数据迁移
- 10、 多步骤的任务处理,可根据步骤特征选用不同个数和特征的线程来协作处理,多任务的分割,由一个主线程分割给多个线程完成
- 11、常见的浏览器、Web服务(现在写的web是中间件帮你完成了线程的控制),web处理请求,各种专用服务器(如游戏服务器)
- 12、 FTP下载,多线程操作文件
三、目的
-
1、 吞吐量:做WEB,容器帮你做了多线程,但是它只能帮你做请求层面的,简单的说,就是一个请求一个线程(如struts2,是多线程的,每个客户端请求创建一个实例,保证线程安全),或多个请求一个线程,如果是单线程,那只能是处理一个用户的请求
-
2、 伸缩性:通过增加CPU核数来提升性能。
四、线程
4.1、方法中使用多线程
public static void main(String[] args) {
System.out.println("------》接口请求:开始");
System.out.println("执行主要逻辑");
for (int i = 0; i < 5; i++) {
String ret = execLogic();
System.out.println("执行结果:" + ret);
}
System.out.println("------》接口请求:结束");
}
public static String execLogic() {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("执行逻辑、发送短信----》1");
// 模拟逻辑执行了5秒
Thread.sleep(5000);
System.out.println("执行逻辑----》2");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
return "我是线程方法返回的值";
}
解释:上面逻辑有处理主要的逻辑,另外还有发送短信逻辑,因为多线程是异步,多以先返回了值,然后执行里面逻辑,所以日志为
Connected to the target VM, address: '127.0.0.1:12141', transport: 'socket'
------》接口请求:开始
执行主要逻辑
执行结果:我是线程方法返回的值
执行结果:我是线程方法返回的值
执行逻辑----》1
执行逻辑----》1
执行结果:我是线程方法返回的值
执行逻辑----》1
执行结果:我是线程方法返回的值
执行结果:我是线程方法返回的值
------》接口请求:结束
执行逻辑----》1
执行逻辑----》1
执行逻辑----》2
执行逻辑----》2
执行逻辑----》2
执行逻辑----》2
执行逻辑----》2
Disconnected from the target VM, address: '127.0.0.1:12141', transport: 'socket'
4.2、方法中使用多线程池
public void threadMethod(List<NetPublishData> list) {
// 拆分多个list
List<List<NetPublishData>> lists = ListUtils.partition(list, 100);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50,
4, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());
// 大集合拆分成N个小集合, 这里集合的size可以稍微小一些(这里我用100刚刚好), 以保证多线程异步执行, 过大容易回到单线程
// 记录单个任务的执行次数
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
// 对拆分的集合进行批量处理, 先拆分的集合, 再多线程执行
for (List<NetPublishData> singleList : lists) {
// 线程池执行
threadPool.execute(new Thread(new Runnable(){
@Override
public void run() {
netPublishDataMapper.insertList(singleList);
}
}));
// 任务个数 - 1, 直至为0时唤醒await()
countDownLatch.countDown();
}
try {
// 让当前线程处于阻塞状态,直到锁存器计数为零
countDownLatch.await();
} catch (InterruptedException e) {
}
}
五、线程池
5.1、配置
@Configuration
@EnableAsync
public class TaskExecutePool {
private static final Logger logger = LoggerFactory.getLogger(TaskExecutePool.class);
@Bean(name = "ThreadPoolA")
public ThreadPoolTaskExecutor threadPoolTaskExecutormyTaskAsyncPool() {
logger.info("start TaskExecutePool......");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(4);
// 最大线程数
executor.setMaxPoolSize(8);
// 队列大小
executor.setQueueCapacity(100);
// 空闲最大存活时间
executor.setKeepAliveSeconds(60);
// 线程名前缀
executor.setThreadNamePrefix("Pool-A");
// 拒绝策略:策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 加载
executor.initialize();
return executor;
}
@Bean(name = "ThreadPoolB")
public ThreadPoolTaskExecutor threadPoolTaskExecutorAsyncPoolB() {
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(8);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("Pool-B");
//当任务数量超过MaxPoolSize和QueueCapacity时使用的策略,该策略是又调用任务的线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
5.2、获取线程运行情况
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecutePool.class);
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null == threadPoolExecutor) {
return;
}
logger.info("{},{},任务数量:[{}], 已完成:[{}], 执行中[{}], 队列大小[{}]",this.getThreadPoolExecutor()
, prefix
, threadPoolExecutor.getTaskCount()
, threadPoolExecutor.getCompletedTaskCount()
, threadPoolExecutor.getActiveCount()
, threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1.do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2.do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable callable) {
showThreadPoolInfo("1.do execute");
return super.submit(callable);
}
@Override
public <T> Future<T> submit(Callable<T> callable) {
showThreadPoolInfo("2.do execute");
return super.submit(callable);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1.do execute");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2.do execute");
return super.submitListenable(task);
}
}
5.3、线程使用
@Async(value = "ThreadPoolA")
@Override
public void taskA(int num) {
for (int i = 0; i < 5; i++) {
logger.info("我是任务:{}, 逻辑:{}", num, i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}