# File lib/fluent/output.rb, line 107
# Prepares the flush-thread state without starting any thread.
#
# output:: the object this helper drives; the worker loop (#run) calls
#          +try_flush+ and +before_shutdown+ on it.
#
# The first flush is scheduled one second after construction, and the
# stop flag starts out cleared.
def initialize(output)
  @next_time = Time.now.to_f + 1.0 # epoch seconds of the first due flush
  @finish = false                  # becomes true once shutdown is requested
  @output = output
end
# File lib/fluent/output.rb, line 113
# Configuration hook. This implementation ignores +conf+ and does nothing;
# it returns nil.
def configure(conf)
  # Intentionally empty: there is nothing to configure here.
end
# File lib/fluent/output.rb, line 122
# Requests the worker loop to stop and waits for its thread to exit.
#
# The stop flag is always recorded. If #start has not been called yet there
# is no mutex, condition variable or thread to act on, so the method returns
# right after setting the flag instead of failing on a nil @mutex.
def shutdown
  @finish = true
  return unless @thread

  # Wake the worker in case it is sleeping on the condition variable.
  @mutex.synchronize { @cond.signal }
  # Give other threads a chance to run before blocking in join.
  Thread.pass
  @thread.join
end
# File lib/fluent/output.rb, line 116
# Creates the synchronization primitives and launches the worker thread,
# which executes #run. Returns the new thread.
def start
  @cond = ConditionVariable.new
  @mutex = Mutex.new
  # Primitives are assigned before the thread exists because #run uses them.
  @thread = Thread.new { run }
end
# File lib/fluent/output.rb, line 131
# Asks the worker loop to flush as soon as possible.
#
# Under the mutex, the next due time is reset to 0 (which is always in the
# past for the comparison made in #run) and the condition variable is
# signalled so a sleeping worker wakes up. Afterwards the caller yields the
# scheduler once.
def submit_flush
  @mutex.synchronize do
    @next_time = 0
    @cond.signal
  end
  Thread.pass
end
# File lib/fluent/output.rb, line 173
# Sleeps on the condition variable for at most +sec+ seconds.
#
# sec:: timeout in seconds, passed straight to ConditionVariable#wait.
#
# The calling thread must already hold @mutex; it is released while waiting
# and re-acquired before this method returns.
def cond_wait(sec)
  @cond.wait(@mutex, sec)
end
# File lib/fluent/output.rb, line 140
# Worker loop: repeatedly flushes @output when a flush is due and sleeps on
# the condition variable in between, until @finish becomes true.
#
# Locking: @mutex is held for the whole loop except while @output.try_flush
# runs, so that other threads can take the mutex during a flush.
#
# The value returned by try_flush becomes the next due time; it is compared
# against Time.now.to_f, so it is treated as epoch seconds.
#
# Errors: a StandardError escaping the loop is logged through $log and then
# re-raised. In every case @output.before_shutdown is called, under the
# mutex, before the method is left.
#
# NOTE(review): try_flush's result is assigned to @next_time unconditionally,
# so a submit_flush that sets @next_time = 0 while a flush is in progress is
# overwritten. Fixing that needs extra state; confirm whether it matters.
def run
  @mutex.lock
  begin
    until @finish
      time = Time.now.to_f
      if @next_time <= time
        # Release the lock for the duration of the flush.
        @mutex.unlock
        begin
          @next_time = @output.try_flush
        ensure
          @mutex.lock
        end
        # The flush took time, so measure the remaining wait from "now".
        next_wait = @next_time - Time.now.to_f
      else
        next_wait = @next_time - time
      end
      # Re-check @finish before sleeping: a stop request that arrived while
      # the lock was released for the flush has already sent its signal, and
      # waiting here would sleep through the whole interval before noticing.
      cond_wait(next_wait) if next_wait > 0 && !@finish
    end
  ensure
    @mutex.unlock
  end
rescue
  $log.error "error on output thread", error: $!.to_s
  $log.error_backtrace
  raise
ensure
  @mutex.synchronize { @output.before_shutdown }
end