1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.ext;
16
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.Timer;
20 import java.util.TimerTask;
21 import java.util.concurrent.atomic.AtomicLong;
22
23 import org.cometd.Bayeux;
24 import org.cometd.Client;
25 import org.cometd.Extension;
26 import org.cometd.Message;
27 import org.mortbay.cometd.ClientImpl;
28 import org.mortbay.log.Log;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 public class StatisticsExtension implements Extension
110 {
111 protected static AtomicLong COUNTER = new AtomicLong();
112
113
114
115
116
117 protected String _statsRequestChannel = null;
118
119
120
121
122 protected String[] _statsRequestKeys = new String[]{"chat"};
123
124
125
126
127 protected String _statsRequestSentinel = "//stats";
128
129
130
131
132
133 protected String _statsConfirmSentinel = "//stats-confirm";
134
135
136
137
138
139 protected String _statsResultSentinel = "//stats-results";
140
141
142
143
144 protected String _probeChannel = null;
145
146
147
148
149 protected String _probeSentinel = "//stats-probe";
150
151
152
153
154 protected String[] _probeKeys = null;
155
156
157
158
159
160
161
162 protected String _probeReplyChannel = null;
163
164
165
166
167 protected String[] _probeReplyKeys = null;
168
169
170
171
172 protected String _probeReplySentinel = "//stats-reply";
173
174
175
176 protected String _delim = "::--";
177 protected Map<String,Statistic> _statistics = new HashMap<String, Statistic>();
178 protected Timer _timer;
179 protected long _timeout = 10000;
180
181
182
183
184
185
186
187 public class Statistic extends TimerTask
188 {
189 public long _id;
190 public Client _client;
191 public Message _probeMessage;
192 public long _timeout = 0;
193 public long _samples = 0;
194
195 public long _minRoundTrip = Long.MAX_VALUE;
196 public long _maxRoundTrip = 0;
197 public long _avgRoundTrip = 0;
198 public long _totalRoundTrip;
199
200 public long _minApp = Long.MAX_VALUE;
201 public long _maxApp = 0;
202 public long _avgApp = 0;
203 public long _totalApp = 0;
204
205 public long _minInfra = Long.MAX_VALUE;
206 public long _maxInfra = 0;
207 public long _avgInfra = 0;
208 public long _totalInfra = 0;
209
210 public long _minCometd = Long.MAX_VALUE;
211 public long _maxCometd = 0;
212 public long _avgCometd = 0;
213 public long _totalCometd = 0;
214
215 public Statistic (Client client, long timeout)
216 {
217 _id = COUNTER.incrementAndGet();
218 _client = client;
219 _timeout = timeout;
220 _statistics.put(String.valueOf(_id), this);
221 _timer.schedule(this, timeout);
222 }
223
224
225 public void setProbe (Message probe)
226 {
227 _probeMessage = (Message)probe.clone();
228 }
229
230 public void sample (long reqTime, long sentTime, long rcvTime, long lag)
231 {
232
233 synchronized (this)
234 {
235 _samples++;
236
237 long roundTrip = rcvTime - reqTime;
238 if (roundTrip < _minRoundTrip)
239 _minRoundTrip = roundTrip;
240 if (roundTrip > _maxRoundTrip)
241 _maxRoundTrip = roundTrip;
242 _totalRoundTrip+=roundTrip;
243 _avgRoundTrip = _totalRoundTrip/_samples;
244
245 long appTime = sentTime - reqTime;
246 if (appTime < _minApp)
247 _minApp = appTime;
248 if (appTime > _maxApp)
249 _maxApp = appTime;
250 _totalApp += appTime;
251 _avgApp = _totalApp/_samples;
252
253 long infraTime = rcvTime - sentTime;
254 if (infraTime < _minInfra)
255 _minInfra = infraTime;
256 if (infraTime > _maxInfra)
257 _maxInfra = infraTime;
258 _totalInfra += infraTime;
259 _avgInfra = _totalInfra/_samples;
260
261 long cometdTime = infraTime - (2*lag);
262 if (cometdTime < _minCometd)
263 _minCometd = cometdTime;
264 if (cometdTime > _maxCometd)
265 _maxCometd = cometdTime;
266 _totalCometd += cometdTime;
267 _avgCometd = _totalCometd/_samples;
268 }
269 }
270
271 public String toString()
272 {
273 return "Id "+_id+": samples="+_samples+
274 ", avgR="+_avgRoundTrip+", minR="+_minRoundTrip+", maxR="+_maxRoundTrip+
275 ", avgI="+_avgInfra+", minI="+_minInfra+", maxI="+_maxInfra+
276 ", avgA="+_avgApp+", minA="+_minApp+", maxA="+_maxApp+
277 ", avgC="+_avgCometd+", minC="+_minCometd+", maxC="+_maxCometd;
278 }
279
280
281
282
283
284 public void run()
285 {
286
287 try
288 {
289
290 synchronized (this)
291 {
292
293 _statistics.remove(String.valueOf(_id));
294
295
296 notifyEnd(_client, this);
297 }
298 }
299 catch (Exception x)
300 {
301 Log.warn("Unexpected exception", x);
302 }
303 }
304
305
306
307
308
309
310
311
312 public void notifyStart ()
313 {
314 HashMap<String,Object> msg = new HashMap<String,Object>();
315 msg.putAll((Map<String,Object>)_probeMessage.get(Bayeux.DATA_FIELD));
316
317 Map<String,Object> map = matchKeys(msg, _statsRequestKeys);
318 map.put(_statsRequestKeys[_statsRequestKeys.length-1], _statsConfirmSentinel+getStartText());
319 _client.deliver(_client, _probeMessage.getChannel(), msg, null);
320 }
321
322 public String getStartText ()
323 {
324 return " Statistic id="+_id+" started. Results due in "+(_timeout/1000)+"s.";
325 }
326
327
328
329
330
331
332
333
334
335 public void notifyEnd (Client client, Statistic stat)
336 {
337 HashMap<String,Object> msg = new HashMap<String,Object>();
338 msg.putAll((Map<String,Object>)_probeMessage.get(Bayeux.DATA_FIELD));
339
340 Map<String,Object> map = matchKeys(msg, _statsRequestKeys);
341 msg.put(_statsRequestKeys[_statsRequestKeys.length-1], _statsResultSentinel+" "+getEndText());
342 _client.deliver(_client, _probeMessage.getChannel(), msg, null);
343 }
344
345 public String getEndText()
346 {
347 return this.toString();
348 }
349 }
350
351
352 public StatisticsExtension ()
353 {
354 _timer = new Timer(true);
355 }
356
357 public void setStatsRequestKeys (String[] keys)
358 {
359 _statsRequestKeys = keys;
360 }
361
362 public String[] getStatsRequestKeys ()
363 {
364 return _statsRequestKeys;
365 }
366
367 public void setStatsRequestSentinel (String val)
368 {
369 _statsRequestSentinel = val;
370 }
371
372 public String getStatsRequestSentinel ()
373 {
374 return _statsRequestSentinel;
375 }
376
377 public void setStatsRequestChannel(String channel)
378 {
379 _statsRequestChannel = channel;
380 }
381
382 public void setStatsConfirmSentinel (String val)
383 {
384 _statsConfirmSentinel = val;
385 }
386
387
388 public String getStatsConfirmSentinel ()
389 {
390 return _statsConfirmSentinel;
391 }
392
393
394 public void setStatsResultSentinel (String val)
395 {
396 _statsResultSentinel = val;
397 }
398
399 public String getStatsResultSentinel ()
400 {
401 return _statsResultSentinel;
402 }
403
404
405 public void setProbeChannel(String channel)
406 {
407 _probeChannel = channel;
408 }
409
410
411 public void setProbeSentinel (String val)
412 {
413 _probeSentinel = val;
414 }
415
416 public String getProbeSentinel ()
417 {
418 return _probeSentinel;
419 }
420
421
422 public void setProbeKeys (String[] keys)
423 {
424 _probeKeys = keys;
425 }
426
427 public String[] getProbeKeys ()
428 {
429 return _probeKeys;
430 }
431
432 public void setProbeReplyKeys (String[] keys)
433 {
434 _probeReplyKeys = keys;
435 }
436
437 public String[] getProbeReplyKeys ()
438 {
439 return _probeReplyKeys;
440 }
441
442 public void setProbeReplySentinel (String val)
443 {
444 _probeReplySentinel = val;
445 }
446
447 public String getProbeReplySentinel ()
448 {
449 return _probeReplySentinel;
450 }
451
452 public void setProbeReplyChannel(String channel)
453 {
454 _probeReplyChannel = channel;
455 }
456
457
458 public void setTimeout (long timeout)
459 {
460 _timeout = timeout;
461 }
462
463 public long getTimeout()
464 {
465 return _timeout;
466 }
467
468 public void setDelim (String delim)
469 {
470 _delim = delim;
471 }
472
473 public String getDelim ()
474 {
475 return _delim;
476 }
477
478
479
480
481
482
483
484
485
486
487
488
489
490 public Message rcv(Client from, Message message)
491 {
492
493 Map<String,Object> map = matchMessage(message, _probeReplyChannel, _probeReplyKeys);
494 if (map != null)
495 {
496 String marker = matchSentinel(map, _probeReplyKeys[_probeReplyKeys.length-1], _probeReplySentinel);
497 if (marker != null)
498 {
499
500 updateStatistic(from,message,marker);
501
502 return null;
503 }
504 }
505
506
507 map = matchMessage(message, _statsRequestChannel, _statsRequestKeys);
508 if (map != null)
509 {
510 String match = matchSentinel(map, _statsRequestKeys[_statsRequestKeys.length-1], _statsRequestSentinel);
511 if (match != null && !match.startsWith(_probeReplySentinel))
512 {
513
514 Statistic stat = createStatistic(from,match);
515
516
517 match = match+_delim+stat._id+_delim+System.currentTimeMillis();
518 map.put(_statsRequestKeys[_statsRequestKeys.length-1], match);
519 onStatisticsRequest(message);
520 }
521 }
522
523 return message;
524 }
525
526
527
528
529
530 protected void onStatisticsRequest(Message message)
531 {
532 }
533
534 public Message rcvMeta(Client from, Message message)
535 {
536 return message;
537 }
538
539
540
541
542
543
544
545
546
547
548
549
550
551 public Message send(Client from, Message message)
552 {
553
554
555
556 String[] keys = (_probeKeys==null?_statsRequestKeys:_probeKeys);
557 Map<String,Object> map = matchMessage(message, _probeChannel, keys);
558 if (map != null)
559 {
560
561 String match = matchSentinel(map, keys[keys.length-1], _statsRequestSentinel);
562
563
564 if (match != null && !match.startsWith(_statsConfirmSentinel) && !match.startsWith(_statsResultSentinel))
565 {
566
567 String[] parts = match.split(_delim);
568 if (parts != null && parts.length > 1)
569 {
570 Statistic stat = _statistics.get(parts[1].trim());
571 if (stat != null)
572 {
573 stat.setProbe(message);
574 stat.notifyStart();
575
576
577
578
579 match = match.substring(_statsRequestSentinel.length());
580 map.put(keys[keys.length-1], _probeSentinel+match+_delim+System.currentTimeMillis());
581 }
582 }
583 }
584 }
585
586
587 return message;
588 }
589
590 public Message sendMeta(Client from, Message message)
591 {
592 return message;
593 }
594
595
596 public Map<String,Object> matchMessage (Message message, String channel, String[] keys)
597 {
598
599 if (keys == null || keys.length == 0)
600 return null;
601
602
603 if (channel != null && !message.getChannel().equals(channel))
604 return null;
605
606
607
608 Object node = message.get(Bayeux.DATA_FIELD);
609 if (node == null)
610 return null;
611
612 if (!Map.class.isAssignableFrom(node.getClass()))
613 return null;
614
615 return matchKeys((Map<String,Object>)node, keys);
616 }
617
618 public Map<String,Object> matchKeys(Map<String,Object> map, String[] keys)
619 {
620
621 Object node = map;
622
623 for (int i=0; i<keys.length-1; i++)
624 {
625 if (node == null)
626 break;
627
628 if (!Map.class.isAssignableFrom(node.getClass()))
629 {
630 node = null;
631 break;
632 }
633 node = ((Map<String,Object>)node).get(keys[i]);
634 }
635
636 if (node == null)
637 return null;
638
639
640 if (!Map.class.isAssignableFrom(node.getClass()))
641 return null;
642
643 return (Map<String, Object>)node;
644 }
645
646 public String matchSentinel (Map<String,Object> map, String key, String sentinel)
647 {
648
649 if (map == null || key == null || sentinel == null)
650 return null;
651
652 Object value = map.get(key);
653
654 if (value == null)
655 return null;
656
657 if (!String.class.isAssignableFrom(value.getClass()))
658 return null;
659
660 String text = (String)value;
661
662 if (!text.startsWith(sentinel))
663 return null;
664
665 return text;
666 }
667
668
669
670
671
672
673
674
675 protected Statistic createStatistic(Client from, String marker)
676 {
677 long timeout = _timeout;
678 String tmp = (marker==null?"":marker);
679 int idx = tmp.indexOf(",");
680 if (idx > 0)
681 {
682 tmp = tmp.substring(idx+1);
683 try
684 {
685 timeout = Long.parseLong(tmp.trim())*1000;
686 }
687 catch (NumberFormatException e)
688 {
689 Log.ignore(e);
690 }
691 }
692
693 return newStatistic(from, timeout);
694 }
695
696
697
698
699
700
701
702 protected void updateStatistic (Client from, Message message, String marker)
703 {
704 String[] tokens = marker.split(_delim);
705 if (tokens == null || tokens.length < 4)
706 return;
707
708
709
710
711 Statistic stat = _statistics.get(tokens[1].trim());
712 if (stat == null)
713 return;
714
715 long reqTimestamp = Long.valueOf(tokens[2].trim());
716 long sentTimestamp = Long.valueOf(tokens[3].trim());
717
718
719 int lag = from instanceof ClientImpl ? ((ClientImpl)from).getLag() : 0;
720 stat.sample(reqTimestamp, sentTimestamp, System.currentTimeMillis(), lag);
721 }
722
723 protected Statistic newStatistic (Client from, long timeout)
724 {
725 return new Statistic(from, timeout);
726 }
727 }