Java 多线程

April 22, 2023 作者: dyzmj 分类: 源码 浏览: 3 评论: 0

第一篇:基础篇

一、进程与线程的基本概念

1.1 进程产生的背景

最初的计算机只能接受一些特定的指令,用户每输入一个指令,计算机就做出一个操作。当用户在思考或者输入时,计算机就在等待。这样效率非常低下,在很多时候,计算机都处在等待状态。

批处理操作系统

后来有了批处理操作系统,把一系列需要操作的指令写下来,形成一个清单,一次性交给计算机。用户需要将多个需要执行的程序写在磁带上,然后交由计算机去读取并逐个执行这些程序,并将输出结果写在另一个磁带上。

批处理操作系统在一定程度上提高了计算机的效率,但是由于批处理操作系统的指令运行方式仍然是串行的,内存中始终只有一个程序在运行,后面的程序需要等待前面的程序执行完后才能开始执行,而前面的程序有时会由于 I/O 操作、网络等原因阻塞,所以批处理操作效率也不高。

进程的提出

人们对于计算机的性能要求越来越高,现有的批处理操作系统并不能满足人们的需求,而批处理操作系统的瓶颈在于内存中只存在一个程序,那么内存中能不能存在多个程序呢?这是人们亟待解决的问题。

于是,科学家们提出来进程的概念。

进程就是应用程序在内存中分配的空间,也就是正在运行的程序,各个进程之间互不干扰。同时进程保存着程序每个时刻运行的状态。

程序:有某种变成语言(Java、Python 等)编写,能够完成一定任务或者功能的代码集合,是指令和数据的有序集合,是一段静态代码。

此时,CPU 采用时间片轮转的方式运行这进程:CPU 为每个进程分配一个时间段,称作它的时间片。如果在时间片结束时进程还在运行,这暂停这个进程的运行,并将 CPU 分配给另一个进程(这个过程叫做上下文切换)。如果进程在时间片结束前阻塞或结束,则 CPU 立即进行切换,不用等待时间片用完。

当进程暂停时,它会保存当前进程的状态(进程标识、进程使用的资源等),在下一次切换回来时根据之前保存的状态进行恢复,接着执行下去。

使用进程 + 时间片轮转方式的操作系统,在宏观上看起来同一时间执行多个任务,换句话说,进程让操作系统的并发成为了可能。虽然从宏观上看有多个任务在并发执行,但实际上,对于单核 CPU 来说,任意具体时刻都只有一个任务在占用 CPU 资源。

对操作系统的要求进一步提高

虽然进程的出现,使得操作系统的性能大大提升,但是随着时间的推移,人们并不满足一个进程在一段时间只能做一件事情,如果一个进程有多个子任务时,只能逐个的执行这些子任务,很影响效率。

比如杀毒软件在检测用户电脑时,如果在某一项检测中卡住了,那么后面的检测项也会受到影响。或者说当你使用杀毒软件中的扫描病毒功能时,在检测病毒结束前,无法使用杀毒软件中清理垃圾的功能,这显然是无法满足人们的要求。

线程的提出

那么能不能让这些子任务同时执行呢?于是人们又提出了线程的概念,让一个线程执行一个子任务,这样一个进程就包含了多个线程,每个线程负责一个单独的子任务

使用线程之后,事情就变得简单多了。当用户使用扫描病毒的功能时,就让扫描病毒这个线程去执行。同时如果用户又使用清理垃圾的功能,那么可以先暂停扫描病毒线程,先响应用户清理垃圾的操作,让清理垃圾这个线程去执行。响应完后再切换回来,接着执行扫描病毒线程。

注意:操作系统如何分配时间片给每一个线程,涉及到线程的调度策略,有兴趣的同学可以看下《操作系统》这本书,本文不做深入详解。

总之,进程和线程的提出极大的提高了操作系统的性能。进程让操作系统的并发性成为了可能,而线程让进程内部并发成为了可能。

多进程的方式也可以试下并发,为什么我们要使用多线程?

多进程方式可以实现并发,但使用多线程,有以下几个好处:

  • 进程间的通信比较复杂,而线程间的通信比较简单,通常情况下我们需要使用共享资源,这些资源在线程间的通信比较容易获取。
  • 进程是重量级的,而线程是轻量级的,故多线程方式的系统开销更小。

进程和线程的区别

进程是一个独立的运行环境,而线程是在进程中执行的一个任务。他们两个的本质区别是是否单独占用内存地址空间及其他系统资源(比如 I/O)

  • 进程单独占用一定的内存地址空间,所以进程间存在内存隔离,数据是分开的,数据共享复杂,但是同步简单,各个进程之间互不干扰;而线程共享所属进程占有的内存地址空间和资源,数据共享简单,但是同步复杂。
  • 进程单独占有一定的内存地址空间,一个进程出现问题不会影响到其他进程,不影响主程序的稳定,可靠性高;一个线程的崩溃可能影响整个程序的稳定,可靠性较低。
  • 进程单独占有一定的内存地址空间,进程的创建和销毁不仅需要保存寄存器和栈信息,还需要资源的分配回收及页调度,开销较大;线程只需要保存寄存器和栈信息,开销较小。

另外一个重要区别就是,进程是操作系统进行资源分配的基本单位,而线程是操作系统进行调度的基本单位,即 CPU 分配时间的单位。

1.2 上下文切换

上下文切换(有时也称作进程切换或任务切换)是指 CPU 从一个进程(或线程)切换到另一个进程(或线程)。上下文是指某一时间点 CPU 寄存器和程序计数器的内容

寄存器是 CPU 内部的少量的速度很快的闪存,通常存储和访问计算过程中的中间值,提高计算机程序的运行速度。

程序计数器是一个专用的寄存器,用于表明指令序列中 CPU 正在执行的位置,存的值为正在执行的指令的位置或者下一个将要被执行的指令的位置,具体实现依赖于特定的系统。

举例说明 线程 A -B

1、先挂起线程 A,将其在 CPU 中的状态保存在内存中。

2、在内存中检索下一个线程 B 的上下文并将其在 CPU 的寄存器中恢复,执行 B 线程。

3、当 B 执行完,根据程序计数器中指向的位置恢复线程 A。

CPU 通过为每个线程分配 CPU 时间片来实现多线程机制。CPU 通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。

但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载这个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换。

上下文切换通常是计算密集型的,意味着此操作会消耗大量的 CPU 时间,故线程也不是越多越好。如何减少系统中上下文切换次数,是提升多线程性能的一个重点课题。

二、Java 多线程入门类和接口

2.1 Thread 类和 Runnable 接口

上一章我们了解了操作系统中多线程的基本概念,那么在 Java 中,我们是如何使用多线程的呢?

首先,我们需要一个 “线程” 类。JDK 提供了 ThreadRunnable 接口来让我们实现自己的 “线程” 类。

  • 继承 Thread 类,并重写 run() 方法。
  • 实现 Runnable 接口的 run() 方法。

2.1.1 继承 Thread 类

先学会怎么用,再学原理。首先我们来看怎么用 ThreadRunnable 来写一个 Java 多线程程序。

首先是继承 Thread 类:

public class Demo {
    public static void main(String[] args) {
        Thread myThread = new MyThread();
        myThread.start();
    }

    public static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("MyThread");
        }
    }
}

注意调用 start() 方法后,该线程才算启动!

我们在程序里面调用了 start() 方法后,虚拟机会先为我们创建一个线程,然后等到这个线程第一次得到时间片是再调用 run() 方法。

注意不可多次调用 start() 方法。在第一次调用 start() 方法后,再次调用 start() 方法会抛出 IllegalThreadStateException 异常。

2.1.2 实现 Runnable 接口

接着我们来看一下 Runnable 接口(JDK 1.8+):

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

可以看到 Runnable 是一个函数式的接口,这意味着我们可以使用 Java 8 的函数式编程来简化代码。

示例代码:

public class Demo {
    public static void main(String[] args) {
        new Thread(new MyRunnable()).start();

        // Java 8 函数式编程
        new Thread(() -> {
            System.out.println("Java 8 匿名内部类");
        }).start();
    }

    public static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("MyRunnable");
        }
    }
}

2.1.3 Thread 类构造方法

Thread 类是一个 Runnable 接口的实现类,我们来看看 Thread 类的源码。

查看 Thread 类的构造方法,发现其实是简单调用一个私有的 init() 方法来实现初始化。init() 方法的签名:

// Thread 类源码

// 片段1 - init 方法
private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals)

// 片段2 - 构造方法调用 init 方法
public Thread(Runnable target) {
    init(null, target, "Thread-" + nextThreadNum(), 0);
}

// 片段3 - 使用在 init 方法里初始化 AccessControlContext 类型的私有属性
this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();

// 片段4 - 两个用于支持 ThreadLocal 的私有属性
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

我们挨个来解释下 init 方法的这些参数:

  • g:线程组,指定这个线程是在哪个线程组下;

  • target:指定要执行的任务;

  • name:线程的名字,多个线程的名字是可以重复的。如果不知道名称,见片段2;

  • acc:见片段3,用于初始化私有变量 inheritedAccessControlContext

    这个变量有点神奇。它是一个私有变量,但是在 Thread 类里只有 init 方法对它进行初始化,在 exit 方法把它设为 null。其他没有任何地方使用它。一般我们是不会使用它的,那么什么使用会使用到这个变量呢?可以参考这个 Stack Overflow 的问题:Restrict permissions to threads which execute third party software

  • inheritThreadLocals:可继承的 ThreadLocal,见片段4,Thread 类里有两个私有属性来支持 ThreadLocal,我们会在后面的章节介绍 ThreadLocal 的概念。

实际情况下,我们大多是直接调用下面这两个构造方法:

Thread(Runnable target);
Thread(Runnable, String name);

2.1.4 Thread 类的几个常用方法

这里介绍一下 Thread 类的几个常用的方法:

  • currentThread():静态方法,返回对当前正在执行的线程对象的引用。
  • start():开始执行线程的方法,Java 虚拟机会调用线程内的 run() 方法。
  • yield():yield 在英语里有放弃的意思,同样,这里的 yield() 指的是当前线程愿意让出对当前处理器的占用。这里需要注意的是,就算当前线程调用了 yield() 方法,程序在调度的时候,也还有可能继续云溪这个线程的。
  • sleep():静态方法,使当前线程睡眠了一段时间。
  • join():是当前线程等待另一个线程执行完毕之后再继续执行,内部调用的是 Object 类的 wait() 方法实现的。

2.1.5 Thread 类与 Runnable 接口的比较

实现一个自定义的线程类,可以由继承 Thread 类或实现 Runnable 接口这两种方式,它们之间有什么优劣呢?

  • 由于 Java “单继承,多实现” 的特性,Runnable 接口使用起来比 Thread 更灵活。
  • Runnable 接口出现更符合面向对象,将线程单独进行对象的封装。
  • Runnable 接口的实现 ,降低了线程对象和线程任务的耦合性。
  • 如果使用线程时不需要使用 Thread 类的诸多方法,显然使用 Runnable 接口更为轻量。

所以,我们通常优先使用实现 Runnable 接口的方式开自定义线程类。

2.2 Callable、Future 与 FutureTask

通常来说,我们使用 ThreadRunnable 来创建一个新的线程,但是它们有一个弊端,就是 run() 方法是没有返回值的。而有时候我们希望开启一个线程去执行一个任务,并且这个任务执行完成后有一个返回值。

JDK 提供了 Callable 接口与 Future 接口为我们解决这个问题,这也就是所谓的 “异步” 模型。

2.2.1 Callable 接口

CallableRunnable 类似,同样是只有一个抽象方法的函数式接口。不同的是,Callable 提供的方法是有返回值的,而且支持泛型。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

那一般是怎么使用 Callable 的呢? Callable 一般是配合线程池工具 ExecutorService 来使用的。我会在后续的章节解释线程池的使用。这里只介绍 ExecutorService 可以使用 submit() 方法来让一个 Callable 接口执行。它会返回一个 Future,我们后续的程序可以通过这个 Futureget() 方法得到结果。

这里可以看一个简单使用的 demo:

public class DemoTask implements Callable<Integer> {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        DemoTask demo = new DemoTask();
        Future<Integer> result = executor.submit(demo);
        // 注意:调用 get 方法会阻塞当前线程,直到得到结果。
        // 所以实际编码中建议使用可以设置超时时间的重载 get 方法。
        System.out.println(result.get());
    }

    @Override
    public Integer call() throws Exception {
        // 模拟 计算需要耗时1秒
        Thread.sleep(1000);
        return 2;
    }
}

输出结果: 2

2.2.2 Future 接口

Future 接口只有几个比较简单的方法:

public abstract interface Future<V> {
    public abstract boolean cancel(boolean paramBoolean);
    public abstract boolean isCancelled();
    public abstract boolean isDone();
    public abstract V get() throws InterruptedException, ExecutionException;
    public abstract V get(long paramLong, TimeUnit paramTimeUnit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

cancel() 方法是试图取消一个线程的执行。

注意是试图取消,并不一定能取消成功。因为任务可能已完成、已取消或者一些其他因素不能取消,存在取消失败的可能。boolean 类型的返回值是 “是否取消成功” 的意思。参数 paramBoolean 表示是否采用中断的方式取消线程执行。

所以,有的时候为了让任务有能够取消的功能,就使用 Callable 来代替 Runnable。如果为了可取消性而使用 Future 但又不提供可用的返回结果,则可以声明 Future<?> 形式类型,并返回 null 作为底层任务的结果。

2.2.3 FutureTask 类

上面介绍了 Future 接口,而这个接口有一个实现类叫 FutureTaskFutureTask 是实现 RunnableFuture 接口的,而 RunnableFuture 接口同时继承了 Runnable 接口和 Future 接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask 类有什么用?为什么要有一个 FutureTask 类?前面说到了 Future 只是一个接口,而它里面的 cancelgetisDone 等方法要自己实现起来都是非常复杂的。所以 JDK 提供了一个 FutureTask 类来供我们使用。

示例代码:

public class DemoTask implements Callable<Integer> {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<Integer> futureTask = new FutureTask<>(new DemoTask());
        executor.submit(futureTask);
        System.out.println(futureTask.get());
    }

    @Override
    public Integer call() throws Exception {
        // 模拟 计算需要耗时1秒
        Thread.sleep(1000);
        return 2;
    }
}

上面的例子与第一个 demo 有一点小的区别,首先 submit 方法是没有返回值的,这里实际上调用的是 submit(Runnable task) 方法,而第一个 demo调用的是 submit(Callable<T> task) 方法。

然后,这里是使用 FutureTask 直接用 get() 方法取值,而第一个 demo 是通过 submit 方法返回的 Future 去取值。

在高并发的环境下,有可能 CallableFutureTask 会创建多次,FutureTask 能够在高并发环境下确保任务只执行一次,对这块有兴趣的同学可以去看看 FutureTask 的源码。

2.2.4 FutureTask 的几个状态

/**
  *
  * state可能的状态转变路径如下:
  * NEW -> COMPLETING -> NORMAL
  * NEW -> COMPLETING -> EXCEPTIONAL
  * NEW -> CANCELLED
  * NEW -> INTERRUPTING -> INTERRUPTED
  */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

state 表示任务的运行状态,初始状态为 NEW。运行状态只会在 set、setException、cancel 方法中终止。COMPLETING、INTERRUPTING 是任务完成后的瞬时状态。

以上就是 Java 多线程几个基本的类和接口的介绍。可以打开 JDK 源码看看,体会这几个类的设计思路和用途。

三、线程组和线程优先级

3.1 线程组(ThreadGroup)

Java 中用 ThreadGroup 来表示线程组,我们可以使用线程组对线程进行批量控制。

ThreadGroupThread 的关系就如同它们的字面意思一样简单粗暴,每个 Thread 必然存在于一个 ThreadGroup 中,Thread 不能独立于 ThreadGroup 存在。执行 main() 方法线程的名字是 main,如果在 New Thread() 时没有显式指定,那么默认将父线程(当前执行 New Thread() 的线程)的线程组设置为自己的线程组。

示例代码:

public class Demo {
    public static void main(String[] args) {
        Thread testThread = new Thread(() -> {
            System.out.println("-- testThread 当前线程组的名字" + Thread.currentThread().getThreadGroup().getName());
            System.out.println("-- testThread 当前线程的名字" + Thread.currentThread().getName());

        });

        testThread.start();

        System.out.println(">> 执行main方法所在线程组的名字" + Thread.currentThread().getThreadGroup().getName());
        System.out.println(">> 执行main方法线程的名字" + Thread.currentThread().getName());
    }
}

输出结果:

image-20220121111136342

ThreadGroup 管理着它下面的 ThreadThreadGroup 是一个标准的向下引用的树状结构,这样设计的原因是防止 “上级” 线程被 “下级” 线程引用而无法有效的被 GC 回收

3.2 线程的优先级

Java 中的线程优先级是可以指定的,返回是1~10。但是并不是所有的操作系统都支持10级优先级的划分(比如有的操作系统只支持3级划分:低、中、高),Java 只是给操作系统一个优先级的参考值,线程最终在操作系统的优先级是多少还是由操作系统决定。

Java 默认的线程优先级为5,线程的执行顺序由调度程序来决定,线程的优先级会在线程被调用之前设定。

通常情况下,高优先级的线程将会比低优先级的线程有更高的几率得到执行。我们使用 Thread 类的 setPriority() 实例方法来设定线程的优先级。

public class Demo {
    public static void main(String[] args) {
        Thread a = new Thread();
        System.out.println("我是默认线程优先级:" + a.getPriority());
        Thread b = new Thread();
        b.setPriority(10);
        System.out.println("我是设置过的线程优先级:" + b.getPriority());
    }
}

输出结果:

image-20220121111859167

既然有1-10的级别来设定线程的优先级,这个时候可能会问了,是不是可以在业务实现的时候,采用这种方法来指定一线线程执行的先后顺序呢?

对于这个问题,我们的答案是:NO!

Java 中的优先级不是特别的可靠,Java 程序中对线程所设置的优先级只是给操作系统一个建议,操作系统不一定会采纳,而真正的调用顺序,是由操作系统的线程调度算法决定的。

我们来通过代码来验证一下:

public class Demo {
    public static void main(String[] args) {
        IntStream.range(1, 10).forEach(i -> {
            Thread thread = new Thread(new T1());
            thread.setPriority(i);
            thread.start();
        });
    }

    public static class T1 extends Thread {
        @Override
        public void run() {
            super.run();
            System.out.println(String.format("当前执行的线程是:%s,优先级:%d",
                    Thread.currentThread().getName(),
                    Thread.currentThread().getPriority()));
        }
    }
}

输出结果:

image-20220121113140294

Java 提供一个线程调度器来监视和控制处于RUNNABLE状态的线程。线程的调度策略采用抢占式,优先级高的线程比优先级低的线程会有更大的几率优先执行。在优先级相同的情况下,按照 “先到先得” 的原则。每个 Java 程序都有个默认的主线程,就是通过 JVM 启动的第一个线程 main 线程。

还有一种线程称为 守护线程(Daemon),守护线程的默认优先级比较低。

如果某线程是守护线程,那如果所有的非守护线程都结束了,这个守护线程也自动结束。

应用场景是:当所有非守护线程结束时,结束其余的子线程(守护线程)自动关闭,就免去了还要继续关闭子线程的麻烦。

一个线程默认是非守护线程,可以通过 Thread 类的 setDaemon(boolean on) 方法来设置。

在之前,我们有谈到一个线程必然存在于一个线程组中,那么当线程和线程组的优先级不一致的时候将会怎样呢?我们用下面的案例来验证一下:

public class Demo {
    public static void main(String[] args) {
        ThreadGroup threadGroup = new ThreadGroup("t1");
        threadGroup.setMaxPriority(6);
        Thread thread = new Thread(threadGroup, "thread");
        thread.setPriority(9);
        System.out.println("我是线程组的优先级" + threadGroup.getMaxPriority());
        System.out.println("我是线程的优先级" + thread.getPriority());
    }
}

输出结果:

image-20220121114531598

所以,如果某个线程优先级大于线程所在线程组的优先级,那么该线程的优先级将会消失,取而代之的是线程组的最大优先级。

3.3 线程组的常用方法及数据结构

3.3.1 线程组的常用方法

获取当前线程组的名字

Thread.currentThread().getThreadGroup().getName();

复制线程组

// 获取当前的线程组
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
// 复制一个线程组到一个线程数组(获取Thread信息)
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);

线程组统一异常处理

public class Demo {
    public static void main(String[] args) {

        ThreadGroup threadGroup = new ThreadGroup("group-1") {
            // 继承 ThreadGroup并重写方法
            // 在线程成员抛出 unchecked exception 时会执行此方法
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println(t.getName() + ":" + e.getMessage());
            }
        };

        // 这个线程是 threadGroup 的一员
        Thread thread1 = new Thread(threadGroup, new Runnable() {
            @Override
            public void run() {
                // 抛出 unchecked 异常
                throw new RuntimeException("测试异常");
            }
        });

        thread1.start();

    }
}

3.3.2 线程组的数据结构

线程组还可以包含其他的线程组,不仅仅是线程。

首先看看 ThreadGroup 源码中的成员变量:

public class ThreadGroup implements Thread.UncaughtExceptionHandler {
    private final ThreadGroup parent; // 父ThreadGroup
    String name; // ThreadGroupr 的名称
    int maxPriority; // 线程最大优先级
    boolean destroyed; // 是否被销毁
    boolean daemon; // 是否守护线程
    boolean vmAllowSuspension; // 是否可以中断

    int nUnstartedThreads = 0; // 还未启动的线程
    int nthreads; // ThreadGroup中线程数目
    Thread threads[]; // ThreadGroup中的线程

    int ngroups; // 线程组数目
    ThreadGroup groups[]; // 线程组数组
}

然后再看看构造函数:

// 私有构造函数
private ThreadGroup() { 
    this.name = "system";
    this.maxPriority = Thread.MAX_PRIORITY;
    this.parent = null;
}

// 默认是以当前ThreadGroup传入作为parent  ThreadGroup,新线程组的父线程组是目前正在运行线程的线程组。
public ThreadGroup(String name) {
    this(Thread.currentThread().getThreadGroup(), name);
}

// 构造函数
public ThreadGroup(ThreadGroup parent, String name) {
    this(checkParentAccess(parent), parent, name);
}

// 私有构造函数,主要的构造函数
private ThreadGroup(Void unused, ThreadGroup parent, String name) {
    this.name = name;
    this.maxPriority = parent.maxPriority;
    this.daemon = parent.daemon;
    this.vmAllowSuspension = parent.vmAllowSuspension;
    this.parent = parent;
    parent.add(this);
}

第三个构造函数里调用了 checkParentAccess 方法,这里看看这个方法的源码:

// 检查parent ThreadGroup
private static Void checkParentAccess(ThreadGroup parent) {
    parent.checkAccess();
    return null;
}

// 判断当前运行的线程是否具有修改线程组的权限
public final void checkAccess() {
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkAccess(this);
    }
}

这里涉及到 SecurityManager 这个类,它是 Java 的安全管理器,它允许应用程序在执行一个可能不安全或敏感的操作前确定该操作的是什么,以及是否是在允许执行该操作的安全上下文中执行它。应用程序可以允许或不允许该操作。

比如引入了第三方类库,但是不能保证它的安全性。

其实 Thread 类也有一个 checkAccess() 方法,不过是用来确认当前运行的线程是否有权限修改被调用的这个线程实例(Determines if the currently running thread has permission to modify this thread )。

总结来说,线程组是一个树状的结构,每个线程组下面可以由多个线程或者线程组。线程组可以起到统一控制线程的优先级和检查线程权限的作用。

四、Java线程的状态及主要转化方法

4.1 操作系统中的线程状态转换

首先我们来看看操作系统中的线程状态转换。

在现在的操作系统中,线程是被视为轻量级进程的,所以操作系统线程的状态其实和操作系统进程的状态是一致的。

image-20220121151853280

操作系统线程主要有以下三个状态:

  • 就绪状态(ready):线程在等待使用 CPU,经调度程序调用之后可进入 running 状态。
  • 执行状态(running):线程正在使用 CPU。
  • 等待状态(waiting):线程经过等待事件的调用或者正在等待其他资源(如 I/O)。

4.2 Java 线程的6个状态

// Thread.State 源码
public enum State {
    NEW,
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED;
}

4.2.1 NEW

处于 NEW 状态的线程此时尚未启动。这里的尚未启动指的是还没有调用 Thread 实例的 start() 方法。

public void testStateNew() {
    Thread thread = new Thread(() -> {});
    System.out.println(thread.getState()); // 输出 NEW 
}

从上面可以看出,只是创建了线程而没有调用 start() 方法,此时线程处于 NEW 状态。

关于 start() 方法的两个引申问题

1、反复调用同一个线程的 start() 方法是否可行?

2、加入一个线程执行完毕(此时处于 TERMINATED 状态),再次调用这个线程的 start() 方法是否可行?

要分析这两个问题,我们先来看看 start() 方法的源码:

public synchronized void start() {
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. 
     */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
        }
    }
}

我们可以看到,在 start() 内部有一个 threadStatus 的变量。如果它不等于0,调用 start() 是会直接抛出异常的。

接着往下看,有一个 native 修饰的 start0() 方法,这个方法里并没有对 threadStatus 的处理。到了这里我们仿佛就拿这个 threadStatus 没辙了,我们通过 debug 的方式再看一下:

@Test
public void testStatusMethod(){
    Thread thread = new Thread(() ->{});
    // 第一次调用
    thread.start();
    // 第二次调用
    thread.start();
}

我在 start() 方法内部的最开始打的断点,下面是运行时的结果:

  • 第一次调用时 threadStatus 的值为 0。
  • 第二次调用时 threadStatus 的值不为 0。

查看当前线程状态的源码:

// Thread.getState 方法源码
public State getState() {
    // get current thread state
    return jdk.internal.misc.VM.toThreadState(threadStatus);
}

// jdk.internal.misc.VM 源码
public static Thread.State toThreadState(int threadStatus) {
    if ((threadStatus & JVMTI_THREAD_STATE_RUNNABLE) != 0) {
        return RUNNABLE;
    } else if ((threadStatus & JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER) != 0) {
        return BLOCKED;
    } else if ((threadStatus & JVMTI_THREAD_STATE_WAITING_INDEFINITELY) != 0) {
        return WAITING;
    } else if ((threadStatus & JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT) != 0) {
        return TIMED_WAITING;
    } else if ((threadStatus & JVMTI_THREAD_STATE_TERMINATED) != 0) {
        return TERMINATED;
    } else if ((threadStatus & JVMTI_THREAD_STATE_ALIVE) == 0) {
        return NEW;
    } else {
        return RUNNABLE;
    }
}

所以,我们结合上面源码可以得到引申的两个问题的结果:

两个问题的答案都是不可行,在调用一次 start() 之后,threadStatus 的值会改变(threadStatus != 0) ,此时再次调用 start() 方法会抛出 IllegalThreadStateException 异常。

比如,threadStatus 为 2 代表当前线程状态为 TERMINATED

4.2.1 RUNNABLE

表示当前线程正在运行中。处于 RUNNABLE 状态的线程在 Java 虚拟机中运行,也有可能在等待 CPU 分配资源。

Java 中线程的 RUNNABLE 状态

看了操作系统线程的几个状态之后我们来看看 Thread 源码里对 RUNNABLE 状态的定义:

/**
* Thread state for a runnable thread.  A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/

Java 线程的 RUNNABLE 状态其实是包括了传统操作系统线程的 readyrunning 两个状态的。

4.2.3 BLOCKED

阻塞状态。处于 BLOCKED 状态的线程正等待锁的释放以进入同步区。

我们用 BLOCKED 状态举个生活中的例子:

假如今天你下班后准备去食堂吃饭,你来到食堂仅有的一个窗口,发现前面已经有个人在窗口前了,此时你必须得等前面的人从窗口离开才行。

假设你是线程 t2,你前面的那个人是线程 t1,此时 t1 占有了锁(食堂唯一的窗口),t2 正在等待锁的释放,所以此时 t2 就处于 BLOCKED 状态。

4.2.4 WAITING

等待状态。处于等待状态的线程变成 RUNNABLE 状态需要其他线程唤醒。

调用如下3个方法会使线程进入等待状态:

  • Object.wait(): 使当前线程处于等待状态直到另一个线程唤醒它;
  • Thread.join(): 等待线程执行完毕,底层调用的是 Object 实例的 wait 方法;
  • LockSupport.part(): 除非获得调用许可,否则禁用当前线程进行线程调度。

我们延续上面的例子继续解释一下 WAITING 状态:

你等待了好几分钟现在终于轮到你了,突然你们有一个 “不懂事” 的经理突然来了。你看到他你就有一种不祥的预感,果然,他是来找你的。

他把你拉到一旁叫你待会儿再吃饭,说他下午要去作报告,赶紧来找你了解一下项目的情况。你心里虽然有一万个不愿意但是你还是从食堂窗口走开了。

此时,假设你还是线程 t2,你的经理是线程 t1。虽然你此时都占有锁(窗口)了,“不速之客” 来了你还是得释放掉锁。此时你 t2 的状态就是 WAITING。然后经理 t1 获得锁,进入 RUNNABLE 状态。

要是经理 t1 不主动唤醒你 t2(notify、notifyAll..),可以说你 t2 只能一直等待了。

4.2.5 TIMED_WAITING

超时等待状态。线程等待一个具体的时间,时间到后会被自动唤醒。

调用如下方法会使线程进入超时等待状态:

  • Thread.sleep(long millis): 为当前线程睡眠指定时间;
  • Object.wait(long timeout): 线程休眠指定时间,等待期间可以通过 notify()/notifyAll() 唤醒;
  • Thread.join(long millis): 等待当前线程最多执行 milis 毫秒,如果 millis 为0,则会一直执行;
  • LockSupport.parkNanos(long nanos): 除非获得调用许可,否则禁用当前线程进行线程调度指定时间;
  • LockSupport.partUntil(long deadline): 同上,也是禁用线程进行调度指定时间;

我们继续延续上面的例子来解释一下 TIME_WAITING 状态:

到了第二天中午,又到了饭点,你还是到了窗口前。

突然间想起你的同事叫你等他一起,他说让你等他十分钟他改个bug。

好吧,你说那你就等等吧,你就离开了窗口。很快十分钟过去了,你见他还没来,你想都等了这么久了还不来,那你还是先去吃饭好了。

这时你还是线程t1,你改bug的同事是线程t2。t2让t1等待了指定时间,此时t1等待期间就属于TIMED_WATING状态。

t1等待10分钟后,就自动唤醒,拥有了去争夺锁的资格。

4.2.6 TERMINATED

终止状态。此时线程已执行完毕。

4.3 线程状态转换

根据上面关于线程状态的介绍我们可以得到下面的线程状态转换图:

image-20220209201239371

4.3.1 BLOCKED 与 RUNNABLE 状态的转换

我们在上面说到:处于 BLOCKED 状态的线程因为是在等待锁的释放,假如这里有两个线程 a 和线程 b,a 线程提前获得了锁并且暂未释放锁,此时 b 就处于 BLOCKED 状态。先看一个例子:

@Test
public void blockedTest() {
    Thread a = new Thread(() -> testMethod(), "a线程");
    Thread b = new Thread(() -> testMethod(), "b线程");
    a.start();
    b.start();
    System.out.println(a.getName() + ": " + a.getState());
    System.out.println(b.getName() + ": " + b.getState());
}

/**
* 同步方法争夺锁
*/
private synchronized void testMethod() {
    try {
    	Thread.sleep(2000L);
    } catch (InterruptedException e) {
    	e.printStackTrace();
    }
}

初看之下,大家可能会觉得线程 a 会先调用同步方法,同步方法内又调用了 Thread.sleep() 方法,必然会输出 TIMED_WAITING,而线程 b 因为等待线程 a 释放锁所以必然会输出 BLOCKED

其实不然,有两点需要值得大家注意,一是在测试方法 blockedTest() 内还有一个 main 线程,二是启动线程后执行 run() 方法还是需要消耗一定时间的。

测试方法的 main 线程只是保证了 a,b 两个线程调用 start() 方法(转化为 RUNNABLE 状态),如果 CPU 执行效率高一点,还没等两个线程真正开始争夺锁,就已经打印测试两个线程的状态(RUNNABLE)了。

当然,如果 CPU 执行效率低一点,其中某个线程也是可能打印出 BLOCKED 状态的(此时两个线程已经开始争夺锁了)。

image-20220209203455829

这时你可能又会问了,要是我想要打印出 BLOCKED 状态我该怎么处理么?BLOCKED 状态的产生需要两个线程争夺锁才行。那么我们处理下测试方法里的 main 线程就可以了,让它 “休息一会儿”,调用一下 Thread.sleep() 方法。

这里需要注意的是 main 线程休息的时间,要保证在线程争夺锁的时间内,不要等到前一个线程都释放了你再去争夺锁,此时还是得不到 BLOCKED 状态。

我们把上面的测试方法 blockedTest() 改动一下:

@Test
public void blockedTest() throws InterruptedException {
    Thread a = new Thread(() -> testMethod(), "a线程");
    Thread b = new Thread(() -> testMethod(), "b线程");
    a.start();
    // 需要注意这里main线程休眠了1k毫秒,而testMethod()里休眠了2k毫秒
    Thread.sleep(1000L);
    b.start();
    System.out.println(a.getName() + ": " + a.getState());
    System.out.println(b.getName() + ": " + b.getState());
}

在这个例子中两个线程的状态转换如下:

  • a 的状态转换过程:RUNNABLE( a.start() )-> TIMED_WATING( Thread.sleep()) -> RUNNABLE ( sleep() 时间到) -> BLOCKED(未抢到锁) -> TERMINATED
  • b 的状态转换过程:RUNNABLE( b.start() ) -> BLOCKED(未抢到锁)-> TERMINATED

斜体表示可能出现的状态,可以在自己的电脑多试几次看看输出。同样,这里的输出也可能有多重情况。

4.3.2 WAITING 状态与 RUNNABLE 状态的转换

根据转换图我们知道有3个方法可以使线程从 RUNNABLE 状态转换为 WAITING 状态。这里主要介绍下 Object.wait() Thread.join()

Object.wait()

调用 wait() 方法前线程必须持有对象的锁。

线程调用 wait() 方法时,会释放当前的锁,直到有其他线程调用 notify()/notifyAll() 方法唤醒等待锁的线程。

需要注意的是,其他线程调用 notify() 方法只会唤醒单个等待锁的线程,如有多个线程都在等待这个锁的话,不一定会唤醒到之前调用 wait() 方法的线程。

同样,调用 notifyAll() 方法唤醒所有等待锁的线程之后,也不一定马上把时间片分配给刚才放弃锁的那个线程,具体要看系统的调度。

Thread.join()

调用 join() 方法,会一直等待这个线程执行完毕(转换为 TERMINATED 状态)。

我们再把上面的例子线程启动那里改变一下:

@Test
public void blockedTest() throws InterruptedException {
    Thread a = new Thread(() -> testMethod(), "a线程");
    Thread b = new Thread(() -> testMethod(), "b线程");
    a.start();
    a.join();
    b.start();
    System.out.println(a.getName() + ": " + a.getState());
    System.out.println(b.getName() + ": " + b.getState());
}

要是没有调用 join() 方法,main 线程不管 a 线程是否执行完毕都会继续往下走。

a 线程启动之后马上调用了 join() 方法,这里 main 线程就会等到 a 线程执行完毕,所以这里 a 线程打印的状态固定是 TERMINATED

至于 b 线程的状态,有可能打印 RUNNABLE(尚未进入同步方法),也有可能打印 TIMED_WAITING(进入了同步方法)

4.3.3 TIMED_WAITING 与 RUNNABLE 状态转换

TIMED_WAITINGWAITING 状态类似,只是 TIMED_WAITING 状态等待的时间是指定的。

Thread.sleep(long)

使当前线程睡眠指定时间。需要注意这里的 “睡眠” 只是暂时使线程停止执行,并不会释放锁。时间到后,线程会重新进入 RUNNABLE 状态。

Object.wait(long)

wait(long) 方法使线程进入 TIMED_WAITING 状态。这里的 wait(long) 与无参方法 wait() 相同的地方是,都可以通过其他线程调用 notify()notifyAll() 方法来唤醒。

不同的地方是,有参方法 wait(long) 就算其他线程不来唤醒它,经过指定时间 long 之后它会自动唤醒,拥有去争夺锁的资格。

Thread.join(long)

join(long) 使当前线程执行指定时间,并且使线程进入 TIMED_WAITING 状态。

我们再来改一下刚才的示例:

@Test
public void blockedTest() throws InterruptedException {
    Thread a = new Thread(() -> testMethod(), "a线程");
    Thread b = new Thread(() -> testMethod(), "b线程");
    a.start();
    a.join(1000L);
    b.start();
    System.out.println(a.getName() + ": " + a.getState());
    System.out.println(b.getName() + ": " + b.getState());
}

这里调用 a.join(1000L),因为是指定了具体 a 线程执行的时间,并且执行时间是小于 a 线程 sleep 的时间,所以 a 线程状态输出 TIMED_WAITING

b 线程状态仍然不固定(RUNNABLEBLOCKED)。

4.3.4 线程中断

在某些情况下,我们在线程启动后发现并不需要它继续执行下去时,需要中断线程。目前在 Java 里还没有安全直接的方法来停止线程,但是 Java 提供了线程中断机制来处理中断线程的情况。

线程中断机制是一种协作机制。需要注意,通过中断操作并不能直接终止一个线程,而是通知需要被中断的线程自行处理。

简单介绍下 Thread 类里提供的关于线程中断的几个方法:

  • Thread.interript(): 中断线程。这里的中断线程并不会立即停止线程,而是设置线程的中断状态为 true (默认是 false)。
  • Thread.currentThread().isInterrupted(): 测试当前线程是否被中断。线程的中断状态收这个方法的影响,意思是调用一次使线程中断状态设置为 true,连续调用两次会使得这个线程的中断状态重新转为 false
  • Thread.isInterrupted(): 测试当前线程是否被中断。与上面方法不同的是调用这个方法并不会影响线程的中断状态。

在线程中断机制里,当其他线程通知需要被中断的线程后,线程中断的状态被设置为 true,但是具体被要求中断的线程要怎么处理,完全由被中断线程自己而定,可以在合适的实际处理中断请求,也可以完全不处理继续执行下去。

五、Java 线程间的通信

合理的使用 Java 多线程可以更好的利用服务器资源。一般来讲,线程内部有自己私有的线程上下文,互不干扰。但是当我们需要多个线程之间相互协作的时候,就需要我们掌握 Java 线程的通信方式。本文将介绍 Java 线程之间的几种通信原理。

5.1 锁与同步

在 Java 中,锁的概念都是基于对象,所以我们又经常称它为对象锁。线程和锁的关系,我们可以用婚姻关系来理解,一个锁同一时间只能被一个线程持有,也就是说,一个锁如果和一个线程“结婚”(持有),那其他线程如果需要得到这个锁,就得等到这个线程和这个锁“离婚”(释放)。

在我们的线程之间,有一个同步的概念。什么是同步呢,假如我们现在有2位正在抄暑假作业答案的同学:线程 A 和线程 B。当他们正在抄的时候,老师突然来修改了一些答案,可能 A 和 B 最后写出的暑假作业就不一样。我们为了 A,B 能写出2本相同的暑假作业,我们就需要让老师先修改答案,然后 A,B 同学再抄。或者 A,B 同学先抄完,老师再修改答案。这就是线程 A,线程 B 的线程同步。

可以解释为:线程同步是线程之间按照一定的顺序执行。

为了达到线程同步,我们可以使用锁来实现它。

我们先来看看一个无锁的程序:

public class NoneLock {
    public static void main(String[] args) {
        new Thread(new ThreadA()).start();
        new Thread(new ThreadB()).start();
    }

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                System.out.println("Thread A: " + i);
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                System.out.println("Thread B: " + i);
            }
        }
    }

}

执行这个程序,你会在控制台看到,线程 A 和线程 B 各自独立工作,输出自己的打印值。如下是我电脑上某一次的运行结果。每一次运行结果都会不一样:

image-20220210112701667

那我现在有一个需求,我想等 线程 A 执行完之后,再由线程 B 去执行,怎么办呢?最简单的方式就是使用一个 “对象锁”:

public class ObjectLock {
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(10L);
        new Thread(new ThreadB()).start();
    }

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 1000; i++) {
                    System.out.println("Thread A: " + i);
                }
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 1000; i++) {
                    System.out.println("Thread B: " + i);
                }
            }
        }
    }
}

这里声明一个名字为 lock 的对象锁。我们在 ThreadAThreadB 内需要同步的代码块里,都是用 synchronized 关键字加上了同一个对象锁 lock

上文我们说到了,根据线程和锁的关系,同一时间只有一个线程持有一个锁,那么线程 B 就会等线程 A 执行完成后释放 lock,线程 B 才能获得锁 lock

这里主线程里使用 sleep() 方法休眠了 10毫秒,是为了防止线程 B 先获得锁。因为如果同时 start,线程 A 和线程 B 都是处于就绪状态,操作系统可能会先让 B 运行。这样就会先输出 B 的内容,然后 B 执行完成之后自动释放锁,线程 A 再执行。

5.2 等待 / 通知机制

上面一种基于 “锁” 的方式,线程需要不断地尝试获得锁,如果失败了,再继续尝试。这可能会耗费服务器资源。

而等待/通知机制是另一种方式。

Java 多线程的等待/通知机制是基于 Object 类的 wait() 方法和 notify()、notifyAll() 方法来实现的。

notify() 方法会随机叫醒一个正在等待的线程,而 notifyAll() 会叫醒所有正在等待的线程。

前面我们讲到,一个锁同一时刻只能被一个线程持有。而假如线程 A 现在持有了一个锁 lock 并开始执行,它可以使用 lock.wait() 让自己进入等待状态。这个时候,lock 这个锁是被释放了的。

这时,线程 B 获得了 lock 这个锁并开始执行,它可以在某一时刻,使用 lock.notify(),通知之前持有 lock 锁并进入等待状态的线程 A,说 “线程 A 你不用等了,可以往下执行了”。

需要注意的是,这个时候线程 B 并没有释放锁 lock,除非线程 B 这个时候使用 lock.wait() 释放锁,或者线程 B 执行结束自行释放锁,线程 A 才能等到 lock 锁。

我们用代码来实现一下:

public class WaitAndNotify {
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000L);
        new Thread(new ThreadB()).start();
    }

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("Thread A: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("Thread B: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();
            }
        }
    }

}

输出结果:

image-20220210155005519

在这个 Demo 里,线程 A 和线程 B 首先打印自己需要的东西,然后使用 notify() 方法唤醒另一个正在等待的线程,然后自己使用 wait() 方法陷入等待并释放 lock 锁。

需要注意的是,等待/通知机制使用的是同一个对象锁,如果两个线程使用的是不同的对象锁,那么它们之间是不可能用等待/通知机制通讯的。

5.3 信号量

JDK 提供了一个类似于 “信号量” 功能的类 Semaphore。但是本文不是要介绍这个类,而是介绍一种基于 volatile 关键字自己实现的信号量通讯。

后面会有专门的章节介绍 volatile 关键字,这里只是做一个简单的介绍:

volatile 关键字能够保证内存的可见性,如果使用 volatile 关键字声明了一个变量,在一个线程里面改变了这个变量的值,那其他线程是立马可见更改后的值的。

比如我现在有一个需求,我想让线程 A 输出0,然后线程 B 输出1,再然后线程 A 输出2 ... 以此类推。我应该怎么实现呢?

代码:

public class SignalDemo {

    private static volatile int signal = 0;

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000L);
        new Thread(new ThreadB()).start();
    }

    static class ThreadA implements Runnable {

        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 0) {
                    System.out.println("Thread A: " + signal);
                    signal++;
                }
            }
        }
    }

    static class ThreadB implements Runnable {

        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 1) {
                    System.out.println("Thread B: " + signal);
                    signal++;
                }
            }
        }
    }
}

运行结果:

image-20220210160508937

我们可以看到,使用一个 volatile 变量 signal 来实现了 “信号量” 的模型。这里需要注意的是,volatile 变量需要进行原子操作。

需要注意的是,signal++ 并不是一个原子操作,所以我们在实际开发中,会根据需要使用 synchronized给它 “上锁”,或者是使用 AtomicInteger 等原子类。并且上面的程序也并不是线程安全的,因为执行 while 语句后,可能当前线程就暂停等待时间片了,等线程醒来,可能 signal 已经大于等于 5 了。

这种实现方式并不一定高效,本例只是演示信号量。

信号量的应用场景:

假如在一个停车场中,车位是我们的公共资源,线程就如同车辆,而看门的管理员就是起的 “信号量” 的作用。

因为在这种场景下,多个线程需要相互合作,我们用简单的 “锁” 和 “等待/通知机制” 就不那么方便了,这个时候就可以用到信号量。

其实 JDK 中提供的很多多线程通信工具类都是基于信号量模型的,后面会在第三篇的文章中介绍一些常用的通信工具类。

5.4 管道

管道是基于 “管道流” 的通信方式。JDK 提供了 PipeWritePipeReaderPipeOutputStreamPipeInputStream。其中,前面两个是基于字符的,后面两个是基于字节流的。

这里的示例代码使用的是基于字符的:

public class PipeDemo {

    public static void main(String[] args) throws InterruptedException, IOException {
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        writer.connect(reader);

        new Thread(new ReaderThread(reader)).start();
        Thread.sleep(1000L);
        new Thread(new WriterThread(writer)).start();
    }

    static class ReaderThread implements Runnable {
        private final PipedReader reader;

        public ReaderThread(PipedReader reader) {
            this.reader = reader;
        }

        @Override
        public void run() {
            System.out.println("This is reader");
            int receive = 0;
            try {
                while ((receive = reader.read()) != -1) {
                    System.out.print((char) receive + " ");
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class WriterThread implements Runnable {
        private final PipedWriter writer;

        public WriterThread(PipedWriter writer) {
            this.writer = writer;
        }

        @Override
        public void run() {
            System.out.println("This is writer");
            int receive = 0;
            try {
                writer.write("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行结果:

image-20220210163708424

我们通过线程的构造函数,传入了 PipedWritePipedReader 对象。可以简单分析一下这个示例代码的执行流程:

  1. 线程 ReaderThread 开始执行;
  2. 线程 ReaderThread 使用管道 reader.read() 进入“阻塞”;
  3. 线程 WriterThread 开始执行;
  4. 线程 WritedThreadwriter.write("test") 往管道写入字符串;
  5. 线程 WritedThread 使用 writer.close() 结束管道写入,并执行完毕;
  6. 线程 ReaderThread 接受到管道输出的字符串并打印;
  7. 线程 ReaderThread 执行完毕。

管道通信的应用场景:

这个很好理解,使用管道多半与 I/O 流相关。当我们一个线程需要先另一个线程发送一个信息(比如字符串)或者文件等等时,就需要使用管道通信了。

5.5 其他通信相关

以上介绍了一些线程间通信的基本原理和方法。除此以外,还有一些与线程通信相关的知识点,这里一并介绍。

5.5.1 join 方法

join() 方法是 Thread 类的一个实例方法。它的作用是当线程陷入 “等待” 状态,等 join 的这个线程执行完成后,再继续执行当前线程。

有时候,主线程创建并启动了子线程,如果子线程中需要进行大量的耗时运算,主线程往往将早于子线程结束之前结束。

如果主线程想等待子线程执行完毕后,获得子线程中处理完的某个数据,就要用到 join() 方法了。

示例代码:

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new ThreadA());
        thread.start();
        thread.join();
        System.out.println("如果不加join方法,我会被先打印出来,加了就不一样了");
    }

    static class ThreadA implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println("我是子线程,我先休眠一秒");
                Thread.sleep(1000L);
                System.out.println("我是子线程,我睡完一秒了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

注意 join() 方法有两个重载方法,一个是 join(long),一个是 join(long,int)

实际上,通过源码会发现,join() 方法及其重载方法底层都是利用了 wait(long) 这个方法。

对于 join(long,int),通过查看源码发现,底层并没有精确到纳秒,而是对第二个参数做了简单的判断和处理。

5.5.2 sleep 方法

sleep() 方法是 Thread 类的一个静态方法。它的作用是让当前线程睡眠一段时间。它有这样两个方法:

  • Thread.sleep(long)
  • Thread.sleep(long, int)

同样,查看源码发现,第二个方法只是对第二个参数进行简单的处理,没有精确到纳秒。实际上还是调用的第一个方法。

这里需要强调一下:sleep() 方法是不会释放当前的锁的,而 wait() 方法会。

它们还有这些区别:

  • wait() 可以指定时间,也可以不指定;而 sleep() 必须指定时间。
  • wait() 释放 CPU 资源,同时释放锁;sleep 释放 CPU 资源,但是不释放锁,所以易死锁。
  • wait() 必须放在同步代码块或同步方法中,而 sleep() 可以放在任意位置。

5.5.3 ThreadLocal 类

ThreadLocal 是一个本地线程副本变量工具类。内部是一个 弱引用Map 来维护。这里不详细介绍它的原理,而是仅介绍它的使用,后面有独立章节来介绍 ThreadLocal 类的原理。

有些朋友称 ThreadLocal 为线程本地变量或线程本地存储。严格来说,ThreadLocal 类并不属于多线程间的通信,而是让每个线程有自己 “独立” 的变量,线程之间互不影响。它为每个线程都创建了一个副本,每个线程可以访问自己内部的副本变量。

ThreadLocal 类最常用的就是 set()get() 方法。实例代码:

public class ThreadLocalDemo {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        new Thread(new ThreadA(threadLocal)).start();
        new Thread(new ThreadB(threadLocal)).start();
    }

    static class ThreadA implements Runnable {

        private final ThreadLocal<String> threadLocal;

        public ThreadA(ThreadLocal<String> threadLocal) {
            this.threadLocal = threadLocal;
        }

        @Override
        public void run() {
            threadLocal.set("A");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ThreadA 输出:" + threadLocal.get());
        }
    }

    static class ThreadB implements Runnable {

        private final ThreadLocal<String> threadLocal;

        public ThreadB(ThreadLocal<String> threadLocal) {
            this.threadLocal = threadLocal;
        }

        @Override
        public void run() {
            threadLocal.set("B");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ThreadB 输出:" + threadLocal.get());
        }
    }

}

可以看到,虽然两个线程使用的同一个 ThreadLocal 实例(通过构造方法传入),但是它们各自可以存取自己当前线程的一个值。

那么 ThreadLocal 有什么用呢?如果只是单纯的想要线程隔离,在每个线程中声明一个私有变量就好了呀,为什么要使用 ThreadLocal

如果开发者希望将类的某个静态变量(user ID 或者 transcation ID)与线程状态关联,则可以考虑使用 ThreadLocal

最常见的 ThreadLocal 使用场景是用来解决数据库连接、Session 管理等。数据库连接和 Session 管理设计多个复杂对象的初始化和关闭。如果在每个线程中声明一些私有变量来进行操作,那这个线程就变得不那么 “轻量” 了,需要频繁的创建和关闭连接。

5.5.4 InheritableThreadLocal

InheritableThreadLocal 类与 ThreadLocal 类稍有不同,Inheritable 是继承的意思。它不仅仅是当前线程可以存取副本值,而且它的子类也可以存取这个副本值。

第二篇:原理篇

六、Java 内存模型基础知识

6.1 并发编程模型的两个关键问题

  • 线程如何通信?即:线程之间以何种机制来交换信息
  • 线程如何同步?即:线程以何种机制来控制不同线程间操作发生的相对顺序

有两种并发模型可以解决这两个问题:

  • 消息传递并发模型
  • 共享内存并发模型

这两种模型之间的区别如下表所示:

如何通信如何同步
消息传递并发模型线程之间没有公共状态,线程间的通信必须通过发送消息来显式进行通信发送消息天然同步,因为发送消息总是在接受消息之前,因此同步是隐式的
共享内存并发模型线程之间共享程序的公共状态,通过读-写内存中的公共状态进行隐式通信必须显式指定某段代码需要在线程之间互斥执行,同步是显式的

在 Java 中,使用的是共享内存并发模型。

6.2 Java 内存模型的抽象结构

6.2.1 运行时内存的划分

先谈一下运行时数据区,下面这张图相信大家一点都不陌生:

image-20220211142955195

对于每一个线程来说,栈都是私有的,而堆是共有的。

也就是说在栈中的变量(局部变量、方法定义参数、异常处理器参数)不会在线程之间共享,也就不会有内存可见性的问题,也不受内存模型的影响。而在堆中的变量是共享的,本文称为共享变量。

所以,内存可见性是针对的共享变量。

6.2.2 堆中内存不可见问题

既然堆是共享的,为什么在堆中会有内存不可见问题?

这是因为现代计算机为了高效,往往会在高速缓存区中缓存共享变量,因为 CPU 访问缓存区比访问内存要快得多。

线程之间的共享变量存在主内存中,每个线程都有一个私有的本地内存,存储了该线程以读、写共享变量的副本。本地内存是 Java 内存模型的一个抽象概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器等。

Java 线程之间的通信由 Java 内存模型(简称 JMM)控制,从抽象的角度来说,JMM 定义了线程和主内存之间的抽象关系。JMM 的抽象示意图如图所示:

image-20220211164000416

从图中可以看出:

  1. 所有的共享变量都存在主内存中。
  2. 每个线程都保存了一份该线程使用到的共享变量的副本。
  3. 如果线程 A 与线程 B 之间要通信的话,必须经历下面2个步骤:
    • 线程 A 将本地内存 A 中更新过的共享变量刷新到主内存中去;
    • 线程 B 到主内存中去读取线程 A 之前已经更新过的共享变量。

所以,线程 A 无法直接访问线程 B 的工作内存,线程间通信必须经过主内存。

注意,根据 JMM 的规定,线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取。

所以线程 B 并不是直接去主内存中读取共享变量的值,而是现在本地内存 B 中找到这个共享变量,发现这个共享变量已经被更新了,然后本地内存 B 去从主内存中读取这个共享变量的新值,并拷贝到本地内存 B 中,最后线程 B 再读取本地内存 B 中的新值。

那么怎么知道这个共享变量被其他线程更新了呢?这就是 JMM 的功劳了,也是 JMM 存在的必要性之一。

JMM 通过控制主内存与每个线程的本地内存之间的交互,来提供内存可见性保证

Java 中的 volatile 关键字可以保证多线程操作共享变量的可见性已经禁止指令重排序,synchronized 关键字不仅保证可见性,同时也保证了原子性(互斥性)。在更底层,JMM 通过内存屏障来实现内存的可见性以及禁止重排序。

为了程序员的方便理解,提出了 happens-before,它更加的简单易懂,从而避免了程序员为了理解内存可见性而且协议复杂的重排序规则以及这些规则的实现方法。这里涉及到的所有内容后面都会有专门的章节去介绍。

6.2.3 JMM 与 Java 内存区域划分的区别和联系

上面两小节分别提到了 JMM 和 Java 运行时内存区域的划分,这两者既有差别又有联系:

  • 区别

    两者是不同的概念层次。JMM 是抽象的,它是用来描述一组规则,通过这个规则来控制各个变量的访问方式,围绕原子性、有序性、可见性等展开的。而 Java 运行时内存的划分是具体的,是 JVM 运行 Java 程序时,必要的内存划分。

  • 联系

    都存在私有数据区域和共享数据区域。一般来说,JMM 中的主内存属于共享数据区域,它是包含了堆和方法区;同样,JMM 中的本地内存属于私有数据区域,包含了程序计数器、本地方法栈、虚拟机栈。

实际上,他们表达的是同一种含义,这里不做区分。

七、重排序与 happens-before

7.1 什么是重排序

计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排。

为什么指令重排序可以提高性能?

简单地说,每一个指令都会包含多个步骤,每个步骤可能使用不同的硬件。因此,流水线技术 产生了,它的原理是指令 1 还没有执行完,就可以开始执行指令 2,而不用等到指令 1 执行结束之后再执行指令 2,这样就大大提高了效率。

但是,流水线技术最害怕 中断,恢复中断的代价是比较大的,所以我们要想尽办法不让流水线中断。指令重排就是减少中断的一种技术。

我们分析一下下面这个代码的执行情况:

a = b + c;
d = e + f;

先加载 b、c(注意,即有可能先加载 b,也有可能先加载 c),但是在执行 add(b,c) 的时候,需要等待 b、c 装载结束才能继续执行,也就是增加了停顿,那么后面的指令也会一次有停顿,这降低了计算机的执行效率。

为了减少这个停顿,我们可以先加载 e 和 f,然后哦再去加载 add(b,c),这样做对程序(串行)是没有影响的,但却减少了停顿。既然 add(b,c) 需要停顿,那还不如去做一些有意义的事情。

综上所述,指令重排对于提高 CPU 处理性能十分必要。虽然由此带来了乱序问题,但是这点牺牲是值得的。

指令重排一般分为一下三种:

  • 编译器优化重排

    编译器在 不改变单线程程序语义 的前提下,可以重新安排语句的执行顺序。

  • 指令并行重排

    现代处理器采用了指令级并行技术来将多条指令重叠执行。如果 不存在数据依赖性 (即后一个执行的语句无需依赖前面执行的语句的结果),处理器可以改变语句对应的机器指令的执行顺序。

  • 内存系统重排

    由于处理器使用缓存和读写缓冲区,这使得加载(load)和存储(store)操作看上去可能是在乱序执行,因为三级缓存的存在,导致内存与缓存的数据同步存在时间差。

指令重排可以保证串行语义一致,但是没有义务保证多线程间的语义也一致。 所以在多线程下,指令重排序可能会导致一些问题。

7.2 顺序一致性模型与 JMM 的保证

顺序一致性模型是一个理论参考模型,内存模型在设计的时候都会以顺序一致性内存模型作为参考。

7.2.1 数据竞争与顺序一致性

当程序未正确同步的时候,就可能存在数据竞争。

数据竞争:在一个线程中写一个变量,在另一个线程读同一个变量,并且写和读没有通过同步来排序。

如果程序中包含了数据竞争,那么运行的结果往往充满了 不确定性,比如读发生在写之前,可能会就会读到错误的值;如果一个线程程序能够正确同步,那么就不存在数据竞争。

Java 内存模型(JMM)对于正确同步多线程程序的内存一致性做了一下保证:

如果程序是正确同步的,程序的执行将具有顺序一致性。 即程序的执行结果和该程序在顺序一致性模型中执行的结果相同。

这里的同步包括了使用 volatilefinalsynchronized 等关键字来实现多线程下的同步。

如果程序员没有正确使用 volatilefinalsynchronized,那么即便是使用了同步(单线程下的同步),JMM 也不会有内存可见性的保证,可能会导致你的程序出错,并且具有不可重现性,很难排查。

所以如何正确使用 volatilefinalsynchronized 是程序员应该去了解的,后面会有专门的章节介绍这几个关键字的内存语义及使用。

7.2.2 顺序一致性模型

顺序一致性内存模型是一个 理想化的理论参考模型,它为程序员提供了极强的内存可见性保证。

顺序一致性模型有两大特性:

  • 一个线程中的所有操作必须按照程序的顺序(即 Java 代码的顺序)来执行。
  • 不管程序是否同步,所有线程都只能看到一个单一的操作执行顺序。即在顺序一致性模型中,每个操作必须是原子性的,且立刻对所有线程可见

为了立即这两个特性,我们举个例子,假设有两个线程 A 和线程 B 并发执行,线程 A 有3个操作,他们在程序中的顺序是 A1 -> A2 -> A3,线程 B 也有3个操作,B1 -> B2 -> B3。

假设 正确使用了同步,A 线程的3个操作执行后释放锁,B 线程获取同一个锁。那么在 顺序一致性模型中的执行效果如下所示:

image-20220214113555157

操作的执行整体上有序,并且两个线程都只能看到这个执行顺序。

假设 没有使用同步,那么在顺序一致性模型中农的执行效果如下所示:

image-20220214113625114

操作的执行整体上无序,但是两个线程都只能看到这个执行顺序,之所以可以得到这个保证,是因为顺序一致性模型中每个操作必须立即对任意线程可见。

但是 JMM 没有这样的保证。

比如,在当前线程把写过的数据缓存在本地内存中,在没有刷新到主内存之前,这个写操作仅对当前线程可见;从其他线程的角度来观察,这个写操作根本没有被当前线程所执行。只有当前线程把本地内存中写过的数据刷新到主内存之后,这个写操作才会对其他线程可见。在这种情况下,当前线程和其他线程看到的执行顺序是不一样的。

7.2.3 JMM 中同步程序的顺序一致性结果

在顺序一致性模型中,所有操作完全按照程序的顺序串行执行。但是 JMM 中,临界区(同步块或同步方法中)的代码可以发生重排序(但不允许临界区内的代码 “逃逸” 到临界区之外,因为会破坏锁的内存语义)。

虽然线程 A 在临界区做了重排序,但是锁的特性,线程 B 无法观察到线程 A 在临界区的重排序。这种重排序既提高了执行效率,又没有改变程序的执行结果。

同时,JMM 会在退出临界区和进入临界区做特殊的处理,使得在临界区内程序获得与顺序一致性模型相同的内存视图。

由此可见,JMM 的具体实现方针是:在不改变(正确同步的)程序执行结果的前提下,尽量为编译器和处理器的优化打开方便之门。

7.2.4 JMM 中未同步程序的顺序一致性结果

对于未同步的多线程程序,JMM 只提供 最小安全性:线程读取到的值,要么是之前某个线程写入的值,要么是默认值,不会无中生有。

为了实现这个安全性,JVM 在堆上分配对象时,首先会对内存空间清零,然后才会在上面分配对象(这两个操作是同步的)。

JMM 没有保证未同步程序的执行结果与该程序在顺序一致性中执行结果一致。因为如果要保证执行结果一致,那么 JMM 需要禁止大量的优化,对程序的执行性能会产生很大的影响。

为同步程序在 JMM 和顺序一致性内存模型中的执行特性有如下差异:

  1. 顺序一致性保证单线程内的操作会按程序的顺序执行;JMM 不保证单线程内的操作会按照程序的顺序执行(因为重排序,但是 JMM 保证单线程下的重排序不影响执行结果)。
  2. 顺序一致性模型保证所有线程只能看到一致的操作执行顺序,而 JMM 不保证所有线程都看到一致的操作执行顺序(因为 JMM 不保证所有的操作立即可见)。
  3. 顺序一致性模型保证对所有的内存读写操作都具有原子性,而 JMM 不保证大对 64位的 long 型和 double 型变量的写操作具有原子性。

7.3 happens-before

7.3.1 什么是 happens-before

一方面,程序员需要 JMM 提供一个强得多内存模型来编写代码;另一方面,编译器和处理器希望 JMM 对它们的束缚越少越好,这样它们就可以最可能多的做优化来提高性能,希望的是一个弱的内存模型。

JMM 考虑了这两种需求,并且找到了平衡点,对编译器和处理器来说,只要不改变程序执行结果(单线程程序和正确同步了的多线程程序),编译器和处理器怎么优化都行。

而对于程序员,JMM 提供了 **happens-before规则 **(JSR-133规范),满足了程序员的需求--简单易懂,并且提供了足够强的内存可见性。 换言之,程序员只要遵循 happens-before 规则,那他写的程序就能保证在 JMM 中具有强的内存可见性。

JMM 使用 happens-before 的概念来制定两个操作之间的执行顺序。这两个操作可以在一个线程内,也可以是不同的线程之内。因此,JMM 可以通过 happens-before 关系向程序员提供跨线程的内存可见性保证。

happens-before 关系的定义如下:

  1. 如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
  2. 两个操作之间存在 happens-before 关系,并不意味着 Java 平台的具体实现必须要按照 happens-before 关系指定的顺序来执行。如果重排序之后的执行结果,与按 happens-before 关系来执行的结果一致,那么 JMM 也允许这样的重排序。

happens-before 关系本质上和 as-if-serial 语义是一回事。

as-if-serial 语义保证单线程内重排序后的执行结果和程序代码本身应有的结果是一致的,happens-before 关系保证正确同步的多线程程序的执行结果不被重排序改变。

总之,如果操作 A happens-before 操作 B,那么操作 A 在内存上所做的操作对操作 B 都是可见的,不管他们在不在一个线程。

7.3.2 天然的 happens-before 关系

在 Java 中,有以下天然的 happens-before 关系:

  • 程序顺序规则:一个线程中的每个操作,happens-before 于该线程中的任意后续操作。
  • 监视器锁规则:对一个锁的解锁,happens-before 于随后对这个锁的加锁。
  • volatile 变量规则:对一个 volatile域的写,happens-before 于任意后续对这个 volatile域的读。
  • start 规则:如果线程 A 执行操作 ThreadB.start() 启动线程 B,那么 A 线程的 ThreadB.start() 操作 happens-before 于线程 B 中的任意操作。
  • join 规则:如果线程 A 执行操作 ThreadB.join() 并成功返回,那么线程 B 中任意操作 happens-before 于线程 A 从 ThreadB,join 操作成功返回。

举例:

int a = 1; // A 操作
int b = 2; // B 操作
int sum = a + b; // C 操作
System.out.println(sum);

根据以上介绍的 happens-before 规则,假如只有一个线程,那么不难得出:

A happens-before B
B happens-before C
A happens-before C

注意,真正在执行指令的时候,其实 JVM 有可能对 操作 A 和 操作 B 进行重排序,因为无论先执行 A 还是 B,它们都对对方是可见的,并且不影响执行结果。

如果这里发生了重排序,这在视觉上违背了 happens-before 原则,但是 JMM 是允许这样的重排序的。

所以,我们只关心 happens-before 规则,不用关心 JVM 到底是怎样执行的。只要确定操作 A happens-before 操作 B 就行了。

重排序有两类,JMM 对这两类重排序有不同的策略:

  • 会改变程序执行结果的重排序,比如 A -> C,JMM 要求编译器和处理器都禁止这种重排序。
  • 不会改变程序执行结果的重排序,比如 A -> B,JMM 对编译器和处理器不做要求,允许这种重排序。

八、volatile

8.1 几个基本概念

在介绍 volatile 之前,我们先回顾及介绍几个基本的概念。

8.1.1 内存可见性

在 Java 内存模型那一章我们介绍了 JMM 有一个主内存,每个线程都有自己私有的工作内存,工作内存中保存了一些变量在主内存的拷贝。

内存可见性,指的是线程之间的可见性,当一个线程修改了共享变量时,另一个线程可以读取到这个修改后的值。

8.1.2 重排序

为优化程序性能,对原有的指令执行顺序进行优化重新排序。重排序可能发生在多个阶段,比如编译重排序、CPU 重排序等。

8.1.3 happens-before 规则

是一个给程序员使用的规则,只要程序员在写代码的时候遵循 happens-before 规则,JVM 就能保证指令在多线程之间的顺序性符合程序员的预期。

8.2 volatile 的内存语义

在 Java 中,volatile 关键字有特殊的内存语义。volatile 主要有一下两个功能:

  • 保证变量的 **内存可见性 **
  • 禁止 volatile 变量与普通变量 重排序 ( JSR-133 提出,Java 5 开始才有了这个 “增强的 volatile 内存语义”)

8.2.1 内存可见性

public class VolatileDemo {

    int a = 0;
    volatile boolean flag = false;

    public void writer() {
        // step 1
        a = 1;
        // step 2
        flag = true;
    }

    public void reader() {
        // step 3
        if (flag) {
            // step 4
            System.out.println(a);
        }
    }
}

在这段代码里,我们使用 volatile 关键字修饰了一个 boolean 类型的变量 flag

所谓内存可见性,指的是当一个线程对 volatile 修饰的变量进行 写操作(比如 step 2)时,JMM 会立即把该线程对应的本地内存中的共享变量的值刷新到主内存;当一个线程对 volatile 修饰的变量进行 读操作(比如 step 3)时,JMM 会立即把该线程对应的本地内存置为无效,从主内存中读取共享变量的值。

在这一点上,volatile 与锁具有相同的内存效果,volatile 变量的写和锁的释放具有相同的内存语义,volatile 变量的读和锁的获取具有相同的内存语义。

假设在时间线上,线程 A 先执行方法 writer() 方法,线程 B 后执行 reader() 方法。那必然会有下图:

image-20220214154730016

而如果 flag 变量没有用 volatile 修饰,在 step 2,线程 A 的本地内存里面的变量就不会立即更新到主内存,那随后线程 B 也同样不会去主内存拿最新的值,仍然使用线程 B 本地内存缓存的变量的值 a=0,flag=false.

8.2.1 禁止重排序

在 JSR-133 之前的旧的 Java 内存模型中,是允许 volatile 变量与普通变量重排序的。那上面的案例中,可能就会被重排序成下列顺序来执行:

  1. 线程 A 写 volatile 变量,step 2,设置 flagtrue
  2. 线程 B 读同一个 volatile,step 3,读取到 flagtrue
  3. 线程 B 读普通变量,step 4,读取到 a = 0
  4. 线程 A 修改普通变量,step 1,设置 a = 1

可见,如果 volatile 变量与普通变量发生了重排序,虽然 volatile 变量能保证内存可见性,也可能导致普通变量读取错误。

所以在旧的内存模型中,volatile 的写-读就不能与锁的释放-获取具有相同的内存语义了。为了提供一种比锁更轻量级的 线程间的通信机制JSR-133 专家组决定增强 volatile 的内存语义:严格限制编译器和处理器对 volatile 变量与普通变量的重排序。

编译器还好说,JVM 是怎么还能限制处理器的重排序呢?它是通过 内存屏障 来实现的。

什么是内存屏障?

硬件层面上,内存屏障分为两种:读屏障(Load Barrier)和写屏障(Store Barrier)。内存屏障有两个作用:

  1. 阻止屏障两侧的指令重排序;
  2. 强制把写缓冲区/高速缓存中的脏数据等写回主内存,或者让缓存中相应的数据失效。

注意这里的缓存只要指的是 CPU 缓存,如 L1,L2 等。

编译器在 生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。编译器选择了一个 比较保守的 JMM 内存屏障插入策略, 这样可以保证在任何处理器平台, 任何程序中都能得到正确的 volatile 内存语义。这个策略是:

  • 在每个 volatile 写操作前插入一个 StoreStore 屏障;
  • 在每个 volatile 写操作后插入一个 StoreLoad 屏障;
  • 在每个 volatile 读操作前插入一个 LoadLoad 屏障;
  • 在每个 volatile 读操作后插入一个 LoadStore 屏障;

大概示意图如下:

image-20220214165126663

在逐个解释一下这几个屏障。注:下述 Load 代表读操作,Store 代表写操作。

LoadLoad 屏障: 对于这样的语句 Load1;LoadLoad;Load2,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕。

StoreStore 屏障: 对于这样的语句 Store1;LoadStore;Store2,在 Store2 及后续写入操作执行前,这个屏障会把 Store1 强制刷新到内存,保证 Store1 的写入操作对其他处理器可见。

LoadStore 屏障: 对于这样的语句 Load1;LoadStore;Store2,在 Store2 及后续写入操作被刷出前,保证 Load1 要读取的数据被读取完毕。

StoreLoad 屏障: 对于这样的语句 Store1;StoreLoad;Load2,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有的处理器可见。它的开销是四种屏障中最大的(冲刷写缓冲器,清空无效化队列)。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其他三种内存屏障的功能。

对于连续多个 volatile 变量读或者连续多个 volatile变量写,编译器做了一定的优化来提高性能,比如:

第一个 volatile 读;

LoadLoad 屏障;

第二个 volatile 读;

LoadStore 屏障。

再介绍一下 volatile 与普通变量的重排序规则:

  1. 如果第一个操作是 volatile 读,那无论第二个操作是什么,都不能重排序;
  2. 如果第二个操作是 volatile 写,那无论第一个操作是什么,都不能重排序;
  3. 如果第一个操作是 volatile 写,第二个操作是 volatile读,那不能重排序。

举一个例子,我们在案例中 step 1,是普通变量的写,step 2 是 volatile 变量的写,那符合第2个规则,这两个 steps 不能重排序。而 step 3 是 volatile 变量读。step 4 是普通变量读,符合第1个规则,同样不能重排序。

但如果是下列情况:第一个操作是普通变量的读,第二个操作是 volatile 变量读,那是可以重排序的:

// 声明普通变量
int a = 0;
// 声明 volatile 变量
volatile boolean flag = false;

// 以下两个变量的读操作是可以重排序的
// 普通变量读
int i = a;
// volatile 变量读
boolean j = flag;

8.3 volatile 的用途

volatile 的内存语义上来看,volatile 可以保证内存可见性且禁止重排序。

在保证内存可见性这一点上,volatile 有着与锁相同的内存语义,所以可以作为一个 “轻量级” 的锁来使用。但由于 volatile 仅仅保证对单个 volatile 变量的读/写具有原子性,而锁可以保证整个 临界区代码 的执行具有原子性。所以在功能上,锁比 volatile 更强大;在性能上,volatile 更有优势。

在禁止重排序这一点上,volatile也是非常有用的。比如我们熟悉的单例模式,其中有一种实现方式是 “双重锁检查”,比如这样的代码:

public class SingletonDemo {
    // 不使用 volatile 关键字
    private static SingletonDemo instance;

    // 双重锁检验
    public static SingletonDemo getInstance() {
        if (instance == null) {
            synchronized (SignalDemo.class) {
                if (instance == null) {
                    instance = new SingletonDemo();
                }
            }
        }
        return instance;
    }

}

如果这里的变量声明不使用 volatile 关键字,是可能会发生错误的。它可能会被重排序:

 instance = new SingletonDemo();
 
 // 可以分解为以下三个步骤:
 1. memory = allocate();  // 分配内存,相当于c 的 malloc
 2. ctorInstance(memory); // 初始化对象
 3. s = memory;		 	  // 设置s指向刚分配的地址
 
 // 上述3个步骤可能会别重排序为 1-3-2,也就是
 1. memory = allocate();  // 分配内存,相当于c 的 malloc
 3. s = memory;		 	  // 设置s指向刚分配的地址
 2. ctorInstance(memory); // 初始化对象

而一旦假设发生了这样的重排序,比如线程 A 在第10行指向了步骤 1 和步骤 3,但是步骤 2 还没有执行完。这个时候另一个线程 B 执行到第 7 行,它会判定 instance不为空,然后直接返回一个未初始化完成的 instance!

所以 JSR-133 对 volatile 做了增强后,volatile 的禁止重排序功能还是非常有用的。

九、synchronized 与锁

这篇文章我们来聊一聊 Java 多线程里面的 “锁”。

首先需要明确一点的是:Java 多线程的锁都是基于对象的, Java 中的每一个对象都可以作为一个锁。

还有一点需要注意的是,我们常听到的 类锁 其实也是对象锁。

Java 类只有一个 Class 对象(可以由多个实例对象,对个实例对象共享这个 Class 对象),而 Class 对象也是特殊的 Java 对象。所以我们常说的类锁,其实就是 Class 对象的锁。

9.1 synchronized 关键字

说到锁,我们通常会谈到 synchronized 这个关键字,它翻译成中文就是 “同步” 的意思。

我们通常使用 synchronized 关键字来给一段代码或一个方法上锁,它通常有以下三种形式:

// 关键字在静态方法上,锁为当前 Class 对象
public static synchronized void classLock() {
	// code
}

// 关键字在实例方法上,锁为当前实例
public synchronized void instanceLock() {
	// code
}

// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
	Object o = new Object();
	synchronized (o) {
		// code
	}
}

我们这里介绍一下 “临界区” 的概念。所谓 “临界区”,指的是某一块代码区域,它同一时刻只能由一个线程执行。在上面的例子中,如果 synchronized 关键字在方法上,那临界区就是就是整个方法内部。而如果是使用 synchronized 代码块,那临界区就指的是代码块内部的区域。

通过上面的例子我们可以看到,下面这两个写法其实是等价的作用:

// 关键字在实例方法上,锁为当前实例
public synchronized void instanceLock() {
	// code
}

// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
	Object o = new Object();
	synchronized (o) {
		// code
	}
}

同理,下面这两个方法也应该是等价的:

// 关键字在静态方法上,锁为当前 Class 对象
public static synchronized void classLock() {
	// code
}

// 关键字在代码块上,锁为括号里面的对象
public void blockLock(){
	synchronized (this.getClass()){
		// code
	}
}

9.2 几种锁

Java 6 为了减少获得锁和释放锁带来的性能消耗,引入了 “偏向锁” 和 “轻量级锁”。在 Java 6 以前,所有的锁都是 “重量级” 锁。所以在 Java 6 及其以后,一个对象其实有四种锁状态,它们级别由低到高依次是:

  1. 无锁状态
  2. 偏向锁状态
  3. 轻量级锁状态
  4. 重量级锁状态

无锁就是没有对资源进行锁定,任何线程都可以尝试去修改它,无锁在这里不再细讲。

几种锁会随着竞争情况逐渐升级,锁的升级很容易发生,但是锁降级发生的条件会比较苛刻,锁降级发生在 Stop The World 期间,当 JVM 进入安全点的时候,会检查是否有闲置的锁,然后进行降级。

关于锁降级有两点说明:

1、不同于大部分文章说的锁不能降级,实际上 HotSpot JVM 是支持锁降级的(JVM 锁降级)。

2、上面提到的 Stop The World 期间,以及安全点,这些知识属于 JVM 的知识范畴,本文不做细讲。

下面分辨介绍这几种锁以及他们之间的升级。

9.2.1 Java 对象头

前面我们提到,Java 的锁都是基于对象的。首先我们来看看一个对象的 “锁” 的信息是存在什么地方的。

每个 Java 对象都有对象头。如果是非数据类型,则用 2 个字宽来存储对象头,如果是数组,则会用 3 个字宽来存储对象头。在 32 位处理器中,一个字宽是32位;在62位处理器中,一个字宽是64位。对象头的内容如下表:

长度内容说明
32/64 bitMark Word存储对象的 hashCode 或锁信息等
32/64 bitClass Metadata Address存储到对象类型数据的指针
32/64 bitArray length数组的长度(如果是数组)

我们主要来看看 Mark World 的格式:

锁状态29 bit 或 61 bit1 bit 是否是偏向锁2 bit 锁标志位
无锁001
偏向锁线程 ID101
轻量级锁指向栈中锁记录的指针此时这一位不用于标识偏向锁00
重量级锁指向互斥量(重量级锁)的指针此时这一位不用于标识偏向锁10
GC 标记此时这一位不用于标识偏向锁11

可以看到,当对象状态为偏向锁是,Mark Word 存储的是偏向的线程 ID;当状态为轻量级锁时,Mark Word 存储的是指向线程栈中 Lock Record 的指针;当状态为重量级锁时,Mark Word 为指向堆中的 monitor 对象的指针。

9.2.2 偏向锁

Hotspot 的作者经过以往的研究发现大多数情况下 锁不仅不存在多线程竞争,而且总是由同一线程多次获得,于是引入来了偏向锁。

偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。也就是说,偏向锁在资源无竞争情况下消除了同步语句,连 CAS 操作都不做了,提高了程序的运行性能。

大白话就是对锁设置个变量,如果发现为 true,代表资源无竞争,则无需再走各种加锁/解锁流程。如果为 false,代表存在其他线程竞争资源,那么就会走后面的流程。

实现原理

一个线程在第一次进入同步块时,会在对象头和栈帧中的锁记录里存储锁的偏向的线程 ID。当下次该线程进入这个同步块时,会去检查锁的 Mark Word 里面是不是放的自己的线程 ID。

如果是,表明该线程已经获得了锁,以后该线程在进入和退出同步块时不需要花费 CAS 操作来加锁和解锁。

如果不是,就代表有另一个线程来竞争这个偏向锁。这个时候会尝试使用 CAS 来替代 Mark Word 里面的线程 ID 为新线程的 ID,这个时候要分两种情况:

  • 成功:表示之前的线程不存在了,Mark Word 里面的线程 ID 为新线程的 ID,锁不会升级,仍然为偏向锁;
  • 失败:表示之前的线程仍然存在,那么暂停之前的线程,设置偏向锁标识为0,并设置锁标志位为 00,升级为轻量级锁,会按照轻量级锁的方式进行竞争锁。

CAS: Compare And Swap

比较并设置。用于在硬件层面上提供原子性操作。在 Intel 处理器中,比较并交换通过指令 cmpxchg 实现。

比较是否和给定的数值一致,如果一致则修改,不一致则不修改。

线程竞争偏向锁的过程如下:

image-20220215105329822

图中涉及到了 lock record 指针指向当前堆栈中的最近一个 lock record,是轻量级锁按照先来先服务的模式进行了轻量级锁的加锁。

撤销偏向锁

偏向锁使用了一种 等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。

偏向锁升级成轻量级锁时,会暂停用于偏向锁的线程,重置偏向锁标识,这个过程看起来容易,实则开销还是很大的,大概的过程如下:

  1. 在一个安全点(在这个时间点上没有字节码正在执行)停止拥有锁的线程。
  2. 遍历线程栈,如果存在锁记录的话,需要修复锁记录和 Mark Word,使其变成无锁状态。
  3. 唤醒被停止的线程,将当前锁升级成轻量级锁。

所以,如果应用程序里所有的锁通常处于竞争状态,那么偏向锁就会是一种累赘,对于这种情况,我们一开始就把偏向锁这个默认功能给关闭:

-XX:UseBiasedLocking=false

下面这个经典的图总结了偏向锁的获得和撤销:

image-20220215114249975

9.2.3 轻量级锁

多个线程在不同时段获取同一把锁,即不存在锁竞争的情况,也就是没有线程阻塞。针对这种情况,JVM 采用轻量级锁来避免线程的阻塞与唤醒。

轻量级锁的加锁

JVM 会为每个线程在当前线程的栈帧中创建用于存储锁记录的空间,我们称为 Displaced Mark Word。如果一个线程获得锁的时候发现是轻量级锁,会把锁的 Mark Word 复制到自己的 Displaced Mark Word 里面。

然后线程尝试用 CAS 将锁的 Mark Word 替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示 Mark Word 已经被替换成了其他线程的锁记录,说明在于其他线程竞争锁,当前线程就尝试使用自旋来获取锁。

自旋:不断尝试去获取锁,一般用循环来实现。

自旋是需要消耗 CPU 的,如果一直获取不到锁的话,那该线程就一直处在自旋状态,白白浪费 CPU 资源。解决这个问题最简单的办法就是指定自旋的次数,例如让其循环10次,如果还没获取到锁就进入阻塞状态。

但是 JDK 采用了更聪明的方式 ——适应性自旋,简单来说就是线程如果自旋成功了,则下次自旋的次数会更多,如果自旋失败了,则自旋的次数就会减少。

自旋也不是一直进行下去的,如果自旋到一定程度(和 JVM、操作系统相关),依然没有获取到锁,称为自旋失败,那么这个线程会阻塞。同时这个锁就会 升级成重量级锁

轻量级锁的释放:

在释放锁时,当前线程会使用 CAS 操作将 Displaced Mark Word 的内容复制回锁的 Mark Word 里面。如果没有发生竞争,那么这个复制的操作会成功。如果有其他线程因为自旋多次导致轻量级锁升级成了重量级锁,那么 CAS 操作会失败,此时会释放锁并唤醒被阻塞的线程。

一张图说明加锁和释放锁的过程:

image-20220215114215718

9.2.4 重量级锁

重量级锁依赖于操作系统的互斥量(mutex)实现的,而操作系统中线程间状态的转换需要相对比较长的时间,所以重量级锁效率很低,但被阻塞的线程不会消耗 CPU。

前面说到,每一个对象都可以当做一个锁,当多个线程同时请求某个对象锁时,对象锁会设置几种状态用来区分请求的线程:

Contention List:所有请求锁的线程将被首先放置到该竞争队列
Entry List:Contention List 中那些有资格成为候选人的线程被移到 Entry List
Wait Set:那些调用 wait 方法被阻塞的线程被放置到 Wait List
OnDeck:任何时刻最多只有一个线程正在竞争锁,该线程称为 OnDeck
Owner:获得锁的线程称为 Owner
!Owner:释放锁的线程

当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个 ObjectWaiter 对象插入到 Contention List 的队列的队首,然后调用 park 函数挂起当前线程。

当现场释放锁时,会从 Contention ListEntry List 中挑选一个线程唤醒,被选中的线程叫做 Heirpresumptive 即假定继承人,假定继承人被唤醒后会尝试获得锁,但 synchronized 是非公平的,所以假定继承人不一定能获得锁。这是因为对于重量级锁,线程先自旋尝试获得锁,这样做的目的是为了减少执行操作系统同步操作带来的开销。如果自旋不成功再进入等待队列。这对那些已经在等待队列中的线程来说,稍微显得不公平,还有一个不公平的地方是自旋线程可能会抢占 Ready 线程的锁。

如果线程获得锁后调用 Object.wait() 方法,则会将线程假如到 Wait Set 中,当被 Object.notify() 唤醒后,会将线程从 Wait Set 移动到 Contention ListEntry List 中去。需要注意的是,当调用一个锁对象的 waitnotify 方法时,如当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。

9.2.5 总结锁的升级流程

每一个线程在准备获取共享资源时:

第一步,检查 Mark Word里面是不是放的自己的 Thread ID,如果是,表示当前线程是处于 “偏向锁”。

第二步,如果 Mark Word 不是自己的 Thread ID,锁升级,这时候用 CAS 来执行切换,新的线程根据 Mark Word 里面现有的 Thread ID,通知之前线程暂停,之前线程将 Mark Word 的内容置为空。

第三步,两个线程都把锁对象的 HashCode 复制到自己新建的用于存储锁的记录空间,接着开始通过 CAS 操作,把锁对象的 Mark Word 的内容修改为自己新建的记录空间的地址的方式竞争 Mark Word

第四步,第三步中成功执行 CAS 的获得资源,失败的则进入自旋。

第五步,自旋的线程在自旋过程中,成功获得资源(即之前获得资源的线程执行完成并释放了共享资源),则整个状态依然处于轻量级锁的状态。如果自旋失败,则锁继续升级。

第六步,进入重量级锁的状态,这个时候,自旋的线程进行阻塞,等待之前线程执行完成并唤醒自己。

9.2.6 各种锁的优缺点比较

下表来自《Java 并发编程的艺术》:

优点缺点适用场景
偏向锁加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级的差距如果线程间存在锁竞争,会带来额外的锁撤销的消耗适用于只有一个线程访问同步块场景
轻量级锁竞争的线程不会阻塞,提高了程序的响应速度如果始终得不到锁竞争的线程使用自旋会消耗CPU追求响应时间,同步块执行速度非常快
重量级锁线程进程不使用自旋,不会消耗 CPU线程阻塞,响应时间缓慢追求吞吐量,同步块执行时间较长

十、CAS 与原子操作

10.1 乐观锁与悲观锁的概念

锁可以从不同的角度分类,其中,乐观锁和悲观锁是一种分类方式。

悲观锁:

悲观锁就是我们常说的锁。对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,所以必须对每次数据操作加上锁,已保证临界区的程序同一时间只能有一个线程在执行。

乐观锁:

乐观锁又称为 “无锁”,顾名思义,它是乐观派。乐观派总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为 CAS 的技术来保证线程执行的安全性。

由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说 乐观锁天生免疫死锁。

乐观锁多用于 “读多写少” 的环境,避免频繁加锁影响性能;而悲观锁多用于 “写多读少” 的环境,避免频繁失败和重试影响性能。

10.2 CAS 的概念

CAS 的全称是:比较并交换(Compare And Swap)。在 CAS 中,有这样三个值:

  • V:要更新的变量(var)
  • E:预期值(expected)
  • N:新值(new)

比较并交换的过程如下:

判断 V 是否等于 E,如果等于,将 V 的值设置为 N;如果不等,说明已经有其他线程更新了 V,则当前线程放弃更新,什么都不做。

所以这里的 预期值 E 本质上指的是 “旧值”

我们以一个简单的例子来解释这个过程:

  1. 如果有一个多线程共享的变量 i 原本等于5,我现在在线程 A 中,想把它设置为新的值6;
  2. 我们使用 CAS 来做这个事情;
  3. 首先我们用 i 去与5对比,发现它等于5,说明没有被其他线程改过,那我就把它设置为新的值6,此次 CAS 成功,i 的值被设置成了6;
  4. 如果不等于5,说明 i 被其他线程改过了(比如现在 i 的值为2),那么我就什么也不做,此次 CAS 失败,i 的值仍然为2。

在这个例子中,i 就是 V,5就是 E,6就是 N

那有没有可能我在判断了 i 为5之后,正准备更新它的新值的时候,被其他线程更改了 i 的值呢?

不会的。因为 CAS 是一种原子操作,它是一直系统原语,是一条 CPU 的原子指令,从 CPU 层面保证它的原子性。

当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并更新成功,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且运行再次尝试,当然也允许失败的线程放弃操作。

10.3 Java 实现 CAS 的原理 —— Unsafe 类

前面提到,CAS 是一种原子操作。那么 Java 是怎样来使用 CAS 的呢?我们知道,在 Java 中如果一个方法是被 native 修饰的,那么 Java 就不负责具体实现它,而是交给底层的 JVM 使用 c 或者 c++ 去实现。

在 Java 中,有一个 Unsafe 类,它里面是一些 native 方法,其中就有今个关于 CAS 的:

boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);

当然,它们都是 public native 的。

Unsafe 中对 CAS 的实现是 c++ 写的,它的具体实现和操作系统、CPU 都有关系。

Linux x86 下只要是通过 cmpxchgl 这个指令在 CPU 级完成 CAS 操作的,但在多事处理器下必须使用 lock 指令加锁来完成。当然不同的操作系统和处理器的实现会有所不同,大家可以自行了解。

当然,Unsafe 类里面还有其他方法用于不同的用途。比如支持线程挂起和恢复的 parkunparkLockSupport 类底层就是调用了这两个方法,还有支持反射操作的 allocateInstance() 方法。

10.4 原子操作—— AtomicInteger 类源码解析

上面介绍了 Unsafe 类的几个支持 CAS 的方法,那 Java 具体是如何使用这几个方法来实现原子操作的呢?

JDK 提供了一些用于原子操作的类,在 java.util.concurrent.atomic 包下面,JDK 11 提供了如下17个类:

image-20220215142112804

从名字就可以看得出来这些类大概的用途:

  • 原子更新基本类型
  • 原子更新数组
  • 原子更新引用
  • 原子更新字段(属性)

这里我们以 AtomicInteger 类的 getAndAdd(int delta) 方法为例,来看看 Java 是如何实现原子操作的。

先看看这个方法的源码:

public final int getAndAdd(int delta) {
	return U.getAndAddInt(this, VALUE, delta);
}

这里的 U 其实就是一个 Unsafe 对象:

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

所以其实 AtomicInteger 类的 getAndAdd(int delta) 方法是调用 Unsafe 类的方法来实现的:

@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

注:这个方法是在 JDK 1.8 才新增的,在 JDK 1.8 之前,AtomicInteger 源码实现有所不同,是基于 for 死循环的。

我们来一步步解析这段代码。首先,对象 othis,也就是一个 AtomicInteger 对象。然后 offset 是一个常量 VALUE,这个常量是在 AtomicInteger 类中声明的:

private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

同样是调用 Unsafe 类的方法。从方法名字上来看,是得到了一个对象字段偏移量。

用于获取某个字段相对于 Java 对象的 “起始地址” 的偏移量。

一个 Java 对象可以看成是一段内存,各个字段都得安装一定的顺序放在这段内存里,同时考虑到对齐要求,可能这些字段不是连续放置的。

用这个方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量,因为是相对偏移量,所以它其实跟某个具体对象又没有太大关系,跟 class 的定义和虚拟机的内存模型的实现细节更相关。

继续看源码,前面我们讲到,CAS 是 “无锁” 的基础,它允许更新失败,所以经常会与 while 循环搭配使用,在失败后不断去重试。

这里声明了一个 v,也就是要返回的值,从 getAndAddInt 来看,它返回的应该是原来的值,而新的值的 V + delta

这里使用的是 do-while 循环,它的目的是 保证循环体内的语句至少会被执行一遍。这样才能保证 return 的值 v 是我们期望的值。

循环体的条件是一个 CAS 方法:

@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,
                                          int expected,
                                          int x) {
    return compareAndSetInt(o, offset, expected, x);

}
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
                                             int expected,
                                             int x);

可以看到,最终其实是调用了我们之前说到的 CAS native 方法。那为什么要经过一层 weakCompareAndSetInt 呢?从 JDK 源码上看不出来什么,在 JDK 1.8 及之前的版本,这两个方法是一样的。

而在 JDK 9 开始,这两个方法上面增加了 @HotSpotIntrinsicCandidate 注解,这个注解允许 HotSpot VM 自己来写汇编或 IR 编译器来实现该方法以提供性能。也就是说虽然外面看到的在 JDK 9 中 weakCompareAndSetObjectcompareAndSetObject 底层依旧是调用了一样的代码,但是不排除 HotSpot VM 会手动实现 weakCompareAndSetObject 真正含义的功能的可能性。

weakCompareAndSetObject 操作仅保留了 volatile 自身变量的特性,而除去了 happens-before 规则带来的内存语义。也就是说,weakCompareAndSetObject 无法保证处理操作目标的 volatile 变量外的其他变量的执行顺序(编译器和处理器为了优化程序性能而对指令序列进行重排序),同时也无法保证这些变量的可见性。 这在一定程度上可以提高性能。

再回到循环条件上来,可以看到它是在不断尝试去用 CAS 更新。如果更新失败,就继续重试。那为什么要把获取 “旧值” v 的操作放到循环体内呢?其实这也很好理解,前面我们说了,CAS 如果旧值 V 不等于预期值 E,它就会更新失败。说明旧的值发生了变化,那么我们当然需要返回的是被其他线程改变之后的旧值了,因此放在了 do 循环体内。

10.5 CAS 实现原子操作的三大问题

这里介绍一下 CAS 实现原子操作的三大问题及其解决方案。

10.5.1 ABA 问题

所谓 ABA 问题,就是一个值原来是 A,变成了 B,然后又变回了 A,这个时候使用 CAS 是检查不出变化的,但实际上却被更新了两次。

ABA 问题的解决思路是在变量前面追加上 版本号或者时间戳。从 JDK 1.5 开始,JDK 的 atomic 包里提供了一个 AtomicStampedReference 类来解决 ABA 问题。

这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果二者都相等,才使用 CAS 设置为新的值和标志。

public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

10.5.2 循环时间长开销大

CAS 多与自旋结合。如果自旋 CAS 长时间不成功,会占用大量的 CPU 资源。

解决思路是让 JVM 支持处理器提供的 pause 指令

pause 指令能让自旋失败时 CPU 睡眠一小段时间再继续自旋,从而使得读操作得到频率低很多,为解决内存顺序冲突而导致的 CPU 流水线重排的代价也会小很多。

10.5.3 只能保证一个共享变量的原子操作

这个问题你可能已经知道怎么解决了,有两种解决方案:

  1. 使用 JDK 1.5 开始就提供的 AtomicReference 类保证对象之间的原子性,把多个变量放到一个对象里面进行 CAS 操作。
  2. 使用锁。锁内的临界区代码可以保证只有当前线程能操作。

十一、 AQS

11.1 AQS 简介

AQSAbstractQueueSynchronizer 的简称,即 抽象队列同步器,从字面意思理解:

  • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
  • 队列:使用先进先出(FIFO)队列存储数据;
  • 同步:实现了同步功能。

那 AQS 有什么用呢?AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,比如我们提到的 ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask 等等皆是基于 AQS 的。

当然,我们自己也能利用 AQS 非常轻松容易的构造出符合我们自己需求的同步器,只要子类实现了它的几个 protected 方法就可以了,下文会有详细的介绍。

11.2 AQS 的数据结构

AQS 内部使用了一个 volatile 的变量 state来作为资源的标识。同时定义了几个获取和改变 stateprotected 方法,子类可以覆盖这些方法来实现自己的逻辑:

getState();
setStete();
compareAndSetState();

这三种操作均是原子操作,其中 compareAndSetState 的实现依赖于 UnsafecompareAndSwapInt() 方法。

而 AQS 类本身实现的是一些排队和阻塞的机制,比如具体线程等待队列的维护(如获取资源失败,入队/唤醒出队等)。它内部使用了一个先进先出(FIFO)的双端队列,并使用了两个指针 headtail 用于标识队列的头部和尾部。其数据结构如图:

image-20220215155511612

但是它并不是直接存储线程,而是存储拥有线程的 Node 节点。

11.3 资源共享模式

资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如 ReentrantLock
  • 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如 Semaphore/CountDownLatch

一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如 ReadWriteLock

AQS 中关于这两种资源共享模式的定义源码(均在内部类 Node 中)。我们来看看 Node 的结构:

static final class Node {
    // 标记一个结点(对应的线程)在共享模式下等待
    static final Node SHARED = new Node();
    // 标记一个结点(对应的线程)在独占模式下等待
    static final Node EXCLUSIVE = null; 

    // waitStatus的值,表示该结点(对应的线程)已被取消
    static final int CANCELLED = 1; 
    // waitStatus的值,表示后继结点(对应的线程)需要被唤醒
    static final int SIGNAL = -1;
    // waitStatus的值,表示该结点(对应的线程)在等待某一条件
    static final int CONDITION = -2;
    /*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
    static final int PROPAGATE = -3;

    // 等待状态,取值范围,-3,-2,-1,0,1
    volatile int waitStatus;
    volatile Node prev; // 前驱结点
    volatile Node next; // 后继结点
    volatile Thread thread; // 结点对应的线程
    Node nextWaiter; // 等待队列里下一个等待条件的结点


    // 判断共享模式的方法
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 其它方法忽略,可以参考具体的源码
}

// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
    // 使用了Node的这个构造函数
    Node node = new Node(Thread.currentThread(), mode);
    // 其它代码省略
}

注意:通过 Node 我们可以实现两个队列,一是通过 prevnext 实现 CLH 队列(线程同步队列,双向队列),二是 nextWaiter 实现 Condition 条件上的等待线程队列(单向队列),这个 Condition 主要用在 ReentrantLock 类。

11.4 AQS 的主要方法源码解析

AQS 的设计是基于 模板方法模式 的,它有一些方法必须要子类去实现的,它们主要有:

  • isHeldExclusively(): 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
  • tryAcquire(int): 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
  • tryRelease(int): 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
  • tryAcquireShared(int): 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int): 共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回 true,否则返回 false。

这些方法虽然都是 protected 方法,但是它们并没有在 AQS 具体实现,而是直接抛出异常(这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的模板方法):

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

而 AQS 实现了一系列主要的逻辑,下面我们从源码来分析一下获取和释放资源的主要逻辑:

11.4.1 获取资源

获取资源的入口是 acquire(int arg) 方法。arg 是要获取的资源的个数,在独占模式下始终为1,我们先来看看这个方法的逻辑:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先调用 tryAcquire(arg) 尝试去获取资源。前面提到了这个方法是在子类具体实现的。

如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE) 方法把这个线程插入到等待队列中。其中传入的参数代表要插入的 Node 时候独占式的。这个方法的具体实现:

// 为当前线程和给定模式创建和排队节点。
private Node addWaiter(Node mode) {
    // 生成当前线程对应的 Node 节点
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            // 使用 CAS 尝试,如果成功就返回
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}
// 在第一次竞争时初始化头部和尾部字段。
private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

上面的两个函数比较好理解,就是在队列的尾部插入新的 Node 节点,但是需要注意的是由于 AQS 中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,在这里是通过 CAS 自旋的方式保证了操作的线程安全性。

现在回到最开始的 acquire(int arg) 方法,现在通过 addWaiter 方法,已经把一个 Node 放到等待队列尾部了。而处于等待队列的节点时从头节点一个一个去获取资源的。具体实现我们来看看 acquireQueued 方法:

// 以独占不间断模式获取已在队列中的线程。由条件等待方法和获取使用。
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果node的前驱节点p是head,表示node是第2个节点,接可以尝试获取资源了
            if (p == head && tryAcquire(arg)) {
                // 拿到资源后,将head指向该节点
                // 所以head所指的节点,就是当前获取到资源的那个节点或null
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            // 如果自己可以休息了,就进入waiting状态,知道被unpark()
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

这里 parkAndCheckInterrupt 方法内部使用到了 LockSupport.park(this),顺便介绍一下 park()

LockSupport 类是 Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:

  • park(boolean isAbsolute, long time): 阻塞当前线程
  • unpark(Object thread): 使给定的线程停止阻塞

所以 节点进入等待队里后,是调用 park 是它进入阻塞状态的。只有头节点的线程是处于活跃状态的。

当然,获取资源的方法除了 acquire 外,还有以下三个:

  • acquireInterruptibly: 申请可中断的资源(独占模式)
  • acquireShared: 申请共享模式的资源
  • acquireSharedInterruptibly: 申请可中断的资源(共享模式)

可中断的意思是,在线程中断时可能抛出 InterruptedException

总结起来的一个流程图:

image-20220216101040252

11.4.2 释放资源

释放资源相比于获取资源来说,会简单许多。在 AQS 中只有一小段实现。源码如下:

// 以独占模式发布。如果tryRelease返回 true,则通过解锁一个或多个线程来实现
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // 如果状态是负数,尝试把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

   // 得到头节点的后继节点 head.next
    Node s = node.next;
    // 如果这个后继节点为空或者状态大于0
    // 通过前面的定义可以知道,大于0只有一种可能,就是这个节点已被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 等待队列中所有可用的几点,都向前移动
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 如果后继节点不为空,则此后继节点所在线程停止阻塞
    if (s != null)
        LockSupport.unpark(s.thread);
}

第三篇:工具篇

十二、线程池原理

12.1 为什么要使用线程池

使用线程池主要有以下三个原因:

  1. 创建/销毁线程需要消耗系统资源,线程池可以 复用已创建的线程
  2. 控制并发数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃(主要原因)。
  3. 可以对线程做统一管理

12.2 线程池的原理

Java 中的线程池顶层接口是 Executor 接口,ThreadPoolExecutor 是这个接口的实现类。

我们先来看看 ThreadPoolExecutor 类。

12.2.1 ThreadPoolExecutor 提供的构造方法

一共四个构造方法:

// 5个参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue);

// 6个参数的构造方法-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory);
// 6个参数的构造方法-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler);

// 7个参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);

涉及到5~7个参数,我们先看看必须的5个参数是什么意思:

  • int corePoolSize: 该线程池中核心线程数最大值

核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会别销毁(临时工)。

  • int maximumPoolSize: 该线程池中线程总数最大值

该值等于核心线程数量 + 非核心线程数量

  • long keepAliveTime: 非核心线程闲置超时时长

非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置 allowCoreThreadTimeOut(true),则也会作用于核心线程。

  • TimeUnit unit: keepAliveTime 的单位

TimeUnit 是一个枚举类型,包括一下属性:

NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTESHOURSDAYS

  • BlockingQueue workQueue: 阻塞队列,维护着等待执行的 Runnable 任务对象

常用的几个阻塞队列:

  1. LinkedBlockingQueue: 链式阻塞队列,底层时间结构是链表,默认大小是 Integer.MAX_VALUE,也可以指定大小。
  2. ArrayBlockingQueue: 数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
  3. SynchronousQueue: 同步队列,内部容量为0,每个 put 操作必须等待一个 take 操作,反之亦然。
  4. DelayQueue: 延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

我们将在下一章重点介绍各种阻塞队列。

好了,介绍完5个必须的参数之后,还有两个非必须的参数。

  • ThreadFactory threadFactory

创建线程的工厂,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。

DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
    						Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
        		  poolNumber.getAndIncrement() +
       			  "-thread-";
}
  • RejectedExecutionHandler handler

拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为:

  1. ThreadPoolExecutor.AbortPolicy: 默认拒绝处理策略,丢弃任务并抛出 RejectedExecutionException 异常。
  2. ThreadPoolExecutor.DiscardPolicy: 丢弃新来的任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列头部(最旧的)任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
  4. ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务。

12.2.2 ThreadPoolExecutor 的策略

线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

故线程池也有自己的状态。ThreadPoolExecutor 类中使用了一些 final int 常量变量来表示线程池的状态,分别为 RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • 线程池创建后处于 RUNNING 状态。

  • 调用 shutdown() 方法后处于 SHUTDOWN 状态,线程池不能接受新的任务,清除一些空闲 worker,不会等待阻塞队列的任务完成。

  • 调用 shutdownNow() 方法后处于 STOP 状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时 poolSize=0,阻塞队列的 size 也为0。

  • 当所有的任务已终止, ctl 记录的 “任务数量” 为0,线程池会变为 TIDYING 状态。接着会执行 terminated() 函数。

    ThreadPoolExecutor 中有一个控制状态的属性叫 ctl,它是一个 AtomicInteger 类型的变量。线程池状态就是通过 AtomicInteger 类型的成员变量 ctl 来获取的。

    获取的 ctl 值传入 runStateOf 方法,与 ~CAPACITY 位于运算(CAPACITY 是低 29 位全 1 的 int 变量)。

    ~CAPACITY 在这里相当于掩码,用来获取 ctl 的高3位,表示线程状态;而另外的低29位用于表示工作线程数

  • 线程池处在 TIDYING 状态时,执行完 terminated() 方法之后,就会由 TIDYING -> TERMINATED,线程池被设置为 TERMINATED 状态。

12.2.3 线程池主要的任务处理流程

处理任务的核心方法是 execute,我们来看看 ThreadPoolExecutor 中是如何处理线程任务的:

// 在未来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果任务无法提交执行,要么是因为这个执行器已经关闭,要么是因为它的容量已经达到,任务由当前的RejectedExecutionHandler处理。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1、当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2、如果不小于corePoolSize,则将任务添加到workQueue中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果isRunning 返回false(状态检查),则remove这个任务,然后执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2 线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3、如果放入workQueue失败,则创建非核心线程执行任务
    // 如果这是创建非核心线程失败(当前线程总数不小于 maximumPoolSize时),就会执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

ctl.get() 是获取线程池状态,用 int 类型表示。第二步中,入队前进行一次 isRunning 判断,入队之后,又进行一次 isRunning 判断。

为什么要二次检查线程池的状态?

在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将 command 加入 workQueue 是线程池之前的状态。倘若没有二次检查,万一线程池处于非 RUNNING 状态(在多线程环境下很有可能发生),那么 command 永远不会执行。

总结一下处理流程

  1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到 corePoolSize,在核心线程数据 < corePoolSize 时)。注意,这一步需要获得全局锁
  2. 线程总数量 >= corePoolSize 时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些 “临时工” 来执行这些任务了,于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁
  4. 缓存队列满了,且总线程数达到了 maximumPoolSize,则会采取上面提到拒绝策略进行处理。

整个过程如图所示:

image-20220216132523165

12.2.4 ThreadPoolExecutor 如何做到线程复用的?

我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?

原来,ThreadPoolExecutor 创建线程时,会将线程封装成 工作线程 worker,并放入 工作线程组 中,然后这个 worker 反复从阻塞队列中拿任务去执行。话不多说,我们继续看看源码。

这里的 addWorker 方法是在上面提到的 execute 方法里面调用的,先看看上半部分:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

上半部分主要是判断线程数量是否超出阈值,超过了就返回 false。我们继续看下半部分:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    // 1.创建一个worker对象
    w = new Worker(firstTask);
    // 2.实例化一个Thread对象
    final Thread t = w.thread;
    if (t != null) {
        // 3.线程池全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int c = ctl.get();

            if (isRunning(c) ||
                (runStateLessThan(c, STOP) && firstTask == null)) {
                if (t.getState() != Thread.State.NEW)
                    throw new IllegalThreadStateException();
                workers.add(w);
                workerAdded = true;
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            // 4.启动这个线程
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
}

创建 worker 对象,并初始化一个 Thread 对象,然后启动这个线程对象。

我们接着看看 Worker 类,仅展示部分源码:

// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
            runWorker(this);
    }
    //其余代码略...
}

Worker 类实现了 Runnable 接口,所以 Worker 也是一个线程任务。在构造方法中,创建了一个线程,线程的任务就是自己。故 addWorker 方法源码下半部分中的第4步 t.start(),会触发 Worker 类的 run 方法被 JVM 调用。

我们再看看 runWorker 的逻辑:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 1.线程启动之后,通过unlock方法释放锁
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      // 2.Worker执行firstTask或从workQueue中获取任务,如果getTask()方法不返回null,循环不退出
        while (task != null || (task = getTask()) != null) {
            // 2.1进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
            w.lock();
            // 2.2检查线程池状态,倘若线程池处于中断状态,当前线程将中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 2.3执行beforeExecute
                beforeExecute(wt, task);
                try {
                    // 2.4执行任务
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 2.5执行afterExecute
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 2.6解锁操作
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

首先去执行创建这个 worker 时就有的任务,当执行完这个任务后,worker 的声明周期并没有结束,在 while 循环中,worker 会不断地调用 getTask() 方法从 阻塞队列 中获取任务然后调用 task.run() 执行任务,从而达到 复用线程 的目的。只要 getTask() 方法不返回 null,此线程就不会退出。

当然,核心线程池中创建的线程想要拿到阻塞队列中的任务,先要判断线程池的状态,如果 STOP 或者 TERMINATED,返回 null

最后看看 getTask() 方法的实现:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁
        // 如果为true,核心线程在keepAliveTime内仍空闲则不会被销毁
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		// 2.如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量
        // 如果有设置允许线程超时或者线程数量超过了核心线程数量,
        // 并且线程在规定时间内均未poll到任务切队列为空则递减worker数量
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 3.如果timed为true,则会调用workQueue的poll方法获取任务.
            // 超时时间是keepAliveTime,如果超过keepAliveTIme时长
            // poll返回了null,上边提到的while循环就会退出,线程也就执行完了
            // 如果timed为false(allowCoreThreadTimeOut为false,
            // 且wc> corePoolSize为false),则会调用workQueue的take方法阻塞在当前。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

核心线程会一直卡在 workQueue.take 方法,被阻塞并挂起,不会占用 CPU 资源,直到拿到 Runnable 然后返回(当然如果 allowCoreThreadTimeOut 设置为 true,那么核心线程就会去调用 poll 方法,因为 poll 可能会返回 null,所以这个时候核心线程满足超时条件也会被销毁)。

非核心线程会 workQueue.poll(keepAliveTime, TimeUnit,NANOSECONDS),如果超时还没有拿到,下一次循环判断 compareAndDecrementWorkerCount 就会返回 nullWorker 对象的 run() 方法循环体的判断为 null,任务结束,然后线程被系统回收。

12.3 四种常见的线程池

Executors 类中提供的几个静态方法来创建线程池。到了这一步,如果看懂了前面讲的 ThreadPoolExecutor 构造方法中各种参数的意义,那么一看到 Executors 类中提供的线程池的源码就应该知道这个线程池是干嘛的了。

12.3.1 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CacheThreadPool 的运行流程如下:

  1. 提交任务进线程池;
  2. 因为 corePoolSize 为0的关系,不创建核心线程,线程池最大为 Integer.MAX_VALUE
  3. 尝试将任务添加到 SynchornousQueue 队列。
  4. 如果 SynchornousQueue 入列成功,等待被当前运行的线程空闲后拉去执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从 SynchornousQueue 拉去任务并在当前线程执行。
  5. 如果 SynchornousQueue 已有任务在等待,入列操作将会阻塞。

当需要执行很多短时间的任务时,CacheThreadPool 的线程复用率比较高,会显著的提高性能,而且线程60s 后会回收,意味着即使没有任务进来,CacheThreadPool 并不会占用很多资源。

12.3.2 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数量相等,都是传入的参数 nThreads,所以只能创建核心线程,不能创建非核心线程。因为 LinkedBlockingQueue 的默认大小是 Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。

CachedThreadPool 的区别:

  • 因为 corePoolSize == maximumPoolSize,所以 FixedThreadPool 只会创建核心线程。而 CachedThreadPool 因为 corePoolSize=0,所以只会创建非核心线程。
  • getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take(),线程不会被回收,CachedThreadPool 会在 60s 后收回。
  • 由于线程不会被回收,会一直卡在阻塞,所以 没有任务的情况下,FixedThreadPool 占用资源更多
  • 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool 是因为阻塞队列可以很大(最大为 Integer 最大值),故几乎不会触发拒绝策略;CachedThreadPool 是因为线程池很大(最大为 Integer 最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。

12.3.3 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个核心线程(corePoolSize == maximumPoolSize = 1),使用了 LinkedBlockingQueue (容量很大),所以,不会创建非核心线程。所有任务按照 先来先执行 的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。

12.3.4 newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);

}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}

这四种常见的线程池基本够我们使用了,但是《阿里巴巴开发手册》不建议我们直接使用 Executors 类中的线程池,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,避免资源耗尽的风险。

但如果你及团队本身对线程池非常熟悉,又确定业务规模不会达到资源耗尽的程度(比如线程数量或任务队列长度可能达到 Integer.MAX_VALUE)时,其实是可以使用 JDK 提供的这几个接口的,它能让我们的代码具有更强的可读性。

十三、阻塞队列

13.1 阻塞队列的由来

我们假设一种场景,生产者一直生成资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓存池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的 生产者——消费者模式

该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。

我们自己 coding 实现这个模式的时候,因为需要让 多个线程操作共享变量(即资源),所以很容易引发 线程安全问题,造成 重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者。这些个 等待—唤醒 逻辑都需要自己实现。

这么容易出错的事情,JDK 当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。

BlockingQueueJava.util.concurrent 包下重要的数据结构,区别与普通的队列,BlockingQueue 提供了 线程安全得到队列访问方式,并发包下很多高级同步类的实现都是基于 BlockingQueue 的。

BlockingQueue 一般用于生产者——消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。BlockingQueue 就是存放元素的容器

13.2 BlockingQueue 的操作方法

阻塞队列提供了四组不同的方法用于插入、移除、检查操作:

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
检查方法element()peek()--
  • 抛出异常:如果试图的操作我发立即执行,抛出异常。当阻塞队列满的时候,再往队列里插入元素,会抛出 IllegalStateException("Queue full") 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常。
  • 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是 true/false
  • 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
  • 超时退出:如果试图的操作无法立即执行,该方法调用会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true/false

注意之处:

  • 不能往阻塞队列中插入 null,会抛出空指针异常。
  • 可以访问阻塞队列中的任意元素,调用 remove(o) 可以将队列之中的特定对象移除,但并不高效,尽量避免使用。

13.3 BlockingQueue 的实现类

13.3.1 ArrayBlockingQueue

数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

可以初始化队列大小,且一旦初始化不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是 非公平锁

13.3.2 LinkedBlockingQueue

链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是 Integer.MAX_VALUE,也可以是指定大小。此队列按照先进先出的原则对元素进行排序。

13.3.3 DelayQueue

该队列中的元素只有当其指定的延迟时间到了,才能从队列中获取该元素。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

13.3.4 PriorityBlockingQueue

基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 compator 对象来决定),内部控制线程同步的锁采用的是非公平锁。

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}

13.3.5 SynchronousQueue

这个队列比较特殊,没有任何内部容量,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。

需要区别容量为1的 ArrayBlockingQueueLinkedBlockingQueue

以下方法的返回值,可以帮助理解这个队列:

  • iterator() 永远返回空,因为里面没有东西。
  • peek() 永远返回 null
  • put() 往 queue 放进去一个 element 以后就一直 wait,直到有其他线程进来把这个 element 取走
  • offer() 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个线程取走了,offer 方法返回 true,认为 offer 成功,否则返回 false
  • take() 取出并且 remove 掉 queue 里的 element,取不到东西它会一直等
  • poll() 取出并且 remove 掉queue 里的 element,只有碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西,否则立即返回 null
  • isEmpty() 永远返回 true
  • remove() & removeAll() 永远返回false

注意

PriorityBlockingQueue 不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞塑胶的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。 对于使用默认大小的 LinkedBlockingQueue 也是一样的。

13.5 阻塞队列的原理

阻塞队列的原理很简单,利用了 Lock 锁的多条件(Condition)阻塞控制。接下来我们分析下 ArrayBlockingQueue 的源码:

首先是构造器,处理初始化队列大小和是否公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是 notEmptynotFull。这两个监视器的所用目前可以简单理解为标记分组,当该线程是 pull 操作时,给它减伤监视器 notFull,标记这个线程是一个生产者;当线程操作时 take 时,给它加上监视器 notEmpty,标记这个线程是消费者。

//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
private final Condition notFull;  

public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他代码
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

put 操作的源码

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.自旋拿锁
    lock.lockInterruptibly();
    try {
        // 2.判断队列是否满了
        while (count == items.length)
            // 2.1如果满了,阻塞该线程,并标记为notFull线程,
            // 等待notFull的唤醒,唤醒之后继续执行while循环。
            notFull.await();
        // 3.如果没有满,则进入队列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 唤醒一个等待的线程
    notEmpty.signal();
}

总结 put 的流程:

  1. 所有执行 put 操作的线程竞争 lock 锁,拿到 lock 锁的线程进入下一步,没有拿到锁的线程自旋竞争锁。
  2. 判断阻塞队列是否满了,如果满了,则调用 await 方法阻塞这个线程,并标记为 notFull (生产者)线程,同时释放 lock 锁,等待被消费者线程唤醒。
  3. 如果没有满,则调用 enqueue 方法将元素 put 进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
  4. 唤醒一个标记为 notEmpty(消费者)的线程。

take 操作的源码:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

take 操作和 put 操作的流程是类似的,总结下 take 操作的流程:

  1. 所有执行 take 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
  2. 判断阻塞队列是否为空,如果是空,则调用 await 方法阻塞这个线程,并标记为 notEmpty(消费者)线程,同时释放 lock 锁,等待被生产者线程唤醒。
  3. 如果没有空,则调用 dequeue 方法,注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
  4. 唤醒一个标记为 notFull(生产者)的线程。

注意

  1. puttake 操作都需要 先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
  2. 就算拿到锁了之后,也不一定会顺利进行 put/get 操作,需要判断 队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。
  3. 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁 才能继续往下执行,否则,自旋拿到锁,拿到锁了再 while 判断队列是否可用(这也就是为什么不用 if 判断,而使用 while 判断的原因)。

13.6 示例和使用场景

13.6.1 生产者-消费者模型

public class ProductDemo {

    private final int queueSize = 10;

    private final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);

    public static void main(String[] args) {
        ProductDemo test = new ProductDemo();

        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                try {
                    queue.take();
                    System.out.println("从队列中取走一个元素,队列剩余 " + queue.size() + " 个元素");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                try {
                    queue.put(1);
                    System.out.println("向队列中插入一个元素,队列剩余空间: " + queue.size());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

下面是这个例子的输出片段:

image-20220308101807185

注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于 System.out.println语句没有锁。考虑到这样的情况:线程 1 在执行完 put/take 操作后立即失去 CPU 时间片,然后切换到线程 2 执行 put/take 操作,执行完毕后回到线程 1 的 System.out.println 语句输出,发现这个时候阻塞队列的 size 已经被线程 2 改变了,所以这个时候输出的 size 并不是当时线程 1 执行完 put/take 操作之后阻塞队列的 size,但可以确保的是 size 不会超过 10 个。实际上使用阻塞队列是没有问题的。

13.6.2 线程池中使用阻塞队列

 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

Java 中的线程池就是使用阻塞队列实现的,我们在了解了阻塞队列之后,无论是使用 Executors 类中已经提供的线程池,还是自己通过 ThreadPoolExecutor 实现线程池,都会更加得心应手,想要了解线程池的同学,可以看第十二章:线程池原理。

注:上面提到的生产者-消费者模式,大家可以参考生产者-消费者模型,可以更好的理解阻塞队列。

十四、锁接口和类

前面我们介绍了 Java 原生的锁——基于对象的锁,它一般是配合 synchronized 关键字来使用的。实际上,Java 在 java.util.concurrent.locks 包下,还为我们提供了几个关于锁的类和接口,它们有更强大的功能或更高的性能。

14.1 synchronized 的不足之处

我们先来看看 synchronized 有什么不足之处:

  • 如果临界区是只读操作,其实可以多线程一起执行,但使用 synchronized 的话,同一时间只能有一个线程执行
  • synchronized 无法知道线程有没有成功获取到锁。
  • 使用 synchronized 如果临界区因为 IO 或者 sleep 方法等原因阻塞了,而当前线程又没有释放锁,就会导致 所有线程等待

而这些都是 locks 包下的锁可以解决的。

14.2 锁的几种分类

锁可以根据以下几种方式来进行分类,下面我们逐一介绍。

14.2.1 可重入锁和非可重入锁

所谓重入锁,顾名思义就是支持重新进入的锁,也就是说这个锁支持一个 线程对资源重复加锁

synchronized 关键字就是使用的重入锁。比如说,你在一个 synchronized 实例方法里面调用另一个本实例的 synchronized 实例方法,它可以重新进入这个锁,不会出现任何异常。

如果我们自己在继承 AQS 实现同步器的时候,没有考虑到占有锁的线程再次获取锁的场景,可能就会导致线程阻塞,那这个就是一个 “非可重入锁”。

ReentrantLock 的中文意思就是可重入锁,也是本文后续要介绍的重点类。

14.2.2 公平锁与非公平锁

这里的 “公平”,其实通俗意义来说就是 “先来后到”,也就是 FIFO。如果对一个锁来说,先对锁获取获取请求的线程一定先会被满足,后对锁获取请求的线程后被满足,那这个锁就是公平的。反之,那就是不公平的。

一般情况下,非公平锁能提升一定的效率。但是非公平锁可能会发生线程饥饿(有一些线程长时间得不到锁)的情况。 所以要根据实际的需求来选择非公平锁和公平锁。

ReentrantLock 支持非公平锁和公平锁。

14.2.3 读写锁和排它锁

我们前面讲到的 synchronized 用的锁和 ReentrantLook,其实都是排它锁,也就是说,这些锁在同一时刻只允许一个线程进行访问。

而读写锁可以在同一时刻允许多个读线程访问。Java 提供了 ReentrantReadWriteLock 类作为读写锁的默认实现,内部维护了两个锁:一个读锁,一个写锁。通过分离读锁和写锁,使得在 “读多写少” 的环境下,大大地提高了性能。

注意:即使用读写锁,在写线程访问时,所有的读线程和其它写写线程均被阻塞。

可见,只是 synchronized 是远远不能满足多样化的业务对锁的要求的。 接下来我们介绍一下 JDK 中有关锁的一些接口和类。

14.3 JDK 中有关锁的一些接口和类

众所周知,JDK 中关于并发的类大多都在 java.util.concurrent(以下简称 juc)包下,而 juc.locks 包看名字就知道,是提供了一些并发锁的工具类。前面我们介绍的 AQS(AbstractQueuedSynchronizer)就是在这个包下。下面分别介绍一下这个包下的类和接口以及它们之间的关系。

14.3.1 抽象类 AQS/AQLS/AOS

这三个抽象类有一定的关系,所以这里放到一起讲。

首先我们看 AQSAbstractQueuedSynchronizer),之前专门有章节介绍这个类,它是在 JDK 1.5 发布的,提供了一个 “队列同步器” 的基本功能实现。而 AQS 里面的 “资源” 是用一个 int 类型的数据来表示的,有时候我们的业务需求的数量超出了 int 的范围,所以在 JDK 1.6 中,多了一个 AQLSAbstractQueuedLongSynchronizer),它的代码跟 AQS 几乎一样,只是把资源的类型变成了 long 类型。

AQS 和 AQLS 都继承了一个类叫 AOSAbstractOwnableSynchronizer),这个类也是在 JDK 1.6 中出现的。这个类只有几行简答的代码。从源码类上的注释可以知道,它是用于表示锁与持有者之间的关系(独占模式)。可以看下它的主要方法:

// 独占模式,锁的持有者  
private transient Thread exclusiveOwnerThread;  

// 设置锁持有者  
protected final void setExclusiveOwnerThread(Thread t) {  
    exclusiveOwnerThread = t;  
}  

// 获取锁的持有线程  
protected final Thread getExclusiveOwnerThread() {  
    return exclusiveOwnerThread;  
}
#Java(6)#Thread(3)

评论