1 """
2 This file contains Process, SimEvent, the resources Resource, Level and Store
3 as well as their dependencies Buffer, Queue, FIFO and PriorityQ.
4 """
5
6
7 import inspect
8 import new
9 import sys
10 import types
11
12 from SimPy.Lister import Lister
13 from SimPy.Recording import Monitor, Tally
14
15
16 import SimPy.Globals as Globals
17
18
20 """Superclass of classes which may use generator functions"""
21 - def __init__(self, name = 'a_process', sim = None):
22 if not sim: sim = Globals.sim
23 self.sim = sim
24
25 self._nextpoint = None
26 self.name = name
27 self._nextTime = None
28 self._remainService = 0
29 self._preempted = 0
30 self._priority={}
31 self._getpriority={}
32 self._putpriority={}
33 self._terminated = False
34 self._inInterrupt = False
35 self.eventsFired = []
36 if hasattr(sim, 'trace'):
37 self._doTracing = True
38 else:
39 self._doTracing = False
40
42 return self._nextTime <> None and not self._inInterrupt
43
45 return self._nextTime is None and not self._terminated
46
48 return self._terminated
49
51 return self._inInterrupt and not self._terminated
52
54 return self in resource.waitQ
55
57 """Application function to cancel all event notices for this Process
58 instance;(should be all event notices for the _generator_)."""
59 self.sim._e._unpost(whom = victim)
60
def start(self, pem=None, at='undefined', delay='undefined', prior=False):
    """Activate the PEM (generator) of this Process.

    p.start(p.pemname([args])[,{at = t | delay = period}][, prior = False]) or
    p.start([p.ACTIONS()][,{at = t | delay = period}][, prior = False])
    (the ACTIONS parameter is optional)

    pem   -- generator object to run; when None, self.ACTIONS() is called
             to obtain it.
    at    -- absolute activation time; only used when no delay is given.
    delay -- activation delay relative to the current simulation time.
    prior -- handed on unchanged to the tracer and to the event list.

    The activation time is never earlier than the current simulation time.
    Nothing happens when the process has terminated or already has an
    event notice scheduled.

    Raises FatalSimerror when there is no generator to activate, when the
    simulation has no event list yet, or when pem is not a generator.
    """
    if pem is None:
        try:
            pem = self.ACTIONS()
        except AttributeError:
            raise FatalSimerror(
                'Fatal SimPy error: no generator function to activate')
    if self.sim._e is None:
        raise FatalSimerror(
            'Fatal SimPy error: simulation is not initialized'
            '(call initialize() first)')
    if not isinstance(pem, types.GeneratorType):
        raise FatalSimerror('Fatal SimPy error: activating function which' +
                            ' is not a generator (contains no \'yield\')')

    # "Scheduled" means _nextTime is not None. A plain truth test would
    # treat a notice scheduled for time 0 as absent and post it twice.
    if self._terminated or self._nextTime is not None:
        return

    self._nextpoint = pem
    now = self.sim._t
    if delay == 'undefined':
        if at == 'undefined':
            at = now
        zeit = max(now, at)
    else:
        zeit = max(now, now + delay)
    if self._doTracing:
        self.sim.trace.recordActivate(who=self, when=zeit, prior=prior)
    self.sim._e._post(what=self, at=zeit, prior=prior)
95
97 if len(a[0]) == 3:
98 delay = abs(a[0][2])
99 else:
100 delay = 0
101 who = a[1]
102 self.interruptLeft = delay
103 self._inInterrupt = False
104 self.interruptCause = None
105 self.sim._e._post(what = who, at = self.sim._t + delay)
106
108 a[0][1]._nextTime = None
109
111 """Application function to interrupt active processes"""
112
113 if victim.active():
114 if self._doTracing:
115 save = self.sim.trace._comment
116 self.sim.trace._comment = None
117 victim.interruptCause = self
118 left = victim._nextTime - self.sim._t
119 victim.interruptLeft = left
120 victim._inInterrupt = True
121 self.sim.reactivate(victim)
122 if self._doTracing:
123 self.sim.trace._comment = save
124 self.sim.trace.recordInterrupt(self, victim)
125 return left
126 else:
127 return None
128
130 """
131 Application function for an interrupt victim to get out of
132 'interrupted' state.
133 """
134 self._inInterrupt = False
135
137 """Multi - functional test for reneging for 'request' and 'get':
138 (1)If res of type Resource:
139 Tests whether resource res was acquired when the process was reactivated.
140 If yes, the parallel wakeup process is killed.
141 If not, process is removed from res.waitQ (reneging).
142 (2)If res of type Store:
143 Tests whether item(s) gotten from Store res.
144 If yes, the parallel wakeup process is killed.
145 If no, process is removed from res.getQ
146 (3)If res of type Level:
147 Tests whether units gotten from Level res.
148 If yes, the parallel wakeup process is killed.
149 If no, process is removed from res.getQ.
150 """
151 if isinstance(res, Resource):
152 test = self in res.activeQ
153 if test:
154 self.cancel(self._holder)
155 else:
156 res.waitQ.remove(self)
157 if res.monitored:
158 res.waitMon.observe(len(res.waitQ),t = self.sim.now())
159 return test
160 elif isinstance(res, Store):
161 test = len(self.got)
162 if test:
163 self.cancel(self._holder)
164 else:
165 res.getQ.remove(self)
166 if res.monitored:
167 res.getQMon.observe(len(res.getQ),t = self.sim.now())
168 return test
169 elif isinstance(res, Level):
170 test = not (self.got is None)
171 if test:
172 self.cancel(self._holder)
173 else:
174 res.getQ.remove(self)
175 if res.monitored:
176 res.getQMon.observe(len(res.getQ),t = self.sim.now())
177 return test
178
180 """Test for reneging for 'yield put . . .' compound statement (Level and
181 Store). Returns True if not reneged.
182 If self not in buffer.putQ, kill wakeup process, else take self out of
183 buffer.putQ (reneged)"""
184 test = self in buffer.putQ
185 if test:
186 buffer.putQ.remove(self)
187 if buffer.monitored:
188 buffer.putQMon.observe(len(buffer.putQ),t = self.sim.now())
189 else:
190 self.cancel(self._holder)
191 return not test
192
193
195 """Supports one - shot signalling between processes. All processes waiting for an event to occur
196 get activated when its occurrence is signalled. From the processes queuing for an event, only
197 the first gets activated.
198 """
199 - def __init__(self, name = 'a_SimEvent', sim = None):
200 if not sim: sim = Globals.sim
201 self.sim = sim
202 self.name = name
203 self.waits = []
204 self.queues = []
205 self.occurred = False
206 self.signalparam = None
207 if hasattr(sim, 'trace'):
208 self._doTracing = True
209 else:
210 self._doTracing = False
211
def signal(self, param=None):
    """Fire this event (make it occur).

    param is stored in self.signalparam.

    With nobody waiting or queuing, the event is merely marked as
    occurred. Otherwise:
      * ALL waiting processes are reactivated (prior = True); each is also
        taken off the waits list of every other event of its OR-group, and
        any such event that has already occurred is added to its
        eventsFired;
      * only the FIRST queuing process is reactivated.
    """
    self.signalparam = param
    if self._doTracing:
        self.sim.trace.recordSignal(self)

    if not (self.waits or self.queues):
        self.occurred = True
        return

    for entry in self.waits:
        waiter = entry[0]
        waiter.eventsFired.append(self)
        self.sim.reactivate(waiter, prior=True)
        # Clean up the waiter's entries in the other events of its group.
        for other in entry[1]:
            if other != self:
                if other.occurred:
                    waiter.eventsFired.append(other)
                for pending in other.waits:
                    if pending[0] == waiter:
                        other.waits.remove(pending)
                        break
    self.waits = []

    if self.queues:
        first = self.queues.pop(0)[0]
        first.eventsFired.append(self)
        self.sim.reactivate(first)
244
246 """Consumes a signal if it has occurred, otherwise process 'proc'
247 waits for this event.
248 """
249 proc = par[0][1]
250 proc.eventsFired = []
251 if not self.occurred:
252 self.waits.append([proc, [self]])
253 proc._nextTime = None
254 else:
255 proc.eventsFired.append(self)
256 self.occurred = False
257 self.sim._e._post(proc, at = self.sim._t, prior = 1)
258
260 """Handles waiting for an OR of events in a tuple / list.
261 """
262 proc = par[0][1]
263 evlist = par[0][2]
264 proc.eventsFired = []
265 anyoccur = False
266 for ev in evlist:
267 if ev.occurred:
268 anyoccur = True
269 proc.eventsFired.append(ev)
270 ev.occurred = False
271 if anyoccur:
272 self.sim._e._post(proc, at = self.sim._t, prior = 1)
273
274 else:
275 proc.eventsFired = []
276 proc._nextTime = None
277 for ev in evlist:
278 ev.waits.append([proc, evlist])
279
281 """Consumes a signal if it has occurred, otherwise process 'proc'
282 queues for this event.
283 """
284 proc = par[0][1]
285 proc.eventsFired = []
286 if not self.occurred:
287 self.queues.append([proc, [self]])
288 proc._nextTime = None
289 else:
290 proc.eventsFired.append(self)
291 self.occurred = False
292 self.sim._e._post(proc, at = self.sim._t, prior = 1)
293
295 """Handles queueing for an OR of events in a tuple / list.
296 """
297 proc = par[0][1]
298 evlist = par[0][2]
299 proc.eventsFired = []
300 anyoccur = False
301 for ev in evlist:
302 if ev.occurred:
303 anyoccur = True
304 proc.eventsFired.append(ev)
305 ev.occurred = False
306 if anyoccur:
307 self.sim._e._post(proc, at = self.sim._t, prior = 1)
308
309 else:
310 proc.eventsFired = []
311 proc._nextTime = None
312 for ev in evlist:
313 ev.queues.append([proc, evlist])
314
315
318 if not moni is None:
319 self.monit = True
320 else:
321 self.monit = False
322 self.moni = moni
323 self.resource = res
324
327
330
332 self.remove(obj)
333 if self.monit:
334 self.moni.observe(len(self), t = self.moni.sim.now())
335
339
341 self.append(obj)
342 if self.monit:
343 self.moni.observe(len(self),t = self.moni.sim.now())
344
347
350
352 a = self.pop(0)
353 if self.monit:
354 self.moni.observe(len(self),t = self.moni.sim.now())
355 return a
356
358 """Queue is always ordered according to priority.
359 Higher value of priority attribute == higher priority.
360 """
363
365 """Handles request queue for Resource"""
366 if len(self):
367 ix = self.resource
368 if self[-1]._priority[ix] >= obj._priority[ix]:
369 self.append(obj)
370 else:
371 z = 0
372 while self[z]._priority[ix] >= obj._priority[ix]:
373 z += 1
374 self.insert(z, obj)
375 else:
376 self.append(obj)
377 if self.monit:
378 self.moni.observe(len(self),t = self.moni.sim.now())
379
381 """Handles getQ in Buffer"""
382 if len(self):
383 ix = self.resource
384
385 if self[-1]._getpriority[ix] >= obj._getpriority[ix]:
386 self.append(obj)
387 else:
388 z = 0
389 while self[z]._getpriority[ix] >= obj._getpriority[ix]:
390 z += 1
391 self.insert(z, obj)
392 else:
393 self.append(obj)
394 if self.monit:
395 self.moni.observe(len(self),t = self.moni.sim.now())
396
398 """Handles putQ in Buffer"""
399 if len(self):
400 ix = self.resource
401
402 if self[-1]._putpriority[ix] >= obj._putpriority[ix]:
403 self.append(obj)
404 else:
405 z = 0
406 while self[z]._putpriority[ix] >= obj._putpriority[ix]:
407 z += 1
408 self.insert(z, obj)
409 else:
410 self.append(obj)
411 if self.monit:
412 self.moni.observe(len(self),t = self.moni.sim.now())
413
415 """Models shared, limited capacity resources with queuing;
416 FIFO is default queuing discipline.
417 """
418
def __init__(self, capacity=1, name='a_resource', unitName='units',
             qType=FIFO, preemptable=0, monitored=False,
             monitorType=Monitor, sim=None):
    """Create a resource with 'capacity' free units and two queues.

    capacity    -- number of units; also the initial free count self.n.
    name        -- label stored on the instance and used in monitor names.
    unitName    -- stored on the instance.
    qType       -- class used for both waitQ and activeQ; it is called as
                   qType(resource, monitor_or_None).
    preemptable -- stored on the instance.
    monitored   -- when true, actMon and waitMon are created and handed
                   to the two queues.
    monitorType -- {Monitor(default) | Tally}
    sim         -- simulation object; Globals.sim when missing or false.
    """
    if not sim:
        sim = Globals.sim
    self.sim = sim
    self.name = name
    self.capacity = capacity
    self.unitName = unitName
    self.n = capacity
    self.monitored = monitored

    activeMonitor = None
    waitMonitor = None
    if self.monitored:
        activeMonitor = self.actMon = monitorType(
            name='Active Queue Monitor %s' % self.name,
            ylab='nr in queue', tlab='time', sim=self.sim)
        waitMonitor = self.waitMon = monitorType(
            name='Wait Queue Monitor %s' % self.name,
            ylab='nr in queue', tlab='time', sim=self.sim)

    self.waitQ = qType(self, waitMonitor)
    self.preemptable = preemptable
    self.activeQ = qType(self, activeMonitor)
    self.priority_default = 0
450
452 """Process request event for this resource"""
453 obj = arg[1]
454 if len(arg[0]) == 4:
455 obj._priority[self] = arg[0][3]
456 else:
457 obj._priority[self] = self.priority_default
458 if self.preemptable and self.n == 0:
459
460 preempt = obj._priority[self] > self.activeQ[-1]._priority[self]
461
462 if preempt:
463 z = self.activeQ[-1]
464
465 z._preempted += 1
466
467
468 if z._preempted == 1:
469 z._remainService = z._nextTime - self.sim._t
470
471 Process(sim=self.sim).cancel(z)
472
473 self.activeQ.remove(z)
474
475 self.waitQ.insert(0, z)
476
477 if self.monitored:
478 self.waitMon.observe(len(self.waitQ), self.sim.now())
479
480 z._nextTime = None
481
482 self.activeQ.enter(obj)
483
484 self.sim._e._post(obj, at = self.sim._t, prior = 1)
485 else:
486 self.waitQ.enter(obj)
487
488 obj._nextTime = None
489 else:
490 if self.n == 0:
491 self.waitQ.enter(obj)
492
493 obj._nextTime = None
494 else:
495 self.n -= 1
496 self.activeQ.enter(obj)
497 self.sim._e._post(obj, at = self.sim._t, prior = 1)
498
500 """Process release request for this resource"""
501 actor = arg[1]
502 self.n += 1
503 self.activeQ.remove(arg[1])
504 if self.monitored:
505 self.actMon.observe(len(self.activeQ),t = self.sim.now())
506
507 if self.waitQ:
508 obj = self.waitQ.leave()
509 self.n -= 1
510 self.activeQ.enter(obj)
511
512 if self.preemptable:
513
514 if obj._preempted:
515
516 obj._preempted -= 1
517
518
519 if obj._preempted == 0:
520 self.sim.reactivate(obj, delay = obj._remainService,
521 prior = 1)
522
523 else:
524 self.sim.reactivate(obj, delay = 0, prior = 1)
525
526 else:
527 self.sim.reactivate(obj, delay = 0, prior = 1)
528 self.sim._e._post(arg[1], at = self.sim._t, prior = 1)
529
531 """Abstract class for buffers
532 Blocks a process when a put would cause buffer overflow or a get would cause
533 buffer underflow.
534 Default queuing discipline for blocked processes is FIFO."""
535
536 priorityDefault = 0
def __init__(self, name=None, capacity='unbounded', unitName='units',
             putQType=FIFO, getQType=FIFO,
             monitored=False, monitorType=Monitor, initialBuffered=None,
             sim=None):
    """Store the buffer parameters and create the put and get queues.

    name            -- stored on the instance and used in monitor names.
    capacity        -- 'unbounded' is translated to sys.maxint.
    unitName        -- stored on the instance.
    putQType, getQType -- queue classes, called as
                       QType(res = buffer, moni = monitor_or_None).
    monitored       -- when true, putQMon, getQMon and bufferMon are
                       created and both queue lengths are observed once.
    monitorType     -- class used to build the three monitors.
    initialBuffered -- stored on the instance only; not interpreted here.
    sim             -- simulation object; Globals.sim when missing/false.
    """
    if not sim:
        sim = Globals.sim
    self.sim = sim
    if capacity == 'unbounded':
        capacity = sys.maxint
    self.capacity = capacity
    self.name = name
    self.putQType = putQType
    self.getQType = getQType
    self.monitored = monitored
    self.initialBuffered = initialBuffered
    self.unitName = unitName

    self.putQMon = None
    self.getQMon = None
    self.bufferMon = None
    if self.monitored:
        # One monitor each for blocked producers, blocked consumers and
        # the buffer content.
        self.putQMon = monitorType(
            name='Producer Queue Monitor %s' % self.name,
            ylab='nr in queue', tlab='time', sim=self.sim)
        self.getQMon = monitorType(
            name='Consumer Queue Monitor %s' % self.name,
            ylab='nr in queue', tlab='time', sim=self.sim)
        self.bufferMon = monitorType(
            name='Buffer Monitor %s' % self.name,
            ylab='nr in buffer', tlab='time', sim=self.sim)

    self.putQ = self.putQType(res=self, moni=self.putQMon)
    self.getQ = self.getQType(res=self, moni=self.getQMon)
    if self.monitored:
        now = self.sim.now
        self.putQMon.observe(y=len(self.putQ), t=now())
        self.getQMon.observe(y=len(self.getQ), t=now())
    self._putpriority = {}
    self._getpriority = {}
575
576 def _put(self):
577 pass
578 def _get(self):
579 pass
580
582 """Models buffers for processes putting / getting un - distinguishable items.
583 """
586
589
590 theBuffer = property(gettheBuffer)
591
593 Buffer.__init__(self,**pars)
594 if self.name is None:
595 self.name = 'a_level'
596
597 if (type(self.capacity) != type(1.0) and\
598 type(self.capacity) != type(1)) or\
599 self.capacity < 0:
600 raise FatalSimerror\
601 ('Level: capacity parameter not a positive number: %s'\
602 %self.initialBuffered)
603
604 if type(self.initialBuffered) == type(1.0) or\
605 type(self.initialBuffered) == type(1):
606 if self.initialBuffered > self.capacity:
607 raise FatalSimerror('initialBuffered exceeds capacity')
608 if self.initialBuffered >= 0:
609 self.nrBuffered = self.initialBuffered
610
611 else:
612 raise FatalSimerror\
613 ('initialBuffered param of Level negative: %s'\
614 %self.initialBuffered)
615 elif self.initialBuffered is None:
616 self.initialBuffered = 0
617 self.nrBuffered = 0
618 else:
619 raise FatalSimerror\
620 ('Level: wrong type of initialBuffered (parameter=%s)'\
621 %self.initialBuffered)
622 if self.monitored:
623 self.bufferMon.observe(y = self.amount, t = self.sim.now())
624 amount = property(getamount)
625
626 - def _put(self, arg):
627 """Handles put requests for Level instances"""
628 obj = arg[1]
629 whichSim=self.sim
630 if len(arg[0]) == 5:
631 obj._putpriority[self] = arg[0][4]
632 whatToPut = arg[0][3]
633 elif len(arg[0]) == 4:
634 obj._putpriority[self] = Buffer.priorityDefault
635 whatToPut = arg[0][3]
636 else:
637 obj._putpriority[self] = Buffer.priorityDefault
638 whatToPut = 1
639 if type(whatToPut) != type(1) and type(whatToPut) != type(1.0):
640 raise FatalSimerror('Level: put parameter not a number')
641 if not whatToPut >= 0.0:
642 raise FatalSimerror('Level: put parameter not positive number')
643 whatToPutNr = whatToPut
644 if whatToPutNr + self.amount > self.capacity:
645 obj._nextTime = None
646 obj._whatToPut = whatToPutNr
647 self.putQ.enterPut(obj)
648 else:
649 self.nrBuffered += whatToPutNr
650 if self.monitored:
651 self.bufferMon.observe(y = self.amount, t = self.sim.now())
652
653
654
655 while len(self.getQ) and self.amount > 0:
656 proc = self.getQ[0]
657 if proc._nrToGet <= self.amount:
658 proc.got = proc._nrToGet
659 self.nrBuffered -= proc.got
660 if self.monitored:
661 self.bufferMon.observe(y = self.amount, t = self.sim.now())
662 self.getQ.takeout(proc)
663 whichSim._e._post(proc, at = whichSim._t)
664 else:
665 break
666 whichSim._e._post(obj, at = whichSim._t, prior = 1)
667
668 - def _get(self, arg):
669 """Handles get requests for Level instances"""
670 obj = arg[1]
671 obj.got = None
672 if len(arg[0]) == 5:
673 obj._getpriority[self] = arg[0][4]
674 nrToGet = arg[0][3]
675 elif len(arg[0]) == 4:
676 obj._getpriority[self] = Buffer.priorityDefault
677 nrToGet = arg[0][3]
678 else:
679 obj._getpriority[self] = Buffer.priorityDefault
680 nrToGet = 1
681 if type(nrToGet) != type(1.0) and type(nrToGet) != type(1):
682 raise FatalSimerror\
683 ('Level: get parameter not a number: %s'%nrToGet)
684 if nrToGet < 0:
685 raise FatalSimerror\
686 ('Level: get parameter not positive number: %s'%nrToGet)
687 if self.amount < nrToGet:
688 obj._nrToGet = nrToGet
689 self.getQ.enterGet(obj)
690
691 obj._nextTime = None
692 else:
693 obj.got = nrToGet
694 self.nrBuffered -= nrToGet
695 if self.monitored:
696 self.bufferMon.observe(y = self.amount, t = self.sim.now())
697 self.sim._e._post(obj, at = self.sim._t, prior = 1)
698
699
700
701 while len(self.putQ):
702 proc = self.putQ[0]
703 if proc._whatToPut + self.amount <= self.capacity:
704 self.nrBuffered += proc._whatToPut
705 if self.monitored:
706 self.bufferMon.observe(y = self.amount, t = self.sim.now())
707 self.putQ.takeout(proc)
708 self.sim._e._post(proc, at = self.sim._t)
709 else:
710 break
711
713 """Models buffers for processes coupled by putting / getting distinguishable
714 items.
715 Blocks a process when a put would cause buffer overflow or a get would cause
716 buffer underflow.
717 Default queuing discipline for blocked processes is priority FIFO.
718 """
721 nrBuffered = property(getnrBuffered)
722
725 buffered = property(getbuffered)
726
728 Buffer.__init__(self,**pars)
729 self.theBuffer = []
730 if self.name is None:
731 self.name = 'a_store'
732 if type(self.capacity) != type(1) or self.capacity <= 0:
733 raise FatalSimerror\
734 ('Store: capacity parameter not a positive integer > 0: %s'\
735 %self.initialBuffered)
736 if type(self.initialBuffered) == type([]):
737 if len(self.initialBuffered) > self.capacity:
738 raise FatalSimerror('initialBuffered exceeds capacity')
739 else:
740 self.theBuffer[:] = self.initialBuffered
741 elif self.initialBuffered is None:
742 self.theBuffer = []
743 else:
744 raise FatalSimerror\
745 ('Store: initialBuffered not a list')
746 if self.monitored:
747 self.bufferMon.observe(y = self.nrBuffered, t = self.sim.now())
748 self._sort = None
749
750
751
753 """Adds buffer sorting to this instance of Store. It maintains
754 theBuffer sorted by the sortAttr attribute of the objects in the
755 buffer.
756 The user - provided 'sortFunc' must look like this:
757
758 def mySort(self, par):
759 tmplist = [(x.sortAttr, x) for x in par]
760 tmplist.sort()
761 return [x for (key, x) in tmplist]
762
763 """
764
765 self._sort = new.instancemethod(sortFunc, self, self.__class__)
766 self.theBuffer = self._sort(self.theBuffer)
767
768 - def _put(self, arg):
769 """Handles put requests for Store instances"""
770 obj = arg[1]
771 whichSim=self.sim
772 if len(arg[0]) == 5:
773 obj._putpriority[self] = arg[0][4]
774 whatToPut = arg[0][3]
775 elif len(arg[0]) == 4:
776 obj._putpriority[self] = Buffer.priorityDefault
777 whatToPut = arg[0][3]
778 else:
779 raise FatalSimerror('Item to put missing in yield put stmt')
780 if type(whatToPut) != type([]):
781 raise FatalSimerror('put parameter is not a list')
782 whatToPutNr = len(whatToPut)
783 if whatToPutNr + self.nrBuffered > self.capacity:
784 obj._nextTime = None
785 obj._whatToPut = whatToPut
786 self.putQ.enterPut(obj)
787 else:
788 self.theBuffer.extend(whatToPut)
789 if not(self._sort is None):
790 self.theBuffer = self._sort(self.theBuffer)
791 if self.monitored:
792 self.bufferMon.observe(y = self.nrBuffered, t = whichSim.now())
793
794
795
796
797 while self.nrBuffered > 0 and len(self.getQ):
798 proc = self.getQ[0]
799 if inspect.isfunction(proc._nrToGet):
800 movCand = proc._nrToGet(self.theBuffer)
801 if movCand:
802 proc.got = movCand[:]
803 for i in movCand:
804 self.theBuffer.remove(i)
805 self.getQ.takeout(proc)
806 if self.monitored:
807 self.bufferMon.observe(
808 y = self.nrBuffered, t = whichSim._t)
809 whichSim._e._post(what = proc, at = whichSim._t)
810 else:
811 break
812 else:
813 if proc._nrToGet <= self.nrBuffered:
814 nrToGet = proc._nrToGet
815 proc.got = []
816 proc.got[:] = self.theBuffer[0:nrToGet]
817 self.theBuffer[:] = self.theBuffer[nrToGet:]
818 if self.monitored:
819 self.bufferMon.observe(
820 y = self.nrBuffered, t = whichSim._t)
821
822 self.getQ.takeout(proc)
823 whichSim._e._post(what = proc, at = whichSim._t)
824 else:
825 break
826
827 whichSim._e._post(what = obj, at = whichSim._t, prior = 1)
828
829 - def _get(self, arg):
830 """Handles get requests"""
831 filtfunc = None
832 obj = arg[1]
833 whichSim=obj.sim
834 obj.got = []
835 if len(arg[0]) == 5:
836 obj._getpriority[self] = arg[0][4]
837 if inspect.isfunction(arg[0][3]):
838 filtfunc = arg[0][3]
839 else:
840 nrToGet = arg[0][3]
841 elif len(arg[0]) == 4:
842 obj._getpriority[self] = Buffer.priorityDefault
843 if inspect.isfunction(arg[0][3]):
844 filtfunc = arg[0][3]
845 else:
846 nrToGet = arg[0][3]
847 else:
848 obj._getpriority[self] = Buffer.priorityDefault
849 nrToGet = 1
850 if not filtfunc:
851 if nrToGet < 0:
852 raise FatalSimerror\
853 ('Store: get parameter not positive number: %s'%nrToGet)
854 if self.nrBuffered < nrToGet:
855 obj._nrToGet = nrToGet
856 self.getQ.enterGet(obj)
857
858 obj._nextTime = None
859 else:
860 for i in range(nrToGet):
861 obj.got.append(self.theBuffer.pop(0))
862
863 if self.monitored:
864 self.bufferMon.observe(y = self.nrBuffered, t = whichSim.now())
865 whichSim._e._post(obj, at = whichSim._t, prior = 1)
866
867
868
869 while len(self.putQ):
870 proc = self.putQ[0]
871 if len(proc._whatToPut) + self.nrBuffered <= self.capacity:
872 for i in proc._whatToPut:
873 self.theBuffer.append(i)
874 if not(self._sort is None):
875 self.theBuffer = self._sort(self.theBuffer)
876 if self.monitored:
877 self.bufferMon.observe(
878 y = self.nrBuffered, t = whichSim.now())
879 self.putQ.takeout(proc)
880 whichSim._e._post(proc, at = whichSim._t)
881 else:
882 break
883 else:
884 movCand = filtfunc(self.theBuffer)
885 if movCand:
886 whichSim._e._post(obj, at = whichSim._t, prior = 1)
887 obj.got = movCand[:]
888 for item in movCand:
889 self.theBuffer.remove(item)
890 if self.monitored:
891 self.bufferMon.observe(y = self.nrBuffered, t = whichSim.now())
892
893
894
895 while len(self.putQ):
896 proc = self.putQ[0]
897 if len(proc._whatToPut) + self.nrBuffered <= self.capacity:
898 for i in proc._whatToPut:
899 self.theBuffer.append(i)
900 if not(self._sort is None):
901 self.theBuffer = self._sort(self.theBuffer)
902 if self.monitored:
903 self.bufferMon.observe(
904 y = self.nrBuffered, t = whichSim.now())
905 self.putQ.takeout(proc)
906 whichSim._e._post(proc, at = whichSim._t)
907 else:
908 break
909 else:
910 obj._nrToGet = filtfunc
911 self.getQ.enterGet(obj)
912
913 obj._nextTime = None
914