前言
基于JDK8的源码
ConcurrentHashMap介绍使用什么的我就不说了,直接进入主题吧
主要属性
// 容量最大值
private static final int MAXIMUM_CAPACITY = 1 << 30
// 默认容量
private static final int DEFAULT_CAPACITY = 16;
//底层数组最大长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并行度
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表变成红黑树的界限值
static final int TREEIFY_THRESHOLD = 8;
//非树化界限
static final int UNTREEIFY_THRESHOLD = 6;
//树化最小容量
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED = -1; // 正在转移节点
static final int TREEBIN = -2; // 已经转移成树
static final int RESERVED = -3; // 临时保留hash
static final int HASH_BITS = 0x7fffffff; // 普通节点可用的hash位数
// 可用cpu核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
大致的解释都在注释里了,具体的应用还是要再方法里看
内部类
链表节点类:
static class Node implements Map.Entry {
final int hash;
final K key;
volatile V val;
volatile Node next;
...
}
很简单,只有四个属性,两个方法,值得注意的就是修饰属性的关键字,首先是hash和key,使用final修饰的,应该是保证这个节点的key不可变,除非删除节点,否则不能改变key值,响应的hash也就不能变.然后是val和next是用volatile修饰的,因为get()方法时无锁的,所以这里有使用volatile保证及时的可见性.
树节点:
static final class TreeNode extends Node {
TreeNode parent; // red-black tree links
TreeNode left;
TreeNode right;
TreeNode prev; // needed to unlink next upon deletion
boolean red;
...
}
很简单的红黑树结构,但是红黑树本身并不简单,或许可以很好的理解插入删除等操作,但是真正去实现,就有点难,关于红黑树的源码,可以看TreeMap的源码.
操作节点数组的三个原子方法
// 返回节点数组的指定位置的节点的原子操作
static final Node tabAt(Node[] tab, int i) {
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// cas原子操作,在指定位置设置值
static final boolean casTabAt(Node[] tab, int i,
Node c, Node v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//原子操作,在指定位置设置值
static final void setTabAt(Node[] tab, int i, Node v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
构造函数
//默认构造,使用默认参数构造
public ConcurrentHashMap() {
}
//设置初始容量的构造,底层数组将设置为2的整数幂大小
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//还有很多,可以指定负载因子,传入一个Map,并行度的构造函数
...
get方法
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
// 计算hash值
int h = spread(key.hashCode());
//定位及判断
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//判断定位到的头结点是不是key
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 头结点hash值为负的情况
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//遍历节点寻找,找到返回值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
// 没有该key对应的值,返回null
return null;
}
put方法
put方法实际调用的是putVal(),直接看这个方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
// concurrent不允许key或value为null
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
// 记录这个链表有多少个值,控制转树
int binCount = 0;
// 无限制循环
for (Node[] tab = table;;) {
Node f; int n, i, fh;
// 如果table没有初始化,就调用初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果节点数组当前位置没有值,就调用cas操作设置值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果当前节点正在扩容的阶段,改线程也进行帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果当前节点一切正常,就获得当前头结点的排它锁
synchronized (f) {
if (tabAt(tab, i) == f) {
// 大于0代表还是链表
if (fh >= 0) {
binCount = 1;
//遍历列表,寻找是否有对应的key
for (Node e = f;; ++binCount) {
K ek;
// 如果有,就替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
//如果没有,就尾插一个新的节点
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
// 已经变树了
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
// 调用putTreeVal将节点插入红黑树中
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 判断这个数组该位置的节点数是不是大于等于变树界限,如果是,就转树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//计数加一
addCount(1L, binCount);
return null;
}
总结一下:
- 首先判断添加的k-v是不是null,如果是,就抛出异常
- 接着判断table是否已经初始化,如果没有,就初始化
- 然后计算hash确定在数组中的位置
- 如果当前位置为空,直接添加,如果不为空,取出这个节点
- 如果该节点hash值是moved,表示正在扩容,就去帮助扩容
- 否则,取出节点,判断是树还是链表(hash值是否小于0),如果是链表,就操作链表,如果是树,就操作树
- 判断当前位置的count是不是大于界限值,如果大于,就转换成树
扩容
扩容比较复杂,大致说一下:
首先确认的就是,底层数组的长度永远都是2的正整数幂的大小,然后扩容和转红黑树之间也有一定关系,当底层数组大小小于64的时候,链表达到界限值是不会转树的,而是扩容数组,只有数组大于等于64后,才会转树.
触发扩容的条件是当容器存储值大于sizectl(capacity*负载因子).ConcurrentHashMap
扩容是一个并发的扩容,通常是将原来的数组复制到一个新的数组,然后允许多个线程进入,每个线程负责一定长度(16)的数组部分.
源码待析…
同步问题
get
可以看到,get全程是无锁的,也就是说无论是否有其他线程正在扩容或者正在put,都是无锁进行的,可以想到的就是上面说到的节点的属性,首先扩容不改变值,且在新的数组中进行,所以不影响get(),其次是put(),这里应该就是通过volatile来保证及时的可见性,毕竟get()操作只是获取值.
put
这里就是put和扩容时的问题.上面可以看到,如果数组正在扩容,会设置节点hash为moved,put线程看到后,会调用helpTransfer
方法帮助扩容
主要方法
其实上面都有说到,concurrentHashmap
主要通过synchronized
和cas
来完成同步.
在获取sizeCtl和定位数组中的位置时都是CAS
在put某个位置时,使用synchronized
等等
总结
concurrentHashMap这个并发容器真的是比较重要,我看到的源码里用的有Spring的IOC容器底层实现就是一个ConcurrentHashMap
,所以掌握深一点绝对没错.