ConcurrentHashMap源码分析


前言

基于JDK8的源码
ConcurrentHashMap介绍使用什么的我就不说了,直接进入主题吧

主要属性

    // 容量最大值
    private static final int MAXIMUM_CAPACITY = 1 << 30
    // 默认容量
    private static final int DEFAULT_CAPACITY = 16;

    //底层数组最大长度
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // 默认并行度
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    //负载因子
    private static final float LOAD_FACTOR = 0.75f;

    // 链表变成红黑树的界限值
    static final int TREEIFY_THRESHOLD = 8;
    //非树化界限
    static final int UNTREEIFY_THRESHOLD = 6;
    //树化最小容量
    static final int MIN_TREEIFY_CAPACITY = 64;

    private static final int MIN_TRANSFER_STRIDE = 16;

    private static int RESIZE_STAMP_BITS = 16;

    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static final int MOVED     = -1; // 正在转移节点
    static final int TREEBIN   = -2; // 已经转移成树
    static final int RESERVED  = -3; // 临时保留hash
    static final int HASH_BITS = 0x7fffffff; // 普通节点可用的hash位数

    // 可用cpu核数
    static final int NCPU = Runtime.getRuntime().availableProcessors();

大致的解释都在注释里了,具体的应用还是要再方法里看

内部类

链表节点类:

    static class Node implements Map.Entry {
        final int hash;
        final K key;
        volatile V val;
        volatile Node next;
    ...
    }

很简单,只有四个属性,两个方法,值得注意的就是修饰属性的关键字,首先是hash和key,使用final修饰的,应该是保证这个节点的key不可变,除非删除节点,否则不能改变key值,响应的hash也就不能变.然后是val和next是用volatile修饰的,因为get()方法时无锁的,所以这里有使用volatile保证及时的可见性.

树节点:

    static final class TreeNode extends Node {
        TreeNode parent;  // red-black tree links
        TreeNode left;
        TreeNode right;
        TreeNode prev;    // needed to unlink next upon deletion
        boolean red;
    ...
    }

很简单的红黑树结构,但是红黑树本身并不简单,或许可以很好的理解插入删除等操作,但是真正去实现,就有点难,关于红黑树的源码,可以看TreeMap的源码.

操作节点数组的三个原子方法

    // 返回节点数组的指定位置的节点的原子操作
    static final  Node tabAt(Node[] tab, int i) {
        return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    // cas原子操作,在指定位置设置值
    static final  boolean casTabAt(Node[] tab, int i,
                                        Node c, Node v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    //原子操作,在指定位置设置值
    static final  void setTabAt(Node[] tab, int i, Node v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

构造函数

    //默认构造,使用默认参数构造
    public ConcurrentHashMap() {
    }
    //设置初始容量的构造,底层数组将设置为2的整数幂大小
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    //还有很多,可以指定负载因子,传入一个Map,并行度的构造函数
    ...

get方法

    public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        // 计算hash值
        int h = spread(key.hashCode());
        //定位及判断
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //判断定位到的头结点是不是key
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 头结点hash值为负的情况
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //遍历节点寻找,找到返回值
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        // 没有该key对应的值,返回null
        return null;
    }

put方法

put方法实际调用的是putVal(),直接看这个方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
        // concurrent不允许key或value为null
        if (key == null || value == null) throw new NullPointerException();
        // 计算hash值
        int hash = spread(key.hashCode());
        // 记录这个链表有多少个值,控制转树
        int binCount = 0;
        // 无限制循环
        for (Node[] tab = table;;) {
            Node f; int n, i, fh;
            // 如果table没有初始化,就调用初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 如果节点数组当前位置没有值,就调用cas操作设置值
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果当前节点正在扩容的阶段,改线程也进行帮助扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //如果当前节点一切正常,就获得当前头结点的排它锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 大于0代表还是链表
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历列表,寻找是否有对应的key
                            for (Node e = f;; ++binCount) {
                                K ek;
                                // 如果有,就替换
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node pred = e;
                                //如果没有,就尾插一个新的节点
                                if ((e = e.next) == null) {
                                    pred.next = new Node(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 已经变树了
                        else if (f instanceof TreeBin) {
                            Node p;
                            binCount = 2;
                            // 调用putTreeVal将节点插入红黑树中
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 判断这个数组该位置的节点数是不是大于等于变树界限,如果是,就转树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //计数加一
        addCount(1L, binCount);
        return null;
    }

总结一下:

  1. 首先判断添加的k-v是不是null,如果是,就抛出异常
  2. 接着判断table是否已经初始化,如果没有,就初始化
  3. 然后计算hash确定在数组中的位置
  4. 如果当前位置为空,直接添加,如果不为空,取出这个节点
  5. 如果该节点hash值是moved,表示正在扩容,就去帮助扩容
  6. 否则,取出节点,判断是树还是链表(hash值是否小于0),如果是链表,就操作链表,如果是树,就操作树
  7. 判断当前位置的count是不是大于界限值,如果大于,就转换成树

扩容

扩容比较复杂,大致说一下:
首先确认的就是,底层数组的长度永远都是2的正整数幂的大小,然后扩容和转红黑树之间也有一定关系,当底层数组大小小于64的时候,链表达到界限值是不会转树的,而是扩容数组,只有数组大于等于64后,才会转树.
触发扩容的条件是当容器存储值大于sizectl(capacity*负载因子).
ConcurrentHashMap扩容是一个并发的扩容,通常是将原来的数组复制到一个新的数组,然后允许多个线程进入,每个线程负责一定长度(16)的数组部分.

源码待析…

同步问题

get

可以看到,get全程是无锁的,也就是说无论是否有其他线程正在扩容或者正在put,都是无锁进行的,可以想到的就是上面说到的节点的属性,首先扩容不改变值,且在新的数组中进行,所以不影响get(),其次是put(),这里应该就是通过volatile来保证及时的可见性,毕竟get()操作只是获取值.

put

这里就是put和扩容时的问题.上面可以看到,如果数组正在扩容,会设置节点hash为moved,put线程看到后,会调用helpTransfer方法帮助扩容

主要方法

其实上面都有说到,concurrentHashmap主要通过synchronizedcas来完成同步.
在获取sizeCtl和定位数组中的位置时都是CAS
在put某个位置时,使用synchronized
等等

总结

concurrentHashMap这个并发容器真的是比较重要,我看到的源码里用的有Spring的IOC容器底层实现就是一个ConcurrentHashMap,所以掌握深一点绝对没错.


  目录