Skip to content

Commit fadf8aa

Browse files
committed
* Add FileReadingMessageSource.setWatchEvents to allow to listen to the specific events,
not only `CREATE` or all of them. * Modify tests and docs to reflect the new API
1 parent b4940c9 commit fadf8aa

File tree

9 files changed

+167
-43
lines changed

9 files changed

+167
-43
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ public class FileReadingMessageSource extends IntegrationObjectSupport implement
110110

111111
private boolean useWatchService;
112112

113+
private WatchEventType[] watchEvents = new WatchEventType[] { WatchEventType.CREATE };
114+
113115
/**
114116
* Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
115117
*/
@@ -255,10 +257,30 @@ public void setScanEachPoll(boolean scanEachPoll) {
255257
this.scanEachPoll = scanEachPoll;
256258
}
257259

260+
/**
261+
* Switch this {@link FileReadingMessageSource} to use its internal
262+
* {@link FileReadingMessageSource.WatchServiceDirectoryScanner}.
263+
* @param useWatchService the {@code boolean} flag to switch to
264+
* {@link FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
265+
* @since 4.3
266+
* @see #setWatchEvents
267+
*/
258268
public void setUseWatchService(boolean useWatchService) {
259269
this.useWatchService = useWatchService;
260270
}
261271

272+
/**
273+
* The {@link WatchService} event types.
274+
* If {@link #setUseWatchService} isn't {@code true}, this option is ignored.
275+
* @param watchEvents the set of {@link WatchEventType}.
276+
* @since 4.3
277+
* @see #setUseWatchService
278+
*/
279+
public void setWatchEvents(WatchEventType... watchEvents) {
280+
Assert.notEmpty(watchEvents, "'watchEvents' must not be empty.");
281+
this.watchEvents = watchEvents;
282+
}
283+
262284
@Override
263285
public String getComponentType() {
264286
return "file:inbound-channel-adapter";
@@ -377,35 +399,61 @@ public void onSend(Message<File> sentMessage) {
377399
}
378400
}
379401

402+
@UsesJava7
403+
public enum WatchEventType {
404+
405+
CREATE(StandardWatchEventKinds.ENTRY_CREATE),
406+
407+
MODIFY(StandardWatchEventKinds.ENTRY_MODIFY),
408+
409+
DELETE(StandardWatchEventKinds.ENTRY_DELETE);
410+
411+
private final WatchEvent.Kind<Path> kind;
412+
413+
WatchEventType(WatchEvent.Kind<Path> kind) {
414+
this.kind = kind;
415+
}
416+
417+
}
418+
380419
@UsesJava7
381420
private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner implements Lifecycle {
382421

383-
private volatile WatchService watcher;
422+
private WatchService watcher;
384423

385-
private volatile Collection<File> initialFiles;
424+
private Collection<File> initialFiles;
425+
426+
private WatchEvent.Kind<?>[] kinds;
386427

387428
@Override
388429
public void start() {
389-
try {
390-
this.watcher = FileSystems.getDefault().newWatchService();
391-
}
392-
catch (IOException e) {
393-
logger.error("Failed to create watcher for " + FileReadingMessageSource.this.directory, e);
394-
}
395-
final Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath());
396-
initialFiles.addAll(filesFromEvents());
397-
this.initialFiles = initialFiles;
430+
try {
431+
this.watcher = FileSystems.getDefault().newWatchService();
432+
}
433+
catch (IOException e) {
434+
logger.error("Failed to create watcher for " + FileReadingMessageSource.this.directory, e);
435+
}
436+
437+
this.kinds = new WatchEvent.Kind<?>[FileReadingMessageSource.this.watchEvents.length];
438+
439+
for (int i = 0; i < FileReadingMessageSource.this.watchEvents.length; i++) {
440+
this.kinds[i] = FileReadingMessageSource.this.watchEvents[i].kind;
441+
}
442+
443+
final Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath());
444+
initialFiles.addAll(filesFromEvents());
445+
this.initialFiles = initialFiles;
398446
}
399447

400448
@Override
401449
public void stop() {
402-
try {
403-
this.watcher.close();
404-
this.watcher = null;
405-
}
406-
catch (IOException e) {
407-
logger.error("Failed to close watcher for " + FileReadingMessageSource.this.directory, e);
408-
}
450+
try {
451+
this.watcher.close();
452+
this.watcher = null;
453+
}
454+
catch (IOException e) {
455+
logger.error("Failed to close watcher for " + FileReadingMessageSource.this.directory, e);
456+
}
409457
}
410458

411459
@Override
@@ -506,10 +554,7 @@ private void registerWatch(Path dir) throws IOException {
506554
if (logger.isDebugEnabled()) {
507555
logger.debug("registering: " + dir + " for file events");
508556
}
509-
dir.register(this.watcher,
510-
StandardWatchEventKinds.ENTRY_CREATE,
511-
StandardWatchEventKinds.ENTRY_MODIFY,
512-
StandardWatchEventKinds.ENTRY_DELETE);
557+
dir.register(this.watcher, this.kinds);
513558
}
514559

515560
}

spring-integration-file/src/main/java/org/springframework/integration/file/config/FileInboundChannelAdapterParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ protected BeanMetadataElement parseSource(Element element, ParserContext parserC
4747
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "comparator");
4848
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "scanner");
4949
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "use-watch-service");
50+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "watch-events");
5051
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "directory");
5152
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-create-directory");
5253
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "queue-size");

spring-integration-file/src/main/java/org/springframework/integration/file/config/FileReadingMessageSourceFactoryBean.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
* @author Mark Fisher
3636
* @author Iwein Fuld
3737
* @author Gary Russell
38+
* @author Artem Bilan
39+
*
3840
* @since 1.0.3
3941
*/
4042
public class FileReadingMessageSourceFactoryBean implements FactoryBean<FileReadingMessageSource>,
@@ -56,6 +58,8 @@ public class FileReadingMessageSourceFactoryBean implements FactoryBean<FileRead
5658

5759
private boolean useWatchService;
5860

61+
private FileReadingMessageSource.WatchEventType[] watchEvents;
62+
5963
private volatile Boolean scanEachPoll;
6064

6165
private volatile Boolean autoCreateDirectory;
@@ -83,6 +87,10 @@ public void setUseWatchService(boolean useWatchService) {
8387
this.useWatchService = useWatchService;
8488
}
8589

90+
public void setWatchEvents(FileReadingMessageSource.WatchEventType... watchEvents) {
91+
this.watchEvents = watchEvents;
92+
}
93+
8694
public void setFilter(FileListFilter<File> filter) {
8795
if (filter instanceof AbstractFileLockerFilter && (this.locker == null)) {
8896
this.setLocker((AbstractFileLockerFilter) filter);
@@ -154,6 +162,9 @@ else if (queueSizeSet) {
154162
}
155163
else {
156164
this.source.setUseWatchService(this.useWatchService);
165+
if (this.watchEvents != null) {
166+
this.source.setWatchEvents(this.watchEvents);
167+
}
157168
}
158169
if (this.filter != null) {
159170
if (this.locker == null) {

spring-integration-file/src/main/resources/org/springframework/integration/file/config/spring-integration-file-4.3.xsd

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,29 @@ Only files matching this regular expression will be picked up by this adapter.
106106
</xsd:appinfo>
107107
</xsd:annotation>
108108
</xsd:attribute>
109-
<xsd:attribute name="use-watch-service" type="xsd:string">
109+
<xsd:attribute name="use-watch-service">
110110
<xsd:annotation>
111111
<xsd:documentation>
112112
Indicates if the 'FileReadingMessageSource' should use an internal 'DirectoryScanner'
113113
for the Java 7 'WatchService'.
114114
Mutually exclusive with 'scanner' attribute.
115115
</xsd:documentation>
116-
<xsd:appinfo>
117-
<tool:annotation kind="ref">
118-
<tool:expected-type
119-
type="org.springframework.integration.file.DirectoryScanner"/>
120-
</tool:annotation>
121-
</xsd:appinfo>
122116
</xsd:annotation>
117+
<xsd:simpleType>
118+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
119+
</xsd:simpleType>
120+
</xsd:attribute>
121+
<xsd:attribute name="watch-events" default="CREATE">
122+
<xsd:annotation>
123+
<xsd:documentation>
124+
Comma-separated value for the 'FileReadingMessageSource.WatchEventType's
125+
to specify which kinds of files system events the 'WatchService' will listen to.
126+
Used only if 'use-watch-service == true'.
127+
</xsd:documentation>
128+
</xsd:annotation>
129+
<xsd:simpleType>
130+
<xsd:union memberTypes="watchEventType xsd:string"/>
131+
</xsd:simpleType>
123132
</xsd:attribute>
124133
<xsd:attribute name="ignore-hidden">
125134
<xsd:annotation><xsd:documentation><![CDATA[
@@ -168,7 +177,16 @@ Only files matching this regular expression will be picked up by this adapter.
168177
</xsd:complexType>
169178
</xsd:element>
170179

171-
<xsd:element name="outbound-channel-adapter">
180+
<xsd:simpleType name="watchEventType">
181+
<xsd:restriction base="xsd:token">
182+
<xsd:enumeration value="CREATE"/>
183+
<xsd:enumeration value="MODIFY"/>
184+
<xsd:enumeration value="DELETE"/>
185+
</xsd:restriction>
186+
</xsd:simpleType>
187+
188+
189+
<xsd:element name="outbound-channel-adapter">
172190
<xsd:annotation>
173191
<xsd:documentation>
174192
Configures a Consumer Endpoint for the

spring-integration-file/src/test/java/org/springframework/integration/file/WatchServiceDirectoryScannerTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public void testInitialAndAddMoreThanRemove() throws Exception {
7575
FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
7676
fileReadingMessageSource.setDirectory(folder.getRoot());
7777
fileReadingMessageSource.setUseWatchService(true);
78+
fileReadingMessageSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,
79+
FileReadingMessageSource.WatchEventType.MODIFY,
80+
FileReadingMessageSource.WatchEventType.DELETE);
7881
fileReadingMessageSource.setBeanFactory(mock(BeanFactory.class));
7982

8083
final CountDownLatch removeFileLatch = new CountDownLatch(1);
@@ -161,13 +164,17 @@ public boolean remove(File fileToRemove) {
161164

162165
baz2Copy.setLastModified(baz2.lastModified() + 100000);
163166

167+
Thread.sleep(100);
168+
164169
files = scanner.listFiles(folder.getRoot());
165170

166171
assertEquals(1, files.size());
167172
assertTrue(files.contains(baz2));
168173

169174
baz2.delete();
170175

176+
Thread.sleep(100);
177+
171178
scanner.listFiles(folder.getRoot());
172179

173180
assertTrue(removeFileLatch.await(10, TimeUnit.SECONDS));

spring-integration-file/src/test/java/org/springframework/integration/file/config/FileInboundChannelAdapterParserTests-context.xml

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@
1515
filter="filter"
1616
comparator="testComparator"
1717
ignore-hidden="false"
18-
auto-startup="false">
18+
auto-startup="false"
19+
use-watch-service="true"
20+
watch-events="MODIFY, DELETE"> <!-- CREATE by default -->
1921
<integration:poller fixed-rate="5000">
20-
<integration:transactional synchronization-factory="syncFactory"/>
22+
<integration:transactional synchronization-factory="syncFactory"/>
2123
</integration:poller>
2224
</inbound-channel-adapter>
2325

2426
<inbound-channel-adapter id="inboundWithJustFilter"
2527
directory="${java.io.tmpdir}"
2628
filter="filter"
2729
auto-startup="false">
28-
<integration:poller fixed-rate="5000" />
30+
<integration:poller fixed-rate="5000"/>
2931
</inbound-channel-adapter>
3032

31-
<integration:channel id="successChannel" />
33+
<integration:channel id="successChannel"/>
3234

3335
<beans:bean id="filter" class="org.springframework.integration.file.config.FileListFilterFactoryBean">
3436
<beans:property name="ignoreHidden" value="false"/>
@@ -47,12 +49,17 @@
4749

4850
<beans:bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager"/>
4951

50-
<beans:bean id="syncFactory" class="org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory">
52+
<beans:bean id="syncFactory"
53+
class="org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory">
5154
<beans:constructor-arg>
52-
<beans:bean class="org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor">
53-
<beans:property name="afterCommitExpression" value="#{new org.springframework.expression.spel.standard.SpelExpressionParser().parseExpression('payload.delete()')}"/>
55+
<beans:bean
56+
class="org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor">
57+
<beans:property name="afterCommitExpression"
58+
value="#{new org.springframework.expression.spel.standard.SpelExpressionParser()
59+
.parseExpression('payload.delete()')}"/>
5460
<beans:property name="afterCommitChannel" ref="successChannel"/>
55-
<beans:property name="afterRollbackExpression" value="#{new org.springframework.expression.common.LiteralExpression('foo')}"/>
61+
<beans:property name="afterRollbackExpression"
62+
value="#{new org.springframework.expression.common.LiteralExpression('foo')}"/>
5663
<beans:property name="afterRollbackChannel" ref="nullChannel"/>
5764
</beans:bean>
5865
</beans:constructor-arg>

spring-integration-file/src/test/java/org/springframework/integration/file/config/FileInboundChannelAdapterParserTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.integration.file.config;
1818

19+
import static org.hamcrest.Matchers.containsString;
1920
import static org.hamcrest.Matchers.instanceOf;
21+
import static org.hamcrest.Matchers.isOneOf;
2022
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertNotEquals;
2124
import static org.junit.Assert.assertSame;
2225
import static org.junit.Assert.assertThat;
2326
import static org.junit.Assert.assertTrue;
@@ -52,13 +55,14 @@
5255
* @author Mark Fisher
5356
* @author Gary Russell
5457
* @author Gunnar Hillert
58+
* @author Artem Bilan
5559
*/
5660
@ContextConfiguration
5761
@RunWith(SpringJUnit4ClassRunner.class)
5862
@DirtiesContext
5963
public class FileInboundChannelAdapterParserTests {
6064

61-
@Autowired(required = true)
65+
@Autowired
6266
private ApplicationContext context;
6367

6468
@Autowired
@@ -107,6 +111,18 @@ public void filter() throws Exception {
107111
Object filter = scannerAccessor.getPropertyValue("filter");
108112
assertTrue("'filter' should be set and be of instance AcceptOnceFileListFilter but got "
109113
+ filter.getClass().getSimpleName(), filter instanceof AcceptOnceFileListFilter);
114+
115+
assertThat(scanner.getClass().getName(),
116+
containsString("FileReadingMessageSource$WatchServiceDirectoryScanner"));
117+
118+
FileReadingMessageSource.WatchEventType[] watchEvents =
119+
(FileReadingMessageSource.WatchEventType[]) this.accessor.getPropertyValue("watchEvents");
120+
assertEquals(2, watchEvents.length);
121+
for (FileReadingMessageSource.WatchEventType watchEvent : watchEvents) {
122+
assertNotEquals(FileReadingMessageSource.WatchEventType.CREATE, watchEvent);
123+
assertThat(watchEvent, isOneOf(FileReadingMessageSource.WatchEventType.MODIFY,
124+
FileReadingMessageSource.WatchEventType.DELETE));
125+
}
110126
}
111127

112128
@Test

0 commit comments

Comments
 (0)