Class | Jabber::Stream |
In: | lib/xmpp4r/stream.rb |
Parent: | Object |
The Stream class manages a connection stream (a file descriptor through which XML messages are read and sent).
DISCONNECTED | = | 1 |
CONNECTED | = | 2 |
fd | [R] | file descriptor used |
status | [R] | connection status |
Create a new stream (just initializes)
    # File lib/xmpp4r/stream.rb, line 34
    def initialize(threaded = true)
      @fd = nil
      @status = DISCONNECTED
      @xmlcbs = CallbackList::new
      @stanzacbs = CallbackList::new
      @messagecbs = CallbackList::new
      @iqcbs = CallbackList::new
      @presencecbs = CallbackList::new
      @threaded = threaded
      @StanzaQueue = []
      @StanzaQueueMutex = Mutex::new
      @exception_block = nil
      @threadBlocks = {}
      # @pollCounter = 10
      @waitingThread = nil
      @wakeupThread = nil
      @streamid = nil
    end
Adds a callback block/proc to process received Iqs
priority: | [Integer] The callback’s priority, the higher, the sooner |
ref: | [String] The callback’s reference |
proc: | [Proc = nil] The optional proc |
&block: | [Block] The optional block |
    # File lib/xmpp4r/stream.rb, line 410
    def add_iq_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @iqcbs.add(priority, ref, block)
    end
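The following is a minimal sketch of how the priority and proc/block arguments could interact. `SketchCallbackList` and `add_callback_sketch` are hypothetical stand-ins written for this illustration, not the library's CallbackList; the sketch assumes that higher priorities run first and that processing stops at the first callback returning true.

```ruby
# Hypothetical stand-in for a callback list; models only the behaviour
# described above (the higher the priority, the sooner the callback runs).
class SketchCallbackList
  def initialize
    @entries = []
  end

  def add(priority = 0, ref = nil, block = nil)
    @entries << [priority, ref, block]
    # Stable sort: highest priority first, insertion order kept among equals.
    @entries = @entries.sort_by.with_index { |entry, i| [-entry[0], i] }
    self
  end

  # Assumption: processing stops at the first callback that returns true.
  def process(stanza)
    @entries.each { |_priority, _ref, cb| return true if cb.call(stanza) == true }
    false
  end
end

# Mirrors the argument handling shown in add_iq_callback: an explicit proc
# takes precedence over a block when both are given.
def add_callback_sketch(list, priority = 0, ref = nil, proc = nil, &block)
  block = proc if proc
  list.add(priority, ref, block)
end

calls = []
list = SketchCallbackList.new
add_callback_sketch(list, 0, 'low')   { |s| calls << [:low, s]; false }
add_callback_sketch(list, 10, 'high') { |s| calls << [:high, s]; true }
handled = list.process('<iq/>')
```

In this sketch the 'high' callback runs first and returns true, so the 'low' callback is never reached.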
Adds a callback block/proc to process received Messages
priority: | [Integer] The callback’s priority, the higher, the sooner |
ref: | [String] The callback’s reference |
proc: | [Proc = nil] The optional proc |
&block: | [Block] The optional block |
    # File lib/xmpp4r/stream.rb, line 350
    def add_message_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @messagecbs.add(priority, ref, block)
    end
Adds a callback block/proc to process received Presences
priority: | [Integer] The callback’s priority, the higher, the sooner |
ref: | [String] The callback’s reference |
proc: | [Proc = nil] The optional proc |
&block: | [Block] The optional block |
    # File lib/xmpp4r/stream.rb, line 390
    def add_presence_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @presencecbs.add(priority, ref, block)
    end
Adds a callback block/proc to process received Stanzas
priority: | [Integer] The callback’s priority, the higher, the sooner |
ref: | [String] The callback’s reference |
proc: | [Proc = nil] The optional proc |
&block: | [Block] The optional block |
    # File lib/xmpp4r/stream.rb, line 370
    def add_stanza_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @stanzacbs.add(priority, ref, block)
    end
Adds a callback block/proc to process received XML messages
priority: | [Integer] The callback’s priority, the higher, the sooner |
ref: | [String] The callback’s reference |
proc: | [Proc = nil] The optional proc |
&block: | [Block] The optional block |
    # File lib/xmpp4r/stream.rb, line 330
    def add_xml_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @xmlcbs.add(priority, ref, block)
    end
Delete a Stanza callback
ref: | [String] The reference of the callback to delete |
    # File lib/xmpp4r/stream.rb, line 379
    def delete_stanza_callback(ref)
      @stanzacbs.delete(ref)
    end
Delete an XML-messages callback
ref: | [String] The reference of the callback to delete |
    # File lib/xmpp4r/stream.rb, line 339
    def delete_xml_callback(ref)
      @xmlcbs.delete(ref)
    end
Returns whether this connection is connected to a Jabber service
return: | [Boolean] Connection status |
    # File lib/xmpp4r/stream.rb, line 111
    def is_connected?
      return @status == CONNECTED
    end
Returns whether this connection is NOT connected to a Jabber service
return: | [Boolean] Connection status |
    # File lib/xmpp4r/stream.rb, line 119
    def is_disconnected?
      return @status == DISCONNECTED
    end
This method is called by the parser when a failure occurs
    # File lib/xmpp4r/stream.rb, line 94
    def parse_failure
      # A new thread has to be created because close will cause the thread
      # to commit suicide
      if @exception_block
        Thread.new { @exception_block.call($!, self, :parser) }
      else
        # $!.to_s is used because a String cannot be concatenated with an
        # Exception object directly
        puts "Stream#parse_failure was called by XML parser. Dumping " +
          "backtrace...\n" + $!.to_s + "\n"
        puts $!.backtrace
        close
        raise
      end
    end
Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.
Currently not working!
    # File lib/xmpp4r/stream.rb, line 307
    def poll
      sleep 10
      while true
        sleep 2
        # @pollCounter = @pollCounter - 1
        # if @pollCounter < 0
        #   begin
        #     send(" \t ")
        #   rescue
        #     Thread.new {@exception_block.call if @exception_block}
        #     break
        #   end
        # end
      end
    end
Process up to |max| XML stanzas and call listeners for all of them. Returns the number of stanzas processed.
max: | [Integer] The number of stanzas to process (nil means process all available) |
    # File lib/xmpp4r/stream.rb, line 187
    def process(max = nil)
      n = 0
      @StanzaQueueMutex.lock
      while @StanzaQueue.size > 0 and (max == nil or n < max)
        e = @StanzaQueue.shift
        @StanzaQueueMutex.unlock
        process_one(e)
        n += 1
        @StanzaQueueMutex.lock
      end
      @StanzaQueueMutex.unlock
      n
    end
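The listing releases the queue lock while each stanza is handled and re-acquires it afterwards. A minimal sketch of that locking pattern, assuming a plain array as the queue, could look like the following; `DrainSketch` and its handler block are hypothetical names, and `Mutex#synchronize` is used here in place of paired lock/unlock calls.

```ruby
# Sketch only: DrainSketch is not part of the library.
class DrainSketch
  def initialize(&handler)
    @queue = []
    @mutex = Mutex.new
    @handler = handler
  end

  def push(item)
    @mutex.synchronize { @queue.push(item) }
  end

  # Process up to max queued items (all of them when max is nil) and
  # return how many were processed. The lock is held only while the
  # queue is touched, never while the handler runs, so a handler may
  # push new items without deadlocking. Items are assumed to be non-nil,
  # because nil from shift is used to detect an empty queue.
  def process(max = nil)
    n = 0
    while max.nil? || n < max
      item = @mutex.synchronize { @queue.shift }
      break if item.nil?
      @handler.call(item)
      n += 1
    end
    n
  end
end

seen = []
drain = DrainSketch.new { |item| seen << item }
%w[a b c].each { |s| drain.push(s) }
first = drain.process(2) # handles "a" and "b"
rest  = drain.process    # handles whatever is left
```

Using `synchronize` guarantees the lock is released even if an exception is raised while the queue is being accessed.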
Processes a received REXML::Element and executes registered thread blocks and filters against it.
element: | [REXML::Element] The received element |
    # File lib/xmpp4r/stream.rb, line 128
    def receive(element)
      Jabber::debuglog("RECEIVED:\n#{element.to_s}")
      case element.name
        when 'stream'
          stanza = element
          i = element.attribute("id")
          @streamid = i.value if i
        when 'message'
          stanza = Message::import(element)
        when 'iq'
          stanza = Iq::import(element)
        when 'presence'
          stanza = Presence::import(element)
        else
          stanza = element
      end
      # Iterate through blocked threads (= waiting for an answer)
      @threadBlocks.each { |thread, proc|
        r = proc.call(stanza)
        if r == true
          @threadBlocks.delete(thread)
          thread.wakeup if thread.alive?
          return
        end
      }
      if @threaded
        process_one(stanza)
      else
        # StanzaQueue will be read when the user calls process
        @StanzaQueueMutex.lock
        @StanzaQueue.push(stanza)
        @StanzaQueueMutex.unlock
        @waitingThread.wakeup if @waitingThread
      end
    end
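The decision order in receive (waiting blocks first, then immediate processing or queueing) can be sketched without any threads or XML parsing. `ReceiveSketch`, `wait_for` and the returned symbols are hypothetical and exist only for this illustration; plain strings stand in for stanzas.

```ruby
# Hypothetical sketch; ReceiveSketch is not part of the library.
class ReceiveSketch
  attr_reader :queue, :processed

  def initialize(threaded)
    @threaded = threaded
    @waiting = {}   # key => proc waiting for a reply
    @queue = []
    @processed = []
  end

  def wait_for(key, &block)
    @waiting[key] = block
  end

  def receive(stanza)
    # 1. Offer the stanza to the waiting blocks; the first one that
    #    returns true consumes it and is removed.
    @waiting.each do |key, blk|
      if blk.call(stanza) == true
        @waiting.delete(key)
        return :consumed
      end
    end
    # 2. Otherwise handle it now (threaded mode) or queue it for a
    #    later call to process.
    if @threaded
      @processed << stanza
      :processed
    else
      @queue << stanza
      :queued
    end
  end
end

r = ReceiveSketch.new(false)
r.wait_for(:t1) { |s| s == 'reply' }
a = r.receive('other') # no waiting block matches
b = r.receive('reply') # the waiting block matches and is removed
c = r.receive('reply') # nobody is waiting any more
```

A consumed stanza never reaches the regular callbacks, which matches the early return in the listing.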
Sends XML data to the socket and (optionally) waits to process received data.
xml: | [String] The xml data to send |
proc: | [Proc = nil] The optional proc |
&block: | [Block] The optional block |
    # File lib/xmpp4r/stream.rb, line 243
    def send(xml, proc=nil, &block)
      Jabber::debuglog("SENDING:\n#{ xml.kind_of?(String) ? xml : xml.to_s }")
      xml = xml.to_s if not xml.kind_of? String
      block = proc if proc
      @threadBlocks[Thread.current]=block if block
      Thread.critical = true # we don't want to be interrupted before we stop!
      begin
        @fd << xml
        @fd.flush
      rescue
        if @exception_block
          @exception_block.call($!, self, :sending)
        else
          puts "Exception caught while sending!"
          raise
        end
      end
      Thread.critical = false
      # The parser thread might be running this (think of a callback running send())
      # If this is the case, we mustn't stop (or we would cause a deadlock)
      Thread.stop if block and Thread.current != @parserThread
      @pollCounter = 10
    end
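The listing relies on `Thread.critical`, which was removed in Ruby 1.9. As a rough sketch of the write path on a current Ruby, a Mutex can serialize writers instead. This swaps in a different technique from the one the listing uses; `SendSketch`, `send_xml` and `on_exception` are hypothetical names, and a StringIO stands in for the socket.

```ruby
require 'stringio'

# Hypothetical sketch; not the library's implementation.
class SendSketch
  def initialize(io)
    @io = io
    @write_lock = Mutex.new
    @exception_block = nil
  end

  # Registers a handler called with (exception, stream, where),
  # the same argument order the listing passes to @exception_block.
  def on_exception(&block)
    @exception_block = block
  end

  def send_xml(xml)
    xml = xml.to_s unless xml.is_a?(String)
    # Only one thread at a time may write, so stanzas are not interleaved.
    @write_lock.synchronize do
      @io << xml
      @io.flush
    end
  rescue IOError, SystemCallError => e
    # Re-raise when no handler is registered, as the listing does.
    raise unless @exception_block
    @exception_block.call(e, self, :sending)
  end
end

io = StringIO.new
stream = SendSketch.new(io)
stream.send_xml('<presence/>')
stream.send_xml(:ping) # non-String input is converted with to_s
```

The sketch covers only the write itself; it does not model blocking the calling thread until a reply arrives.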
Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once, when a stanza with the same Jabber::XMLStanza#id is received. It must return true to complete the exchange.
Be aware that if a stanza with type=’error’ is received, the function does not yield but raises an ErrorException with the corresponding error element.
xml: | [XMLStanza] |
    # File lib/xmpp4r/stream.rb, line 278
    def send_with_id(xml, &block)
      if xml.id.nil?
        xml.id = Jabber::IdGenerator.instance.generate_id
      end

      error = nil
      send(xml) do |received|
        if received.id == xml.id
          if received.type == :error
            error = received.error
            true
          else
            yield(received)
          end
        else
          false
        end
      end

      unless error.nil?
        raise ErrorException.new(error)
      end
    end
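The id-matching rule can be illustrated without a network connection. In the sketch below, `SketchReply`, `SketchError` and `await_reply` are hypothetical names, and an array of already-received replies stands in for the live stream; the error is raised only after matching finishes, as in the listing.

```ruby
# Hypothetical stand-ins for a received stanza and the library's ErrorException.
SketchReply = Struct.new(:id, :type, :error)
class SketchError < StandardError; end

# Walks the incoming replies until one with the requested id completes
# the exchange. Replies with other ids are skipped.
def await_reply(request_id, incoming)
  error = nil
  incoming.each do |received|
    done =
      if received.id != request_id
        false                  # not our reply, keep waiting
      elsif received.type == :error
        error = received.error # remember the error, stop waiting
        true
      else
        yield(received)        # the block decides whether we are done
      end
    break if done == true
  end
  raise SketchError, error.to_s unless error.nil?
end

incoming = [
  SketchReply.new('other', :result, nil),
  SketchReply.new('a1', :result, nil)
]
got = []
await_reply('a1', incoming) { |reply| got << reply.id; true }
```

If the matching reply has type :error, the block is never called and `SketchError` is raised instead.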
Start the XML parser on the fd
    # File lib/xmpp4r/stream.rb, line 55
    def start(fd)
      @fd = fd
      @parser = StreamParser.new(@fd, self)
      @parserThread = Thread.new do
        begin
          @parser.parse
        rescue
          if @exception_block
            Thread.new { @exception_block.call($!, self, :start) }
          else
            puts "Exception caught in Parser thread!"
            raise
          end
        end
      end
      # @pollThread = Thread.new do
      #   begin
      #     poll
      #   rescue
      #     puts "Exception caught in Poll thread, dumping backtrace and" +
      #       " exiting...\n" + $!.exception + "\n"
      #     puts $!.backtrace
      #     exit
      #   end
      # end
      @status = CONNECTED
    end
Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for at most |time| seconds before returning. Returns the number of stanzas processed (0 or 1).
time: | [Integer] Time to wait in seconds. If nil, wait indefinitely. |
    # File lib/xmpp4r/stream.rb, line 207
    def wait_and_process(time = nil)
      if time == 0
        return process(1)
      end
      @StanzaQueueMutex.lock
      if @StanzaQueue.size > 0
        e = @StanzaQueue.shift
        @StanzaQueueMutex.unlock
        process_one(e)
        return 1
      end
      # Release the lock before waiting so that receive can push stanzas
      @StanzaQueueMutex.unlock

      @waitingThread = Thread.current
      @wakeupThread = Thread.new { sleep time ; @waitingThread.wakeup if @waitingThread }
      @waitingThread.stop
      @wakeupThread.kill if @wakeupThread
      @wakeupThread = nil
      @waitingThread = nil

      @StanzaQueueMutex.lock
      if @StanzaQueue.size > 0
        e = @StanzaQueue.shift
        @StanzaQueueMutex.unlock
        process_one(e)
        return 1
      end
      @StanzaQueueMutex.unlock
      return 0
    end
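Waiting for a stanza with a timeout can also be expressed with a condition variable rather than a separate wake-up thread. The following is a sketch of that alternative technique, not the approach the listing takes; `WaitSketch`, `push` and `wait_and_pop` are hypothetical names.

```ruby
# Hypothetical sketch; not part of the library.
class WaitSketch
  def initialize
    @queue = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @queue << item
      @cond.signal # wake one waiting consumer
    end
  end

  # Returns the next item, or nil if none arrives within timeout seconds.
  # A nil timeout waits until an item is pushed.
  def wait_and_pop(timeout = nil)
    deadline = timeout && monotonic_now + timeout
    @mutex.synchronize do
      while @queue.empty?
        remaining = deadline && deadline - monotonic_now
        break if remaining && remaining <= 0
        # wait releases the mutex while sleeping and re-acquires it on wakeup;
        # the loop re-checks the queue to tolerate spurious wakeups.
        @cond.wait(@mutex, remaining)
      end
      @queue.shift
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

waiter = WaitSketch.new
empty = waiter.wait_and_pop(0.05) # nothing queued, times out
producer = Thread.new { sleep 0.05; waiter.push('<message/>') }
item = waiter.wait_and_pop(2)     # returns as soon as the producer pushes
producer.join
```

Because the condition variable releases the mutex while waiting, the producer can always acquire the lock to push.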
Process |element| until it is consumed. Returns element.consumed?
element: | The element to process |
    # File lib/xmpp4r/stream.rb, line 167
    def process_one(stanza)
      Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
      return true if @xmlcbs.process(stanza)
      return true if @stanzacbs.process(stanza)
      case stanza
        when Message
          return true if @messagecbs.process(stanza)
        when Iq
          return true if @iqcbs.process(stanza)
        when Presence
          return true if @presencecbs.process(stanza)
      end
    end
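The listing tries the XML callbacks first, then the generic stanza callbacks, and only then the callbacks for the specific stanza class. A minimal sketch of that ordering follows; `SketchMessage`, `SketchPresence` and `process_one_sketch` are hypothetical, plain arrays of lambdas stand in for the callback lists, and the sketch returns false where the listing would return nil.

```ruby
# Hypothetical stanza types used only in this sketch.
SketchMessage  = Struct.new(:body)
SketchPresence = Struct.new(:show)

# Runs the callback groups in the same order as the listing and stops
# at the first callback that returns true.
def process_one_sketch(stanza, xml_cbs, stanza_cbs, typed_cbs)
  return true if xml_cbs.any? { |cb| cb.call(stanza) == true }
  return true if stanza_cbs.any? { |cb| cb.call(stanza) == true }
  typed = typed_cbs.fetch(stanza.class, [])
  return true if typed.any? { |cb| cb.call(stanza) == true }
  false
end

log = []
stanza_cbs = [->(_s) { log << :stanza; false }]
typed_cbs = { SketchMessage => [->(_s) { log << :message; true }] }

r1 = process_one_sketch(SketchMessage.new('hi'), [], stanza_cbs, typed_cbs)
r2 = process_one_sketch(SketchPresence.new('away'), [], [], typed_cbs)
```

Here the message is seen by the generic stanza callback before the message callback consumes it, while the presence finds no callback that consumes it.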