English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Ruby базовый курс

Ruby продвинутый курс

Многоядерные потоки Ruby

Каждая программа, выполняемая на системе, является процессом. Каждый процесс содержит один или несколько потоков.

Токен — это единственная последовательная поток управления в программе, и в одной программе можно одновременно запускать несколько потоков для выполнения различных задач, что называется многопоточностью.

В Ruby мы можем создавать многопоточность через класс Thread. Потоки в Ruby являются легковесными и могут эффективно обеспечивать параллельное выполнение кода.

Создание потока в Ruby

Чтобы запустить новый поток, достаточно вызвать Thread.new:

# Поток #1 часть кода
Thread.new {
  # Поток #2 выполняет код
}
# Поток #1 выполняет код

Онлайн пример

Ниже приведен пример того, как можно использовать многопоточность в программах на Ruby:

Онлайн пример

#!/usr/bin/ruby
 
def func1
   i=0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i=i+1
   end
end
 
def func2
   j=0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j=j+1
   end
end
 
puts "Started At #{Time.now}"
t1=Thread.new{func1()}
t2=Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

Результат выполнения кода:

Started At Wed May 14 08:21:54 -0700 2014
func1 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:55 -0700 2014
func1 at: Wed May 14 08:21:56 -0700 2014
func2 at: Wed May 14 08:21:56 -0700 2014
func1 at: Wed May 14 08:21:58 -0700 2014
End at Wed May 14 08:22:00 -0700 2014

жизненный цикл потока

1. Создание потока можно выполнить с помощью Thread.new, а также можно использовать такие же синтаксисы для Thread.start или Thread.fork для создания потока.

2. После создания потока не нужно запускать его, поток будет автоматически выполняться.

3. Класс Thread определяет некоторые методы для управления потоками. Поток выполняет код блока Thread.new.

4. Последний语句 в блоке кода потока является значением потока, которое можно вызвать через методы потока. Если поток завершен, возвращается значение потока, в противном случае значение не возвращается до завершения потока.

5. Метод Thread.current возвращает объект, представляющий текущий поток. Метод Thread.main возвращает главный поток.

6. Выполнение потока через метод Thread.Join, который挂起 главный поток, до завершения текущего потока.

состояние потока

потоки имеют 5 состояний:

состояние потокавозвратное значение
исполняемыйrun
сонSleeping
выходaborting
нормальное завершениеfalse
прерывание из-за исключенияnil

потоки и исключения

当某线程发生异常,且没有被rescue捕捉到时,该线程通常会被无警告地终止。但是,若有其他线程因为Thread#join的关系一直等待该线程的话,则等待的线程同样会被引发相同的异常。

begin
  t = Thread.new do
    Thread.pass    # 主线程确实在等join
    raise "unhandled exception"
  end
  t.join
rescue
  p $!  # => "unhandled exception"
end

使用下列3个方法,就可以让解释器在某个线程因异常而终止时中断运行。

  • 启动脚本时指定-d选项,并以调试模式运行。

  • 用Thread.abort_on_exception设置标志。

  • 使用Thread#abort_on_exception对指定的线程设定标志。

当使用上述3种方法之一后,整个解释器就会被中断。

t = Thread.new { ... }
t.abort_on_exception = true

线程同步控制

在Ruby中,提供三种实现同步的方式,分别是:

1. 通过Mutex类实现线程同步

2. 监管数据交接的Queue类实现线程同步

3. 使用ConditionVariable实现同步控制

通过Mutex类实现线程同步

通过Mutex类实现线程同步控制,如果在多个线程中同时需要一个程序变量,可以将这个变量部分使用lock锁定。 代码如下:

Онлайн пример

#!/usr/bin/ruby
 
require "thread"
puts "Synchronize Thread"
 
@num=200
@mutex=Mutex.new
 
def buyTicket(num)
     @mutex.lock
          if @num>=num
               @num=@num-num
               puts "you have successfully bought #{num} tickets"
          else
               puts "sorry, no enough tickets"
          end
     @mutex.unlock
end
 
ticket1=Thread.new 10 do
     10.times do |value|
     ticketNum=15
     buyTicket(ticketNum)
     sleep 0.01
     end
end
 
ticket2=Thread.new 10 do
     10.times do |value|
     ticketNum=20
     buyTicket(ticketNum)
     sleep 0.01
     end
end
 
sleep 1
ticket1.join
ticket2.join

Результат выполнения кода:

Синхронизация потока
Вы успешно купили 15 билетов
Вы успешно купили 20 билетов
Вы успешно купили 15 билетов
Вы успешно купили 20 билетов
Вы успешно купили 15 билетов
Вы успешно купили 20 билетов
Вы успешно купили 15 билетов
Вы успешно купили 20 билетов
Вы успешно купили 15 билетов
Вы успешно купили 20 билетов
Вы успешно купили 15 билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов
Извините, не хватает билетов

Кроме использования lock для блокировки переменной, можно использовать try_lock для блокировки переменной, а также использовать Mutex.synchronize для синхронизации доступа к переменной.

Класс Queue для управления передачей данных и обеспечения синхронизации потоков

Класс Queue представляет собой список, поддерживающий потоки, который позволяет синхронизировать доступ к концу списка. Разные потоки могут использовать один и тот же список, но не нужно беспокоиться о том, что данные в списке синхронизированы, а также использование класса SizedQueue позволяет ограничить длину списка

Класс SizedQueue может очень легко помочь нам разрабатывать приложения с синхронизацией потоков, так как для добавления в этот список не нужно заботиться о проблемах синхронизации потоков.

Классическая проблема производителя-потребителя:

Онлайн пример

#!/usr/bin/ruby
 
require "thread"
puts "SizedQuee Test"
 
queue = Queue.new
 
producer = Thread.new do
     10.times do |i|
          sleep rand(i) # Дайте потоку немного поспать
          queue << i
          puts "#{i} produced"
     end
end
 
consumer = Thread.new do
     10.times do |i|
          value = queue.pop
          sleep rand(i/2)
          puts "Consumed #{value}"
     end
end
 
consumer.join

Вывод программы:

SizedQuee Test
Produced 0
Produced 1
Consumed 0
Produced 2
Consumed 1
Consumed 2
Produced 3
Consumed 34, produced
Consumed 4
Produced 5
Consumed 5
Produced 6
Consumed 6
Produced 7
Consumed 7
Produced 8
Produced 9
Consumed 8
Consumed 9

Переменные потока

Thread может иметь свои собственные переменные, свои собственные переменные вносят в поток при его создании. Они могут использоваться в пределах потока, но не могут быть общими для потоков внешне.

但是有时候,线程的局部变量需要被其他线程或主线程访问怎么办?Ruby 中提供了允许通过名字来创建线程变量的功能,类似于将线程视为哈希表式的散列表。通过 []= 写入并通过 [] 读出数据。让我们看一下下面的代码:

Онлайн пример

#!/usr/bin/ruby
 
count = 0
arr = []
 
10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end
 
arr.each {|t| t.join; print t["mycount"], ""}
puts "count = #{count}"

以上代码运行输出结果为:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主线程等待子线程执行完成,然后分别输出每个值。 .

线程优先级

线程的优先级是影响线程调度的主要因素。其他因素包括占用CPU的执行时间长短,线程分组调度等等。

可以使用 Thread.priority 方法得到线程的 приоритет线程的优先级,并使用 Thread.priority= 方法来调整线程的优先级。

线程的优先级默认为 0。优先级较高的执行的要快。

一个 Thread 可以访问自己作用域内的所有数据,但如果有需要在某个线程内访问其他线程的数据应该怎么做呢?Thread 类提供了线程数据互相访问的方法,你可以简单地把一个线程作为一个哈希表,可以在任何线程内使用 []= 写入数据,使用 [] 读出数据。

athr = Thread.new { Thread.current["name"] = "Thread A"; Thread.stop }
bthr = Thread.new { Thread.current["name"] = "Thread B"; Thread.stop }
cthr = Thread.new { Thread.current["name"] = "Thread C"; Thread.stop }
Thread.list.each {|x| puts "#{x.inspect}: #{x["name"]}"}

可以看到,将线程作为哈希表使用 [] 和 []= 方法,我们实现了线程之间的数据共享。

Межпоточный доступ

Mutex (Мутальный исключение = блокировка) - это механизм, используемый в многоядерном программировании для предотвращения одновременного доступа двух потоков к одному общему ресурсу (например, глобальным переменным).

Пример без использования Mutex

Онлайн пример

#!/usr/bin/ruby
require 'thread'
 
count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 : #{count1}"
puts "count2 : #{count2}"
puts "difference : #{difference}"

Результат выполнения примера выше:

count1 : 9712487
count2 : 12501239
difference : 0

Пример использования mutex

Онлайн пример

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
 
count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
    end
end
spy = Thread.new do
   loop do
       mutex.synchronize do
          difference += (count1 - count2).abs
       end
   end
end
sleep 1
mutex.lock
puts "count1 : #{count1}"
puts "count2 : #{count2}"
puts "difference : #{difference}"

Результат выполнения примера выше:

count1 : 1336406
count2 : 1336406
difference : 0

Блокировка

Когда более чем два операционных элемента ждут, чтобы другие停止 работу, чтобы получить системные ресурсы, и ни один из них не выходит из ожидания первым, это состояние называется блокировкой.

Например, процесс p1 занял монитор, но также должен использовать принтер, который занят процессом p2, а p2 также должен использовать монитор, что создает блокировку.

При использовании объекта Mutex необходимо учитывать возможную блокировку线程.

Онлайн пример

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
 
cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: У меня есть критическая секция, но я буду ждать cv"
      cv.wait(mutex)
      puts "A: У меня снова есть критическая секция! Я правлю!"
   }
}
 
puts "(Позже, вернусь на ранч...)"
 
b = Thread.new {
   mutex.synchronize {
      puts "B: Теперь я критичен, но закончил с cv"
      cv.signal
      puts "B: Я все еще критичен, заканчиваю"
   }
}
a.join
b.join

Результат выполнения примера выше таков:

A: У меня критическая секция, но я буду ждать cv
(Позже, вернувшись на ранчо...)
B: Теперь я критичен, но я закончил с cv
B: Я все еще критичен, заканчиваю
A: У меня снова критическая секция! Я правлю!

Методы класса поток

Полный список методов класса Thread (поток) таков:

НомерОписание метода
1Thread.abort_on_exception
Если значение true, весь интерпретатор будет прерван, если поток завершается из-за исключения. Его значение по умолчанию false, то есть в обычных условиях, если поток завершается из-за исключения и это исключение не было обнаружено Thread#join и т.д., поток будет завершен без предупреждения.
2Thread.abort_on_exception=
Если установлено: trueЕсли поток завершается из-за исключения, весь интерпретатор будет прерван. Возврат нового состояния
3Thread.critical
Вернуть булево значение.
4Thread.critical=
Когда значение true, не будет производиться переключение потоков. Если текущий поток приостановлен (stop) или был получен сигнал (signal), его значение автоматически изменяется на false.
5Thread.current
Возврат текущего выполняющегося потока (текущий поток).
6Thread.exit
Прекращение выполнения текущего потока. Возврат текущего потока. Если текущий поток является единственным потоком, то используется exit(0) для его прекращения.
7Thread.fork { block }
Генерирует поток так же, как и Thread.new.
8Thread.kill( aThread )
Прекращение выполнения потока.
9Thread.list
Возврат массива живых потоков, находящихся в состоянии выполнения или приостановки.
10Thread.main
Возврат к основному потоку.
11Thread.new( [ arg ]* ) {| args | block }
Создать thread и начать его выполнение. Значение будет передано в блок без изменений. Это позволяет передавать значения в локальные переменные thread при запуске thread.
12Thread.pass
Принимает управление другими потоками. Он не изменяет состояние выполняющегося потока, а передает управление другим выполняемым потокам (явное планирование потоков).
13Thread.start( [ args ]* ) {| args | block }
Создать thread и начать его выполнение. Значение будет передано в блок без изменений. Это позволяет передавать значения в локальные переменные thread при запуске thread.
14Thread.stop
Приостановить текущий thread до тех пор, пока another thread не использует метод run для повторного пробуждения этой thread.

Методы thread

Ниже приведен пример использования метода join thread:

Онлайн пример

#!/usr/bin/ruby
 
thr = Thread.new do   # Пример
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Пример использования метода example join

Ниже приведен полный список методов примеров:

НомерОписание метода
1thr[ name ]
Получить固有ные данные thread, соответствующие name. name может быть строкой или символом. Если нет данных, соответствующих name, вернет nil.
2thr[ name ]=
Установить значение固有ных данных thread, соответствующих name, name может быть строкой или символом. Если установить nil, будет удалено соответствующее данные thread.
3thr.abort_on_exception
Вернуть булево значение.
4thr.abort_on_exception=
Если значение true, интерпретатор будет прерван, если alguna thread завершится из-за исключения.
5thr.alive?
Если thread "живой", вернет true.
6thr.exit
Прекратить выполнение thread. Возвращает self.
7thr.join
Приостановить текущий thread до тех пор, пока thread self не завершит свою работу. Если self завершится из-за исключения, текущий thread также вызовет то же исключение.
8thr.key?
Если固有ные данные thread, соответствующие name, уже определены, вернет true
9thr.kill
Похоже на Thread.exit .
10thr.priority
Вернуть приоритет thread. Значение по умолчанию для приоритета составляет 0. Чем больше значение, тем выше приоритет.
11thr.priority=
Установить приоритет thread. Его также можно установить отрицательным числом.
12thr.raise(анException)
Принудительно вызывать исключение в этой thread.
13thr.run
Перезапуск-thread, который был остановлен(stop). В отличие от wakeup, он немедленно производит переключение threads. Если использовать этот метод для процесса, который уже мертв, будет вызван ThreadError.
14thr.safe_level
Возвращает уровень безопасности self. Уровень безопасности текущего потока safe_level равен $SAFE.
15thr.status
Используйте строки "run", "sleep" или "aborting", чтобы выразить состояние живого потока. Если поток завершается normalmente, возвращает false. Если поток завершается из-за исключения, возвращает nil.
16thr.stop?
Возвращает true, если поток находится в состоянии завершения (dead) или остановлен (stop).
17thr.value
Ожидать, пока поток self не завершит работу (равносильно join), и вернуться значению блока потока. Если в процессе выполнения потока возникнет исключение, оно будет повторно вызванно.
18thr.wakeup
Изменить состояние остановленного (stop) потока на выполняемый (run). Если при вызове этого метода на死的 потоке (thread) будет вызван ThreadError.