Skip to content

Commit 3151b74

Browse files
committed
INT-3989: Add FileReadingMessageSource.WatchServiceDirectoryScanner
JIRA: https://jira.spring.io/browse/INT-3989, https://jira.spring.io/browse/INT-3990, https://jira.spring.io/browse/INT-3988 * Deprecate top-level `WatchServiceDirectoryScanner` because of inconsistency around `Lifecycle` and shared `directory` property * Copy/paste its logic into the `FileReadingMessageSource.WatchServiceDirectoryScanner` to hide that inconsistency, but still get a gain from the `WatchService` benefits * Add support for the `StandardWatchEventKinds.ENTRY_MODIFY` and `StandardWatchEventKinds.ENTRY_DELETE` events in the `FileReadingMessageSource.WatchServiceDirectoryScanner` * Introduce `useWatchService` option to switch to the internal `FileReadingMessageSource.WatchServiceDirectoryScanner` * Make `CompositeFileListFilter` also as `ResettableFileListFilter` * Deprecate weird `FileReadingMessageSource.onSend()` method and remove its usage from tests * Modify `WatchServiceDirectoryScannerTests` for the new logic * Document changes
1 parent cfdfa62 commit 3151b74

14 files changed

+316
-47
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java

Lines changed: 193 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@
1717
package org.springframework.integration.file;
1818

1919
import java.io.File;
20+
import java.io.IOException;
21+
import java.nio.file.FileSystems;
22+
import java.nio.file.FileVisitResult;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.nio.file.SimpleFileVisitor;
26+
import java.nio.file.StandardWatchEventKinds;
27+
import java.nio.file.WatchEvent;
28+
import java.nio.file.WatchKey;
29+
import java.nio.file.WatchService;
30+
import java.nio.file.attribute.BasicFileAttributes;
31+
import java.util.Collection;
2032
import java.util.Comparator;
2133
import java.util.LinkedHashSet;
2234
import java.util.List;
@@ -27,11 +39,14 @@
2739
import org.apache.commons.logging.Log;
2840
import org.apache.commons.logging.LogFactory;
2941

42+
import org.springframework.context.Lifecycle;
3043
import org.springframework.integration.aggregator.ResequencingMessageGroupProcessor;
3144
import org.springframework.integration.context.IntegrationObjectSupport;
3245
import org.springframework.integration.core.MessageSource;
3346
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
3447
import org.springframework.integration.file.filters.FileListFilter;
48+
import org.springframework.integration.file.filters.ResettableFileListFilter;
49+
import org.springframework.lang.UsesJava7;
3550
import org.springframework.messaging.Message;
3651
import org.springframework.messaging.MessagingException;
3752
import org.springframework.util.Assert;
@@ -64,7 +79,7 @@
6479
* @author Gary Russell
6580
* @author Artem Bilan
6681
*/
67-
public class FileReadingMessageSource extends IntegrationObjectSupport implements MessageSource<File> {
82+
public class FileReadingMessageSource extends IntegrationObjectSupport implements MessageSource<File>, Lifecycle {
6883

6984
private static final int DEFAULT_INTERNAL_QUEUE_CAPACITY = 5;
7085

@@ -87,10 +102,14 @@ public class FileReadingMessageSource extends IntegrationObjectSupport implement
87102

88103
private volatile boolean scanEachPoll = false;
89104

105+
private volatile boolean running;
106+
90107
private FileListFilter<File> filter;
91108

92109
private FileLocker locker;
93110

111+
private boolean useWatchService;
112+
94113
/**
95114
* Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
96115
*/
@@ -236,11 +255,36 @@ public void setScanEachPoll(boolean scanEachPoll) {
236255
this.scanEachPoll = scanEachPoll;
237256
}
238257

258+
public void setUseWatchService(boolean useWatchService) {
259+
this.useWatchService = useWatchService;
260+
}
261+
239262
@Override
240263
public String getComponentType() {
241264
return "file:inbound-channel-adapter";
242265
}
243266

267+
@Override
268+
public void start() {
269+
if (this.scanner instanceof Lifecycle) {
270+
((Lifecycle) this.scanner).start();
271+
}
272+
this.running = true;
273+
}
274+
275+
@Override
276+
public void stop() {
277+
if (this.scanner instanceof Lifecycle) {
278+
((Lifecycle) this.scanner).start();
279+
}
280+
this.running = false;
281+
}
282+
283+
@Override
284+
public boolean isRunning() {
285+
return this.running;
286+
}
287+
244288
@Override
245289
protected void onInit() {
246290
Assert.notNull(this.directory, "'directory' must not be null");
@@ -253,6 +297,14 @@ protected void onInit() {
253297
"Source path [" + this.directory + "] does not point to a directory.");
254298
Assert.isTrue(this.directory.canRead(),
255299
"Source directory [" + this.directory + "] is not readable.");
300+
301+
Assert.state(!(this.scannerExplicitlySet && this.useWatchService),
302+
"The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner);
303+
304+
if (this.useWatchService) {
305+
this.scanner = new WatchServiceDirectoryScanner();
306+
}
307+
256308
Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
257309
"The 'filter' and 'locker' options must be present on the provided external 'scanner': "
258310
+ this.scanner);
@@ -262,6 +314,7 @@ protected void onInit() {
262314
if (this.locker != null) {
263315
this.scanner.setLocker(this.locker);
264316
}
317+
265318
}
266319

267320
public Message<File> receive() throws MessagingException {
@@ -302,9 +355,7 @@ private void scanInputDirectory() {
302355

303356
/**
304357
* Adds the failed message back to the 'toBeReceived' queue if there is room.
305-
*
306-
* @param failedMessage
307-
* the {@link org.springframework.messaging.Message} that failed
358+
* @param failedMessage the {@link Message} that failed
308359
*/
309360
public void onFailure(Message<File> failedMessage) {
310361
if (logger.isWarnEnabled()) {
@@ -316,14 +367,149 @@ public void onFailure(Message<File> failedMessage) {
316367
/**
317368
* The message is just logged. It was already removed from the queue during
318369
* the call to <code>receive()</code>
319-
*
320-
* @param sentMessage
321-
* the message that was successfully delivered
370+
* @param sentMessage the message that was successfully delivered
371+
* @deprecated with no replacement. Redundant method.
322372
*/
373+
@Deprecated
323374
public void onSend(Message<File> sentMessage) {
324375
if (logger.isDebugEnabled()) {
325376
logger.debug("Sent: " + sentMessage);
326377
}
327378
}
328379

380+
@UsesJava7
381+
private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements Lifecycle {
382+
383+
private volatile WatchService watcher;
384+
385+
private volatile Collection<File> initialFiles;
386+
387+
@Override
388+
public void start() {
389+
try {
390+
this.watcher = FileSystems.getDefault().newWatchService();
391+
}
392+
catch (IOException e) {
393+
logger.error("Failed to create watcher for " + FileReadingMessageSource.this.directory, e);
394+
}
395+
final Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath());
396+
initialFiles.addAll(filesFromEvents());
397+
this.initialFiles = initialFiles;
398+
}
399+
400+
@Override
401+
public void stop() {
402+
try {
403+
this.watcher.close();
404+
this.watcher = null;
405+
}
406+
catch (IOException e) {
407+
logger.error("Failed to close watcher for " + FileReadingMessageSource.this.directory, e);
408+
}
409+
}
410+
411+
@Override
412+
public boolean isRunning() {
413+
return true;
414+
}
415+
416+
@Override
417+
protected File[] listEligibleFiles(File directory) {
418+
Assert.state(this.watcher != null, "The WatchService has'nt been started");
419+
if (this.initialFiles != null) {
420+
File[] initial = this.initialFiles.toArray(new File[this.initialFiles.size()]);
421+
this.initialFiles = null;
422+
return initial;
423+
}
424+
Collection<File> files = filesFromEvents();
425+
return files.toArray(new File[files.size()]);
426+
}
427+
428+
private Set<File> filesFromEvents() {
429+
WatchKey key = this.watcher.poll();
430+
Set<File> files = new LinkedHashSet<File>();
431+
while (key != null) {
432+
File parentDir = ((Path) key.watchable()).toAbsolutePath().toFile();
433+
for (WatchEvent<?> event : key.pollEvents()) {
434+
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE ||
435+
event.kind() == StandardWatchEventKinds.ENTRY_MODIFY ||
436+
event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
437+
Path item = (Path) event.context();
438+
File file = new File(parentDir, item.toFile().getName());
439+
if (logger.isDebugEnabled()) {
440+
logger.debug("Watch Event: " + event.kind() + ": " + file);
441+
}
442+
443+
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
444+
if (FileReadingMessageSource.this.filter instanceof ResettableFileListFilter) {
445+
((ResettableFileListFilter<File>) FileReadingMessageSource.this.filter).remove(file);
446+
}
447+
}
448+
else {
449+
if (file.isDirectory()) {
450+
files.addAll(walkDirectory(file.toPath()));
451+
}
452+
else {
453+
files.add(file);
454+
}
455+
}
456+
}
457+
else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
458+
if (logger.isDebugEnabled()) {
459+
logger.debug("Watch Event: " + event.kind() + ": context: " + event.context());
460+
}
461+
if (event.context() != null && event.context() instanceof Path) {
462+
files.addAll(walkDirectory((Path) event.context()));
463+
}
464+
else {
465+
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath()));
466+
}
467+
}
468+
}
469+
key.reset();
470+
key = this.watcher.poll();
471+
}
472+
return files;
473+
}
474+
475+
private Set<File> walkDirectory(Path directory) {
476+
final Set<File> walkedFiles = new LinkedHashSet<File>();
477+
try {
478+
registerWatch(directory);
479+
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
480+
481+
@Override
482+
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
483+
FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
484+
registerWatch(dir);
485+
return fileVisitResult;
486+
}
487+
488+
@Override
489+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
490+
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
491+
walkedFiles.add(file.toFile());
492+
return fileVisitResult;
493+
}
494+
495+
});
496+
}
497+
catch (IOException e) {
498+
logger.error("Failed to walk directory: " + directory.toString(), e);
499+
}
500+
return walkedFiles;
501+
}
502+
503+
private void registerWatch(Path dir) throws IOException {
504+
if (logger.isDebugEnabled()) {
505+
logger.debug("registering: " + dir + " for file events");
506+
}
507+
dir.register(this.watcher,
508+
StandardWatchEventKinds.ENTRY_CREATE,
509+
StandardWatchEventKinds.ENTRY_MODIFY,
510+
StandardWatchEventKinds.ENTRY_DELETE);
511+
}
512+
513+
}
514+
329515
}

spring-integration-file/src/main/java/org/springframework/integration/file/RecursiveLeafOnlyDirectoryScanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author Iwein Fuld
3131
* @author Gary Russell
3232
*
33-
* @deprecated in favor of {@link WatchServiceDirectoryScanner} (when using Java 7 or later)
33+
* @deprecated in favor of {@link FileReadingMessageSource#setUseWatchService(boolean)} (when using Java 7 or later)
3434
*/
3535
@Deprecated
3636
public class RecursiveLeafOnlyDirectoryScanner extends DefaultDirectoryScanner {

spring-integration-file/src/main/java/org/springframework/integration/file/WatchServiceDirectoryScanner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,13 @@
6060
* @author Gary Russell
6161
* @author Artem Bilan
6262
* @since 4.2
63+
* @deprecated since 4.3 in favor of internal {@link WatchService} logic in the {@link FileReadingMessageSource}.
64+
* Will be removed in Spring Integration 5.0.
6365
*
6466
*/
67+
@Deprecated
6568
@UsesJava7
69+
@SuppressWarnings("deprecation")
6670
public class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements SmartLifecycle {
6771

6872
private final static Log logger = LogFactory.getLog(WatchServiceDirectoryScanner.class);

spring-integration-file/src/main/java/org/springframework/integration/file/config/FileInboundChannelAdapterParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,6 +46,7 @@ protected BeanMetadataElement parseSource(Element element, ParserContext parserC
4646
BeanDefinitionBuilder.genericBeanDefinition(FileReadingMessageSourceFactoryBean.class);
4747
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "comparator");
4848
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "scanner");
49+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "use-watch-service");
4950
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "directory");
5051
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-create-directory");
5152
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "queue-size");

spring-integration-file/src/main/java/org/springframework/integration/file/config/FileReadingMessageSourceFactoryBean.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class FileReadingMessageSourceFactoryBean implements FactoryBean<FileRead
5454

5555
private volatile DirectoryScanner scanner;
5656

57+
private boolean useWatchService;
58+
5759
private volatile Boolean scanEachPoll;
5860

5961
private volatile Boolean autoCreateDirectory;
@@ -77,6 +79,10 @@ public void setScanner(DirectoryScanner scanner) {
7779
this.scanner = scanner;
7880
}
7981

82+
public void setUseWatchService(boolean useWatchService) {
83+
this.useWatchService = useWatchService;
84+
}
85+
8086
public void setFilter(FileListFilter<File> filter) {
8187
if (filter instanceof AbstractFileLockerFilter && (this.locker == null)) {
8288
this.setLocker((AbstractFileLockerFilter) filter);
@@ -146,6 +152,9 @@ else if (queueSizeSet) {
146152
if (this.scanner != null) {
147153
this.source.setScanner(this.scanner);
148154
}
155+
else {
156+
this.source.setUseWatchService(this.useWatchService);
157+
}
149158
if (this.filter != null) {
150159
if (this.locker == null) {
151160
this.source.setFilter(this.filter);

spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,10 +36,11 @@
3636
* @author Iwein Fuld
3737
* @author Josh Long
3838
* @author Gary Russell
39+
* @author Artem Bilan
3940
*
4041
* @param <F> The type that will be filtered.
4142
*/
42-
public class CompositeFileListFilter<F> implements ReversibleFileListFilter<F>, Closeable {
43+
public class CompositeFileListFilter<F> implements ReversibleFileListFilter<F>, ResettableFileListFilter<F>, Closeable {
4344

4445
private final Set<FileListFilter<F>> fileFilters;
4546

@@ -120,4 +121,16 @@ public void rollback(F file, List<F> files) {
120121
}
121122
}
122123

124+
@Override
125+
public boolean remove(F f) {
126+
boolean removed = false;
127+
for (FileListFilter<F> fileFilter : this.fileFilters) {
128+
if (fileFilter instanceof ResettableFileListFilter) {
129+
((ResettableFileListFilter<F>) fileFilter).remove(f);
130+
removed = true;
131+
}
132+
}
133+
return removed;
134+
}
135+
123136
}

0 commit comments

Comments
 (0)