请选择 进入手机版 | 继续访问电脑版
搜索
房产
装修
汽车
婚嫁
健康
理财
旅游
美食
跳蚤
二手房
租房
招聘
二手车
教育
茶座
我要买房
买东西
装修家居
交友
职场
生活
网购
亲子
情感
龙城车友
找美食
谈婚论嫁
美女
兴趣
八卦
宠物
手机

源码分析— java读写锁ReentrantReadWriteLock

[复制链接]
查看: 31|回复: 0

1万

主题

2万

帖子

5万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
52027
发表于 2019-12-3 03:54 | 显示全部楼层 |阅读模式
前言

本日看Jraft的时候发现了很多地方都用到了读写锁,所以血汗来潮想要分析以下读写锁是怎样实现的。
先上一个doc里面的例子:
  1. class CachedData {  Object data;  volatile boolean cacheValid;  final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  void processCachedData() {      //加上一个读锁    rwl.readLock().lock();    if (!cacheValid) {      // Must release read lock before acquiring write lock      //必须在加写锁之前开释读锁      rwl.readLock().unlock();      rwl.writeLock().lock();      try {        // Recheck state because another thread might have        // acquired write lock and changed state before we did.          //两重检查        if (!cacheValid) {            //设备值          data = ...          cacheValid = true;        }        // Downgrade by acquiring read lock before releasing write lock          //锁升级,反之则不成        rwl.readLock().lock();      } finally {          //开释写锁,可是仍然持有写锁        rwl.writeLock().unlock(); // Unlock write, still hold read      }    }    try {      use(data);    } finally {        //开释读锁      rwl.readLock().unlock();    }  }}}
复制代码
我们一样平常实例化一个ReentrantReadWriteLock,一样平常是挪用空的机关器建立,所以默许利用的好坏公允锁
  1. public ReentrantReadWriteLock() {    this(false);}public ReentrantReadWriteLock(boolean fair) {      //默许利用的是NonfairSync    sync = fair ? new FairSync() : new NonfairSync();    readerLock = new ReadLock(this);    writerLock = new WriteLock(this);}//别离挪用writeLock和readLock会返回读写锁实例public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
复制代码
ReentrantReadWriteLock内部类Sync
  1. abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = 6317671515068378041L;      //位移量      //在读写锁中,state是一个32位的int,所以用state的高16位表现读锁,用低16位表现写锁    static final int SHARED_SHIFT   = 16;      //由于读锁是高16位,所以用1向左移动16位表现读锁每次锁状态变革的量    static final int SHARED_UNIT    = (1  SHARED_SHIFT; }    //获得低16位写锁state次数,重入次数    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }    //用来记录每个线程持有的读锁数目    static final class HoldCounter {        int count = 0;        // Use id, not reference, to avoid garbage retention        final long tid = getThreadId(Thread.currentThread());    }        static final class ThreadLocalHoldCounter        extends ThreadLocal {        public HoldCounter initialValue() {            return new HoldCounter();        }    }    private transient ThreadLocalHoldCounter readHolds;      // 用于缓存,记录"末端一个获得读锁的线程"的读锁重入次数    private transient HoldCounter cachedHoldCounter;      // 第一个获得读锁的线程(而且其未开释读锁),以及它持有的读锁数目    private transient Thread firstReader = null;    private transient int firstReaderHoldCount;    Sync() {          // 初始化 readHolds 这个 ThreadLocal 属性        readHolds = new ThreadLocalHoldCounter();        setState(getState()); // ensures visibility of readHolds    }    ....}
复制代码

  • 由于int是32位的,所以在ReentrantReadWriteLock中将state分为两部分,高16位作为读锁的状态控制器,低16位作为写锁的状态控制器。
  • 每主要获得读锁确当前状态都需要挪用sharedCount传入当前的state,将state向右移动16位来获得
  • 要获得低16位则需要将1左移16位减一,获得一个低16位尽是1的数,然后和传入的state举行取与利用获得state的低16位的值
  • cachedHoldCounter里面保存了最新的读锁的线程和挪用次数
  • firstReaderfirstReaderHoldCount 将”第一个”获得读锁的线程记录在 firstReader 属性中,这里的第一个不是全局的概念,等这个 firstReader 当前代表的线程开释掉读锁今后,会有后来的线程占用这个属性的。
读锁获得
  1. //readLock#lockpublic void lock() {      //这里会挪用父类AQS的acquireShared,尝试获得锁    sync.acquireShared(1);}//AQS#acquireSharedpublic final void acquireShared(int arg) {      //返回值小于 0 代表没有获得到同享锁    if (tryAcquireShared(arg) < 0)          //进入到阻塞行列,然后等待先驱节点叫醒        doAcquireShared(arg);}
复制代码
这里的tryAcquireShared是挪用ReentrantReadWriteLock的内部类Sync的tryAcquireShared的方式
  1. protected final int tryAcquireShared(int unused) {      //获得当前方程    Thread current = Thread.currentThread();      //获得AQS中的state属性值    int c = getState();    //exclusiveCount方式是用来获得写锁状态,未即是0代表有写锁    if (exclusiveCount(c) != 0 &&          //假如不是当前方程获得的写锁,那末间接返回-1        getExclusiveOwnerThread() != current)        return -1;      //获得读锁的锁定次数    int r = sharedCount(c);      // 读锁获得能否需要被阻塞    if (!readerShouldBlock() &&        r < MAX_COUNT &&        //由于高16位代表同享锁,所以CAS需要加上一个SHARED_UNIT        compareAndSetState(c, c + SHARED_UNIT)) {        if (r == 0) {              //记录一下初度读线程            firstReader = current;            firstReaderHoldCount = 1;        } else if (firstReader == current) {               //firstReader 重入获得读锁            firstReaderHoldCount++;        } else {            HoldCounter rh = cachedHoldCounter;              // 假如 cachedHoldCounter 缓存的不是当前方程,设备为缓存当前方程的 HoldCounter            if (rh == null || rh.tid != getThreadId(current))                cachedHoldCounter = rh = readHolds.get();            else if (rh.count == 0)                readHolds.set(rh);            rh.count++;        }           // return 大于 0 的数,代表获得到了同享锁        return 1;    }    return fullTryAcquireShared(current);}
复制代码

  • 首先会去挪用exclusiveCount方式来检察写锁能否被占用,假如被占用,那末检察当前方程能否是占用读锁的线程,假如不是则返回-1。经过这里可以看出可以先占用读锁再占用写锁
  • 挪用readerShouldBlock方式获得能否需要阻塞读锁获得,然后检查一下高16位读锁重入次数能否横跨了2^16-1,末端经过CAS利用将state高16举行加1利用,假如没有其他线程抢占就会乐成
  • 假如state的高16位为零,那末就设备初度读线程和初度数次数,假如不是则校验初度读线程能否是当前方程,是的话将firstReaderHoldCount次数加1。假如不是初度读线程,那末校验一下末端一次读线程能否是当前方程,不是的话就从readHolds中获得,并将HoldCounter计数加1,假如末端读线程是当前方程那末计数加1
readerShouldBlock
  1. //NonfairSync#readerShouldBlockfinal boolean readerShouldBlock() {    return apparentlyFirstQueuedIsExclusive();}//AQSfinal boolean apparentlyFirstQueuedIsExclusive() {    Node h, s;    return (h = head) != null &&        (s = h.next)  != null &&        !s.isShared()         &&        s.thread != null;}
复制代码
在非公允形式中readerShouldBlock会挪用AQS的方式,判定当前头节点的下一个节点,假如不是同享节点,那末readerShouldBlock就返回true,读锁就会阻塞。
  1. //FairSync#readerShouldBlockfinal boolean readerShouldBlock() {    return hasQueuedPredecessors();}//AQSpublic final boolean hasQueuedPredecessors() {       Node t = tail; // Read fields in reverse initialization order    Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}
复制代码
在公允形式中会去看看行列里有没有其他元素在行列里等待获得锁,倘使有那末读锁就举行阻塞
ReentrantReadWriteLock#fullTryAcquireShared
  1. final int fullTryAcquireShared(Thread current) {       HoldCounter rh = null;    for (;;) {        int c = getState();          //检查能否写锁被占用        if (exclusiveCount(c) != 0) {               //被占用,可是占用读锁线程不是当前方程,返回阻塞            if (getExclusiveOwnerThread() != current)                return -1;            // else we hold the exclusive lock; blocking here            // would cause deadlock.            //检查读锁能否应当被阻塞        } else if (readerShouldBlock()) {            // Make sure we&#39;re not acquiring read lock reentrantly              //初度读线程是当前方程,下面间接CAS            if (firstReader == current) {                // assert firstReaderHoldCount > 0;            } else {                if (rh == null) {                       //设备末端一次读线程                    rh = cachedHoldCounter;                    if (rh == null || rh.tid != getThreadId(current)) {                        rh = readHolds.get();                        if (rh.count == 0)                               //假如发现 count == 0,也就是说,纯属上一行代码初始化的,那末实行 remove                            readHolds.remove();                    }                }                   //假如末端读取线程次数为0,那末阻塞                if (rh.count == 0)                    return -1;            }        }          //假如读锁重入次数到达上限,抛很是        if (sharedCount(c) == MAX_COUNT)            throw new Error("Maximum lock count exceeded");          //尝试CAS读锁重入次数加1        if (compareAndSetState(c, c + SHARED_UNIT)) {               // 这里 CAS 乐成,那末就意味着乐成获得读锁了            // 下面需要做的是设备 firstReader 或 cachedHoldCounter            if (sharedCount(c) == 0) {                firstReader = current;                firstReaderHoldCount = 1;            } else if (firstReader == current) {                firstReaderHoldCount++;            } else {                  // 下面这几行,就是将 cachedHoldCounter 设备为当前方程                if (rh == null)                    rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                else if (rh.count == 0)                    readHolds.set(rh);                rh.count++;                cachedHoldCounter = rh; // cache for release            }              // 返回大于 0 的数,代表获得到了读锁            return 1;        }    }}
复制代码
这个方式重如果用来处置惩罚重入锁利用的。首先校验一下写锁能否被占用,假如没有被占用则判定当前方程能否是第一次读线程,假如不是则判定末端一次读线程能否是当前方程,假如不是则从readHolds获得,并判定HoldCounter实例中获得读锁次数假如为0,那末就不是重入。
假如可以判定当前方程是重入的,那末则对state高16举行加1利用,利用乐成,则对firstReader或cachedHoldCounter举行设备,并返回1,表现获得到锁。
到这里我们看完了tryAcquireShared方式,我再把acquireShared方式贴出来:
  1. public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}
复制代码
下面看doAcquireShared方式:
  1. private void doAcquireShared(int arg) {      //实例化一个同享节点入队    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {              //获得当前节点的上一个前置节点            final Node p = node.predecessor();              //前置节点假如是头节点,那末代表行列里没有此外节点,先挪用tryAcquireShared尝试获得锁            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                       //醒行列中其他同享节点                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                       //响应停止                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }              //设备前置节点waitStatus状态            if (shouldParkAfterFailedAcquire(p, node) &&                  //阻塞当前方程                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码
doAcquireShared方式中会实例化一个同享节点并入队。假如当前节点的前置节点是头节点,那末间接挪用tryAcquireShared先获得一次锁,假如返回大于0,那末表现可以获得锁,挪用setHeadAndPropagate叫醒行列中其他的线程;假如没有返回则会挪用shouldParkAfterFailedAcquire方式将前置节点的waitStatus设值成SIGNAL,然后挪用parkAndCheckInterrupt方式阻塞
AQS#setHeadAndPropagate
  1. private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below      //把node节点设值为头节点    setHead(node);       //由因而propagate大于零才进这个方式,所以这个必进这个if    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {          //获得node的下一个节点        Node s = node.next;          //判定下一个节点能否为空,或是同享节点        if (s == null || s.isShared())              //往下看            doReleaseShared();    }}
复制代码
这个方式重如果更换头节点为当前节点,然后挪用doReleaseShared举行叫醒节点的利用
AQS#doReleaseShared
  1. private void doReleaseShared() {     for (;;) {        Node h = head;        // 1. h == null: 说明阻塞行列为空        // 2. h == tail: 说明头结点大如果刚刚初始化的头节点,        //   大如果普通线程节点,可是此节点既然是头节点了,那末代表已经被叫醒了,阻塞行列没有其他节点了        // 所以这两种情况不需要举行叫醒后继节点        if (h != null && h != tail) {            int ws = h.waitStatus;               //背面的节点会把前置节点设备为Node.SIGNAL            if (ws == Node.SIGNAL) {                    //1                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                    // 叫醒 head 的后继节点,也就是阻塞行列中的第一个节点                unparkSuccessor(h);            }            else if (ws == 0 &&                        //2                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }          //3 假如被叫醒的节点已经并吞head了,那末继续循环,否则跳出循环        if (h == head)                   // loop if head changed            break;    }}
复制代码

  • unparkSuccessor这里会叫醒下一个节点,那末下一个节点也会挪用setHeadAndPropagate举行抢占头节点;假如同时有当前方程和被叫醒的下一个线程同时走到这里,那末只会有一个乐成,另一个返回false的就不举行叫醒利用
  • 这里CAS失利的原因原由大如果一个新的节点入队,然后将前置节点设值为了Node.SIGNAL,所以致使当前的CAS失利
  • 假如被叫醒的节点抢占头节点乐成,那末h == head 就不建立,那末会举行下一轮的循环,否则就是head没有被抢占乐成
AQS#unparkSuccessor
[code]private void unparkSuccessor(Node node) {    //假如当前节点小于零,那末作为头节点要被扫除一下状态    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 下面的代码就是叫醒后继节点,可是有大要后继节点取消了等待    // 从队尾往前找,找到waitStatus 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus  0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;    } else {          // 判定 cachedHoldCounter 能否缓存的是当前方程,不是的话要到 ThreadLocal 中取        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Copyright © 2006-2014 妈妈网-中国妈妈第一,是怀孕、育儿、健康等知识交流传播首选平台 版权所有 法律顾问:高律师 客服电话:0791-88289918
技术支持:迪恩网络科技公司  Powered by Discuz! X3.2
快速回复 返回顶部 返回列表