From: Lai Jiangshan <laijs <at> cn.fujitsu.com>
Subject: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation
Newsgroups: gmane.linux.kernel
Date: Monday 30th August 2010 09:31:44 UTC
0) contents
1) overview
2) scalable
3) multi-GP
4) integrated expedited synchronize_rcu
5) preemptible
6) top-down implies
7) speech
8) core rcuring

1) overview
This is a new RCU implementation: RCURING. It uses a ring of atomic counters
to control the completion of GPs and a ring of callback batches to process
the callbacks.

I came up with the idea in 2008 and wrote the first 300 lines of rcuring.c,
but I was too lazy to finish it.

2) scalable
There are only 2 locks in every RCU domain (rcu_bh, rcu_sched, rcu_preempt).
One of them is for miscellaneous things and is taken infrequently. The other
is for advancing complete and is taken frequently. However, if somebody else
is holding this lock, they will very probably do the same work for me, so we
always use spin_trylock() for this lock, which keeps it scalable.

We don't need any lock for reporting a quiescent state; we just need 2
atomic operations for every RCU domain.

3) multi-GP
All old RCU implementations have only one waiting GP.

time  -----ta-tb--------------------tc----------------td------------->
GP    -----|------GP1---------------|--------GP2------|------------->
cpu#x ........|---------------------------------------|...............>
              |->insert a callback                    |->handle callback

The callback has to wait for part of GP1 and all of GP2. In the worst case
it may wait nearly 2 GPs, because it also has to wait for the RCU read sites
that start in (tb, tc], which it does not actually need to wait for. On
average, a callback has to wait (1 + 1/(1+1)) = 1.5 GPs.

If we can have three simultaneous waiting GPs:

GPs   .....|------GP1---------------|..........................
      ...........|--------GP2-----------|....................
      ....................|---------GP3---------------|........
cpu#x ........|-------------------------|..........................>
              |->insert a callback      |->handle callback
The callback is handled more quickly. On average, a callback has to wait
(1 + 1/(3+1)) = 1.25 GPs.

If we can have X simultaneous waiting GPs, a callback has to wait
(1 + 1/(X+1)) GPs on average. The default X in this patch is 15
(including the reserved GPs).
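
As a quick check of the formula, the helper below evaluates 1 + 1/(X+1) for
a given X. The function name avg_wait_gps() is made up for this
illustration. It gives 1.5 for X=1, 1.25 for X=3 and 1.0625 for the default
X=15.

```c
#include <assert.h>

/* Average callback wait, in grace periods, with x simultaneous waiting GPs. */
static double avg_wait_gps(int x)
{
	assert(x >= 1);
	return 1.0 + 1.0 / (x + 1);
}
```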

4) integrated expedited synchronize_rcu
Expedited synchronize_rcu is implemented in rcuring.c, not in other files.
The whole of it is less than 50 lines of code. We just reserve several
GPs for expedited synchronize_rcu. When an expedited callback is inserted,
we start a new GP immediately (we have reserved GPs, so this step will
very probably succeed) and then force this new GP to complete quickly
(thanks to multi-GP).
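
The reservation can be sketched as a limit on how many GPs normal callbacks
may keep in flight. The check below mirrors the one in
__rcu_check_callbacks() in this patch (max_gp_allow = RCURING_IDX_MAX -
gp_reserved()), using the default counter shift of 4. The names IDX_SHIFT,
GP_RESERVED and may_start_normal_gp() are shortened for this standalone
illustration and are not in the patch.

```c
#include <assert.h>
#include <stdbool.h>

#define IDX_SHIFT	4			/* default RCURING_COUNTERS_SHIFT */
#define IDX_MAX		(1L << IDX_SHIFT)	/* number of ring slots */
#define GP_RESERVED	(IDX_SHIFT - 1)		/* slots kept back for expedited GPs */

/* May a normal (non-expedited) callback start a new GP right now? */
static bool may_start_normal_gp(long curr_complete, long done_complete)
{
	long in_flight = curr_complete - done_complete;

	assert(in_flight >= 0 && in_flight <= IDX_MAX);
	return in_flight <= IDX_MAX - GP_RESERVED;
}
```

With these defaults a normal callback stops starting new GPs once more than
13 are in flight, so the remaining slots stay available to the expedited
path.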

5) preemptible
New, simple preemptible code: rcu_read_lock()/rcu_read_unlock() are simple
inline functions.
A new kernel-offsets.c is added to avoid recursive header file inclusion.

6) top-down implies
irq and local_irq_disable() imply preempt_disable()/rcu_read_lock_sched().
irq and local_irq_disable() imply rcu_read_lock(). (*)
preempt_disable()/rcu_read_lock_sched() imply rcu_read_lock(). (*)
*: This does not hold in preemptible rcutree/rcutiny.

7) speech
I will give a talk about RCU and this RCU implementation at
LinuxCon Japan 2010. Everyone is welcome to attend.

8) core rcuring
(Comments and documentation are still being written.)

Every rcu_read_lock/unlock_domain() is nested in a corresponding
rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock()
critical region as a full RCU read-site critical region.

read site:
  rcuring_lock()
    lock a complete, and ensure this locked_complete - done_complete(1) > 0
      curr_complete(1) - locked_complete >= 0
    mb(1)

  rcu_dereference(ptr)

  rcuring_unlock()
    mb(2)
    release its locked_complete.
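
The conditions in the read site are written as differences (for example
locked_complete - done_complete > 0) rather than direct comparisons, which
keeps them meaningful when the complete counter wraps around. The patch's
in_range() helper uses the same form. Below is a userspace sketch of that
comparison. The names complete_after() and complete_in_range() are made up
here. Because plain C leaves signed overflow undefined, the sketch subtracts
as unsigned and converts back, which relies on the usual two's-complement
conversion done by common compilers.

```c
#include <assert.h>
#include <stdbool.h>

/* True if complete a is strictly after b, even across counter wraparound. */
static bool complete_after(long a, long b)
{
	/* Unsigned subtraction wraps by definition; then read the sign. */
	return (long)((unsigned long)a - (unsigned long)b) > 0;
}

/* Is value in the half-open range (left_open, right_close]? */
static bool complete_in_range(long left_open, long right_close, long value)
{
	assert(!complete_after(left_open, right_close));
	return complete_after(value, left_open) &&
	       !complete_after(value, right_close);
}
```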

update site:
  rcu_assign_pointer(ptr, new)

  (handle callback:call_rcu()+rcu_do_batch())
  (call_rcu():)
  mb(3)
  get the curr_complete(2)
  set callback's complete as this curr_complete(2).
    (we don't have any field to record the callback's complete, we just
     emulate it by rdp->done_complete and the position of this callback
     in rdp->batch.)

  time pass... until done_complete(2) - callback's complete > 0

  (rcu_do_batch():)
  mb(4)
  call the callback: (free old ptr, etc.)
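
The interaction between the counter ring, curr_complete and done_complete
can be tried out with a single-threaded userspace model. This is a sketch
under strong assumptions, not the patch's code: the names are shortened
(struct ring, ring_lock() and so on), C11 atomics replace atomic_t, and the
memory barriers mb(1)..mb(4), the spinlock and the adjustment that
rcuring_lock() makes when curr_complete moves concurrently are all left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define IDX_SHIFT	4
#define IDX_MAX		(1L << IDX_SHIFT)
#define IDX(c)		((c) & (IDX_MAX - 1))

struct ring {
	atomic_int ctr[IDX_MAX];	/* one reference counter per GP slot */
	long curr_complete;		/* newest GP; its slot holds a base ref */
	long done_complete;		/* newest GP known to be finished */
};

static void ring_init(struct ring *r, long first)
{
	for (long i = 0; i < IDX_MAX; i++)
		atomic_init(&r->ctr[i], 0);
	atomic_init(&r->ctr[IDX(first)], 1);	/* base ref of the first GP */
	r->curr_complete = first;
	r->done_complete = first - 1;
}

/* Increment *v unless it is zero; returns true if it was incremented. */
static bool inc_not_zero(atomic_int *v)
{
	int old = atomic_load(v);

	while (old != 0) {
		if (atomic_compare_exchange_weak(v, &old, old + 1))
			return true;
	}
	return false;
}

/* Reader entry: take a reference on the current GP's slot. */
static long ring_lock(struct ring *r)
{
	while (!inc_not_zero(&r->ctr[IDX(r->curr_complete)]))
		;	/* slot just retired: retry with the new curr_complete */
	return r->curr_complete;
}

/* Reader exit: drop the reference taken by ring_lock(). */
static void ring_unlock(struct ring *r, long locked_complete)
{
	atomic_fetch_sub(&r->ctr[IDX(locked_complete)], 1);
}

/* Start the next GP if its slot in the ring is free. */
static bool ring_advance_curr(struct ring *r)
{
	long curr = r->curr_complete, next = curr + 1;

	if (atomic_load(&r->ctr[IDX(next)]) != 0)
		return false;
	r->curr_complete = next;
	atomic_fetch_add(&r->ctr[IDX(next)], 1);	/* base ref of the new GP */
	atomic_fetch_sub(&r->ctr[IDX(curr)], 1);	/* drop the old base ref */
	return true;
}

/* Move done_complete past every GP whose counter has dropped to zero. */
static void ring_advance_done(struct ring *r)
{
	long wait = r->done_complete + 1;

	/* Stops at curr_complete at the latest: its base ref is non-zero. */
	while (atomic_load(&r->ctr[IDX(wait)]) == 0)
		wait++;
	r->done_complete = wait - 1;
}
```

In this model a reader that locked complete 0 keeps done_complete at -1
even after curr_complete has moved to 1. Once the reader unlocks, the next
ring_advance_done() moves done_complete to 0.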

Signed-off-by: Lai Jiangshan 
---
 Kbuild                   |   75 ++-
 include/linux/rcupdate.h |    6 
 include/linux/rcuring.h  |  195 +++++++++
 include/linux/sched.h    |    6 
 init/Kconfig             |   32 +
 kernel/Makefile          |    1 
 kernel/kernel-offsets.c  |   20 
 kernel/rcuring.c         | 1002 +++++++++++++++++++++++++++++++++++++++++++++++
 kernel/sched.c           |    2 
 kernel/time/tick-sched.c |    4 
 10 files changed, 1324 insertions(+), 19 deletions(-)
diff --git a/Kbuild b/Kbuild
index e3737ad..bce37cb 100644
--- a/Kbuild
+++ b/Kbuild
@@ -3,7 +3,19 @@
 # This file takes care of the following:
 # 1) Generate bounds.h
 # 2) Generate asm-offsets.h (may need bounds.h)
-# 3) Check for missing system calls
+# 3) Generate kernel-offsets.h
+# 4) Check for missing system calls
+
+# Default sed regexp - multiline due to syntax constraints
+define sed-y
+	"/^->/{s:->#\(.*\):/* \1 */:; \
+	s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
+	s:->::; p;}"
+endef
+
+quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag)  $@
+cmd_offsets_cc_s_c       = $(CC) -D__GENARATING_OFFSETS__               \
+                           $(c_flags) -fverbose-asm -S -o $@ $<
 
 #####
 # 1) Generate bounds.h
@@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild
 # 2) Generate asm-offsets.h
 #
 
-offsets-file := include/generated/asm-offsets.h
+asm-offsets-file := include/generated/asm-offsets.h
 
-always  += $(offsets-file)
-targets += $(offsets-file)
+always  += $(asm-offsets-file)
+targets += $(asm-offsets-file)
 targets += arch/$(SRCARCH)/kernel/asm-offsets.s
 
-
-# Default sed regexp - multiline due to syntax constraints
-define sed-y
-	"/^->/{s:->#\(.*\):/* \1 */:; \
-	s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
-	s:->::; p;}"
-endef
-
-quiet_cmd_offsets = GEN     $@
-define cmd_offsets
+quiet_cmd_asm_offsets = GEN     $@
+define cmd_asm_offsets
 	(set -e; \
 	 echo "#ifndef __ASM_OFFSETS_H__"; \
 	 echo "#define __ASM_OFFSETS_H__"; \
@@ -78,13 +82,48 @@ endef
 arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \
                                       $(obj)/$(bounds-file) FORCE
 	$(Q)mkdir -p $(dir $@)
-	$(call if_changed_dep,cc_s_c)
+	$(call if_changed_dep,offsets_cc_s_c)
+
+$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
+	$(call cmd,asm_offsets)
+
+#####
+# 3) Generate kernel-offsets.h
+#
+
+kernel-offsets-file := include/generated/kernel-offsets.h
+
+always  += $(kernel-offsets-file)
+targets += $(kernel-offsets-file)
+targets += kernel/kernel-offsets.s
+
+quiet_cmd_kernel_offsets = GEN     $@
+define cmd_kernel_offsets
+	(set -e; \
+	 echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \
+	 echo "#define __LINUX_KERNEL_OFFSETS_H__"; \
+	 echo "/*"; \
+	 echo " * DO NOT MODIFY."; \
+	 echo " *"; \
+	 echo " * This file was generated by Kbuild"; \
+	 echo " *"; \
+	 echo " */"; \
+	 echo ""; \
+	 sed -ne $(sed-y) $<; \
+	 echo ""; \
+	 echo "#endif" ) > $@
+endef
+
+# We use internal kbuild rules to avoid the "is up to date" message from make
+kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \
+                         $(obj)/$(asm-offsets-file) FORCE
+	$(call if_changed_dep,offsets_cc_s_c)
 
-$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
-	$(call cmd,offsets)
+$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild
+	$(call cmd,kernel_offsets)
 
 #####
-# 3) Check for missing system calls
+# 4) Check for missing system calls
 #
 
 quiet_cmd_syscalls = CALL    $<
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 89414d6..42cd6bc 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu);
 extern void rcu_check_callbacks(int cpu, int user);
 struct notifier_block;
 
+#ifndef CONFIG_RCURING
 #ifdef CONFIG_NO_HZ
 
 extern void rcu_enter_nohz(void);
@@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void)
 }
 
 #endif /* #else #ifdef CONFIG_NO_HZ */
+#endif
 
 #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU)
 #include 
 #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU)
 #include 
+#elif defined(CONFIG_RCURING)
+#include 
 #else
 #error "Unknown RCU implementation specified to kernel configuration"
 #endif
@@ -686,6 +690,7 @@ struct rcu_synchronize {
 
 extern void wakeme_after_rcu(struct rcu_head  *head);
 
+#ifndef CONFIG_RCURING
 #ifdef CONFIG_PREEMPT_RCU
 
 /**
@@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head,
 #define	call_rcu	call_rcu_sched
 
 #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
+#endif
 
 /**
  * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period.
diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h
new file mode 100644
index 0000000..343b932
--- /dev/null
+++ b/include/linux/rcuring.h
@@ -0,0 +1,195 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2008
+ * Copyright Fujitsu 2008-2010 Lai Jiangshan
+ *
+ * Authors: Dipankar Sarma 
+ *	    Manfred Spraul 
+ *	    Paul E. McKenney  Hierarchical version
+ *	    Lai Jiangshan  RCURING version
+ */
+
+#ifndef __LINUX_RCURING_H
+#define __LINUX_RCURING_H
+
+#define __rcu_read_lock_bh() local_bh_disable()
+#define __rcu_read_unlock_bh() local_bh_enable()
+#define __rcu_read_lock_sched() preempt_disable()
+#define __rcu_read_unlock_sched() preempt_enable()
+
+#ifdef CONFIG_RCURING_BH
+extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *));
+extern void synchronize_rcu_bh(void);
+extern void synchronize_rcu_bh_expedited(void);
+extern void rcu_bh_qs(int cpu);
+extern long rcu_batches_completed_bh(void);
+extern void rcu_barrier_bh(void);
+extern void rcu_bh_force_quiescent_state(void);
+#else /* CONFIG_RCURING_BH */
+#define call_rcu_bh call_rcu_sched
+#define synchronize_rcu_bh synchronize_rcu_sched
+#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited
+#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0)
+#define rcu_batches_completed_bh rcu_batches_completed_sched
+#define rcu_barrier_bh rcu_barrier_sched
+#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state
+#endif /* CONFIG_RCURING_BH */
+
+extern
+void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *));
+extern void synchronize_rcu_sched(void);
+#define synchronize_sched synchronize_rcu_sched
+extern void synchronize_rcu_sched_expedited(void);
+#define synchronize_sched_expedited synchronize_rcu_sched_expedited
+extern void rcu_note_context_switch(int cpu);
+extern long rcu_batches_completed_sched(void);
+extern void rcu_barrier_sched(void);
+extern void rcu_sched_force_quiescent_state(void);
+
+/*
+ * The @flags is valid only when RCURING_PREEMPT_SAVED is set.
+ *
+ * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED,
+ * RCURING_PREEMPT_QS and this read site's locked_complete(only the last
+ * RCURING_COUNTERS_SHIFT bits) in the @flags.
+ *
+ * When the outmost rcu read site is closed, we will clear
+ * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows
+ * this task has passed a quiescent state and release the old read site's
+ * locked_complete.
+ */
+#define RCURING_PREEMPT_SAVED	(1 << 31)
+#define RCURING_PREEMPT_QS	(1 << 30)
+#define RCURING_PREEMPT_FLAGS	(RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS)
+
+struct rcu_task_preempt {
+	unsigned int nesting;
+	unsigned int flags;
+};
+
+static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task)
+{
+	rcu_task->nesting++;
+	barrier();
+}
+
+static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task)
+{
+	int nesting = rcu_task->nesting;
+
+	if (nesting == 1) {
+		unsigned int flags;
+
+		barrier();
+		flags = ACCESS_ONCE(rcu_task->flags);
+
+		if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) {
+			flags &= ~RCURING_PREEMPT_QS;
+			ACCESS_ONCE(rcu_task->flags) = flags;
+		}
+	}
+
+	barrier();
+	rcu_task->nesting = nesting - 1;
+}
+
+#ifdef CONFIG_RCURING_PREEMPT
+
+#ifdef __GENARATING_OFFSETS__
+#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL)
+#else
+#include 
+#define task_rcu_preempt(p)						\
+	((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT))
+#endif
+
+#define current_rcu_preempt() task_rcu_preempt(current)
+#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt())
+#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt())
+extern
+void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *));
+#define call_rcu call_rcu_preempt
+extern void synchronize_rcu_preempt(void);
+#define synchronize_rcu synchronize_rcu_preempt
+extern void synchronize_rcu_preempt_expedited(void);
+#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited
+extern long rcu_batches_completed_preempt(void);
+#define rcu_batches_completed rcu_batches_completed_preempt
+extern void rcu_barrier_preempt(void);
+#define rcu_barrier rcu_barrier_preempt
+extern void rcu_preempt_force_quiescent_state(void);
+#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state
+
+struct task_struct;
+static inline void rcu_copy_process(struct task_struct *p)
+{
+	struct rcu_task_preempt *rcu_task = task_rcu_preempt(p);
+
+	rcu_task->nesting = 0;
+	rcu_task->flags = 0;
+}
+
+static inline void exit_rcu(void)
+{
+	if (current_rcu_preempt()->nesting) {
+		WARN_ON(1);
+		current_rcu_preempt()->nesting = 0;
+	}
+}
+
+#else /*CONFIG_RCURING_PREEMPT */
+
+#define __rcu_read_lock() __rcu_read_lock_sched()
+#define __rcu_read_unlock() __rcu_read_unlock_sched()
+#define call_rcu call_rcu_sched
+#define synchronize_rcu synchronize_rcu_sched
+#define synchronize_rcu_expedited synchronize_rcu_sched_expedited
+#define rcu_batches_completed rcu_batches_completed_sched
+#define rcu_barrier rcu_barrier_sched
+#define rcu_force_quiescent_state rcu_sched_force_quiescent_state
+
+static inline void rcu_copy_process(struct task_struct *p) {}
+static inline void exit_rcu(void) {}
+#endif /* CONFIG_RCURING_PREEMPT */
+
+#ifdef CONFIG_NO_HZ
+extern void rcu_kernel_enter(void);
+extern void rcu_kernel_exit(void);
+
+static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); }
+static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); }
+static inline void rcu_irq_enter(void) { rcu_kernel_enter(); }
+static inline void rcu_irq_exit(void) { rcu_kernel_exit(); }
+extern void rcu_enter_nohz(void);
+extern void rcu_exit_nohz(void);
+#else
+static inline void rcu_nmi_enter(void) {}
+static inline void rcu_nmi_exit(void) {}
+static inline void rcu_irq_enter(void) {}
+static inline void rcu_irq_exit(void) {}
+static inline void rcu_enter_nohz(void) {}
+static inline void rcu_exit_nohz(void) {}
+#endif
+
+extern int rcu_needs_cpu(int cpu);
+extern void rcu_check_callbacks(int cpu, int user);
+
+extern void rcu_scheduler_starting(void);
+extern int rcu_scheduler_active __read_mostly;
+
+#endif /* __LINUX_RCURING_H */
diff --git a/include/linux/sched.h b/include/linux/sched.h
index e18473f..15b332d 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1211,6 +1211,10 @@ struct task_struct {
 	struct rcu_node *rcu_blocked_node;
 #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */
 
+#ifdef CONFIG_RCURING_PREEMPT
+	struct rcu_task_preempt rcu_task_preempt;
+#endif
+
 #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
 	struct sched_info sched_info;
 #endif
@@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p)
 	INIT_LIST_HEAD(&p->rcu_node_entry);
 }
 
-#else
+#elif !defined(CONFIG_RCURING)
 
 static inline void rcu_copy_process(struct task_struct *p)
 {
diff --git a/init/Kconfig b/init/Kconfig
index a619a1a..48e58d9 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU
 	  for real-time UP systems.  This option greatly reduces the
 	  memory footprint of RCU.
 
+config RCURING
+	bool "Multi-GP ring based RCU"
+
 endchoice
 
 config PREEMPT_RCU
@@ -450,6 +453,35 @@ config TREE_RCU_TRACE
 	  TREE_PREEMPT_RCU implementations, permitting Makefile to
 	  trivially select kernel/rcutree_trace.c.
 
+config RCURING_BH
+	bool "RCU for bh"
+	default y
+	depends on RCURING && !PREEMPT_RT
+	help
+	  If Y, use an independent rcu domain for rcu_bh.
+	  If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh,
+	  rcu_read_unlock_bh).
+
+config RCURING_BH_PREEMPT
+	bool
+	depends on PREEMPT_RT
+	help
+	  rcu_bh will map to rcu_preempt.
+
+config RCURING_PREEMPT
+	bool "RCURING based preemptible RCU"
+	default n
+	depends on RCURING && PREEMPT
+	help
+	  If Y, normal rcu functionality will map to rcu_preempt.
+	  If N, normal rcu functionality will map to rcu_sched.
+
+config RCURING_COUNTERS_SHIFT
+	int "RCURING's counter shift"
+	range 1 10
+	depends on RCURING
+	default 4
+
 endmenu # "RCU Subsystem"
 
 config IKCONFIG
diff --git a/kernel/Makefile b/kernel/Makefile
index 92b7420..66eab8c 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o
 obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o
 obj-$(CONFIG_TINY_RCU) += rcutiny.o
 obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o
+obj-$(CONFIG_RCURING) += rcuring.o
 obj-$(CONFIG_RELAY) += relay.o
 obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
 obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c
new file mode 100644
index 0000000..0d30817
--- /dev/null
+++ b/kernel/kernel-offsets.c
@@ -0,0 +1,20 @@
+/*
+ * Generate definitions needed by assembly language modules.
+ *
+ * Copyright (C) 2008-2010 Lai Jiangshan
+ */
+
+#define __GENERATING_KERNEL_OFFSETS__
+#include 
+#include 
+#include 
+
+void foo(void);
+
+void foo(void)
+{
+#ifdef CONFIG_RCURING_PREEMPT
+	OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt);
+#endif
+}
+
diff --git a/kernel/rcuring.c b/kernel/rcuring.c
new file mode 100644
index 0000000..e7cedb5
--- /dev/null
+++ b/kernel/rcuring.c
@@ -0,0 +1,1002 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2008
+ * Copyright Fujitsu 2008-2010 Lai Jiangshan
+ *
+ * Authors: Dipankar Sarma 
+ *	    Manfred Spraul 
+ *	    Paul E. McKenney  Hierarchical version
+ *	    Lai Jiangshan  RCURING version
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#define RCURING_IDX_SHIFT	CONFIG_RCURING_COUNTERS_SHIFT
+#define RCURING_IDX_MAX		(1L << RCURING_IDX_SHIFT)
+#define RCURING_IDX_MASK	(RCURING_IDX_MAX - 1)
+#define RCURING_IDX(complete)	((complete) & RCURING_IDX_MASK)
+
+struct rcuring {
+	atomic_t	ctr[RCURING_IDX_MAX];
+	long		curr_complete;
+	long		done_complete;
+
+	raw_spinlock_t	lock;
+};
+
+/* It is long history that we use -300 as an initial complete */
+#define RCURING_INIT_COMPLETE	-300L
+#define RCURING_INIT(name)						\
+{									\
+	{[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),},	\
+	RCURING_INIT_COMPLETE,						\
+	RCURING_INIT_COMPLETE - 1,					\
+	__RAW_SPIN_LOCK_UNLOCKED(name.lock),				\
+}
+
+static inline
+int rcuring_ctr_read(struct rcuring *rr, unsigned long complete)
+{
+	int res;
+
+	/* atomic_read() does not ensure to imply barrier(). */
+	barrier();
+	res = atomic_read(rr->ctr + RCURING_IDX(complete));
+	barrier();
+
+	return res;
+}
+
+void __rcuring_advance_done_complete(struct rcuring *rr)
+{
+	long wait_complete = rr->done_complete + 1;
+
+	if (!rcuring_ctr_read(rr, wait_complete)) {
+		do {
+			wait_complete++;
+		} while (!rcuring_ctr_read(rr, wait_complete));
+
+		ACCESS_ONCE(rr->done_complete) = wait_complete - 1;
+	}
+}
+
+static inline
+int rcuring_could_advance_done_complete(struct rcuring *rr)
+{
+	return !rcuring_ctr_read(rr, rr->done_complete + 1);
+}
+
+static inline
+void rcuring_advance_done_complete(struct rcuring *rr)
+{
+	if (rcuring_could_advance_done_complete(rr)) {
+		if (__raw_spin_trylock(&rr->lock)) {
+			__rcuring_advance_done_complete(rr);
+			__raw_spin_unlock(&rr->lock);
+		}
+	}
+}
+
+void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete)
+{
+	long curr_complete = rr->curr_complete;
+
+	if (curr_complete + 1 == next_complete) {
+		if (!rcuring_ctr_read(rr, next_complete)) {
+			ACCESS_ONCE(rr->curr_complete) = next_complete;
+			smp_mb__before_atomic_inc();
+			atomic_inc(rr->ctr + RCURING_IDX(next_complete));
+			smp_mb__after_atomic_inc();
+			atomic_dec(rr->ctr + RCURING_IDX(curr_complete));
+		}
+	}
+}
+
+static inline
+int rcuring_could_advance_complete(struct rcuring *rr, long next_complete)
+{
+	if (rcuring_could_advance_done_complete(rr))
+		return 1;
+
+	if (rr->curr_complete + 1 == next_complete) {
+		if (!rcuring_ctr_read(rr, next_complete))
+			return 1;
+	}
+
+	return 0;
+}
+
+static inline
+void rcuring_advance_complete(struct rcuring *rr, long next_complete)
+{
+	if (rcuring_could_advance_complete(rr, next_complete)) {
+		if (__raw_spin_trylock(&rr->lock)) {
+			__rcuring_advance_done_complete(rr);
+			__rcuring_advance_curr_complete(rr, next_complete);
+			__raw_spin_unlock(&rr->lock);
+		}
+	}
+}
+
+static inline
+void rcuring_advance_complete_force(struct rcuring *rr, long next_complete)
+{
+	if (rcuring_could_advance_complete(rr, next_complete)) {
+		__raw_spin_lock(&rr->lock);
+		__rcuring_advance_done_complete(rr);
+		__rcuring_advance_curr_complete(rr, next_complete);
+		__raw_spin_unlock(&rr->lock);
+	}
+}
+
+static inline long __locked_complete_adjust(int locked_idx, long complete)
+{
+	long locked_complete;
+
+	locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx;
+	if (locked_complete - complete <= 0)
+		return locked_complete;
+	else
+		return locked_complete - RCURING_IDX_MAX;
+}
+
+static long rcuring_lock(struct rcuring *rr)
+{
+	long locked_complete;
+	int idx;
+
+	for (;;) {
+		idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete));
+		if (likely(atomic_inc_not_zero(rr->ctr + idx)))
+			break;
+	}
+	smp_mb__after_atomic_inc();
+
+	locked_complete = ACCESS_ONCE(rr->curr_complete);
+	if (likely(RCURING_IDX(locked_complete) == idx))
+		return locked_complete;
+	else
+		return __locked_complete_adjust(idx, locked_complete);
+}
+
+static void rcuring_unlock(struct rcuring *rr, long locked_complete)
+{
+	smp_mb__before_atomic_dec();
+
+	atomic_dec(rr->ctr + RCURING_IDX(locked_complete));
+}
+
+static inline
+void rcuring_dup_lock(struct rcuring *rr, long locked_complete)
+{
+	atomic_inc(rr->ctr + RCURING_IDX(locked_complete));
+}
+
+static inline
+long rcuring_advance_lock(struct rcuring *rr, long locked_complete)
+{
+	long new_locked_complete = locked_complete;
+
+	if (locked_complete != rr->curr_complete) {
+		/*
+		 * Lock the new complete at first, and then release
+		 * the old one. It prevents errors when NMI/SMI occurs
+		 * after rcuring_unlock() - we still lock it.
+		 */
+		new_locked_complete = rcuring_lock(rr);
+		rcuring_unlock(rr, locked_complete);
+	}
+
+	return new_locked_complete;
+}
+
+static inline long rcuring_get_done_complete(struct rcuring *rr)
+{
+	return ACCESS_ONCE(rr->done_complete);
+}
+
+static inline long rcuring_get_curr_complete(struct rcuring *rr)
+{
+	return ACCESS_ONCE(rr->curr_complete);
+}
+
+struct rcu_batch {
+	struct rcu_head *list, **tail;
+};
+
+static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from)
+{
+	if (from->list != NULL) {
+		*to->tail = from->list;
+		to->tail = from->tail;
+
+		from->list = NULL;
+		from->tail = &from->list;
+	}
+}
+
+static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new)
+{
+	*batch->tail = new;
+	batch->tail = &new->next;
+}
+
+struct rcu_data {
+	long locked_complete;
+
+	/*
+	 * callbacks are in batches (done_complete, curr_complete]
+	 * curr_complete - done_complete >= 0
+	 * curr_complete - done_complete <= RCURING_IDX_MAX
+	 * domain's curr_complete - curr_complete >= 0
+	 * domain's done_complete - done_complete >= 0
+	 *
+	 * curr_complete == done_complete just means @batch is empty.
+	 * we have at least one callback in batch[RCURING_IDX(done_complete)]
+	 * if curr_complete != done_complete
+	 */
+	long curr_complete;
+	long done_complete;
+
+	struct rcu_batch batch[RCURING_IDX_MAX];
+	struct rcu_batch done_batch;
+
+	unsigned int qlen;
+	unsigned long jiffies;
+	unsigned long stall_jiffies;
+};
+
+struct rcu_domain {
+	struct rcuring rcuring;
+	const char *domain_name;
+	struct rcu_data __percpu *rcu_data;
+
+	spinlock_t lock; /* this lock is for fqs or other misc things */
+	long fqs_complete;
+	void (*force_quiescent_state)(struct rcu_domain *domain,
+			long fqs_complete);
+};
+
+#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1)
+
+static inline long gp_reserved(struct rcu_domain *domain)
+{
+	return EXPEDITED_GP_RESERVED_RECOMMEND;
+}
+
+static inline void __init rcu_data_init(struct rcu_data *rdp)
+{
+	int idx;
+
+	for (idx = 0; idx < RCURING_IDX_MAX; idx++)
+		rdp->batch[idx].tail = &rdp->batch[idx].list;
+
+	rdp->done_batch.tail = &rdp->done_batch.list;
+}
+
+static void do_advance_callbacks(struct rcu_data *rdp, long done_complete)
+{
+	int idx;
+
+	while (rdp->done_complete != done_complete) {
+		rdp->done_complete++;
+		idx = RCURING_IDX(rdp->done_complete);
+		rcu_batch_merge(&rdp->done_batch, rdp->batch + idx);
+	}
+}
+
+/* Is value in the range (@left_open, @right_close] */
+static inline bool in_range(long left_open, long right_close, long value)
+{
+	return left_open - value < 0 && value - right_close <= 0;
+}
+
+static void advance_callbacks(struct rcu_data *rdp, long done_complete,
+		long curr_complete)
+{
+	if (in_range(done_complete, curr_complete, rdp->curr_complete)) {
+		do_advance_callbacks(rdp, done_complete);
+	} else {
+		do_advance_callbacks(rdp, rdp->curr_complete);
+		rdp->curr_complete = done_complete;
+		rdp->done_complete = done_complete;
+	}
+}
+
+static void __force_quiescent_state(struct rcu_domain *domain,
+		long fqs_complete)
+{
+	int cpu;
+	long fqs_complete_old;
+	long curr_complete = rcuring_get_curr_complete(&domain->rcuring);
+	long done_complete = rcuring_get_done_complete(&domain->rcuring);
+
+	spin_lock(&domain->lock);
+
+	fqs_complete_old = domain->fqs_complete;
+	if (!in_range(done_complete, curr_complete, fqs_complete_old))
+		fqs_complete_old = done_complete;
+
+	if (fqs_complete_old - fqs_complete >= 0) {
+		spin_unlock(&domain->lock);
+		return;
+	}
+
+	domain->fqs_complete = fqs_complete;
+	spin_unlock(&domain->lock);
+
+	for_each_online_cpu(cpu) {
+		struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
+		long locked_complete = ACCESS_ONCE(rdp->locked_complete);
+
+		if (!in_range(fqs_complete_old, fqs_complete, locked_complete))
+			continue;
+
+		if (cpu == smp_processor_id())
+			set_tsk_need_resched(current);
+		else
+			smp_send_reschedule(cpu);
+	}
+}
+
+static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete)
+{
+	domain->force_quiescent_state(domain, fqs_complete);
+}
+
+static
+long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp)
+{
+	long curr_complete, done_complete;
+	struct rcuring *rr = &domain->rcuring;
+
+	smp_mb();
+	curr_complete = rcuring_get_curr_complete(rr);
+	done_complete = rcuring_get_done_complete(rr);
+
+	advance_callbacks(rdp, done_complete, curr_complete);
+	rdp->curr_complete = curr_complete;
+
+	if (!rdp->qlen) {
+		rdp->jiffies = jiffies;
+		rdp->stall_jiffies = jiffies;
+	}
+
+	return curr_complete;
+}
+
+static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head,
+		void (*func)(struct rcu_head *))
+{
+	struct rcu_data *rdp;
+	long curr_complete;
+
+	head->next = NULL;
+	head->func = func;
+
+	rdp = this_cpu_ptr(domain->rcu_data);
+	curr_complete = prepare_for_new_callback(domain, rdp);
+	rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head);
+	rdp->qlen++;
+
+	return curr_complete;
+}
+
+static void print_rcuring_stall(struct rcu_domain *domain)
+{
+	struct rcuring *rr = &domain->rcuring;
+	unsigned long curr_complete = rcuring_get_curr_complete(rr);
+	unsigned long done_complete = rcuring_get_done_complete(rr);
+	int cpu;
+
+	printk(KERN_ERR "RCU is stalled, cpu=%d, domain_name=%s\n", smp_processor_id(),
+			domain->domain_name);
+
+	printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n",
+			done_complete, curr_complete);
+	printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n",
+			rcuring_ctr_read(rr, curr_complete),
+			rcuring_ctr_read(rr, done_complete + 1));
+
+	for_each_online_cpu(cpu) {
+		struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
+		printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/done/curr)="
+				"%ld/%ld/%ld\n", cpu, rdp->qlen,
+				rdp->locked_complete, rdp->done_complete,
+				rdp->curr_complete);
+	}
+}
+
+static void __rcu_check_callbacks(struct rcu_domain *domain,
+		struct rcu_data *rdp, int in_rcu_softirq)
+{
+	long curr_complete, done_complete;
+	struct rcuring *rr = &domain->rcuring;
+
+	if (!rdp->qlen)
+		return;
+
+	rcuring_advance_done_complete(rr);
+
+	curr_complete = rcuring_get_curr_complete(rr);
+	done_complete = ACCESS_ONCE(rr->done_complete);
+	advance_callbacks(rdp, done_complete, curr_complete);
+
+	if (rdp->curr_complete == curr_complete
+			&& rdp->batch[RCURING_IDX(rdp->curr_complete)].list) {
+		long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain);
+
+		if (curr_complete - done_complete <= max_gp_allow)
+			rcuring_advance_complete(rr, curr_complete + 1);
+	}
+
+	if (rdp->done_batch.list) {
+		if (!in_rcu_softirq)
+			raise_softirq(RCU_SOFTIRQ);
+	} else {
+		if (jiffies - rdp->jiffies > 10) {
+			force_quiescent_state(domain, rdp->curr_complete);
+			rdp->jiffies = jiffies;
+		}
+		if (jiffies - rdp->stall_jiffies > 2 * HZ) {
+			print_rcuring_stall(domain);
+			rdp->stall_jiffies = jiffies;
+		}
+	}
+}
+
+static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp)
+{
+	int count = 0;
+	struct rcu_head *list, *next;
+
+	list = rdp->done_batch.list;
+
+	if (list) {
+		rdp->done_batch.list = NULL;
+		rdp->done_batch.tail = &rdp->done_batch.list;
+
+		local_irq_enable();
+
+		smp_mb();
+
+		while (list) {
+			next = list->next;
+			prefetch(next);
+			list->func(list);
+			count++;
+			list = next;
+		}
+		local_irq_disable();
+
+		rdp->qlen -= count;
+		rdp->jiffies = jiffies;
+	}
+}
+
+static int __rcu_needs_cpu(struct rcu_data *rdp)
+{
+	return !!rdp->qlen;
+}
+
+static inline
+void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp)
+{
+	rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
+			rdp->locked_complete);
+}
+
+static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task,
+		long locked_complete)
+{
+	unsigned int new_flags;
+
+	new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete);
+	ACCESS_ONCE(rcu_task->flags) = new_flags;
+}
+
+/*
+ * Every cpu holds a lock_complete. But for preemptible rcu, any task may
+ * also hold a lock_complete. This function advances the lock_complete of
+ * the current cpu and checks (dup_lock_and_save, release or advance) the
+ * lock_complete of the current task.
+ */
+static inline
+void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp,
+		struct rcu_task_preempt *rcu_task)
+{
+	long locked_complete = rdp->locked_complete;
+	unsigned int flags = ACCESS_ONCE(rcu_task->flags);
+
+	if (!(flags & RCURING_PREEMPT_SAVED)) {
+		BUG_ON(flags & RCURING_PREEMPT_QS);
+		if (rcu_task->nesting) {
+			/* dup_lock_and_save current task's lock_complete */
+			rcuring_dup_lock(&domain->rcuring, locked_complete);
+			rcu_preempt_save_complete(rcu_task, locked_complete);
+		}
+	} else if (!(rcu_task->nesting)) {
+		/* release current task's lock_complete */
+		rcuring_unlock(&domain->rcuring, (long)flags);
+		ACCESS_ONCE(rcu_task->flags) = 0;
+	} else if (!(flags & RCURING_PREEMPT_QS)) {
+		/* advance_and_save current task's lock_complete */
+		if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) {
+			rcuring_unlock(&domain->rcuring, (long)flags);
+			rcuring_dup_lock(&domain->rcuring, locked_complete);
+		}
+		rcu_preempt_save_complete(rcu_task, locked_complete);
+	}
+
+	/* increases lock_complete for the current cpu */
+	rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
+			locked_complete);
+}
+
+static void __synchronize_rcu(struct rcu_domain *domain, int expedited)
+{
+	struct rcu_synchronize rcu;
+	long complete;
+
+	init_completion(&rcu.completion);
+
+	local_irq_disable();
+	complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu);
+
+	if (expedited) {
+		/*
+		 * Force a new gp to start immediately (it may use a reserved
+		 * gp). But if all gps are in use, starting a new gp fails,
+		 * and we have to wait until some gps become available.
+		 */
+		rcuring_advance_complete_force(&domain->rcuring,
+				complete + 1);
+
+		/* Force qs and expedite the new gp */
+		force_quiescent_state(domain, complete);
+	}
+	local_irq_enable();
+
+	wait_for_completion(&rcu.completion);
+}
+
+static inline long __rcu_batches_completed(struct rcu_domain *domain)
+{
+	return rcuring_get_done_complete(&domain->rcuring);
+}
+
+#ifdef CONFIG_RCURING_BH
+#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args)
+#else
+#define gen_code_for_bh(gen_code, args...)
+#endif
+#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args)
+#ifdef CONFIG_RCURING_PREEMPT
+#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args)
+#else
+#define gen_code_for_preempt(gen_code, args...)
+#endif
+
+#define GEN_CODES(gen_code, args...)			\
+	gen_code_for_bh(gen_code, ##args)		\
+	gen_code_for_sched(gen_code, ##args)		\
+	gen_code_for_preempt(gen_code, ##args)		\
+
+#define gen_basic_code(domain)						\
+									\
+static void force_quiescent_state_##domain(struct rcu_domain *domain,	\
+		long fqs_complete);					\
+									\
+static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain);		\
+									\
+struct rcu_domain rcu_domain_##domain =					\
+{									\
+	.rcuring = RCURING_INIT(rcu_domain_##domain.rcuring),		\
+	.domain_name = #domain,						\
+	.rcu_data = &rcu_data_##domain,					\
+	.lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock),		\
+	.force_quiescent_state = force_quiescent_state_##domain,	\
+};									\
+									\
+void call_rcu_##domain(struct rcu_head *head,				\
+		void (*func)(struct rcu_head *))			\
+{									\
+	unsigned long flags;						\
+									\
+	local_irq_save(flags);						\
+	__call_rcu(&rcu_domain_##domain, head, func);			\
+	local_irq_restore(flags);					\
+}									\
+EXPORT_SYMBOL_GPL(call_rcu_##domain);					\
+									\
+void synchronize_rcu_##domain(void)					\
+{									\
+	__synchronize_rcu(&rcu_domain_##domain, 0);			\
+}									\
+EXPORT_SYMBOL_GPL(synchronize_rcu_##domain);				\
+									\
+void synchronize_rcu_##domain##_expedited(void)				\
+{									\
+	__synchronize_rcu(&rcu_domain_##domain, 1);			\
+}									\
+EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited);		\
+									\
+long rcu_batches_completed_##domain(void)				\
+{									\
+	return __rcu_batches_completed(&rcu_domain_##domain);		\
+}									\
+EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain);			\
+									\
+void rcu_##domain##_force_quiescent_state(void)				\
+{									\
+	force_quiescent_state(&rcu_domain_##domain, 0);			\
+}									\
+EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state);
+
+GEN_CODES(gen_basic_code)
+
+static DEFINE_PER_CPU(int, kernel_count);
+
+#define LOCK_COMPLETE(domain)						\
+	long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring);
+#define SAVE_COMPLETE(domain)						\
+	per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain;
+#define LOAD_COMPLETE(domain)						\
+	long complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete;
+#define RELEASE_COMPLETE(domain)					\
+	rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain);
+
+static void __rcu_kernel_enter_outmost(int cpu)
+{
+	GEN_CODES(LOCK_COMPLETE)
+
+	barrier();
+	per_cpu(kernel_count, cpu) = 1;
+	barrier();
+
+	GEN_CODES(SAVE_COMPLETE)
+}
+
+static void __rcu_kernel_exit_outmost(int cpu)
+{
+	GEN_CODES(LOAD_COMPLETE)
+
+	barrier();
+	per_cpu(kernel_count, cpu) = 0;
+	barrier();
+
+	GEN_CODES(RELEASE_COMPLETE)
+}
+
+#ifdef CONFIG_RCURING_BH
+static void force_quiescent_state_bh(struct rcu_domain *domain,
+		long fqs_complete)
+{
+	__force_quiescent_state(domain, fqs_complete);
+}
+
+static inline void rcu_bh_qsctr_inc_irqoff(int cpu)
+{
+	__rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu));
+}
+
+void rcu_bh_qs(int cpu)
+{
+	unsigned long flags;
+
+	local_irq_save(flags);
+	rcu_bh_qsctr_inc_irqoff(cpu);
+	local_irq_restore(flags);
+}
+
+#else /* CONFIG_RCURING_BH */
+static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); }
+#endif /* CONFIG_RCURING_BH */
+
+static void force_quiescent_state_sched(struct rcu_domain *domain,
+		long fqs_complete)
+{
+	__force_quiescent_state(domain, fqs_complete);
+}
+
+static inline void rcu_sched_qsctr_inc_irqoff(int cpu)
+{
+	__rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu));
+}
+
+#ifdef CONFIG_RCURING_PREEMPT
+static void force_quiescent_state_preempt(struct rcu_domain *domain,
+		long fqs_complete)
+{
+	/* To Be Implemented */
+}
+
+static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
+{
+	__rcu_preempt_qsctr_inc(&rcu_domain_preempt,
+			&per_cpu(rcu_data_preempt, cpu),
+			&current->rcu_task_preempt);
+}
+
+#else /* CONFIG_RCURING_PREEMPT */
+static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
+{
+	(void)(cpu);
+	(void)(__rcu_preempt_qsctr_inc);
+}
+#endif /* CONFIG_RCURING_PREEMPT */
+
+#define gen_needs_cpu(domain, cpu)					\
+	if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu)))		\
+		return 1;
+int rcu_needs_cpu(int cpu)
+{
+	GEN_CODES(gen_needs_cpu, cpu)
+	return 0;
+}
+
+#define call_func(domain, func, args...)				\
+	func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args);
+
+/*
+ * Note a context switch. This is a quiescent state for RCU-sched,
+ * and requires special handling for preemptible RCU.
+ */
+void rcu_note_context_switch(int cpu)
+{
+	unsigned long flags;
+
+#ifdef CONFIG_HOTPLUG_CPU
+	/*
+	 * The stopper thread needs to schedule() to the idle thread
+	 * after CPU_DYING.
+	 */
+	if (unlikely(!per_cpu(kernel_count, cpu))) {
+		BUG_ON(cpu_online(cpu));
+		return;
+	}
+#endif
+
+	local_irq_save(flags);
+	rcu_bh_qsctr_inc_irqoff(cpu);
+	rcu_sched_qsctr_inc_irqoff(cpu);
+	rcu_preempt_qsctr_inc_irqoff(cpu);
+	local_irq_restore(flags);
+}
+
+static int rcu_cpu_idle(int cpu)
+{
+	return idle_cpu(cpu) && rcu_scheduler_active &&
+		!in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT);
+}
+
+void rcu_check_callbacks(int cpu, int user)
+{
+	unsigned long flags;
+
+	local_irq_save(flags);
+
+	if (user || rcu_cpu_idle(cpu)) {
+		rcu_bh_qsctr_inc_irqoff(cpu);
+		rcu_sched_qsctr_inc_irqoff(cpu);
+		rcu_preempt_qsctr_inc_irqoff(cpu);
+	} else if (!in_softirq())
+		rcu_bh_qsctr_inc_irqoff(cpu);
+
+	GEN_CODES(call_func, __rcu_check_callbacks, 0)
+	local_irq_restore(flags);
+}
+
+static void rcu_process_callbacks(struct softirq_action *unused)
+{
+	local_irq_disable();
+	GEN_CODES(call_func, __rcu_check_callbacks, 1)
+	GEN_CODES(call_func, rcu_do_batch)
+	local_irq_enable();
+}
+
+static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain,
+		struct rcu_data *off_rdp, struct rcu_data *rdp)
+{
+	if (off_rdp->qlen > 0) {
+		unsigned long flags;
+		long curr_complete;
+
+		/* move all callbacks in @off_rdp to @off_rdp->done_batch */
+		do_advance_callbacks(off_rdp, off_rdp->curr_complete);
+
+		/* move all callbacks in @off_rdp to this cpu(@rdp) */
+		local_irq_save(flags);
+		curr_complete = prepare_for_new_callback(domain, rdp);
+		rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete),
+				&off_rdp->done_batch);
+		rdp->qlen += off_rdp->qlen;
+		local_irq_restore(flags);
+
+		off_rdp->qlen = 0;
+	}
+}
+
+#define gen_offline(domain, cpu, receive_cpu)				\
+	__rcu_offline_cpu(&rcu_domain_##domain,				\
+			&per_cpu(rcu_data_##domain, cpu),		\
+			&per_cpu(rcu_data_##domain, receive_cpu));
+
+static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
+				unsigned long action, void *hcpu)
+{
+	int cpu = (long)hcpu;
+	int receive_cpu;
+
+	(void)(receive_cpu);
+	(void)(__rcu_offline_cpu);
+	(void)(__rcu_kernel_exit_outmost);
+
+	switch (action) {
+#ifdef CONFIG_HOTPLUG_CPU
+	case CPU_DYING:
+	case CPU_DYING_FROZEN:
+		/*
+		 * The machine is stopped now, so we can access
+		 * any other cpu's data.
+		 */
+		receive_cpu = cpumask_any_but(cpu_online_mask, cpu);
+		GEN_CODES(gen_offline, cpu, receive_cpu)
+		__rcu_kernel_exit_outmost(cpu);
+		break;
+#endif
+	case CPU_STARTING:
+	case CPU_STARTING_FROZEN:
+		__rcu_kernel_enter_outmost(cpu);
+	default:
+		break;
+	}
+	return NOTIFY_OK;
+}
+
+#define rcu_init_domain_data(domain, cpu)			\
+	for_each_possible_cpu(cpu)				\
+		rcu_data_init(&per_cpu(rcu_data_##domain, cpu));
+
+void __init rcu_init(void)
+{
+	int cpu;
+
+	GEN_CODES(rcu_init_domain_data, cpu)
+
+	__rcu_kernel_enter_outmost(raw_smp_processor_id());
+	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
+	cpu_notifier(rcu_cpu_notify, 0);
+}
+
+int rcu_scheduler_active __read_mostly;
+EXPORT_SYMBOL_GPL(rcu_scheduler_active);
+
+/*
+ * This function is invoked towards the end of the scheduler's initialization
+ * process. Before this is called, the idle task might contain
+ * RCU read-side critical sections (during which time, this idle
+ * task is booting the system). After this function is called, the
+ * idle tasks are prohibited from containing RCU read-side critical
+ * sections. This function also enables RCU lockdep checking.
+ */
+void rcu_scheduler_starting(void)
+{
+	rcu_scheduler_active = 1;
+}
+
+#ifdef CONFIG_NO_HZ
+
+void rcu_kernel_enter(void)
+{
+	unsigned long flags;
+
+	raw_local_irq_save(flags);
+	if (__get_cpu_var(kernel_count) == 0)
+		__rcu_kernel_enter_outmost(raw_smp_processor_id());
+	else
+		__get_cpu_var(kernel_count)++;
+	raw_local_irq_restore(flags);
+}
+
+void rcu_kernel_exit(void)
+{
+	unsigned long flags;
+
+	raw_local_irq_save(flags);
+	if (__get_cpu_var(kernel_count) == 1)
+		__rcu_kernel_exit_outmost(raw_smp_processor_id());
+	else
+		__get_cpu_var(kernel_count)--;
+	raw_local_irq_restore(flags);
+}
+
+void rcu_enter_nohz(void)
+{
+	unsigned long flags;
+
+	raw_local_irq_save(flags);
+	__rcu_kernel_exit_outmost(raw_smp_processor_id());
+	raw_local_irq_restore(flags);
+}
+
+void rcu_exit_nohz(void)
+{
+	unsigned long flags;
+
+	raw_local_irq_save(flags);
+	__rcu_kernel_enter_outmost(raw_smp_processor_id());
+	raw_local_irq_restore(flags);
+}
+#endif /* CONFIG_NO_HZ */
+
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
+static struct completion rcu_barrier_completion;
+static struct kref rcu_barrier_wait_ref;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+
+static void rcu_barrier_release_ref(struct kref *notused)
+{
+	complete(&rcu_barrier_completion);
+}
+
+static void rcu_barrier_callback(struct rcu_head *notused)
+{
+	kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
+}
+
+static void rcu_barrier_func(void *data)
+{
+	struct rcu_domain *rcu_domain = data;
+
+	kref_get(&rcu_barrier_wait_ref);
+	__call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head),
+			rcu_barrier_callback);
+}
+
+static void __rcu_barrier(struct rcu_domain *rcu_domain)
+{
+	mutex_lock(&rcu_barrier_mutex);
+
+	init_completion(&rcu_barrier_completion);
+	kref_init(&rcu_barrier_wait_ref);
+
+	/* queue barrier rcu_heads for every cpu */
+	on_each_cpu(rcu_barrier_func, rcu_domain, 1);
+
+	kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
+	wait_for_completion(&rcu_barrier_completion);
+
+	mutex_unlock(&rcu_barrier_mutex);
+}
+
+#define gen_rcu_barrier(domain)						\
+void rcu_barrier_##domain(void)						\
+{									\
+	__rcu_barrier(&rcu_domain_##domain);				\
+}									\
+EXPORT_SYMBOL_GPL(rcu_barrier_##domain);
+
+GEN_CODES(gen_rcu_barrier)
+
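
The rcu_barrier_##domain() code above relies on a reference count that
starts at one, gains one reference per queued callback, and fires the
completion when the last reference is dropped. The sketch below is a
single-threaded userspace model of that counting only. The struct and all
barrier_* names are hypothetical stand-ins, not kernel API, and the
assert() in barrier_put() is an added sanity check that the patch does not
have.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for rcu_barrier_wait_ref + rcu_barrier_completion. */
struct barrier_model {
	int refs;	/* models the kref count */
	bool done;	/* models the completion having fired */
};

/* Models kref_put(): the last put fires the completion. */
static void barrier_put(struct barrier_model *b)
{
	assert(b->refs > 0);	/* assumption: never put more than was taken */
	if (--b->refs == 0)
		b->done = true;	/* rcu_barrier_release_ref() -> complete() */
}

/* Models init_completion() + kref_init(): start with one reference. */
static void barrier_start(struct barrier_model *b)
{
	b->refs = 1;
	b->done = false;
}

/* Models rcu_barrier_func(): one extra reference per queued callback. */
static void barrier_queue_callback(struct barrier_model *b)
{
	b->refs++;
}

/* Models rcu_barrier_callback() being invoked after a grace period. */
static void barrier_callback_ran(struct barrier_model *b)
{
	barrier_put(b);
}
```

The initial reference is what keeps the completion from firing while
callbacks are still being queued: even if an early callback runs before
the last cpu has queued its own, the count cannot reach zero until
__rcu_barrier() drops that initial reference.
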
diff --git a/kernel/sched.c b/kernel/sched.c
index 09b574e..967f25a 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = {
 };
 #endif	/* CONFIG_CGROUP_CPUACCT */
 
+#ifndef CONFIG_RCURING
 #ifndef CONFIG_SMP
 
 void synchronize_sched_expedited(void)
@@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void)
 EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
 
 #endif /* #else #ifndef CONFIG_SMP */
+#endif /* CONFIG_RCURING */
diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c
index 3e216e0..0eba561 100644
--- a/kernel/time/tick-sched.c
+++ b/kernel/time/tick-sched.c
@@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle)
 	cpu = smp_processor_id();
 	ts = &per_cpu(tick_cpu_sched, cpu);
 
+	/* Don't enter nohz when cpu is offlining, it is going to die */
+	if (cpu_is_offline(cpu))
+		goto end;
+
 	/*
 	 * Call to tick_nohz_start_idle stops the last_update_time from being
 	 * updated. Thus, it must not be called in the event we are called from
 
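The nesting rule used by rcu_kernel_enter() and rcu_kernel_exit() in the
patch can be hard to see through the macros, so here is a minimal
userspace model of it. It assumes only what the patch shows: the outermost
enter takes a locked_complete and sets kernel_count to 1, nested calls
only adjust the counter, and the outermost exit releases the
locked_complete. The struct and the model_* names are hypothetical, and
the assert() is an added check that is not in the patch.

```c
#include <assert.h>

/* Hypothetical per-cpu state; the patch keeps this in per-cpu variables. */
struct cpu_model {
	int kernel_count;	/* models the per-cpu kernel_count */
	int holds_complete;	/* 1 while the cpu holds a locked_complete */
};

/* Models rcu_kernel_enter(). */
static void model_kernel_enter(struct cpu_model *c)
{
	if (c->kernel_count == 0) {
		/* outermost entry: __rcu_kernel_enter_outmost() */
		c->holds_complete = 1;
		c->kernel_count = 1;
	} else {
		c->kernel_count++;
	}
}

/* Models rcu_kernel_exit(). */
static void model_kernel_exit(struct cpu_model *c)
{
	assert(c->kernel_count > 0);	/* assumption: exits are balanced */
	if (c->kernel_count == 1) {
		/* outermost exit: __rcu_kernel_exit_outmost() */
		c->kernel_count = 0;
		c->holds_complete = 0;
	} else {
		c->kernel_count--;
	}
}
```

With this model, enter/enter/exit leaves the cpu still holding its
locked_complete, and only the second exit releases it. A cpu that holds
nothing cannot delay a grace period, which appears to be why
rcu_note_context_switch() returns early when kernel_count is zero.
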