event_machine_interface.rb

Path: lib/Dnsruby/event_machine_interface.rb
Last Update: Thu Oct 22 23:49:00 +0000 2009
 Support for EventMachine has been deprecated.

require 'eventmachine'
module Dnsruby

  class EventMachineInterface#:nodoc: all
    # Class-wide bookkeeping shared by every caller of this interface.
    @@started_em_here = false   # true once start_eventmachine has launched the loop itself
    @@running_clients = []      # resolvers registered through start_em_for_resolver
    @@outstanding_sends = []    # connections whose queries have not completed yet
    @@em_thread = nil           # thread wrapping EM.run, or nil when no loop was started here
    # We want to have one EM loop running continuously in this class.
    # Remember to use stop_event_loop inside of an EM callback in order to stop the event machine.

    # Timers - can't use EM timers as they max out at 1000.
    # Instead, while queries are outstanding, we re-arm a single short timer and
    # manage our own list of per-query timeouts from it.

    @@timer_procs = {}          # expiry Time => [connection, proc to run on expiry]
    @@timer_keys_sorted = []    # keys of @@timer_procs in ascending order
    TIMER_PERIOD = 0.1          # seconds between successive process_timers runs

    # Fire every timeout whose expiry time has passed, then re-arm the polling
    # timer for as long as any send is still outstanding.
    #
    # The expired keys are collected *before* anything is removed: deleting from
    # @@timer_keys_sorted while iterating it (as the previous implementation did)
    # makes Array#each skip the element that follows each deleted one, so
    # back-to-back expired timers were only fired on a later pass.
    #
    # Returns whatever EventMachine::add_timer returns when the timer is
    # re-armed, otherwise nil.
    def EventMachineInterface::process_timers
      now = Time.now
      # Keys are kept sorted, so the expired ones form a prefix of the list.
      expired = @@timer_keys_sorted.take_while { |timeout| timeout <= now }

      expired.each do |timeout|
        @@timer_keys_sorted.delete(timeout)
        entry = @@timer_procs.delete(timeout)
        # An earlier callback in this pass may already have removed this timer.
        next if entry.nil?
        _conn, callback = entry
        callback.call
      end

      unless @@outstanding_sends.empty?
        EventMachine::add_timer(TIMER_PERIOD) {process_timers}
      end
    end

    # Forget every pending timeout registered for connection +c+.
    # Does nothing if the timer has already fired or been removed.
    def EventMachineInterface::remove_timer(c)
      @@timer_procs.delete_if do |timeout, (conn, _callback)|
        next false unless c == conn
        # Keep the sorted key list in step with the hash.
        @@timer_keys_sorted.delete(timeout)
        true
      end
    end

    # Register connection +c+ as an outstanding send and schedule its timeout.
    #
    # c       - the connection handler; must respond to closing=, close_connection
    #           and send_timeout.
    # timeout - number of seconds from now after which the send is cancelled.
    #
    # When this is the first outstanding send, timer processing is kicked off.
    #
    # NOTE(review): timers are keyed by their expiry Time, so two sends that
    # compute exactly the same expiry would share a key and the later one would
    # replace the earlier - confirm whether that can matter in practice.
    def EventMachineInterface::add_to_outstanding(c, timeout)
      on_expiry = Proc.new {
        # Cancel the send
        c.closing = true
        c.close_connection
        c.send_timeout
      }
      @@timer_procs[Time.now + timeout] = [c, on_expiry]
      @@timer_keys_sorted = @@timer_procs.keys.sort
      @@outstanding_sends.push(c)
      # Library code must not write to stdout; use the Dnsruby logger instead.
      Dnsruby.log.debug{"#{@@outstanding_sends.length} outstanding connections"}
      if @@outstanding_sends.length == 1
        EventMachine::add_timer(0) {process_timers}
      end
    end

    # Mark the send on connection +c+ as finished: drop it from the list of
    # outstanding sends, discard its timeout, and let stop_eventmachine decide
    # whether the event loop is still needed.
    def EventMachineInterface::remove_from_outstanding(c)
      @@outstanding_sends.delete(c)
      # Library code must not write to stdout; use the Dnsruby logger instead.
      Dnsruby.log.debug{"#{@@outstanding_sends.length} outstanding connections"}
      remove_timer(c)
      # If we explicitly started the EM loop, and there are no more outstanding sends, then stop the EM loop
      stop_eventmachine
    end

    # Start an EventMachine loop on a background thread, unless one was already
    # started here or Resolver.start_eventmachine_loop? says we must not.
    #
    # Only a single one-shot timer is armed for process_timers. process_timers
    # re-arms itself while sends are outstanding, so additionally registering a
    # periodic timer would seed one more self-perpetuating timer chain on every
    # tick and make the number of pending timers grow without bound.
    #
    # @@df is a deferrable used as the shutdown signal: stop_eventmachine marks
    # it succeeded, and its callback stops the loop and clears @@em_thread.
    #
    # NOTE(review): @@df is assigned inside the new thread, so a
    # stop_eventmachine call racing with start-up could run before it exists -
    # confirm callers cannot hit this.
    def EventMachineInterface::start_eventmachine
      if (!eventmachine_running?)
        if Resolver.start_eventmachine_loop?
          Dnsruby.log.debug("Starting EventMachine")
          @@started_em_here = true
          @@em_thread = Thread.new {
            EM.run {
              EventMachine::add_timer(0.1) {EventMachineInterface::process_timers}
              @@df = EventMachine::DefaultDeferrable.new
              @@df.callback{
                Dnsruby.log.debug("Stopping EventMachine")
                EM.stop
                @@em_thread=nil
              }
            }
          }
        else
          Dnsruby.log.debug("Not trying to start event loop")
        end
      end
    end

    # Record +res+ as a resolver that needs the event loop, then make sure the
    # loop is running.
    def EventMachineInterface::start_em_for_resolver(res)
      @@running_clients << res
      start_eventmachine
    end

    # Unregister resolver +res+ and stop the event loop if nothing else needs it.
    #
    # Every entry equal to +res+ is removed. The previous each_index/delete_at
    # loop shifted the array under its own index, so the entry directly after a
    # removed one was never examined and adjacent duplicates were left behind.
    def EventMachineInterface::stop_em_for_resolver(res)
      @@running_clients.delete(res)
      stop_eventmachine
    end

    # True while this class holds a thread for an event loop it started itself.
    def EventMachineInterface::eventmachine_running?
      !@@em_thread.nil?
    end

    # Signal the event loop to stop, but only if this class started it and
    # neither an outstanding send nor a registered resolver still needs it.
    def EventMachineInterface::stop_eventmachine
      return unless @@started_em_here
      return unless @@outstanding_sends.empty?
      return unless @@running_clients.empty?
      return unless @@em_thread

      # Completing the deferrable runs the callback registered in
      # start_eventmachine, which stops EM and clears @@em_thread.
      @@df.set_deferred_status :succeeded
      @@started_em_here = false
    end

    # Send a DNS query through EventMachine and return the connection, which
    # doubles as a deferrable the caller can attach callbacks/errbacks to.
    #
    # args - Hash with the keys read here and by send_tcp/send_udp:
    #        :msg, :timeout, :server, :port, :src_address, :src_port, :use_tcp
    #
    # The event loop is started first if no reactor is running. Only the
    # reactor_running? probe is guarded, and only against NoMethodError: the
    # old blanket `rescue Exception` also wrapped start_eventmachine, so any
    # start-up failure was reported as a missing method and then retried, and
    # signals/exit requests were swallowed.
    def EventMachineInterface::send(args={})
      reactor_up = begin
        EventMachine.reactor_running?
      rescue NoMethodError
        #@TODO@ EM::reactor_running? only introduced in EM v0.9.0 - if it's not there, we simply don't know what to do...
        Dnsruby.log.error("EventMachine::reactor_running? not available.")
        false
      end
      start_eventmachine unless reactor_up

      df = args[:use_tcp] ? send_tcp(args) : send_udp(args)
      # Need to add this send to the list of outstanding sends
      add_to_outstanding(df, args[:timeout])
      return df
    end

    # Open a TCP connection to args[:server]:args[:port] and send args[:msg]
    # preceded by its two-byte, network-order length.
    #
    # The prefix is computed with String#bytesize: the length field counts
    # bytes on the wire, whereas String#length counts characters and would be
    # wrong for a message string carrying a multi-byte encoding.
    #
    # Returns the EmTcpHandler connection so clients can set callback,
    # errback, etc., if desired.
    def EventMachineInterface::send_tcp(args={})
      connection = EventMachine::connect(args[:server], args[:port], EmTcpHandler) { |c|
        #@TODO SRC_PORT FOR TCP!!!
        c.timeout_time = Time.now + args[:timeout]
        c.instance_eval {@args = args}
        lenmsg = [args[:msg].bytesize].pack('n')
        c.send_data(lenmsg)
        c.send_data(args[:msg])
        Dnsruby.log.debug {"EventMachine : Sent TCP packet to #{args[:server]}:#{args[:port]}, timeout=#{args[:timeout]}"}
      }
      return connection
    end

    # Open a datagram socket bound to args[:src_address]:args[:src_port] and
    # send args[:msg] to args[:server]:args[:port].
    #
    # Returns the EmUdpHandler connection so clients can set callback,
    # errback, etc., if desired.
    def EventMachineInterface::send_udp(args={})
      handler = EventMachine::open_datagram_socket(args[:src_address], args[:src_port], EmUdpHandler) { |c|
        c.timeout_time = Time.now + args[:timeout]
        # The handler reads @args later to learn which server answered.
        c.instance_variable_set(:@args, args)
        c.send_datagram(args[:msg], args[:server], args[:port])
        Dnsruby.log.debug{"EventMachine : Sent datagram to #{args[:server]}:#{args[:port]} from #{args[:src_address]}:#{args[:src_port]}, timeout=#{args[:timeout]}"}
      }
      handler
    end

    # Connection handler for a single UDP query. The handler is itself a
    # deferrable: it succeeds with (message, raw bytes) or fails with
    # (message, error).
    class EmUdpHandler < EventMachine::Connection #:nodoc: all
      include EM::Deferrable
      attr_accessor :closing, :timeout_time

      # @closing is true while we are closing the connection on purpose, so
      # that unbind can tell a deliberate close from a network failure.
      def post_init
        @closing = false
      end

      # Called with each datagram that arrives.
      def receive_data(dgm)
        Dnsruby.log.debug{"UDP receive_data called"}
        process_incoming_message(dgm)
      end

      # Decode +data+ into a Message, close the connection and hand the result
      # (or the decode error) to the client.
      def process_incoming_message(data)
        Dnsruby.log.debug{"Processing incoming message, #{data.length} bytes"}
        reply = begin
          Message.decode(data)
        rescue Exception => decode_error
          Dnsruby.log.error{"Decode error! #{decode_error.class}, #{decode_error}\nfor msg (length=#{data.length}) : #{data}"}
          @closing = true
          close_connection
          send_to_client(nil, nil, decode_error)
          return
        end

        Dnsruby.log.debug{"#{reply}"}
        reply.answerfrom = @args[:server]
        reply.answersize = data.length
        # The header may carry an error condition that is reported as a failure.
        header_error = reply.header.get_exception
        @closing = true
        close_connection
        send_to_client(reply, data, header_error)
      end

      # Called when the connection goes away. If we did not close it ourselves,
      # report a timeout when the deadline is at most a second away, and a
      # network error otherwise.
      def unbind
        Dnsruby.log.debug{"Unbind called"}
        unless @closing
          if @timeout_time > Time.now + 1
            Dnsruby.log.debug{"Sending IOError to client"}
            send_to_client(nil, nil, IOError.new("Network error"))
          else
            send_timeout
          end
        end
        @closing = false
        # Take the last send off the list of outstanding sends
        EventMachineInterface.remove_from_outstanding(self)
      end

      # Fail the deferrable with a ResolvTimeout.
      def send_timeout
        Dnsruby.log.debug{"Sending timeout to client"}
        send_to_client(nil, nil, ResolvTimeout.new("Query timed out"))
      end

      # Complete the deferrable: succeeded with (msg, bytes) when there is no
      # error, failed with (msg, err) otherwise.
      def send_to_client(msg, bytes, err)
        if err.nil?
          set_deferred_status :succeeded, msg, bytes
        else
          set_deferred_status :failed, msg, err
        end
      end
    end

    # Connection handler for a single TCP query. Incoming chunks are buffered
    # until the whole message announced by the two-byte length prefix is here.
    class EmTcpHandler < EmUdpHandler #:nodoc: all
      def post_init
        super
        @data = ""        # everything received so far, length prefix included
        @answersize = 0   # message length from the prefix; 0 until it is read
      end

      # Buffer +data+ and process the message once it is complete.
      #
      # Completion is tested with >= rather than ==: if the final chunk carried
      # any bytes beyond the announced length, the equality test never matched
      # and the query could only end by timing out. Exactly @answersize bytes
      # are passed on, and data arriving once the connection is being closed
      # is ignored so that a message is never delivered twice.
      def receive_data(data)
        Dnsruby.log.debug{"TCP receive_data called"}
        return if @closing
        #Buffer up the incoming data until we have a complete packet
        @data << data
        return if @data.length < 2

        if @answersize == 0
          @answersize = @data[0..1].unpack('n')[0]
          Dnsruby.log.debug{"TCP - expecting #{@answersize} bytes"}
        end
        if @data.length - 2 >= @answersize
          Dnsruby.log.debug{"TCP - got all #{@answersize} bytes "}
          process_incoming_message(@data[2, @answersize])
        else
          Dnsruby.log.debug{"TCP - got #{@data.length-2} message bytes"}
        end
      end
    end

  end

end

[Validate]