ConcurrentHashMap高并发性的实现机制_第1页
ConcurrentHashMap高并发性的实现机制_第2页
ConcurrentHashMap高并发性的实现机制_第3页
ConcurrentHashMap高并发性的实现机制_第4页
ConcurrentHashMap高并发性的实现机制_第5页
已阅读5页,还剩13页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

一、HashMap,即java.util.HashMap标准链地址法实现。这个不用多解析,下图十分明了。(图片来自网络)二、Collections.synchronizedMap()函数返回的线程安全的HashMap这个的实现比较简单。代码中有:12privatefinalMap<K,V>m;

//BackingMapfinalObject

mutex;//Objectonwhichtosynchronize基本所有的方法都加上了synchronized(mutex)。但是这个HashMap不能随便地进行迭代,因为它只是简单包装了HashMap,而回看HashMap的实现,我们可以发现,对于冲突的key,形成一个链表,明显如果有一个线程在历遍HashMap,另一个线程在做删除操作,则很有可能出错。因此,JDK中给出以下代码:123456789Mapm=Collections.synchronizedMap(newHashMap());

...

Sets=m.keySet();

//Needn'tbeinsynchronizedblock

...

synchronized(m){

//Synchronizingonm,nots!

Iteratori=s.iterator();//Mustbeinsynchronizedblock

while(i.hasNext())

foo(i.next());

}三、ConcurrentHashMap对于HashMap,最主要的是以下四种的操作:1234publicVget(Objectkey)

publicVput(Kkey,Vvalue)

publicVremove(Objectkey)

迭代在多线程环境下,get,put,remove都是比较容易实现的(如果不考虑效率,简单加锁即可),迭代的操作才是真正的难点。从Collections.synchronizedMap()的迭代来看,它并不能做到对客户代码透明,有点蛋疼。下面简单分析ConcurrentHashMap的实现,相当精巧。默认一个ConcurrentHashMap中有16个子HashMap,所以相当于一个二级哈希。对于所有的操作都是先定位到子HashMap,再作相应的操作。对于:publicVget(Objectkey)先得到key所在的table,再像HashMap一样get

中间并不加锁publicVput(Kkey,Vvalue)先得到所属的table,加锁

判断table是否要扩容

如果table要扩容,则产生newTable

hash值相同的slot整体移到newTable

hash值不同的slot,把oldTable中的所有Entry都复制到newTable中

因为有可能其它线程在历遍这个table,所以不能把原来的链表拆断publicVremove(Objectkey)remove操作,如下图,要删除Entry3,则先复制Entry1为Entry1*,Entry1*指向Entry4,再复制Entry2为Entry2*,Entry2*指向Entry1*,最终形成一个两叉的链表。原本的Entry1,Entry2,Entry3会被GC自动回收。迭代操作:ConcurrentHashMap的历遍是从后向前历遍的,因为如果有另一个线程B在执行clear操作时,会把table中的所有slot都置为null,这个操作是从前向后执行

如果线程A在历遍Map时也是从前向后,则有可能会出现追赶现象。以下代码:123456HashMap<Integer,String>m1=newHashMap();

m1.put(1,"001");

m1.put(2,"002");

for(Entry<Integer,String>entry:m1.entrySet()){

System.out.println("key:"+entry.getKey());

}HashMap输出的是key:1key:2

ConcurrentHashMap输出的是key:2key:1考虑到在使用HashMap在并发时会出现不正确行为,根据网上资料自己编写了采用ConcurrentHashMap来完成静态缓存的处理,目的是为了能够用来处理高并发的线程安全类,如有问题请各位大侠指教:[java]\o"viewplain"viewplain\o"copy"copy\o"print"print\o"?"?package

com.zengms.cache;

import

java.util.Map;

import

java.util.concurrent.ConcurrentHashMap;

import

mons.logging.Log;

import

mons.logging.LogFactory;

public

class

MapCacheManager

{

private

final

static

Log

log

=

LogFactory.getLog(MapCacheManager.class);

private

volatile

long

updateTime

=

0L;//

更新缓存时记录的时间

private

volatile

boolean

updateFlag

=

true;//

正在更新时的阀门,为false时表示当前没有更新缓存,为true时表示当前正在更新缓存

private

volatile

static

MapCacheManager

mapCacheObject;//

缓存实例对象

private

static

Map<String,

String>

cacheMap

=

new

ConcurrentHashMap<String,

String>();//

缓存容器

private

MapCacheManager()

{

this.LoadCache();//

加载缓存

updateTime

=

System.currentTimeMillis();//

缓存更新时间

}

/**

*

采用单例模式获取缓存对象实例

*

*

@return

*/

public

static

MapCacheManager

getInstance()

{

if

(null

==

mapCacheObject)

{

synchronized

(MapCacheManager.class)

{

if

(null

==

mapCacheObject)

{

mapCacheObject

=

new

MapCacheManager();

}

}

}

return

mapCacheObject;

}

/**

*

装载缓存

*/

private

void

LoadCache()

{

this.updateFlag

=

true;//

正在更新

/**********

数据处理,将数据放入cacheMap缓存中

**begin

******/

cacheMap.put("key1",

"value1");

cacheMap.put("key2",

"value2");

cacheMap.put("key3",

"value3");

cacheMap.put("key4",

"value4");

cacheMap.put("key5",

"value5");

/**********

数据处理,将数据放入cacheMap缓存中

***end

*******/

this.updateFlag

=

false;//

更新已完成

}

/**

*

返回缓存对象

*

*

@return

*/

public

Map<String,

String>

getMapCache()

{

long

currentTime

=

System.currentTimeMillis();

if

(this.updateFlag)

{//

前缓存正在更新

("cache

is

Instance

.....");

return

null;

}

if

(this.IsTimeOut(currentTime))

{//

如果当前缓存正在更新或者缓存超出时限,需重新加载

synchronized

(this)

{

this.ReLoadCache();

this.updateTime

=

currentTime;

}

}

return

this.cacheMap;

}

private

boolean

IsTimeOut(long

currentTime)

{

return

((currentTime

-

this.updateTime)

>

1000000);//

超过时限,超时

}

/**

*

获取缓存项大小

*

@return

*/

private

int

getCacheSize()

{

return

cacheMap.size();

}

/**

*

获取更新时间

*

@return

*/

private

long

getUpdateTime()

{

return

this.updateTime;

}

/**

*

获取更新标志

*

@return

*/

private

boolean

getUpdateFlag()

{

return

this.updateFlag;

}

/**

*

重新装载

*/

private

void

ReLoadCache()

{

this.cacheMap.clear();

this.LoadCache();

}

}

测试代码:[java]\o"viewplain"viewplain\o"copy"copy\o"print"print\o"?"?package

com.zengms.cache;

import

java.util.Iterator;

import

java.util.Map;

import

java.util.Set;

import

java.util.concurrent.ConcurrentHashMap;

public

class

CacheTest

{

public

static

void

main(String[]

args)

{

MapCacheManager

cache

=

MapCacheManager.getInstance();

Map<String,

String>

cacheMap

=

new

ConcurrentHashMap<String,

String>();

cacheMap

=

cache.getMapCache();

Set<String>

set

=

cacheMap.keySet();

Iterator<String>

it

=

set.iterator();

while(it.hasNext()){

String

key

=

it.next();

System.out.println(key+"="+cacheMap.get(key));

}

}

}

现阶段的学习策略是理解和实践这些知识点,并没有深入分析其原理,但确实精读了许多关于这个主题基础性的资料让我很受益(见参考资料)。哈希表基础1.哈希表是基于数组的数据结构2.通过对关键字的哈希运算实现元素的快速定位3.哈希表的重点是哈希化,哈希化负责把一个大范围的数字转化成一个小范围的数字4.哈希化过程中会产生值冲突,这种情况有多种办法可以解决(开放地址法、链地址法)4.1.开放地址法,通过在哈希表中寻找一个空位解决冲突问题,寻找空位的方法也有多种(线性探测、二次探测、再哈希)4.2.链地址法,通过在哈希表单元中加入链表的方式解决5.哈希表的重要缺点5.1.当存储数组基本被填满时性能下降很高5.2.对存储数组进行扩容会分别对已存储的元素重新计算哈希的过程ConcurrentHashMap分段与锁的学习一、结构

二、定位分段这块对Key的哈希值进行移位处理,首先给定的Key在哪一段,然后从具体段中定位Hash值对应具体值对象。

三、锁ConcurrentHashMap没有将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制实现更大程度的共享。这种细粒度的加锁机制体现在ConcurrentHashMap划分的Segment数组,Segment数组上各Segment元素代表了粒度更细的锁,从结构图中可以看到Segment继承自ReentrantLock可重入锁。ConcurrentHashMap这种基于分组Segment并加锁的策略可在高并发的环境下获得更高的吞吐量。ConcurrentHashMap实现并发的基础操作都通过sun.misc.Unsafe完成。

四、sun.misc.UnsafeUnsafe类相关于一个工具类,低层调用native方法,提供了基础CompareAndSet(CAS)支持,通过CAS可以在不加锁并发情况下实现数字或引用的细粒度更新。ConcurrentHashMap的锁分离技术博客分类:源码学习

concurrenthashmap是一个非常好的map实现,在高并发操作的场景下会有非常好的效率。实现的目的主要是为了避免同步操作时对整个map对象进行锁定从而提高并发访问能力。

ConcurrentHashMap类中包含两个静态内部类HashEntry和Segment。HashEntry用来封装映射表的键/值对;Segment用来充当锁的角色,每个Segment对象守护整个散列映射表的若干个桶。每个桶是由若干个HashEntry对象链接起来的链表。一个ConcurrentHashMap实例中包含由若干个Segment对象组成的数组。

Java代码

\o"收藏这段代码"static

final

class

HashEntry<K,V>

{

final

K

key;

//

声明

key

final

final

int

hash;

//

声明

hash

值为

final

volatile

V

value;

//

声明

value

volatile

final

HashEntry<K,V>

next;

//

声明

next

final

HashEntry(K

key,

int

hash,

HashEntry<K,V>

next,

V

value)

{

this.key

=

key;

this.hash

=

hash;

this.next

=

next;

this.value

=

value;

}

}

Java代码

\o"收藏这段代码"static

final

class

Segment<K,V>

extends

ReentrantLock

implements

Serializable

{

transient

volatile

int

count;

//在本

segment

范围内,包含的

HashEntry

元素的个数

//volatile

transient

int

modCount;

//table

被更新的次数

transient

int

threshold;

//默认容量

final

float

loadFactor;

//装载因子

/**

*

table

是由

HashEntry

对象组成的数组

*

如果散列时发生碰撞,碰撞的

HashEntry

对象就以链表的形式链接成一个链表

*

table

数组的数组成员代表散列映射表的一个桶

*/

transient

volatile

HashEntry<K,V>[]

table;

/**

*

根据

key

的散列值,找到

table

中对应的那个桶(table

数组的某个数组成员)

*

把散列值与

table

数组长度减

1

的值相“与”,得到散列值对应的

table

数组的下标

*

然后返回

table

数组中此下标对应的

HashEntry

元素

*

即这个段中链表的第一个元素

*/

HashEntry<K,V>

getFirst(int

hash)

{

HashEntry<K,V>[]

tab

=

table;

return

tab[hash

&

(tab.length

-

1)];

}

Segment(int

initialCapacity,

float

lf)

{

loadFactor

=

lf;

setTable(HashEntry.<K,V>newArray(initialCapacity));

}

/**

*

设置

table

引用到这个新生成的

HashEntry

数组

*

只能在持有锁或构造函数中调用本方法

*/

void

setTable(HashEntry<K,V>[]

newTable)

{

threshold

=

(int)(newTable.length

*

loadFactor);

table

=

newTable;

}

}

注意Segment继承了ReentrantLock锁

左边便是Hashtable的实现方式---锁整个hash表;而右边则是ConcurrentHashMap的实现方式---锁桶(或段)。ConcurrentHashMap将hash表分为16个桶(默认值),诸如get,put,remove等常用操作只锁当前需要用到的桶。试想,原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制,之后会提到),并发性的提升是显而易见的。

更令人惊讶的是ConcurrentHashMap的读取并发,因为在读取的大多数时候都没有用到锁定,所以读取操作几乎是完全的并发操作,而写操作锁定的粒度又非常细,比起之前又更加快速(这一点在桶更多时表现得更明显些)。只有在求size等操作时才需要锁定整个表。而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器(见之前的文章《JAVAAPI备忘---集合》)的另一种迭代方式,我们称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合再发生改变就不再是抛出ConcurrentModificationException,取而代之的是在改变时new新的数据从而不影响原有的数据,iterator完成后再将头指针替换为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。

接下来,让我们看看ConcurrentHashMap中的几个重要方法,心里知道了实现机制后,使用起来就更加有底气。

ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系。

get方法(请注意,这里分析的方法都是针对桶的,因为ConcurrentHashMap的最大改进就是将粒度细化到了桶上),首先判断了当前桶的数据个数是否为0,为0自然不可能get到什么,只有返回null,这样做避免了不必要的搜索,也用最小的代价避免出错。然后得到头节点(方法将在下面涉及)之后就是根据hash和key逐个判断是否是指定的值,如果是并且值非空就说明找到了,直接返回;程序非常简单,但有一个令人困惑的地方,这句returnreadValueUnderLock(e)到底是用来干什么的呢?研究它的代码,在锁定之后返回一个值。但这里已经有一句Vv=e.value得到了节点的值,这句returnreadValueUnderLock(e)是否多此一举?事实上,这里完全是为了并发考虑的,这里当v为空时,可能是一个线程正在改变节点,而之前的get操作都未进行锁定,根据bernstein条件,读后写或写后读都会引起数据的不一致,所以这里要对这个e重新上锁再读一遍,以保证得到的是正确值,这里不得不佩服DougLee思维的严密性。整个get操作只有很少的情况会锁定,相对于之前的Hashtable,并发是不可避免的啊!

get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。Java代码

V

get(Object

key,

int

hash)

{

if

(count

!=

0)

{

//

read-volatile

HashEntry

e

=

getFirst(hash);

while

(e

!=

null)

{

if

(e.hash

==

hash

&&

key.equals(e.key))

{

V

v

=

e.value;

if

(v

!=

null)

return

v;

return

readValueUnderLock(e);

//

recheck

}

e

=

e.next;

}

}

return

null;

}

V

readValueUnderLock(HashEntry

e)

{

lock();

try

{

return

e.value;

}

finally

{

unlock();

}

}

put操作一上来就锁定了整个segment,这当然是为了并发的安全,修改数据是不能并发进行的,必须得有个判断是否超限的语句以确保容量不足时能够rehash,而比较难懂的是这句intindex=hash&(tab.length-1),原来segment里面才是真正的hashtable,即每个segment是一个传统意义上的hashtable,如上图,从两者的结构就可以看出区别,这里就是找出需要的entry在table的哪一个位置,之后得到的entry就是这个链的第一个节点,如果e!=null,说明找到了,这是就要替换节点的值(onlyIfAbsent==false),否则,我们需要new一个entry,它的后继是first,而让tab[index]指向它,什么意思呢?实际上就是将这个新entry插入到链头,剩下的就非常容易理解了。

Java代码

V

put(K

key,

int

hash,

V

value,

boolean

onlyIfAbsent)

{

lock();

try

{

int

c

=

count;

if

(c++

>

threshold)

//

ensure

capacity

rehash();

HashEntry[]

tab

=

table;

int

index

=

hash

&

(tab.length

-

1);

HashEntry

first

=

(HashEntry)

tab[index];

HashEntry

e

=

first;

while

(e

!=

null

&&

(e.hash

!=

hash

||

!key.equals(e.key)))

e

=

e.next;

V

oldValue;

if

(e

!=

null)

{

oldValue

=

e.value;

if

(!onlyIfAbsent)

e.value

=

value;

}

else

{

oldValue

=

null;

++modCount;

tab[index]

=

new

HashEntry(key,

hash,

first,

value);

count

=

c;

//

write-volatile

}

return

oldValue;

}

finally

{

unlock();

}

}

remove操作非常类似put,但要注意一点区别,中间那个for循环是做什么用的呢?(*

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论