@@ -285,6 +285,16 @@ impl ProcessCdnLogQueue {
285
285
#[ cfg( test) ]
286
286
mod tests {
287
287
use super :: * ;
288
+ use aws_sdk_sqs:: operation:: receive_message:: builders:: ReceiveMessageOutputBuilder ;
289
+ use aws_sdk_sqs:: types:: builders:: MessageBuilder ;
290
+ use aws_sdk_sqs:: types:: Message ;
291
+ use crates_io_test_db:: TestDatabase ;
292
+ use crates_io_worker:: schema:: background_jobs;
293
+ use diesel:: prelude:: * ;
294
+ use diesel:: r2d2:: { ConnectionManager , Pool } ;
295
+ use diesel:: QueryDsl ;
296
+ use insta:: assert_snapshot;
297
+ use parking_lot:: Mutex ;
288
298
289
299
#[ tokio:: test]
290
300
async fn test_process_cdn_log ( ) {
@@ -327,4 +337,185 @@ mod tests {
327
337
let config = CdnLogStorageConfig :: s3 ( access_key, secret_key) ;
328
338
assert_ok ! ( job. build_store( & config) ) ;
329
339
}
340
+
341
+ #[ tokio:: test]
342
+ async fn test_process_cdn_log_queue ( ) {
343
+ let _guard = crate :: util:: tracing:: init_for_test ( ) ;
344
+
345
+ let mut queue = Box :: new ( MockSqsQueue :: new ( ) ) ;
346
+ queue
347
+ . expect_receive_messages ( )
348
+ . once ( )
349
+ . returning ( |_max_messages| {
350
+ Ok ( ReceiveMessageOutputBuilder :: default ( )
351
+ . messages ( message ( "123" , "us-west-1" , "bucket" , "path" ) )
352
+ . build ( ) )
353
+ } ) ;
354
+
355
+ queue
356
+ . expect_receive_messages ( )
357
+ . once ( )
358
+ . returning ( |_max_messages| Ok ( ReceiveMessageOutputBuilder :: default ( ) . build ( ) ) ) ;
359
+
360
+ let deleted_handles = record_deleted_handles ( & mut queue) ;
361
+
362
+ let test_database = TestDatabase :: new ( ) ;
363
+ let connection_pool = build_connection_pool ( test_database. url ( ) ) ;
364
+
365
+ let job = ProcessCdnLogQueue { max_messages : 100 } ;
366
+ assert_ok ! ( job. run( queue, & connection_pool) . await ) ;
367
+
368
+ assert_snapshot ! ( deleted_handles. lock( ) . join( "," ) , @"123" ) ;
369
+ assert_snapshot ! ( open_jobs( & mut test_database. connect( ) ) , @"us-west-1 | bucket | path" ) ;
370
+ }
371
+
372
+ #[ tokio:: test]
373
+ async fn test_process_cdn_log_queue_multi_page ( ) {
374
+ let _guard = crate :: util:: tracing:: init_for_test ( ) ;
375
+
376
+ let mut queue = Box :: new ( MockSqsQueue :: new ( ) ) ;
377
+ queue
378
+ . expect_receive_messages ( )
379
+ . once ( )
380
+ . returning ( |_max_messages| {
381
+ Ok ( ReceiveMessageOutputBuilder :: default ( )
382
+ . messages ( message ( "1" , "us-west-1" , "bucket" , "path1" ) )
383
+ . messages ( message ( "2" , "us-west-1" , "bucket" , "path2" ) )
384
+ . messages ( message ( "3" , "us-west-1" , "bucket" , "path3" ) )
385
+ . messages ( message ( "4" , "us-west-1" , "bucket" , "path4" ) )
386
+ . messages ( message ( "5" , "us-west-1" , "bucket" , "path5" ) )
387
+ . messages ( message ( "6" , "us-west-1" , "bucket" , "path6" ) )
388
+ . messages ( message ( "7" , "us-west-1" , "bucket" , "path7" ) )
389
+ . messages ( message ( "8" , "us-west-1" , "bucket" , "path8" ) )
390
+ . messages ( message ( "9" , "us-west-1" , "bucket" , "path9" ) )
391
+ . messages ( message ( "10" , "us-west-1" , "bucket" , "path10" ) )
392
+ . build ( ) )
393
+ } ) ;
394
+
395
+ queue
396
+ . expect_receive_messages ( )
397
+ . once ( )
398
+ . returning ( |_max_messages| {
399
+ Ok ( ReceiveMessageOutputBuilder :: default ( )
400
+ . messages ( message ( "11" , "us-west-1" , "bucket" , "path11" ) )
401
+ . build ( ) )
402
+ } ) ;
403
+
404
+ queue
405
+ . expect_receive_messages ( )
406
+ . once ( )
407
+ . returning ( |_max_messages| Ok ( ReceiveMessageOutputBuilder :: default ( ) . build ( ) ) ) ;
408
+
409
+ let deleted_handles = record_deleted_handles ( & mut queue) ;
410
+
411
+ let test_database = TestDatabase :: new ( ) ;
412
+ let connection_pool = build_connection_pool ( test_database. url ( ) ) ;
413
+
414
+ let job = ProcessCdnLogQueue { max_messages : 100 } ;
415
+ assert_ok ! ( job. run( queue, & connection_pool) . await ) ;
416
+
417
+ assert_snapshot ! ( deleted_handles. lock( ) . join( "," ) , @"1,2,3,4,5,6,7,8,9,10,11" ) ;
418
+ assert_snapshot ! ( open_jobs( & mut test_database. connect( ) ) , @r###"
419
+ us-west-1 | bucket | path1
420
+ us-west-1 | bucket | path2
421
+ us-west-1 | bucket | path3
422
+ us-west-1 | bucket | path4
423
+ us-west-1 | bucket | path5
424
+ us-west-1 | bucket | path6
425
+ us-west-1 | bucket | path7
426
+ us-west-1 | bucket | path8
427
+ us-west-1 | bucket | path9
428
+ us-west-1 | bucket | path10
429
+ us-west-1 | bucket | path11
430
+ "### ) ;
431
+ }
432
+
433
+ #[ tokio:: test]
434
+ async fn test_process_cdn_log_queue_parse_error ( ) {
435
+ let _guard = crate :: util:: tracing:: init_for_test ( ) ;
436
+
437
+ let mut queue = Box :: new ( MockSqsQueue :: new ( ) ) ;
438
+ queue
439
+ . expect_receive_messages ( )
440
+ . once ( )
441
+ . returning ( |_max_messages| {
442
+ let message = MessageBuilder :: default ( )
443
+ . message_id ( "1" )
444
+ . receipt_handle ( "1" )
445
+ . body ( serde_json:: to_string ( "{}" ) . unwrap ( ) )
446
+ . build ( ) ;
447
+
448
+ Ok ( ReceiveMessageOutputBuilder :: default ( )
449
+ . messages ( message)
450
+ . build ( ) )
451
+ } ) ;
452
+
453
+ queue
454
+ . expect_receive_messages ( )
455
+ . once ( )
456
+ . returning ( |_max_messages| Ok ( ReceiveMessageOutputBuilder :: default ( ) . build ( ) ) ) ;
457
+
458
+ let deleted_handles = record_deleted_handles ( & mut queue) ;
459
+
460
+ let test_database = TestDatabase :: new ( ) ;
461
+ let connection_pool = build_connection_pool ( test_database. url ( ) ) ;
462
+
463
+ let job = ProcessCdnLogQueue { max_messages : 100 } ;
464
+ assert_ok ! ( job. run( queue, & connection_pool) . await ) ;
465
+
466
+ assert_snapshot ! ( deleted_handles. lock( ) . join( "," ) , @"1" ) ;
467
+ assert_snapshot ! ( open_jobs( & mut test_database. connect( ) ) , @"" ) ;
468
+ }
469
+
470
+ fn record_deleted_handles ( queue : & mut MockSqsQueue ) -> Arc < Mutex < Vec < String > > > {
471
+ let deleted_handles = Arc :: new ( Mutex :: new ( vec ! [ ] ) ) ;
472
+
473
+ queue. expect_delete_message ( ) . returning ( {
474
+ let deleted_handles = deleted_handles. clone ( ) ;
475
+ move |receipt_handle| {
476
+ deleted_handles. lock ( ) . push ( receipt_handle. to_owned ( ) ) ;
477
+ Ok ( ( ) )
478
+ }
479
+ } ) ;
480
+
481
+ deleted_handles
482
+ }
483
+
484
+ fn build_connection_pool ( url : & str ) -> DieselPool {
485
+ let pool = Pool :: builder ( ) . build ( ConnectionManager :: new ( url) ) . unwrap ( ) ;
486
+ DieselPool :: new_background_worker ( pool)
487
+ }
488
+
489
+ fn message ( id : & str , region : & str , bucket : & str , path : & str ) -> Message {
490
+ let json = json ! ( {
491
+ "Records" : [ {
492
+ "awsRegion" : region,
493
+ "s3" : {
494
+ "bucket" : { "name" : bucket } ,
495
+ "object" : { "key" : path } ,
496
+ }
497
+ } ]
498
+ } ) ;
499
+
500
+ MessageBuilder :: default ( )
501
+ . message_id ( id)
502
+ . receipt_handle ( id)
503
+ . body ( serde_json:: to_string ( & json) . unwrap ( ) )
504
+ . build ( )
505
+ }
506
+
507
+ fn open_jobs ( conn : & mut PgConnection ) -> String {
508
+ let jobs = background_jobs:: table
509
+ . select ( ( background_jobs:: job_type, background_jobs:: data) )
510
+ . load :: < ( String , serde_json:: Value ) > ( conn)
511
+ . unwrap ( ) ;
512
+
513
+ jobs. into_iter ( )
514
+ . inspect ( |( job_type, _data) | assert_eq ! ( job_type, ProcessCdnLog :: JOB_NAME ) )
515
+ . map ( |( _job_type, data) | data)
516
+ . map ( |data| serde_json:: from_value :: < ProcessCdnLog > ( data) . unwrap ( ) )
517
+ . map ( |job| format ! ( "{} | {} | {}" , job. region, job. bucket, job. path) )
518
+ . collect :: < Vec < _ > > ( )
519
+ . join ( "\n " )
520
+ }
330
521
}
0 commit comments