Java Concurrency in Practice阅读笔记
Introduction
A (very) brief history of concurrency
- 最早的计算机都是串行的,一个指令接着一个指令的执行下去,但是为了提高效率,进程和线程出现了
- 同一个进程里面的线程共享同样的内存和堆栈,由于它们都能访问相同的变量,如果没有做好同步的话,就会出现一些不可预计的结果。
Benefits of threads
- 更好的利用多核处理器
- 更好的专注于线程要执行的单个简单的任务:比如常见的servlet和RPC架构,框架负责处理请求,创建线程,负责均衡,分发请求等,程序员只需要考虑servlet.service()方法里面的逻辑处理部分即可,仿佛这就是一个单线程的同步请求
Risks of threads
Safety hazards
- 多线程程序如果没有很好的同步机制,就会导致一些不可预测的结果,俗称线程安全问题以上的程序在单线程中执行没有问题,但是如果在多线程中执行就会出现问题,因为这里的value++看似是一个操作,其实是三个操作:读取value当前的值,加1,把新的值赋给value。由于两个线程在执行程序的时候可能会被交错执行。需要指明的是,这里说的多线程执行指的是实例化一个UnsafeSequence,然后用两个线程去执行这一个实例的getNext()方法,由于线程是共享内存的,所以只存在一个被共享的UnsafeSequence实例。如果在两个线程里面分别实例化两个UnsafeSequence,那么并不会有线程安全问题,因为每个线程操作的是各自的实例
1
2
3
4
5
6
7
8
public class UnsafeSequence {
private int value;
/** Returns a unique value. */
public int getNext() {
return value++;
}
}1
T1: 取值9 -> T2: 取值9 -> T1: 加1 -> T2: 加1 -> T1: 赋给value,结果为10 -> T2: 赋给value,结果还是为10
- 像UnsafeSequence这种线程不安全的例子我们也称作race condition,这是由于线程共享内存空间,可以对同一个变量进行修改造成的,改进的办法是把这个方法变成同步的:
1
2
3
4
5
6
7
public class Sequence {
private int value;
public synchronized int getNext() {
return value++;
}
}
Liveness hazards
- 活性问题不止出现在多线程程序中,单线程程序中最常见的活性问题是死循环,这会导致程序不能最终到达一个好的状态。多线程种常见的活性问题有死锁(Deadlock),饥饿(Starvation),活锁(LiveLock)
Performance hazards
- 由于锁的存在以及上下文切换的存在,多线程程序会存在性能问题
Thread Safety
- Writing thread-safe code is, at its core, about managing access to state, and in particular to shared, mutable state. By shared, we mean that a variable could be accessed by multiple threads; by mutable, we mean that its value could change during its lifetime.
- 每当多于一个线程访问给定的状态变量(state variable),并且其中一个可能对其进行写操作时,它们都必须使用同步机制协调对其的访问。
- 修复线程不安全的三种办法:
- 不要在多线程共享状态变量
- 让状态变量变为不可改变的
- 在访问状态变量的时候使用同步机制
- 如果一个程序只由线程安全的类组成,那么我们可以说这个程序就是线程安全的吗?不可以!假设我们有两个线程安全的类Increase和Decrease,它们都对一个number的变量进行操作,当这两个类分别单独使用的时候,它们都是线程安全的,但是在一起使用的时候,就可能出现线程不安全。所以我们常说的一个类是线程安全的,只针对一个类完全封装了它的状态变量的时候才有意义
What is thread safety?
- 线程安全类指的是一个类被多个线程访问时表现正确,无论运行时环境如何安排或交错执行这些线程,并且在调用代码方面没有额外的同步或其他协调
Example: a stateless servlet
- 前面我们提到线程不安全的前提是存在状态变量,如果一个类根本不存在状态变量,也就是类内部没有变量也不引用变量,那么这个类就是无状态的,也就不存在线程安全的问题
1
2
3
4
5
6
7
8
public class StatelessFactorizer implements Servlet {
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
BigInteger[] factors = factor(i);
encodeIntoResponse(resp, factors);
}
}
Atomicity
- 如果我们向上面线程安全的类里面加入一个状态变量,那么这个类就变得不是线程安全了
1
2
3
4
5
6
7
8
9
10
11
public class UnsafeCountingFactorizer implements Servlet {
private long count = 0;
public long getCount() { return count; }
public void service(ServletRequest req, ServletResponse resp{
BigInteger i = extractFromRequest(req);
BigInteger[] factors = factor(i);
++count;
encodeIntoResponse(resp, factors);
}
}
Race conditions
- Race conditions通常发生在计算的正确性取决于多个线程的相对时间或交错执行的情况,换句话说,得到正确答案依赖于幸运的时序。最常见的race conditions类型是check-then-act,check的时候由于时序的原因,可能观察到过时的状态变量,因此进行了错误的act
Example: race conditions in lazy initialization
- 一个check-then-act的线程不安全例子:懒加载
1
2
3
4
5
6
7
8
9
public class LazyInitRace {
private ExpensiveObject instance = null;
public ExpensiveObject getInstance() {
if (instance == null)
instance = new ExpensiveObject();
return instance;
}
} - 另一种常见race condition的例子就是read-modify-write,比如前面见到的
counter++
Compound actions
- 原子操作:操作A和B相对于彼此是原子的,如果从执行A的线程的角度来看,当另一个线程执行B时,要么B的所有部分都已执行,要么全部没有执行
- LazyInitRace(check-then-act)和UnsafeCountingFactorizer(read-modify-write)都包含了一些复合操作,这些操作并不具备原子性,所以都会出现线程安全问题
- 如果我们在和UnsafeCountingFactorizer类中使用java.util.concurrent.atomic自带的原子变量,那么就不会出现线程安全问题了
1
2
3
4
5
6
7
8
9
10
11
public class CountingFactorizer implements Servlet {
private final AtomicLong count = new AtomicLong(0);
public long getCount() { return count.get(); }
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
BigInteger[] factors = factor(i);
count.incrementAndGet();
encodeIntoResponse(resp, factors);
}
}
Locking
- 前面我们介绍的是在CountingFactorizer类中加入一个状态变量,只要保证这个变量是线程安全的即可保证整个类是线程安全的,但是当我们的类中有多个状态变量的时候,问题可能变得复杂。比如我们想要对servlet的请求结果做一个cache:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class UnsafeCachingFactorizer implements Servlet {
private final AtomicReference<BigInteger> lastNumber = new AtomicReference<BigInteger>();
private final AtomicReference<BigInteger[]> lastFactors = new AtomicReference<BigInteger[]>();
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
if (i.equals(lastNumber.get()))
encodeIntoResponse(resp, lastFactors.get());
else {
BigInteger[] factors = factor(i);
lastNumber.set(i);
lastFactors.set(factors);
encodeIntoResponse(resp, factors);
}
}
} - 虽然上面的两个变量都使用了atomic类,它们各自的操作都是原子的,但是由于lastNumber的值会影响lastFactors的取值,所以还是会出现race condition。线程安全的定义要求无论多线程中操作的时间或交错如何,都必须保持不变量。在UnsafeCachingFactorizer中的一个不变量是,缓存在lastFactors中的因数的乘积应该等于在lastNumber中缓存的值;我们的servlet只有在这个不变量始终保持的情况下才是正确的。
- 可能的race condition:初始状态时lastNumber=0, response=10;A线程的i=5进来了,i不等于lastNumber,所以lastNumber被设置成了5;此时B线程的i=5进来了,由于i==lastNumber,直接返回了当前的response=10;此时A线程继续执行,根据factors的计算结果返回一个response=20。可以看到同样的输入i=5,由于race condition导致了不同的结果。
- To preserve state consistency, update related state variables in a single atomic operation.
Intrinsic locks
- Java提供了一种内置锁来保证操作的原子性:the
synchronized
block。synchronized
有两种使用方法
synchronized
代码块:通过一个对象作为锁来保护一段代码块1
2
3synchronized (lock) {
// Access or modify shared state guarded by lock
}synchronized
方法:是一种简写版的synchronized
代码块,调用这个方法的对象会被用作锁
- 每个Java对象都可以充当
synchronized
块的锁,这些java内置的锁被称为内部锁或监视器锁。在进入synchronized
块之前,执行线程会自动获取该锁,并在离开同步块时(通过正常路径或通过在块内抛出异常)自动释放锁。获取内部锁的唯一方法是进入由该锁保护的synchronized
块或synchronized
方法。 - Java中的内部锁是一种互斥锁(mutexes, mutual exclusion locks),这意味着最多只能有一个线程拥有该锁。当线程A尝试获取由线程B持有的锁时,A必须等待,或者阻塞,直到B释放它。如果B永远不释放锁,A将永远等待。
- 通过内置锁,我们就可以改写上面的UnsafeCachingFactorizer使它变为线程安全的,这里使用的是
synchronized
方法。但是需要注意的是,虽然这种方式保证了线程安全,但是会引起性能问题,因为此时servlet在同一时间只能处理一个请求。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SynchronizedFactorizer implements Servlet {
private BigInteger lastNumber;
private BigInteger[] lastFactors;
public synchronized void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
if (i.equals(lastNumber))
encodeIntoResponse(resp, lastFactors);
else {
BigInteger[] factors = factor(i);
lastNumber = i;
lastFactors = factors;
encodeIntoResponse(resp, factors);
}
}
}Reentrancy
- 内部锁具备可重入性,也就是说如果一个线程尝试获取一个它已经持有的锁,该请求会成功。可重入性代表着内部锁是基于线程的,而不是基于调用的。
- Reentrancy is implemented by associating with each lock an acquisition count and an owning thread. When the count is zero, the lock is considered unheld. When a thread acquires a previously unheld lock, the JVM records the owner and sets the acquisition count to one. If that same thread acquires the lock again, the count is incremented, and when the owning thread exits the synchronized block, the count is decremented. When the count reaches zero, the lock is released
- 可重入性简化了面向对象的开发,比如下面的例子,当我们使用继承的时候,由于可重入性的存在,
super.doSometing()
并不会阻塞导致死锁。1
2
3
4
5
6
7
8
9
10
11public class Widget {
public synchronized void doSomething() {
...
}
}
public class LoggingWidget extends Widget {
public `synchronized` void doSomething() {
System.out.println(toString() + ": calling doSomething");
super.doSomething();
}
}Guarding state with locks
- 当一个可变的状态变量被多于一个线程访问(读或者写),如果所有的访问都被同一个锁持有,那么我们说这个状态变量由这个锁保护。一个常见的错误是认为同步只需要被用在写操作,而不需要用在读操作上
- 对于涉及多个变量的不变量,与该不变量相关的所有变量都必须由相同的锁保护(
For every invariant that involves more than one variable, all the variables involved in that invariant must be guarded by the same lock)。一个很好的例子是Java的vector集合,vector集合的所有操作都使用了synchronized
代码块,但是对于复合操作,仍然会出现race condition。我们仍然说vector类是线程安全的,因为它的每个操作都是原子的,但是在使用多个操作的时候,我们仍然要考虑线程安全的问题:1
2if (!vector.contains(element))
vector.add(element); - 上边的例子不止对vector是这样,对于所有的线程安全的map都是如此,比如ConcurrentHashMap也是一样的:https://stackoverflow.com/questions/14947723/is-concurrenthashmap-totally-safe。因此ConcurrentHashMap提供了`putIfAbsent()`方法,其实就是原子化的上面这段代码
Liveness and performance
- 前面的SynchrizedFactorizer虽然没有线程安全问题,但是作为一个servlet,它同一时间只能处理一个请求,这无疑是低效的,解决的办法是减少我们
synchronized
代码块覆盖的范围,比如某些local variable并不需要加锁。synchronized
代码块覆盖的长度是一个trade off问题,太长会导致性能问题,但也不能由于太短而导致线程安全问题。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CachedFactorizer implements Servlet {
private BigInteger lastNumber;
private BigInteger[] lastFactors;
private long hits;
private long cacheHits;
public synchronized long getHits() { return hits; }
public synchronized double getCacheHitRatio() {
return (double) cacheHits / (double) hits;
}
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
BigInteger[] factors = null;
synchronized (this) {
++hits;
if (i.equals(lastNumber)) {
++cacheHits;
factors = lastFactors.clone();
}
}
if (factors == null) {
factors = factor(i);
synchronized (this) {
lastNumber = i;
lastFactors = factors.clone();
}
}
encodeIntoResponse(resp, factors); }
} - 避免在长时间的计算或可能无法迅速完成的操作(例如网络请求)期间持有锁
Sharing Objects
- 上一章我们主要介绍了如果通过使用同步手段来避免多个线程对同一个变量的同时访问;这一章我们主要介绍如何通过一些手段来共享和发布对象,以让它们可以安全的被多个线程访问。
- 上一章我们已经看到了同步机制可以确保所包含的代码块是原子执行的,但是一个常见的误区是同步机制只是用来做这个的,同步机制另外一个很重要的方面是保证内存可见性。 我们不止希望保证当一个线程在使用一个变量的时候,别的线程不能修改它;也希望确保当一个线程修改了一个变量以后,别的线程可以知道这个变化。
Visibility
- 可见性是非常微妙的因为可见性是违背直觉的:当读和写发生在不同的线程,读线程不能保证可以看到写线程做出的改动,甚至完全看不到。为了保证内存的可见性,必须使用同步机制。
- 一个正面内存可见性的例子。例子里面在主线程里面把number设置成42,ready设置成true,但是在Reader线程里面number的打印结果可能是0,或者永远不会打印结果。永远不会打印结果的原因是可能ready的结果在Reader线程里面一直不可见,number打印是0的原因是ready是true已经在Reader线程可见,但是number=42的改动在Reader线程还不可见。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class NoVisibility {
private static boolean ready;
private static int number;
private static class ReaderThread extends Thread {
public void run() {
while (!ready)
Thread.yield();
System.out.println(number);
}
}
public static void main(String[] args) {
new ReaderThread().start();
number = 42;
ready = true;
}
} - 在缺乏同步的情况下,编译器、处理器和运行时环境可能以一些相当奇怪的方式影响操作执行的顺序。试图推断在没有同步的多线程程序中内存操作的执行顺序几乎肯定是不正确的
Stale data
- 如前面见到的,内存不可见的一大问题就是会导致读取过时的数据
Nonatomic 64-bit operations
- 64位的长整型在JVM可能被当做两个32位的操作,内存不可见会导致这两个32位读取完全不同的值
Locking and visibility
- 加锁不止可以保证互斥,也能保证内存一致性(Locking is not just about mutual exclusion; it is also about memory visi- bility. To ensure that all threads see the most up-to-date values of shared mutable variables, the reading and writing threads must synchronize on a common lock)
Volatile variables
- Java提供了一种比锁弱一些的同步机制来保证内存一致性,Volatile关键词可以保证修饰的变量的改变可以被其他线程看到,不会被寄存器缓存
- 但是相比锁的同步机制,Volatile关键词只保证了内存一致性,并不能保证操作的原子性(
Locking can guarantee both visibility and atomicity; volatile variables can only guarantee visibility)
Publication and escape
- 发布(publish)一个对象意味着使其在其当前范围之外的代码可用,比如通过在其他代码可以找到的地方存储对它的引用,从非私有方法返回它,或将它传递给另一个类中的方法。如果一个对象发布在了本来不应该可见的地方,那么我们就说这个对象逃逸(escape)了。对象的逃逸会导致线程安全问题。
- 逃逸的例子:1. 即使states被声明为private,但是由于有getStates()这个public的方法可以访问他,所以本质上其他类可以访问他,所以这里出现了逃逸 2. 内部类存在一个隐含的对外部类实例的引用,所以内部类可以直接访问外部类的成员和方法,包括私有成员和方法,所以这里其实也造成了逃逸
1
2
3
4
5class UnsafeStates {
private String[] states = new String[] {
"AK", "AL" ...
};
public String[] getStates() { return states; } }1
2
3
4
5
6
7
8
9
10
11
12public class ThisEscape {
pirvate int outClassValue;
public ThisEscape(EventSource source) {
source.registerListener (
new EventListener() {
public void onEvent(Event e) {
doSomething(e, outClassValue);
}
}
)
}
}
Safe construction practices
- 一种常见的逃逸错误就是在一个对象还没有完全构造完成之前,把this传递给了其他类,这会导致其他类看到的对象还没有完全构造完成,导致一些不可预测的错误
1
2
3
4
5
6
7
8
9
10
11public class MyClass {
private int value;
public MyClass() {
// Do some initialization
// ...
// Don't do this - "this" reference escapes during construction
SomeExternalClass.registerInstance(this);
}
} - 另外一种常见的this引用逃逸的例子是在构造方法中启动一个线程,这个线程或显示的引用this,或隐式的引用this(比如Thread或者Runnable是这个对象的内部类)由于在这个Thread类是MyClass类的匿名内部类,所以可以直接引用MyClass类的成员变量。Thread内部类在MyClass类的构造方法里面创建并开启,这个时候MyClass类的对象还没有初始化完成,但是新开启的线程已经使用了MyClass类的对象里的value成员变量,这会导致严重的错误。因此,我们可以在构造方法里面新建Thread,但是最好不要在构造方法里开启Thread。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class MyClass {
private int value;
public MyClass() {
// Do some initialization
// ...
// Don't do this - starting a thread from the constructor
new Thread(() -> {
// The thread has a reference to the partially constructed object
// This can lead to unexpected behavior
System.out.println("Value from thread: " + value);
}).start();
}
}
Thread confinement
- 访问共享(shared)可变(mutable)的变量需要使用同步机制来保证线程安全,一个最简单的方法去避免线程安全问题就是不共享变量,这种技术称作Thread confinement(线程封闭):一个对象被限制在只被一个线程使用,这样自然而然就没有线程安全的问题了
- 一个常见的例子是JDBC的connection对象,JDBC并不要求connection对象是线程安全的,所以一般的应用都是建立一个JDBC连接池,当servlet的请求到来的时候,去JDBC连接池中获取一个connection对象并使用,由于一个servlet请求都是在一个线程中工作,所以获取的connection对象在当前请求未完成前不会回到连接池,也就不会被多个线程共用。
Stack confinement
- 本地变量被限制在所执行的线程中,它们存在于所执行线程的栈(stack)中,其它线程不可访问。比如下面例子里面的numPairs变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public int loadTheArk(Collection<Animal> candidates) {
SortedSet<Animal> animals;
int numPairs = 0;
Animal candidate = null;
// animals confined to method, don’t let them escape!
animals = new TreeSet<Animal>(new SpeciesGenderComparator());
animals.addAll(candidates);
for (Animal a : animals) {
if (candidate == null || !candidate.isPotentialMate(a))
candidate = a;
else {
ark.load(new AnimalPair(candidate, a));
++numPairs;
candidate = null;
}
}
return numPairs;
}
ThreadLocal
- 更正式地实现线程封闭性的方法是使用
ThreadLocal
,ThreadLocal
提供了get
和set
存取方法,这些方法为每个使用它的线程维护一个单独的值副本,因此get
会返回当前执行线程从set
中获取的最新值。 - 同样是JDBC的例子,这里使用
ThreadLocal
:1
2
3
4
5
6
7
8private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
public Connection initialValue() {
return DriverManager.getConnection(DB_URL);
}
};
public static Connection getConnection() {
return connectionHolder.get();
}
Immutability
- 消除线程安全的另一个方向就是使用不可变(immutable)的变量
- 被声明成final的的变量不一定就是不可变的,因为final只代表这个变量的引用不能变了,但是引用的对象本身其实是可以改变的,比如声明一个
public final List<String> example = new LinkedList<>()
,example变量的引用不能改变了,但是它引用的这个list是可以增加删除元素的。 - An object is immutable if:
- Its state cannot be modified after construction;
- All its fields are final;12 and
- It is properly constructed (the this reference does not escape during
construction).
Example: Using volatile to publish immutable objects
- 通过immutable volatile的OneValueCache来保证线程安全,OneValueCache对象里面的lastNumber和lastFactors如果需要改变,会直接copy一个新的OneValueCache对象,以此保证lastNumber和lastFactors的同步更改。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class OneValueCache {
private final BigInteger lastNumber;
private final BigInteger[] lastFactors;
public OneValueCache(BigInteger i, BigInteger[] factors) {
lastNumber = i;
lastFactors = Arrays.copyOf(factors, factors.length);
}
public BigInteger[] getFactors(BigInteger i) {
if (lastNumber == null || !lastNumber.equals(i))
return null;
else
return Arrays.copyOf(lastFactors, lastFactors.length);
}
}
public class VolatileCachedFactorizer implements Servlet {
private volatile OneValueCache cache = new OneValueCache(null, null);
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
BigInteger[] factors = cache.getFactors(i);
if (factors == null) {
factors = factor(i);
cache = new OneValueCache(i, factors);
}
encodeIntoResponse(resp, factors);
}
}
Safe publication
- 前面几个单元我们主要专注于保证对象不会发布和逃逸,但是在实际的代码中,我们肯定不能保证所有的对象都不逃逸,所以本单元主要介绍如果保证安全的发布
Improper publication: when good objects go bad
- 不安全的发布会导致会严重的后果,比如下面的例子中,两个不同的线程调用Holder构造方法和assertSanity(),导致assertSanity()方法里面两次读到的n不一致,那么会出现AssertionError:
1
2
3
4
5
6
7
8public class Holder {
private int n;
public Holder(int n) { this.n = n; }
public void assertSanity() {
if (n != n)
throw new AssertionError("This statement is false.");
}
}
Immutable objects and initialization safety
- 如前所述,不可变对象可以被安全的发布
Safe publication idioms
- To publish an object safely, both the reference to the object and the ob- ject’s state must be made visible to other threads at the same time. A properly constructed object can be safely published by:
- Initializing an object reference from a static initializer;
- Storing a reference to it into a volatile field or AtomicReference;
- Storing a reference to it into a final field of a properly constructed
object; or - Storing a reference to it into a field that is properly guarded by a
lock.
Effectively immutable objects
- 如果一个对象是可变的,但是其他线程都不会改变它,那么从逻辑上它就是一个不可变的对象,那么它在安全发布以后就可以被随意使用
Mutable objects
- 如果一个对象在发布以后还会被改变,那么安全发布只是保证了对象的可见性,还是需要使用同步机制保证线程安全
Sharing objects safely
- The most useful policies for using and sharing objects in a concurrent program are:
- 线程封闭:A thread-confined object is owned exclusively by and confined to one thread, and can be modified by its owning thread.
- 共享对象,但是只读:A shared read-only object can be accessed concurrently by multiple threads without additional synchronization, but cannot be modified by any thread. Shared read-only objects include immutable and effectively immutable objects.
- 使用线程安全的共享对象,比如Java自带的线程安全集合:A thread-safe object performs synchronization internally, so multiple threads can freely access it through its public interface without further synchronization.
- 被锁保护的对象: A guarded object can be accessed only with a specific lock held. Guarded objects include those that are encapsulated within other thread-safe objects and published objects that are known to be guarded by a specific lock.
Composing Objects
- 前面我们已经介绍了线程安全和同步的基础,但是我们并不希望分析代码的每一部分去确认我们的程序是线程安全的,这一章我们会介绍一些组织Java类的模式来让它们保证线程安全
Designing a thread-safe class
- 线程安全类的设计模式:
- Identify the variables that form the object’s state(找到形成一个对象所有的状态变量)
- Identify the invariants that constrain the state variables(找到所有限制状态变量取值的不变量)
- Establish a policy for managing concurrent access to the object’s
state(建立一个管理并行访问对象状态的策略)
- 一个对象的状态变量就是由它的成员变量(fields)所决定的,如果所有的成员变量都是基本数据类型,那么这个类的线程安全只由这些基本数据类型共同决定。如果成员变量有对于其他类的引用,比如LinkedList,那么线程安全也需要包括所有LinkedList的节点的状态变量。
- 同步策略定义了一个对象是如何协调对其状态变量的访问来保证不违反不变量(invariants)或者后置条件(postconditions),它定义了这个对象如何使用不变性(immutability),线程封闭(thread confinement)以及锁(locking)的结合来确保线程安全的
Gathering synchronization requirements
- 确保一个类是线程安全的意味着在并发访问下保持其不变量成立,这需要对其状态变量进行推理。状态变量都有自己的状态空间(state space),表明状态变量合法的可以取得值的范围
- 大部分类都有一些不变量来说明状态变量是合法还是不合法的。比如一个counter变量是一个长整型,长整型的状态变量是
Long.MIN_VALUE
到Long.MAX_VALUE
,但是我们限制counter只能是非负数,那么小于零就是不合法的。同样的,后置条件也会影响状态变量的取值,比如如果counter目前的值是17,那么它的下一个值就必须是18。但比如我们的变量存储的是当前的温度值,那么下一个温度值就不受前面的温度值影响。 - 一个类也会有不变量是由两个以上状态变量控制的,比如后边我们会介绍的NumberRange类,它声明了一个lower bound和一个upper bound,那么这就存在一个不变量:lower bound不能大于upper bound。如果我们的多线程程序没有考虑这一个不变量,就会导致出现lower bound大于upper bound的问题。
- 综上,如果我们不理解一个类的不变量和后置条件,我们是不能确保线程安全的,因为我们也许需要通过创建一些原子操作或者进行类封装来限制产生不合法的状态
State-dependent operations
- 一些对象会有一些方法具有基于状态的前置条件,比如我们不能从一个空的queue里面移除元素
- 在一个单线程的程序中,如果一个前置条件没有被满足,那么程序只能选择fail;但是对于多线程程序,前置条件可能会达到通过其他线程的操作,所以多线程提供了等待达成前置条件并继续操作的可能。
- Java内置的wait和notify方法就用来实现这种等待,它们和对象的内置锁紧密相关联,我们会在第14章具体介绍它们。或者我们可以使用一些Java内置的类,比如BlockingQueue, Semaphore来达到等待的效果
State ownership
- 以Servlet举例,它使用的ServletContext是线程安全的,我们可以使用它的setAttribute和getAttribute来设置属性,但是ServletContext存储的类的线程安全需要由我们作为使用者来确保。
Instance confinement
- Java封装可以简单的使一个类线程安全,我们称这种方式叫做实例封闭(Instance confinement)。一个线程不安全的类,被封闭在另外一个类中,我们确保只有这个封装的类可以访问这个线程不安全的类,并且采用同步等手段保证其线程安全。
- 以下的例子展示了instance confinement。例子中的HashSet并不是线程安全的,但由于我们将其封装在PersonSet类中,并且使用内置锁保证了线程安全,因此这里我们可以说PersonSet就是一个线程安全的类。需要注意的是,这里我们没有关注Person类的线程安全,如果Person类不是线程安全的,那么我们也需要采取同步措施。
1
2
3
4
5
6
7
8
9
10
11
public class PersonSet {
private final Set<Person> mySet = new HashSet<Person>();
public synchronized void addPerson(Person p) {
mySet.add(p);
}
public synchronized boolean containsPerson(Person p) {
return mySet.contains(p);
}
} - Java自己也使用了instance confinement的方法,比如ArrayList和Hashmap都是线程不安全的,但是Java提供了一些wrapper factory method(Collections.synchronizedList等)。Wrapper实现一些同步的底层集合使用的方法,并且将请求传递给底层的集合
The Java monitor pattern
- 按照前面的instance confinement设计的类就使用了Java的监视器模式。也就是通过把可变的变量封装在一个新的类中,并且使用同步机制保证线程安全的模式
Example: tracking fleet vehicles
- 一个使用Java monitor pattern的例子,虽然MutablePoint类不是线程安全的,但是MonitorVehicleTracker通过使用同步机制使对它的使用是线程安全的。
1
2
3
4
5
6
7
8
9
public class MutablePoint {
public int x, y;
public MutablePoint() { x = 0; y = 0; }
public MutablePoint(MutablePoint p) {
this.x = p.x;
this.y = p.y;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MonitorVehicleTracker {
private final Map<String, MutablePoint> locations;
public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}
public synchronized Map<String, MutablePoint> getLocations() {
return deepCopy(locations);
}
public synchronized MutablePoint getLocation(String id) {
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}
public synchronized void setLocation(String id, int x, int y) {
MutablePoint loc = locations.get(id);
if (loc == null)
throw new IllegalArgumentException("No such ID: " + id);
loc.x = x;
loc.y = y;
}
private static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> m) {
Map<String, MutablePoint> result =
new HashMap<String, MutablePoint>();
for (String id : m.keySet())
result.put(id, new MutablePoint(m.get(id)));
return Collections.unmodifiableMap(result);
}
}
Delegating thread safety
- Java监视器模式对于从头创建一个类或者使用线程不安全的类很有帮助,但是如果已经存在的类已经是线程安全的,我们还需要使用监视器模式来创建一层吗?答案是有的时候可以,有的时候不行,下面我们会具体讨论
- 前面很早我们讨论的CountingFactorizer类使用了AtomicLong类,由于AtomicLong是唯一的状态变量,我们可以说由于AtomicLong类是线程安全的,所以CountingFactorizer类也是线程安全的,也就是说CountingFactorizer类的线程安全性由AtomicLong类代表了。
Example: vehicle tracker using delegation
- 以下的例子展示了通过delegation确保线程安全。由于使用的ConcurrentMap和Point类都是线程安全的,所以DelegatingVehicleTracker类也是线程安全的。
1 |
|
1 |
|
Independent state variables
- 前面我们的delegation的例子都是针对一个线程安全的状态变量(比如AtomicLong和locations)。一个类也可以由多个状态变量进行delegation,但是需要各自是独立的
- 以下是一个由两个独立的状态变量delegation的类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class VisualComponent {
private final List<KeyListener> keyListeners =
new CopyOnWriteArrayList<KeyListener>();
private final List<MouseListener> mouseListeners =
new CopyOnWriteArrayList<MouseListener>();
public void addKeyListener(KeyListener listener) {
keyListeners.add(listener);
}
public void addMouseListener(MouseListener listener) {
mouseListeners.add(listener);
}
public void removeKeyListener(KeyListener listener) {
keyListeners.remove(listener);
}
public void removeMouseListener(MouseListener listener) {
mouseListeners .remove (listener);
}
}When delegation fails
- 当一个类由多个非独立的状态变量组成的时候,并不能仅仅通过每个状态变量的线程安全就判断整个类的线程安全
- 以下是一个线程不安全的例子,假设一个类调用
setLower(5)
,另一个类调用setUpper(4)
,就会出现打破lower<=upper的不变量。执行顺序是:初始的lower是0,upper是10,第一个线程调用setLower(5)
,判断i不大于upper,继续执行,此时第二个线程调用setUpper(4)
,判断i不小于lower,继续执行,然后upper被设置成了4,然后第一个线程恢复执行,lower被设置成了5.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class NumberRange {
// INVARIANT: lower <= upper
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);
public void setLower(int i) {
// Warning -- unsafe check-then-act
if (i > upper.get())
throw new IllegalArgumentException(
"can’t set lower to " + i + " > upper");
lower.set(i);
}
public void setUpper(int i) {
// Warning -- unsafe check-then-act
if (i < lower.get())
throw new IllegalArgumentException(
"can’t set upper to " + i + " < lower");
upper.set(i);
}
public boolean isInRange(int i) {
return (i >= lower.get() && i <= upper.get());
}
}Adding functionality to existing thread-safe classes
- 本章介绍如何使用已经存在的线程安全的类来扩展我们想要的功能。比如如果一个List已经是线程安全的,并且已经有contains和add方法,而我们想要的是一个put-if-absent方法,我们可以修改List源代码(如果我们可以的话)或者继承已有的类
- 一个继承的例子
1
2
3
4
5
6
7
8
9
10
public class BetterVector<E> extends Vector<E> {
public synchronized boolean putIfAbsent(E x) {
boolean absent = !contains(x);
if (absent)
add(x);
return absent;
}
}Client-side locking
- 除了直接修改源代码和继承,第三种方法是使用一个helper类,但是下面的一个例子由于没有正确的使用同步的锁,而线程不安全。因为list对象使用的是自己的内置锁进行同步,但是putIfAbsent方法使用的是ListHelper的内置锁,这就代表着putIfAbsent和list其他的方法并不用同一个锁,也就是线程不安全的
1
2
3
4
5
6
7
8
9
10
11
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
...
public synchronized boolean putIfAbsent(E x) {
boolean absent = !list.contains(x);
if (absent)
list.add(x);
return absent;
}
} - 为了修复这个问题,我们这里需要使用与list相同的锁,也就是client-side locking
1
2
3
4
5
6
7
8
9
10
11
12
13
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
...
public boolean putIfAbsent(E x) {
synchronized(list){
boolean absent = !list.contains(x);
if (absent)
list.add(x);
return absent;
}
}
}
Composition
- 另外我们也可以使用组合来保证线程安全,下面的例子我们不关心list的具体实现,因为所有的方法我们都会用ImprovedList类的内置锁进行同步。这里其实我们也使用了Java监视器模式。
1
2
3
4
5
6
7
8
9
10
11
12
13
public class ImprovedList<T> implements List<T> {
private final List<T> list;
public ImprovedList(List<T> list) { this.list = list; }
public synchronized boolean putIfAbsent(T x) {
boolean contains = list.contains(x);
if (contains)
list.add(x);
return !contains;
}
public synchronized void clear() { list.clear(); }
// ... similarly delegate other List methods
}
Documenting synchronization policies
- 一个类的线程安全与否要记录下来以方便client,一个类的线程安全策略也应记录下来以方便维护者
Building Blocks
- 前面我们介绍了线程安全的概念以及如何使一个类保证线程安全,其中提到了代理的方法,在我们日常写代码的过程中,我们肯定不希望从头构建新的类,而是希望代理已有的类来保证线程安全,所以这一章将会介绍Java内置的一些同步类
Synchronized collections
- Java 1.2中引入了一些同步集合类,包括Vector和Hashtable,和一些由
Collections.synchronizedXxx
的工厂方法包装的同步类。
Problems with synchronized collections
- 同步类虽然本身是线程安全的,但是并不能很好的处理复合操作(compound actions),比如我们前面提到的put-if-absent和迭代操作(iteration)
- 下面的例子展示了复合操作是如何导致线程安全问题的,如果两个不同的线程分别执行
getLast
和deleteLast
方法,比如A线程执行getLast
并且计算出lastIndex是9(假设vector开始有10个元素),然后此时B线程执行deleteLast
并且计算出lastIndex是9,然后执行list.remove(lastIndex);
,最后回到A线程开始执行return list.get(lastIndex);
。由于此时第十个元素已经被线程B删除了,所以A线程的return list.get(lastIndex);
会报错ArrayIndexOutOfBoundsException1
2
3
4
5
6
7
8public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
} - 想要修复问题,我们需要使用client side locking
1
2
3
4
5
6
7
8
9
10
11
12
13
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
} - 上面同样的线程安全问题也会发生在迭代,因为迭代也是复合操作
1
2for (int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
Iterators and ConcurrentModificationException
- 我们使用Vector作为例子展示了迭代作为复合操作会导致的线程安全问题,但是我们现在常用的Java 5版本引入的Collection的Iterator也有同样的问题,而其采取的解决办法是如果检测到Iterator被使用在多线程中,就会抛出ConcurrentModificationException,具体的实现是内部维护一个modification count,如果在迭代的过程中这个值被修改,那么就会抛出异常。
- 所以如果想要线程安全的使用迭代,那么也需要使用同步机制
- 或者每次当我们要迭代集合的时候,我们就为当前线程复制一个新的集合,由于thread confineness,就可以保证线程安全,但是如果集合太大,必然导致性能开销
Hidden iterators
- 某些代码里面没有明确使用Iterator,但是编译器在编译的时候会使用Iterator,比如这一句想要打印set集合的内容
System.out.println("DEBUG: added ten elements to " + set);
,编译器会调用集合的toString
方法,而这个方法是迭代实现的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class HiddenIterator {
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {
set.add(i);
}
public synchronized void remove(Integer i) {
set.remove(i);
}
public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG: added ten elements to " + set);
}
}
Concurrent collections
- Java 5使用并发集合提升了同步集合的缺点。使用ConcurrentHashMap代替了同步map,使用了CopyOnWriteArrayList代替了同步list。同时还引入了Queue和BlockingQueue
ConcurrentHashMap
- 不像同步集合有一个固定的锁限制了所有的操作,ConcurrentHashMap使用了一种叫做lock striping的方法来允许更大程度的共享访问:读线程可以无限制的并行访问;读线程也可以并行的和写线程同时操作,并且只有有限制数量的写线程可以修改map
- ConcurrentHashMap的Ieratator也不会像前面提到的普通集合抛出ConcurrentModificationException,而是一种weakly consistent,也就是说Iterator返回的值是在其构建时候的值,如果有别的线程修改了map的值,Iterator并不保证能反映其最新的变动的值
- ConcurrentHashMap为了达到最好的并发效率,也做了一些trade-offs,比如ConcurrentHashMap的size和isEmpty方法返回的都是大概值
- ConcurrentHashMap增加了常用的复合操作,比如put-if-absent, remove-if-equal, replace-if-equal
CopyOnWriteArrayList
- copy-on-write集合的实现线程安全的方式是每次集合被修改的时候,都会复制出来一个全新的集合并返回。CopyOnWriteArrayList也支持Iterator,Iterator返回的元素是其创建时候的元素,并不包括后续修改的元素
- 由于复制需要一些花销,copy-on-write集合更适用于那种迭代操作大于修改操作的程序
Blocking queues and the producer-consumer pattern
- Blocking Queue提供了阻塞的put和take方法,如果queue是满的,那么put方法会block直到queue不是满的;如果queue是空的,那么take方法会block直到queue不是空的
- Blocking Queue支持producer-consumer设计模式。producer-consumer设计模式简化了开发,因为它移除了producer和consumer代码的依赖性。最常见的producer-consumer的设计就是一个线程池前面连接这一个block queue,Java的Executor task execution framework就是使用了这种模式
- Blocking queue分为bounded和unbounded,unbounded blocking在producer的处理速度大于consumer的处理速度的情况下,queue里的元素会越来越多
- Java提供一些实现好的Blocking queue,比如LinkedBlockingQueue和ArrayBlockingQueue都是FIFO queue,而PriorityBlockingQueue可以按照我们自己定义的顺序移出元素
Serial thread confinement
- 对于可变的对象,producer-consumer设计模式使用了serial thread confinement,也就是说可变对象同一时间只能在一个线程里,并保证除了正在使用它的线程别的线程不会改变它。比如一个可变对象正在consumer线程里面处理,那么producer线程不会修改它
Deques and work stealing
- Java 6还增加了另外两种集合类型:Deque和BlockDequeue。Deque是一种两端队列,两端都可以取出元素,具体的实现有ArrayDeque和LinkedBlockingDeque
- 就好像BlockingQueue非常适合producer-consumer模式,Deque非常适合work stealing设计模式。producer-consumer设计对所有消费者使用一个共享的工作队列;在work stealing设计中,每个消费者都有自己的deque。如果一个消费者耗尽了自己的deque中的工作,它可以从其他消费者的deque尾部窃取工作
Blocking and interruptible methods
- BlockingQueue的put和take方法会抛出InterrupedException。如果一个线程抛出InterrupedException,首先代表着这个方法是一个阻塞的方法,另外代表着此刻这个方法被中断了。我们的程序要处理InterrupedException,绝对不能直接将它吞掉
- 一般面对InterrupedException的处理方法有两种
- Propagate the InterruptedException:也就是把InterruptedException层层向上抛出
- Restore the interrupt:有的时候,由于我们无法直接在线程里面抛出异常,比如在Runnable的run()方法是无法抛出异常的,这个时候我们可以让当前线程中断来恢复异常,例子如下
1
2
3
4
5
6
7
8
9
10
11
12public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
...
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
}
Synchronizers
- 同步器是一种用于控制多个线程之间协同工作和同步的机制。同步器可用于确保线程安全、避免竞态条件以及实现线程之间的协作,前面我们介绍的BlocikingQueue就可以用作同步器
Latches
- Latch同步器用于阻塞线程的运行,直到Latch到达它的termination状态
- 一个使用CountingDownLatch的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class TestHarness {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end-start;
}
}
FutureTask
- FutureTask实现了Futurn和Runnable类,常被用作进行异步计算任务,一个FutureTask有三种状态:completion, cancellation, exception,一旦进入completion状态就不可以改变了,所以一个FutureTask如果任务完成以后不能再次启动或取消
- 使用FutureTask预加载数据的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class Preloader {
private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
final Thread thread = new Thread(future);
public void start() { thread.start(); }
public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw launderThrowable(cause);
}
}
/** If the Throwable is an Error, throw it;
if it is a RuntimeException return it,
otherwise throw IllegalStateException */
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
Semaphores(信号量)
- Counting semaphores是被用来控制可以同时访问同一资源或者进行某些动作的同步器。信号量在构造的时候会传入一个初始的允许值(permit),使用的时候可以acquire或者release信号量(acquire会将permit减1,release会将permit加1)。当尝试去acquire信号量,但是没有permit的时候,acquire方法会阻塞
- Binary semaphore(permit初始值是1)是一种不可重入的锁
- 信号量常被用于实现一些资源池,比如数据库连接池,当然我们可以很简单的实现一个固定大小的连接池,只要池子里没有多余的连接,我们就让请求失败,但是更好的实现方式是使用信号量,如果池子里没有多余的可用连接,就让请求阻塞,直到别的线程release连接
- 下面的例子使用信号量来bound collection(当然我们可以直接使用blocking queue,这里只是为了证明Semaphores)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
Barriers
- Barrier允许一组线程互相等待,直到到达某个公共屏障点。一旦所有线程都到达这个点,屏障就会打破,所有线程可以继续执行。Latch和Barrier的区别点是Latch等待的是event,Barrier等待的是其他的线程ready
- Barrier的例子,使用子线程计算subboard的值,都计算结束以后Barrier在计算总的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard .commitNewValues (); }});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public void start(){
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard .waitForConvergence ();
}
}
Building an efficient, scalable result cache
- 这一节介绍了如何构建一个用于Servlet使用的in memory cache,同时需要保证cache的线程安全和性能
- 最简单的思路就是使用HashMap加上synchronized,但是这种cache的性能很低,因为同一时间只能有一个线程访问cache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public interface Computable<A, V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String, BigInteger> {
public BigInteger compute(String arg) {
// after deep thought...
return new BigInteger(arg); }
}
public class Memoizer1<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
} - 一个很简单的提升思路就是使用ConcurrentHashMap,由于ConcurrentHashMap是线程安全的,所以不需要synchronized,这样的话多个线程都可以访问cache。但是这种实现也有一个问题,如果一个线程已经开始计算缓存k1,这个时候另外一个线程也进来想要计算缓存k1,那么这两个线程都会进行计算,导致额外的资源浪费
1
2
3
4
5
6
7
8
9
10
11
12
13public class Memoizer2<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
private final Computable<A, V> c;
public Memoizer2(Computable<A, V> c) { this.c = c; }
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
} - 进一步的提升是使用ConcurrentHashMap + Future,这样如果第二个线程进来想要计算同样的k1,就只会返回一个Future,当计算结束以后可以通过Future.get()获取结果。这种实现仍会有一点小瑕疵,就是极低的概率下,两个线程都进行if语句的判断,发现cache不存在,都构建了Future进行计算,造成的原因是这里是一个compute-if-present操作,但不是原子的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache =
new ConcurrentHashMap<A,Future<V>>();
private final Computable<A, V> c;
public Memoizer3(Computable<A, V> c) { this.c = c; }
public V compute(final A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); // call to c.compute happens here
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
Task Execution
Executing tasks in threads
- 大多数的web server都把每个请求当成一个task,各个请求之间互不影响,通过明确task的边界,并且定义合理的task execution policy,我们可以让task更高效的执行
Executing tasks sequentially
- 最简单的方式就是串行的执行task,但是这种方式的效率太低
Explicitly creating threads for tasks
- 稍微提升的办法是每来一个task,我们都建立一个新的线程来处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
}
Disadvantages of unbounded thread creation
- 无限制的建立线程有以下缺点
- Thread lifecycle overhead:线程创建与销毁会消耗资源
- Resource consumption:线程数太多会造成大量资源浪费在上下文切换
- Stability:线程数有上限,超过会out of memory
The Executor framework
- 类比于前面我们提到过的BlockQueue,使用线程池对于管理线程也是很有益的,Java提供的Executor可以让我们很方便的创建和使用线程池。Executor接口如下。
1
2
3public interface Executor {
void execute(Runnable command);
} - Executor基于producer-consumer设计模式,提交任务的是生产者,执行任务的是消费者,使用Executor是一种最简单的使用producer-consumer设计模式的方法
Example: web server using Executor
- 以下是一个使用Java内置的标准Executor实现(一个100大小的固定线程池)的例子。由于使用了Executor,我们把一个Runnable任务的提交和执行解耦开来了。如果我们想要修改执行的方式,只需要修改Executor即可,这样的改动是很小的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec =
Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
} - 比如我们可以修改TaskExecutionWebServer的Executor,不使用前面的100大小标准线程池,而是每个请求创建一个新线程,这样就类似与我们前面的为每个请求创建新线程的例子。当然,我们也可以修改Executor,让它只单线程的串行执行请求
1
2
3
4
5public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}所以上面两个例子证明了Executor模式下可以通过简单的修改Executor即可达到对任务的控制,不需要修改别的代码1
2
3
4
5public class WithinThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
};
}
Execution policies
- 把任务的提交和执行解耦的最大好处就是我们可以自由的控制执行策略
- In what thread will tasks be executed?
- In what order should tasks be executed (FIFO, LIFO, priority order)?
- How many tasks may execute concurrently?
- How many tasks may be queued pending execution?
- If a task has to be rejected because the system is overloaded, which task should be selected as the victim, and how should the application be noti- fied?
- What actions should be taken before or after executing a task?
Thread pools
- 线程池总是和一个工作队列绑定在一起,工作队列里面存储着等待执行的任务,线程池负责提供空闲的线程执行任务。线程池的优势在于不需要每次处理请求的时候都浪费资源去创建和销毁线程
- Java内置的一些线程池
- newFixedThreadPool. A fixed-size thread pool creates threads as tasks are sub- mitted, up to the maximum pool size, and then attempts to keep the pool size constant (adding new threads if a thread dies due to an unexpected Exception).
- newCachedThreadPool. A cached thread pool has more flexibility to reap idle threads when the current size of the pool exceeds the demand for processing, and to add new threads when demand increases, but places no bounds on the size of the pool.
- newSingleThreadExecutor. A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially according to the order imposed by the task queue (FIFO, LIFO, priority order).
- newScheduledThreadPool. A fixed-size thread pool that supports delayed and periodic task execution, similar to Timer. (See Section 6.2.5.)
Executor lifecycle
- 前面我们介绍了如何使用建立Executor并执行任务,但是我们没有介绍如何关闭掉Executor。为了管理Executor的生命周期,Java在Executor类的基础上扩展了ExecutorService类,并提供了生命周期的管理
1
2
3
4
5
6
7
8public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// ... additional convenience methods for task submission
} - ExecutorService 暗示的生命周期有三个状态:运行中、正在关闭和已终止。ExecutorService 最初是在运行状态创建的。shutdown 方法启动了一个优雅的关闭过程:不再接受新任务,但允许先前提交的任务完成,包括那些尚未开始执行的任务。shutdownNow 方法启动了一个突然的关闭过程:它尝试取消未完成的任务,并且不启动那些已排队但尚未开始执行的任务。
- 使用ExecutorService管理生命周期的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25class LifecycleWebServer {
private final ExecutorService exec = ...;
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() { handleRequest(conn); }
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown())
log("task submission rejected", e);
}
}
}
public void stop() { exec.shutdown(); }
void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req))
stop();
else
dispatchRequest (req);
}
}
Delayed and periodic tasks
- 对于延迟的或者周期性的任务,使用newScheduledThreadPool而不是timer
Finding exploitable parallelism
- 本章通过一个页面加载html的例子来展示不同程度的并发度
Example: sequential page renderer
- 最简单的实现就是串行处理html文件
1
2
3
4
5
6
7
8
9
10public class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo : scanForImageInfo(source))
imageData .add(imageInfo .downloadImage ());
for (ImageData data : imageData)
renderImage(data);
}
}
Result-bearing tasks: Callable and Future
- Runnable vs Callable: The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception. 说白了就是Java1.0以来一直有Runnable,但是Runnable不能有返回值,也不能抛出异常,但是对于有些并行任务是需要返回值和异常的,所以后面又提出了Callable(https://stackoverflow.com/questions/141284/the-difference-between-the-runnable-and-callable-interfaces-in-java)
- Runnable和Callable都是用来描述可以计算的任务,而任务的生物周期是由Future接口来表现的。Future接口提供了方法查看一个任务是否完成或者取消
1
2
3
4
5
6
7public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit) throws InterruptedException,ExecutionException, CancellationException, TimeoutException;
} - 那么如何创建一个Future去描述一个任务呢?ExecutorService类的submit方法可以接收Callable或者Runnable,然后返回一个Future。或者我们可以显式的直接实现一个FutureTask,FutureTask为Future接口提供了基本实现,FutureTask有两个构造方法,分别接受Callable或者Runnable
Example: page renderer with Future
- 作为并发改造的第一步,我们把页面加载分为两部分,一个任务用来加载文字,一个任务用来下载图片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class FutureRenderer {
private final ExecutorService executor = ...;
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result = new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos)
result.add(imageInfo.downloadImage());
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData)
renderImage(data);
} catch (InterruptedException e) {
// Re-assert the thread’s interrupted status
Thread.currentThread().interrupt();
// We don’t need the result, so cancel the task too
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
Limitations of parallelizing heterogeneous tasks
- 上面的改造看似进行了并行处理,但是只是对不同类型的任务做了并行(加载文字和下载图片),但其实网页加载占据大部分时间的是图片下载,所以即使我们对不同类型的任务做了并行,也不一定会有很大的提升。想要进一步提升并行度,就需要对相同类型的任务做并行处理
CompletionService: Executor meets BlockingQueue
- 我们如果想要获取一系列提交到Executor的任务的结果,可以非常枯燥的去不停地使用Future.get()来尝试获取结果,但是Java提供了一个更简单的方法,使用CompletionService,它可以把完成的Future自动的放到一个存储结果的BlockingQueue里面
- ExecutorService = incoming queue + worker threads
CompletionService = incoming queue + worker threads + output queue
Example: page renderer with CompletionService
- 使用CompletionService来处理每一个图片的下载,也即把每一个图片下载都当成一个独立的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) { this.executor = executor; }
void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for(int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Placing time limits on tasks
- 有的时候我们不想无限制的等待一个任务的结束,比如一个页面加载广告的时候,如果超过两秒还没有收到广告服务器的回复,那么就加载默认的广告。Future接口提供了
get(long timeout, TimeUnit unit)
方法,如果超过设置的timeout时间仍然没有收到结果,就会抛出TimeoutException,我们的程序可以catch TimeoutException,然后取消任务的执行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
// Render the page while waiting for the ad
Page page = renderPageBody();
Ad ad;
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad);
return page;
}
Example: a travel reservations portal
- 前面的例子我们只针对一个任务设置了过期时间,我们也可以对invokeAll方法设置过期时间,invokeAll如果设置了过期时间,那么会在所有的任务已经完成,被中断或者超时的情况下返回,任何没完成的任务都会被取消掉
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;
...
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote (e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote (e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}
Cancellation and Shutdown
- 前面几章我们介绍了线程的创建和启动,但是没有介绍如何停止或者取消线程。Java提供了一种合作机制来停止线程:中断
Task cancellation
- Java语言中,没有安全的方式可以抢占式的停止一个线程或者任务,想要安全的停止一个线程,只能使用合作的方式,即运行的线程和停止执行的代码遵循事先达成的协议。一种最简单的协议是使用一个”cancellation requested flag”,例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
private final List<BigInteger> primes
= new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}
} - 从上面这个例子可以看出来,想要取消一个线程或者任务,我们需要定义一个cancellation policy来说明:在什么时候,以怎样的方式,取消什么任务。
Interruption
- 前面PrimeGenerator有一个很严重的问题:如果我们是想要需要一个阻塞的方法,比如BlockingQueue.put,那么前面的例子并不适用。这种情况下我们就应该使用Java自带的取消机制:中断
- 以下是一个阻塞方法不适用前面”cancellation requested flag”的例子,如果queue已经满了,那么
queue.put
会一直阻塞下去,我们的cancel()
方法不起作用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled)
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
}
}
public void cancel() {
cancelled = true;
}
} - 每一个线程都有一个interrupted状态,当一个线程被中断,interrupted状态会被设置为true。Thread类提供了进行中断或者查询interrupted状态的方法:
interrupt()
方法会中断目标线程;isInterrupted()
方法会返回一个线程的interrupted状态;interrupted()
方法会清除当前线程的interrupted状态,并返回清除前的状态,这是Java提供的唯一一种清除interrupted状态的方法。1
2
3
4
5
6public class Thread {
public void interrupt() { ... }
public boolean isInterrupted() { ... }
public static boolean interrupted() { ... }
...
} - 一些阻塞的内置方法,比如
Thread.sleep
和Object.wait
会去检测线程是否中断,如果发现中断,那么会清除interrupted状态,并且抛出InterruptedException,表明阻塞的操作由于中断而提前结束了 - 如果一个线程不是阻塞的,那么对这个线程的中断只是起到把interrupted状态设为true的作用,如果线程本身不处理,那么会一直保留interrupted状态
- 所以说白了,中断其实就是针对Java中那些阻塞的方法的,可以让这些方法取消执行,然后通过抛出InterruptedException告诉调用的类:你调用的阻塞方法被中断了
interrupted()
使用的时候要格外谨慎,如果它的返回值是true,代表清除之前的interrupted状态是true,这个时候一定要抛出InterruptedException- 使用中断修复之前的例子:这里的
Thread.currentThread().isInterrupted()
不是一定需要的,因为本身queue.put
就会接收中断信号。这里的好处是使得程序对于中断的响应更快,也就是在进入耗时比较长的质数计算之前先检测一下interrupted状态1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() {
interrupt();
}
}
Interruption policies
- 类似于任务应该定义好cancellation policy,线程也应该定义好自己的interruption policy。我们需要区分好任务和线程对于中断的反应,很多时候一个中断信号不止会取消任务,也会关闭线程
- 对于那种只是在线程中执行,但是不拥有线程的所有权的任务,比如在线程池中执行Java自带的阻塞库,往往任务只会catch InterruptedException然后继续抛出,因为它们并不拥有线程的所有权
- 由于每个线程都有自己的中断策略,除非你知道对于该线程而言中断意味着什么,否则不应该中断一个线程。
Responding to interruption
- 当我们使用可以中断的阻塞类库,比如
Thread.sleep
和BlockingQueue.put
,往往有两种常见的办法处理InterruptedException:
- Propagate the exception (possibly after some task-specific cleanup), making your method an interruptible blocking method, too; or
- Restore the interruption status so that code higher up on the call stack can deal with it.
- 第一种方法向上传播异常很简单
1
2
3
4
5BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException {
return queue.take();
} - 如果不能直接向上传播,比如在一个Runnable中不能抛异常,那么需要使用
Thread.currentThread().interrupt();
来保证中断不被吞掉
Example: timed run
- 前面例子中的的PrimeGenerator里的aSecondOfPrimes方法会开启计算质数并在一秒钟后取消,但是有一个问题,我们无法获取计算质数的任务是否抛出异常
- 为了解决这个问题,我们下面例子的timedRun方法会起一个子线程用来中断任务,而任务的执行放在主线程,这样我们就可以在主线程中进行异常的检查。这样看起来很美好,但违背了前面我们讨论的原则:中断的线程必须清除被中断线程的中断原则。子线程的中断任务执行的时候,我们已经对主线程在干嘛一无所知,有可能runnable任务已经执行完,此刻中断信号才进来,而此时主线程可能已经在执行别的代码了
1
2
3
4
5
6
7
8
9
10
11
12private static final ScheduledExecutorService cancelExec = Executors。newScheduledThreadPool(1);
public static void timedRun(Runnable r,
long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
r.run();
} - 提升的版本我们分别用两个子线程分别执行RethrowableTask任务和中断任务,同时使用Thread.join等待RethrowableTask任务,join以后再抛出catch到的异常。这个方法的问题是我们需要依赖join的timeout,如果最后没有异常抛出,我们无法判断是任务正常执行还是join超时了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class TimedRun2 {
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}
void rethrow() {
if (t != null)
throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}
}
Cancellation via Future
- 前面我们已经使用过Future来管理任务的生命周期,所以这里我们也可以使用Future来完成任务的取消,
Future.get(boolean mayInterruptIfRunning)
方法可以接收一个boolean值来向执行的任务发送中断信号,由于通过Executor提交的任务线程的中断策略是取消任务,所以我们可以安全的把mayInterruptIfRunning设置为true来进行Executor里面执行任务的中断。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// task will be cancelled below
} catch (ExecutionException e) {
// exception thrown in task; rethrow
throw launderThrowable(e.getCause());
} finally {
// Harmless if task already completed
task.cancel(true); // interrupt if running
}
}
} - 以上的例子在遇到TimeoutException和InterruptedException的时候会执行
task.cancel(true)
来取消任务,同时对于InterruptedException,中断会继续传播到timedRun的caller,在遇到其他ExecutionException的时候会把异常继续抛出
Dealing with non-interruptible blocking
- 前面我们提到了例如
Thread.sleep
等阻塞方法可以被中断,但是Java还有一些阻塞的场景没有实现中断
- Synchronous socket I/O in java.io. The common form of blocking I/O in server applications is reading or writing to a socket. Unfortunately, the read and write methods in InputStream and OutputStream are not re- sponsive to interruption, but closing the underlying socket makes any threads blocked in read or write throw a SocketException.
- Synchronous I/O in java.nio. Interrupting a thread waiting on an Interrupt- ibleChannel causes it to throw ClosedByInterruptException and close the channel (and also causes all other threads blocked on the channel to throw ClosedByInterruptException). Closing an InterruptibleChannel causes threads blocked on channel operations to throw AsynchronousCloseExcep- tion. Most standard Channels implement InterruptibleChannel.
- Asynchronous I/O with Selector. If a thread is blocked in Selector.select (in java.nio.channels), calling close or wakeup causes it to return prema- turely.
- Lock acquisition. If a thread is blocked waiting for an intrinsic lock, there is nothing you can do to stop it short of ensuring that it eventually acquires the lock and makes enough progress that you can get its attention some other way. However, the explicit Lock classes offer the lockInterruptib- ly method, which allows you to wait for a lock and still be responsive to interrupts—see Chapter 13.
Stopping a thread-based service
- Java应用往往不会直接创建线程,而是通过创建服务(比如线程池)来管理线程。由于线程的所有权具有不可传递性,服务建立的线程,我们不能直接在应用中停止它,而是还是需要通过服务来停止。
Example: a logging service
- 以一个producer-consumer模式的日志服务举例,以下的代码通过中断达到停止日志服务的目的,但是这种实现有一个问题:当作为consumer的LoggerThread被中断的时候,作为producer的LogWritter并不知道,还是会一直往queue里面发日志直到阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
} - 一个进阶版的实现:这里没有直接同步
log
方法,而是只是同步isShutdown状态的改变,好处是这样的同步没有带上queue.put(msg)
,BlockingQueue本身就是线程安全的,我们再往其上面加锁只会导致锁的力度太大。另外这里使用reservation变量,可以保证当停止日志服务的时候,queue里剩余的日志都会被处理完毕,随后writer才会关闭,随后LoggerThread停止1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
private boolean isShutdown;
private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
ExecutorService shutdown
- ExecutorService提供两种停止方法:shutdown和shutdownNow。shutdownNow会立刻停止ExecutorService,但是会存在执行到一般的任务被打断,shutdown会把queue中所有的任务执行完成以后才会停止
- 使用ExecutorService的日志服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class LogService {
private final ExecutorService exec = newSingleThreadExecutor(); ...
public void start() { }
public void stop() throws InterruptedException {
try {
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
} finally {
writer.close();
}
}
public void log(String msg) {
try {
exec.execute(new WriteTask(msg));
} catch (RejectedExecutionException ignored) { }
}
}Poison pills
- 对于producer-consumer模式的服务,还有一种办法可以优雅的停止,当想要停止服务的时候,我们通过producer放入一个POISON到queue中,然后producer不再放入数据;当consumer收到POISON的时候,停止线程执行
Limitations of shutdownNow
- shutdownNow方法会取消正在执行的任务,并且返回所有提交到ExecutionService,但还没有开始执行的任务。一个很严重的缺陷是上述的返回值中不包括取消的正在执行的任务。
Handling abnormal thread termination
- 子线程在运行的过程中可能遇到异常,我们可以主动的处理异常,同时通知主线程发生的异常
1
2
3
4
5
6
7
8
9
10public void run() {
Throwable thrown = null;
try {
while (!isInterrupted()) runTask(getTaskFromWorkQueue ());
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}
Uncaught exception handlers
- 上一章我们介绍了如何主动处理线程运行过程中的异常,Java线程同样也提供了UncaughtExceptionHandler机制来处理异常,默认的UncaughtExceptionHandler会把stack trace打印到system.err
- 需要注意的是,UncaughtExceptionHandler只对
Executor.execute(Runnable r)
起作用,对于Future f = ExecutionService.submit(Runnable or Callable)
不起作用,相应的异常被包裹在ExecutionException中,通过Future.get
来获取
JVM shutdown
- JVM有两种停止的方式:正常和突然。正常的关闭通过最后一个非守护线程调用
System.exit
,或者发送SIGINT信号或使用Ctrl-C;突然的关闭通过调用Runtime.halt
或者操作系统直接杀死JVM进程
Shutdown hooks
- JVM正常的关闭会调用所有注册的shutdown hooks,JVM并不保证shutdown hooks的执行顺序,当所有的shutdown hooks执行完毕以后,如果flag runFinalizersOnExit是true,那么JVM会执行finalizers。如果shutdown hooks和finalizers都执行完毕,那么JVM会停止;如果
shutdown hooks或finalizers执行遇到问题而阻塞,那么JVM需要通过突然的关闭来停止
Daemon threads
- 线程被分为两个种类:普通线程和守护线程,所有由主线程创建的子线程都是普通线程。当JVM停止的时候,所有守护线程会立刻停止,并不会执行finally块,所以守护线程必须是那些不需要清理的线程,常常用于进行垃圾回收或者资源清理
Finalizers
- finalizer方法是Java为每个对象提供的主动进行垃圾回收的办法,一般来说,程序通过使用finally块足以进行资源的回收。finalizer很容易使用错,所以避免使用它
Applying Thread Pools
- 第6章介绍了通过Executor和ExecutionService执行任务,以简化线程的使用,本章将介绍如何配置Executor和ExecutionService创建的thread pool
Implicit couplings between tasks and execution policies
- 不同的任务,我们ExecutionService的策略也必然需要对应的调整
Thread starvation deadlock
- 如果我们使用
Executors.newSingleThreadExecutor
,同时提交的任务依赖于提交的其他子任务,就会造成这种饥饿死锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class LoadFileTask implements Callable<String> {
private final String fileName;
public LoadFileTask(String fileName) {
this.fileName = fileName;
}
public String call() throws Exception {
// Here's where we would actually read the file
return "";
}
}
public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
// Will deadlock -- task waiting for result of subtask
return header.get() + page + footer.get();
}
private String renderBody() {
// Here's where we would actually render the page
return "";
}
}
}
Long-running tasks
- 如果我们的任务都是耗时很长的任务,可能会导致线程资源不够用
Sizing thread pools
- 书中给了一个公式大概的计算合适的线程池大小
Configuring ThreadPoolExecutor
- Executor类提供了newCachedThreadPool, newFixedThreadPool, newScheduledThreadExecutor等默认的线程池,如果我们想要定制化自己的线程池,可以使用ThreadPoolExecutor
Thread creation and teardown
- 以下是ThreadPoolExecutor的参数:corePoolSize是线程池默认维护的线程数,即使真正执行的任务数少于这个值,只有当队列满的时候,线程池才会扩容;maximumPoolSize是能扩容的最大线程数;keepAliveTime是当一个线程闲置超过这个时间,并且线程数大于corePoolSize的时候,线程会被销毁
1
2
3
4
5
6
7
8
9public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{ ... } - newFixedThreadPool会把corePoolSize和maximumPoolSize设置成相同的值,生成一个固定大小的线程池;newCachedThreadPool会把corePoolSize设置成0,maximumPoolSize设置成Integer.MAX_VALUE,keepAliveTime设置成一分钟,生成一个可以根据流量自动伸缩的线程池
Managing queued tasks
- ThreadPoolExecutor允许提供一个BlockingQueue来处理进入线程池的任务,通常有三种基本策略:unbounded queue, bounded queue, and synchronous handoff
Saturation policies
- 如果使用的是bounded queue,那么相对应的我们需要配置饱和策略,饱和策略可以通过
setRejectedExecutionHandler
方法配置,一些提供的饱和策略有:AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy - 默认的abort policy会抛出RejectedExecutionException;DiscardPolicy会把新任务抛弃掉;DiscardOldestPolicy会把最老的任务抛弃掉
Thread factories
- ThreadPoolExecutor通过线程工厂创建新的线程,默认的线程工厂创建一个没有额外配置的普通线程,如果想要定制化的线程,我们可以创建自己的线程工厂
Customizing ThreadPoolExecutor after construction
- ThreadPoolExecutor创建好以后,还可以通过自带的一些set方法配置前面提到的参数
Extending ThreadPoolExecutor
- ThreadPoolExecutor提供了一些hooks,以方便我们扩展,包括beforeExecute, afterExecute, terminated。具体可以参考书中的例子
Parallelizing recursive algorithms
- 对于递归的程序,可以使用并行处理,比串行处理效率高,书中有例子
Avoiding Liveness Hazards
Deadlock
- 哲学家就餐问题常用来讨论并行计算中多线程同步的问题:https://zh.wikipedia.org/wiki/%E5%93%B2%E5%AD%A6%E5%AE%B6%E5%B0%B1%E9%A4%90%E9%97%AE%E9%A2%98。其中一种哲学家就餐问题的解法是每个哲学家都先拿自己左手边的筷子,然后再拿右手边的筷子,这样就会造成死锁问题。因为当五个哲学家拿了自己左手边的筷子以后,桌子上的五支筷子就都被拿完了,哲学家再想拿右手边的筷子就会阻塞住,也就是死锁
Lock-ordering deadlocks
- 由于使用了多个锁,且多个锁执行顺序的不同,可能导致死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class LeftRightDeadlock {
private final Object left = new Object();
private final Object right = new Object();
public void leftRight() {
synchronized (left) {
synchronized (right) {
doSomething();
}
}
}
public void rightLeft() {
synchronized (right) {
synchronized (left) {
doSomethingElse();
}
}
}
void doSomething() {
}
void doSomethingElse() {
}
} - 所以可以看出来死锁是由于多个锁的获取顺序不同导致的,假设程序中获取锁的顺序全是一致的,那么就会避免死锁问题
Dynamic lock order deadlocks
- 有的时候死锁并不像前一个例子这样好观察到,比如下面的例子,如果我们有两个账户A和B,两个线程执行
transferMoney(A, B, 100)
和transferMoney(B, A, 50)
,就有可能造成死锁,因为当第一个线程想要拿B账户的锁的时候,B账户的锁已经被第二个线程获取了1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void transferMoney(Account fromAccount,
Account toAccount,
DollarAmount amount)
throws InsufficientFundsException {
synchronized (fromAccount) {
synchronized (toAccount) {
if (fromAccount.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
} - 为了解决以上死锁问题,我们引入对象的hashcode,来确保获取锁的顺序是一致的,要不都是先A后B,要不都是先B后A。另外为了预防两个对象的hashcode一致(概率很低),我们增加一个tieLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public void transferMoney(final Account fromAcct,
final Account toAcct,
final DollarAmount amount)
throws InsufficientFundsException {
class Helper {
public void transfer() throws InsufficientFundsException {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}
int fromHash = System.identityHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);
if (fromHash < toHash) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAcct) {
synchronized (fromAcct) {
new Helper().transfer();
}
}
} else {
synchronized (tieLock) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
}
}
}
Resource deadlocks
- Say you have two pooled resources, such as connection pools for two different databases. Resource pools are usually implemented with semaphores (see Section 5.5.3) to facilitate blocking when the pool is empty. If a task requires connections to both databases and the two resources are not always requested in the same order, thread A could be holding a connection to database D1 while waiting for a connection to database D2, and thread B could be holding a connection to D2 while waiting for a connection to D1
- 另外一种资源死锁是前面提到的thread-starvation deadlock
Avoiding and diagnosing deadlocks
Timed lock attempts
- 使用一个具有timeout时间的tryLock,这样即使出现死锁,timeout结束以后也会自动解开
Deadlock analysis with thread dumps
- 可以分析thread dump来分析死锁,具体见书中例子
Other liveness hazards
Starvation
- 饥饿指的是一个线程无法获取到需要的资源,从而无法继续执行。常见的饥饿有由于设置了线程的优先级,导致低优先级的线程一直无法得到执行。
- 饥饿强调的是由于无法获取需要的资源而导致线程无法继续运行,而死锁强调的是两个线程互相等待对方释放资源而卡死的情况
Livelock
- 活锁指的是一个线程尽管没有被阻塞,但是由于不停地重试某些任务,导致程序没有make progress,也可以理解为在做无用功。比如一个有问题的重试机制,重试前请求锁A,但是重试失败,释放锁A;然后继续尝试获取锁A,重试还是失败。
- 说白了死锁是加不上就死等,活锁是加不上就放开已获得的资源重试
- 另外一种活锁是两个线程互相谦让资源,导致程序没法继续执行,类似于路上的两个人互相让路,但都一直往同方向让
- 解决活锁问题的办法一般是引入一些随机时间
Performance and Scalability
Reducing lock contention
- 减少锁的竞争的三种方向:
- 减少锁持续的时间
- 减少请求锁的频率
- 将独占锁替换为允许更大并发性的协调机制
Narrowing lock scope (“Get in, get out”)
- 减少锁的范围,也就是把不需要锁的代码移除synchronized块
Reducing lock granularity
- 减少锁的力度,可以通过lock splitting或者lock striping来达到,lock splitting的意思就是本来只用一个锁保护的对象,我们改为使用多个锁
Lock striping
- 一个例子就是ConcurrentHashMap,它使用一个含有16个锁的数组,每个锁负责保护1/16的hashed bucket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class StripedMap {
// Synchronization policy: buckets[n] guarded by locks[n%N_LOCKS]
private static final int N_LOCKS = 16;
private final Node[] buckets;
private final Object[] locks;
private static class Node {
Node next;
Object key;
Object value;
}
public StripedMap(int numBuckets) {
buckets = new Node[numBuckets];
locks = new Object[N_LOCKS];
for (int i = 0; i < N_LOCKS; i++)
locks[i] = new Object();
}
private final int hash(Object key) {
return Math.abs(key.hashCode() % buckets.length);
}
public Object get(Object key) {
int hash = hash(key);
synchronized (locks[hash % N_LOCKS]) {
for (Node m = buckets[hash]; m != null; m = m.next)
if (m.key.equals(key))
return m.value;
}
return null;
}
public void clear() {
for (int i = 0; i < buckets.length; i++) {
synchronized (locks[i % N_LOCKS]) {
buckets[i] = null;
}
}
}
} - Lock striping的缺点是有些操作可能需要获取全部的锁
Alternatives to exclusive locks
- 除了互斥锁,还有一些Java内置的更高效的提升并行速度的类,比如ReadWriteLock和Atomic variables
Explicit Locks
- 在Java 5之前,唯一的协调访问共享变量的机制是synchronized和volatile,Java 5之后,Java提供了一种ReentrantLock,在某些内置锁(synchronized)使用受限的场景下,ReentrantLock是一种很好的选择
Lock and ReentrantLock
- Lock接口定义了一些锁的基本操作,不像内置锁,Lock接口提供了unconditional(无条件), polled(轮询), timed(定时), interruptible(中断)等获取锁的方式。ReentrantLock类实现了Lock接口,提供了和synchronized块相同的mutual exclusion(互斥性)和内存可见性,以及相同的可重入性
1
2
3
4
5
6
7public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException; void unlock();
Condition newCondition();
} - 既然ReentrantLock类实现了和synchronized块相同的机制,为什么需要ReentrantLock类呢?这是由于synchronized块在大多数情况下都可以使用,但在诸如中断和定时等场景不适用
- ReentrantLock类的使用哲学:一定一定要记住在finally块里释放锁,由于与synchronized块不同,synchronized块是在块的结束自动释放锁,而ReentrantLock类是需要程序员手动获取和释放锁的,所以一旦忘记释放锁,那么会造成程序的错误
1
2
3
4
5
6
7
8Lock lock = new ReentrantLock(); ...
lock.lock();
try {
// update object state
// catch exceptions and restore invariants if necessary
} finally {
lock.unlock();
} - 因此,ReentrantLock类应该只使用在synchronized块不能适用的场景下
Polled and timed lock acquisition
- Lock接口的tryLock()方法可以在发生死锁的时候自动恢复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class TimedLocking {
private Lock lock = new ReentrantLock();
public boolean trySendOnSharedLine(String message,
long timeout, TimeUnit unit)
throws InterruptedException {
long nanosToLock = unit.toNanos(timeout)
- estimatedNanosToSend(message);
if (!lock.tryLock(nanosToLock, NANOSECONDS))
return false;
try {
return sendOnSharedLine(message);
} finally {
lock.unlock();
}
}
private boolean sendOnSharedLine(String message) {
/* send something */
return true;
}
long estimatedNanosToSend(String message) {
return message.length();
} - tryLock也可以在给定的定时时间内尝试获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13public boolean trySendOnSharedLine(String message,
long timeout, TimeUnit unit)
throws InterruptedException {
long nanosToLock = unit.toNanos(timeout)
- estimatedNanosToSend(message);
if (!lock.tryLock(nanosToLock, NANOSECONDS))
return false;
try {
return sendOnSharedLine(message);
} finally {
lock.unlock();
}
}
Interruptible lock acquisition
- 可以使用Lock接口的lockInterruptibly()来等待获取锁的时候进行中断
1
2
3
4
5
6
7
8
9public boolean sendOnSharedLine(String message)
throws InterruptedException {
lock.lockInterruptibly();
try {
return cancellableSendOnSharedLine(message);
} finally {
lock.unlock();
}
}
Non-block-structured locking
- synchronized块的锁获取和释放在块的开始和结束,如果我们想要更灵活的使用锁,可以使用ReentrantLock
Performance considerations
- Java 5之前,ReentrantLock比synchronized性能强很多,但是Java 6以后,synchronized仿照ReeentrantLock重写了锁的实现,所以ReentrantLock只比synchronized性能略强
Fairness
- ReentrantLock可以提供公平或者不公平的锁:公平锁会严格按照获取锁的顺序分配锁;不公平锁则没有严格的顺序,所以可能出现当锁空闲的时候,恰好有一个线程想要获取它,锁也就给它了,即使前面还有别的线程在等待这个锁
- 一般来说,我们不需要公平锁,因为它会带来额外的开销
Choosing between synchronized and ReentrantLock
- ReentrantLock is an advanced tool for situations where intrinsic locking is not practical. Use it if you need its advanced features: timed, polled, or interruptible lock acquisition, fair queueing, or non-block-structured locking. Otherwise, prefer synchronized.
Read-write locks
- ReentrantLock实现了一种标准的互斥锁:同一时间只有一个线程可以获取锁。但是有的时候,我们不需要这么严格的互斥条件,所以Java也提供了一种ReadWriteLock接口:同一个锁可以被多个reader获取,或者只能被一个writer获取,且不能既获取读锁也获得写锁
1
2
3public interface ReadWriteLock { Lock readLock();
Lock writeLock();
} - 读写锁的实现需要考虑以下因素:
- Release preference. When a writer releases the write lock and both readers and writers are queued up, who should be given preference—readers, writers, or whoever asked first?
- Reader barging. If the lock is held by readers but there are waiting writers, should newly arriving readers be granted immediate access, or should they wait behind the writers? Allowing readers to barge ahead of writers en- hances concurrency but runs the risk of starving writers.
- Reentrancy. Are the read and write locks reentrant?
- Downgrading. If a thread holds the write lock, can it acquire the read lock without releasing the write lock? This would let a writer “downgrade” to a read lock without letting other writers modify the guarded resource in the meantime.
- Upgrading. Can a read lock be upgraded to a write lock in preference to other waiting readers or writers? Most read-write lock implementations do not support upgrading, because without an explicit upgrade operation it is deadlock-prone. (If two readers simultaneously attempt to upgrade to a write lock, neither will release the read lock.)
- Java实现了ReentrantReadWriteLock,并且支持公平锁或者非公平锁。对于非公平读写锁,给予锁的顺序不按照线程获取锁的时间,同时允许降级不允许升级
Building Custom Synchronizers
- Java类库中有很多状态不独立(state-dependent)的类,比如FutureTask, Semaphore和 BlockingQueue。我们不能从空的BlockingQueue中获取元素,也不能获取FutureTask的结果如果FutureTask还没有执行完,所以说这些state-dependent类都具有一些基于状态的前置条件。
- 想要构造一个state-dependent类,最简单的办法自然是使用Java类库中提供的类,但如果我们需要构造全新的state-dependent类该怎么办呢?本章将介绍intrinsic condition queues,Condition对象,AbstractQueuedSynchronizer
Managing state dependence
- 本章,我们将尝试去用不同的办法编写一个bounded buffer类,包含两个前置条件的操作:put和take。我们不能从一个空的bounded buffer中取出元素,也不能向一个满的bounded buffer中放入元素。本章后边的类都会extend BaseBoundedBuffer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class BaseBoundedBuffer <V> {
private final V[] buf;
private int tail;
private int head;
private int count;
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length)
tail = 0;
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length)
head = 0;
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
Example: propagating precondition failure to callers
- 最简单的bounded buffer是在两种前置条件(buffer empty or full)的时候抛出异常,然后由使用bounded buffer的代码catch异常并处理。但是这种办法无疑加大了client端的复杂度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer() {
this(100);
}
public GrumpyBoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws BufferFullException {
if (isFull())
throw new BufferFullException();
doPut(v);
}
public synchronized V take() throws BufferEmptyException {
if (isEmpty())
throw new BufferEmptyException();
return doTake();
}
}
class ExampleUsage {
private GrumpyBoundedBuffer<String> buffer;
int SLEEP_GRANULARITY = 50;
void useBuffer() throws InterruptedException {
while (true) {
try {
String item = buffer.take();
// use item
break;
} catch (BufferEmptyException e) {
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
}
class BufferFullException extends RuntimeException {
}
class BufferEmptyException extends RuntimeException {
}
Example: crude blocking by polling and sleeping
- 为了减少client使用我们创建的bounded buffer的难度,我们把检查前置条件放到我们类的代码里。可以看出来为了实现bounded buffer,我们的代码比较复杂
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
int SLEEP_GRANULARITY = 60;
public SleepyBoundedBuffer() {
this(100);
}
public SleepyBoundedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty())
return doTake();
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
Condition queues to the rescue
- 为了简化bounded buffer的实现,我们将介绍java自带的condition queue。condition queue的名字是由于使用condition queue的线程都在等待特定的条件变为true,就很像有一条队列,队列里面都是在等待前置条件的线程。
- 类似于每一个Java对象都可以作为内置锁(intrinsic lock),每一个对象也都可以作为intrinsic condition queue,并且提供了wait, notify, notifyAll三个方法。
- intrinsic condition queue和intrinsic lock紧密相关,在使用intrinsic condition queue之前,线程必须已经获取相同对象的intrinsic lock。An object’s intrinsic lock and its intrinsic condition queue are related: in order to call any of the condition queue methods on object X, you must hold the lock on X. This is because the mechanism for waiting for state-based conditions is necessarily tightly bound to the mechanism for preserving state consistency: you cannot wait for a condition unless you can examine the state, and you cannot release another thread from a condition wait unless you can modify the state.
- 调用Object.wait()方法会释放已经获取的intrinsic lock,表示当前线程进入等待状态,把锁释放掉以便别的线程去改变变量的状态。当线程“苏醒”的时候,会再次获取锁。
- 使用intrinsic condition queue实现的bounded buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
// CONDITION PREDICATE: not-full (!isFull())
// CONDITION PREDICATE: not-empty (!isEmpty())
public BoundedBuffer() {
this(100);
}
public BoundedBuffer(int size) {
super(size);
}
// BLOCKS-UNTIL: not-full
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
doPut(v);
notifyAll();
}
// BLOCKS-UNTIL: not-empty
public synchronized V take() throws InterruptedException {
while (isEmpty())
wait();
V v = doTake();
notifyAll();
return v;
}
// BLOCKS-UNTIL: not-full
// Alternate form of put() using conditional notification
public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull())
wait();
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty)
notifyAll();
}
}
Using condition queues
The condition predicate
- 使用condition queue的关键是找到造成一个操作状态不独立(state-dependent)的前置条件(condition predicate),比如bounded buffer的两个前置条件分别是:buffer满的时候不能加入元素;buffer空的时候不能获取元素
- 由于前置条件都是针对状态变量而言的,所以在使用Object.wait()之前,线程必须已经获取同样的状态变量的锁
Waking up too soon
- 一个intrinsic condition queue,可能被多个前置条件使用,比如前面例子中,我们对于两个前置条件,使用了同一个对象的
wait()
方法。假设这个时候这个对象的notifyAll()
方法被调用,所有正在wait()
的线程都会被唤醒。当控制重新进入调用wait的代码时,它已经重新获取了与condition queue相关联的锁。此时前置条件是否为真?可能是,也可能不是。在通知线程调用notifyAll时,前置条件可能是真的,但在重新获取锁的时候可能又变为假。在你的线程被唤醒和wait重新获取锁之间,其他线程可能已经获取了锁并改变了对象的状态。或者自从调用wait以来,条件可能一直不为真。你不知道另一个线程为什么调用了notify或notifyAll;也许是因为与同一condition queue相关联的另一个前置条件变为真了。每个condition queue上存在多个前置条件是相当常见的——例如,BoundedBuffer在同一个condition queue上用于“未满”和“非空”前置条件。 - 出于以上原因,
Object.wait()
一定是在一个循环中被调用,且循环的判断条件就是前置条件。以下是condition queue的规范用法1
2
3
4
5
6
7
8void stateDependentMethod() throws InterruptedException {
// condition predicate must be guarded by lock
synchronized(lock) {
while (!conditionPredicate())
lock.wait();
// object is now in desired state
}
} - When using condition waits (Object.wait or Condition.await):
- Always have a condition predicate—some test of object state that
must hold before proceeding; - Always test the condition predicate before calling wait, and again
after returning from wait; - Always call wait in a loop;
- Ensure that the state variables making up the condition predicate are
guarded by the lock associated with the condition queue; - Hold the lock associated with the the condition queue when calling
wait, notify, or notifyAll; and - Do not release the lock after checking the condition predicate but
before acting on it.
Missed signals
- Missed signals指的是调用了notify或者notifyAll,但是信号没有收到,这可能是程序实现的有问题
Notification
- 前面我们介绍了condition queue的一半:
Object.wait()
,另一半是notification。我们需要确保当前置条件不满足并调用Object.wait()
以后,我们能够通过Object.notify()
或者Object.notifyAll()
唤醒等待的线程。 - condition queue提供两种通知机制:
Object.notify()
会通知JVM选择一个等待该condition queue的线程并唤醒;Object.notifyAll()
会通知所有等待该condition queue的线程并唤醒。在调用Object.notify()
或者Object.notifyAll()
之前,线程必须获取相同对象的锁,然后快速的释放掉锁,以方便其他等待该condition queue的线程可以获取锁 - 下面以put方法举例说明整个Lock和Condition运作的方式:
1
2
3
4
5
6
7
8public synchronized void put(V v) throws InterruptedException {
//进入方法的时候,已经获取了这个bounded queue对象的内置锁
while (isFull()) //假设一开始queue是满的
wait(); //由于锁在bounded queue身上,我们调用wait,进入等待状态并释放掉bounded queue对象身上的锁,以方便其他线程从queue中移出元素
//当另外一个线程调用过take以后,会调用take方法里面的notifyAll,本线程的wait()也会被通知并唤醒。唤醒的第一时间会重新获取bounded queue对象的锁,且由于queue不是满的,所以退出循环
doPut(v);//加入元素
notifyAll();//进行通知
} - 通知的原则是尽量使用notifyAll,而不使用notify,除非我们能确保要通知的线程只有一个,原因见下
- BoundedBuffer provides a good illustration of why notifyAll should be pre- ferred to single notify in most cases. The condition queue is used for two different condition predicates: “not full” and “not empty”. Suppose thread A waits on a condition queue for predicate PA, while thread B waits on the same condition queue for predicate PB. Now, suppose PB becomes true and thread C performs a single notify: the JVM will wake up one thread of its own choosing. If A is chosen, it will wake up, see that PA is not yet true, and go back to waiting. Meanwhile, B, which could now make progress, does not wake up. This is not exactly a missed signal. it’s more of a “hijacked signal”, but the problem is the same: a thread is waiting for a signal that has (or should have) already occurred.
Example: a gate class
- 以下是一个Gate类,在关闭的时候,所有的线程不能继续运行,开启以后所有等待的线程可以运行。这里面我们的前置条件不止是isOpen,因为会有门开了以后,所有之前等待的线程都被通知并依次执行,但此时门又关掉了,对于哪些没来及执行完的线程,他们应该可以通过并继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadGate {
// CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
private boolean isOpen;
private int generation;
public synchronized void close() {
isOpen = false;
}
public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation)
wait();
}
}
Explicit condition objects
- 类似于前面我们介绍完intrinsic lock以后介绍了explicit lock,在某些intrinsic lock不适用的场景下可以使用explicit lock。本章也会介绍explicit lock,类似于intrinsic lock和intrinsic condition是一对,explicit condition也和explicit lock是一对,当我们想要创建explicit condition的时候调用
Lock.newCondition
- explicit condition不像intrinsic condition,对于一个锁,可以创建多个condition,以下是使用explicit condition的bounded queue,这里我们对于两个前置条件分别创建一个condition对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ConditionBoundedBuffer <T> {
protected final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// CONDITION PREDICATE: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();
private static final int BUFFER_SIZE = 100;
private final T[] items = (T[]) new Object[BUFFER_SIZE];
private int tail, head, count;
// BLOCKS-UNTIL: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
} - Hazard warning: The equivalents of wait, notify, and notifyAll for Condition objects are await, signal, and signalAll. However, Con- dition extends Object, which means that it also has wait and no- tify methods. Be sure to use the proper versions—await and signal— instead!
Anatomy of a synchronizer
- Java提供的synchronizer,比如 ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock, SynchronousQueue, FutureTask,都是通过AbstractQueuedSynchronizer(AQS)类实现的
AbstractQueuedSynchronizer
- AQS的基本操作有两个:acquire和release。acquire根据AQS维持的状态来判断是阻塞线程,还是让线程继续执行,acquisition可以是互斥的(比如ReentrantLock),也可以是非互斥的(比如Semaphore和CountDownLatch);release会释放阻塞的线程。
- AQS通过getState, setState, compareAndSetStat来改变和管理维持的状态
- 所以一个acquire操作往往分为两部分:首先根据synchronizer维持的当前状态判断线程是继续执行还是阻塞,然后可能会改变synchronizer的状态。比如如果一个线程acquire了synchronizer,是会影响到其它线程还能否获取synchronizer,因此状态的改变是可能的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15boolean acquire() throws InterruptedException {
while (state does not permit acquire) {
if (blocking acquisition requested) {
enqueue current thread if not already queued block current thread
} else
return failure
}
possibly update synchronization state dequeue thread if it was queued return success
}
void release() {
update synchronization state
if (new state may permit a blocked thread to acquire)
unblock one or more queued threads
} - 当我们编写一个synchronizer的时候,对于exclusive acquisition需要实现tryAcquire, tryRelease, isHeldExclusively, 对于shared acquisition需要实现tryAcquireShared, tryReleaseShared.
- Returning a negative value from tryAcquireShared indicates acquisition failure; returning zero indicates the synchronizer was acquired exclusively; and returning a positive value indicates the synchronizer was ac- quired nonexclusively. The tryRelease and tryReleaseShared methods should return true if the release may have unblocked threads attempting to acquire the synchronizer.
A simple latch
- 以下是一使用AQS实现的latch,状态表示latch的开启还是关闭,0代表关闭,1代表开启。await方法调用acquireSharedInterruptibly,进而调用tryAcquireShared。tryAcquireShared里面会判断当前状态,如果是开启就是返回1,表示线程可以继续,否则返回-1表示线程阻塞。tryReleaseShared会将状态设置回1,表示latch开启,同时返回true表示release成功,通知其他阻塞在await的线程可以继续进行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() {
sync.releaseShared(0);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
// Succeed if latch is open (state == 1), else fail
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored) {
setState(1); // Latch is now open
return true; // Other threads may now be able to acquire
}
}
}
AQS in java.util.concurrent synchronizer classes
- 介绍了一下其他实现AQS的synchronizer,具体代码见书
Atomic Variables and Nonblocking Synchronization
- 前面的章节中我们一直在介绍如何使用锁使得并行程序安全运行,本章将介绍如何使用low-level的原子性的机器操作,比如compare-and-swap来移除掉锁的使用,使得程序的性能提升。由于不使用锁,程序的线程之间不再需要竞争,线程的运行也不会彼此阻塞,所以称为nonblocking algorithms
- Nonblocking algorithms不存在死锁,活锁等liveness问题。在Java 5之前,Java没有提供相应的nonblocking algorithms的支持,直到Java 5推出了atomic variable classes
Disadvantages of locking
- 锁之间进行context switch消耗资源
- 如果一个占用了锁的线程具有较高的线程优先级,且这个线程长时间阻塞,俺么所有等待这个锁的线程都会阻塞
Hardware support for concurrency
- 互斥锁是一种悲观的方式,它确保一个线程只有在获得锁的时候才会安全;而对一些优化比较好的操作,通常存在一种乐观的方式,可以在不被中断的情况下完成整个原子操作。早期的处理器提供了诸如test-and-set, fetch-and-increment, swap的原子操作,现代的处理器提供了compare-and-swap, load-linked/store-conditional等read-modify-write的原子操作。
Compare and swap
- CAS has three operands—a memory location V on which to operate, the expected old value A, and the new value B. CAS atomically updates V to the new value B, but only if the value in V matches the expected old value A; otherwise it does nothing. In either case, it returns the value currently in V
- CAS是一种乐观的方式,它在更新的时候抱着可以更新成功的希望,但也可以检测到需要被更新的变量的改动。如果在更新的过程中检测到失败,那么CAS的更新相当于失败。当多个线程同时用CAS更新一个变量的时候,只有一个会赢得竞争,其他的会输掉,但是对于输掉的CAS,由于它不会像锁一样阻塞,所以可以选择再次尝试或者别的办法。
- 以下是一个用锁模拟的CAS:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SimulatedCAS {
private int value;
public synchronized int get() {
return value;
}
public synchronized int compareAndSwap(int expectedValue,
int newValue) {
int oldValue = value;
if (oldValue == expectedValue)
value = newValue;
return oldValue;
}
public synchronized boolean compareAndSet(int expectedValue,
int newValue) {
return (expectedValue
== compareAndSwap(expectedValue, newValue));
}
}
A nonblocking counter
- 以下是一个用CAS实现的counter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CasCounter {
private SimulatedCAS value;
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
} while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}
CAS support in the JVM
- Java 5之前,JVM不支持CAS操作;Java 5之后,JVM通过AtomicXxx等类支持CAS操作,由于CAS是硬件实现的操作,所以JVM会先尝试用平台硬件实现的CAS操作,如果没有才会选择自己实现的自旋锁(spin lock)
Atomic variable classes
- 最常用的Atomic类:AtomicInteger, AtomicLong, AtomicBoolean, and Atom- icReference。都支持CAS操作
Atomics as “better volatiles”
- Atomics类首先是原子的,其次它储存的值使用了volatiles关键词,保证了内存的可见性:`private volatile int value;
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html. The memory effects for accesses and updates of atomics generally follow the rules for volatiles, as stated in The Java Language Specification (17.4 Memory Model):
- get has the memory effects of reading a volatile variable.
- set has the memory effects of writing (assigning) a volatile variable.
- lazySet has the memory effects of writing (assigning) a volatile variable except that it permits reorderings with subsequent (but not previous) memory actions that do not themselves impose reordering constraints with ordinary non-volatile writes. Among other usage contexts, lazySet may apply when nulling out, for the sake of garbage collection, a reference that is never accessed again.
- weakCompareAndSet atomically reads and conditionally writes a variable but does not create any happens-before orderings, so provides no guarantees with respect to previous or subsequent reads and writes of any variables other than the target of the weakCompareAndSet.
- compareAndSet and all other read-and-update operations such as getAndIncrement have the memory effects of both reading and writing volatile variables.`
Nonblocking algorithms
- 不使用锁而是使用CAS等原子操作的算法称作非阻塞算法
A nonblocking stack
- 不用锁实现的栈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ConcurrentStack <E> {
AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
private static class Node <E> {
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
nonblocking linked list
- 非阻塞算法实现栈是比较直接的,但是如果想要实现queue,就会比较麻烦。Java的ConcurrentLinkedQueue就是类似这种实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class LinkedQueue <E> {
private static class Node <E> {
final E item;
final AtomicReference<LinkedQueue.Node<E>> next;
public Node(E item, LinkedQueue.Node<E> next) {
this.item = item;
this.next = new AtomicReference<LinkedQueue.Node<E>>(next);
}
}
private final LinkedQueue.Node<E> dummy = new LinkedQueue.Node<E>(null, null);
private final AtomicReference<LinkedQueue.Node<E>> head
= new AtomicReference<LinkedQueue.Node<E>>(dummy);
private final AtomicReference<LinkedQueue.Node<E>> tail
= new AtomicReference<LinkedQueue.Node<E>>(dummy);
public boolean put(E item) {
LinkedQueue.Node<E> newNode = new LinkedQueue.Node<E>(item, null);
while (true) {
LinkedQueue.Node<E> curTail = tail.get();
LinkedQueue.Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext != null) {
// Queue in intermediate state, advance tail
tail.compareAndSet(curTail, tailNext);
} else {
// In quiescent state, try inserting new node
if (curTail.next.compareAndSet(null, newNode)) {
// Insertion succeeded, try advancing tail
tail.compareAndSet(curTail, newNode);
return true;
}
}
}
}
}
} - We need several tricks to develop this plan. The first is to ensure that the data structure is always in a consistent state, even in the middle of an multi-step update. That way, if thread A is in the middle of a update when thread B arrives on the scene, B can tell that an operation has been partially completed and knows not to try immediately to apply its own update. Then B can wait (by repeatedly examining the queue state) until A finishes, so that the two don’t get in each other’s way.
While this trick by itself would suffice to let threads “take turns” accessing the data structure without corrupting it, if one thread failed in the middle of an update, no thread would be able to access the queue at all. To make the algorithm nonblocking, we must ensure that the failure of a thread does not prevent other threads from making progress. Thus, the second trick is to make sure that if B arrives to find the data structure in the middle of an update by A, enough information is already embodied in the data structure for B to finish the update for A. If B “helps” A by finishing A’s operation, B can proceed with its own operation without waiting for A. When A gets around to finishing its operation, it will find that B already did the job for it.
The ABA problem
- CAS操作指的是对于内存中的某一个值V,提供一个旧值A和一个新值B。如果提供的旧值V和A相等就把B写入V。这个过程是原子性的。CAS执行结果要么成功要么失败,对于失败的情形下一班采用不断重试。或者放弃。
- ABA:如果另一个线程修改V值假设原来是A,先修改成B,再修改回成A。当前线程的CAS操作无法分辨当前V值是否发生过变化。
- A –> B –> C
假设你有个⽤单链表 实现的栈,如上⾯所示,有个head指针指向栈顶的A
⽤CAS原⼦操作 ,你可能会这样实现push和popABA的问题在于,pop函数 中,next = curr->next 和 while之间,线程被切换⾛,然后其他线程先1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17push(node):
curr := head
old := curr
node->next = curr
while (old != (curr = CAS(&head, curr, node))) {
old = curr
node->next = curr
}
pop():
curr := head
old := curr
next = curr->next
while (old != (curr = CAS(&head, curr, next))) {
old = curr
next = curr->next
}
return curr
把A弹出,⼜把B弹出,然后⼜把A压⼊,栈变成 了A –> C,此时head还是指向A,等pop被切换回
来继续执⾏,就把head指向B了。但其实head此时应该指向C
The Java Memory Model
What is a memory model, and why would I want one?
- 假设一个线程将3赋值给一个变量
aVariable = 3;
,那么内存模型就解决的这样一个问题:在什么样的情况下,另外一个读取aVariable的线程能够看到3的值。有很多原因会导致其它线程看不到aVariable变量的改动:编译器不一定会按照源代码的执行顺序生成机器指令,或者在寄存器中存储值而不是内存中;处理器不一定按源代码的顺序执行;存储在处理器局部的缓存并不被其他处理器所见。
Reordering
- JMM允许机器指令的reorde
The Java Memory Model in 500 words or less
- JMM定义了一种partial ordering叫做happens-before。To guarantee that the thread executing action B can see the results of action A (whether or not A and B occur in different threads), there must be a happens-before relationship between A and B
- The rules for happens-before are:
- Program order rule. Each action in a thread happens-before every ac-
tion in that thread that comes later in the program order. - Monitor lock rule. An unlock on a monitor lock happens-before every
subsequent lock on that same monitor lock.3 - Volatile variable rule. A write to a volatile field happens-before every
subsequent read of that same field.4 - Thread start rule. A call to Thread.start on a thread happens-before
every action in the started thread. - Thread termination rule. Any action in a thread happens-before any other thread detects that thread has terminated, either by success- fully return from Thread.join or by Thread.isAlive returning false.
- Interruption rule. A thread calling interrupt on another thread happens-before the interrupted thread detects the interrupt (either by having InterruptedException thrown, or invoking isInter- rupted or interrupted).
- Finalizer rule. The end of a constructor for an object happens-before the start of the finalizer for that object.
- Transitivity. If A happens-before B, and B happens-before C, then A happens-before C.
Publication
Unsafe publication
- 前面介绍的happens-before证明了如果缺少了happens-before,会导致partially constructed object
1
2
3
4
5
6
7
8
public class UnsafeLazyInitialization {
private static Resource resource;
public static Resource getInstance() {
if (resource == null)
resource = new Resource(); // unsafe publication return resource;
}
}
Safe publication
- 使用synchronized块可以做到Safe publication
1
2
3
4
5
6
7
8
public class SafeLazyInitialization {
private static Resource resource;
public synchronized static Resource getInstance() {
if (resource == null)
resource = new Resource(); return resource;
}
}
Safe initialization idioms
- static变量是在JVM初始化的时候加载的,所以静态变量可以被所有线程可见。只要是静态变量,那么就不需要特别的同步机制,当然,前提是这个静态变量初始化以后不再改变,否则还是需要同步机制。
1
2
3
4
5
public class EagerInitialization {
private static Resource resource = new Resource();
public static Resource getResource() { return resource; }
}