黑龙江电视台直播:java线程池原理剖析

admin 4周前 (08-31) 科技 50 2

五一假期大雄看了一本《jAVa并发编程艺术》,了解了线程池的基本事情流程,竟然发现线程池事情原理和互联网公司运作模式十分相似。

线程池处置流程

原理剖析

互联网公司与线程池的关系

这里用一个比喻来形貌一下线程池,中心有一些名词你可能不是太清晰,后边源码剖析的部门会讲到。

你可以把线程池看作是一个研发部门,研发部门有许多程序员(Worker), 他们在一个大办公室里(HashSet workers)。程序员干不完的需求(RunnABLe/Callable)放在需求池(workQueue)里排队。每个研发部都设置有主干程序员数目(corePoolSize)最大能容纳的程序员数目(maximumPoolSize)。详细要做的义务就是产物的需求

new 一个线程池相当于建立了一个研发部,建立研发部时需要指定主干程序员数目,最大能容纳的程序员数目,需求池用哪种(BlockingQueue),若是忙不过来的需求怎么给产物回复(拒绝计谋)等等内容。刚开始这个研发部一个程序员也没有。

当产物给这个研发部提一个需求时(固然一定不会只提一个,他们会一直的提需求。这里以提一个需求为例)

首先会看主干程序员招聘满了没。

若是没满,会招聘一个主干程序员,招聘进来就让他一直的事情(很残酷啊),干完刚派过来的义务他会自动在需求池找下一个需求来做(好员工),若是需求池没有需求了,他就停止事情了,然后研发部会把他裁掉,若是裁掉后发现主干程序员数目不够了,就会再招聘一个程序员。裁掉后,要是主干程序员数目还够就不招聘了。

若是主干程序员数目满了,就看需求池满没满,若是需求池没满,就把需求扔进需求池里;若是需求池满了,就看程序员数目有没有到达上限,若是到达了,就对产物说,这个需求我们做不了,没资源;若是没到达,就招聘一个程序员,招聘进来就让他一直的事情,干完刚派过来的需求他会自动到需求池找下一个义务来做,若是需求池没有义务了,他就停止事情了,然后研发部会把他裁掉,若是裁掉后发现主干程序员数目不够了,就会再招聘一个程序员。裁掉后,要是主干程序员数目还够就不招聘了。

源码剖析

首先是worker(程序员)

Worker被装在一个HashSet(workers)里边, 他是用来执行义务的,他们的职责就是一直的从workQueue里边取义务,然后执行。当workQueue(需求池)里边拿不到义务,或者线程池到达特定状态,worker就会从workers里边移走(被裁)。

下边是Worker源码,移除了非要害的器械

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    // 标识这个义务是在哪个线程运行
    final Thread thread;
    Runnable firstTask;
    // 完成了几个义务
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 阻止中止,知道runWorker执行
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 直接用你提供的线程工厂搞个线程出来
        this.thread = getThreadFactory().newThread(this);
    }

    // 挪用ThreadPoolExecutor里边的runWorker方式
    public void run() {
        runWorker(this);
    }

    // 以下这些是AQS相关的器械

    // 0代表没有加锁
    // 1代表加锁了
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException IGnore) {
            }
        }
    }
}

Worker实现了Runnable接口,以是他是个义务,有run方式;同时有继续了AQS,以是他也是一把锁。

下边是提交义务的历程

提交义务有subMit和execute, submit就是首先将Callable或者Runnable包装成FutureTask,然后挪用execute, 以是焦点是剖析execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 这个c里边有两个信息,一个是现在有若干worker, 另一个是现在线程池的状态是啥
    // workerCountOf方式就是从里边提取 worker的数目的
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // 当前worker的数目比需要的焦点线程数少
        // 加worker去执行,加乐成就完事了,也就是说只要worker比焦点线程数少,就会建立worker
        // 不管现在焦点线程是否在事情,也不管workQueue是不是满的
        // addWorker的第二个参数示意是不是要加焦点线程(或者叫焦点worker)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 当前worker到达或跨越了焦点线程数,或者加worker失败了,才会走下边的流程
    // worker已经比焦点线程数多了

    // 若是 线程池没有shutdown的话 
    // 就实验将义务加到workQueue里边,事情行列入队乐成的话再往里边走
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            // 再次检查状态若是线程池要停了,那么就拒绝义务,而且把worker从事情行列扔掉
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 若是没有worker的话(说明没加进去,这种场景我没想到是什么情形),加一个worker
            addWorker(null, false);
        // 其他情形,丢到事情行列就不用管了,等着worker去处置
    }
    // 若是行列满了加失败了,或者线程池状态不知足了,就实验加通俗worker(非焦点线程)
    else if (!addWorker(command, false))
        // 加失败了就拒绝义务
        // 失败一方面可能是worker数目已经到达你的给的maximumPoolSize
        // 另一方面,可能是检查到线程池的状态不对了
        reject(command);
}

可以发现execute方式就是完成了上边说的“线程池处置流程”这个图里形貌的历程。 大雄看到这里另有几个疑问,一个是Woker是若何建立并加入workers的,一个是worker是若何启动的,再就是worker是若何运行的

生涯还要继续

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 做一些校验,线程池的状态要知足一定条件
        // 而且得提交义务过来,再就是workQueue不能是空的
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 看你是要建立焦点worker照样通俗worker
            // 焦点看超没跨越corePoolSize, 通俗看超没跨越maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
            // 增添worker数目失败就在来
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                // 中途线程池状态发生变化了
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // worker就是这么建立的
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加worker是要加全局锁的
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // worker是在这里启动的
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这段代码解决了 Woker是若何建立并加入workers的以及worker是若何启动的的问题

addWorker做的焦点事情就是,建立worker, 启动worker, 在建立之前还会做一些校验。挪用了worker里边线程的start后就要守候cpu调剂执行worker的run方式了。

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task是建立worker带进去的义务,会先执行他,然后从workQueue里边取
        // 若是没有的话跳出去
        while (task != null || (task = getTask()) != null) {
            w.lock();   // 首先加锁,若是不加锁,可能几个线程提交的义务同时进来了,会导致一些共享状态出问题

            // 做一些状态的校验
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行义务前挪用一下beforeExecute, 默认是空的
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 这个跟我们平时明白的Runnable还不一样,可以体会下,他这个run就是一个通俗的方式
                    // 他直接调run是要执行义务,线程的start只是把worker里边的谁人run跑起来了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行完了调一下,里边可以拿到异常
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 从while跳出来解释没有义务可以执行了
        processWorkerExit(w, completedAbruptly);
    }
}

这个也对照容易,就是一直的从workQueue取义务,执行,直到没义务了跳出来。接下来就是worker若何被销毁的问题了

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 移除掉worker(裁员)
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 比焦点线程数多的话,执行完的Worker直接移除就好
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 小于焦点线程数就会再加个Worker, 让他继续守候吸收义务(招人)
        addWorker(null, false);
    }
}

直接从workers里边移除worker, 移除后若是worker数目比焦点线程数还少,就再加个worker, 否则不加。

一些体会

看源码一定不要太过纠结细节,就像这个线程池,我看网上许多文章去算那几个位运算的十进制数,感受是在浪费时间,没有捉住重点。

固然这也不是绝对的(似乎说的矛盾了),一些细节的设计照样异常精妙值得学习的。照样这个位运算,为什么只用一个int示意线程池状态和worker的数目呢。

要多多遐想,照样这个位运算,他是不是和读写锁用一个int既示意写状态又示意读状态十分相似。Worker继续AQS,是否能让你想起AQS的种种。

总之,小我私家以为第一遍看是一定不能沉溺于细节的,他会让你迷惘和损失信心;第二遍、第三遍可以关注一下细节,感受大师级的设计的美妙之处。固然笔者仅仅大略看了一遍(逃~)

最后

大雄五一假期阅读了《java并发编程艺术》这本书,整理了一本gitbook条记(还没写完),需要的同砚可以扫描文末二维码关注“大雄和你一起学编程”民众号,后台回复我爱java领取。这本gitbook还没彻底完成,以是可能另有些小错误。未来会约莫每两天推送其中的一篇文章。

如下是这本gitbook的目录截图

,

suNBet 申博

申博sunbet是Sunbet www.sunbet.xyz指定的Sunbet官网,Sunbet提供Sunbet(Sunbet)、Sunbet、申博代理合作等业务。

Allbet声明:该文看法仅代表作者自己,与本平台无关。转载请注明:黑龙江电视台直播:java线程池原理剖析

网友评论

  • (*)

最新评论

  • 联博API接口 2020-08-31 00:00:27 回复

    欧博亚洲官方注册欢迎进入欧博亚洲官方注册(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。还好啦。

    1
  • 联博API接口 2020-08-31 00:01:10 回复

    欧博亚洲官方注册欢迎进入欧博亚洲官方注册(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。还好啦。

    2

文章归档

站点信息

  • 文章总数:458
  • 页面总数:0
  • 分类总数:8
  • 标签总数:880
  • 评论总数:124
  • 浏览总数:2789