# 四、atomic 包
# 1. 简介
# 1.1 原子类的作用
- 原子类的作用和锁类似,是为了保证并发情况下线程安全。
- 不过原子类的粒度更细。原子变量可以把竞争范围缩小到变量级别,这是我们可以获得的最细粒度的情况,通常锁的粒度都要大于原子变量的粒度。
- 原子类的性能更高。使用原子类的效率会比使用锁的效率更高(除非高度竞争)。
# 1.2 纵览
# 2. AtomicInteger
# 2.1 public final int get()
private volatile int value;
public final int get() {
return value;
}
# 2.2 public final int getAndSet(int newValue)
private static final Unsafe unsafe = Unsafe.getUnsafe();
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
- unsafe.getAndSetInt()
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
//底层采用 CAS
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
# 2.3 public final int getAndIncrement()
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
- unsafe.getAndAddInt()
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
# 2.4 public final int getAndDecrement()
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
# 2.5 public final int getAndAdd(int delta)
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
# 2.6 boolean compareAndSet(int expect, int update)
- 加载 Unsafe 类工具,它可以用来直接操作内存数据。
- 用 volatile 修饰 value 字段,保证可见性。
private volatile int value;
private static final Unsafe unsafe = Unsafe.getUnsafe();
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
# 3. AtomicIntegerArray
每个元素都是原子的。
# 3.1 public final int get(int i)
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
- getRaw()
private static final Unsafe unsafe = Unsafe.getUnsafe();
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}
# 3.2 public final int getAndSet(int i, int newValue)
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}
# 3.3 public final boolean compareAndSet(int i, int expect, int update)
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
# 3.4 public final int getAndIncrement(int i)
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
# 3.5 public final int getAndDecrement(int i)
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}
# 3.6 public final int getAndAdd(int i, int delta)
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
# 3.7 演示
/**
* 演示原子数组的使用
*
* @author Hedon Wang
* @create 2021-03-21 10:08 AM
*/
public class AtomicArrayDemo {
public static void main(String[] args) throws Exception{
//每个元素都是原子的
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
Decrementer decrementer = new Decrementer(atomicIntegerArray);
Incrementer incrementer = new Incrementer(atomicIntegerArray);
//一百个线程
Thread[] threadIncrementer = new Thread[100];
Thread[] threadDecrementer = new Thread[100];
//每个元素加100次再减100次
for (int i = 0; i < 100; i++) {
threadDecrementer[i] = new Thread(decrementer);
threadIncrementer[i] = new Thread(incrementer);
threadDecrementer[i].start();
threadIncrementer[i].start();
}
//让前面都运行完,严谨应该用 join
Thread.sleep(5000);
//检查是否有0
for (int i = 0; i < atomicIntegerArray.length(); i++) {
//结果没有0
System.out.println(atomicIntegerArray.get(i));
}
}
}
/**
* 减少器
*/
class Decrementer implements Runnable{
AtomicIntegerArray array;
public Decrementer(AtomicIntegerArray atomicIntegerArray){
this.array = atomicIntegerArray;
}
@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.getAndDecrement(i);
}
}
}
/**
* 增加器
*/
class Incrementer implements Runnable{
AtomicIntegerArray array;
public Incrementer(AtomicIntegerArray atomicIntegerArray){
this.array = atomicIntegerArray;
}
@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.getAndIncrement(i);
}
}
}
# 4. AtomicReference<V>
- AtomicReference 和 AtomicInteger 并没有本质区别。只是 AtomicReference 可以保证对象的原子性,而对象里面可以包含多个属性,功能更强大。
# 4.1 public final boolean compareAndSet(V expect, V update)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile V value;
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
# 5. AtomicIntegerFiledUpdater
将普通变量升级为原子性。
# 5.1 AtomicIntegerFieldUpdater.newUpdater(Class<U> tclass, String fieldName)
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
- AtomicIntegerFieldUpdaterImpl<U> (tclass, fieldName, Reflection.getCallerClass())
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
final String fieldName,
final Class<?> caller) {
//原理:反射
final Field field;
final int modifiers;
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(
caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) &&
((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
//① 要甚至的字段类型必须对应上,此处为 int
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
//② 字段必须声明为可见的,即用 volatile 进行修饰
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");
//③ 对于 protected 字段需要谨慎
this.cclass = (Modifier.isProtected(modifiers) &&
tclass.isAssignableFrom(caller) &&
!isSamePackage(tclass, caller))
? caller : tclass;
this.tclass = tclass;
this.offset = U.objectFieldOffset(field);
}
# 5.2 示例
class Candidate{
volatile int score;
}
public class AtomicIntegerFieldUpdaterDemo implements Runnable{
static Candidate tom;
static Candidate perter;
//指定字段 score 进行升级
public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
perter.score++;
//tom 进行升级
scoreUpdater.getAndIncrement(tom);
}
}
public static void main(String[] args) throws Exception{
tom = new Candidate();
perter = new Candidate();
AtomicIntegerFieldUpdaterDemo aifd = new AtomicIntegerFieldUpdaterDemo();
Thread t1 = new Thread(aifd);
Thread t2 = new Thread(aifd);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("普通 peter:" + perter.score); //普通 peter:19650
System.out.println("升级 tom:" + tom.score); //升级 tom:20000
}
}
注意点:
- 不支持 static
- 需用 volatile 修饰
# 6. LongAdder
- 是 Java 8 引入的,相对比较新的一个类。
- 高并发下 LongAdder 比 AtomicLong 效率高,不过本质是空间换时间。
- 在竞争激烈的时候,LongAdder 把不同线程对应到不同的 Cell 上进行修改,降低了冲突的概率,是多段锁的概念,提高了并发性。
# 6.1 public void add(long x)
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
# 6.2 LongAdder 的改进
AtomicLong 每一次更新都要 flush 和 refresh,冲突较多,性能较低(JMM)。
LongAdder 每个线程都会有自己的一个计数器,仅用在自己线程内计数,这样一来就不会和其他线程的计数器互相干扰。
因为每个线程的计数器不同,所以也不需要 fluash 和 refresh。
LongAdder 引入了分段累加的概念,内部有一个
base 变量
和一个Cell[]
数组共同参与计数。- base 变量:竞争不激烈的情况下,它会直接累加到该变量上。
- Cell[] 数组:当竞争激烈的时候,各个线程分散累加到自己的槽 Cell[i] 中。
使用 LongAdder 时,则是在内部维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 类型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程就会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。最后在获取 LongAdder 当前值时,是把所有 Cell 变量的 value 值累加后再加上 base 返回的。
public long sum() { //Cell[] 数组 Cell[] as = cells; Cell a; //基值变量 base long sum = base; if (as != null) { //累加 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
LongAdder 维护了一个延迟初始化的原子性更新数组**(默认情况下 Cell 数组是 null)和一个
基值变量 base
。由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要创建时,也就是懒加载**。当一开始判断 Cell 数组是 null 并且并发较少时,所有的累加操作都是对 base 变量进行的。保持Cell数组的大小为 2 的 N 次方,在初始化时 Cell 数组中的 Cell 元素个数为2,数组里面的变量实体是 Cell 类型。
# 6.3 AtomicLong 和 LongAdder 对比
- 在低争用情况下,二者差别不大。但是在竞争激烈的情况下,LongAdder 的预期吞吐量要高得多,但是要消耗更多的空间。
- LongAdder 适合的场景是统计求和技术的场景,而且 LongAdder 基本只提供了 add() 方法,而 AtomicLong 还具有 cas() 等方法。
# 7. LongAccumulator
- Accumulator 和 Adder 非常相似,是更通用的 Adder。
# 7.1 构造方法
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
//指定方法
this.function = accumulatorFunction;
//初始值
base = this.identity = identity;
}
# 7.2 示例
public class LongAccumulatorDemo {
public static void main(String[] args) {
//累加器,初始值是0(可以指定为其他)
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
//线程池
ExecutorService service = Executors.newFixedThreadPool(8);
//int流,从1加到9
IntStream.range(1,10).forEach(i->service.submit(()->longAccumulator.accumulate(i)));
//暂停线程池
service.shutdown();
//检查线程池是否暂停
while (!service.isTerminated()){
}
//输出结果 45
System.out.println(longAccumulator.getThenReset());
}
}
# 7.3 优点
- 可以自定义初始值 identify。
- 可以自定义累加规则 (x,y)->x+y,也可以是(x,y)->Math.max(x,y)。
- 可以利用多线程来进行并行计算,比 for 单线程效率更高。
# 7.4 适合场景
- 计算顺序不能有要求,计算顺序不影响结果。
- 适合大量计算的并发情景。