@@ -764,7 +764,7 @@ pub fn start_async_translation(sess: &Session,
764
764
} ) ;
765
765
766
766
let ( shared_emitter, shared_emitter_main) = SharedEmitter :: new ( ) ;
767
- let ( trans_worker_send, _trans_worker_receive ) = channel ( ) ;
767
+ let ( trans_worker_send, trans_worker_receive ) = channel ( ) ;
768
768
let ( coordinator_send, coordinator_receive) = channel ( ) ;
769
769
770
770
let coordinator_thread = start_executing_work ( sess,
@@ -792,6 +792,7 @@ pub fn start_async_translation(sess: &Session,
792
792
time_graph,
793
793
output_filenames : crate_output. clone ( ) ,
794
794
coordinator_send,
795
+ trans_worker_receive,
795
796
shared_emitter_main,
796
797
future : coordinator_thread
797
798
}
@@ -987,7 +988,7 @@ pub fn dump_incremental_data(trans: &CrateTranslation) {
987
988
eprintln ! ( "incremental: re-using {} out of {} modules" , reuse, trans. modules. len( ) ) ;
988
989
}
989
990
990
- pub struct WorkItem {
991
+ struct WorkItem {
991
992
mtrans : ModuleTranslation ,
992
993
config : ModuleConfig ,
993
994
output_names : OutputFilenames
@@ -1074,9 +1075,11 @@ enum Message {
1074
1075
result : Result < CompiledModule , ( ) > ,
1075
1076
worker_id : usize ,
1076
1077
} ,
1077
- WorkItem ( WorkItem ) ,
1078
- CheckErrorMessages ,
1079
- TranslationDone ,
1078
+ TranslationDone {
1079
+ llvm_work_item : WorkItem ,
1080
+ is_last : bool
1081
+ } ,
1082
+ TranslateItem ,
1080
1083
}
1081
1084
1082
1085
struct Diagnostic {
@@ -1085,6 +1088,13 @@ struct Diagnostic {
1085
1088
lvl : Level ,
1086
1089
}
1087
1090
1091
+ #[ derive( PartialEq , Clone , Copy , Debug ) ]
1092
+ enum TransWorkerState {
1093
+ Idle ,
1094
+ Translating ,
1095
+ LLVMing ,
1096
+ }
1097
+
1088
1098
fn start_executing_work ( sess : & Session ,
1089
1099
shared_emitter : SharedEmitter ,
1090
1100
trans_worker_send : Sender < Message > ,
@@ -1189,7 +1199,6 @@ fn start_executing_work(sess: &Session,
1189
1199
// Before that work finishes, however, we may acquire a token. In that case
1190
1200
// we actually wastefully acquired the token, so we relinquish it back to
1191
1201
// the jobserver.
1192
-
1193
1202
thread:: spawn ( move || {
1194
1203
let mut worker_id_counter = 0 ;
1195
1204
let mut free_worker_ids = Vec :: new ( ) ;
@@ -1211,13 +1220,74 @@ fn start_executing_work(sess: &Session,
1211
1220
let mut work_items = Vec :: new ( ) ;
1212
1221
let mut tokens = Vec :: new ( ) ;
1213
1222
1223
+ let mut trans_worker_state = TransWorkerState :: Idle ;
1214
1224
let mut running = 0 ;
1215
1225
1216
- while !translation_done || work_items. len ( ) > 0 || running > 0 {
1226
+ while !translation_done ||
1227
+ work_items. len ( ) > 0 ||
1228
+ running > 0 ||
1229
+ trans_worker_state != TransWorkerState :: Idle {
1230
+
1231
+ if !translation_done {
1232
+ if trans_worker_state == TransWorkerState :: Idle {
1233
+ // Translation is not done yet, so there are two things the
1234
+ // translation worker could do:
1235
+ //
1236
+ // (1) Translate another CGU
1237
+ // (2) Run an already translated CGU through LLVM
1238
+ //
1239
+ // Option (2) makes sense if there's already enough work for
1240
+ // all the other workers. In that case it's better to run
1241
+ // a CGU through LLVM, so its resources can be freed.
1242
+ //
1243
+ // However, it's not trivial to determines what "enough work
1244
+ // for all the other workers" means because:
1245
+ //
1246
+ // (1) We don't know how long the currently working workers
1247
+ // will need to finish their work package, and
1248
+ // (2) we don't know how many idle workers would be available
1249
+ // because that is dynamically decided by the jobserver.
1250
+ //
1251
+ // TODO: Come up with a useful heuristic.
1252
+ if work_items. len ( ) <= 4 {
1253
+ trans_worker_send. send ( Message :: TranslateItem ) . unwrap ( ) ;
1254
+ trans_worker_state = TransWorkerState :: Translating ;
1255
+ } else {
1256
+ let item = work_items. pop ( ) . unwrap ( ) ;
1257
+ let cgcx = CodegenContext {
1258
+ worker : TRANS_WORKER_ID ,
1259
+ .. cgcx. clone ( )
1260
+ } ;
1261
+ trans_worker_state = TransWorkerState :: LLVMing ;
1262
+ spawn_work ( cgcx, item) ;
1263
+ }
1264
+ }
1265
+ } else {
1266
+ match trans_worker_state {
1267
+ TransWorkerState :: Idle => {
1268
+ if let Some ( item) = work_items. pop ( ) {
1269
+ let cgcx = CodegenContext {
1270
+ worker : TRANS_WORKER_ID ,
1271
+ .. cgcx. clone ( )
1272
+ } ;
1273
+
1274
+ trans_worker_state = TransWorkerState :: LLVMing ;
1275
+ spawn_work ( cgcx, item) ;
1276
+ }
1277
+ }
1278
+ TransWorkerState :: Translating => {
1279
+ bug ! ( "trans worker should not be translating after \
1280
+ translation was already completed")
1281
+ }
1282
+ TransWorkerState :: LLVMing => {
1283
+ // Already making good use of that token
1284
+ }
1285
+ }
1286
+ }
1217
1287
1218
1288
// Spin up what work we can, only doing this while we've got available
1219
1289
// parallelism slots and work left to spawn.
1220
- while work_items. len ( ) > 0 && running < tokens. len ( ) + 1 {
1290
+ while work_items. len ( ) > 0 && running < tokens. len ( ) {
1221
1291
let item = work_items. pop ( ) . unwrap ( ) ;
1222
1292
let worker_id = get_worker_id ( & mut free_worker_ids) ;
1223
1293
@@ -1231,7 +1301,7 @@ fn start_executing_work(sess: &Session,
1231
1301
}
1232
1302
1233
1303
// Relinquish accidentally acquired extra tokens
1234
- tokens. truncate ( running. saturating_sub ( 1 ) ) ;
1304
+ tokens. truncate ( running) ;
1235
1305
1236
1306
match coordinator_receive. recv ( ) . unwrap ( ) {
1237
1307
// Save the token locally and the next turn of the loop will use
@@ -1242,15 +1312,25 @@ fn start_executing_work(sess: &Session,
1242
1312
tokens. push ( token) ;
1243
1313
} else {
1244
1314
shared_emitter. fatal ( "failed to acquire jobserver token" ) ;
1245
- drop ( trans_worker_send. send ( Message :: CheckErrorMessages ) ) ;
1246
1315
// Exit the coordinator thread
1247
1316
panic ! ( )
1248
1317
}
1249
1318
}
1250
1319
1251
- Message :: WorkItem ( work_item) => {
1252
- work_items. push ( work_item) ;
1253
- helper. request_token ( ) ;
1320
+ Message :: TranslationDone { llvm_work_item, is_last } => {
1321
+ work_items. insert ( 0 , llvm_work_item) ;
1322
+
1323
+ if is_last {
1324
+ // If this is the last, don't request a token because
1325
+ // the trans worker thread will be free to handle this
1326
+ // immediately.
1327
+ translation_done = true ;
1328
+ } else {
1329
+ helper. request_token ( ) ;
1330
+ }
1331
+
1332
+ assert_eq ! ( trans_worker_state, TransWorkerState :: Translating ) ;
1333
+ trans_worker_state = TransWorkerState :: Idle ;
1254
1334
}
1255
1335
1256
1336
// If a thread exits successfully then we drop a token associated
@@ -1262,10 +1342,14 @@ fn start_executing_work(sess: &Session,
1262
1342
// Note that if the thread failed that means it panicked, so we
1263
1343
// abort immediately.
1264
1344
Message :: Done { result : Ok ( compiled_module) , worker_id } => {
1265
- drop ( tokens. pop ( ) ) ;
1266
- running -= 1 ;
1267
- free_worker_ids. push ( worker_id) ;
1268
- drop ( trans_worker_send. send ( Message :: CheckErrorMessages ) ) ;
1345
+ if worker_id == TRANS_WORKER_ID {
1346
+ assert_eq ! ( trans_worker_state, TransWorkerState :: LLVMing ) ;
1347
+ trans_worker_state = TransWorkerState :: Idle ;
1348
+ } else {
1349
+ drop ( tokens. pop ( ) ) ;
1350
+ running -= 1 ;
1351
+ free_worker_ids. push ( worker_id) ;
1352
+ }
1269
1353
1270
1354
match compiled_module. kind {
1271
1355
ModuleKind :: Regular => {
@@ -1283,15 +1367,11 @@ fn start_executing_work(sess: &Session,
1283
1367
}
1284
1368
Message :: Done { result : Err ( ( ) ) , worker_id : _ } => {
1285
1369
shared_emitter. fatal ( "aborting due to worker thread panic" ) ;
1286
- drop ( trans_worker_send. send ( Message :: CheckErrorMessages ) ) ;
1287
1370
// Exit the coordinator thread
1288
1371
panic ! ( )
1289
1372
}
1290
- Message :: TranslationDone => {
1291
- translation_done = true ;
1292
- }
1293
- msg @ Message :: CheckErrorMessages => {
1294
- bug ! ( "unexpected message: {:?}" , msg) ;
1373
+ Message :: TranslateItem => {
1374
+ bug ! ( "the coordinator should not receive translation requests" )
1295
1375
}
1296
1376
}
1297
1377
}
@@ -1316,10 +1396,6 @@ fn spawn_work(cgcx: CodegenContext, work: WorkItem) {
1316
1396
let depth = time_depth ( ) ;
1317
1397
1318
1398
thread:: spawn ( move || {
1319
- let _timing_guard = cgcx. time_graph
1320
- . as_ref ( )
1321
- . map ( |tg| tg. start ( time_graph:: TimelineId ( cgcx. worker ) ,
1322
- LLVM_WORK_PACKAGE_KIND ) ) ;
1323
1399
set_time_depth ( depth) ;
1324
1400
1325
1401
// Set up a destructor which will fire off a message that we're done as
@@ -1362,7 +1438,13 @@ fn spawn_work(cgcx: CodegenContext, work: WorkItem) {
1362
1438
// we just ignore the result and then send off our message saying that
1363
1439
// we're done, which if `execute_work_item` failed is unlikely to be
1364
1440
// seen by the main thread, but hey we might as well try anyway.
1365
- bomb. result = Some ( execute_work_item ( & cgcx, work) . unwrap ( ) ) ;
1441
+ bomb. result = {
1442
+ let _timing_guard = cgcx. time_graph
1443
+ . as_ref ( )
1444
+ . map ( |tg| tg. start ( time_graph:: TimelineId ( cgcx. worker ) ,
1445
+ LLVM_WORK_PACKAGE_KIND ) ) ;
1446
+ Some ( execute_work_item ( & cgcx, work) . unwrap ( ) )
1447
+ } ;
1366
1448
} ) ;
1367
1449
}
1368
1450
@@ -1578,6 +1660,7 @@ pub struct OngoingCrateTranslation {
1578
1660
1579
1661
time_graph : Option < TimeGraph > ,
1580
1662
coordinator_send : Sender < Message > ,
1663
+ trans_worker_receive : Receiver < Message > ,
1581
1664
shared_emitter_main : SharedEmitterMain ,
1582
1665
future : thread:: JoinHandle < CompiledModules > ,
1583
1666
}
@@ -1645,25 +1728,49 @@ impl OngoingCrateTranslation {
1645
1728
1646
1729
pub fn submit_translated_module_to_llvm ( & self ,
1647
1730
sess : & Session ,
1648
- mtrans : ModuleTranslation ) {
1731
+ mtrans : ModuleTranslation ,
1732
+ is_last : bool ) {
1649
1733
let module_config = match mtrans. kind {
1650
1734
ModuleKind :: Regular => self . regular_module_config . clone ( sess) ,
1651
1735
ModuleKind :: Metadata => self . metadata_module_config . clone ( sess) ,
1652
1736
ModuleKind :: Allocator => self . allocator_module_config . clone ( sess) ,
1653
1737
} ;
1654
1738
1655
- let work_item = build_work_item ( mtrans,
1656
- module_config,
1657
- self . output_filenames . clone ( ) ) ;
1739
+ let llvm_work_item = build_work_item ( mtrans,
1740
+ module_config,
1741
+ self . output_filenames . clone ( ) ) ;
1658
1742
1659
- drop ( self . coordinator_send . send ( Message :: WorkItem ( work_item) ) ) ;
1743
+ drop ( self . coordinator_send . send ( Message :: TranslationDone {
1744
+ llvm_work_item,
1745
+ is_last
1746
+ } ) ) ;
1660
1747
}
1661
1748
1662
- pub fn signal_translation_done ( & self ) {
1663
- drop ( self . coordinator_send . send ( Message :: TranslationDone ) ) ;
1749
+ pub fn submit_pre_translated_module_to_llvm ( & self ,
1750
+ sess : & Session ,
1751
+ mtrans : ModuleTranslation ,
1752
+ is_last : bool ) {
1753
+ self . wait_for_signal_to_translate_item ( ) ;
1754
+ self . check_for_errors ( sess) ;
1755
+ self . submit_translated_module_to_llvm ( sess, mtrans, is_last) ;
1664
1756
}
1665
1757
1666
1758
pub fn check_for_errors ( & self , sess : & Session ) {
1667
1759
self . shared_emitter_main . check ( sess, false ) ;
1668
1760
}
1761
+
1762
+ pub fn wait_for_signal_to_translate_item ( & self ) {
1763
+ match self . trans_worker_receive . recv ( ) {
1764
+ Ok ( Message :: TranslateItem ) => {
1765
+ // Nothing to do
1766
+ }
1767
+ Ok ( message) => {
1768
+ panic ! ( "unexpected message: {:?}" , message)
1769
+ }
1770
+ Err ( _) => {
1771
+ // One of the LLVM threads must have panicked, fall through so
1772
+ // error handling can be reached.
1773
+ }
1774
+ }
1775
+ }
1669
1776
}
0 commit comments