Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object

The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.

Methods

Classes and Modules

Class Jabber::Stream::ThreadBlock

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

Create a new stream (just initializes)

[Source]

    # File lib/xmpp4r/stream.rb, line 42
42:     def initialize(threaded = true)
43:       @fd = nil
44:       @status = DISCONNECTED
45:       @xmlcbs = CallbackList::new
46:       @stanzacbs = CallbackList::new
47:       @messagecbs = CallbackList::new
48:       @iqcbs = CallbackList::new
49:       @presencecbs = CallbackList::new
50:       unless threaded
51:         $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
52:         threaded = true
53:       end
54:       @threaded = threaded
55:       @stanzaqueue = []
56:       @stanzaqueue_lock = Mutex::new
57:       @exception_block = nil
58:       @threadblocks = []
59: #      @pollCounter = 10
60:       @waiting_thread = nil
61:       @wakeup_thread = nil
62:       @streamid = nil
63:       @features_lock = Mutex.new
64:     end

Public Instance methods

Adds a callback block to process received Iqs

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 538
538:     def add_iq_callback(priority = 0, ref = nil, &block)
539:       @iqcbs.add(priority, ref, block)
540:     end

Adds a callback block to process received Messages

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 484
484:     def add_message_callback(priority = 0, ref = nil, &block)
485:       @messagecbs.add(priority, ref, block)
486:     end

Adds a callback block to process received Presences

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 520
520:     def add_presence_callback(priority = 0, ref = nil, &block)
521:       @presencecbs.add(priority, ref, block)
522:     end

Adds a callback block to process received Stanzas

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 502
502:     def add_stanza_callback(priority = 0, ref = nil, &block)
503:       @stanzacbs.add(priority, ref, block)
504:     end

Adds a callback block to process received XML messages

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 466
466:     def add_xml_callback(priority = 0, ref = nil, &block)
467:       @xmlcbs.add(priority, ref, block)
468:     end

Closes the connection to the Jabber service

[Source]

     # File lib/xmpp4r/stream.rb, line 552
552:     def close
553:       close!
554:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 556
556:     def close!
557:       @parserThread.kill if @parserThread
558: #      @pollThread.kill
559:       @fd.close if @fd and !@fd.closed?
560:       @status = DISCONNECTED
561:     end

Delete an Iq callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 547
547:     def delete_iq_callback(ref)
548:       @iqcbs.delete(ref)
549:     end

Delete an Message callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 492
492:     def delete_message_callback(ref)
493:       @messagecbs.delete(ref)
494:     end

Delete a Presence callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 528
528:     def delete_presence_callback(ref)
529:       @presencecbs.delete(ref)
530:     end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 510
510:     def delete_stanza_callback(ref)
511:       @stanzacbs.delete(ref)
512:     end

Delete an XML-messages callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 474
474:     def delete_xml_callback(ref)
475:       @xmlcbs.delete(ref)
476:     end

Returns if this connection is connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 158
158:     def is_connected?
159:       return @status == CONNECTED
160:     end

Returns if this connection is NOT connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 166
166:     def is_disconnected?
167:       return @status == DISCONNECTED
168:     end

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception
  • the Jabber::Stream object (self)
  • a symbol where it happened, namely :start, :parser, :sending and :end

[Source]

     # File lib/xmpp4r/stream.rb, line 116
116:     def on_exception(&block)
117:       @exception_block = block
118:     end

This method is called by the parser when a failure occurs

[Source]

     # File lib/xmpp4r/stream.rb, line 122
122:     def parse_failure(e)
123:       Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
124: 
125:       # A new thread has to be created because close will cause the thread
126:       # to commit suicide(???)
127:       if @exception_block
128:         # New thread, because close will kill the current thread
129:         Thread.new {
130:           close
131:           @exception_block.call(e, self, :parser)
132:         }
133:       else
134:         puts "Stream#parse_failure was called by XML parser. Dumping " +
135:         "backtrace...\n" + e.exception + "\n"
136:         puts e.backtrace
137:         close
138:         raise
139:       end
140:     end

This method is called by the parser upon receiving </stream:stream>

[Source]

     # File lib/xmpp4r/stream.rb, line 144
144:     def parser_end
145:       if @exception_block
146:         Thread.new {
147:           close
148:           @exception_block.call(nil, self, :close)
149:         }
150:       else
151:         close
152:       end
153:     end

Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.

Currently not working!

[Source]

     # File lib/xmpp4r/stream.rb, line 444
444:     def poll
445:       sleep 10
446:       while true
447:         sleep 2
448: #        @pollCounter = @pollCounter - 1
449: #        if @pollCounter < 0
450: #          begin
451: #            send("  \t  ")
452: #          rescue
453: #            Thread.new {@exception_block.call if @exception_block}
454: #            break
455: #          end
456: #        end
457:       end
458:     end

Process |max| XML stanzas and call listeners for all of them.

max:[Integer] the number of stanzas to process (nil means process

all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 289
289:     def process(max = nil)
290:       n = 0
291:       @stanzaqueue_lock.lock
292:       while @stanzaqueue.size > 0 and (max == nil or n < max)
293:         e = @stanzaqueue.shift
294:         @stanzaqueue_lock.unlock
295:         process_one(e)
296:         n += 1
297:         @stanzaqueue_lock.lock
298:       end
299:       @stanzaqueue_lock.unlock
300:       n
301:     end

Processes a received REXML::Element and executes registered thread blocks and filters against it.

If in threaded mode, a new thread will be spawned for the call to receive_nonthreaded.

element:[REXML::Element] The received element

[Source]

     # File lib/xmpp4r/stream.rb, line 177
177:     def receive(element)
178:       if @threaded
179:         # Don't spawn a new thread here. An implicit feature
180:         # of XMPP is constant order of stanzas.
181:         receive_nonthreaded(element)
182:       else
183:         receive_nonthreaded(element)
184:       end
185:     end

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).

xml:[String] The xml data to send
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 369
369:     def send(xml, &block)
370:       Jabber::debuglog("SENDING:\n#{xml}")
371:       @threadblocks.unshift(ThreadBlock.new(block)) if block
372:       Thread.critical = true # we don't want to be interupted before we stop!
373:       begin
374:         @fd << xml.to_s
375:         @fd.flush
376:       rescue Exception => e
377:         Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
378: 
379:         if @exception_block 
380:           Thread.new { close!; @exception_block.call(e, self, :sending) }
381:         else
382:           puts "Exception caught while sending!"
383:           close!
384:           raise
385:         end
386:       end
387:       Thread.critical = false
388:       # The parser thread might be running this (think of a callback running send())
389:       # If this is the case, we mustn't stop (or we would cause a deadlock)
390:       Thread.stop if block and Thread.current != @parserThread
391:       @pollCounter = 10
392:     end

Send an XMMP stanza with an Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.

Be aware that if a stanza with type=‘error‘ is received the function does not yield but raises an ErrorException with the corresponding error element.

Please see Stream#send for some implementational details.

Please read the note about nesting at Stream#send

xml:[XMLStanza]

[Source]

     # File lib/xmpp4r/stream.rb, line 411
411:     def send_with_id(xml, &block)
412:       if xml.id.nil?
413:         xml.id = Jabber::IdGenerator.instance.generate_id
414:       end
415: 
416:       res = nil
417:       error = nil
418:       send(xml) do |received|
419:         if received.kind_of? XMLStanza and received.id == xml.id
420:           if received.type == :error
421:             error = (received.error ? received.error : Error.new)
422:             true
423:           else
424:             res = yield(received)
425:             true
426:           end
427:         else
428:           false
429:         end
430:       end
431: 
432:       unless error.nil?
433:         raise ErrorException.new(error)
434:       end
435: 
436:       res
437:     end

Start the XML parser on the fd

[Source]

     # File lib/xmpp4r/stream.rb, line 68
 68:     def start(fd)
 69:       @stream_mechanisms = []
 70:       @stream_features = {}
 71: 
 72:       @fd = fd
 73:       @parser = StreamParser.new(@fd, self)
 74:       @parserThread = Thread.new do
 75:         begin
 76:           @parser.parse
 77:         rescue Exception => e
 78:           Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
 79: 
 80:           if @exception_block
 81:             Thread.new { close; @exception_block.call(e, self, :start) }
 82:           else
 83:             puts "Exception caught in Parser thread!"
 84:             close
 85:             raise
 86:           end
 87:         end
 88:       end
 89: #      @pollThread = Thread.new do
 90: #        begin
 91: #        poll
 92: #        rescue
 93: #          puts "Exception caught in Poll thread, dumping backtrace and" +
 94: #            " exiting...\n" + $!.exception + "\n"
 95: #          puts $!.backtrace
 96: #          exit
 97: #        end
 98: #      end
 99:       @status = CONNECTED
100:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 102
102:     def stop
103:       @parserThread.kill
104:       @parser = nil
105:     end

Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.

time:[Integer] time to wait in seconds. If nil, wait infinitely.

all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 309
309:     def wait_and_process(time = nil)
310:       if time == 0 
311:         return process(1)
312:       end
313:       @stanzaqueue_lock.lock
314:       if @stanzaqueue.size > 0
315:         e = @stanzaqueue.shift
316:         @stanzaqueue_lock.unlock
317:         process_one(e)
318:         return 1
319:       end
320: 
321:       @waiting_thread = Thread.current
322:       @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread }
323:       @waiting_thread.stop
324:       @wakeup_thread.kill if @wakeup_thread
325:       @wakeup_thread = nil
326:       @waiting_thread = nil
327: 
328:       @stanzaqueue_lock.lock
329:       if @stanzaqueue.size > 0
330:         e = @stanzaqueue.shift
331:         @stanzaqueue_lock.unlock
332:         process_one(e)
333:         return 1
334:       end
335:       return 0
336:     end

Private Instance methods

Process |element| until it is consumed. Returns element.consumed? element The element to process

[Source]

     # File lib/xmpp4r/stream.rb, line 269
269:     def process_one(stanza)
270:       Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
271:       return true if @xmlcbs.process(stanza)
272:       return true if @stanzacbs.process(stanza)
273:       case stanza
274:       when Message
275:         return true if @messagecbs.process(stanza)
276:       when Iq
277:         return true if @iqcbs.process(stanza)
278:       when Presence
279:         return true if @presencecbs.process(stanza)
280:       end
281:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 187
187:     def receive_nonthreaded(element)
188:       Jabber::debuglog("RECEIVED:\n#{element.to_s}")
189:       case element.prefix
190:       when 'stream'
191:         case element.name
192:           when 'stream'
193:             stanza = element
194:             @streamid = element.attributes['id']
195:             unless element.attributes['version']  # isn't XMPP compliant, so
196:               Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
197:               @features_lock.unlock               # don't wait for <stream:features/>
198:             end
199:           when 'features'
200:             stanza = element
201:             element.each { |e|
202:               if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
203:                 e.each_element('mechanism') { |mech|
204:                   @stream_mechanisms.push(mech.text)
205:                 }
206:               else
207:                 @stream_features[e.name] = e.namespace
208:               end
209:             }
210:             Jabber::debuglog("FEATURES: received")
211:             @features_lock.unlock
212:           else
213:             stanza = element
214:         end
215:       else
216:         case element.name
217:           when 'message'
218:             stanza = Message::import(element)
219:           when 'iq'
220:             stanza = Iq::import(element)
221:           when 'presence'
222:             stanza = Presence::import(element)
223:           else
224:             stanza = element
225:         end
226:       end
227: 
228:       # Iterate through blocked threads (= waiting for an answer)
229:       #
230:       # We're dup'ping the @threadblocks here, so that we won't end up in an
231:       # endless loop if Stream#send is being nested. That means, the nested
232:       # threadblock won't receive the stanza currently processed, but the next
233:       # one.
234:       threadblocks = @threadblocks.dup
235:       threadblocks.each { |threadblock|
236:         exception = nil
237:         r = false
238:         begin
239:           r = threadblock.call(stanza)
240:         rescue Exception => e
241:           exception = e
242:         end
243: 
244:         if r == true
245:           @threadblocks.delete(threadblock)
246:           threadblock.wakeup
247:           return
248:         elsif exception
249:           @threadblocks.delete(threadblock)
250:           threadblock.raise(exception)
251:         end
252:       }
253: 
254:       if @threaded
255:         process_one(stanza)
256:       else
257:         # stanzaqueue will be read when the user call process
258:         @stanzaqueue_lock.lock
259:         @stanzaqueue.push(stanza)
260:         @stanzaqueue_lock.unlock
261:         @waiting_thread.wakeup if @waiting_thread
262:       end
263:     end

[Validate]