线程池的核心参数

代码

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数介绍

  1. corePoolSize 核心线程数目
  2. maximumPoolSize 最大线程数目 = (核心线程+救急线程的最大数目)
  3. keepAliveTime 生存时间(救急线程的生存时间,生存时间内没有新任务,此线程资源会释放)
  4. unit 时间单位(救急线程生存时间单位,如秒,毫秒等)
  5. workQueue 阻塞队列(当没有空闲的核心线程时,新来的任务会加入到这个队列中进行排队,队列满就会创建救急线程执行任务)
  6. threadFactory 线程工厂(可以定制线程对象的创建,例如:设置线程的名字,是否为守护线程)
  7. handler 拒绝策略(当所有的线程都在繁忙,workQueue也满了,会触发拒绝策略)

线程池的执行原理

流程图

image.png

流程图说明

  1. 首先提交任务后,判断核心线程数是否已经满了,如果没有满,添加到工作线程进行执行。
  2. 如果核心线程数满了,判断阻塞队列是否满了,如果阻塞队列还有空间,添加到队列进行排队等待。
  3. 如果阻塞队列满了, 再去判断线程数是否小于最大的线程数,如果小于最大线程数,会创建救急线程(非核心线程)进行执行。
  4. 如果核心线程或救急线程执行完任务后会检查阻塞队列中是否有需要执行的任务,如果有,则使用非核心线程进行执行。
  5. 如果线程不小于最大线程数了,会触发拒绝策略。

拒绝策略处理

1. AbortPolicy:直接抛出异常,默认策略。
2. CallerRunsPolicy:用调用者所在的线程来执行任务。
3. DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务。
4. DiscardPolicy:直接丢弃任务。

代码演示

package com.itheima.threadpool;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TestThreadPoolExecutor {

    static class MyTask implements Runnable {
        private final String name;
        private final long duration;

        public MyTask(String name) {
            this(name, 0);
        }

        public MyTask(String name, long duration) {
            this.name = name;
            this.duration = duration;
        }

        @Override
        public void run() {
            try {
                LoggerUtils.get("myThread").debug("running..." + this);
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "MyTask(" + name + ")";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger c = new AtomicInteger(1);
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);

        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                3,
                0,
                TimeUnit.MILLISECONDS,
                queue,
                r -> new Thread(r, "myThread" + c.getAndIncrement()),
                new ThreadPoolExecutor.AbortPolicy());
        showState(queue, threadPool);
        threadPool.submit(new MyTask("1", 3600000));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("2", 3600000));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("3"));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("4"));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("5",3600000));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("6"));
        showState(queue, threadPool);
    }

    private static void showState(ArrayBlockingQueue queue, ThreadPoolExecutor threadPool) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List tasks = new ArrayList();
        for (Runnable runnable : queue) {
            try {
                Field callable = FutureTask.class.getDeclaredField("callable");
                callable.setAccessible(true);
                Object adapter = callable.get(runnable);
                Class clazz = Class.forName("java.util.concurrent.Executors$RunnableAdapter");
                Field task = clazz.getDeclaredField("task");
                task.setAccessible(true);
                Object o = task.get(adapter);
                tasks.add(o);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        LoggerUtils.main.debug("pool size: {}, queue: {}", threadPool.getPoolSize(), tasks);
    }


}

线程池中常见的阻塞队列

workQueue (阻塞队列介绍)

1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
2. LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。

3. DelayedWorkQueue :是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的
4. SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。

ArrayBlockingQueue的LinkedBlockingQueue区别

开发中常用的是这两种
image.png

如何确定核心线程数

  • IO密集型任务(核心线程数大小设置为2N+1,N为CPU核数

    • 文件读写、DB读写、网络请求等
  • CPU密集型任务(核心线程数大小设置为N+1,N为CPU核数

    • 计算型代码、Bitmap转换、Gson转换等

目录

Total Likes
1
Total Comments
0