Class | Jabber::Stream |
In: | lib/xmpp4r/stream.rb |
Parent: | Object |
The stream class manages a connection stream (a file descriptor through which XML messages are read and sent).
You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.
To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.
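The advice above about blocking work inside callbacks could look roughly like the following. This is a minimal sketch that uses no xmpp4r objects: the `on_message` lambda is a hypothetical stand-in for a block passed to add_message_callback, and the `upcase` call is a placeholder for a slow operation.

```ruby
# Stand-in sketch: `on_message` plays the role of a callback block.
jobs = Queue.new
results = Queue.new

# Worker thread: does the slow part outside the (simulated) parser thread.
worker = Thread.new do
  while (body = jobs.pop) != :done
    results << body.upcase # placeholder for a blocking operation
  end
end

# The callback itself only enqueues the work and returns immediately.
on_message = lambda do |body|
  jobs << body
  true # report the stanza as handled
end

handled = %w[hello world].map { |b| on_message.call(b) }
jobs << :done
worker.join

processed = []
processed << results.pop until results.empty?
```

A single worker fed from a queue keeps the processing order identical to the arrival order, which is the property the parser thread is protecting.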
DISCONNECTED | = | 1 |
CONNECTED | = | 2 |
fd | [R] | file descriptor used |
status | [R] | connection status |
Create a new stream (just initializes)
# File lib/xmpp4r/stream.rb, line 42
def initialize(threaded = true)
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList::new
  @stanzacbs = CallbackList::new
  @messagecbs = CallbackList::new
  @iqcbs = CallbackList::new
  @presencecbs = CallbackList::new
  unless threaded
    $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
    threaded = true
  end
  @threaded = threaded
  @stanzaqueue = []
  @stanzaqueue_lock = Mutex::new
  @exception_block = nil
  @threadblocks = []
  # @pollCounter = 10
  @waiting_thread = nil
  @wakeup_thread = nil
  @streamid = nil
  @features_lock = Mutex.new
end
Adds a callback block to process received Iqs
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 538
def add_iq_callback(priority = 0, ref = nil, &block)
  @iqcbs.add(priority, ref, block)
end
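The "the higher, the sooner" rule and the role of the reference string can be illustrated with a small stand-in. `TinyCallbackList` below is hypothetical and is not xmpp4r's CallbackList, whose internals are not shown on this page; it only borrows the `add(priority, ref, block)`, `delete(ref)` and `process(stanza)` call shapes that appear in the listings. The stop-on-true behaviour is an assumption made for the sketch.

```ruby
# Hypothetical stand-in for a priority-ordered callback list.
class TinyCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  def add(priority = 0, ref = nil, block = nil)
    @entries << Entry.new(priority, ref, block)
    # Highest priority first; the index keeps insertion order for ties.
    @entries = @entries.each_with_index
                       .sort_by { |e, i| [-e.priority, i] }
                       .map(&:first)
    self
  end

  def delete(ref)
    @entries.reject! { |e| e.ref == ref }
    self
  end

  # Call blocks in order and stop as soon as one returns true (assumed rule).
  def process(stanza)
    @entries.any? { |e| e.block.call(stanza) == true }
  end
end

order = []
cbs = TinyCallbackList.new
cbs.add(0,  'low',  proc { |_s| order << :low;  true })
cbs.add(10, 'high', proc { |_s| order << :high; false })

consumed = cbs.process('<iq/>')              # 'high' runs first, 'low' consumes
cbs.delete('low')
consumed_after_delete = cbs.process('<iq/>') # only 'high' is left
```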
Adds a callback block to process received Messages
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 484
def add_message_callback(priority = 0, ref = nil, &block)
  @messagecbs.add(priority, ref, block)
end
Adds a callback block to process received Presences
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 520
def add_presence_callback(priority = 0, ref = nil, &block)
  @presencecbs.add(priority, ref, block)
end
Adds a callback block to process received Stanzas
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 502
def add_stanza_callback(priority = 0, ref = nil, &block)
  @stanzacbs.add(priority, ref, block)
end
Adds a callback block to process received XML messages
priority: | [Integer] The callback's priority, the higher, the sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 466
def add_xml_callback(priority = 0, ref = nil, &block)
  @xmlcbs.add(priority, ref, block)
end
# File lib/xmpp4r/stream.rb, line 556
def close!
  @parserThread.kill if @parserThread
  # @pollThread.kill
  @fd.close if @fd and !@fd.closed?
  @status = DISCONNECTED
end
Delete a Stanza callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 510
def delete_stanza_callback(ref)
  @stanzacbs.delete(ref)
end
Delete an XML-messages callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 474
def delete_xml_callback(ref)
  @xmlcbs.delete(ref)
end
Returns whether this connection is connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 158
def is_connected?
  return @status == CONNECTED
end
Returns whether this connection is NOT connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 166
def is_disconnected?
  return @status == DISCONNECTED
end
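Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber session.
The block has to take three arguments: the exception (nil when the stream is simply closed), the Jabber::Stream object (self), and a symbol naming where it happened (:start, :parser, :sending or :close in the listings on this page).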
# File lib/xmpp4r/stream.rb, line 116
def on_exception(&block)
  @exception_block = block
end
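In the listings on this page the stored block is invoked as `@exception_block.call(e, self, :sending)` and, on a normal stream end, as `@exception_block.call(nil, self, :close)`. A handler therefore probably needs to cope with a nil exception. The sketch below shows one possible shape; `handler` stands in for the block given to on_exception and `:fake_stream` is a placeholder for the stream object, so nothing here touches xmpp4r.

```ruby
# Sketch only: no real Jabber::Stream is involved.
log = []
handler = proc do |exception, stream, where|
  reason = exception ? "#{exception.class}: #{exception.message}" : 'stream closed'
  log << "#{where} on #{stream.inspect}: #{reason}"
end

# The two call shapes that appear in the listings:
handler.call(IOError.new('closed stream'), :fake_stream, :sending)
handler.call(nil, :fake_stream, :close)
```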
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 122
def parse_failure(e)
  Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

  # A new thread has to be created because close will cause the thread
  # to commit suicide(???)
  if @exception_block
    # New thread, because close will kill the current thread
    Thread.new {
      close
      @exception_block.call(e, self, :parser)
    }
  else
    puts "Stream#parse_failure was called by XML parser. Dumping " +
    "backtrace...\n" + e.exception + "\n"
    puts e.backtrace
    close
    raise
  end
end
This method is called by the parser upon receiving </stream:stream>
# File lib/xmpp4r/stream.rb, line 144
def parser_end
  if @exception_block
    Thread.new {
      close
      @exception_block.call(nil, self, :close)
    }
  else
    close
  end
end
Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.
Currently not working!
# File lib/xmpp4r/stream.rb, line 444
def poll
  sleep 10
  while true
    sleep 2
    # @pollCounter = @pollCounter - 1
    # if @pollCounter < 0
    #   begin
    #     send(" \t ")
    #   rescue
    #     Thread.new {@exception_block.call if @exception_block}
    #     break
    #   end
    # end
  end
end
Process |max| XML stanzas and call listeners for all of them.
max: | [Integer] the number of stanzas to process (nil means process all available) |
# File lib/xmpp4r/stream.rb, line 289
def process(max = nil)
  n = 0
  @stanzaqueue_lock.lock
  while @stanzaqueue.size > 0 and (max == nil or n < max)
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    n += 1
    @stanzaqueue_lock.lock
  end
  @stanzaqueue_lock.unlock
  n
end
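The pattern in this listing is to hold the lock only while the queue is touched and to release it while an item is handled. A compact way to express the same idea with `Mutex#synchronize` might look as follows. `drain` and its arguments are illustrative names, not part of xmpp4r, and the sketch assumes the queue never contains nil.

```ruby
# Sketch: take at most `max` items from a lock-protected array.
def drain(queue, lock, max = nil)
  n = 0
  loop do
    # Hold the lock only while touching the queue, not while handling the item.
    item = lock.synchronize { queue.shift if max.nil? || n < max }
    break if item.nil? # queue empty or limit reached
    yield item
    n += 1
  end
  n
end

queue = %w[a b c]
lock = Mutex.new
seen = []
first = drain(queue, lock, 2) { |s| seen << s } # stops after two items
rest  = drain(queue, lock)    { |s| seen << s } # nil means "all available"
```

As in the listing, the return value is the number of items that were actually handled.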
Processes a received REXML::Element and executes registered thread blocks and filters against it.
Although the stream has a threaded mode, no new thread is spawned here: receive_nonthreaded is called directly in both modes so that the order of stanzas is preserved.
element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 177
def receive(element)
  if @threaded
    # Don't spawn a new thread here. An implicit feature
    # of XMPP is constant order of stanzas.
    receive_nonthreaded(element)
  else
    receive_nonthreaded(element)
  end
end
Sends XML data to the socket and (optionally) waits to process received data.
Do not invoke this in a callback but in a separate thread, because we may not suspend the parser thread (in whose context callbacks are executed).
xml: | [String] The xml data to send |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 369
def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  @threadblocks.unshift(ThreadBlock.new(block)) if block
  Thread.critical = true # we don't want to be interupted before we stop!
  begin
    @fd << xml.to_s
    @fd.flush
  rescue Exception => e
    Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block
      Thread.new { close!; @exception_block.call(e, self, :sending) }
    else
      puts "Exception caught while sending!"
      close!
      raise
    end
  end
  Thread.critical = false
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  Thread.stop if block and Thread.current != @parserThread
  @pollCounter = 10
end
Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.
Be aware that if a stanza with type='error' is received the function does not yield but raises an ErrorException with the corresponding error element.
Please see Stream#send for some implementation details.
Please read the note about nesting at Stream#send
xml: | [XMLStanza] |
# File lib/xmpp4r/stream.rb, line 411
def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  res = nil
  error = nil
  send(xml) do |received|
    if received.kind_of? XMLStanza and received.id == xml.id
      if received.type == :error
        error = (received.error ? received.error : Error.new)
        true
      else
        res = yield(received)
        true
      end
    else
      false
    end
  end

  unless error.nil?
    raise ErrorException.new(error)
  end

  res
end
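The matching rule in this listing (ignore stanzas with a different id, raise on an error reply, otherwise yield and return the block's value) can be tried out without a network connection. The sketch below is a simplification under stated assumptions: `FakeStanza` and `match_reply` are made-up names, and the error is raised immediately rather than after the waiting block has returned, as the listing does.

```ruby
# Self-contained sketch of the reply-matching rule; no I/O happens here.
FakeStanza = Struct.new(:id, :type, :error)

# Returns [done, result]; raises if the matching reply is an error.
def match_reply(request_id, received)
  return [false, nil] unless received.id == request_id
  raise "error reply: #{received.error || 'unknown'}" if received.type == :error
  [true, yield(received)]
end

# A stanza with another id is not ours, so the block is never called.
unrelated = match_reply('42', FakeStanza.new('7', :result, nil)) { |r| r.type }

# A matching reply completes the exchange with the block's return value.
answer = match_reply('42', FakeStanza.new('42', :result, nil)) { |r| r.type }

# A matching error reply raises instead of yielding.
failure = begin
  match_reply('42', FakeStanza.new('42', :error, 'forbidden')) { |r| r.type }
rescue RuntimeError => e
  e.message
end
```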
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 68
def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parserThread = Thread.new do
    begin
      @parser.parse
    rescue Exception => e
      Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new { close; @exception_block.call(e, self, :start) }
      else
        puts "Exception caught in Parser thread!"
        close
        raise
      end
    end
  end
  # @pollThread = Thread.new do
  #   begin
  #     poll
  #   rescue
  #     puts "Exception caught in Poll thread, dumping backtrace and" +
  #       " exiting...\n" + $!.exception + "\n"
  #     puts $!.backtrace
  #     exit
  #   end
  # end
  @status = CONNECTED
end
# File lib/xmpp4r/stream.rb, line 102
def stop
  @parserThread.kill
  @parser = nil
end
Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for at most |time| seconds before returning.
time: | [Integer] time to wait in seconds. If nil, wait infinitely. |
# File lib/xmpp4r/stream.rb, line 309
def wait_and_process(time = nil)
  if time == 0
    return process(1)
  end
  @stanzaqueue_lock.lock
  if @stanzaqueue.size > 0
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    return 1
  end

  @waiting_thread = Thread.current
  @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread }
  @waiting_thread.stop
  @wakeup_thread.kill if @wakeup_thread
  @wakeup_thread = nil
  @waiting_thread = nil

  @stanzaqueue_lock.lock
  if @stanzaqueue.size > 0
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    return 1
  end
  return 0
end
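For readers who want to experiment with the "wait up to |time| seconds for one item" behaviour, here is a rough sketch of the same idea. It deliberately swaps the `Thread#stop` / `Thread#wakeup` pair used in the listing for a `Mutex` plus `ConditionVariable`, so it is a different technique, not a transcription. `TimedInbox` is a hypothetical name.

```ruby
# Sketch: a queue whose pop gives up after `timeout` seconds.
class TimedInbox
  def initialize
    @items = []
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  def push(item)
    @lock.synchronize do
      @items << item
      @cond.signal # wake one waiting consumer
    end
  end

  # Returns the next item, or nil if none arrived within `timeout` seconds.
  # A nil timeout waits indefinitely.
  def pop(timeout = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    @lock.synchronize do
      while @items.empty?
        remaining = deadline && deadline - now.call
        return nil if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @items.shift
    end
  end
end

inbox = TimedInbox.new
timed_out = inbox.pop(0.05)                     # nothing queued yet
Thread.new { sleep 0.05; inbox.push(:stanza) }  # simulated late arrival
got = inbox.pop(2)
```

The loop around `wait` re-checks the queue after every wakeup, which guards against spurious wakeups and keeps the deadline accurate.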
Process |element| until it is consumed. Returns element.consumed?
element: | The element to process |
# File lib/xmpp4r/stream.rb, line 269
def process_one(stanza)
  Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
  return true if @xmlcbs.process(stanza)
  return true if @stanzacbs.process(stanza)
  case stanza
  when Message
    return true if @messagecbs.process(stanza)
  when Iq
    return true if @iqcbs.process(stanza)
  when Presence
    return true if @presencecbs.process(stanza)
  end
end
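The dispatch order in this listing is: XML callbacks first, then generic stanza callbacks, then the callbacks for the stanza's own class. The following sketch mimics that order with plain lambdas. `DemoMessage`, `DemoIq` and `dispatch` are placeholder names, and unlike the listing (which falls through with nil) the sketch returns false explicitly when nothing consumed the stanza.

```ruby
# Sketch of the dispatch order; lambdas stand in for callback lists.
DemoMessage = Class.new
DemoIq = Class.new

def dispatch(stanza, generic, by_class)
  # Generic handlers get the first chance to consume the stanza.
  return true if generic.any? { |cb| cb.call(stanza) }
  # Then the handler registered for the stanza's class, if any.
  typed = by_class.find { |klass, _cb| klass === stanza }
  return true if typed && typed.last.call(stanza)
  false
end

trace = []
generic = [
  ->(_s) { trace << :xml;    false },
  ->(_s) { trace << :stanza; false }
]
by_class = {
  DemoMessage => ->(_s) { trace << :message; true },
  DemoIq      => ->(_s) { trace << :iq;      false }
}

msg_consumed = dispatch(DemoMessage.new, generic, by_class)
iq_consumed  = dispatch(DemoIq.new, generic, by_class)
```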
# File lib/xmpp4r/stream.rb, line 187
def receive_nonthreaded(element)
  Jabber::debuglog("RECEIVED:\n#{element.to_s}")
  case element.prefix
  when 'stream'
    case element.name
    when 'stream'
      stanza = element
      @streamid = element.attributes['id']
      unless element.attributes['version'] # isn't XMPP compliant, so
        Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
        @features_lock.unlock # don't wait for <stream:features/>
      end
    when 'features'
      stanza = element
      element.each { |e|
        if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
          e.each_element('mechanism') { |mech|
            @stream_mechanisms.push(mech.text)
          }
        else
          @stream_features[e.name] = e.namespace
        end
      }
      Jabber::debuglog("FEATURES: received")
      @features_lock.unlock
    else
      stanza = element
    end
  else
    case element.name
    when 'message'
      stanza = Message::import(element)
    when 'iq'
      stanza = Iq::import(element)
    when 'presence'
      stanza = Presence::import(element)
    else
      stanza = element
    end
  end

  # Iterate through blocked threads (= waiting for an answer)
  #
  # We're dup'ping the @threadblocks here, so that we won't end up in an
  # endless loop if Stream#send is being nested. That means, the nested
  # threadblock won't receive the stanza currently processed, but the next
  # one.
  threadblocks = @threadblocks.dup
  threadblocks.each { |threadblock|
    exception = nil
    r = false
    begin
      r = threadblock.call(stanza)
    rescue Exception => e
      exception = e
    end

    if r == true
      @threadblocks.delete(threadblock)
      threadblock.wakeup
      return
    elsif exception
      @threadblocks.delete(threadblock)
      threadblock.raise(exception)
    end
  }

  if @threaded
    process_one(stanza)
  else
    # stanzaqueue will be read when the user call process
    @stanzaqueue_lock.lock
    @stanzaqueue.push(stanza)
    @stanzaqueue_lock.unlock
    @waiting_thread.wakeup if @waiting_thread
  end
end
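The comment in this listing about dup'ping the list before iterating describes a general Ruby idiom: walk a copy so that the original can be modified during the loop. A small illustration follows, with lambdas standing in for the waiting thread blocks. `deliver` and `waiters` are illustrative names only.

```ruby
# Sketch: iterate over a copy so that deleting from the original list
# during the loop cannot disturb the iteration.
waiters = [
  ->(s) { s == :a },
  ->(s) { s == :b },
  ->(s) { s == :b }
]

def deliver(stanza, waiters)
  waiters.dup.each do |w|
    next unless w.call(stanza) == true
    waiters.delete(w) # safe: we are walking the copy
    return true       # first match wins, later waiters are not asked
  end
  false
end

hit_b = deliver(:b, waiters) # removes only the first :b waiter
left  = waiters.size
miss  = deliver(:c, waiters) # nobody is waiting for :c
```

As in the listing, the first waiter that accepts the stanza ends the loop, so a second waiter for the same stanza stays registered until the next matching stanza arrives.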