SpringBoot线程池和Java线程池怎么使用

寻技术 JAVA编程 2023年07月12日 89

这篇文章主要介绍“SpringBoot线程池和Java线程池怎么使用”,在日常操作中,相信很多人在SpringBoot线程池和Java线程池怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot线程池和Java线程池怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

    SpringBoot线程池和Java线程池的用法和实现原理

    使用默认的线程池

    方式一:通过@Async注解调用

    public class AsyncTest {
        @Async
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }
    }

    启动类上需要添加

    @EnableAsync
    注解,否则不会生效。
    @SpringBootApplication
    //@EnableAsync
    public class Test1Application {
       public static void main(String[] args) throws InterruptedException {
          ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
          AsyncTest bean = run.getBean(AsyncTest.class);
          for(int index = 0; index <= 10; ++index){
             bean.async(String.valueOf(index));
          }
       }
    }

    方式二:直接注入 ThreadPoolTaskExecutor

    此时可不加

    @EnableAsync
    注解
    @SpringBootTest
    class Test1ApplicationTests {
    
       @Resource
       ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
       @Test
       void contextLoads() {
          Runnable runnable = () -> {
             System.out.println(Thread.currentThread().getName());
          };
    
          for(int index = 0; index <= 10; ++index){
             threadPoolTaskExecutor.submit(runnable);
          }
       }
    
    }

    线程池默认配置信息

    SpringBoot线程池的常见配置:

    spring:
      task:
        execution:
          pool:
            core-size: 8
            max-size: 16                          # 默认是 Integer.MAX_VALUE
            keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
            allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
            queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
          shutdown:
            await-termination: false              # 线程关闭等待
          thread-name-prefix: task-               # 线程名称的前缀

    SpringBoot 线程池的实现原理

    TaskExecutionAutoConfiguration
    类中定义了
    ThreadPoolTaskExecutor
    ,该类的内部实现也是基于java原生的
    ThreadPoolExecutor
    类。
    initializeExecutor()
    方法在其父类中被调用,但是在父类中
    RejectedExecutionHandler
    被定义为了
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
    ,并通过
    initialize()
    方法将
    AbortPolicy
    传入
    initializeExecutor()
    中。

    注意在

    TaskExecutionAutoConfiguration
    类中,
    ThreadPoolTaskExecutor
    类的bean的名称为:
    applicationTaskExecutor
    taskExecutor
    // TaskExecutionAutoConfiguration#applicationTaskExecutor()
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
          AsyncAnnotationBeanPostProcessor.DEFAUL
              T_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
       return builder.build();
    }
    // ThreadPoolTaskExecutor#initializeExecutor()
    @Override
    protected ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
       BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    
       ThreadPoolExecutor executor;
       if (this.taskDecorator != null) {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
             @Override
             public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                   decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
             }
          };
       }
       else {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    
       }
    
       if (this.allowCoreThreadTimeOut) {
          executor.allowCoreThreadTimeOut(true);
       }
    
       this.threadPoolExecutor = executor;
       return executor;
    }
    // ExecutorConfigurationSupport#initialize()
    public void initialize() {
       if (logger.isInfoEnabled()) {
          logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
       }
       if (!this.threadNamePrefixSet && this.beanName != null) {
          setThreadNamePrefix(this.beanName + "-");
       }
       this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    覆盖默认的线程池

    覆盖默认的

    taskExecutor
    对象,bean的返回类型可以是
    ThreadPoolTaskExecutor
    也可以是
    Executor
    @Configuration
    public class ThreadPoolConfiguration {
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //设置线程池参数信息
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(50);
            taskExecutor.setQueueCapacity(200);
            taskExecutor.setKeepAliveSeconds(60);
            taskExecutor.setThreadNamePrefix("myExecutor--");
            taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            taskExecutor.setAwaitTerminationSeconds(60);
            //修改拒绝策略为使用当前线程执行
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //初始化线程池
            taskExecutor.initialize();
            return taskExecutor;
        }
    }

    管理多个线程池

    如果出现了多个线程池,例如再定义一个线程池

    taskExecutor2
    ,则直接执行会报错。此时需要指定bean的名称即可。
    @Bean("taskExecutor2")
    public ThreadPoolTaskExecutor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

    @Resource
    ThreadPoolTaskExecutor taskExecutor2;

    对于使用

    @Async
    注解的多线程则在注解中指定bean的名字即可。
    @Async("taskExecutor2")
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }

    线程池的四种拒绝策略

    JAVA常用的四种线程池

    ThreadPoolExecutor
    类的构造函数如下:
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    newCachedThreadPool

    不限制最大线程数(

    maximumPoolSize=Integer.MAX_VALUE
    ),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

    newFixedThreadPool

    定长线程池,超出线程数的任务会在队列中等待。

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

    newScheduledThreadPool

    类似于

    newCachedThreadPool
    ,线程数无上限,但是可以指定
    corePoolSize
    。可实现延迟执行、周期执行。
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    周期执行:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(()->{
       System.out.println("rate");
    }, 1, 1, TimeUnit.SECONDS);

    延时执行:

    scheduledThreadPool.schedule(()->{
       System.out.println("delay 3 seconds");
    }, 3, TimeUnit.SECONDS);

    newSingleThreadExecutor

    单线程线程池,可以实现线程的顺序执行。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    Java 线程池中的四种拒绝策略

    • CallerRunsPolicy
      :线程池让调用者去执行。
    • AbortPolicy
      :如果线程池拒绝了任务,直接报错。
    • DiscardPolicy
      :如果线程池拒绝了任务,直接丢弃。
    • DiscardOldestPolicy
      :如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

    CallerRunsPolicy

    直接在主线程中执行了run方法。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    效果类似于:

    Runnable thread = ()->{
       System.out.println(Thread.currentThread().getName());
       try {
          Thread.sleep(0);
       } catch (InterruptedException e) {
          throw new RuntimeException(e);
       }
    };
    
    thread.run();

    AbortPolicy

    直接抛出

    RejectedExecutionException
    异常,并指示任务的信息,线程池的信息。、
    public static class AbortPolicy implements RejectedExecutionHandler {
     
        public AbortPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    DiscardPolicy

    什么也不做。

    public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    DiscardOldestPolicy

    • e.getQueue().poll()
      : 取出队列最旧的任务。
    • e.execute(r)
      : 当前任务入队。
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     
        public DiscardOldestPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

    Java 线程复用的原理

    java
    的线程池中保存的是
    java.util.concurrent.ThreadPoolExecutor.Worker
    对象,该对象在 被维护在
    private final HashSet<Worker> workers = new HashSet<Worker>();
    workQueue
    是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到
    workQueue
    队列中。
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    work对象的执行依赖于

    runWorker()
    ,与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    关闭

    用微信“扫一扫”