about summary refs log tree commit diff
path: root/nptl/pthread_rwlock_common.c
diff options
context:
space:
mode:
authorCarlos O'Donell <carlos@systemhalted.org>2017-07-28 00:22:44 -0400
committerCarlos O'Donell <carlos@systemhalted.org>2017-07-28 00:23:58 -0400
commitfaf8c066df0d6bccb54bd74dd696eeb65e1b3bbc (patch)
treeaf22c223947b73a7c22f3add45da6b5c8f204672 /nptl/pthread_rwlock_common.c
parent2557ae38f3aa599718f34317cd0c150892a92be5 (diff)
downloadglibc-faf8c066df0d6bccb54bd74dd696eeb65e1b3bbc.tar.gz
glibc-faf8c066df0d6bccb54bd74dd696eeb65e1b3bbc.tar.xz
glibc-faf8c066df0d6bccb54bd74dd696eeb65e1b3bbc.zip
rwlock: Fix explicit hand-over (bug 21298)
Without this fix, the rwlock can fail to execute the explicit hand-over
in certain cases (e.g., empty critical sections that switch quickly between
read and write phases).  This can then lead to errors in how __wrphase_futex
is accessed, which in turn can lead to deadlocks.
Diffstat (limited to 'nptl/pthread_rwlock_common.c')
-rw-r--r--nptl/pthread_rwlock_common.c478
1 files changed, 240 insertions, 238 deletions
diff --git a/nptl/pthread_rwlock_common.c b/nptl/pthread_rwlock_common.c
index 256508ca2a..846687e1cf 100644
--- a/nptl/pthread_rwlock_common.c
+++ b/nptl/pthread_rwlock_common.c
@@ -55,7 +55,6 @@
    lock acquisition attempts, so that new incoming readers do not prolong a
    phase in which readers have acquired the lock.
 
-
    The main components of the rwlock are a writer-only lock that allows only
    one of the concurrent writers to be the primary writer, and a
    single-writer-multiple-readers lock that decides between read phases, in
@@ -70,15 +69,16 @@
    ---------------------------
    #1    0   0   0   0   Lock is idle (and in a read phase).
    #2    0   0   >0  0   Readers have acquired the lock.
-   #3    0   1   0   0   Lock is not acquired; a writer is waiting for a write
-			 phase to start or will try to start one.
+   #3    0   1   0   0   Lock is not acquired; a writer will try to start a
+			 write phase.
    #4    0   1   >0  0   Readers have acquired the lock; a writer is waiting
 			 and explicit hand-over to the writer is required.
    #4a   0   1   >0  1   Same as #4 except that there are further readers
 			 waiting because the writer is to be preferred.
    #5    1   0   0   0   Lock is idle (and in a write phase).
-   #6    1   0   >0  0   Write phase; readers are waiting for a read phase to
-			 start or will try to start one.
+   #6    1   0   >0  0   Write phase; readers will try to start a read phase
+			 (requires explicit hand-over to all readers that
+			 do not start the read phase).
    #7    1   1   0   0   Lock is acquired by a writer.
    #8    1   1   >0  0   Lock acquired by a writer and readers are waiting;
 			 explicit hand-over to the readers is required.
@@ -375,9 +375,9 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
      complexity.  */
   if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
     return 0;
-
-  /* If there is no primary writer but we are in a write phase, we can try
-     to install a read phase ourself.  */
+  /* Otherwise, if we were in a write phase (states #6 or #8), we must wait
+     for explicit hand-over of the read phase; the only exception is if we
+     can start a read phase if there is no primary writer currently.  */
   while (((r & PTHREAD_RWLOCK_WRPHASE) != 0)
       && ((r & PTHREAD_RWLOCK_WRLOCKED) == 0))
     {
@@ -390,15 +390,18 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
 	{
 	  /* We started the read phase, so we are also responsible for
 	     updating the write-phase futex.  Relaxed MO is sufficient.
-	     Note that there can be no other reader that we have to wake
-	     because all other readers will see the read phase started by us
-	     (or they will try to start it themselves); if a writer started
-	     the read phase, we cannot have started it.  Furthermore, we
-	     cannot discard a PTHREAD_RWLOCK_FUTEX_USED flag because we will
-	     overwrite the value set by the most recent writer (or the readers
-	     before it in case of explicit hand-over) and we know that there
-	     are no waiting readers.  */
-	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 0);
+	     We have to do the same steps as a writer would when handing
+	     over the read phase to us because other readers cannot
+	     distinguish between us and the writer; this includes
+	     explicit hand-over and potentially having to wake other readers
+	     (but we can pretend to do the setting and unsetting of WRLOCKED
+	     atomically, and thus can skip this step).  */
+	  if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
+	      & PTHREAD_RWLOCK_FUTEX_USED) != 0)
+	    {
+	      int private = __pthread_rwlock_get_private (rwlock);
+	      futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
+	    }
 	  return 0;
 	}
       else
@@ -407,102 +410,98 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
 	}
     }
 
-  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
+  /* We were in a write phase but did not install the read phase.  We cannot
+     distinguish between a writer and another reader starting the read phase,
+     so we must wait for explicit hand-over via __wrphase_futex.
+     However, __wrphase_futex might not have been set to 1 yet (either
+     because explicit hand-over to the writer is still ongoing, or because
+     the writer has started the write phase but has not yet updated
+     __wrphase_futex).  The least recent value of __wrphase_futex we can
+     read from here is the modification of the last read phase (because
+     we synchronize with the last reader in this read phase through
+     __readers; see the use of acquire MO on the fetch_add above).
+     Therefore, if we observe a value of 0 for __wrphase_futex, we need
+     to subsequently check that __readers now indicates a read phase; we
+     need to use acquire MO for this so that if we observe a read phase,
+     we will also see the modification of __wrphase_futex by the previous
+     writer.  We then need to load __wrphase_futex again and continue to
+     wait if it is not 0, so that we do not skip explicit hand-over.
+     Relaxed MO is sufficient for the load from __wrphase_futex because
+     we just use it as an indicator for when we can proceed; we use
+     __readers and the acquire MO accesses to it to eventually read from
+     the proper stores to __wrphase_futex.  */
+  unsigned int wpf;
+  bool ready = false;
+  for (;;)
     {
-      /* We are in a write phase, and there must be a primary writer because
-	 of the previous loop.  Block until the primary writer gives up the
-	 write phase.  This case requires explicit hand-over using
-	 __wrphase_futex.
-	 However, __wrphase_futex might not have been set to 1 yet (either
-	 because explicit hand-over to the writer is still ongoing, or because
-	 the writer has started the write phase but does not yet have updated
-	 __wrphase_futex).  The least recent value of __wrphase_futex we can
-	 read from here is the modification of the last read phase (because
-	 we synchronize with the last reader in this read phase through
-	 __readers; see the use of acquire MO on the fetch_add above).
-	 Therefore, if we observe a value of 0 for __wrphase_futex, we need
-	 to subsequently check that __readers now indicates a read phase; we
-	 need to use acquire MO for this so that if we observe a read phase,
-	 we will also see the modification of __wrphase_futex by the previous
-	 writer.  We then need to load __wrphase_futex again and continue to
-	 wait if it is not 0, so that we do not skip explicit hand-over.
-	 Relaxed MO is sufficient for the load from __wrphase_futex because
-	 we just use it as an indicator for when we can proceed; we use
-	 __readers and the acquire MO accesses to it to eventually read from
-	 the proper stores to __wrphase_futex.  */
-      unsigned int wpf;
-      bool ready = false;
-      for (;;)
+      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
+	  | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
 	{
-	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
-	      | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
+	  int private = __pthread_rwlock_get_private (rwlock);
+	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
+	      && !atomic_compare_exchange_weak_relaxed
+		  (&rwlock->__data.__wrphase_futex,
+		   &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
+	    continue;
+	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
+	      1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
+	  if (err == ETIMEDOUT)
 	    {
-	      int private = __pthread_rwlock_get_private (rwlock);
-	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
-		  && !atomic_compare_exchange_weak_relaxed
-		      (&rwlock->__data.__wrphase_futex,
-		       &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
-		continue;
-	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
-		  1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
-	      if (err == ETIMEDOUT)
+	      /* If we timed out, we need to unregister.  If no read phase
+		 has been installed while we waited, we can just decrement
+		 the number of readers.  Otherwise, we just acquire the
+		 lock, which is allowed because we give no precise timing
+		 guarantees, and because the timeout is only required to
+		 be in effect if we would have had to wait for other
+		 threads (e.g., if futex_wait would time-out immediately
+		 because the given absolute time is in the past).  */
+	      r = atomic_load_relaxed (&rwlock->__data.__readers);
+	      while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
 		{
-		  /* If we timed out, we need to unregister.  If no read phase
-		     has been installed while we waited, we can just decrement
-		     the number of readers.  Otherwise, we just acquire the
-		     lock, which is allowed because we give no precise timing
-		     guarantees, and because the timeout is only required to
-		     be in effect if we would have had to wait for other
-		     threads (e.g., if futex_wait would time-out immediately
-		     because the given absolute time is in the past).  */
-		  r = atomic_load_relaxed (&rwlock->__data.__readers);
-		  while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
-		    {
-		      /* We don't need to make anything else visible to
-			 others besides unregistering, so relaxed MO is
-			 sufficient.  */
-		      if (atomic_compare_exchange_weak_relaxed
-			  (&rwlock->__data.__readers, &r,
-			   r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
-			return ETIMEDOUT;
-		      /* TODO Back-off.  */
-		    }
-		  /* Use the acquire MO fence to mirror the steps taken in the
-		     non-timeout case.  Note that the read can happen both
-		     in the atomic_load above as well as in the failure case
-		     of the CAS operation.  */
-		  atomic_thread_fence_acquire ();
-		  /* We still need to wait for explicit hand-over, but we must
-		     not use futex_wait anymore because we would just time out
-		     in this case and thus make the spin-waiting we need
-		     unnecessarily expensive.  */
-		  while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
-		      | PTHREAD_RWLOCK_FUTEX_USED)
-		      == (1 | PTHREAD_RWLOCK_FUTEX_USED))
-		    {
-		      /* TODO Back-off?  */
-		    }
-		  ready = true;
-		  break;
+		  /* We don't need to make anything else visible to
+		     others besides unregistering, so relaxed MO is
+		     sufficient.  */
+		  if (atomic_compare_exchange_weak_relaxed
+		      (&rwlock->__data.__readers, &r,
+		       r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
+		    return ETIMEDOUT;
+		  /* TODO Back-off.  */
 		}
-	      /* If we got interrupted (EINTR) or the futex word does not have the
-		 expected value (EAGAIN), retry.  */
+	      /* Use the acquire MO fence to mirror the steps taken in the
+		 non-timeout case.  Note that the read can happen both
+		 in the atomic_load above as well as in the failure case
+		 of the CAS operation.  */
+	      atomic_thread_fence_acquire ();
+	      /* We still need to wait for explicit hand-over, but we must
+		 not use futex_wait anymore because we would just time out
+		 in this case and thus make the spin-waiting we need
+		 unnecessarily expensive.  */
+	      while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
+		  | PTHREAD_RWLOCK_FUTEX_USED)
+		  == (1 | PTHREAD_RWLOCK_FUTEX_USED))
+		{
+		  /* TODO Back-off?  */
+		}
+	      ready = true;
+	      break;
 	    }
-	  if (ready)
-	    /* See below.  */
-	    break;
-	  /* We need acquire MO here so that we synchronize with the lock
-	     release of the writer, and so that we observe a recent value of
-	     __wrphase_futex (see below).  */
-	  if ((atomic_load_acquire (&rwlock->__data.__readers)
-	      & PTHREAD_RWLOCK_WRPHASE) == 0)
-	    /* We are in a read phase now, so the least recent modification of
-	       __wrphase_futex we can read from is the store by the writer
-	       with value 1.  Thus, only now we can assume that if we observe
-	       a value of 0, explicit hand-over is finished. Retry the loop
-	       above one more time.  */
-	    ready = true;
+	  /* If we got interrupted (EINTR) or the futex word does not have the
+	     expected value (EAGAIN), retry.  */
 	}
+      if (ready)
+	/* See below.  */
+	break;
+      /* We need acquire MO here so that we synchronize with the lock
+	 release of the writer, and so that we observe a recent value of
+	 __wrphase_futex (see below).  */
+      if ((atomic_load_acquire (&rwlock->__data.__readers)
+	  & PTHREAD_RWLOCK_WRPHASE) == 0)
+	/* We are in a read phase now, so the least recent modification of
+	   __wrphase_futex we can read from is the store by the writer
+	   with value 1.  Thus, only now we can assume that if we observe
+	   a value of 0, explicit hand-over is finished. Retry the loop
+	   above one more time.  */
+	ready = true;
     }
 
   return 0;
@@ -741,10 +740,23 @@ __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
 	  r = atomic_load_relaxed (&rwlock->__data.__readers);
 	}
       /* Our snapshot of __readers is up-to-date at this point because we
-	 either set WRLOCKED using a CAS or were handed over WRLOCKED from
+	 either set WRLOCKED using a CAS (and update r accordingly below,
+	 which was used as expected value for the CAS) or got WRLOCKED from
 	 another writer whose snapshot of __readers we inherit.  */
+      r |= PTHREAD_RWLOCK_WRLOCKED;
     }
 
+  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
+     MO is sufficient for futex words; acquire MO on the previous
+     modifications of __readers ensures that this store happens after the
+     store of value 0 by the previous primary writer.  */
+  atomic_store_relaxed (&rwlock->__data.__writers_futex,
+      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
+
+  /* If we are in a write phase, we have acquired the lock.  */
+  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
+    goto done;
+
   /* If we are in a read phase and there are no readers, try to start a write
      phase.  */
   while (((r & PTHREAD_RWLOCK_WRPHASE) == 0)
@@ -758,166 +770,156 @@ __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
 	  &r, r | PTHREAD_RWLOCK_WRPHASE))
 	{
 	  /* We have started a write phase, so need to enable readers to wait.
-	     See the similar case in__pthread_rwlock_rdlock_full.  */
+	     See the similar case in __pthread_rwlock_rdlock_full.  Unlike in
+	     that similar case, we are the (only) primary writer and so do
+	     not need to wake another writer.  */
 	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 1);
-	  /* Make sure we fall through to the end of the function.  */
-	  r |= PTHREAD_RWLOCK_WRPHASE;
-	  break;
+
+	  goto done;
 	}
       /* TODO Back-off.  */
     }
 
-  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
-     MO is sufficient for futex words; acquire MO on the previous
-     modifications of __readers ensures that this store happens after the
-     store of value 0 by the previous primary writer.  */
-  atomic_store_relaxed (&rwlock->__data.__writers_futex,
-      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
-
-  if (__glibc_unlikely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
+  /* We became the primary writer in a read phase and there were readers when
+     we did (because of the previous loop).  Thus, we have to wait for
+     explicit hand-over from one of these readers.
+     We basically do the same steps as for the similar case in
+     __pthread_rwlock_rdlock_full, except that we additionally might try
+     to directly hand over to another writer and need to wake up
+     other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
+  unsigned int wpf;
+  bool ready = false;
+  for (;;)
     {
-      /* We are not in a read phase and there are readers (because of the
-	 previous loop).  Thus, we have to wait for explicit hand-over from
-	 one of these readers.
-	 We basically do the same steps as for the similar case in
-	 __pthread_rwlock_rdlock_full, except that we additionally might try
-	 to directly hand over to another writer and need to wake up
-	 other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
-      unsigned int wpf;
-      bool ready = false;
-      for (;;)
+      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
+	  | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
 	{
-	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
-	      | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
+	  int private = __pthread_rwlock_get_private (rwlock);
+	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
+	      && !atomic_compare_exchange_weak_relaxed
+		  (&rwlock->__data.__wrphase_futex, &wpf,
+		   PTHREAD_RWLOCK_FUTEX_USED))
+	    continue;
+	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
+	      PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
+	  if (err == ETIMEDOUT)
 	    {
-	      int private = __pthread_rwlock_get_private (rwlock);
-	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
-		  && !atomic_compare_exchange_weak_relaxed
-		      (&rwlock->__data.__wrphase_futex, &wpf,
-		       PTHREAD_RWLOCK_FUTEX_USED))
-		continue;
-	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
-		  PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
-	      if (err == ETIMEDOUT)
+	      if (rwlock->__data.__flags
+		  != PTHREAD_RWLOCK_PREFER_READER_NP)
 		{
-		  if (rwlock->__data.__flags
-		      != PTHREAD_RWLOCK_PREFER_READER_NP)
-		    {
-		      /* We try writer--writer hand-over.  */
-		      unsigned int w = atomic_load_relaxed
-			  (&rwlock->__data.__writers);
-		      if (w != 0)
-			{
-			  /* We are about to hand over WRLOCKED, so we must
-			     release __writers_futex too; otherwise, we'd have
-			     a pending store, which could at least prevent
-			     other threads from waiting using the futex
-			     because it could interleave with the stores
-			     by subsequent writers.  In turn, this means that
-			     we have to clean up when we do not hand over
-			     WRLOCKED.
-			     Release MO so that another writer that gets
-			     WRLOCKED from us can take over our view of
-			     __readers.  */
-			  unsigned int wf = atomic_exchange_relaxed
-			      (&rwlock->__data.__writers_futex, 0);
-			  while (w != 0)
-			    {
-			      if (atomic_compare_exchange_weak_release
-				  (&rwlock->__data.__writers, &w,
-				      w | PTHREAD_RWLOCK_WRHANDOVER))
-				{
-				  /* Wake other writers.  */
-				  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
-				    futex_wake
-					(&rwlock->__data.__writers_futex, 1,
-					 private);
-				  return ETIMEDOUT;
-				}
-			      /* TODO Back-off.  */
-			    }
-			  /* We still own WRLOCKED and someone else might set
-			     a write phase concurrently, so enable waiting
-			     again.  Make sure we don't loose the flag that
-			     signals whether there are threads waiting on
-			     this futex.  */
-			  atomic_store_relaxed
-			      (&rwlock->__data.__writers_futex, wf);
-			}
-		    }
-		  /* If we timed out and we are not in a write phase, we can
-		     just stop being a primary writer.  Otherwise, we just
-		     acquire the lock.  */
-		  r = atomic_load_relaxed (&rwlock->__data.__readers);
-		  if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		  /* We try writer--writer hand-over.  */
+		  unsigned int w = atomic_load_relaxed
+		      (&rwlock->__data.__writers);
+		  if (w != 0)
 		    {
-		      /* We are about to release WRLOCKED, so we must release
-			 __writers_futex too; see the handling of
-			 writer--writer hand-over above.  */
+		      /* We are about to hand over WRLOCKED, so we must
+			 release __writers_futex too; otherwise, we'd have
+			 a pending store, which could at least prevent
+			 other threads from waiting using the futex
+			 because it could interleave with the stores
+			 by subsequent writers.  In turn, this means that
+			 we have to clean up when we do not hand over
+			 WRLOCKED.
+			 Release MO so that another writer that gets
+			 WRLOCKED from us can take over our view of
+			 __readers.  */
 		      unsigned int wf = atomic_exchange_relaxed
 			  (&rwlock->__data.__writers_futex, 0);
-		      while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		      while (w != 0)
 			{
-			  /* While we don't need to make anything from a
-			     caller's critical section visible to other
-			     threads, we need to ensure that our changes to
-			     __writers_futex are properly ordered.
-			     Therefore, use release MO to synchronize with
-			     subsequent primary writers.  Also wake up any
-			     waiting readers as they are waiting because of
-			     us.  */
 			  if (atomic_compare_exchange_weak_release
-			      (&rwlock->__data.__readers, &r,
-			       (r ^ PTHREAD_RWLOCK_WRLOCKED)
-			       & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
+			      (&rwlock->__data.__writers, &w,
+				  w | PTHREAD_RWLOCK_WRHANDOVER))
 			    {
 			      /* Wake other writers.  */
 			      if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
 				futex_wake (&rwlock->__data.__writers_futex,
-				    1, private);
-			      /* Wake waiting readers.  */
-			      if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
-				futex_wake (&rwlock->__data.__readers,
-				    INT_MAX, private);
+					    1, private);
 			      return ETIMEDOUT;
 			    }
+			  /* TODO Back-off.  */
 			}
-		      /* We still own WRLOCKED and someone else might set a
-			 write phase concurrently, so enable waiting again.
-			 Make sure we don't loose the flag that signals
-			 whether there are threads waiting on this futex.  */
-		      atomic_store_relaxed (&rwlock->__data.__writers_futex,
-			  wf);
+		      /* We still own WRLOCKED and someone else might set
+			 a write phase concurrently, so enable waiting
+			 again.  Make sure we don't loose the flag that
+			 signals whether there are threads waiting on
+			 this futex.  */
+		      atomic_store_relaxed
+			  (&rwlock->__data.__writers_futex, wf);
 		    }
-		  /* Use the acquire MO fence to mirror the steps taken in the
-		     non-timeout case.  Note that the read can happen both
-		     in the atomic_load above as well as in the failure case
-		     of the CAS operation.  */
-		  atomic_thread_fence_acquire ();
-		  /* We still need to wait for explicit hand-over, but we must
-		     not use futex_wait anymore.  */
-		  while ((atomic_load_relaxed
-		      (&rwlock->__data.__wrphase_futex)
-		       | PTHREAD_RWLOCK_FUTEX_USED)
-		      == PTHREAD_RWLOCK_FUTEX_USED)
+		}
+	      /* If we timed out and we are not in a write phase, we can
+		 just stop being a primary writer.  Otherwise, we just
+		 acquire the lock.  */
+	      r = atomic_load_relaxed (&rwlock->__data.__readers);
+	      if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		{
+		  /* We are about to release WRLOCKED, so we must release
+		     __writers_futex too; see the handling of
+		     writer--writer hand-over above.  */
+		  unsigned int wf = atomic_exchange_relaxed
+		      (&rwlock->__data.__writers_futex, 0);
+		  while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
 		    {
-		      /* TODO Back-off.  */
+		      /* While we don't need to make anything from a
+			 caller's critical section visible to other
+			 threads, we need to ensure that our changes to
+			 __writers_futex are properly ordered.
+			 Therefore, use release MO to synchronize with
+			 subsequent primary writers.  Also wake up any
+			 waiting readers as they are waiting because of
+			 us.  */
+		      if (atomic_compare_exchange_weak_release
+			  (&rwlock->__data.__readers, &r,
+			   (r ^ PTHREAD_RWLOCK_WRLOCKED)
+			   & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
+			{
+			  /* Wake other writers.  */
+			  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
+			    futex_wake (&rwlock->__data.__writers_futex,
+				1, private);
+			  /* Wake waiting readers.  */
+			  if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
+			    futex_wake (&rwlock->__data.__readers,
+				INT_MAX, private);
+			  return ETIMEDOUT;
+			}
 		    }
-		  ready = true;
-		  break;
+		  /* We still own WRLOCKED and someone else might set a
+		     write phase concurrently, so enable waiting again.
+		     Make sure we don't loose the flag that signals
+		     whether there are threads waiting on this futex.  */
+		  atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);
 		}
-	      /* If we got interrupted (EINTR) or the futex word does not have
-		 the expected value (EAGAIN), retry.  */
+	      /* Use the acquire MO fence to mirror the steps taken in the
+		 non-timeout case.  Note that the read can happen both
+		 in the atomic_load above as well as in the failure case
+		 of the CAS operation.  */
+	      atomic_thread_fence_acquire ();
+	      /* We still need to wait for explicit hand-over, but we must
+		 not use futex_wait anymore.  */
+	      while ((atomic_load_relaxed
+		  (&rwlock->__data.__wrphase_futex)
+		   | PTHREAD_RWLOCK_FUTEX_USED)
+		  == PTHREAD_RWLOCK_FUTEX_USED)
+		{
+		  /* TODO Back-off.  */
+		}
+	      ready = true;
+	      break;
 	    }
-	  /* See pthread_rwlock_rdlock_full.  */
-	  if (ready)
-	    break;
-	  if ((atomic_load_acquire (&rwlock->__data.__readers)
-	      & PTHREAD_RWLOCK_WRPHASE) != 0)
-	    ready = true;
+	  /* If we got interrupted (EINTR) or the futex word does not have
+	     the expected value (EAGAIN), retry.  */
 	}
+      /* See pthread_rwlock_rdlock_full.  */
+      if (ready)
+	break;
+      if ((atomic_load_acquire (&rwlock->__data.__readers)
+	  & PTHREAD_RWLOCK_WRPHASE) != 0)
+	ready = true;
     }
 
+ done:
   atomic_store_relaxed (&rwlock->__data.__cur_writer,
       THREAD_GETMEM (THREAD_SELF, tid));
   return 0;