vlambda博客
学习文章列表

jdk源码解析--ReentryReadWriteLock类(下)

在上文中,我们对读写锁大概有了一个了解,本节我们将深入去看下读写锁内部的具体实现,内部的同步器Sync是怎么实现读写锁的功能。现在我们先看下sync的内部实现。

1. abstract static class Sync extends AbstractQueuedSynchronizer{  

2.         private static final long serialVersionUID = 6317671515068378041L;  

3.         //以下使用位运算计算共享的数量  

4.         static final int SHARED_SHIFT   = 16;  

5.         static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  

6.         static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  

7.         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  

8.   

9.         /** Returns the number of shared holds represented in count  */  

10.         static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }//返回共享的数量  

11.         /** Returns the number of exclusive holds represented in count  */  

12.         static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//返回独占的数量  

13.   

14.         static final class HoldCounter {//一个由本地线程变量维护的缓存器,返回该线程占用的次数  

15.             int count = 0;  

16.             // Use id, not reference, to avoid garbage retention  

17.             final long tid = getThreadId(Thread.currentThread());  

18.         }  

19.         static final class ThreadLocalHoldCounter  

20.             extends ThreadLocal<HoldCounter> {//使用threadlocal实现计数器  

21.             public HoldCounter initialValue() {  

22.                 return new HoldCounter();  

23.             }  

24.         }  

25.         private transient ThreadLocalHoldCounter readHolds;//缓存  

26.         private transient HoldCounter cachedHoldCounter;  

27.         private transient Thread firstReader = null;//第一个读线程  

28.         private transient int firstReaderHoldCount;//第一个读线程的锁住次数  

29.   

30.         Sync() {  

31.             readHolds = new ThreadLocalHoldCounter();  

32.             setState(getState()); // 确保读线程可见  

33.         }  

34.         abstract boolean readerShouldBlock();//是否读阻塞  

35.         abstract boolean writerShouldBlock();//是否写阻塞  

36.         protected final boolean tryRelease(int releases) {//尝试释放  

37.             if (!isHeldExclusively())  

38.                 throw new IllegalMonitorStateException();  

39.             int nextc = getState() - releases;//减去释放的次数  

40.             boolean free = exclusiveCount(nextc) == 0;//判断是否已经没有占用者  

41.             if (free)  

42.                 setExclusiveOwnerThread(null);//如果没有占用者,全部释放  

43.             setState(nextc);//设置state的值  

44.             return free;//返回是否释放完  

45.         }  

46.   

47.         protected final boolean tryAcquire(int acquires) {  

48.             

49.             Thread current = Thread.currentThread();//获取当前线程  

50.             int c = getState();//获取占用次数  

51.             int w = exclusiveCount(c);//该线程的占用次数  

52.             if (c != 0) {//读锁的情况  

53.                 // 如果读数量和写的数量不为0且占用者不是该线程,返回fail  

54.                 if (w == 0 || current != getExclusiveOwnerThread())  

55.                     return false;  

56.                //如果已经大于最大的占用量,返回fail  

57.                 if (w + exclusiveCount(acquires) > MAX_COUNT)  

58.                     throw new Error("Maximum lock count exceeded");  

59.                 // 否则设置值  

60.                 setState(c + acquires);  

61.                 return true;  

62.             }  

63.             //写锁的判断  

64.             if (writerShouldBlock() ||  

65.                 !compareAndSetState(c, c + acquires))//如果是写锁阻塞或者设置失败,返回不成功  

66.                 return false;  

67.             setExclusiveOwnerThread(current);//设置当前占有者  

68.             return true;//返回成功  

69.         }  

70.        //共享的释放  

71.         protected final boolean tryReleaseShared(int unused) {  

72.             Thread current = Thread.currentThread();  

73.             if (firstReader == current) {//如果当前线程是第一个读者  

74.                 // assert firstReaderHoldCount > 0;  

75.                 if (firstReaderHoldCount == 1)//如果第一个读者的持有数量是1  

76.                     firstReader = null;//设置firstReader 为null  

77.                 else  

78.                     firstReaderHoldCount--;//减1  

79.             } else {  

80.                 HoldCounter rh = cachedHoldCounter;  

81.                 if (rh == null || rh.tid != getThreadId(current))  

82.                     rh = readHolds.get();//获取当前线程的计数器  

83.                 int count = rh.count;//获取计数值  

84.                 if (count <= 1) {//如果小于等于1  

85.                     readHolds.remove();//清理当前的计数器,防止内存泄漏  

86.                     if (count <= 0)  

87.                         throw unmatchedUnlockException();  

88.                 }  

89.                 --rh.count;//否则减1  

90.             }  

91.             for (;;) {  

92.                 int c = getState();  

93.                 int nextc = c - SHARED_UNIT;  

94.                 if (compareAndSetState(c, nextc))//CAS设置值  

95.                     return nextc == 0;  

96.             }  

97.         }  

98.   

99.         private IllegalMonitorStateException unmatchedUnlockException() {  

100.             return new IllegalMonitorStateException(  

101.                 "attempt to unlock read lock, not locked by current thread");  

102.         }  

103.   

104.         protected final int tryAcquireShared(int unused) {  

105.               

106.               

107.             Thread current = Thread.currentThread();  

108.             int c = getState();  

109.             if (exclusiveCount(c) != 0 &&  

110.                 getExclusiveOwnerThread() != current)//如果不是当前线程,返回失败  

111.                 return -1;  

112.             int r = sharedCount(c);//获取共享的占用数量  

113.             if (!readerShouldBlock() &&  

114.                 r < MAX_COUNT &&  

115.                 compareAndSetState(c, c + SHARED_UNIT)) {//读没阻塞且小于最大占用数且设置成功  

116.                 if (r == 0) {//第一个占用者,设置第一个占用者的信息  

117.                     firstReader = current;  

118.                     firstReaderHoldCount = 1;  

119.                 } else if (firstReader == current) {  

120.                     firstReaderHoldCount++;//如果第一个占用者是当前线程,占用数+1  

121.                 } else {//其他线程的话,就相应获取加1  

122.                     HoldCounter rh = cachedHoldCounter;  

123.                     if (rh == null || rh.tid != getThreadId(current))  

124.                         cachedHoldCounter = rh = readHolds.get();  

125.                     else if (rh.count == 0)  

126.                         readHolds.set(rh);  

127.                     rh.count++;  

128.                 }  

129.                 return 1;  

130.             }  

131.             return fullTryAcquireShared(current);//上面的操作不成功,可能是CAS失败,尝试更新之后再添加  

132.         }  

133.   

134.           

135.         final int fullTryAcquireShared(Thread current) {  

136.               

137.             HoldCounter rh = null;  

138.             for (;;) {//死循环  

139.                 int c = getState();  

140.                 if (exclusiveCount(c) != 0) {  

141.                     if (getExclusiveOwnerThread() != current)//不是当前线程  

142.                         return -1;//返回失败  

143.                     // else we hold the exclusive lock; blocking here  

144.                     // would cause deadlock.  

145.                 } else if (readerShouldBlock()) {//如果是读锁阻塞  

146.                     // Make sure we're not acquiring read lock reentrantly  

147.                     if (firstReader == current) {  

148.                         // assert firstReaderHoldCount > 0;  

149.                     } else {//当前线程不是第一个占用者  

150.                         if (rh == null) {  

151.                             rh = cachedHoldCounter;  

152.                             if (rh == null || rh.tid != getThreadId(current)) {  

153.                                 rh = readHolds.get();  

154.                                 if (rh.count == 0)//如果为0,清除  

155.                                     readHolds.remove();  

156.                             }  

157.                         }  

158.                         if (rh.count == 0)  

159.                             return -1;  

160.                     }  

161.                 }  

162.                 if (sharedCount(c) == MAX_COUNT)//占用超过最大值  

163.                     throw new Error("Maximum lock count exceeded");  

164.                 if (compareAndSetState(c, c + SHARED_UNIT)) {//以下和上面tryAcquireShared一样重复了过程  

165.                     if (sharedCount(c) == 0) {  

166.                         firstReader = current;  

167.                         firstReaderHoldCount = 1;  

168.                     } else if (firstReader == current) {  

169.                         firstReaderHoldCount++;  

170.                     } else {  

171.                         if (rh == null)  

172.                             rh = cachedHoldCounter;  

173.                         if (rh == null || rh.tid != getThreadId(current))  

174.                             rh = readHolds.get();  

175.                         else if (rh.count == 0)  

176.                             readHolds.set(rh);  

177.                         rh.count++;  

178.                         cachedHoldCounter = rh; // cache for release  

179.                     }  

180.                     return 1;  

181.                 }  

182.             }  

183.         }  

184.   

185.         /** 

186.          * Performs tryLock for write, enabling barging in both modes. 

187.          * This is identical in effect to tryAcquire except for lack 

188.          * of calls to writerShouldBlock. 

189.          */  

190.         final boolean tryWriteLock() {//尝试占用写锁,立马返回的  

191.             Thread current = Thread.currentThread();  

192.             int c = getState();  

193.             if (c != 0) {  

194.                 int w = exclusiveCount(c);  

195.                 if (w == 0 || current != getExclusiveOwnerThread())  

196.                     return false;  

197.                 if (w == MAX_COUNT)  

198.                     throw new Error("Maximum lock count exceeded");  

199.             }  

200.             if (!compareAndSetState(c, c + 1))  

201.                 return false;  

202.             setExclusiveOwnerThread(current);  

203.             return true;  

204.         }  

205.   

206.         /** 

207.          * Performs tryLock for read, enabling barging in both modes. 

208.          * This is identical in effect to tryAcquireShared except for 

209.          * lack of calls to readerShouldBlock. 

210.          */  

211.         final boolean tryReadLock() {//尝试占用读锁,立马返回的,操作基本类似  

212.             Thread current = Thread.currentThread();  

213.             for (;;) {  

214.                 int c = getState();  

215.                 if (exclusiveCount(c) != 0 &&  

216.                     getExclusiveOwnerThread() != current)  

217.                     return false;  

218.                 int r = sharedCount(c);  

219.                 if (r == MAX_COUNT)  

220.                     throw new Error("Maximum lock count exceeded");  

221.                 if (compareAndSetState(c, c + SHARED_UNIT)) {  

222.                     if (r == 0) {  

223.                         firstReader = current;  

224.                         firstReaderHoldCount = 1;  

225.                     } else if (firstReader == current) {  

226.                         firstReaderHoldCount++;  

227.                     } else {  

228.                         HoldCounter rh = cachedHoldCounter;  

229.                         if (rh == null || rh.tid != getThreadId(current))  

230.                             cachedHoldCounter = rh = readHolds.get();  

231.                         else if (rh.count == 0)  

232.                             readHolds.set(rh);  

233.                         rh.count++;  

234.                     }  

235.                     return true;  

236.                 }  

237.             }  

238.         }  

239.   

240.         protected final boolean isHeldExclusively() {  

241.             // While we must in general read state before owner,  

242.             // we don't need to do so to check if current thread is owner  

243.             return getExclusiveOwnerThread() == Thread.currentThread();  

244.         }  

245.   

246.         // Methods relayed to outer class  

247.   

248.         final ConditionObject newCondition() {  

249.             return new ConditionObject();  

250.         }  

251.   

252.         final Thread getOwner() {  

253.             // Must read state before owner to ensure memory consistency  

254.             return ((exclusiveCount(getState()) == 0) ?  

255.                     null :  

256.                     getExclusiveOwnerThread());  

257.         }  

258.   

259.         final int getReadLockCount() {  

260.             return sharedCount(getState());  

261.         }  

262.   

263.         final boolean isWriteLocked() {  

264.             return exclusiveCount(getState()) != 0;  

265.         }  

266.   

267.         final int getWriteHoldCount() {  

268.             return isHeldExclusively() ? exclusiveCount(getState()) : 0;  

269.         }  

270.   

271.         final int getReadHoldCount() {  

272.             if (getReadLockCount() == 0)  

273.                 return 0;  

274.   

275.             Thread current = Thread.currentThread();  

276.             if (firstReader == current)  

277.                 return firstReaderHoldCount;  

278.   

279.             HoldCounter rh = cachedHoldCounter;  

280.             if (rh != null && rh.tid == getThreadId(current))  

281.                 return rh.count;  

282.   

283.             int count = readHolds.get().count;  

284.             if (count == 0) readHolds.remove();  

285.             return count;  

286.         }  

287.   

288.         /** 

289.          * Reconstitutes the instance from a stream (that is, deserializes it). 

290.          */  

291.         private void readObject(java.io.ObjectInputStream s)  

292.             throws java.io.IOException, ClassNotFoundException {  

293.             s.defaultReadObject();  

294.             readHolds = new ThreadLocalHoldCounter();  

295.             setState(0); // reset to unlocked state  

296.         }  

297.   

298.         final int getCount() { return getState(); }  

299.     }  

上面的核心在于共享和可重入值的计算,一个总共线程占用的次数state值,一个是当前线程占用的次数使用本地变量计算器统计。在看这里的源码要对这两个值理解清楚,上面的代码看起来就不是那么复杂。先看下公平和非公平的构造

1. static final class NonfairSync extends Sync {  

2.         private static final long serialVersionUID = -8159625535654395037L;  

3.         final boolean writerShouldBlock() {  

4.             return false// 一直不阻塞  

5.         }  

6.         final boolean readerShouldBlock() {  

7.             /* As a heuristic to avoid indefinite writer starvation, 

8.              * block if the thread that momentarily appears to be head 

9.              * of queue, if one exists, is a waiting writer.  This is 

10.              * only a probabilistic effect since a new reader will not 

11.              * block if there is a waiting writer behind other enabled 

12.              * readers that have not yet drained from the queue. 

13.              */  

14.             return apparentlyFirstQueuedIsExclusive();//判断队列的第一是否是独占的,如果是独占,那么读锁就应该阻塞,写锁也要阻塞,如果是共享的话,读锁可以共享,写锁阻塞  

15.         }  

16.     }  

17. static final class FairSync extends Sync {  

18.         private static final long serialVersionUID = -2274990926593161451L;  

19.         final boolean writerShouldBlock() {  

20.             return hasQueuedPredecessors();//是否是第一个  

21.         }  

22.         final boolean readerShouldBlock() {  

23.             return hasQueuedPredecessors();//是否是第一个  

24.         }  

25.     }  

 

下面的读写锁的代码就相对简单了,基本是调用上面的同步器执行相应的操作,在这里就不介绍了,有兴趣的读者可以查看jdk的源码。当然读写锁还有部分问题存在,读的过程不允许写,如果希望增加效率的话,允许读的过程,也可以写,可以看下下一文中介绍的StampedLock