终止线程池执行 - 实战

  • 时间:
  • 浏览:
  • 来源:互联网

终止线程池执行

    • 一、区别线程池的关闭与终止
      • 1. 线程池关闭
      • 2. 线程池终止
    • 二、终止线程池执行需求背景
    • 三、自研线程池终止方法
      • 1. 终止代码
      • 2. 注意事项
      • 3. 思路分析(分布式服务情况)
      • 4. 实现思路(分布式服务情况)
      • 5. 代码实现
        • a. 定义 [线程终止标识] 变量
        • b.定义guava对象
        • c.判断 [线程终止标识]
        • d.终止线程池执行
    • 四、最后说明
    • 五、相关博客

一、区别线程池的关闭与终止

1. 线程池关闭

线程池中多线程运行结束后可以使用 shutdown() 或 shutdownNow() 将其正确关闭,关闭时容易造成线程阻塞,需注意正确使用(可参考文末相关博客)。

2. 线程池终止

结束线程池此次执行,不影响后续任务添加和执行。


二、终止线程池执行需求背景

由于业务需要,每晚有跑批任务,任务执行的数据量由当天业务操作决定,而且对数据库压力较大,为防止执行时间过长,影响第二天系统使用,故在跑批过程中需要终止其执行,经分析,发现耗时较长的地方是线程池执行过程,所以终止线程池执行成为关键,而且终止后要不影响下一次跑批任务。


三、自研线程池终止方法

1. 终止代码

threadPoolExecutor.getQueue().clear();

2. 注意事项

服务分布式部署时,需要考虑做共享变量或通知机制,以保证跑批任务准确终止;

3. 思路分析(分布式服务情况)

可以使用redis广播或消息中间件做消息通知,但遇到过redis广播不稳定的问题,消息中间件又需要建一个topic感觉很浪费,所以最终采用redis变量加guava本地变量实现功能。

4. 实现思路(分布式服务情况)

a.定义 [线程终止标识] 变量,存储在redis中来实现服务间共享;

b.多线程执行时,会频繁读取 [线程终止标识] 变量,如果直接从redis读取,会给其造成不必要的压力,
故从redis中读取的变量会缓存到guava本地变量中,guava本地变量可设置为定时失效;

c.查询 [线程终止标识] 时,会先从guava本地变量中读取,若变量不存在,则再从redis中查询;

d.线程池执行过程中,会判断 [线程终止标识],当变量值为true时,则清空线程池的队列数据,
以达到终止线程执行的目的;

e.当线程池配置的等待队列容量小于要执行的数据量,并采用 CallerRunsPolicy 时,
要注意清空线程池队列的时机。

5. 代码实现

a. 定义 [线程终止标识] 变量

/**
 * 线程终止标识名称
 */
private static final String INTERRUPT_FLAG = "interrupted";

/**
 * redis template
 */
private RedisTemplate<String, Boolean> redisTemplate;

/**
 * 注入redis对象
 */
public TestServiceImpl(RedisTemplate redisTemplate) {
    super();
    this.redisTemplate = redisTemplate;
}

b.定义guava对象

/**
 * 定义guava对象用于存储线程终止标识
 * 5秒刷新
 * 可按照能忍受的最大中断间隙时间设置
 */
private static volatile Cache<String, Boolean> guavaCache = 
                        CacheBuilder.newBuilder()
                        .maximumSize(5)
                        .expireAfterWrite(5, TimeUnit.SECONDS)
                        .build();

c.判断 [线程终止标识]

/**
 * 判断线程终止标识
 * 先从本地变量获取标识再从redis中获取标识,以节省redis资源
 * 本地变量定期失效
 */
private boolean isInterrupt() {
    // 先取本地变量
    Boolean interruptFlag = guavaCache
                            .getIfPresent(INTERRUPT_FLAG);
    if (Objects.nonNull(interruptFlag)) {
        return interruptFlag;
    }

    // 若本地变量为空,则从redis中获取并设置值
    log.info("从redis中获取缓存标识");
    Boolean interrupted = redisTemplate.opsForValue()
                                        .get(INTERRUPT_FLAG);
    boolean flag = BooleanUtils.isTrue(interrupted);
    guavaCache.asMap().put(INTERRUPT_FLAG, flag);
    return flag;
}

/**
 * 设置中断标识
 */
private void setInterruptFlag(boolean interrupted) {
    try {
        redisTemplate.opsForValue()
                        .set(INTERRUPT_FLAG, interrupted);
    } catch (Exception e) {
        log.error("设置中断标识时异常,原因:{}", e.getMessage());
    }
}

d.终止线程池执行

@Description("声明线程池对象")
@Bean(name = "threadPoolExecutor")
public ThreadPoolExecutor threadPoolExecutor() {
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("threadPoolExecutor-%d").build();

    return new ThreadPoolExecutor(15, 90, 30,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(3000000),
        threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}

@Autowired
@Qualifier("deliveryGeoBindExecutor")
private ThreadPoolExecutor threadPoolExecutor;

/**
 * 清空线程池任务
 */
private void clearThreadPool() {
    log.info("线程池清空时还剩余任务量:{}", 
                threadPoolExecutor.getQueue().size());

    threadPoolExecutor.getQueue().clear();
}

/**
 * 线程池执行
 */
private void excute(List paramList) {
    if (CollectionUtils.isEmpty(paramList)) {
        return;
    }

    // 定义多线程执行器
    ExecutorCompletionService<Void> ecs = 
            new ExecutorCompletionService(threadPoolExecutor);
    
    // 提交任务
    for (Param vo : paramList) {
        
        /*
         * 当线程池配置的等待队列容量小于要执行的数据量,
         * 并采用 CallerRunsPolicy 时,
         * 需要在此处也判断终止标识,
         * 否则当前线程会在此直接执行任务,
         * 导致终止线程失败
         */
        /*if (this.isInterrupt()) {
            this.clearThreadPool();
            log.info("多线程查询被终止...");
            break;
        }*/

        ecs.submit(() -> {
            // TODO 具体任务
            return null;
        });
    }
    
    // 获取执行结果
    for (Param vo : paramList) {
        
        // 终止线程
        if (this.isInterrupt()) {
            this.clearThreadPool();
            log.info("多线程查询被终止...");
            break;
        }
    
        try {
            ecs.take().get();
        } catch (Exception e) {
            log.error("线程池执行异常,原因:{}", e.getMessage());
        }
    }
}

四、最后说明

在分布式部署的环境中,经过多次测试,均能成功终止线程池中多线程的执行,并且不会影响后续的跑批任务。

总结至此,欢迎交流~


五、相关博客

  1. 线程池 多线程运行结束后 如何关闭? ExecutorService的正确关闭方法

  2. 通过类图简单介绍ThreadPoolExecutor

  3. Google Guava Cache 全解析

  4. Guava Cache官方文档

本文链接http://www.dzjqx.cn/news/show-617027.html