class Ractor
Ruby 2.0+ backport of `Ractor` class Extra private methods and instance variables all start with `ractor_`
Constants
- RACTOR_STATE
- RactorThreadGroups
Attributes
@api private
@api private
@api private
Public Class Methods
# File lib/backports/ractor/ractor.rb, line 222 def count ObjectSpace.each_object(Ractor).count(&:ractor_live?) end
# File lib/backports/ractor/ractor.rb, line 217 def current Thread.current.thread_variable_get(:backports_ractor) || Thread.current.thread_variable_set(:backports_ractor, ractor_find_current) end
Implementation notes
Uses one `Thread` for each `Ractor`, as well as queues for communication
The incoming queue is strict: contrary to standard queue, you can't pop from an empty closed queue. Since standard queues return `nil` is those conditions, we wrap/unwrap `nil` values and consider all `nil` values to be results of closed queues. `ClosedQueueError` are re-raised as `Ractor::ClosedError`
The outgoing queue is strict and blocking. Same wrapping / raising as incoming, with an extra queue to acknowledge when a value has been read (or if the port is closed while waiting).
The last result is a bit tricky as it needs to be pushed on the outgoing queue but can not be blocking. For this, we “soft close” the outgoing port.
# File lib/backports/ractor/ractor.rb, line 27 def initialize(*args, &block) @ractor_incoming_queue = IncomingQueue.new @ractor_outgoing_queue = OutgoingQueue.new raise ArgumentError, 'must be called with a block' unless block kw = args.last if kw.is_a?(Hash) && kw.size == 1 && kw.key?(:name) args.pop name = kw[:name] end @ractor_name = name && Backports.coerce_to_str(name) if Ractor.main == nil # then initializing main Ractor @ractor_thread = ::Thread.current @ractor_origin = nil @ractor_thread.thread_variable_set(:backports_ractor, self) else @ractor_origin = caller(1, 1).first.split(':in `').first args.map! { |a| Ractor.ractor_isolate(a, false) } ractor_thread_start(args, block) end end
@api private
# File lib/backports/ractor/sharing.rb, line 4 def ractor_isolate(val, move = false) return val if move Cloner.deep_clone(val) end
@api private
# File lib/backports/ractor/ractor.rb, line 227 def ractor_reset ObjectSpace.each_object(Ractor).each do |r| next if r == Ractor.current next unless (th = r.ractor_thread) th.kill th.join end Ractor.current.ractor_incoming_queue.clear end
# File lib/backports/ractor/ractor.rb, line 167 def receive current.__send__(:receive) end
# File lib/backports/ractor/ractor.rb, line 172 def receive_if(&block) current.__send__(:receive_if, &block) end
# File lib/backports/ractor/ractor.rb, line 176 def select(*ractors, yield_value: not_given = true, move: false) cur = Ractor.current queues = ractors.map do |r| r == cur ? r.ractor_incoming_queue : r.ractor_outgoing_queue end if !not_given out = current.ractor_outgoing_queue yield_value = ractor_isolate(yield_value, move) elsif ractors.empty? raise ArgumentError, 'specify at least one ractor or `yield_value`' end while true # rubocop:disable Style/InfiniteLoop # Don't `loop`, in case of `ClosedError` (not that there should be any) queues.each_with_index do |q, i| q.pop_non_blocking do |val| r = ractors[i] return [r == cur ? :receive : r, val] end end if out && out.num_waiting > 0 # Not quite atomic... out.push(yield_value, ack: true) return [:yield, nil] end sleep(0.001) end end
# File lib/backports/ractor/ractor.rb, line 160 def yield(value, move: false) value = ractor_isolate(value, move) current.ractor_outgoing_queue.push(value, ack: true) rescue ClosedQueueError raise ClosedError, 'The outgoing-port is already closed' end
Private Class Methods
# File lib/backports/ractor/ractor.rb, line 246 def ractor_find_current RactorThreadGroups[Thread.current.group] end
# File lib/backports/ractor/ractor.rb, line 240 def ractor_init @ractor_shareable = ::ObjectSpace::WeakMap.new @main = Ractor.new { nil } RactorThreadGroups[::ThreadGroup::Default] = @main end
# File lib/backports/ractor/sharing.rb, line 52 def ractor_traverse(obj, &block) case obj when ::Hash Hash obj.default yield obj.default_proc obj.each do |key, value| yield key yield value end when ::Range yield obj.begin yield obj.end when ::Array, ::Struct obj.each(&block) when ::Complex yield obj.real yield obj.imaginary when ::Rational yield obj.numerator yield obj.denominator end obj.instance_variables.each do |var| yield obj.instance_variable_get(var) end end
Public Instance Methods
# File lib/backports/ractor/ractor.rb, line 146 def [](key) Ractor.current.ractor_locals[key] end
# File lib/backports/ractor/ractor.rb, line 150 def []=(key, value) Ractor.current.ractor_locals[key] = value end
# File lib/backports/ractor/ractor.rb, line 125 def close_incoming r = ractor_incoming_queue.closed? ractor_incoming_queue.close r end
# File lib/backports/ractor/ractor.rb, line 131 def close_outgoing r = ractor_outgoing_queue.closed? ractor_outgoing_queue.close r end
# File lib/backports/ractor/ractor.rb, line 113 def inspect state = RACTOR_STATE[@ractor_thread ? @ractor_thread.status : 'run'] info = [ 'Ractor:#1', name, @ractor_origin, state ].compact.join(' ') "#<#{info}>" end
# File lib/backports/ractor/ractor.rb, line 100 def name @ractor_name end
@api private
# File lib/backports/ractor/ractor.rb, line 252 def ractor_live? !defined?(@ractor_thread) || # May happen if `count` is called from another thread before `initialize` has completed @ractor_thread.status end
@api private
# File lib/backports/ractor/ractor.rb, line 155 def ractor_locals @ractor_locals ||= {}.compare_by_identity end
# File lib/backports/ractor/ractor.rb, line 88 def send(obj, move: false) ractor_incoming_queue << Ractor.ractor_isolate(obj, move) self rescue ::ClosedQueueError raise ClosedError, 'The incoming-port is already closed' end
# File lib/backports/ractor/ractor.rb, line 96 def take ractor_outgoing_queue.pop(ack: true) end
Private Instance Methods
# File lib/backports/ractor/ractor.rb, line 51 def ractor_thread_start(args, block) ::Thread.new do @ractor_thread = Thread.current @ractor_thread_group = ThreadGroup.new RactorThreadGroups[@ractor_thread_group] = self @ractor_thread_group.add(@ractor_thread) ::Thread.current.thread_variable_set(:backports_ractor, self) result = nil begin result = instance_exec(*args, &block) rescue ::Exception => err # rubocop:disable Lint/RescueException begin raise RemoteError, "thrown by remote Ractor: #{err.message}" rescue RemoteError => e # Hack to create exception with `cause` result = OutgoingQueue::WrappedException.new(e) end ensure ractor_thread_terminate(result) end end end
# File lib/backports/ractor/ractor.rb, line 73 def ractor_thread_terminate(result) begin ractor_outgoing_queue.push(result, ack: false) unless ractor_outgoing_queue.closed? rescue ClosedQueueError return # ignore end ractor_incoming_queue.close ractor_outgoing_queue.close(:soft) ensure # TODO: synchronize? @ractor_thread_group.list.each do |thread| thread.kill unless thread == Thread.current end end
# File lib/backports/ractor/ractor.rb, line 137 def receive ractor_incoming_queue.pop end
# File lib/backports/ractor/ractor.rb, line 141 def receive_if(&block) raise ArgumentError, 'no block given' unless block ractor_incoming_queue.pop(&block) end