Home
Reading
Searching
Subscribe
Sponsors
Statistics
Posting
Contact
Spam
Lists
Links
About
Hosting
Filtering
Features Download
Marketing
Archives
FAQ
Blog
 
Gmane
From: Steven Rostedt <rostedt <at> goodmis.org>
Subject: [RFC PATCH RT V2] rwsem: The return of multi-reader PI rwsems
Newsgroups: gmane.linux.kernel
Date: Thursday 10th April 2014 17:42:17 UTC (over 2 years ago)
OK, I added rt_rw_limit == 0 being unlimited as the last change and
never tested it. Clark did, and found that it locked up the system.
That's because it zero wasn't properly checked at all locations and in
some cases it could never grab the lock for read.

I fixed that with this patch. And tested it too (that's a bonus).

Signed-off-by: Steven Rostedt 

Index: linux-rt.git/kernel/rtmutex.c
===================================================================
--- linux-rt.git.orig/kernel/rtmutex.c
+++ linux-rt.git/kernel/rtmutex.c
@@ -26,6 +26,21 @@
 #include "rtmutex_common.h"
 
 /*
+ * rt_rw_limit is the number of simultaneous readers of a rwsem lock.
+ *
+ * rt_rw_limit gets updated on boot up to the number of
+ * possible CPUs, but we need to initialize it to something other
+ * than zero.
+ */
+unsigned rt_rw_limit = NR_CPUS;
+
+/* cnt == 0 means unlimited */
+static inline int under_rt_rw_limit(int cnt)
+{
+	return !cnt || cnt < rt_rw_limit;
+}
+
+/*
  * lock->owner state tracking:
  *
  * lock->owner holds the task_struct pointer of the owner. Bit 0
@@ -110,19 +125,44 @@ static inline void init_lists(struct rt_
 		plist_head_init(&lock->wait_list);
 }
 
+static inline void init_rw_lists(struct rt_rw_mutex *rwlock)
+{
+	struct rt_mutex *lock = &rwlock->mutex;
+
+	/*
+	 * A rwsem priority is initialized to -1 and never will
+	 * be that again.
+	 */
+	if (unlikely(rwlock->prio < 0)) {
+		rwlock->prio = MAX_PRIO;
+		init_lists(lock);
+	}
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio);
+static inline int task_has_reader_locks(struct task_struct *task);
+
 /*
  * Calculate task priority from the waiter list priority
  *
  * Return task->normal_prio when the waiter list is empty or when
  * the waiter is not allowed to do priority boosting
+ *
+ * On PREEMPT_RT, we also check the priorities of the list
+ * of read locks that the task holds.
  */
 int rt_mutex_getprio(struct task_struct *task)
 {
-	if (likely(!task_has_pi_waiters(task)))
-		return task->normal_prio;
+	int prio = task->normal_prio;
+
+	if (likely(!task_has_pi_waiters(task) &&
+		   !task_has_reader_locks(task)))
+		return prio;
 
-	return min(task_top_pi_waiter(task)->pi_list_entry.prio,
-		   task->normal_prio);
+	if (task_has_reader_locks(task))
+		prio = rt_mutex_get_readers_prio(task, prio);
+
+	return min(task_top_pi_waiter(task)->pi_list_entry.prio, prio);
 }
 
 /*
@@ -181,6 +221,11 @@ static void rt_mutex_wake_waiter(struct
  */
 int max_lock_depth = 1024;
 
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth);
 /*
  * Adjust the priority chain. Also used for deadlock detection.
  * Decreases task's usage by one - may thus free the task.
@@ -203,7 +248,8 @@ static int rt_mutex_adjust_prio_chain(st
 				      int deadlock_detect,
 				      struct rt_mutex *orig_lock,
 				      struct rt_mutex_waiter *orig_waiter,
-				      struct task_struct *top_task)
+				      struct task_struct *top_task,
+				      int recursion_depth)
 {
 	struct rt_mutex *lock;
 	struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
@@ -316,6 +362,18 @@ static int rt_mutex_adjust_prio_chain(st
 
 	/* Grab the next task */
 	task = rt_mutex_owner(lock);
+
+	/*
+	 * Readers are special. We may need to boost more than one owner.
+	 */
+	if (unlikely(task == RT_RW_READER)) {
+		ret = rt_mutex_adjust_readers(orig_lock, orig_waiter,
+					      top_task, lock,
+					      recursion_depth);
+		raw_spin_unlock(&lock->wait_lock);
+		goto out;
+	}
+
 	get_task_struct(task);
 	raw_spin_lock_irqsave(&task->pi_lock, flags);
 
@@ -349,7 +407,7 @@ static int rt_mutex_adjust_prio_chain(st
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
  out_put_task:
 	put_task_struct(task);
-
+ out:
 	return ret;
 }
 
@@ -518,6 +576,13 @@ static int task_blocks_on_rt_mutex(struc
 		return 0;
 
 	if (waiter == rt_mutex_top_waiter(lock)) {
+		/* readers are handled differently */
+		if (unlikely(owner == RT_RW_READER)) {
+			res = rt_mutex_adjust_readers(lock, waiter,
+						      current, lock, 0);
+			return res;
+		}
+
 		raw_spin_lock_irqsave(&owner->pi_lock, flags);
 		plist_del(&top_waiter->pi_list_entry, &owner->pi_waiters);
 		plist_add(&waiter->pi_list_entry, &owner->pi_waiters);
@@ -527,7 +592,8 @@ static int task_blocks_on_rt_mutex(struc
 			chain_walk = 1;
 		raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
 	}
-	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock))
+	else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock) &&
+		 owner != RT_RW_READER)
 		chain_walk = 1;
 
 	if (!chain_walk)
@@ -543,7 +609,7 @@ static int task_blocks_on_rt_mutex(struc
 	raw_spin_unlock(&lock->wait_lock);
 
 	res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock, waiter,
-					 task);
+					 task, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 
@@ -633,7 +699,7 @@ static void remove_waiter(struct rt_mute
 
 	raw_spin_unlock(&lock->wait_lock);
 
-	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current);
+	rt_mutex_adjust_prio_chain(owner, 0, lock, NULL, current, 0);
 
 	raw_spin_lock(&lock->wait_lock);
 }
@@ -660,7 +726,7 @@ void rt_mutex_adjust_pi(struct task_stru
 	/* gets dropped in rt_mutex_adjust_prio_chain()! */
 	get_task_struct(task);
 	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task);
+	rt_mutex_adjust_prio_chain(task, 0, NULL, NULL, task, 0);
 }
 
 #ifdef CONFIG_PREEMPT_RT_FULL
@@ -739,7 +805,7 @@ static void  noinline __sched rt_spin_lo
 	struct rt_mutex_waiter waiter, *top_waiter;
 	int ret;
 
-	rt_mutex_init_waiter(&waiter, true);
+	rt_mutex_init_waiter(&waiter, true, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1001,15 +1067,564 @@ __mutex_lock_check_stamp(struct rt_mutex
 
 	return 0;
 }
+
+/*
+ * Wake up the next waiter on a rw lock.
+ *
+ * Similar to wakeup_next_waiter() but the waiter is not on
+ * the owner's pi_waiters. Also, it does not reset the lock
+ * owner.
+ *
+ * Called with lock->wait_lock held.
+ */
+static void wakeup_next_rw_waiter(struct rt_mutex *lock)
+{
+	struct rt_mutex_waiter *waiter;
+
+	waiter = rt_mutex_top_waiter(lock);
+	rt_mutex_wake_waiter(waiter);
+}
+
+/* Called with rwmutex->mutex.wait_lock held */
+static inline void
+rt_rw_mutex_take_as_reader(struct task_struct *task,
+			   struct rt_rw_mutex *rwmutex,
+			   struct rt_mutex_waiter *waiter)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	unsigned long flags;
+	bool wakeup = false;
+
+	rwmutex->nr_owners++;
+	rt_mutex_set_owner(&rwmutex->mutex, RT_RW_READER);
+
+	if (waiter || rt_mutex_has_waiters(mutex)) {
+		unsigned long flags;
+		struct rt_mutex_waiter *top;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+
+		/* remove the queued waiter. */
+		if (waiter) {
+			plist_del(&waiter->list_entry, &mutex->wait_list);
+			task->pi_blocked_on = NULL;
+		}
+
+		/*
+		 * Initialize the rwmutex prio to the priority of
+		 * the top waiter.
+		 */
+		if (rt_mutex_has_waiters(mutex)) {
+			top = rt_mutex_top_waiter(mutex);
+			top->pi_list_entry.prio = top->list_entry.prio;
+			/*
+			 * Readers set the lock priority for faster access
+			 * to read the priorities of the locks it owns
+			 * when boosting. This helps to not have to take
+			 * the pi_lock of the task. The rwmutex->prio
+			 * is protected by the rwmutex->mutex.wait_lock,
+			 * which is held during boosting.
+			 */
+			rwmutex->prio = top->list_entry.prio;
+
+			/*
+			 * If this waiter is a reader, and the reader limit
+			 * has not been hit, then we can wake this waiter
+			 * up too.
+			 */
+			if (!top->writer && under_rt_rw_limit(rwmutex->nr_owners))
+				wakeup = true;
+		} else
+			rwmutex->prio = MAX_PRIO;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	/*
+	 * It is possible to have holes in the owned_read_locks array.
+	 * If we take read lock A and then B, but then release A, we
+	 * can't move the pointer of B because if something blocks on
+	 * B, it can use the B pointer to boost this task. B can only
+	 * be moved, by owning the wait_list lock of B. Remember, the
+	 * B lock has its pointers to that index of our array.
+	 */
+	rls = &task->owned_read_locks[task->reader_lock_free];
+	BUG_ON(rls->lock);
+
+	/*
+	 * Grabing the pi_lock here as other tasks can boost this
+	 * lock via other held locks, and if free lock descriptor
+	 * was not at the end of the owned_read_locks array, then
+	 * it might get confused if it sees this descriptor with
+	 * a lock and no task.
+	 */
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	rls->lock = rwmutex;
+	rls->task = task;
+	list_add(&rls->list, &rwmutex->owners);
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	if (task->reader_lock_free == task->reader_lock_count) {
+		/*
+		 * If we nest too deep, then this task can never get the lock.
+		 * This task will then block for good. Warn about this and
+		 * hopefully, people will notice the warning. If not, they will notice
+		 * the dead task. Maybe this should be a BUG_ON()
+		 */
+		if (WARN_ON(task->reader_lock_count == MAX_RWLOCK_DEPTH))
+			return;
+
+		/*
+		 * We don't need to take the pi_lock here as we have the
+		 * wait_lock of the lock at the end of the list. If this task
+		 * was being boosted by a task blocked on this lock, it would
+		 * need to grab the wait_lock before boosting this task.
+		 */
+		task->reader_lock_count++;
+		task->reader_lock_free++;
+	} else {
+		/*
+		 * Find the next free lock in array. Again, we do not need
+		 * to grab the pi_lock because the boosting doesn't use
+		 * the reader_lock_free variable, which is the only thing
+		 * we are updating here.
+		 */
+		do {
+			rls = &task->owned_read_locks[++task->reader_lock_free];
+		} while (rls->lock &&
+			 task->reader_lock_free < task->reader_lock_count);
+	}
+
+	if (wakeup)
+		wakeup_next_rw_waiter(&rwmutex->mutex);
+}
+
+static int try_to_take_rw_read(struct rt_rw_mutex *rwmutex,
+			       struct rt_mutex_waiter *waiter)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct task_struct *owner;
+
+	assert_raw_spin_locked(&mutex->wait_lock);
+
+	/* The writer unlock can use the fast path */
+	mark_rt_mutex_waiters(mutex);
+
+	/* If we are at the reader limit, we can't take the lock */
+	if (unlikely(!under_rt_rw_limit(rwmutex->nr_owners)))
+		return 0;
+
+	/*
+	 * If there's no waiters, and there is no owner or
+	 * the owner is a reader, we get the lock.
+	 * Note, if the waiter or pending bits are set in the owner
+	 * then we need to do more checks.
+	 */
+	if (likely(!mutex->owner || mutex->owner == RT_RW_READER))
+		goto taken;
+
+	owner = rt_mutex_owner(mutex);
+
+	/* If the lock is owned by a task, then it is a writer */
+	if (owner && owner != RT_RW_READER)
+		return 0;
+
+	/*
+	 * A writer or a reader may be waiting. In either case, we
+	 * may still be able to steal the lock. The RT rwsems are
+	 * not fair if the new reader comes in and is higher priority
+	 * than all the waiters.
+	 */
+	if (likely(rt_mutex_has_waiters(mutex))) {
+		struct task_struct *pown = rt_mutex_top_waiter(mutex)->task;
+
+		if (task != pown && !lock_is_stealable(task, pown, STEAL_NORMAL))
+			return 0;
+	}
+
+ taken:
+	rt_rw_mutex_take_as_reader(task, rwmutex, waiter);
+	return 1;
+}
+
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex_waiter waiter;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (likely(try_to_take_rw_read(rwmutex, NULL))) {
+		/* Got the lock */
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	rt_mutex_init_waiter(&waiter, false, false);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, current, 0);
+	BUG_ON(ret);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_read(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_read(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+static int
+try_to_take_rw_write(struct rt_rw_mutex *rwmutex, struct rt_mutex_waiter
*waiter)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	/* Writers block if there's any readers owning the lock */
+	if (rwmutex->nr_owners)
+		return 0;
+
+	/* No readers, then we can try to take the mutex normally. */
+	return try_to_take_rt_mutex(mutex, current, waiter);
+}
+
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter waiter;
+	struct task_struct *task = current;
+	int ret;
+
+	raw_spin_lock(&mutex->wait_lock);
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL)) {
+		raw_spin_unlock(&mutex->wait_lock);
+		return;
+	}
+
+	/* Writers wake up differently than readers (flag it) */
+	rt_mutex_init_waiter(&waiter, false, true);
+
+	ret = task_blocks_on_rt_mutex(mutex, &waiter, task, 0);
+	BUG_ON(ret);
+
+	set_current_state(TASK_UNINTERRUPTIBLE);
+
+	for (;;) {
+		/* Try to acquire the lock: */
+		if (try_to_take_rw_write(rwmutex, &waiter))
+			break;
+
+		raw_spin_unlock(&mutex->wait_lock);
+
+		debug_rt_mutex_print_deadlock(waiter);
+
+		schedule_rt_mutex(mutex);
+
+		raw_spin_lock(&mutex->wait_lock);
+		set_current_state(TASK_UNINTERRUPTIBLE);
+	}
+
+	set_current_state(TASK_RUNNING);
+
+	raw_spin_unlock(&mutex->wait_lock);
+	WARN_ONCE(rwmutex->nr_owners, "Writer has lock with readers");
+
+	debug_rt_mutex_free_waiter(&waiter);
+}
+
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	int ret = 0;
+
+	if (!raw_spin_trylock(&mutex->wait_lock))
+		return ret;
+
+	init_rw_lists(rwmutex);
+
+	if (try_to_take_rw_write(rwmutex, NULL))
+		ret = 1;
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	return ret;
+}
+
+/*
+ * When a reader lock is released, see if the reader_lock_count
+ * can be moved back.
+ */
+static void shrink_reader_lock_array(struct task_struct *task)
+{
+	struct reader_lock_struct *read_locks = task->owned_read_locks;
+
+	while (task->reader_lock_count &&
+	       read_locks[task->reader_lock_count - 1].lock == NULL)
+		task->reader_lock_count--;
+
+	if (task->reader_lock_free > task->reader_lock_count)
+		task->reader_lock_free = task->reader_lock_count;
+}
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex)
+{
+	struct rt_mutex *mutex = &rwmutex->mutex;
+	struct rt_mutex_waiter *waiter;
+	struct reader_lock_struct *rls;
+	struct task_struct *task = current;
+	unsigned long flags;
+	int readers;
+	int i;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	rt_mutex_deadlock_account_unlock(current);
+
+	WARN_ON_ONCE(!rwmutex->nr_owners);
+
+	rwmutex->nr_owners--;
+
+	if (rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If the top waiter is a reader and we are under
+		 * the limit, or the top waiter is a writer and
+		 * there's no more readers, then we can give the
+		 * top waiter pending ownership and wake it up.
+		 *
+		 * To simplify things, we only wake up one task, even
+		 * if its a reader. If the reader wakes up and gets the
+		 * lock, it will look to see if it can wake up the next
+		 * waiter, and so on. This way we only need to worry about
+		 * one task at a time.
+		 */
+		waiter = rt_mutex_top_waiter(mutex);
+		readers = rwmutex->nr_owners;
+		if ((waiter->writer && !readers) ||
+		    (!waiter->writer && under_rt_rw_limit(readers)))
+			wakeup_next_rw_waiter(mutex);
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (!rwmutex->nr_owners)
+		rt_mutex_set_owner(&rwmutex->mutex, NULL);
+
+	raw_spin_lock_irqsave(&task->pi_lock, flags);
+	/* Remove the lock from this tasks list */
+	for (i = task->reader_lock_count - 1; i >= 0; i--) {
+		rls = &task->owned_read_locks[i];
+
+		if (rls->lock == rwmutex) {
+			rls->lock = NULL;
+			list_del_init(&rls->list);
+			/* Shrink the array if we can. */
+			if (i == task->reader_lock_count - 1)
+				shrink_reader_lock_array(task);
+			else if (i < task->reader_lock_free)
+				task->reader_lock_free = i;
+			break;
+		}
+	}
+	raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+	WARN_ON_ONCE(i < 0);
+
+	raw_spin_unlock(&mutex->wait_lock);
+
+	/* Undo pi boosting if necessary */
+	rt_mutex_adjust_prio(current);
+}
+
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex)
+{
+	struct task_struct *task = current;
+	struct rt_mutex *mutex = &rwmutex->mutex;
+
+	raw_spin_lock(&mutex->wait_lock);
+
+	/*
+	 * Writers have normal pi with other tasks blocked
+	 * on the lock. That is, the top waiter will be in the
+	 * pi_list of this task. But for readers, waiters of
+	 * the lock are not included in the pi_list, only the
+	 * locks are. We must remove the top waiter of this
+	 * lock from this task.
+	 */
+	if (rt_mutex_has_waiters(mutex)) {
+		struct rt_mutex_waiter *waiter;
+		unsigned long flags;
+
+		waiter = rt_mutex_top_waiter(mutex);
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		plist_del(&waiter->pi_list_entry, &task->pi_waiters);
+		/*
+		 * The rt_rw_mutex_take_as_reader() will update
+		 * the rwmutex prio.
+		 */
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+	}
+
+	WARN_ONCE(rwmutex->nr_owners, "Writer owned with readers");
+
+	rt_rw_mutex_take_as_reader(task, rwmutex, NULL);
+
+	raw_spin_unlock(&mutex->wait_lock);
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return task->reader_lock_count;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	struct reader_lock_struct *rls;
+	struct rt_rw_mutex *rwmutex;
+	int lock_prio;
+	int i;
+
+	for (i = 0; i < task->reader_lock_count; i++) {
+		rls = &task->owned_read_locks[i];
+		rwmutex = rls->lock;
+		if (!rwmutex)
+			continue;
+		lock_prio = rwmutex->prio;
+		if (prio > lock_prio)
+			prio = lock_prio;
+	}
+	WARN_ON_ONCE(prio < 0);
+
+	return prio;
+}
+
+/* Expects to be called with lock->wait_lock held */
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	struct reader_lock_struct *rls;
+	struct rt_mutex_waiter *waiter;
+	struct task_struct *task;
+	struct rt_rw_mutex *rwmutex = container_of(lock, struct rt_rw_mutex,
mutex);
+	unsigned long flags;
+
+	/* Update the rwmutex's prio */
+	if (rt_mutex_has_waiters(lock)) {
+		waiter = rt_mutex_top_waiter(lock);
+		/*
+		 * Do we need to grab the task->pi_lock?
+		 * Really, we are only reading it. If it
+		 * changes, then that should follow this chain
+		 * too.
+		 */
+		rwmutex->prio = waiter->task->prio;
+	} else
+		rwmutex->prio = MAX_PRIO;
+
+	if (recursion_depth >= MAX_RWLOCK_DEPTH) {
+		WARN_ON(1);
+		return 1;
+	}
+
+	list_for_each_entry(rls, &rwmutex->owners, list) {
+		bool skip = false;
+
+		task = rls->task;
+
+		raw_spin_lock_irqsave(&task->pi_lock, flags);
+		__rt_mutex_adjust_prio(task);
+		/*
+		 * We need to grab the pi_lock to adjust the task prio
+		 * might as well use this to check if the task is blocked
+		 * as well, and save on a call to the prio chain that will
+		 * just grab the lock again and do the test.
+		 */
+		if (!rt_mutex_real_waiter(task->pi_blocked_on))
+			skip = true;
+		raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+
+		if (skip)
+			continue;
+
+		get_task_struct(task);
+		/*
+		 * rt_mutex_adjust_prio_chain will do
+		 * the put_task_struct
+		 */
+		rt_mutex_adjust_prio_chain(task, 0, orig_lock,
+					   orig_waiter, top_task,
+					   recursion_depth+1);
+	}
+
+	return 0;
+}
 #else
+static int rt_mutex_adjust_readers(struct rt_mutex *orig_lock,
+				   struct rt_mutex_waiter *orig_waiter,
+				   struct task_struct *top_task,
+				   struct rt_mutex *lock,
+				   int recursion_depth)
+{
+	return 0;
+}
+
+static int rt_mutex_get_readers_prio(struct task_struct *task, int prio)
+{
+	return prio;
+}
+
+static inline int task_has_reader_locks(struct task_struct *task)
+{
+	return 0;
+}
+
 static inline int __sched
 __mutex_lock_check_stamp(struct rt_mutex *lock, struct ww_acquire_ctx
*ctx)
 {
 	BUG();
 	return 0;
 }
-
-#endif
+#endif /* CONFIG_PREEMPT_RT */
 
 /**
  * __rt_mutex_slowlock() - Perform the wait-wake-try-to-take loop
@@ -1154,7 +1769,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
 	struct rt_mutex_waiter waiter;
 	int ret = 0;
 
-	rt_mutex_init_waiter(&waiter, false);
+	rt_mutex_init_waiter(&waiter, false, false);
 
 	raw_spin_lock(&lock->wait_lock);
 	init_lists(lock);
@@ -1718,4 +2333,12 @@ void __sched ww_mutex_unlock(struct ww_m
 	rt_mutex_unlock(&lock->base.lock);
 }
 EXPORT_SYMBOL(ww_mutex_unlock);
+
+static int __init rt_rw_limit_init(void)
+{
+	rt_rw_limit = num_possible_cpus();
+	return 0;
+}
+core_initcall(rt_rw_limit_init);
+
 #endif
Index: linux-rt.git/kernel/rtmutex_common.h
===================================================================
--- linux-rt.git.orig/kernel/rtmutex_common.h
+++ linux-rt.git/kernel/rtmutex_common.h
@@ -43,6 +43,7 @@ extern void schedule_rt_mutex_test(struc
  * @list_entry:		pi node to enqueue into the mutex waiters list
  * @pi_list_entry:	pi node to enqueue into the mutex owner waiters list
  * @task:		task reference to the blocked task
+ * @writer:		true if its a rwsem writer that is blocked
  */
 struct rt_mutex_waiter {
 	struct plist_node	list_entry;
@@ -50,6 +51,7 @@ struct rt_mutex_waiter {
 	struct task_struct	*task;
 	struct rt_mutex		*lock;
 	bool			savestate;
+	bool			writer;
 #ifdef CONFIG_DEBUG_RT_MUTEXES
 	unsigned long		ip;
 	struct pid		*deadlock_task_pid;
@@ -101,6 +103,20 @@ static inline struct task_struct *rt_mut
 		((unsigned long)lock->owner & ~RT_MUTEX_OWNER_MASKALL);
 }
 
+/* used as reader owner of the mutex */
+#define RT_RW_READER		(struct task_struct *)0x100
+
+#ifdef CONFIG_PREEMPT_RT_FULL
+
+void rt_rw_mutex_read_unlock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_downgrade_write(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_write_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_write_lock(struct rt_rw_mutex *rwmutex);
+int rt_rw_mutex_read_trylock(struct rt_rw_mutex *rwmutex);
+void rt_rw_mutex_read_lock(struct rt_rw_mutex *rwmutex);
+
+#endif /* CONFIG_PREEMPT_RT */
+
 /*
  * PI-futex support (proxy locking functions, etc.):
  */
@@ -128,11 +144,12 @@ extern int rt_mutex_finish_proxy_lock(st
 #endif
 
 static inline void
-rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate)
+rt_mutex_init_waiter(struct rt_mutex_waiter *waiter, bool savestate, bool
writer)
 {
 	debug_rt_mutex_init_waiter(waiter);
 	waiter->task = NULL;
 	waiter->savestate = savestate;
+	waiter->writer = writer;
 }
 
 #endif
Index: linux-rt.git/kernel/rt.c
===================================================================
--- linux-rt.git.orig/kernel/rt.c
+++ linux-rt.git/kernel/rt.c
@@ -310,14 +310,19 @@ EXPORT_SYMBOL(__rt_rwlock_init);
 void  rt_up_write(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	/*
+	 * Unlocking a write is the same as unlocking the mutex.
+	 * The woken reader will do all the work if it needs to
+	 * wake up more than one reader.
+	 */
+	__rt_spin_unlock(&rwsem->rwlock.mutex);
 }
 EXPORT_SYMBOL(rt_up_write);
 
 void  rt_up_read(struct rw_semaphore *rwsem)
 {
 	rwsem_release(&rwsem->dep_map, 1, _RET_IP_);
-	rt_mutex_unlock(&rwsem->lock);
+	rt_rw_mutex_read_unlock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_up_read);
 
@@ -327,13 +332,14 @@ EXPORT_SYMBOL(rt_up_read);
  */
 void  rt_downgrade_write(struct rw_semaphore *rwsem)
 {
-	BUG_ON(rt_mutex_owner(&rwsem->lock) != current);
+	BUG_ON(rt_mutex_owner(&rwsem->rwlock.mutex) != current);
+	rt_rw_mutex_downgrade_write(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_downgrade_write);
 
 int  rt_down_write_trylock(struct rw_semaphore *rwsem)
 {
-	int ret = rt_mutex_trylock(&rwsem->lock);
+	int ret = rt_rw_mutex_write_trylock(&rwsem->rwlock);
 
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
@@ -344,14 +350,14 @@ EXPORT_SYMBOL(rt_down_write_trylock);
 void  rt_down_write(struct rw_semaphore *rwsem)
 {
 	rwsem_acquire(&rwsem->dep_map, 0, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write);
 
 void  rt_down_write_nested(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 EXPORT_SYMBOL(rt_down_write_nested);
 
@@ -359,14 +365,14 @@ void rt_down_write_nested_lock(struct rw
 		struct lockdep_map *nest)
 {
 	rwsem_acquire_nest(&rwsem->dep_map, 0, 0, nest, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_write_lock(&rwsem->rwlock);
 }
 
 int  rt_down_read_trylock(struct rw_semaphore *rwsem)
 {
 	int ret;
 
-	ret = rt_mutex_trylock(&rwsem->lock);
+	ret = rt_rw_mutex_read_trylock(&rwsem->rwlock);
 	if (ret)
 		rwsem_acquire(&rwsem->dep_map, 0, 1, _RET_IP_);
 
@@ -377,7 +383,7 @@ EXPORT_SYMBOL(rt_down_read_trylock);
 static void __rt_down_read(struct rw_semaphore *rwsem, int subclass)
 {
 	rwsem_acquire(&rwsem->dep_map, subclass, 0, _RET_IP_);
-	rt_mutex_lock(&rwsem->lock);
+	rt_rw_mutex_read_lock(&rwsem->rwlock);
 }
 
 void  rt_down_read(struct rw_semaphore *rwsem)
@@ -402,7 +408,8 @@ void  __rt_rwsem_init(struct rw_semaphor
 	debug_check_no_locks_freed((void *)rwsem, sizeof(*rwsem));
 	lockdep_init_map(&rwsem->dep_map, name, key, 0);
 #endif
-	rwsem->lock.save_state = 0;
+	rt_rw_mutex_init(&rwsem->rwlock);
+	rwsem->rwlock.mutex.save_state = 0;
 }
 EXPORT_SYMBOL(__rt_rwsem_init);
 
Index: linux-rt.git/include/linux/sched.h
===================================================================
--- linux-rt.git.orig/include/linux/sched.h
+++ linux-rt.git/include/linux/sched.h
@@ -993,6 +993,22 @@ struct sched_entity {
 #endif
 };
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+struct rt_rw_mutex;
+/**
+ * reader_lock_struct
+ *
+ * @lock:	pointer to rwsem that is held
+ * @task:	pointer back to task, for lock code
+ * @list_head:	link into rt_rw_mutex owners list
+ */
+struct reader_lock_struct {
+	struct rt_rw_mutex		*lock;
+	struct task_struct		*task;
+	struct list_head		list;
+};
+#endif
+
 struct sched_rt_entity {
 	struct list_head run_list;
 	unsigned long timeout;
@@ -1224,6 +1240,10 @@ struct task_struct {
 #ifdef CONFIG_PREEMPT_RT_FULL
 	/* TODO: move me into ->restart_block ? */
 	struct siginfo forced_info;
+#define MAX_RWLOCK_DEPTH 5
+	int reader_lock_count;	/* index of last element in owned_read_locks */
+	int reader_lock_free;	/* index of next free element. */
+	struct reader_lock_struct owned_read_locks[MAX_RWLOCK_DEPTH];
 #endif
 
 	unsigned long sas_ss_sp;
Index: linux-rt.git/kernel/fork.c
===================================================================
--- linux-rt.git.orig/kernel/fork.c
+++ linux-rt.git/kernel/fork.c
@@ -1385,6 +1385,26 @@ static struct task_struct *copy_process(
 	if (retval)
 		goto bad_fork_cleanup_io;
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+	p->reader_lock_count = 0;
+	p->reader_lock_free = 0;
+	/* Bracket to keep 'i' local */
+	{
+		int i;
+		/*
+		 * We could put the initialization of this list in
+		 * the grabbing of the lock, but it is safer to
+		 * do it now. The list head initialization may be
+		 * removed, but we'll keep it for now, just to be safe.
+		 */
+		for (i = 0; i < MAX_RWLOCK_DEPTH; i++) {
+			p->owned_read_locks[i].lock = NULL;
+			p->owned_read_locks[i].task = p;
+			INIT_LIST_HEAD(&p->owned_read_locks[i].list);
+		}
+	}
+#endif
+
 	if (pid != &init_struct_pid) {
 		retval = -ENOMEM;
 		pid = alloc_pid(p->nsproxy->pid_ns_for_children);
Index: linux-rt.git/kernel/sysctl.c
===================================================================
--- linux-rt.git.orig/kernel/sysctl.c
+++ linux-rt.git/kernel/sysctl.c
@@ -91,6 +91,10 @@
 #include 
 #endif
 
+#ifdef CONFIG_PREEMPT_RT_FULL
+extern int rt_rw_limit;
+#endif
+
 
 #if defined(CONFIG_SYSCTL)
 
@@ -453,6 +457,15 @@ static struct ctl_table kern_table[] = {
 		.proc_handler	= proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT_FULL
+	{
+		.procname	= "rwsem_reader_limit",
+		.data		= &rt_rw_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.procname	= "panic",
 		.data		= &panic_timeout,
Index: linux-rt.git/include/linux/rtmutex.h
===================================================================
--- linux-rt.git.orig/include/linux/rtmutex.h
+++ linux-rt.git/include/linux/rtmutex.h
@@ -42,6 +42,21 @@ struct rt_mutex {
 #endif
 };
 
+/**
+ * The rt_rw_mutex structure
+ *
+ * @rt_mutex:	The mutex to wait on
+ * @owners:	list of read owners of the mutex
+ * @nr_owners:	number of read owners
+ * @prio:	the priority of the highest waiter
+ */
+struct rt_rw_mutex {
+	struct rt_mutex		mutex;
+	struct list_head	owners;
+	int			nr_owners;
+	int			prio;
+};
+
 struct rt_mutex_waiter;
 struct hrtimer_sleeper;
 
@@ -75,6 +90,15 @@ struct hrtimer_sleeper;
 		__rt_mutex_init(mutex, #mutex);			\
 	} while (0)
 
+# define rt_rw_mutex_init(rwmutex)					\
+	do {								\
+		raw_spin_lock_init(&(rwmutex)->mutex.wait_lock);	\
+		INIT_LIST_HEAD(&(rwmutex)->owners);			\
+		(rwmutex)->nr_owners = 0;				\
+		(rwmutex)->prio = -1;					\
+		__rt_mutex_init(&(rwmutex)->mutex, #rwmutex);		\
+	} while (0)
+
 #define __RT_MUTEX_INITIALIZER_PLAIN(mutexname) \
 	.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(mutexname.wait_lock) \
 	, .wait_list = PLIST_HEAD_INIT(mutexname.wait_list) \
@@ -85,6 +109,11 @@ struct hrtimer_sleeper;
 #define __RT_MUTEX_INITIALIZER(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname) }
 
+#define __RT_RW_MUTEX_INITIALIZER(mutexname)			\
+	{ .owners = LIST_HEAD_INIT(mutexname.owners)		\
+	  , .prio = -1						\
+	  , .mutex = __RT_MUTEX_INITIALIZER(mutexname.mutex) }
+
 #define __RT_MUTEX_INITIALIZER_SAVE_STATE(mutexname) \
 	{ __RT_MUTEX_INITIALIZER_PLAIN(mutexname)    \
 	  , .save_state = 1 }
Index: linux-rt.git/include/linux/rwsem_rt.h
===================================================================
--- linux-rt.git.orig/include/linux/rwsem_rt.h
+++ linux-rt.git/include/linux/rwsem_rt.h
@@ -19,14 +19,14 @@
 #include 
 
 struct rw_semaphore {
-	struct rt_mutex		lock;
+	struct rt_rw_mutex	rwlock;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 	struct lockdep_map	dep_map;
 #endif
 };
 
 #define __RWSEM_INITIALIZER(name) \
-	{ .lock = __RT_MUTEX_INITIALIZER(name.lock), \
+	{ .rwlock = __RT_RW_MUTEX_INITIALIZER(name.rwlock), \
 	  RW_DEP_MAP_INIT(name) }
 
 #define DECLARE_RWSEM(lockname) \
@@ -37,7 +37,7 @@ extern void  __rt_rwsem_init(struct rw_s
 
 #define __rt_init_rwsem(sem, name, key)			\
 	do {						\
-		rt_mutex_init(&(sem)->lock);		\
+		rt_rw_mutex_init(&(sem)->rwlock);	\
 		__rt_rwsem_init((sem), (name), (key));\
 	} while (0)
 
@@ -63,7 +63,7 @@ extern void  rt_up_write(struct rw_semap
 extern void  rt_downgrade_write(struct rw_semaphore *rwsem);
 
 #define init_rwsem(sem)		rt_init_rwsem(sem)
-#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->lock)
+#define rwsem_is_locked(s)	rt_mutex_is_locked(&(s)->rwlock.mutex)
 
 static inline void down_read(struct rw_semaphore *sem)
 {
Index: linux-rt.git/kernel/futex.c
===================================================================
--- linux-rt.git.orig/kernel/futex.c
+++ linux-rt.git/kernel/futex.c
@@ -2327,7 +2327,7 @@ static int futex_wait_requeue_pi(u32 __u
 	 * The waiter is allocated on our stack, manipulated by the requeue
 	 * code while we sleep on uaddr.
 	 */
-	rt_mutex_init_waiter(&rt_waiter, false);
+	rt_mutex_init_waiter(&rt_waiter, false, false);
 
 	ret = get_futex_key(uaddr2, flags & FLAGS_SHARED, &key2, VERIFY_WRITE);
 	if (unlikely(ret != 0))
 
CD: 4ms