# 七、并发集合

# 1. Vector

线程安全的 ArrayList。

重要方法都加上了 synchronized,性能较低。

public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}

# 2. Hashtable

线程安全的 HashTable。

重要方法都加上了 synchronized,性能较低。

public synchronized V put(K key, V value) {
    .....
}
public synchronized V remove(Object key) {
  	.....
}

# 3. Collections.synchronizedList

虽然 ArrayList 和 HashMap 是线程不安全的,但是可以用

  • Collections.synchronizedList(new ArrayList<E>())
  • Collections.synchronizedMap(new HashMap<K,V>())

使之变成线程安全的。

public static <T> List<T> synchronizedList(List<T> list) {
    return (list instanceof RandomAccess ?
            //ArrayList 和 HashMap 都实现了 RandomAccess
            new SynchronizedRandomAccessList<>(list) :
            new SynchronizedList<>(list));
}

SynchronizedRandomAccessList(List<E> list) {
    super(list);
}

public boolean add(E e) {
    synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
    synchronized (mutex) {return c.remove(o);}
}

不过是将 synchronized 加在方法里面而已,跟前面的 Vector 和 Hashtable 没什么区别。

# 4. ConcurrentHashMap

# 4.1 JDK7

image-20210322103403259
  • JDK7 中的 ConcurrentHashMap 最外层是多个 segment,每个 segment 的底层数据结构与 HashMap 类似,然后是 数组+链表 组成的拉链法。
  • 每个 segment 独立上 ReentrantLock 锁,每个 segment 之间互不影响,提高了并发效率。
  • ConcurrentHashMap 默认有 16 个 segment,所以最多可以同时支持 16 个线程并发写(分别操作在不同的 segment 上)。这个默认值可以在初始化的时候设置为其他值,但是一旦初始化后,是不可以扩容的。

# 4.2 JDK8

image-20210322103804081

# put()

线程安全的核心:CAS(赋值) 加 synchronized(扩容),使得每一个 Node 都是并发独立的,效率比 JDK7 更高。

  1. 判断 key, value 不为空。
  2. 计算 hash 值。
  3. 根据对应位置节点的类型来赋值,可能是 helpTransfer,可能是增长链表,可能是给红黑树加节点。
  4. 检查满足阈值就“红黑树化”。
  5. 返回 oldVal。
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** put真正的实现 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
  	//ConcurrentHashMap 中 key 和 value 不允许为 null(HashMap 允许)
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
      	//1. 初始化 table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
      	//2. 计算 hash,判断元素应该放哪里,如果位置为空
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
          	//2-1 那么直接用 CAS 插入元素
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   
        }
      	//3. 判断当前状态是不是 MOVED,MOVED 表示一种扩容状态,是 MOVED 表示数据转移,在 map 扩容的时候就会进行数据转移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
          	//4. 如果槽点有值
            V oldVal = null;
          	//4-1 锁住
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                  	//看看是链表吗
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                          	//4-2 如果这个 key 之前就存在
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                              	//4-3 更新值并返回
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                          	//4-4 如果这个 key 之前不存在,就尾插法插入链表
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                  	//看看是红黑树吗
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                      	//是则进行红黑树的操作
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

# get()

  1. 计算 hash 值
  2. 找到对应的位置
    1. 直接取值
    2. 红黑树取值
    3. 链表取值
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  	//1. 算出 hash 值
    int h = spread(key.hashCode());
  	//2. 得有元素,没有元素则返回 null
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
      	//3. key 在槽点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //4. key 在红黑树
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
      	//5. key 在链表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
# 为什么 JDK8 要改成红黑树?
红黑树的查询效率logn 更高。

# 为什么是链表中元素个数大于 8 才变呢?
红黑树每个 Node 的空间是链表每个 Node 的两倍,数量太少就没必要了。
数量多的时候才转。
hash 冲突 8 次的概率的极低的,如果出现了碰撞 8 次,是一种比较极端的情况,可能是 hash 算法出问题了,为了在这种极端情况下还能正常进行插入、查询等操作,就转为红黑树了。
0: 0.60653066
1: 0.30326533
2: 0.07581633
3: 0.01263606
4: 0.00157952
5: 0.00015795
6: 0.00001316
7: 0.00000094
8: 0.00000006

# 5. CopyOnWriteArrayList

# 5.1 诞生原因

Vector 和 SynchronizedList 的锁的粒度太大,并发效率比较低,并且迭代时无法编辑。

# 5.2 适用场景

读操作尽可能地快,而写即使慢一些也没有太大关系。

  • 黑名单的更新。

# 5.3 读写规则

读取是完全不用加锁的,即使是写入也不会阻塞读取操作。

只有写入和写入之间需要进行同步等待。

public class CopyOnWriteArrayListDemo {

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> arrayList = new CopyOnWriteArrayList<>();
        arrayList.add("1");
        arrayList.add("2");
        arrayList.add("3");
        arrayList.add("4");
        arrayList.add("5");
        Iterator<String> iterator = arrayList.iterator();
        while (iterator.hasNext()){
            System.out.println("list = " + arrayList);
            String next = iterator.next();
            System.out.println(next);
            if (next.equals("2")){
                arrayList.remove("5");
            }
            if (next.equals("3")){
                arrayList.add("3 found");
            }
        }
    }
}

结果:

list = [1, 2, 3, 4, 5]
1
list = [1, 2, 3, 4, 5]
2
list = [1, 2, 3, 4]
3
list = [1, 2, 3, 4, 3 found]
4
list = [1, 2, 3, 4, 3 found]
5  //拿的还是旧的5,而不是新的 3 found

# 5.4 实现原理

读写分离:

  1. 先用老集合 copy 出一份新集合。
  2. 然后在新集合上做修改。
  3. 最后用新集合直接替换老集合。

# 5.5 缺点

  • 数据一致性问题

    CopyOnWrite 容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以你如果你希望写入的数据马上能读到,那就不能用 CopyOnWrite 容器。

  • 内存占用问题

    因为 CopyOnWrite 的写的复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,会额外占用一定的内存。

# 5.6 源码分析

  • 属性

    public class CopyOnWriteArrayList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
        private static final long serialVersionUID = 8673264195747942595L;
    
        /** 锁 */
        final transient ReentrantLock lock = new ReentrantLock();
    
        /** 数组,只有 get 和 set 方法可以访问到这个 array */
        private transient volatile Object[] array;
    
  • E get(int index)

    /**
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E get(int index) {
        return get(getArray(), index);
    }
    
  • E set(int index, E element)

    /**
     * Replaces the element at the specified position in this list with the
     * specified element.
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E set(int index, E element) {
      	//1. 先获得锁
        final ReentrantLock lock = this.lock;
        lock.lock();
      	//2. 上锁
        try {
          	//3. 获取数组
            Object[] elements = getArray();
          	//4. 获取 index 位置上的元素
            E oldValue = get(elements, index);
    				//5.1 如果元素不同,则需要插入新的元素
            if (oldValue != element) {
                int len = elements.length;
              	//6. 复制一份新的数组
                Object[] newElements = Arrays.copyOf(elements, len);
              	//7. 插入新元素
                newElements[index] = element;
              	//8. 用新数组覆盖旧数组
                setArray(newElements);
            } else {
                //5.2 如果元素相同,则直接原地赋值
                setArray(elements);
            }
          	//9. 返回旧值
            return oldValue;
        } finally {
          	//10. 解锁
            lock.unlock();
        }
    }
    
  • boolean add(E element)

    public boolean add(E e) {
      	//1. 获得锁
        final ReentrantLock lock = this.lock;
      	//2. 加锁
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
          	//3. 复制出一个 len+1 的新数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
          	//4. 插入新值
            newElements[len] = e;
          	//5. 覆盖旧数组
            setArray(newElements);
            return true;
        } finally {
          	//6. 解锁
            lock.unlock();
        }
    }
    

# 6. BlockingQueue

image-20210322170416256

# 6.1 阻塞队列 BlockingQueue

# 6.1.1 阻塞队列

  • 阻塞队列是具有阻塞功能的队列。
  • 通过,阻塞队列的一端是给生产者放数据用,另一端是给消费者拿数据用。
  • 阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的。
  • 阻塞队列是线程池的重要组成部分。
image-20210322171009923

# 6.1.2 take()/put()

  • take():向队列中拿一个数据,队列为空位阻塞。
image-20210322171059723
  • put():向队列中放一个数据,队列满了会阻塞。
image-20210322171042547

# 6.1.3 add()/remove()/element()

  • add():向队列中添加一个数据,队列满了会抛异常。
  • remove():从队列中删除一个数据,队列空了会抛异常。
  • element():会返回队头的元素,队空时会抛出异常。

# 6.1.4 offer()/poll()/peek()

  • boolean offer():添加一个元素,成功与否通过返回 boolean 来判断,不会抛异常。
  • poll():试图取出一个元素,空的返回 null,会删除元素,不会抛异常。
  • peek():试图取出一个元素,空的返回 null,不会删除元素,不会抛异常。

# 6.2 ArrayBlockingQueue

  • 有界
  • 可以指定容量
  • 可以指定是否公平:如果想保证公平的话,那么等待了最长时间的线程会被优先处理,不过这会同时带来一定的性能消耗。

# 6.2.1 put()

public void put(E e) throws InterruptedException {
  	//1. 检查要插进队列的值是否为空
    checkNotNull(e);
  	//2. 获得锁
    final ReentrantLock lock = this.lock;
  	//3. 可中断加锁
    lock.lockInterruptibly();
    try {
      	//4. 满了的话就阻塞
        while (count == items.length)
            notFull.await();
      	//5. 没满就插入队列
        enqueue(e);
    } finally {
      	//6. 解锁
        lock.unlock();
    }
}

# 6.2.2 take()

public E take() throws InterruptedException {
    //1. 获得锁
    final ReentrantLock lock = this.lock;
  	//2. 可中断加锁
    lock.lockInterruptibly();
    try {
      	//3. 队列空则阻塞
        while (count == 0)
            notEmpty.await();
      	//4. 队列不空则去出队头元素
        return dequeue();
    } finally {
      	//5. 解锁
        lock.unlock();
    }
}

# 6.3 LinkedBlockingQueue

  • 可以指定容量,默认无界(Integer.MAX_VALUE)

# 6.3.1 put()

public void put(E e) throws InterruptedException {
  	//1. 检查是否为空
    if (e == null) throw new NullPointerException();
    // 这个 c 是用来记录修改了多少次,如果还是为 -1 的话就表示失败
    int c = -1;
  	//2. 创建出新结点
    Node<E> node = new Node<E>(e);
  	//3. 拿到 put 锁
    final ReentrantLock putLock = this.putLock;
  	//4. 计数
    final AtomicInteger count = this.count;
  	//5. 上可中断锁
    putLock.lockInterruptibly();
    try {
        //6. 如果到容量了,阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
      	//7. 没到容量,入队
        enqueue(node);
      	//8. 迭代 count++,返回旧的 count
        c = count.getAndIncrement();
      	//9. 通知其他线程是否已经满了
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
      	//10. 解锁
        putLock.unlock();
    }
  	//11. c 拿到的是旧 count,如果 c == 0,说明新的 count 是1,所以通知其他线程说队列非空
    if (c == 0)
        signalNotEmpty();
}

# 6.3.2 take()

public E take() throws InterruptedException {
    E x;
  	//这个 c 是用来记录修改了多少次,如果还是为 -1 的话就表示失败
    int c = -1;
  	//1. 获取当前元素个数
    final AtomicInteger count = this.count;
  	//2. 获取 take 锁
    final ReentrantLock takeLock = this.takeLock;
  	//3. 加可中断锁
    takeLock.lockInterruptibly();
    try {
      	//4. 如果队列为空则阻塞
        while (count.get() == 0) {
            notEmpty.await();
        }
      	//5. 队列不为空则拿出队头
        x = dequeue();
      	//6. 迭代count--,返回旧的 count
        c = count.getAndDecrement();
      	//7. 旧的 count > 1,说明新的 count > 0,非空
        if (c > 1)
            notEmpty.signal();
    } finally {
      	//8. 解锁
        takeLock.unlock();
    }
  	//9. 如果旧的 count == capacity,又因为已经取出一个元素了,说明现在不满,通知其他线程现在没有满
    if (c == capacity)
        signalNotFull();
  	//10. 返回取出来的元素
    return x;
}

# 6.4 PriorityBlockingQueue

# PriorityQueue 主要方法时间复杂度
boolean add(E e):O(logn)
boolean contains(Object o):O(n)
E peek():O(1)
E poll():O(logn)  //最小堆,拿出最小的
boolean remove(Object o):O(n)
int size():O(1)
  • 支持优先级
  • 支持自然排序(不是先进先出)
  • 并发采用 ReentrantLock
  • 不可插入 null
  • 无界队列(会扩容)

# 6.5 SynchronousQueue

  • 容量为 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递。
  • SynchronousQueue 没有 peek() 等函数,因为 peek() 的含义是取出头结点,但是 SynchronousQueue 的容量是 0,所有连头结点都没有,也就没有 peek() 方法。同理,没有 iterate() 等方法。
  • 是一个极好的用来直接传递的并发数据结构。
  • SynchronousQueue 是线程池 Executors.newCachedThreadPool() 使用的阻塞队列。
  • 使用场景:生产者和消费者必须步调一致,有人要消费的时候才允许生产者来生产,意味着一个put() 操作必须等待另一个take() 操作完成。
image-20210322175513636

# 6.6 DelayQueue

  • 延迟队列,根据延迟时间排序。
  • 元素需要实现 Delayed 接口,规定排序规则。

# 6.7 非阻塞队列 ConcurrentLinkedQueue

  • 使用 CAS 非阻塞算法来实现线程安全。

# 6.7.1 offer()

  • offer() 中的 casNext() 方法调用了 UNSAFE.compareAndSwapObject()

# 7. ConcurrentSkipListMap

# 7.1 SkipList

SkipList 部分内容参考博客:https://blog.csdn.net/chenssy/article/details/75000701

# 7.1.1 说明

跳表,它是一种可以替代平衡树的数据结构,其数据元素默认按照key值升序,天然有序。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。

简单的链表:

image-20210322190842111

如果我们需要查询9、21、30,则需要比较次数为3 + 6 + 8 = 17 次,那么有没有优化方案呢?有!我们将该链表中的某些元素提炼出来作为一个比较“索引”,如下:

image-20210322190914994

我们先与这些索引进行比较来决定下一个元素是往右还是下走,由于存在“索引”的缘故,导致在检索的时候会大大减少比较的次数。当然元素不是很多,很难体现出优势,当元素足够多的时候,这种索引结构就会大显身手。

# 7.1.2 特性

  1. 由很多层结构组成,level是通过一定的概率随机产生的
  2. 每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的Comparator进行排序,具体取决于使用的构造方法
  3. 最底层(Level 1)的链表包含所有元素
  4. 如果一个元素出现在Level i 的链表中,则它在Level i 之下的链表也都会出现
  5. 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素

我们将上图再做一些扩展就可以变成一个典型的SkipList结构了 这里写图片描述

# 7.1.3 查找

SkipListd的查找算法较为简单,对于上面我们我们要查找元素21,其过程如下:

  1. 比较3,大于,往后找(9),
  2. 比9大,继续往后找(25),但是比25小,则从9的下一层开始找(16)
  3. 16的后面节点依然为25,则继续从16的下一层找
  4. 找到21

如图

这里写图片描述

红色虚线代表路径。

# 7.1.4 插入

SkipList的插入操作主要包括:

  1. 查找合适的位置。这里需要明确一点就是在确认新节点要占据的层次 K 时,采用丢硬币的方式,完全随机。如果占据的层次 K 大于链表的层次,则重新申请新的层,否则插入指定层次。
  2. 申请新的节点
  3. 调整指针

假定我们要插入的元素为 23,经过查找可以确认它是位于 25 前面,9、16、21 后面。当然需要考虑申请的层次K。

如果层次K > 3

需要申请新层次(Level 4)

这里写图片描述

如果层次 K = 2

直接在Level 2 层插入即可

这里写图片描述

需要注意的地方就是,在 K 层插入元素后,需要确保所有小于 K 层的层次都应该出现新节点

# 7.1.5 删除

删除节点和插入节点思路基本一致:

  1. 找到节点
  2. 删除节点
  3. 调整指针。

比如删除节点9,如下:

这里写图片描述

# 7.2 ConcurrentSkipListMap

  • ConcurrentSkipListMap 其内部采用 SkipLis 数据结构实现。

    image-20210322191602149
  • 使用 CAS 保证并发安全。

    image-20210322191734038

上次更新: 11/1/2021, 10:07:49 AM