# 四、atomic 包

# 1. 简介

# 1.1 原子类的作用

  1. 原子类的作用和锁类似,是为了保证并发情况下线程安全。
  2. 不过原子类的粒度更细。原子变量可以把竞争范围缩小到变量级别,这是我们可以获得的最细粒度的情况,通常锁的粒度都要大于原子变量的粒度。
  3. 原子类的性能更高。使用原子类的效率会比使用锁的效率更高(除非高度竞争)。

# 1.2 纵览

image-20210321095112926

# 2. AtomicInteger

# 2.1 public final int get()

private volatile int value;

public final int get() {
    return value;
}

# 2.2 public final int getAndSet(int newValue)

private static final Unsafe unsafe = Unsafe.getUnsafe();

public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}
  • unsafe.getAndSetInt()
public final int getAndSetInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
      //底层采用 CAS
    } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    return var5;
}

# 2.3 public final int getAndIncrement()

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
  • unsafe.getAndAddInt()
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

# 2.4 public final int getAndDecrement()

public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

# 2.5 public final int getAndAdd(int delta)

public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
}

# 2.6 boolean compareAndSet(int expect, int update)

  • 加载 Unsafe 类工具,它可以用来直接操作内存数据。
  • 用 volatile 修饰 value 字段,保证可见性。
private volatile int value;
private static final Unsafe unsafe = Unsafe.getUnsafe();
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

# 3. AtomicIntegerArray

每个元素都是原子的。

# 3.1 public final int get(int i)

public final int get(int i) {
    return getRaw(checkedByteOffset(i));
}
  • getRaw()
private static final Unsafe unsafe = Unsafe.getUnsafe();
private int getRaw(long offset) {
    return unsafe.getIntVolatile(array, offset);
}

# 3.2 public final int getAndSet(int i, int newValue)

public final int getAndSet(int i, int newValue) {
    return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

# 3.3 public final boolean compareAndSet(int i, int expect, int update)

public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

# 3.4 public final int getAndIncrement(int i)

public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}

# 3.5 public final int getAndDecrement(int i)

public final int getAndDecrement(int i) {
    return getAndAdd(i, -1);
}

# 3.6 public final int getAndAdd(int i, int delta)

public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

# 3.7 演示

/**
 * 演示原子数组的使用
 *
 * @author Hedon Wang
 * @create 2021-03-21 10:08 AM
 */
public class AtomicArrayDemo {


    public static void main(String[] args) throws Exception{
        //每个元素都是原子的
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
        Decrementer decrementer = new Decrementer(atomicIntegerArray);
        Incrementer incrementer = new Incrementer(atomicIntegerArray);
        //一百个线程
        Thread[] threadIncrementer = new Thread[100];
        Thread[] threadDecrementer = new Thread[100];
        
        //每个元素加100次再减100次
        for (int i = 0; i < 100; i++) {
            threadDecrementer[i] = new Thread(decrementer);
            threadIncrementer[i] = new Thread(incrementer);
            threadDecrementer[i].start();
            threadIncrementer[i].start();
        }
        
        //让前面都运行完,严谨应该用 join
        Thread.sleep(5000);
        
        //检查是否有0
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            //结果没有0
            System.out.println(atomicIntegerArray.get(i));
        }

    }
}

/**
 * 减少器
 */
class Decrementer implements Runnable{

    AtomicIntegerArray array;

    public Decrementer(AtomicIntegerArray atomicIntegerArray){
        this.array = atomicIntegerArray;
    }

    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            array.getAndDecrement(i);
        }
    }
}

/**
 * 增加器
 */
class Incrementer implements Runnable{

    AtomicIntegerArray array;

    public Incrementer(AtomicIntegerArray atomicIntegerArray){
        this.array = atomicIntegerArray;
    }

    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            array.getAndIncrement(i);
        }
    }
}

# 4. AtomicReference<V>

  • AtomicReference 和 AtomicInteger 并没有本质区别。只是 AtomicReference 可以保证对象的原子性,而对象里面可以包含多个属性,功能更强大。

# 4.1 public final boolean compareAndSet(V expect, V update)

private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile V value;
public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

# 5. AtomicIntegerFiledUpdater

将普通变量升级为原子性。

# 5.1 AtomicIntegerFieldUpdater.newUpdater(Class<U> tclass, String fieldName)

@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                          String fieldName) {
    return new AtomicIntegerFieldUpdaterImpl<U>
        (tclass, fieldName, Reflection.getCallerClass());
}
  • AtomicIntegerFieldUpdaterImpl<U> (tclass, fieldName, Reflection.getCallerClass())
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                              final String fieldName,
                              final Class<?> caller) {
    //原理:反射
    final Field field;
    final int modifiers;
    try {
        field = AccessController.doPrivileged(
            new PrivilegedExceptionAction<Field>() {
                public Field run() throws NoSuchFieldException {
                    return tclass.getDeclaredField(fieldName);
                }
            });
        modifiers = field.getModifiers();
        sun.reflect.misc.ReflectUtil.ensureMemberAccess(
            caller, tclass, null, modifiers);
        ClassLoader cl = tclass.getClassLoader();
        ClassLoader ccl = caller.getClassLoader();
        if ((ccl != null) && (ccl != cl) &&
            ((cl == null) || !isAncestor(cl, ccl))) {
            sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
        }
    } catch (PrivilegedActionException pae) {
        throw new RuntimeException(pae.getException());
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
		
  	//① 要甚至的字段类型必须对应上,此处为 int
    if (field.getType() != int.class)
        throw new IllegalArgumentException("Must be integer type");
		//② 字段必须声明为可见的,即用 volatile 进行修饰
    if (!Modifier.isVolatile(modifiers))
        throw new IllegalArgumentException("Must be volatile type");
		
  	//③ 对于 protected 字段需要谨慎
    this.cclass = (Modifier.isProtected(modifiers) &&
                   tclass.isAssignableFrom(caller) &&
                   !isSamePackage(tclass, caller))
                  ? caller : tclass;
    this.tclass = tclass;
    this.offset = U.objectFieldOffset(field);
}

# 5.2 示例

class Candidate{
    volatile int score;
}

public class AtomicIntegerFieldUpdaterDemo implements Runnable{

    static Candidate tom;
    static Candidate perter;
		
  	//指定字段 score 进行升级
    public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            perter.score++;
          	//tom 进行升级
            scoreUpdater.getAndIncrement(tom);
        }
    }

    public static void main(String[] args) throws Exception{
        tom = new Candidate();
        perter = new Candidate();
        AtomicIntegerFieldUpdaterDemo aifd = new AtomicIntegerFieldUpdaterDemo();
        Thread t1 = new Thread(aifd);
        Thread t2 = new Thread(aifd);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("普通 peter:" + perter.score);  //普通 peter:19650
        System.out.println("升级 tom:" + tom.score);       //升级 tom:20000
    }
}

注意点:

  • 不支持 static
  • 需用 volatile 修饰

# 6. LongAdder

  • 是 Java 8 引入的,相对比较新的一个类。
  • 高并发下 LongAdder 比 AtomicLong 效率高,不过本质是空间换时间。
  • 在竞争激烈的时候,LongAdder 把不同线程对应到不同的 Cell 上进行修改,降低了冲突的概率,是多段锁的概念,提高了并发性。

# 6.1 public void add(long x)

/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}


final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

# 6.2 LongAdder 的改进

  1. AtomicLong 每一次更新都要 flush 和 refresh,冲突较多,性能较低(JMM)。

    image-20210321105517176
  2. LongAdder 每个线程都会有自己的一个计数器,仅用在自己线程内计数,这样一来就不会和其他线程的计数器互相干扰。

    因为每个线程的计数器不同,所以也不需要 fluash 和 refresh。

    LongAdder 引入了分段累加的概念,内部有一个 base 变量和一个 Cell[] 数组共同参与计数。

    • base 变量:竞争不激烈的情况下,它会直接累加到该变量上。
    • Cell[] 数组:当竞争激烈的时候,各个线程分散累加到自己的槽 Cell[i] 中。
    img

    使用 LongAdder 时,则是在内部维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 类型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程就会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。最后在获取 LongAdder 当前值时,是把所有 Cell 变量的 value 值累加后再加上 base 返回的。

    public long sum() {
      	//Cell[] 数组
        Cell[] as = cells; Cell a;
      	//基值变量 base
        long sum = base;
        if (as != null) {
          	//累加
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    LongAdder 维护了一个延迟初始化的原子性更新数组**(默认情况下 Cell 数组是 null)和一个基值变量 base。由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要创建时,也就是懒加载**。

    当一开始判断 Cell 数组是 null 并且并发较少时,所有的累加操作都是对 base 变量进行的。保持Cell数组的大小为 2 的 N 次方,在初始化时 Cell 数组中的 Cell 元素个数为2,数组里面的变量实体是 Cell 类型。

# 6.3 AtomicLong 和 LongAdder 对比

  • 在低争用情况下,二者差别不大。但是在竞争激烈的情况下,LongAdder 的预期吞吐量要高得多,但是要消耗更多的空间。
  • LongAdder 适合的场景是统计求和技术的场景,而且 LongAdder 基本只提供了 add() 方法,而 AtomicLong 还具有 cas() 等方法。

# 7. LongAccumulator

  • Accumulator 和 Adder 非常相似,是更通用的 Adder。

# 7.1 构造方法

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity) {
  	//指定方法
    this.function = accumulatorFunction;
  	//初始值
    base = this.identity = identity;
}

# 7.2 示例

public class LongAccumulatorDemo {

    public static void main(String[] args) {
        //累加器,初始值是0(可以指定为其他)
        LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
        //线程池
        ExecutorService service = Executors.newFixedThreadPool(8);
        //int流,从1加到9
        IntStream.range(1,10).forEach(i->service.submit(()->longAccumulator.accumulate(i)));
        //暂停线程池
        service.shutdown();
        //检查线程池是否暂停
        while (!service.isTerminated()){

        }
        //输出结果 45
        System.out.println(longAccumulator.getThenReset());
    }
}

# 7.3 优点

  • 可以自定义初始值 identify。
  • 可以自定义累加规则 (x,y)->x+y,也可以是(x,y)->Math.max(x,y)。
  • 可以利用多线程来进行并行计算,比 for 单线程效率更高。

# 7.4 适合场景

  • 计算顺序不能有要求,计算顺序不影响结果。
  • 适合大量计算的并发情景。
上次更新: 9/17/2021, 12:28:06 PM