GeekNote
注册
登录

Ruby 线程基础

本文是学习笔记,学习过程中主要阅读和参考了以下资料,记录的代码片断也来自以下链接。部分代码稍作了修改。最后那个链接虽然有些标题党,但是内容很值得一看:


如果你想要靠并发提高性能,就得开启更多的进程,Ruby 社区多年以来一直是这么做的。

但是线程的内存开销比进程小得多,因为同一进程内的线程可以共享内存,而进程之间并不共享内存。而使用线程代价是:你的程序必须是线程安全的。

我们的代码始终运行在线程中。在没有主动创建线程的情况下,代码运行在主线程中。

主线程与其它线程的一个主要区别是,当主线程退出时,其它线程会立即退出,并且当前 Ruby 进程也会退出。

Thread.main 用于获取主线程。Thread.current 用于获取当前线程。

线程的执行

多线程中最重要的概念就是共享地址空间,这是多线程的优点,同时也是多线程编程困难的原因。

1.8 及之前版本的 Ruby, 一直使用的是“绿色线程”,即 Ruby 虚拟机自己管理的线程。Ruby 1.9 之后,一个 Ruby 线程直接映射到一个操作系统线程(当你在 Ruby 进程里创建线程时,可以用 top 命令看到它),由操作系统调度。

为公平起见,线程调度器会在任意时刻挂起当前线程,切换到另一个线程执行程序。之后在某个时间,调度器又会切回到之前的线程继续执行。这个动作叫做“上下文切换”(context switching)。

下面的代码演示了“上下文切换”可能引起的问题。其中的 ||= 不是原子性操作,因此不是线程安全的。所谓“原子操作”,是指在它执行完成之前不会被上下文切换中断的操作。

@results ||= Queue.new
@results << status

以上代码大约等价于:

if @results.nil?
  temp = Queue.new
  @results = temp
end
@results << status

因此 ||= 有可能造成在一个线程执行完 @results.nil? 的判断之后挂起,切换到另一个线程。另一个线程执行完整个赋值步骤并往 @results 里塞数据之后再切换回来,这样便造成给 @results 二次赋值。第一次赋值后塞进的数据被丢掉。这种情况下,我们说这里产生了“竞态条件” (race condition).

要解决 ||= 引起的线程安全问题,就应该在初始化公共资源时避免使用它,改成在构造方法里或者用其它方式初始化该变量。

线程的生命周期

Thread.new 用于创建一个线程,block 参数内的代码块即为新线程里运行的代码。

join 方法的调用会让当前线程等待被调用 join 方法的那个线程实例执行完毕,之后接着执行当前线程。

两个不同的线程当中,任意一个线程抛出异常,都会中止自身线程,但都不影响另一个线程继续执行。但是如果在一个线程里调用了另一个线程的 join 方法,则前者会受后者未处理异常的影响。以下两段代码的执行结果会有所不同:

# 代码片断 1
3.times do
  Thread.new { raise 'error' }
end
sleep 1
puts 'end'

# 代码片断 2
3.times.map do
  Thread.new { raise 'error' }
end.each(&:join)
sleep 1
puts 'end'

value 方法与 join 方法类似,但它会返回线程的执行结果。

thread = Thread.new do
  400 + 5
end
puts thread.value #=> 405

status 方法可以获取线程的状态。Ruby 为线程定义了如下几种状态:

  • run: 表示线程正在运行;
  • sleep: 表示线程正在睡眠、被 mutex 阻塞或者在等待 IO.
  • false: 表示线程执行完毕,或者被成功地杀掉;
  • nil: 表示线程内部抛出了异常并且未被捕获和处理;
  • aborting: 线程正在退出
adder = Thread.new do
  # Here this thread checks its own status.
  Thread.current.status #=> 'run'
  2 * 3
end

puts adder.status #=> 'run'
adder.join
puts adder.status #=> false

Thread.stop 方法可以让线程睡眠,之后线程调度器会去调度其它线程。wakeup 方法可以将其唤醒。注意前者是类方法。

thread = Thread.new do
  Thread.stop
  puts 'Hello there'
end

# wait for the thread to trigger its stop
begin
  puts thread.status
end until thread.status == 'sleep'

thread.wakeup
thread.join

Thread.pass 方法直接让调度器去调度其它线程,但当前线程自身并不睡眠——也因此调度器并不一定会照字面上的意图去调度其它线程。

raise, kill 方法:不要使用。

并发 != 并行

两个线程同时分别开始干两件事情,这叫做并发。但在单核 CPU 的环境下,两个并发线程的代码是交替执行的,并不算并行;而在多核心 CPU 的环境下,两个并发线程的代码有可能真正意义上的“同时”进行,这才叫并行。但这也只是有可能,具体是不是并行,还要看线程调度器如何调度。因此,并发的代码是否能够并行,这不是程序员的代码能够控制的。

GIL 和 MRI

因为 GIL 的存在,MRI 允许 Ruby 代码的执行并发,但不允许并行。JRuby 和 Rubinius 没有 GIL,所以允许代码并行执行。

GIL (Globel Interpreter Lock) 有时又被叫做 GVL (Global VM Lock), 是一个全局锁——每个 MRI Ruby 进程有且只有一个 GIL. 也就是说,进程中的所有线程共享一个 GIL. 只有正在执行的那个线程能够获得这个锁,其它线程则会等待,直到轮到它们获取 GIL. 因此,MRI Ruby 里不存在并行。

require 'digest/md5'

3.times.map do
  Thread.new do
    Digest::MD5.hexdigest(rand)
  end
end.each(&:value)

以上代码创建了 3 个线程用来生成一个随机数的 MD5 值。因为 3 个线程都想要执行,所以他们都会去尝试获取 GIL. GIL 是用互斥锁 ( mutex ) 实现的。操作系统会保证在任意时刻只有一个线程持有互斥锁,其余线程则进入睡眠状态,直到互斥锁被释放,再次变得可用。

获得 GIL 的线程(这里我们称之为线程 A)此时开始执行代码,具体执行多久由 MRI 内部实现。一段时间之后,该线程释放 GIL, 线程调度器唤醒另外两个线程,开始争夺 GIL. 系统内核会决定谁将获得 GIL. 这时,线程 A 和另一个未获取到 GIL 的线程进入睡眠状态。

特殊情况:阻塞 IO

GIL 能够阻止 Ruby 代码并行执行,但是阻塞 IO 执行的不是 Ruby 代码:

require 'open-uri'

3.times.map do
  Thread.new do
    open('http://zombo.com')
  end
end.each(&:value)

这段代码中创建了 3 个线程发出 HTTP 请求,MRI 在遇到阻塞 IO 时不会让线程继续占用 GIL, 这样其它线程就可以获得 GIL 以继续执行 Ruby 代码,于是就有第二个、第三个线程跟着执行到此处,同时等待着 HTTP 的响应。

一些误解

GIL 的存在并不能保证线程安全。比如有下面一段代码:

@counter = 0

5.times.map do
  Thread.new do
    temp = @counter
    temp = temp + 1

    @counter = temp
  end
end.each(&:join)

puts @counter

在 MRI 环境中,几乎不会出现什么错误,但是如果在线程里插入一句 puts, 情况就不一样了。如上面所说,阻塞 IO 会让线程释放 GIL, 这样就可能出问题:

@counter = 0

5.times.map do
  Thread.new do
    temp = @counter
    temp = temp + 1
    puts 'message'
    @counter = temp
  end
end.each(&:join)

puts @counter

多少线程才算太多?

1. 操作系统的限制

1.upto(10_000) do |i|
  Thread.new { sleep }
  puts i
end

上面这段代码,在 OS X 系统当中,只能输出到 2000 左右,随后便会抛出 ThreadError 异常中断执行。因为 OS X 对单个进程的最大线程数作出了限制。但在 Linux 当中,这段代码可以完整地执行完。

2. IO 密集型的代码

网络请求、日志输出等任务是属于 IO 密集型的任务,在处理这类任务时,提升网络速度、磁盘读写速度可以很大地提高整体性能。如果你的代码是 IO 密集型的,线程越多运行得可能越快,而且往往线程数多于 CPU 核心数是有作用的。但也不是越多越好。比如下面这段代码:

require 'benchmark'
require 'net/http'

URL = URI('http://www.baidu.com/')
ITERATIONS = 30

def fetch_url(thread_count)
  threads = []

  thread_count.times do
    threads << Thread.new do
      fetches_per_thread = ITERATIONS / thread_count

      fetches_per_thread.times do
        Net::HTTP.get(URL)
      rescue => e
        retry
      end
    end
  end

  threads.each(&:join)
end

Benchmark.bm(20) do |bm|
  [1, 2, 3, 5, 6, 10, 15, 30].each do |thread_count|
    bm.report("with #{thread_count} threads") do
      fetch_url(thread_count)
    end
  end
end

在我的电脑上运行结果如下:

                           user     system      total        real
with 1 threads         6.610000   0.160000   6.770000 (  4.259840)
with 2 threads         3.630000   0.060000   3.690000 (  2.390968)
with 3 threads         1.000000   0.040000   1.040000 (  1.494768)
with 5 threads         1.190000   0.060000   1.250000 (  1.392179)
with 6 threads         0.790000   0.030000   0.820000 (  0.989476)
with 10 threads        0.830000   0.030000   0.860000 (  0.673836)
with 15 threads        0.940000   0.020000   0.960000 (  0.654832)
with 30 threads        0.780000   0.090000   0.870000 (  0.653245)

基本上 10 个线程就已经达到最佳性能。再往上加线程数就不划算了,因为线程的创建也有开销——虽然比进程开销小。因此,到底多少线程合适,还得视具体情况而定。如果网络延迟较高,可能会需要更多的线程达到最佳性能。比如我把上面代码中的网址换成一个俄罗斯的网址,执行结果就变成:

                           user     system      total        real
with 1 threads        17.030000   0.340000  17.370000 ( 22.337289)
with 2 threads         5.110000   0.140000   5.250000 ( 10.433548)
with 3 threads         2.380000   0.070000   2.450000 (  6.352316)
with 5 threads         1.860000   0.090000   1.950000 (  4.780961)
with 6 threads         2.030000   0.070000   2.100000 (  3.578058)
with 10 threads        1.890000   0.030000   1.920000 (  2.331823)
with 15 threads        1.530000   0.060000   1.590000 (  1.875414)
with 30 threads        1.470000   0.050000   1.520000 (  1.350604)

另外,这段代码无论是用 MRI Ruby 还是 JRuby, 运行结果都差不多,也说明了阻塞 IO 会让线程释放 GIL.

3. CPU 密集型的代码

压缩/解压、加密、计算散列值以及一些非常复杂的数学计算之类的任务属于 CPU 密集型任务。在处理这类任务时,CPU 频率的提升能够提高整体性能。线程数不超过核心数时,线程越多运行越快。线程数超过核心数之后,性能不再提升,反而可能因为线程切换的开销有所下降。

require 'benchmark'

DIGITS = 30
ITERATIONS = 24

def fibonacci(n)
  return  n  if (0..1).include? n
  (fibonacci(n - 1) + fibonacci(n - 2))
end

def calculate_fib(thread_count)
  threads = []

  thread_count.times do
    threads << Thread.new do
      iterations_per_thread = ITERATIONS / thread_count

      iterations_per_thread.times do
        fibonacci(DIGITS)
      end
    end
  end

  threads.each(&:join)
end

Benchmark.bm(20) do |bm|
  [1, 2, 3, 4, 6, 8, 12, 24].each do |thread_count|
    bm.report("with #{thread_count} threads") do
      calculate_fib(thread_count)
    end
  end
end

这段代码在我的四核 CPU 上执行的结果如下:

                           user     system      total        real
with 1 threads         5.920000   0.050000   5.970000 (  5.102896)
with 2 threads         4.980000   0.010000   4.990000 (  2.444580)
with 3 threads         4.880000   0.000000   4.880000 (  1.626186)
with 4 threads         5.380000   0.000000   5.380000 (  1.355286)
with 6 threads         7.070000   0.000000   7.070000 (  1.336971)
with 8 threads        10.130000   0.030000  10.160000 (  1.317468)
with 12 threads        9.990000   0.010000  10.000000 (  1.319313)
with 24 threads       10.400000   0.020000  10.420000 (  1.373466)

看起来线程数与核心数相同时就接近最佳性能了。

在真实系统当中,很少有纯粹的 IO 密集或者 CPU 密集型程序。拿常见的 web 服务来举例,当它在接收客户端请求、返回响应内容、读写数据库时,做的就是 IO 密集型的事情;当它在解析数据、进行数学计算、渲染 HTML, JSON 的时候,做的就是 CPU 密集型的工作。所以,到底多少线程是合适的,还是需要具体地测试才能得出结论。

用 Mutex 保护数据

Ruby 标准库当中的 Array 不是线程安全的:

shared_array = Array.new
mutex = Mutex.new

10.times.map do
  Thread.new do
    1000.times do
      shared_array << nil
    end
  end
end.each(&:join)
puts shared_array.size

以上代码将会产生类似以下输出:

$ ruby code/snippets/concurrent_array_pushing.rb
10000
$ jruby code/snippets/concurrent_array_pushing.rb
7521
$ rbx code/snippets/concurrent_array_pushing.rb
8541
$ _

这里可以使用 Mutex (互斥锁)来保证正确的结果:

shared_array = Array.new
mutex = Mutex.new

10.times.map do
  Thread.new do
    1000.times do
      mutex.lock
      shared_array << nil
      mutex.unlock
    end
  end
end.each(&:join)

puts shared_array.size

Mutex 在这里的使用,告诉了线程调度器在当前代码块执行完之前别切换线程。其中 lock ... unlock 代码块也可以改成这样:

mutex.synchronize do
  shared_array << nil
end

Mutex 和内存可见性

线程 A 在持有 mutex 并对变量进行了修改,线程 B 直接读取该变量的话,有可能读到的是修改之前的值。

我们知道在计算机硬件结构当中,早期的 CPU 是直接读写内存的。后来为了提升性能,CPU 加上了高速缓存(一级缓存、二级缓存等),直接在高速缓存里读写数据,运算完成后再把数据同步到内存。在单核时代这并不会有什么问题,但在多核 CPU 中会有内存一致性的问题,因为不同 CPU 核心使用的是独立的一、二级缓存,并不共享。

因此,即使用了 mutex, 线程 B 也有可能在该变量未同步到内存时读取了它的值,造成我们不想要的结果。为了避免这个问题,需要用到一个叫作“内存屏障”的东西,它可以对内存操作进行排序约束,让变量的写操作执行完毕并同步到内存之后再开始读操作,从而防止读操作读到旧数据。

Ruby 的 Mutex 在充当原子性锁的同时,也是一个内存屏障。只要修改一下变量读取的方式,你就可以利用好这个内存屏障。下面是一个例子:

# visibility.rb
mutex = Mutex.new
flags = [false, false, false, false, false, false, false, false, false, false]

threads = 50.times.map do
  Thread.new do
    10000.times do
      puts flags.to_s
      mutex.synchronize do
        flags.map! { |f| !f }
      end
    end
  end
end
threads.each(&:join)

执行以上代码,你会得到类似下面的结果:

$ ruby visibility.rb > visibility.log
$ grep -Hnri 'true, false' visibility.log | wc -l
3151
$ _

这表示有线程读取到了其它线程未完全同步到内存的数据。想要避免这个问题,只需要把读取 flags 的代码也放在 mutex.synchronize 的代码块内即可:

...
mutex.synchronize do
  puts flags.to_s
end
mutex.synchronize do
  flags.map! { |f| !f }
end
...

Mutex 的性能

Mutex 在关键的地方保护着数据在同一时刻只能被一个线程修改,其实 GIL 就是一个 mutex.

但像上面这样一来,并行变成串行,执行速度就明显更慢了。因此,持有 mutex 的代码块应该尽可能的小。因为只需要在取值的时候持有 mutex, 可以把 puts 这样可以并行执行的阻塞 IO 操作移出来,上面的代码可以优化如下:

...
flags_string = nil
mutex.synchronize do
  flags_string flags.to_s
end
puts flags_string
mutex.synchronize do
  flags.map! { |f| !f }
end
...

在我的 x220 笔记本上,这段代码的执行时间由 20 秒减少到了 10 秒,直接少了一半。要是这里用的不是 puts 而是发起一个网络请求,那么这个优化的收益会更可观——或者换句话说,未优化的代码的执行速度会极度地慢,下面就是一个例子:

require 'thread'
require 'net/http'

mutex = Mutex.new
@results = []

10.times.map do
  Thread.new do
    mutex.synchronize do
      response = Net::HTTP.get_response('dynamic.xkcd.com', '/random/comic/')
      random_comic_url = response['Location']

      @results << random_comic_url
    end
  end
end.each(&:join)

puts @results

如果把 HTTP 请求移出 synchronize 块,就能极大地提升性能。另外如果这里的 @results 用的是 Queue 而不是 Array, 就没有 mutex 什么事了。因为 Queue 本身是线程安全的。

死锁

线程 A 在执行代码时持有了 mutex_a, 线程 B 在执行代码时持有了 mutex_b. 这时,线程 A 需要访问线程 B 正在访问的资源,尝试去获取 mutex_b, 而线程 B 也需要访问线程 A 正在访问的资源,尝试去获取 mutex_a. 此时因为线程 A 没有执行完毕,mutex_a 就没有释放;同样因为线程 B 也没有执行完毕,mutex_b 也就没有释放。线程 A 和线程 B 都在等待对方的 mutex 释放,这种状态就被称为死锁。

死锁

由此可以知道,死锁通常发生在线程需要获取多个 mutex 的时候。

Ruby 的 Mutex 提供了一个 try_lock 的方法,该方法的用法跟 lock 一样,区别在于 try_lock 在 mutex 不可用时不会一直等待,而是返回 false. 如果获取到 mutex, 则返回 true.

但如果只是在 try_lock 返回 false 后不停重试,并不解决问题,程序只是由死锁状态变成了死循环。正确的做法是在 try_lock 失败之后,线程各自释放掉自己的 mutex 重新从头开始执行,期待通过多线程本身的不确定性,使之在重新开始后能够执行成功。

即便如此,过程中也可能有不确定次数的重试,而且似乎很多时候,代码是不可以重复执行的,否则会产生预期以外的数据(这么说这个 try_lock 方案根本不解决问题)。

一个更好的方案是定义一个 mutex 层级,或者换句话说:当两个线程都需要获取多个 mutex 的时候,确保它们获取这些 mutex 的顺序是一致的。这样便可以在任何时候避免死锁。

通过条件变量给线程发信号

条件变量 (Condition Variable) 可以用于线程间发送事件通知。ConditionVariablewait(mutex) 方法会让当前线程释放 mutex 并进入睡眠状态,signal() 方法可以唤醒在 ConditionVariable 实例上等待的一个线程:

require 'thread'
require 'net/http'

mutex = Mutex.new
condvar = ConditionVariable.new
results = Array.new

Thread.new do
  10.times do
    uri = URI('https://dynamic.xkcd.com/random/comic/')
    response = Net::HTTP.get_response(uri)
    random_comic_url = response['Location']

    mutex.synchronize do
      if random_comic_url && !random_comic_url.empty?
        results << random_comic_url
        condvar.signal # Signal the ConditionVariable
      end
    end
  end
end

10.times.map do
  Thread.new do
    mutex.synchronize do
      while results.empty?
        condvar.wait(mutex)
      end

      url = results.shift
      puts "You should check out #{url}"
    end
  end
end.each(&:join)

这段代码中比较让人疑惑的地方在于 while results.empty?, 似乎这里使用 if results.empty? 就可以了。但实际上 ConditionVariable#signal 只是发送事件通知,告诉等待着的线程“有另一个线程往 results 里塞了一条数据”,并不保证其它事情——比如说“这条数据会一直等着你去取”,很可能该数据会被其它线程取走。

ConditionVariable 还有一个 broadcast() 方法,用于唤醒在当前 ConditionVariable 实例上等待的所有线程:

mutex.synchronize do
  if random_comic_url && !random_comic_url.empty?
    results << random_comic_url
    condvar.broadcast if results.size == 10
  end
end

下面是结合使用 MutexConditionVariable 实现的一个线程安全的 BlockingQueue:

require 'thread'

class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
    @condvar = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @storage.push(item)
      @condvar.signal
    end
  end

  def pop
    @mutex.synchronize do
      while @storage.empty?
        @condvar.wait(@mutex)
      end

      @storage.shift
    end
  end
end

前面已经提到过,实际上 Ruby 自带了一个 Queue 类型,这是 Ruby 中唯一一个线程安全的数据结构类型。 require 'thread' 之后便可以使用。其它数据结构包括 Array, Hash 等,都不是线程安全的,即使在 JRuby 的实现当中也一样。因为线程安全相关的代码会降低其在单线程程序中的性能。如果非要用线程安全的数据结构不可,可以考虑使用 thread_safe 这个 Gem.

2
自己是一切问题的答案
登录后评论
评论 2

Ruby 3.0 的 Ractor 带来了并行。

PS:长文可以分开多篇然后用合集功能整理哦。https://geeknote.net/GeekNote/posts/251

社区准则 博客 联系 反馈 状态
主题