Index: sys/rwlock.h =================================================================== RCS file: /CVS/CVS_IPSO/src/sys/sys/rwlock.h,v retrieving revision 1.1.12.1 diff -u -p -r1.1.12.1 rwlock.h --- sys/rwlock.h 17 Nov 2007 01:01:56 -0000 1.1.12.1 +++ sys/rwlock.h 15 Jan 2008 14:26:16 -0000 @@ -66,10 +66,11 @@ #define RW_LOCK_READ 0x01 #define RW_LOCK_READ_WAITERS 0x02 #define RW_LOCK_WRITE_WAITERS 0x04 -#define RW_LOCK_RECURSED 0x08 +#define RW_LOCK_WRITE_SPINNER 0x08 #define RW_LOCK_FLAGMASK \ (RW_LOCK_READ | RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS | \ - RW_LOCK_RECURSED) + RW_LOCK_WRITE_SPINNER) +#define RW_LOCK_WAITERS (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS) #define RW_OWNER(x) ((x) & ~RW_LOCK_FLAGMASK) #define RW_READERS_SHIFT 4 @@ -113,7 +114,9 @@ #define __rw_wunlock(rw, tid, file, line) do { \ uintptr_t _tid = (uintptr_t)(tid); \ \ - if (!_rw_write_unlock((rw), _tid)) \ + if ((rw)->rw_recurse) \ + (rw)->rw_recurse--; \ + else if (!_rw_write_unlock((rw), _tid)) \ _rw_wunlock_hard((rw), _tid, (file), (line)); \ } while (0) Index: kern/kern_rwlock.c =================================================================== RCS file: /CVS/CVS_IPSO/src/sys/kern/kern_rwlock.c,v retrieving revision 1.1.12.5 diff -u -p -r1.1.12.5 kern_rwlock.c --- kern/kern_rwlock.c 9 Jan 2008 10:06:02 -0000 1.1.12.5 +++ kern/kern_rwlock.c 15 Jan 2008 14:26:16 -0000 @@ -211,6 +211,15 @@ _rw_wunlock(struct rwlock *rw, const cha __rw_wunlock(rw, curthread, file, line); } +#define RW_CAN_READ(_rw) \ + (((_rw) & \ + (RW_LOCK_READ | RW_LOCK_WRITE_WAITERS | RW_LOCK_WRITE_SPINNER)) == \ + RW_LOCK_READ) + +#define RW_CAN_SPIN(_rw) \ + (((_rw) & \ + (RW_LOCK_READ | RW_LOCK_WRITE_WAITERS | RW_LOCK_WRITE_SPINNER)) == 0) + void _rw_rlock(struct rwlock *rw, const char *file, int line) { @@ -220,26 +229,16 @@ _rw_rlock(struct rwlock *rw, const char #endif uint64_t waittime = 0; int contested = 0; - uintptr_t x; + uintptr_t v; KASSERT(rw->rw_lock != RW_DESTROYED, ("rw_rlock() of destroyed rwlock @ %s:%d", file, line)); KASSERT(rw_wowner(rw) != curthread, ("%s (%s): wlock already held @ %s:%d", __func__, rw->lock_object.lo_name, file, line)); - WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line); + WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_NORECURSE, + file, line); - /* - * Note that we don't make any attempt to try to block read - * locks once a writer has blocked on the lock. The reason is - * that we currently allow for read locks to recurse and we - * don't keep track of all the holders of read locks. Thus, if - * we were to block readers once a writer blocked and a reader - * tried to recurse on their reader lock after a writer had - * blocked we would end up in a deadlock since the reader would - * be blocked on the writer, and the writer would be blocked - * waiting for the reader to release its original read lock. - */ for (;;) { /* * Handle the easy case. If no other thread has a write @@ -251,22 +250,20 @@ _rw_rlock(struct rwlock *rw, const char * completely unlocked rwlock since such a lock is encoded * as a read lock with no waiters. */ - x = rw->rw_lock; - if (x & RW_LOCK_READ) { - + v = rw->rw_lock; + if (RW_CAN_READ(v)) { /* * The RW_LOCK_READ_WAITERS flag should only be set - * if another thread currently holds a write lock, - * and in that case RW_LOCK_READ should be clear. + * if the lock has been unlocked and write waiters + * were present. */ - MPASS((x & RW_LOCK_READ_WAITERS) == 0); - if (atomic_cmpset_acq_ptr(&rw->rw_lock, x, - x + RW_ONE_READER)) { + if (atomic_cmpset_acq_ptr(&rw->rw_lock, v, + v + RW_ONE_READER)) { if (LOCK_LOG_TEST(&rw->lock_object, 0)) CTR4(KTR_LOCK, "%s: %p succeed %p -> %p", __func__, - rw, (void *)x, - (void *)(x + RW_ONE_READER)); + rw, (void *)v, + (void *)(v + RW_ONE_READER)); break; } cpu_spinwait(); @@ -281,32 +278,35 @@ _rw_rlock(struct rwlock *rw, const char * the owner stops running or the state of the lock * changes. */ - owner = (struct thread *)RW_OWNER(x); - if (TD_IS_RUNNING(owner)) { - if (LOCK_LOG_TEST(&rw->lock_object, 0)) - CTR3(KTR_LOCK, "%s: spinning on %p held by %p", - __func__, rw, owner); - while ((struct thread*)RW_OWNER(rw->rw_lock) == owner && - TD_IS_RUNNING(owner)) - cpu_spinwait(); - continue; + if (RW_CAN_SPIN(v)) { + owner = (struct thread *)RW_OWNER(v); + if (TD_IS_RUNNING(owner)) { + if (LOCK_LOG_TEST(&rw->lock_object, 0)) + CTR3(KTR_LOCK, + "%s: spinning on %p held by %p", + __func__, rw, owner); + while ((struct thread*)RW_OWNER(rw->rw_lock) == + owner && TD_IS_RUNNING(owner)) + cpu_spinwait(); + continue; + } } #endif /* * Okay, now it's the hard case. Some other thread already - * has a write lock, so acquire the turnstile lock so we can - * begin the process of blocking. + * has a write lock or there are write waiters present, + * acquire the turnstile lock so we can begin the process + * of blocking. */ ts = turnstile_trywait(&rw->lock_object); /* * The lock might have been released while we spun, so - * recheck its state and restart the loop if there is no - * longer a write lock. + * recheck its state and restart the loop if needed. */ - x = rw->rw_lock; - if (x & RW_LOCK_READ) { + v = rw->rw_lock; + if (RW_CAN_READ(v)) { turnstile_cancel(ts); cpu_spinwait(); continue; @@ -317,23 +317,30 @@ _rw_rlock(struct rwlock *rw, const char * If the current owner of the lock is executing on another * CPU quit the hard path and try to spin. */ - owner = (struct thread *)RW_OWNER(x); - if (TD_IS_RUNNING(owner)) { - turnstile_cancel(ts); - cpu_spinwait(); - continue; + if (RW_CAN_SPIN(v)) { + owner = (struct thread *)RW_OWNER(v); + if (TD_IS_RUNNING(owner)) { + turnstile_cancel(ts); + cpu_spinwait(); + continue; + } } #endif /* - * Ok, it's still a write lock. If the RW_LOCK_READ_WAITERS - * flag is already set, then we can go ahead and block. If - * it is not set then try to set it. If we fail to set it - * drop the turnstile lock and restart the loop. - */ - if (!(x & RW_LOCK_READ_WAITERS)) { - if (!atomic_cmpset_ptr(&rw->rw_lock, x, - x | RW_LOCK_READ_WAITERS)) { + * The lock is held in write mode or it already has waiters. + */ + MPASS(!RW_CAN_READ(v)); + + /* + * If the RW_LOCK_READ_WAITERS flag is already set, then + * we can go ahead and block. If it is not set then try + * to set it. If we fail to set it drop the turnstile + * lock and restart the loop. + */ + if (!(v & RW_LOCK_READ_WAITERS)) { + if (!atomic_cmpset_ptr(&rw->rw_lock, v, + v | RW_LOCK_READ_WAITERS)) { turnstile_cancel(ts); cpu_spinwait(); continue; @@ -372,7 +379,7 @@ void _rw_runlock(struct rwlock *rw, const char *file, int line) { struct turnstile *ts; - uintptr_t x; + uintptr_t x, v, queue; KASSERT(rw->rw_lock != RW_DESTROYED, ("rw_runlock() of destroyed rwlock @ %s:%d", file, line)); @@ -401,31 +408,14 @@ _rw_runlock(struct rwlock *rw, const cha } continue; } - - - /* - * We should never have read waiters while at least one - * thread holds a read lock. (See note above) - */ - KASSERT(!(x & RW_LOCK_READ_WAITERS), - ("%s: waiting readers", __func__)); - /* * If there aren't any waiters for a write lock, then try * to drop it quickly. */ - if (!(x & RW_LOCK_WRITE_WAITERS)) { - - /* - * There shouldn't be any flags set and we should - * be the only read lock. If we fail to release - * the single read lock, then another thread might - * have just acquired a read lock, so go back up - * to the multiple read locks case. - */ - MPASS(x == RW_READERS_LOCK(1)); - if (atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1), - RW_UNLOCKED)) { + if (!(x & RW_LOCK_WAITERS)) { + MPASS((x & ~RW_LOCK_WRITE_SPINNER) == + RW_READERS_LOCK(1)); + if (atomic_cmpset_ptr(&rw->rw_lock, x, RW_UNLOCKED)) { if (LOCK_LOG_TEST(&rw->lock_object, 0)) CTR2(KTR_LOCK, "%s: %p last succeeded", __func__, rw); @@ -433,18 +423,13 @@ _rw_runlock(struct rwlock *rw, const cha } continue; } - /* - * There should just be one reader with one or more - * writers waiting. - */ - MPASS(x == (RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS)); - - /* - * Ok, we know we have a waiting writer and we think we - * are the last reader, so grab the turnstile lock. + * Ok, we know we have waiters and we think we are the + * last reader, so grab the turnstile lock. */ turnstile_chain_lock(&rw->lock_object); + v = rw->rw_lock & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER); + MPASS(v & RW_LOCK_WAITERS); /* * Try to drop our lock leaving the lock in a unlocked @@ -462,8 +447,14 @@ _rw_runlock(struct rwlock *rw, const cha * acquired a read lock, so drop the turnstile lock and * restart. */ - if (!atomic_cmpset_ptr(&rw->rw_lock, - RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS, RW_UNLOCKED)) { + x = RW_UNLOCKED; + if (v & RW_LOCK_WRITE_WAITERS) { + queue = TS_EXCLUSIVE_QUEUE; + x |= (v & RW_LOCK_READ_WAITERS); + } else + queue = TS_SHARED_QUEUE; + if (!atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v, + x)) { turnstile_chain_unlock(&rw->lock_object); continue; } @@ -480,7 +471,7 @@ _rw_runlock(struct rwlock *rw, const cha */ ts = turnstile_lookup(&rw->lock_object); MPASS(ts != NULL); - turnstile_broadcast(ts, TS_EXCLUSIVE_QUEUE); + turnstile_broadcast(ts, queue); turnstile_unpend(ts, TS_SHARED_LOCK); turnstile_chain_unlock(&rw->lock_object); break; @@ -501,7 +492,7 @@ _rw_wlock_hard(struct rwlock *rw, uintpt volatile struct thread *owner; #endif uint64_t waittime = 0; - uintptr_t v; + uintptr_t v, x; int contested = 0; int spintries = 0; @@ -510,7 +501,6 @@ _rw_wlock_hard(struct rwlock *rw, uintpt ("%s: recursing but non-recursive rw %s @ %s:%d\n", __func__, rw->lock_object.lo_name, file, line)); rw->rw_recurse++; - atomic_set_ptr(&rw->rw_lock, RW_LOCK_RECURSED); if (LOCK_LOG_TEST(&rw->lock_object, 0)) CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw); return; @@ -539,12 +529,20 @@ _rw_wlock_hard(struct rwlock *rw, uintpt TD_IS_RUNNING(owner)) cpu_spinwait(); continue; - } else if (v & RW_LOCK_READ && spintries < 10) { + } +#if 1 + if ((v & RW_LOCK_READ) && RW_READERS(v) && spintries < 100) { int i; - + if (!(v & RW_LOCK_WRITE_SPINNER)) { + if (!atomic_cmpset_ptr(&rw->rw_lock, v, + v | RW_LOCK_WRITE_SPINNER)) { + cpu_spinwait(); + continue; + } + } spintries++; for (i = 100000; i > 0; i--) { - if ((rw->rw_lock & RW_LOCK_READ) == 0) + if ((rw->rw_lock & RW_LOCK_WRITE_SPINNER) == 0) break; cpu_spinwait(); } @@ -552,20 +550,11 @@ _rw_wlock_hard(struct rwlock *rw, uintpt continue; } #endif +#endif ts = turnstile_trywait(&rw->lock_object); v = rw->rw_lock; - /* - * If the lock was released while spinning on the - * turnstile chain lock, try again. - */ - if (v == RW_UNLOCKED) { - turnstile_cancel(ts); - cpu_spinwait(); - continue; - } - #ifdef ADAPTIVE_RWLOCKS /* * If the current owner of the lock is executing on another @@ -580,31 +569,25 @@ _rw_wlock_hard(struct rwlock *rw, uintpt } } #endif - /* - * If the lock was released by a writer with both readers - * and writers waiting and a reader hasn't woken up and - * acquired the lock yet, rw_lock will be set to the - * value RW_UNLOCKED | RW_LOCK_WRITE_WAITERS. If we see - * that value, try to acquire it once. Note that we have - * to preserve the RW_LOCK_WRITE_WAITERS flag as there are - * other writers waiting still. If we fail, restart the - * loop. - */ - if (v == (RW_UNLOCKED | RW_LOCK_WRITE_WAITERS)) { - if (atomic_cmpset_acq_ptr(&rw->rw_lock, - RW_UNLOCKED | RW_LOCK_WRITE_WAITERS, - tid | RW_LOCK_WRITE_WAITERS)) { - turnstile_claim(ts); - CTR2(KTR_LOCK, "%s: %p claimed by new writer", - __func__, rw); + * If the lock was released while waiting for the turnstile + * chain lock retry. + */ + x = v & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER); + if ((v & ~x) == RW_UNLOCKED) { + x &= ~RW_LOCK_WRITE_SPINNER; + if (atomic_cmpset_acq_ptr(&rw->rw_lock, v, tid | x)) { + if (x) + turnstile_claim(ts); + else + turnstile_cancel(ts); break; } + cpu_spinwait(); turnstile_cancel(ts); cpu_spinwait(); continue; } - /* * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to * set it. If we fail to set it, then loop back and try @@ -621,7 +604,6 @@ _rw_wlock_hard(struct rwlock *rw, uintpt CTR2(KTR_LOCK, "%s: %p set write waiters flag", __func__, rw); } - /* * We were unable to acquire the lock and the write waiters * flag is set, so we must block on the turnstile. @@ -652,12 +634,12 @@ _rw_wunlock_hard(struct rwlock *rw, uint int queue; if (rw_wlocked(rw) && rw_recursed(rw)) { - if ((--rw->rw_recurse) == 0) - atomic_clear_ptr(&rw->rw_lock, RW_LOCK_RECURSED); + rw->rw_recurse--; if (LOCK_LOG_TEST(&rw->lock_object, 0)) CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw); return; } + v = rw->rw_lock; KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS), ("%s: neither of the waiter flags are set", __func__)); @@ -711,7 +693,7 @@ _rw_wunlock_hard(struct rwlock *rw, uint int _rw_try_upgrade(struct rwlock *rw, const char *file, int line) { - uintptr_t v, tid; + uintptr_t v, x, tid; struct turnstile *ts; int success; @@ -727,32 +709,40 @@ _rw_try_upgrade(struct rwlock *rw, const * turnstile. So, do the simple case of no waiters first. */ tid = (uintptr_t)curthread; - if (!(rw->rw_lock & RW_LOCK_WRITE_WAITERS)) { - success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1), - tid); - goto out; - } - - /* - * Ok, we think we have write waiters, so lock the - * turnstile. - */ - ts = turnstile_trywait(&rw->lock_object); + success = 0; + for (;;) { + v = rw->rw_lock; + if (RW_READERS(v) > 1) + break; + if (!(v & RW_LOCK_WAITERS)) { + success = atomic_cmpset_ptr(&rw->rw_lock, v, tid); + if (!success) + continue; + break; + } - /* - * Try to switch from one reader to a writer again. This time - * we honor the current state of the RW_LOCK_WRITE_WAITERS - * flag. If we obtain the lock with the flag set, then claim - * ownership of the turnstile. - */ - v = rw->rw_lock & RW_LOCK_WRITE_WAITERS; - success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v, - tid | v); - if (success && v) - turnstile_claim(ts); - else + /* + * Ok, we think we have waiters, so lock the turnstile. + */ + ts = turnstile_trywait(&rw->lock_object); + v = rw->rw_lock; + if (RW_READERS(v) > 1) + break; + /* + * Try to switch from one reader to a writer again. This time + * we honor the current state of the waiters flags. + * If we obtain the lock with the flags set, then claim + * ownership of the turnstile. + */ + x = rw->rw_lock & RW_LOCK_WAITERS; + success = atomic_cmpset_ptr(&rw->rw_lock, v, tid | x); + if (success) { + if (x) + turnstile_claim(ts); + break; + } turnstile_cancel(ts); -out: + } LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line); if (success) WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK, @@ -768,6 +758,7 @@ _rw_downgrade(struct rwlock *rw, const c { struct turnstile *ts; uintptr_t tid, v; + int rwait, wwait; KASSERT(rw->rw_lock != RW_DESTROYED, ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line)); @@ -782,8 +773,7 @@ _rw_downgrade(struct rwlock *rw, const c /* * Convert from a writer to a single reader. First we handle * the easy case with no waiters. If there are any waiters, we - * lock the turnstile, "disown" the lock, and awaken any read - * waiters. + * lock the turnstile and "disown" the lock. */ tid = (uintptr_t)curthread; if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1))) @@ -794,23 +784,28 @@ _rw_downgrade(struct rwlock *rw, const c * read the waiter flags without any races. */ turnstile_chain_lock(&rw->lock_object); - v = rw->rw_lock; - MPASS(v & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)); + v = rw->rw_lock & RW_LOCK_WAITERS; + rwait = v & RW_LOCK_READ_WAITERS; + wwait = v & RW_LOCK_WRITE_WAITERS; + MPASS(rwait | wwait); /* - * Downgrade from a write lock while preserving - * RW_LOCK_WRITE_WAITERS and give up ownership of the - * turnstile. If there are any read waiters, wake them up. + * Downgrade from a write lock while preserving waiters flag + * and give up ownership of the turnstile. */ ts = turnstile_lookup(&rw->lock_object); MPASS(ts != NULL); - if (v & RW_LOCK_READ_WAITERS) + if (!wwait) + v &= ~RW_LOCK_READ_WAITERS; + atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v); + /* + * Wake other readers if there are no writers pending. Otherwise they + * won't be able to acquire the lock anyway. + */ + if (rwait && !wwait) { turnstile_broadcast(ts, TS_SHARED_QUEUE); - atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | - (v & RW_LOCK_WRITE_WAITERS)); - if (v & RW_LOCK_READ_WAITERS) - turnstile_unpend(ts, TS_EXCLUSIVE_LOCK); - else if (ts) + turnstile_unpend(ts, TS_SHARED_LOCK); + } else turnstile_disown(ts); turnstile_chain_unlock(&rw->lock_object); out: