35
35
import java .util .List ;
36
36
import java .util .Queue ;
37
37
import java .util .Set ;
38
+ import java .util .concurrent .ConcurrentHashMap ;
39
+ import java .util .concurrent .ConcurrentMap ;
38
40
import java .util .concurrent .PriorityBlockingQueue ;
39
41
40
42
import org .apache .commons .logging .Log ;
@@ -423,6 +425,8 @@ public enum WatchEventType {
423
425
@ UsesJava7
424
426
private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements Lifecycle {
425
427
428
+ private final ConcurrentMap <Path , WatchKey > pathKeys = new ConcurrentHashMap <Path , WatchKey >();
429
+
426
430
private WatchService watcher ;
427
431
428
432
private Collection <File > initialFiles ;
@@ -444,7 +448,7 @@ public void start() {
444
448
this .kinds [i ] = FileReadingMessageSource .this .watchEvents [i ].kind ;
445
449
}
446
450
447
- final Set <File > initialFiles = walkDirectory (FileReadingMessageSource .this .directory .toPath ());
451
+ final Set <File > initialFiles = walkDirectory (FileReadingMessageSource .this .directory .toPath (), null );
448
452
initialFiles .addAll (filesFromEvents ());
449
453
this .initialFiles = initialFiles ;
450
454
}
@@ -489,36 +493,53 @@ private Set<File> filesFromEvents() {
489
493
Path item = (Path ) event .context ();
490
494
File file = new File (parentDir , item .toFile ().getName ());
491
495
if (logger .isDebugEnabled ()) {
492
- logger .debug ("Watch Event: " + event .kind () + ": " + file );
496
+ logger .debug ("Watch event [ " + event .kind () + "] for file [ " + file + "]" );
493
497
}
494
498
495
499
if (event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
496
500
if (FileReadingMessageSource .this .filter instanceof ResettableFileListFilter ) {
497
501
((ResettableFileListFilter <File >) FileReadingMessageSource .this .filter ).remove (file );
498
502
}
499
- files .remove (file );
503
+ boolean fileRemoved = files .remove (file );
504
+ if (fileRemoved && logger .isDebugEnabled ()) {
505
+ logger .debug ("The file [" + file +
506
+ "] has been removed from the queue because of DELETE event." );
507
+ }
500
508
}
501
509
else {
502
510
if (file .exists ()) {
503
511
if (file .isDirectory ()) {
504
- files .addAll (walkDirectory (file .toPath ()));
512
+ files .addAll (walkDirectory (file .toPath (), event . kind () ));
505
513
}
506
514
else {
507
515
files .remove (file );
508
516
files .add (file );
509
517
}
510
518
}
519
+ else {
520
+ if (logger .isDebugEnabled ()) {
521
+ logger .debug ("A file [" + file + "] for the event [" + event .kind () +
522
+ "] doesn't exist. Ignored." );
523
+ }
524
+ }
511
525
}
512
526
}
513
527
else if (event .kind () == StandardWatchEventKinds .OVERFLOW ) {
514
528
if (logger .isDebugEnabled ()) {
515
- logger .debug ("Watch Event: " + event .kind () + ": context: " + event .context ());
529
+ logger .debug ("Watch event [" + StandardWatchEventKinds .OVERFLOW +
530
+ "] with context [" + event .context () + "]" );
531
+ }
532
+
533
+ for (WatchKey watchKey : pathKeys .values ()) {
534
+ watchKey .cancel ();
516
535
}
536
+ this .pathKeys .clear ();
537
+
517
538
if (event .context () != null && event .context () instanceof Path ) {
518
- files .addAll (walkDirectory ((Path ) event .context ()));
539
+ files .addAll (walkDirectory ((Path ) event .context (), event . kind () ));
519
540
}
520
541
else {
521
- files .addAll (walkDirectory (FileReadingMessageSource .this .directory .toPath ()));
542
+ files .addAll (walkDirectory (FileReadingMessageSource .this .directory .toPath (), event . kind () ));
522
543
}
523
544
}
524
545
}
@@ -528,7 +549,7 @@ else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
528
549
return files ;
529
550
}
530
551
531
- private Set <File > walkDirectory (Path directory ) {
552
+ private Set <File > walkDirectory (Path directory , final WatchEvent . Kind <?> kind ) {
532
553
final Set <File > walkedFiles = new LinkedHashSet <File >();
533
554
try {
534
555
registerWatch (directory );
@@ -544,7 +565,9 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) th
544
565
@ Override
545
566
public FileVisitResult visitFile (Path file , BasicFileAttributes attrs ) throws IOException {
546
567
FileVisitResult fileVisitResult = super .visitFile (file , attrs );
547
- walkedFiles .add (file .toFile ());
568
+ if (!StandardWatchEventKinds .ENTRY_MODIFY .equals (kind )) {
569
+ walkedFiles .add (file .toFile ());
570
+ }
548
571
return fileVisitResult ;
549
572
}
550
573
@@ -557,10 +580,13 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
557
580
}
558
581
559
582
private void registerWatch (Path dir ) throws IOException {
560
- if (logger .isDebugEnabled ()) {
561
- logger .debug ("registering: " + dir + " for file events" );
583
+ if (!this .pathKeys .containsKey (dir )) {
584
+ if (logger .isDebugEnabled ()) {
585
+ logger .debug ("registering: " + dir + " for file events" );
586
+ }
587
+ WatchKey watchKey = dir .register (this .watcher , this .kinds );
588
+ this .pathKeys .putIfAbsent (dir , watchKey );
562
589
}
563
- dir .register (this .watcher , this .kinds );
564
590
}
565
591
566
592
}
0 commit comments