1
+ from threading import *
2
+ from collections import deque
3
+ from time import sleep
4
+
5
+ def _test ():
6
+
7
+ class BoundedQueue ():
8
+
9
+ def __init__ (self , limit ):
10
+ self .mon = RLock ()
11
+ self .rc = Condition (self .mon )
12
+ self .wc = Condition (self .mon )
13
+ self .limit = limit
14
+ self .queue = deque ()
15
+
16
+ def put (self , item ):
17
+ self .mon .acquire ()
18
+ while len (self .queue ) >= self .limit :
19
+ self ._note ("put(%s): queue full" , item )
20
+ self .wc .wait ()
21
+ self .queue .append (item )
22
+ self ._note ("put(%s): appended, length now %d" ,
23
+ item , len (self .queue ))
24
+ self .rc .notify ()
25
+ self .mon .release ()
26
+
27
+ def get (self ):
28
+ self .mon .acquire ()
29
+ while not self .queue :
30
+ self ._note ("get(): queue empty" )
31
+ self .rc .wait ()
32
+ item = self .queue .popleft ()
33
+ self ._note ("get(): got %s, %d left" , item , len (self .queue ))
34
+ self .wc .notify ()
35
+ self .mon .release ()
36
+ return item
37
+
38
+ def _note (self , format , * args ):
39
+ format = format % args
40
+ ident = get_ident ()
41
+ try :
42
+ name = current_thread ().name
43
+ except KeyError :
44
+ name = "<OS thread %d>" % ident
45
+ format = "%s: %s" % (name , format )
46
+ print (format )
47
+
48
+ class ProducerThread (Thread ):
49
+
50
+ def __init__ (self , queue , quota ):
51
+ Thread .__init__ (self , name = "Producer" )
52
+ self .queue = queue
53
+ self .quota = quota
54
+
55
+ def run (self ):
56
+ from random import random
57
+ counter = 0
58
+ while counter < self .quota :
59
+ counter = counter + 1
60
+ self .queue .put ("%s.%d" % (self .name , counter ))
61
+ sleep (random () * 0.00001 )
62
+
63
+
64
+ class ConsumerThread (Thread ):
65
+
66
+ def __init__ (self , queue , count ):
67
+ Thread .__init__ (self , name = "Consumer" )
68
+ self .queue = queue
69
+ self .count = count
70
+
71
+ def run (self ):
72
+ while self .count > 0 :
73
+ item = self .queue .get ()
74
+ print (item )
75
+ self .count = self .count - 1
76
+
77
+ NP = 3
78
+ QL = 4
79
+ NI = 5
80
+
81
+ Q = BoundedQueue (QL )
82
+ P = []
83
+ for i in range (NP ):
84
+ t = ProducerThread (Q , NI )
85
+ t .name = ("Producer-%d" % (i + 1 ))
86
+ P .append (t )
87
+ C = ConsumerThread (Q , NI * NP )
88
+ for t in P :
89
+ t .start ()
90
+ sleep (0.000001 )
91
+ C .start ()
92
+ for t in P :
93
+ t .join ()
94
+ C .join ()
95
+
96
+ _test ()
0 commit comments