1
- // xfail-test - #1038 - Can't do this safely with bare functions
2
-
3
1
/**
4
2
A parallel word-frequency counting program.
5
3
@@ -18,21 +16,41 @@ import option::none;
18
16
import str;
19
17
import std:: treemap;
20
18
import vec;
21
- import std:: io;
19
+ import io;
20
+ import io:: { reader_util, writer_util} ;
22
21
23
22
import std:: time;
24
23
import u64;
25
24
26
25
import task;
27
- import task:: joinable_task;
28
26
import comm;
29
27
import comm:: chan;
30
28
import comm:: port;
31
29
import comm:: recv;
32
30
import comm:: send;
31
+ import comm:: methods;
32
+
33
+ // These used to be in task, but they disappeard.
34
+ type joinable_task = port < ( ) > ;
35
+ fn spawn_joinable ( f : fn ~( ) ) -> joinable_task {
36
+ let p = port ( ) ;
37
+ let c = chan ( p) ;
38
+ task:: spawn ( ) { ||
39
+ f ( ) ;
40
+ c. send ( ( ) ) ;
41
+ }
42
+ p
43
+ }
44
+
45
+ fn join ( t : joinable_task ) {
46
+ t. recv ( )
47
+ }
33
48
34
49
fn map ( & & filename: [ u8 ] , emit : map_reduce:: putter < [ u8 ] , int > ) {
35
- let f = io:: file_reader ( str:: from_bytes ( filename) ) ;
50
+ let f = alt io:: file_reader ( str:: from_bytes ( filename) ) {
51
+ result:: ok ( f) { f }
52
+ result:: err ( e) { fail #fmt( "%?" , e) }
53
+ } ;
36
54
37
55
loop {
38
56
alt read_word ( f) {
@@ -42,10 +60,12 @@ fn map(&&filename: [u8], emit: map_reduce::putter<[u8], int>) {
42
60
}
43
61
}
44
62
45
- fn reduce ( & & _word : [ u8 ] , get : map_reduce:: getter < int > ) {
46
- let count = 0 ;
63
+ fn reduce ( & & word : [ u8 ] , get : map_reduce:: getter < int > ) {
64
+ let mut count = 0 ;
47
65
48
66
loop { alt get ( ) { some ( _) { count += 1 ; } none { break; } } }
67
+
68
+ io:: println ( #fmt ( "%?\t %?" , word, count) ) ;
49
69
}
50
70
51
71
mod map_reduce {
@@ -59,41 +79,43 @@ mod map_reduce {
59
79
60
80
// FIXME: the first K1 parameter should probably be a -, but that
61
81
// doesn't parse at the moment.
62
- type mapper < K1 : send , K2 : send , V : send > = fn ( K1 , putter < K2 , V > ) ;
82
+ type mapper < K1 : send , K2 : send , V : send > = fn ~ ( K1 , putter < K2 , V > ) ;
63
83
64
84
type getter < V : send > = fn ( ) -> option < V > ;
65
85
66
- type reducer < K : send , V : send > = fn ( K , getter < V > ) ;
86
+ type reducer < K : copy send, V : copy send> = fn ~ ( K , getter < V > ) ;
67
87
68
- enum ctrl_proto < K : send , V : send > {
69
- find_reducer( K , chan < chan < reduce_proto < V > > > ) ;
70
- mapper_done;
88
+ enum ctrl_proto < K : copy send, V : copy send> {
89
+ find_reducer( K , chan < chan < reduce_proto < V > > > ) ,
90
+ mapper_done
71
91
}
72
92
73
- enum reduce_proto < V : send > { emit_val( V ) ; done; ref; release; }
93
+ enum reduce_proto < V : copy send> { emit_val( V ) , done, ref, release }
74
94
75
- fn start_mappers < K1 : send , K2 : send ,
76
- V : send > ( map : mapper < K1 , K2 , V > ,
77
- ctrl : chan < ctrl_proto < K2 , V > > , inputs : [ K1 ] ) ->
78
- [ joinable_task ] {
79
- let tasks = [ ] ;
95
+ fn start_mappers < K1 : copy send, K2 : copy send, V : copy send> (
96
+ map : mapper < K1 , K2 , V > ,
97
+ ctrl : chan < ctrl_proto < K2 , V > > , inputs : [ K1 ] )
98
+ -> [ joinable_task ]
99
+ {
100
+ let mut tasks = [ ] ;
80
101
for inputs. each { |i|
81
- let m = map, c = ctrl, ii = i;
82
- tasks += [ task:: spawn_joinable { || map_task ( m, c, ii) } ] ;
102
+ tasks += [ spawn_joinable { || map_task( map, ctrl, i) } ] ;
83
103
}
84
104
ret tasks;
85
105
}
86
106
87
- fn map_task < K : send1 , K : send2 ,
88
- V : send > ( -map : mapper < K1 , K2 , V > ,
89
- -ctrl : chan < ctrl_proto < K2 , V > > ,
90
- -input : K1 ) {
107
+ fn map_task < K1 : copy send, K2 : copy send, V : copy send> (
108
+ map : mapper < K1 , K2 , V > ,
109
+ ctrl : chan < ctrl_proto < K2 , V > > ,
110
+ input : K1 )
111
+ {
91
112
// log(error, "map_task " + input);
92
- let intermediates = treemap:: init ( ) ;
113
+ let intermediates = treemap:: treemap ( ) ;
93
114
94
- fn emit < K : send2 ,
95
- V : send > ( im : treemap:: treemap < K2 , chan < reduce_proto < V > > > ,
96
- ctrl : chan < ctrl_proto < K2 , V > > , key : K2 , val : V ) {
115
+ fn emit < K2 : copy send, V : copy send> (
116
+ im : treemap:: treemap < K2 , chan < reduce_proto < V > > > ,
117
+ ctrl : chan < ctrl_proto < K2 , V > > , key : K2 , val : V )
118
+ {
97
119
let c;
98
120
alt treemap:: find ( im, key) {
99
121
some ( _c) { c = _c; }
@@ -110,25 +132,28 @@ mod map_reduce {
110
132
111
133
map ( input, bind emit ( intermediates, ctrl, _, _) ) ;
112
134
113
- fn finish < K : send , V : send > ( _k : K , v : chan < reduce_proto < V > > ) {
135
+ fn finish < K : copy send, V : copy send> ( _k : K , v : chan < reduce_proto < V > > )
136
+ {
114
137
send ( v, release) ;
115
138
}
116
139
treemap:: traverse ( intermediates, finish) ;
117
140
send ( ctrl, mapper_done) ;
118
141
}
119
142
120
- fn reduce_task < K : send ,
121
- V : send > ( -reduce : reducer < K , V > , -key : K ,
122
- -out : chan < chan < reduce_proto < V > > > ) {
143
+ fn reduce_task < K : copy send, V : copy send> (
144
+ reduce : reducer < K , V > ,
145
+ key : K ,
146
+ out : chan < chan < reduce_proto < V > > > )
147
+ {
123
148
let p = port ( ) ;
124
149
125
150
send ( out, chan ( p) ) ;
126
151
127
152
let ref_count = 0 ;
128
153
let is_done = false ;
129
154
130
- fn get < V : send > ( p : port < reduce_proto < V > > ,
131
- & ref_count: int , & is_done: bool )
155
+ fn get < V : copy send> ( p : port < reduce_proto < V > > ,
156
+ & ref_count: int , & is_done: bool )
132
157
-> option < V > {
133
158
while !is_done || ref_count > 0 {
134
159
alt recv ( p) {
@@ -140,8 +165,8 @@ mod map_reduce {
140
165
// #error("all done");
141
166
is_done = true ;
142
167
}
143
- ref. { ref_count += 1 ; }
144
- release. { ref_count -= 1 ; }
168
+ ref { ref_count += 1 ; }
169
+ release { ref_count -= 1 ; }
145
170
}
146
171
}
147
172
ret none;
@@ -150,19 +175,19 @@ mod map_reduce {
150
175
reduce ( key, bind get( p, ref_count, is_done) ) ;
151
176
}
152
177
153
- fn map_reduce < K : send1 , K : send2 ,
154
- V : send > ( map : mapper < K1 , K2 , V > , reduce : reducer < K2 , V > ,
155
- inputs : [ K1 ] ) {
178
+ fn map_reduce < K1 : copy send, K2 : copy send, V : copy send> (
179
+ map : mapper < K1 , K2 , V > ,
180
+ reduce : reducer < K2 , V > ,
181
+ inputs : [ K1 ] )
182
+ {
156
183
let ctrl = port ( ) ;
157
184
158
185
// This task becomes the master control task. It task::_spawns
159
186
// to do the rest.
160
187
161
- let reducers = treemap:: init ( ) ;
162
-
163
- let tasks = start_mappers ( map, chan ( ctrl) , inputs) ;
164
-
165
- let num_mappers = vec:: len ( inputs) as int ;
188
+ let reducers = treemap:: treemap ( ) ;
189
+ let mut tasks = start_mappers ( map, chan ( ctrl) , inputs) ;
190
+ let mut num_mappers = vec:: len ( inputs) as int ;
166
191
167
192
while num_mappers > 0 {
168
193
alt recv ( ctrl) {
@@ -185,7 +210,7 @@ mod map_reduce {
185
210
let ch = chan ( p) ;
186
211
let r = reduce, kk = k;
187
212
tasks += [
188
- task :: spawn_joinable { || reduce_task ( r, kk, ch) }
213
+ spawn_joinable { || reduce_task ( r, kk, ch) }
189
214
] ;
190
215
c = recv ( p) ;
191
216
treemap:: insert ( reducers, k, c) ;
@@ -196,12 +221,13 @@ mod map_reduce {
196
221
}
197
222
}
198
223
199
- fn finish < K : send , V : send > ( _k : K , v : chan < reduce_proto < V > > ) {
224
+ fn finish < K : copy send, V : copy send> ( _k : K , v : chan < reduce_proto < V > > )
225
+ {
200
226
send ( v, done) ;
201
227
}
202
228
treemap:: traverse ( reducers, finish) ;
203
229
204
- for tasks. each { |t| task :: join( t) ; }
230
+ for tasks. each { |t| join( t) ; }
205
231
}
206
232
}
207
233
@@ -217,7 +243,7 @@ fn main(argv: [str]) {
217
243
ret;
218
244
}
219
245
220
- let iargs = [ ] ;
246
+ let mut iargs = [ ] ;
221
247
vec:: iter_between ( argv, 1 u, vec:: len ( argv) ) { |a|
222
248
iargs += [ str:: bytes ( a) ] ;
223
249
}
@@ -227,20 +253,18 @@ fn main(argv: [str]) {
227
253
map_reduce:: map_reduce ( map, reduce, iargs) ;
228
254
let stop = time:: precise_time_ns ( ) ;
229
255
230
- let elapsed = stop - start;
231
- elapsed /= 1000000u64 ;
256
+ let elapsed = ( stop - start) / 1000000u64 ;
232
257
233
258
log ( error, "MapReduce completed in "
234
259
+ u64:: str ( elapsed) + "ms" ) ;
235
260
}
236
261
237
262
fn read_word ( r : io:: reader ) -> option < str > {
238
- let w = "" ;
263
+ let mut w = "" ;
239
264
240
265
while !r. eof ( ) {
241
266
let c = r. read_char ( ) ;
242
267
243
-
244
268
if is_word_char ( c) {
245
269
w += str:: from_char ( c) ;
246
270
} else { if w != "" { ret some ( w) ; } }
0 commit comments