Skip to content

Commit 7b1f77a

Browse files
artembilangaryrussell
authored andcommitted
INT-3989: WatchServiceDirectoryScanner Improvement
JIRA: https://jira.spring.io/browse/INT-3989, https://jira.spring.io/browse/INT-3990, https://jira.spring.io/browse/INT-3988 INT-3989: Add `FileReadingMessageSource.WatchServiceDirectoryScanner` * Deprecate top-level `WatchServiceDirectoryScanner` because of inconsistency around `Lifecycle` and shared `directory` property * Copy/paste its logic into the `FileReadingMessageSource.WatchServiceDirectoryScanner` to hide that inconsistency, but still get a gain from the `WatchService` benefits * Add support for the `StandardWatchEventKinds.ENTRY_MODIFY` and `StandardWatchEventKinds.ENTRY_DELETE` events in the `FileReadingMessageSource.WatchServiceDirectoryScanner` * Introduce `useWatchService` option to switch to the internal `FileReadingMessageSource.WatchServiceDirectoryScanner` * Make `CompositeFileListFilter` also as `ResettableFileListFilter` * Deprecate weird `FileReadingMessageSource.onSend()` method and remove its usage from tests * Modify `WatchServiceDirectoryScannerTests` for the new logic * Document changes Add `MODIFY` and `DELETE` test coverage Optimize the `filesFromEvents()` logic replacing item with the fresh event sources. Remove the item from the result set in case of `DELETE` event, because file removal generates both `MODIFY` and `DELETE` events. * Add `FileReadingMessageSource.setWatchEvents` to allow to listen to the specific events, not only `CREATE` or all of them. * Modify tests and docs to reflect the new API Address PR comments * Improve `FileReadingMessageSource.setWatchEvents()` * Add `file.exists()` before adding file from event. Add more DEBUG logs into the `WatchServiceDirectoryScanner` With the fact of those logs provide more optimizations: * Don't register the same directory for watching: - use the `ConcurrentMap<Path, WatchKey> pathKeys` to track registrations - any modification within the directory causes the `ENTRY_MODIFY` for the directory as well. So, skip such an event exactly for the directory during `walkDirectory` * Add debug logs in case of `ENTRY_DELETE`
1 parent 379a8a2 commit 7b1f77a

17 files changed

+497
-57
lines changed

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AsyncAmqpGatewayTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void handleMessage(Message<?> message) throws MessagingException {
203203
Message<?> returned = returnChannel.receive(10000);
204204
assertNotNull(returned);
205205
assertEquals("fiz", returned.getPayload());
206-
206+
ackChannel.receive(10000);
207207
ackChannel.purge(null);
208208

209209
// Simulate a nack - it's hard to get Rabbit to generate one

spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java

Lines changed: 272 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,39 @@
1717
package org.springframework.integration.file;
1818

1919
import java.io.File;
20+
import java.io.IOException;
21+
import java.nio.file.FileSystems;
22+
import java.nio.file.FileVisitResult;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.nio.file.SimpleFileVisitor;
26+
import java.nio.file.StandardWatchEventKinds;
27+
import java.nio.file.WatchEvent;
28+
import java.nio.file.WatchKey;
29+
import java.nio.file.WatchService;
30+
import java.nio.file.attribute.BasicFileAttributes;
31+
import java.util.Arrays;
32+
import java.util.Collection;
2033
import java.util.Comparator;
2134
import java.util.LinkedHashSet;
2235
import java.util.List;
2336
import java.util.Queue;
2437
import java.util.Set;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.ConcurrentMap;
2540
import java.util.concurrent.PriorityBlockingQueue;
2641

2742
import org.apache.commons.logging.Log;
2843
import org.apache.commons.logging.LogFactory;
2944

45+
import org.springframework.context.Lifecycle;
3046
import org.springframework.integration.aggregator.ResequencingMessageGroupProcessor;
3147
import org.springframework.integration.context.IntegrationObjectSupport;
3248
import org.springframework.integration.core.MessageSource;
3349
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
3450
import org.springframework.integration.file.filters.FileListFilter;
51+
import org.springframework.integration.file.filters.ResettableFileListFilter;
52+
import org.springframework.lang.UsesJava7;
3553
import org.springframework.messaging.Message;
3654
import org.springframework.messaging.MessagingException;
3755
import org.springframework.util.Assert;
@@ -64,7 +82,7 @@
6482
* @author Gary Russell
6583
* @author Artem Bilan
6684
*/
67-
public class FileReadingMessageSource extends IntegrationObjectSupport implements MessageSource<File> {
85+
public class FileReadingMessageSource extends IntegrationObjectSupport implements MessageSource<File>, Lifecycle {
6886

6987
private static final int DEFAULT_INTERNAL_QUEUE_CAPACITY = 5;
7088

@@ -87,10 +105,16 @@ public class FileReadingMessageSource extends IntegrationObjectSupport implement
87105

88106
private volatile boolean scanEachPoll = false;
89107

108+
private volatile boolean running;
109+
90110
private FileListFilter<File> filter;
91111

92112
private FileLocker locker;
93113

114+
private boolean useWatchService;
115+
116+
private WatchEventType[] watchEvents = new WatchEventType[] { WatchEventType.CREATE };
117+
94118
/**
95119
* Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
96120
*/
@@ -236,11 +260,59 @@ public void setScanEachPoll(boolean scanEachPoll) {
236260
this.scanEachPoll = scanEachPoll;
237261
}
238262

263+
/**
264+
* Switch this {@link FileReadingMessageSource} to use its internal
265+
* {@link FileReadingMessageSource.WatchServiceDirectoryScanner}.
266+
* @param useWatchService the {@code boolean} flag to switch to
267+
* {@link FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
268+
* @since 4.3
269+
* @see #setWatchEvents
270+
*/
271+
public void setUseWatchService(boolean useWatchService) {
272+
this.useWatchService = useWatchService;
273+
}
274+
275+
/**
276+
* The {@link WatchService} event types.
277+
* If {@link #setUseWatchService} isn't {@code true}, this option is ignored.
278+
* @param watchEvents the set of {@link WatchEventType}.
279+
* @since 4.3
280+
* @see #setUseWatchService
281+
*/
282+
public void setWatchEvents(WatchEventType... watchEvents) {
283+
Assert.notEmpty(watchEvents, "'watchEvents' must not be empty.");
284+
Assert.noNullElements(watchEvents, "'watchEvents' must not contain null elements.");
285+
Assert.state(!this.running, "Cannot change watch events while running.");
286+
287+
this.watchEvents = Arrays.copyOf(watchEvents, watchEvents.length);
288+
}
289+
239290
@Override
240291
public String getComponentType() {
241292
return "file:inbound-channel-adapter";
242293
}
243294

295+
@Override
296+
public void start() {
297+
if (this.scanner instanceof Lifecycle) {
298+
((Lifecycle) this.scanner).start();
299+
}
300+
this.running = true;
301+
}
302+
303+
@Override
304+
public void stop() {
305+
if (this.scanner instanceof Lifecycle) {
306+
((Lifecycle) this.scanner).start();
307+
}
308+
this.running = false;
309+
}
310+
311+
@Override
312+
public boolean isRunning() {
313+
return this.running;
314+
}
315+
244316
@Override
245317
protected void onInit() {
246318
Assert.notNull(this.directory, "'directory' must not be null");
@@ -253,6 +325,14 @@ protected void onInit() {
253325
"Source path [" + this.directory + "] does not point to a directory.");
254326
Assert.isTrue(this.directory.canRead(),
255327
"Source directory [" + this.directory + "] is not readable.");
328+
329+
Assert.state(!(this.scannerExplicitlySet && this.useWatchService),
330+
"The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner);
331+
332+
if (this.useWatchService) {
333+
this.scanner = new WatchServiceDirectoryScanner();
334+
}
335+
256336
Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
257337
"The 'filter' and 'locker' options must be present on the provided external 'scanner': "
258338
+ this.scanner);
@@ -262,6 +342,7 @@ protected void onInit() {
262342
if (this.locker != null) {
263343
this.scanner.setLocker(this.locker);
264344
}
345+
265346
}
266347

267348
public Message<File> receive() throws MessagingException {
@@ -302,9 +383,7 @@ private void scanInputDirectory() {
302383

303384
/**
304385
* Adds the failed message back to the 'toBeReceived' queue if there is room.
305-
*
306-
* @param failedMessage
307-
* the {@link org.springframework.messaging.Message} that failed
386+
* @param failedMessage the {@link Message} that failed
308387
*/
309388
public void onFailure(Message<File> failedMessage) {
310389
if (logger.isWarnEnabled()) {
@@ -316,14 +395,200 @@ public void onFailure(Message<File> failedMessage) {
316395
/**
317396
* The message is just logged. It was already removed from the queue during
318397
* the call to <code>receive()</code>
319-
*
320-
* @param sentMessage
321-
* the message that was successfully delivered
398+
* @param sentMessage the message that was successfully delivered
399+
* @deprecated with no replacement. Redundant method.
322400
*/
401+
@Deprecated
323402
public void onSend(Message<File> sentMessage) {
324403
if (logger.isDebugEnabled()) {
325404
logger.debug("Sent: " + sentMessage);
326405
}
327406
}
328407

408+
@UsesJava7
409+
public enum WatchEventType {
410+
411+
CREATE(StandardWatchEventKinds.ENTRY_CREATE),
412+
413+
MODIFY(StandardWatchEventKinds.ENTRY_MODIFY),
414+
415+
DELETE(StandardWatchEventKinds.ENTRY_DELETE);
416+
417+
private final WatchEvent.Kind<Path> kind;
418+
419+
WatchEventType(WatchEvent.Kind<Path> kind) {
420+
this.kind = kind;
421+
}
422+
423+
}
424+
425+
@UsesJava7
426+
private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements Lifecycle {
427+
428+
private final ConcurrentMap<Path, WatchKey> pathKeys = new ConcurrentHashMap<Path, WatchKey>();
429+
430+
private WatchService watcher;
431+
432+
private Collection<File> initialFiles;
433+
434+
private WatchEvent.Kind<?>[] kinds;
435+
436+
@Override
437+
public void start() {
438+
try {
439+
this.watcher = FileSystems.getDefault().newWatchService();
440+
}
441+
catch (IOException e) {
442+
logger.error("Failed to create watcher for " + FileReadingMessageSource.this.directory, e);
443+
}
444+
445+
this.kinds = new WatchEvent.Kind<?>[FileReadingMessageSource.this.watchEvents.length];
446+
447+
for (int i = 0; i < FileReadingMessageSource.this.watchEvents.length; i++) {
448+
this.kinds[i] = FileReadingMessageSource.this.watchEvents[i].kind;
449+
}
450+
451+
final Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath(), null);
452+
initialFiles.addAll(filesFromEvents());
453+
this.initialFiles = initialFiles;
454+
}
455+
456+
@Override
457+
public void stop() {
458+
try {
459+
this.watcher.close();
460+
this.watcher = null;
461+
}
462+
catch (IOException e) {
463+
logger.error("Failed to close watcher for " + FileReadingMessageSource.this.directory, e);
464+
}
465+
}
466+
467+
@Override
468+
public boolean isRunning() {
469+
return true;
470+
}
471+
472+
@Override
473+
protected File[] listEligibleFiles(File directory) {
474+
Assert.state(this.watcher != null, "The WatchService has'nt been started");
475+
if (this.initialFiles != null) {
476+
File[] initial = this.initialFiles.toArray(new File[this.initialFiles.size()]);
477+
this.initialFiles = null;
478+
return initial;
479+
}
480+
Collection<File> files = filesFromEvents();
481+
return files.toArray(new File[files.size()]);
482+
}
483+
484+
private Set<File> filesFromEvents() {
485+
WatchKey key = this.watcher.poll();
486+
Set<File> files = new LinkedHashSet<File>();
487+
while (key != null) {
488+
File parentDir = ((Path) key.watchable()).toAbsolutePath().toFile();
489+
for (WatchEvent<?> event : key.pollEvents()) {
490+
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE ||
491+
event.kind() == StandardWatchEventKinds.ENTRY_MODIFY ||
492+
event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
493+
Path item = (Path) event.context();
494+
File file = new File(parentDir, item.toFile().getName());
495+
if (logger.isDebugEnabled()) {
496+
logger.debug("Watch event [" + event.kind() + "] for file [" + file + "]");
497+
}
498+
499+
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
500+
if (FileReadingMessageSource.this.filter instanceof ResettableFileListFilter) {
501+
((ResettableFileListFilter<File>) FileReadingMessageSource.this.filter).remove(file);
502+
}
503+
boolean fileRemoved = files.remove(file);
504+
if (fileRemoved && logger.isDebugEnabled()) {
505+
logger.debug("The file [" + file +
506+
"] has been removed from the queue because of DELETE event.");
507+
}
508+
}
509+
else {
510+
if (file.exists()) {
511+
if (file.isDirectory()) {
512+
files.addAll(walkDirectory(file.toPath(), event.kind()));
513+
}
514+
else {
515+
files.remove(file);
516+
files.add(file);
517+
}
518+
}
519+
else {
520+
if (logger.isDebugEnabled()) {
521+
logger.debug("A file [" + file + "] for the event [" + event.kind() +
522+
"] doesn't exist. Ignored.");
523+
}
524+
}
525+
}
526+
}
527+
else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
528+
if (logger.isDebugEnabled()) {
529+
logger.debug("Watch event [" + StandardWatchEventKinds.OVERFLOW +
530+
"] with context [" + event.context() + "]");
531+
}
532+
533+
for (WatchKey watchKey : pathKeys.values()) {
534+
watchKey.cancel();
535+
}
536+
this.pathKeys.clear();
537+
538+
if (event.context() != null && event.context() instanceof Path) {
539+
files.addAll(walkDirectory((Path) event.context(), event.kind()));
540+
}
541+
else {
542+
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
543+
}
544+
}
545+
}
546+
key.reset();
547+
key = this.watcher.poll();
548+
}
549+
return files;
550+
}
551+
552+
private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
553+
final Set<File> walkedFiles = new LinkedHashSet<File>();
554+
try {
555+
registerWatch(directory);
556+
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
557+
558+
@Override
559+
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
560+
FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
561+
registerWatch(dir);
562+
return fileVisitResult;
563+
}
564+
565+
@Override
566+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
567+
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
568+
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
569+
walkedFiles.add(file.toFile());
570+
}
571+
return fileVisitResult;
572+
}
573+
574+
});
575+
}
576+
catch (IOException e) {
577+
logger.error("Failed to walk directory: " + directory.toString(), e);
578+
}
579+
return walkedFiles;
580+
}
581+
582+
private void registerWatch(Path dir) throws IOException {
583+
if (!this.pathKeys.containsKey(dir)) {
584+
if (logger.isDebugEnabled()) {
585+
logger.debug("registering: " + dir + " for file events");
586+
}
587+
WatchKey watchKey = dir.register(this.watcher, this.kinds);
588+
this.pathKeys.putIfAbsent(dir, watchKey);
589+
}
590+
}
591+
592+
}
593+
329594
}

spring-integration-file/src/main/java/org/springframework/integration/file/RecursiveLeafOnlyDirectoryScanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author Iwein Fuld
3131
* @author Gary Russell
3232
*
33-
* @deprecated in favor of {@link WatchServiceDirectoryScanner} (when using Java 7 or later)
33+
* @deprecated in favor of {@link FileReadingMessageSource#setUseWatchService(boolean)} (when using Java 7 or later)
3434
*/
3535
@Deprecated
3636
public class RecursiveLeafOnlyDirectoryScanner extends DefaultDirectoryScanner {

spring-integration-file/src/main/java/org/springframework/integration/file/WatchServiceDirectoryScanner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,13 @@
6060
* @author Gary Russell
6161
* @author Artem Bilan
6262
* @since 4.2
63+
* @deprecated since 4.3 in favor of internal {@link WatchService} logic in the {@link FileReadingMessageSource}.
64+
* Will be removed in Spring Integration 5.0.
6365
*
6466
*/
67+
@Deprecated
6568
@UsesJava7
69+
@SuppressWarnings("deprecation")
6670
public class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements SmartLifecycle {
6771

6872
private final static Log logger = LogFactory.getLog(WatchServiceDirectoryScanner.class);

0 commit comments

Comments
 (0)