纵有疾风起
人生不言弃

重写线程池 execute 方法导致线程池“失效” 问题

一、背景

今天群里有个同学遇到一个看似很奇怪的问题,自定义 ThreadPoolTaskExecutor 子类,重写了 execute 方法,通过 execute 方法来执行任务时打印当前线程,日志显示任务一直在调用者线程里执行 (其实并不是),似乎线程池失效了。

二、场景复现

自定义 ThreadPoolTaskExecutor 子类

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor { 

    @Override
    public void execute(Runnable command) { 
        System.out.println("当前线程" + Thread.currentThread().getName());

        super.execute(command);
    }

}

编写测试代码:

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo { 

    public static void main(String[] args) throws InterruptedException { 
        // 构造线程池
        ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl();
        executor.initialize();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("[线程池的线程]");

        // 执行任务
         for (int i = 0; i < 5; i++) { 
            executor.execute(() -> { 
                System.out.println("测试");
            });
        }


        TimeUnit.SECONDS.sleep(3);

        executor.shutdown();
    }
}

执行结果:

当前线程main
当前线程main
测试
测试
当前线程main
测试
当前线程main
当前线程main
测试
测试

由此断定:自定义的线程池失效,在 execute 方法中获取当前线程时,并没有出现我们定义的线程名称前缀的线程,仍然使用 main 线程来执行任务。
但是,真的是这样吗?

三、分析

由于很多同学没有认真思考过多线程的本质,会想当然地认为线程池的 execute 方法的所有代码都是在线程池创建的线程中执行,可是真的是这样吗?

我们知道在没有使用新线程的情况下,程序会使用当前线程(main 线程)顺序执行。
重写线程池 execute 方法导致线程池“失效” 问题插图

因此,在 org.example.thread.ThreadPoolTaskExecutorImpl#execute 中,打印的 “当前线程” 的代码仍然是在 main 方法中执行的。
重写线程池 execute 方法导致线程池“失效” 问题插图1

进入 super.execute 方法

	@Override
	public void execute(Runnable task) { 
		Executor executor = getThreadPoolExecutor();
		try { 
			executor.execute(task);
		}
		catch (RejectedExecutionException ex) { 
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}

这里的 executor.execute 方法实际上是 java.util.concurrent.ThreadPoolExecutor#execute

 /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
    public void execute(Runnable command) { 
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

小于核心线程数,会执行到 `addWorker ,此时才真正创建新的线程去执行任务:

    /** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */
    private boolean addWorker(Runnable firstTask, boolean core) { 
        
        // 省略部分代码

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try { 
            //【关键1】 创建新的线程(run 方法)
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) { 
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try { 
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally { 
                    mainLock.unlock();
                }
                if (workerAdded) { 
                    //【关键2】启动创建的线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally { 
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们再看下 new Worker 时做了什么。

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    { 
     

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        
        /** Per-thread task counter */
        volatile long completedTasks;

        /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
        Worker(Runnable firstTask) { 
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //【关键】 使用线程工厂创建线程
            this.thread = getThreadFactory().newThread(this);
        }
        
        //【关键】 重写 Runnable 的 run 方法,里面就封装了外面传入的 Runnable
        /** Delegates main run loop to outer runWorker */
        public void run() { 
            runWorker(this);
        }
        
        
       final void runWorker(Worker w) { 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try { 
            while (task != null || (task = getTask()) != null) { 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try { 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try { 
                        task.run();
                    } catch (RuntimeException x) { 
                        thrown = x; throw x;
                    } catch (Error x) { 
                        thrown = x; throw x;
                    } catch (Throwable x) { 
                        thrown = x; throw new Error(x);
                    } finally { 
                        afterExecute(task, thrown);
                    }
                } finally { 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally { 
            processWorkerExit(w, completedAbruptly);
        }
    }

        

// 省略其他

}

因此
(1)自定义的 ThreadPoolTaskExecutorImpl 类里重写的 execute 方法里打印的当前线程实际还是调用者线程。
(2)外面传入的 Runnable 参数最终会在 Worker 现成的 run 方法中执行到。

四、解决之道

我们可以使用包装器模式处理下即可:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor { 

    @Override
    public void execute(Runnable command) { 
    // 当前在父线程里

        super.execute(() -> { 
            // 这里在子线程里执行
            
            // 可以在任务前打印下当前线程名称,线程池的状态等信息
            System.out.println("当前线程" + Thread.currentThread().getName());
            
            // 原始的任务
            command.run();
        });
    }

}

测试代码:

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo { 

    public static void main(String[] args) throws InterruptedException { 
        // 构造线程池
        ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl();
        executor.initialize();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("[线程池的线程]");

        // 执行任务
        for (int i = 0; i < 5; i++) { 
            executor.execute(() -> { 
                System.out.println(Thread.currentThread().getName() + "-> 测试");
            });
        }

        TimeUnit.SECONDS.sleep(3);

        executor.shutdown();
    }
}

输出结果:

当前线程[线程池的线程]2
当前线程[线程池的线程]1
[线程池的线程]1-> 测试
[线程池的线程]2-> 测试
当前线程[线程池的线程]3
[线程池的线程]3-> 测试
当前线程[线程池的线程]4
[线程池的线程]4-> 测试
当前线程[线程池的线程]5
[线程池的线程]5-> 测试

五、启示

5.1 关于提问

该同学提问非常模糊,甚至“反复修改问题”,最终给出关键代码截图,才真正理解问题是什么。

大家请教别人时,尽量能将问题转化为别人容易理解的表达方式。

大家请教别人时,一定自己先搞清楚问题究竟是什么,而不需要别人一再追问下,才不断逼近真实的问题。

大家请教别人时,最好能够有源码或者关键信息截图等。

5.2 现象与本质

我们使用线程池时,总是观察到我们传入的 Runnable 是在线程池中的线程执行的,我们是使用 execute 方法来执行的,但这并不意味着 execute 方法的所有步骤都是在线程池中的线程里执行的。

学习某个技术时,要真正理解技术的本质,而不是表象。

如调用线程的 start 方法才真正启动线程,在重写的 execute 方法第一行压根就没有创建新的线程,怎么会在新的线程里执行呢?

在实际开发和验证问题时,多进行代码调试,掌握高级的调试技巧,如调试时表达式计算、条件断点、移除栈帧回退调用等。

创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
重写线程池 execute 方法导致线程池“失效” 问题插图2

原文链接:https://mingmingruyue.blog.csdn.net/article/details/126326107

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

未经允许不得转载:起风网 » 重写线程池 execute 方法导致线程池“失效” 问题
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录