前言

hello,大家好,我是霸气侧漏的秀总,时隔多日,今天秀总给大家带来什么呢?
大家都知道,此前我带来过一篇关于多线程的文章,如果你项目中多线程遇到不多,没必要使用线程池,那么今天给大家带来多线程一些常见的使用方法

一、理解

多线程用于堆积处理,就像一个大土堆,一个推土机很慢,那么10个推土机一起来处理,当然速度就快了,不过由于位置的限制,如果20个推土机,那么推土机之间会产生相互的避让,相互摩擦,相互拥挤,反而不如10个处理的好,所以,多线程处理,线程数要开的恰当,就可以提高效率。

二、场景

  • 1、后台任务,例如:定时向大量(100w以上)的用户发送邮件;
  • 2、异步处理,例如:发微博、记录日志等;
  • 3、分布式计算
  • 4、 数据库用到的多线程
  • 5、 tomcat,tomcat内部采用多线程,上百个客户端访问同一个WEB应用,tomcat接入后就是把后续的处理扔给一个新的线程来处理,这个新的线程最后调用我们的servlet程序,比如doGet或者dpPost方法
  • 6、 后台任务:如定时向大量(100W以上)的用户发送邮件;定期更新配置文件、任务调度(如quartz),一些监控用于定期信息采集
  • 7、 自动作业处理:比如定期备份日志、定期备份数据库
  • 8、 页面异步处理:比如大批量数据的核对工作(有10万个手机号码,核对哪些是已有用户)
  • 9、 数据库的数据分析(待分析的数据太多),数据迁移
  • 10、 多步骤的任务处理,可根据步骤特征选用不同个数和特征的线程来协作处理,多任务的分割,由一个主线程分割给多个线程完成
  • 11、常见的浏览器、Web服务(现在写的web是中间件帮你完成了线程的控制),web处理请求,各种专用服务器(如游戏服务器)
  • 12、 FTP下载,多线程操作文件

三、目的

  • 1、 吞吐量:做WEB,容器帮你做了多线程,但是它只能帮你做请求层面的,简单的说,就是一个请求一个线程(如struts2,是多线程的,每个客户端请求创建一个实例,保证线程安全),或多个请求一个线程,如果是单线程,那只能是处理一个用户的请求

  • 2、 伸缩性:通过增加CPU核数来提升性能。

四、线程

4.1、方法中使用多线程

 public static void main(String[] args) {
        System.out.println("------》接口请求:开始");
        System.out.println("执行主要逻辑");
        for (int i = 0; i < 5; i++) {
            String ret = execLogic();
            System.out.println("执行结果:" + ret);
        }
        System.out.println("------》接口请求:结束");
    }


    public static String execLogic() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("执行逻辑、发送短信----》1");
                    // 模拟逻辑执行了5秒
                    Thread.sleep(5000);
                    System.out.println("执行逻辑----》2");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
        return "我是线程方法返回的值";
    }

解释:上面逻辑有处理主要的逻辑,另外还有发送短信逻辑,因为多线程是异步,多以先返回了值,然后执行里面逻辑,所以日志为

Connected to the target VM, address: '127.0.0.1:12141', transport: 'socket'
------》接口请求:开始
执行主要逻辑
执行结果:我是线程方法返回的值
执行结果:我是线程方法返回的值
执行逻辑----》1
执行逻辑----》1
执行结果:我是线程方法返回的值
执行逻辑----》1
执行结果:我是线程方法返回的值
执行结果:我是线程方法返回的值
------》接口请求:结束
执行逻辑----》1
执行逻辑----》1
执行逻辑----》2
执行逻辑----》2
执行逻辑----》2
执行逻辑----》2
执行逻辑----》2
Disconnected from the target VM, address: '127.0.0.1:12141', transport: 'socket'

4.2、方法中使用多线程池

public void threadMethod(List<NetPublishData> list) {
        // 拆分多个list
        List<List<NetPublishData>> lists = ListUtils.partition(list, 100);

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50,
                4, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());
        // 大集合拆分成N个小集合, 这里集合的size可以稍微小一些(这里我用100刚刚好), 以保证多线程异步执行, 过大容易回到单线程
        // 记录单个任务的执行次数
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        // 对拆分的集合进行批量处理, 先拆分的集合, 再多线程执行
        for (List<NetPublishData> singleList : lists) {
            // 线程池执行
            threadPool.execute(new Thread(new Runnable(){
                @Override
                public void run() {
                    netPublishDataMapper.insertList(singleList);
                }
            }));
            // 任务个数 - 1, 直至为0时唤醒await()
            countDownLatch.countDown();
        }
        try {
            // 让当前线程处于阻塞状态,直到锁存器计数为零
            countDownLatch.await();
        } catch (InterruptedException e) {
        }
    }

五、线程池

5.1、配置

@Configuration
@EnableAsync
public class TaskExecutePool {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutePool.class);

    @Bean(name = "ThreadPoolA")
    public ThreadPoolTaskExecutor threadPoolTaskExecutormyTaskAsyncPool() {
        logger.info("start TaskExecutePool......");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(4);
        // 最大线程数
        executor.setMaxPoolSize(8);
        // 队列大小
        executor.setQueueCapacity(100);
        // 空闲最大存活时间
        executor.setKeepAliveSeconds(60);
        // 线程名前缀
        executor.setThreadNamePrefix("Pool-A");
        // 拒绝策略:策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 加载
        executor.initialize();
        return executor;
    }

    @Bean(name = "ThreadPoolB")
    public ThreadPoolTaskExecutor threadPoolTaskExecutorAsyncPoolB() {
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(8);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("Pool-B");
        //当任务数量超过MaxPoolSize和QueueCapacity时使用的策略,该策略是又调用任务的线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

5.2、获取线程运行情况

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutePool.class);

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if(null == threadPoolExecutor) {
            return;
        }
        logger.info("{},{},任务数量:[{}], 已完成:[{}], 执行中[{}], 队列大小[{}]",this.getThreadPoolExecutor()
                , prefix
                , threadPoolExecutor.getTaskCount()
                , threadPoolExecutor.getCompletedTaskCount()
                , threadPoolExecutor.getActiveCount()
                , threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1.do execute");
        super.execute(task);
    }
    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2.do execute");
        super.execute(task, startTimeout);
    }
    @Override
    public Future<?> submit(Runnable callable) {
        showThreadPoolInfo("1.do execute");
        return super.submit(callable);
    }
    @Override
    public <T> Future<T> submit(Callable<T> callable) {
        showThreadPoolInfo("2.do execute");
        return super.submit(callable);
    }
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1.do execute");
        return super.submitListenable(task);
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2.do execute");
        return super.submitListenable(task);
    }
}

5.3、线程使用

@Async(value = "ThreadPoolA")
@Override
public void taskA(int num) {
    for (int i = 0; i < 5; i++) {
        logger.info("我是任务:{}, 逻辑:{}", num, i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}