简介
  RCU(Read-Copy Update)是数据同步的一种方式,在当前的Linux内核中发挥着重要的作用。RCU主要针对的数据对象是链表,目的是提高遍历读取数据的效率,为了达到目的使用RCU机制读取数据的时候不对链表进行耗时的加锁操作。这样在同一时间可以有多个线程同时读取该链表,并且允许一个线程对链表进行修改(修改的时候,需要加锁)。RCU适用于需要频繁的读取数据,而相应修改数据并不多的情景,例如在文件系统中,经常需要查找定位目录,而对目录的修改相对来说并不多,这是RCU发挥作用的佳场景。
  Linux内核源码当中,关于RCU的文档比较齐全,你可以在 /Documentation/RCU/ 目录下找到这些文件。Paul E. McKenney 是内核中RCU源码的主要实现者,他也写了很多RCU方面的文章。他把这些文章和一些关于RCU的论文的链接整理到了一起。
  在RCU的实现过程中,我们主要解决以下问题:
  1,在读取过程中,另外一个线程删除了一个节点。删除线程可以把这个节点从链表中移除,但它不能直接销毁这个节点,必须等到所有的读取线程读取完成以后,才进行销毁操作。RCU中把这个过程称为宽限期(Grace period)。
  2,在读取过程中,另外一个线程插入了一个新节点,而读线程读到了这个节点,那么需要保证读到的这个节点是完整的。这里涉及到了发布-订阅机制(Publish-Subscribe Mechanism)。
  3, 保证读取链表的完整性。新增或者删除一个节点,不至于导致遍历一个链表从中间断开。但是RCU并不保证一定能读到新增的节点或者不读到要被删除的节点。
  宽限期
  通过例子,方便理解这个内容。以下例子修改于Paul的文章。
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_read (void)
{
foo *fp = gbl_foo;
if ( fp != NULL )
dosomething(fp->a, fp->b , fp->c );
}
void foo_update( foo* new_fp )
{
spin_lock(&foo_mutex);
foo *old_fp = gbl_foo;
gbl_foo = new_fp;
spin_unlock(&foo_mutex);
kfee(old_fp);
}
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_read (void)
{
foo *fp = gbl_foo;
if ( fp != NULL )
dosomething(fp->a, fp->b , fp->c );
}
void foo_update( foo* new_fp )
{
spin_lock(&foo_mutex);
foo *old_fp = gbl_foo;
gbl_foo = new_fp;
spin_unlock(&foo_mutex);
kfee(old_fp);
}
  如上的程序,是针对于全局变量gbl_foo的操作。假设以下场景。有两个线程同时运行 foo_ read和foo_update的时候,当foo_ read执行完赋值操作后,线程发生切换;此时另一个线程开始执行foo_update并执行完成。当foo_ read运行的进程切换回来后,运行dosomething 的时候,fp已经被删除,这将对系统造成危害。为了防止此类事件的发生,RCU里增加了一个新的概念叫宽限期(Grace period)。如下图所示:

  图中每行代表一个线程,下面的一行是删除线程,当它执行完删除操作后,线程进入了宽限期。宽限期的意义是,在一个删除动作发生后,它必须等待所有在宽限期开始前已经开始的读线程结束,才可以进行销毁操作。这样做的原因是这些线程有可能读到了要删除的元素。图中的宽限期必须等待1和2结束;而读线程5在宽限期开始前已经结束,不需要考虑;而3,4,6也不需要考虑,因为在宽限期结束后开始后的线程不可能读到已删除的元素。为此RCU机制提供了相应的API来实现这个功能。
void foo_read(void)
{
rcu_read_lock();
foo *fp = gbl_foo;
if ( fp != NULL )
dosomething(fp->a,fp->b,fp->c);
rcu_read_unlock();
}
void foo_update( foo* new_fp )
{
spin_lock(&foo_mutex);
foo *old_fp = gbl_foo;
gbl_foo = new_fp;
spin_unlock(&foo_mutex);
synchronize_rcu();
kfee(old_fp);
}
void foo_read(void)
{
rcu_read_lock();
foo *fp = gbl_foo;
if ( fp != NULL )
dosomething(fp->a,fp->b,fp->c);
rcu_read_unlock();
}
void foo_update( foo* new_fp )
{
spin_lock(&foo_mutex);
foo *old_fp = gbl_foo;
gbl_foo = new_fp;
spin_unlock(&foo_mutex);
synchronize_rcu();
kfee(old_fp);
}
  其中foo_read中增加了rcu_read_lock和rcu_read_unlock,这两个函数用来标记一个RCU读过程的开始和结束。其实作用是帮助检测宽限期是否结束。foo_update增加了一个函数synchronize_rcu(),调用该函数意味着一个宽限期的开始,而直到宽限期结束,该函数才会返回。我们再对比着图看一看,线程1和2,在synchronize_rcu之前可能得到了旧的gbl_foo,也是foo_update中的old_fp,如果不等它们运行结束,调用kfee(old_fp),极有可能造成系统崩溃。而3,4,6在synchronize_rcu之后运行,此时它们已经不可能得到old_fp,此次的kfee将不对它们产生影响。
  宽限期是RCU实现中复杂的部分,原因是在提高读数据性能的同时,删除数据的性能也不能太差。
  订阅——发布机制
  当前使用的编译器大多会对代码做一定程度的优化,CPU也会对执行指令做一些优化调整,目的是提高代码的执行效率,但这样的优化,有时候会带来不期望的结果。如例:
void foo_update( foo* new_fp )
{
spin_lock(&foo_mutex);
foo *old_fp = gbl_foo;
new_fp->a = 1;
new_fp->b = ‘b’;
new_fp->c = 100;
gbl_foo = new_fp;
spin_unlock(&foo_mutex);
synchronize_rcu();
kfee(old_fp);
}
void foo_update( foo* new_fp )
{
spin_lock(&foo_mutex);
foo *old_fp = gbl_foo;
new_fp->a = 1;
new_fp->b = ‘b’;
new_fp->c = 100;
gbl_foo = new_fp;
spin_unlock(&foo_mutex);
synchronize_rcu();
kfee(old_fp);
}
  这段代码中,我们期望的是6,7,8行的代码在第10行代码之前执行。但优化后的代码并不对执行顺序做出保证。在这种情形下,一个读线程很可能读到 new_fp,但new_fp的成员赋值还没执行完成。当读线程执行dosomething(fp->a, fp->b , fp->c ) 的 时候,有不确定的参数传入到dosomething,极有可能造成不期望的结果,甚至程序崩溃。可以通过优化屏障来解决该问题,RCU机制对优化屏障做了包装,提供了专用的API来解决该问题。这时候,第十行不再是直接的指针赋值,而应该改为 :
#define rcu_assign_pointer(p, v)
__rcu_assign_pointer((p), (v), __rcu)
#define __rcu_assign_pointer(p, v, space)
do {
smp_wmb();
(p) = (typeof(*v) __force space *)(v);
} while (0)
#define rcu_assign_pointer(p, v)
__rcu_assign_pointer((p), (v), __rcu)
#define __rcu_assign_pointer(p, v, space)
do {
smp_wmb();
(p) = (typeof(*v) __force space *)(v);
} while (0)
  我们可以看到它的实现只是在赋值之前加了优化屏障 smp_wmb来确保代码的执行顺序。另外是宏中用到的__rcu,只是作为编译过程的检测条件来使用的。
  在DEC Alpha CPU机器上还有一种更强悍的优化,如下所示:
void foo_read(void)
{
rcu_read_lock();
foo *fp = gbl_foo;
if ( fp != NULL )
dosomething(fp->a, fp->b ,fp->c);
rcu_read_unlock();
}
void foo_read(void)
{
rcu_read_lock();
foo *fp = gbl_foo;
if ( fp != NULL )
dosomething(fp->a, fp->b ,fp->c);
rcu_read_unlock();
}
  第六行的 fp->a,fp->b,fp->c会在第3行还没执行的时候预先判断运行,当他和foo_update同时运行的时候,可能导致传入dosomething的一部分属于旧的gbl_foo,而另外的属于新的。这样导致运行结果的错误。为了避免该类问题,RCU还是提供了宏来解决该问题:
#define rcu_dereference(p) rcu_dereference_check(p, 0)
#define rcu_dereference_check(p, c)
__rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)
#define __rcu_dereference_check(p, c, space)
({
typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p);
rcu_lockdep_assert(c, "suspicious rcu_dereference_check()"
" usage");
rcu_dereference_sparse(p, space);
smp_read_barrier_depends();
((typeof(*p) __force __kernel *)(_________p1));
})
static inline int rcu_read_lock_held(void)
{
if (!debug_lockdep_rcu_enabled())
return 1;
if (rcu_is_cpu_idle())
return 0;
if (!rcu_lockdep_current_cpu_online())
return 0;
return lock_is_held(&rcu_lock_map);
}
#define rcu_dereference(p) rcu_dereference_check(p, 0)
#define rcu_dereference_check(p, c)
__rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)
#define __rcu_dereference_check(p, c, space)
({
typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p);
rcu_lockdep_assert(c, "suspicious rcu_dereference_check()"
" usage");
rcu_dereference_sparse(p, space);
smp_read_barrier_depends();
((typeof(*p) __force __kernel *)(_________p1));
})
static inline int rcu_read_lock_held(void)
{
if (!debug_lockdep_rcu_enabled())
return 1;
if (rcu_is_cpu_idle())
return 0;
if (!rcu_lockdep_current_cpu_online())
return 0;
return lock_is_held(&rcu_lock_map);
}