class Fluent::ExecInput

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/in_exec.rb, line 28
def initialize
  super
  require 'fluent/plugin/exec_util'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/in_exec.rb, line 52
def configure(conf)
  super

  if localtime = conf['localtime']
    @localtime = true
  elsif utc = conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if !@tag && !@tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  @parser = setup_parser(conf)
end
run() click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 132
def run
  @parser.call(@io)
end
run_periodic() click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 136
def run_periodic
  sleep @run_interval
  until @finished
    begin
      io = IO.popen(@command, "r")
      @parser.call(io)
      Process.waitpid(io.pid)
    rescue
      log.error "exec failed to run or shutdown child process", error: $!.to_s, error_class: $!.class.to_s
      log.warn_backtrace $!.backtrace
    ensure
      sleep @run_interval
    end
  end
end
setup_parser(conf) click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 82
def setup_parser(conf)
  case @format
  when 'tsv'
    if @keys.empty?
      raise ConfigError, "keys option is required on exec input for tsv format"
    end
    ExecUtil::TSVParser.new(@keys, method(:on_message))
  when 'json'
    ExecUtil::JSONParser.new(method(:on_message))
  when 'msgpack'
    ExecUtil::MessagePackParser.new(method(:on_message))
  else
    ExecUtil::TextParserWrapperParser.new(conf, method(:on_message))
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 109
def shutdown
  if @run_interval
    @finished = true
    # call Thread#run which interupts sleep in order to stop run_periodic thread immediately.
    @thread.run
    @thread.join
  else
    begin
      Process.kill(:TERM, @pid)
    rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    end
    if @thread.join(60)  # TODO wait time
      return
    end

    begin
      Process.kill(:KILL, @pid)
    rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    end
    @thread.join
  end
end
start() click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 98
def start
  if @run_interval
    @finished = false
    @thread = Thread.new(&method(:run_periodic))
  else
    @io = IO.popen(@command, "r")
    @pid = @io.pid
    @thread = Thread.new(&method(:run))
  end
end

Private Instance Methods

on_message(record, parsed_time = nil) click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 154
def on_message(record, parsed_time = nil)
  if val = record.delete(@tag_key)
    tag = val
  else
    tag = @tag
  end

  if parsed_time
    time = parsed_time
  else
    if val = record.delete(@time_key)
      time = @time_parse_proc.call(val)
    else
      time = Engine.now
    end
  end

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", error: e.to_s, error_class: e.class.to_s, tag: tag, record: Yajl.dump(record)
end