// ========================================================================
// Copyright 2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//========================================================================

package org.mortbay.cometd.ext;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import org.cometd.Bayeux;
import org.cometd.Client;
import org.cometd.Extension;
import org.cometd.Message;
import org.mortbay.cometd.ClientImpl;
import org.mortbay.log.Log;

34  /**
35   * StatisticsExtension
36   *
37   * This extension helps gather statistics about round-trip message times. It works by intercepting
38   * a message sent from a client to request a statistics measurement be started, and then propagates
39   * messages to each client, measuring how long it takes for the clients to receive and respond to
40   * the message. A summary of the statistics collected is then sent back to the originating client.
41   *
42   * The server recognizes a request to start a statistics measurement looking in the received message
43   * for a key with a particular value (_statsRequestSentinel). As clients can generate messages with
44   * arbitrarily deep structures, you can specify an array of keys (_statsRequestKeys), where each
45   * element in the array is one level deeper down a nested structure. NOTE that if your messages are
46   * not just Map<String,Object> you will need to subclass this extension to do the message examination.
47   * Optionally, the extension can only look for these messages on a specific channel (_statsRequestChannel).
48   * The message can also specify the time to wait for all clients to respond to the probe message. The default is 10seconds.
49   *
50   * For example, if the _statsReqestKeys =  {"chat"} and the sentinel is "//stats" (these is the default)
51   * a client would publish a message like so:
52   *
53   * <pre>chat: //stats,20</pre>
54   *
55   * The server will deliver a message back to the sending client:
56   *
57   * <pre>chat: //stats-confirm Statistic id=99 started. Results due in 20s.</pre>
58   *
59   * The inbound message is examined to see if it has the structure of a potential stats message.
60   * If a stats request is recognized, a new stats collection started, and the id of the stats
61   * collection and  timestamp on the server is appended. Then, the server passes the message back to
62   * Bayeux so that it can be processed, and published out.
63   *
64   * At this point, the extension's send() method is called. If a stats request with an id and
65   * timestamp is recognized, the outbound message is modifed to be a probe message (_probeSentinel)
66   * and the sending time appended to the message.
67   *
68   * A client receives the probe message, and responds by publishing a reply (_probeReplySentinel).
69   *
70   * Continuing the example above, assuming we set _probeSentinel = "stats-probe",
71   * then each client will receive a message like:
72   * <pre>chat: //stats-probe,20::--99::--1243945849978::--12345667464</pre>
73   *
74   * Where ::-- are field delimiters, 99 is the id of the statistic collection underway and 1243945849978
75   * is the time on the server the stats request was received, and 12345667464 is the server time at
76   * which the stats probe was published to all clients.
77   *
78   * The client responds with the same message, but prefixed by "//stats-reply":
79   * <pre>chat: //stats-reply::--99::--1243945849978::--12345667464</pre>
80   *
81   * The server receives the replies from each client, and updates the min,max and average
82   * statistics for:
83   *
84   * <ul>
85   * <li>RoundTrip: time since the stats request was received on the server to stats-reply
86   * <li>Application: time spent in the application between time stats request received to probe being published
87   * <li>Infrastructure: time since probe published to reply received
88   * <li>Cometd: portion of Infrastructure time that was due to cometd processing (estimate based on TimesyncExtension network lag calculation)
89   * </ul>
90   *
91   * In other words, if T0 is the time at which the original stats request is received on the server,
92   * and T1 is the time at which the stats probe is sent, and T2 is the time the probe reply is
93   * received:
94   * <ul>
95   * <li>RoundTrip = T2-T0
96   * <li>Application = T1-T0
97   * <li>Infrastructure = T2-T1
98   * <li>Cometd = Infrastructure - Lag
99   * </ul>
100  *
101  * After waiting the prescribed interval, eg 20 secs,  to collect as many of the clients'
102  * messages as possible, it sends a message to the client who originated the statistics request
103  * with the results, eg:
104  *
105  * <pre>chat://stats-results Id 2: samples=1, avgR=43, minR=43, maxR=43, avgI=42, minI=42, maxI=42, avgA=1, minA=1, maxA=1, avgC=42, minC=42, maxC=42</pre>
106  *
107  * Note that there can be many stats collections in progress at once.
108  */
109 public class StatisticsExtension implements Extension
110 {
111     protected static AtomicLong COUNTER = new AtomicLong();
112 
113     /**
114      * Channel on which to examine inbound messages for a stats request. Can be null, in which case
115      * all channels are examined.
116      */
117     protected String _statsRequestChannel = null;
118 
119     /**
120      * Structure of keys to look for on inbound message that is a request to start a stats run.
121      */
122     protected String[] _statsRequestKeys = new String[]{"chat"};
123 
124     /**
125      * Sentinel received on inbound that indicates request to start stats run.
126      */
127     protected String _statsRequestSentinel = "//stats";
128 
129 
130     /**
131      * Sentinel sent to user to indicate confirmation of start of a stats run.
132      */
133     protected String _statsConfirmSentinel = "//stats-confirm";
134 
135 
136     /**
137      * Sentinel prefix sent do user to indicate the results of the stats run.
138      */
139     protected String _statsResultSentinel = "//stats-results";
140 
141     /**
142      * Channel on which outbound stats probes are sent to clients.
143      */
144     protected String _probeChannel = null;
145 
146     /**
147      * Sentinel prefix that is put on an outbound stats probe message to indicate a probe to all clients.
148      */
149     protected String _probeSentinel = "//stats-probe";
150 
151     /**
152      * Structure of keys to look for on outbound messages that indicate a probe being sent.
153      */
154     protected String[] _probeKeys = null;
155 
156 
157 
158     /**
159      * Channel on which to expect stats probe replies from clients. Can be null, in which case,
160      * all channels are examined.
161      */
162     protected String _probeReplyChannel = null;
163 
164     /**
165      * Structure of keys to look for on inbound message with probe replies from clients.
166      */
167     protected String[] _probeReplyKeys = null;
168 
169     /**
170      * Sentinel prefix on inbound message with probe results from a client.
171      */
172     protected String _probeReplySentinel = "//stats-reply";
173 
174 
175 
176     protected String _delim = "::--";
177     protected Map<String,Statistic> _statistics = new HashMap<String, Statistic>();
178     protected Timer _timer;
179     protected long _timeout = 10000; //wait 10 sec for all samples for a statistic to complete
180 
181 
182     /**
183      * Statistic
184      *
185      * One measurement of min,max and average roundtrip time of a cometd message to all clients.
186      */
187     public class Statistic extends TimerTask
188     {
189         public long _id;
190         public Client _client;
191         public Message _probeMessage;
192         public long _timeout = 0;
193         public long _samples = 0;
194 
195         public long _minRoundTrip = Long.MAX_VALUE;
196         public long _maxRoundTrip = 0;
197         public long _avgRoundTrip = 0;
198         public long _totalRoundTrip;
199 
200         public long _minApp = Long.MAX_VALUE;
201         public long _maxApp = 0;
202         public long _avgApp = 0;
203         public long _totalApp = 0;
204 
205         public long _minInfra = Long.MAX_VALUE;
206         public long _maxInfra = 0;
207         public long _avgInfra = 0;
208         public long _totalInfra = 0;
209 
210         public long _minCometd = Long.MAX_VALUE;
211         public long _maxCometd = 0;
212         public long _avgCometd = 0;
213         public long _totalCometd = 0;
214 
215         public Statistic (Client client,  long timeout)
216         {
217             _id = COUNTER.incrementAndGet();
218             _client = client;
219             _timeout = timeout;
220             _statistics.put(String.valueOf(_id), this); //add it to the statistics
221             _timer.schedule(this, timeout);
222         }
223 
224 
225         public void setProbe (Message probe)
226         {
227             _probeMessage = (Message)probe.clone();
228         }
229 
230         public void sample (long reqTime, long sentTime, long rcvTime, long lag)
231         {
232             // Synchronize sampling with TimerTask execution
233             synchronized (this)
234             {
235                 _samples++;
236 
237                 long roundTrip = rcvTime - reqTime;
238                 if (roundTrip < _minRoundTrip)
239                     _minRoundTrip = roundTrip;
240                 if (roundTrip > _maxRoundTrip)
241                     _maxRoundTrip = roundTrip;
242                 _totalRoundTrip+=roundTrip;
243                 _avgRoundTrip = _totalRoundTrip/_samples;
244 
245                 long appTime = sentTime - reqTime;
246                 if (appTime < _minApp)
247                     _minApp = appTime;
248                 if (appTime > _maxApp)
249                     _maxApp = appTime;
250                 _totalApp += appTime;
251                 _avgApp = _totalApp/_samples;
252 
253                 long infraTime = rcvTime - sentTime;
254                 if (infraTime < _minInfra)
255                     _minInfra = infraTime;
256                 if (infraTime > _maxInfra)
257                     _maxInfra = infraTime;
258                 _totalInfra += infraTime;
259                 _avgInfra = _totalInfra/_samples;
260 
261                 long cometdTime = infraTime - (2*lag);
262                 if (cometdTime < _minCometd)
263                     _minCometd = cometdTime;
264                 if (cometdTime > _maxCometd)
265                     _maxCometd = cometdTime;
266                 _totalCometd += cometdTime;
267                 _avgCometd = _totalCometd/_samples;
268             }
269         }
270 
271         public String toString()
272         {
273             return "Id "+_id+": samples="+_samples+
274             ", avgR="+_avgRoundTrip+", minR="+_minRoundTrip+", maxR="+_maxRoundTrip+
275             ", avgI="+_avgInfra+", minI="+_minInfra+", maxI="+_maxInfra+
276             ", avgA="+_avgApp+", minA="+_minApp+", maxA="+_maxApp+
277             ", avgC="+_avgCometd+", minC="+_minCometd+", maxC="+_maxCometd;
278         }
279 
280         /**
281          * Timer expiry: send off the samples we have gathered for this instance
282          * @see java.util.TimerTask#run()
283          */
284         public void run()
285         {
286             // No exception must escape, otherwise the Timer dies
287             try
288             {
289                 // Synchronize sampling with TimerTask execution
290                 synchronized (this)
291                 {
292                     //remove myself from the list
293                     _statistics.remove(String.valueOf(_id));
294 
295                     //send the originator of the sample request the result
296                     notifyEnd(_client, this);
297                 }
298             }
299             catch (Exception x)
300             {
301                 Log.warn("Unexpected exception", x);
302             }
303         }
304 
305 
306         /**
307          * Send a message to the originator of the statistics request to advise
308          * that it is underway.
309          *
310          * The message will be sent on the same channel as the probe message.
311          */
312         public void notifyStart ()
313         {
314             HashMap<String,Object> msg = new HashMap<String,Object>();
315             msg.putAll((Map<String,Object>)_probeMessage.get(Bayeux.DATA_FIELD));
316 
317             Map<String,Object> map = matchKeys(msg,  _statsRequestKeys);
318             map.put(_statsRequestKeys[_statsRequestKeys.length-1], _statsConfirmSentinel+getStartText());
319             _client.deliver(_client, _probeMessage.getChannel(), msg, null); //tell client statistic has started
320         }
321 
322         public String getStartText ()
323         {
324            return " Statistic id="+_id+" started. Results due in "+(_timeout/1000)+"s.";
325         }
326 
327 
328         /**
329          * Send a message to the originator of the statistics request to advise of the
330          * results.
331          *
332          * The message will include all data from the original message from the originator,
333          * just with the _markerKey field modified.
334          */
335         public void notifyEnd (Client client, Statistic stat)
336         {
337            HashMap<String,Object> msg = new HashMap<String,Object>();
338            msg.putAll((Map<String,Object>)_probeMessage.get(Bayeux.DATA_FIELD));
339 
340             Map<String,Object> map = matchKeys(msg,  _statsRequestKeys);
341             msg.put(_statsRequestKeys[_statsRequestKeys.length-1], _statsResultSentinel+" "+getEndText());
342             _client.deliver(_client, _probeMessage.getChannel(), msg, null);
343         }
344 
345         public String getEndText()
346         {
347             return this.toString();
348         }
349     }
350 
351 
352     public StatisticsExtension ()
353     {
354         _timer = new Timer(true);
355     }
356 
357     public void setStatsRequestKeys (String[] keys)
358     {
359         _statsRequestKeys = keys;
360     }
361 
362     public String[] getStatsRequestKeys ()
363     {
364         return _statsRequestKeys;
365     }
366 
367     public void setStatsRequestSentinel (String val)
368     {
369         _statsRequestSentinel = val;
370     }
371 
372     public String getStatsRequestSentinel ()
373     {
374         return _statsRequestSentinel;
375     }
376 
377     public void setStatsRequestChannel(String channel)
378     {
379         _statsRequestChannel = channel;
380     }
381 
382     public void setStatsConfirmSentinel (String val)
383     {
384         _statsConfirmSentinel = val;
385     }
386 
387 
388     public String getStatsConfirmSentinel ()
389     {
390         return _statsConfirmSentinel;
391     }
392 
393 
394     public void setStatsResultSentinel (String val)
395     {
396         _statsResultSentinel = val;
397     }
398 
399     public String getStatsResultSentinel ()
400     {
401         return _statsResultSentinel;
402     }
403 
404 
405     public void setProbeChannel(String channel)
406     {
407         _probeChannel = channel;
408     }
409 
410 
411     public void setProbeSentinel (String val)
412     {
413         _probeSentinel = val;
414     }
415 
416     public String getProbeSentinel ()
417     {
418         return _probeSentinel;
419     }
420 
421 
422     public void setProbeKeys (String[] keys)
423     {
424         _probeKeys = keys;
425     }
426 
427     public String[] getProbeKeys ()
428     {
429         return _probeKeys;
430     }
431 
432     public void setProbeReplyKeys (String[] keys)
433     {
434         _probeReplyKeys = keys;
435     }
436 
437     public String[] getProbeReplyKeys ()
438     {
439         return _probeReplyKeys;
440     }
441 
442     public void setProbeReplySentinel (String val)
443     {
444         _probeReplySentinel = val;
445     }
446 
447     public String getProbeReplySentinel ()
448     {
449         return _probeReplySentinel;
450     }
451 
452     public void setProbeReplyChannel(String channel)
453     {
454         _probeReplyChannel = channel;
455     }
456 
457 
458     public void setTimeout (long timeout)
459     {
460         _timeout = timeout;
461     }
462 
463     public long getTimeout()
464     {
465         return _timeout;
466     }
467 
468     public void setDelim (String delim)
469     {
470         _delim = delim;
471     }
472 
473     public String getDelim ()
474     {
475         return _delim;
476     }
477 
478 
479     /**
480      * Called whenever a message is received from a client.
481      *
482      * A client will initiate a stats collection run by sending a message containing the
483      * _statsRequestKeys structure with a _statsRequestSentinel as the value of the final key in the structure.
484      * This message is then passed on to the application, who should pass it back unmodified so that the
485      * send () method on this Extension can be called, where we can take timestamps and start measuring the stats.
486      *
487      * Filter the inbound messages to determine if we have received a reply to a stats probe.
488      * @see org.cometd.Extension#rcv(org.cometd.Client, org.cometd.Message)
489      */
490     public Message rcv(Client from, Message message)
491     {
492         //Check to see if we have received a reply to a statistics probe from a client
493         Map<String,Object> map = matchMessage(message, _probeReplyChannel, _probeReplyKeys);
494         if (map != null)
495         {
496             String marker = matchSentinel(map, _probeReplyKeys[_probeReplyKeys.length-1], _probeReplySentinel);
497             if (marker != null)
498             {
499                 //This is a round trip reply from a client, so update the stats with this result
500                 updateStatistic(from,message,marker);
501                 //But don't propagate it to the application
502                 return null;
503             }
504         }
505 
506         //Check to see if we have received a request to start gathering statistics
507         map = matchMessage(message, _statsRequestChannel, _statsRequestKeys);
508         if (map != null)
509         {
510             String match = matchSentinel(map, _statsRequestKeys[_statsRequestKeys.length-1], _statsRequestSentinel);
511             if (match != null && !match.startsWith(_probeReplySentinel))
512             {
513                 //This is a request to start a stats collection
514                 Statistic stat = createStatistic(from,match);
515 
516                 //Modify the message to include the id of the stat and the time at which the server received the stats start request
517                 match = match+_delim+stat._id+_delim+System.currentTimeMillis();
518                 map.put(_statsRequestKeys[_statsRequestKeys.length-1], match);
519                 onStatisticsRequest(message);
520             }
521         }
522 
523         return message;
524     }
525 
526     /**
527      * Override to be able to modify the message that has been identified as a statistics request
528      * @param message the bayeux message identified as statistics request
529      */
530     protected void onStatisticsRequest(Message message)
531     {
532     }
533 
534     public Message rcvMeta(Client from, Message message)
535     {
536        return message;
537     }
538 
539     /**
540      * Called before an outbound message is sent. Note this is called once per message only, not once per client.
541      *
542      * A request to start statistics collection should be passed on unmodified from the application.
543      * We will intercept this on the way to clients and start a stats run. We modify the outbound message to
544      * be a stats-probe message.
545      *
546      * If your application is likey to modify the message structure significantly from the _statsRequestKeys
547      * structure, then configure the _probeKeys so we know how to filter the outbound message.
548      *
549      * @see org.cometd.Extension#send(org.cometd.Client, org.cometd.Message)
550      */
551     public Message send(Client from, Message message)
552     {
553         //Check the outgoing message to see if it is an outgoing probe. If it is, we start the stats collection.
554         //An outgoing probe should look exactly the same as the triggering inbound _statsRequestKeys, but if the
555         //application changes the message structure, use the _probeKeys to describe the structure.
556         String[] keys = (_probeKeys==null?_statsRequestKeys:_probeKeys);
557         Map<String,Object> map = matchMessage(message, _probeChannel, keys);
558         if (map != null)
559         {
560             //Potential outbound probe message, which will be the stats request message. We need to tweak it to make it into a probe
561             String match = matchSentinel(map, keys[keys.length-1], _statsRequestSentinel);
562 
563             //Ignore any outbound messages that are confirmations or results of stats runs
564             if (match != null && !match.startsWith(_statsConfirmSentinel) && !match.startsWith(_statsResultSentinel))
565             {
566                 //Tell the stats requestor that the stats run has started
567                 String[] parts = match.split(_delim);
568                 if (parts != null && parts.length > 1)
569                 {
570                     Statistic stat = _statistics.get(parts[1].trim());
571                     if (stat != null)
572                     {
573                         stat.setProbe(message);
574                         stat.notifyStart();
575 
576                         //Change the sentinel on the outbound message to be the stats-probe and
577                         //put the current server timestamp that we were ready to send the messages
578                         //so we can retrieve it on the replies
579                         match = match.substring(_statsRequestSentinel.length());
580                         map.put(keys[keys.length-1], _probeSentinel+match+_delim+System.currentTimeMillis());
581                     }
582                 }
583             }
584         }
585 
586         //pass on the message
587         return message;
588     }
589 
590     public Message sendMeta(Client from, Message message)
591     {
592         return message;
593     }
594 
595 
596     public Map<String,Object> matchMessage (Message message, String channel, String[] keys)
597     {
598         //No special key structure in message to look for, ignore it
599         if (keys == null || keys.length == 0)
600             return null;
601 
602         //is there a special channel to look for?
603         if (channel != null && !message.getChannel().equals(channel))
604             return null; //not the channel we're monitoring
605 
606         //Look for the key structure. The last key will be the leaf that holds the text value to check, but
607         //we want to get a reference to the map that holds that key:value pair
608         Object node = message.get(Bayeux.DATA_FIELD);
609         if (node == null)
610             return null;
611 
612         if (!Map.class.isAssignableFrom(node.getClass()))
613             return null;
614 
615         return matchKeys((Map<String,Object>)node, keys);
616     }
617 
618     public Map<String,Object> matchKeys(Map<String,Object> map, String[] keys)
619     {
620 
621         Object node = map;
622 
623         for (int i=0; i<keys.length-1; i++)
624         {
625             if (node == null)
626                 break;
627 
628             if (!Map.class.isAssignableFrom(node.getClass()))
629             {
630                 node = null;
631                 break;
632             }
633             node = ((Map<String,Object>)node).get(keys[i]);
634         }
635 
636         if (node == null)
637             return null;
638 
639         //Node we're looking at now should contain the final key:value pair
640         if (!Map.class.isAssignableFrom(node.getClass()))
641             return null;
642 
643         return (Map<String, Object>)node;
644     }
645 
646     public String matchSentinel (Map<String,Object> map, String key, String sentinel)
647     {
648 
649         if (map == null || key == null || sentinel == null)
650             return null;
651 
652         Object value = map.get(key);
653 
654         if (value == null)
655             return null;
656 
657         if (!String.class.isAssignableFrom(value.getClass()))
658             return null;
659 
660         String text = (String)value;
661 
662         if (!text.startsWith(sentinel))
663             return null;
664 
665         return text;
666     }
667 
668     /**
669      * Start a statistics collection run.
670      *
671      * @param from
672      * @param message
673      * @param marker
674      */
675     protected Statistic createStatistic(Client from,  String marker)
676     {
677         long timeout = _timeout;
678         String tmp = (marker==null?"":marker);
679         int idx = tmp.indexOf(",");
680         if (idx > 0)
681         {
682             tmp = tmp.substring(idx+1);
683             try
684             {
685                 timeout = Long.parseLong(tmp.trim())*1000;
686             }
687             catch (NumberFormatException e)
688             {
689                 Log.ignore(e);
690             }
691         }
692 
693         return newStatistic(from, timeout);
694     }
695 
696     /**
697      * Update a statistics collection with the result for the client.
698      * @param from
699      * @param message
700      * @param marker
701      */
702     protected void updateStatistic (Client from, Message message, String marker)
703     {
704         String[] tokens = marker.split(_delim);
705         if (tokens == null || tokens.length < 4)
706             return; //nothing to update!
707 
708 
709         //Message is: _probeReplySentinelt+_delim+id+_delim+rcvTimestamp+_delim+sendTimestamp
710         //retrieve the id of the statistic this message is associated with from the message
711         Statistic stat = _statistics.get(tokens[1].trim());
712         if (stat == null)
713             return;
714 
715         long reqTimestamp = Long.valueOf(tokens[2].trim()); //time the server received the start stats request
716         long sentTimestamp = Long.valueOf(tokens[3].trim());//time the server send the stats probe to all clients
717 
718         //update the statistic with this client's result
719         int lag = from instanceof ClientImpl ? ((ClientImpl)from).getLag() : 0;
720         stat.sample(reqTimestamp, sentTimestamp, System.currentTimeMillis(), lag);
721     }
722 
723     protected Statistic newStatistic (Client from, long timeout)
724     {
725         return new Statistic(from, timeout);
726     }
727 }