Introduction
One of the most common use cases for messaging applications is to retrieve files from a directory and process the results. Spring Integration has great support for this and the DSL makes it very easy to set up.
This post will demonstrate how to set up a transactional file poller using the DSL and offer strategies for unit testing it.
The sample application will:
- Poll a directory for files that match a regex pattern.
- Transform the content of each file it reads to a String and send it to an inbound message channel.
- Process each message in a second integration flow that listens on the inbound channel, reverses the string and writes the result out to another directory.
The whole message flow is transactional: on a successful commit the processed file is moved to a processed directory, and on rollback after a failure it is moved to a failed directory. The file polling integration acts as a conduit for message processing downstream. The actual processing in this sample project is unimportant; the main point is to demonstrate the management of the file endpoint.
File Polling Components
The directories are configured in the class: com.porterhead.integration.file.FilePollingConfiguration
@Configuration
public class FilePollingConfiguration {

    @Bean(name="inboundReadDirectory")
    public File inboundReadDirectory(@Value("${inbound.read.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundProcessedDirectory")
    public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundFailedDirectory")
    public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundOutDirectory")
    public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
        return makeDirectory(path);
    }

    private File makeDirectory(String path) {
        File file = new File(path);
        file.mkdirs();
        return file;
    }
}
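One caveat with makeDirectory: File.mkdirs() returns false rather than throwing when a directory cannot be created, so a bad path would go unnoticed until the first poll. A minimal sketch of a stricter variant, assuming you would rather fail at startup; the class name DirectorySetup is hypothetical and not part of the sample project:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;

class DirectorySetup {

    // Creates the directory and any missing parents, or fails loudly.
    // Files.createDirectories does nothing if the directory already exists.
    static File makeDirectory(String path) {
        try {
            return Files.createDirectories(Paths.get(path)).toFile();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not create directory " + path, e);
        }
    }

    public static void main(String[] args) {
        // The path below is only an example location under the system temp directory.
        File dir = makeDirectory(System.getProperty("java.io.tmpdir") + "/inbound/read");
        System.out.println(dir + " is a directory: " + dir.isDirectory());
    }
}
```

Swapping this into the configuration class would only change the body of makeDirectory; the bean definitions stay the same.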
The Spring Integration components are in the class: com.porterhead.integration.file.FilePollingIntegrationFlow
Message Source
The file reading endpoint needs a configured MessageSource, in this case the one provided as part of the spring-integration-file library, FileReadingMessageSource:
@Bean
public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(this.inboundReadDirectory);
    source.setScanner(directoryScanner);
    source.setAutoCreateDirectory(true);
    return source;
}
New since Spring Integration 5.0
Spring Integration 5.0 introduced a RecursiveDirectoryScanner class that makes it easy to poll subdirectories.
The bean for this is:
@Bean
public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
    DirectoryScanner scanner = new RecursiveDirectoryScanner();
    CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(
            Arrays.asList(new AcceptOnceFileListFilter<>(), new RegexPatternFileListFilter(regex)));
    scanner.setFilter(filter);
    return scanner;
}
The component is configured to filter files based on a regex pattern defined in the property inbound.filename.regex.
Using the AcceptOnceFileListFilter ensures that duplicate files don't get processed.
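To make the combined behaviour easier to picture, here is a plain-Java sketch of the same idea: walk the directory tree, keep files whose name matches the regex, and hand each path out only once. It is an illustration under my own assumptions, not how Spring Integration implements its scanner or filters; OnceOnlyRegexScanner and its demo method are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OnceOnlyRegexScanner {

    private final Pattern namePattern;
    // Paths already handed out; this plays the role of the accept-once filter.
    private final Set<Path> seen = new HashSet<>();

    OnceOnlyRegexScanner(String regex) {
        this.namePattern = Pattern.compile(regex);
    }

    // Walks root recursively and returns regular files whose name matches
    // the regex and that were not returned by an earlier call.
    synchronized List<Path> scan(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> namePattern.matcher(p.getFileName().toString()).matches())
                    .filter(p -> seen.add(p.toAbsolutePath()))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small temp tree and scans it twice; returns the size of each result.
    static int[] demo() {
        try {
            Path root = Files.createTempDirectory("inbound");
            Files.createDirectories(root.resolve("sub"));
            Files.write(root.resolve("a.txt"), "a".getBytes());
            Files.write(root.resolve("sub").resolve("b.txt"), "b".getBytes());
            Files.write(root.resolve("c.tmp"), "c".getBytes());
            OnceOnlyRegexScanner scanner = new OnceOnlyRegexScanner(".*\\.txt");
            int first = scanner.scan(root).size();  // a.txt and sub/b.txt
            int second = scanner.scan(root).size(); // nothing new on the second pass
            return new int[] {first, second};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("first scan: " + counts[0] + ", second scan: " + counts[1]);
    }
}
```

Because the seen set is keyed on the path, a new file dropped in with the same name as an earlier one is skipped, which is the behaviour the pollIgnoresFileAlreadySeen test further down relies on.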
Transaction Management
Transactions are handled using the Spring-provided PseudoTransactionManager. The Javadoc for this class explains perfectly how and why to use it:
* An implementation of {@link PlatformTransactionManager} that provides transaction-like semantics to
* {@link MessageSource}s that are not inherently transactional. It does <b>not</b> make such
* sources transactional; rather, together with a {@link TransactionSynchronizationFactory}, it provides
* the ability to synchronize operations after a flow completes, via beforeCommit, afterCommit and
* afterRollback operations.
The afterCommit expression moves the file to a processed directory. The afterRollback expression moves the file to a failed directory:
@Bean
PseudoTransactionManager transactionManager() {
    return new PseudoTransactionManager();
}

@Bean
TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionParser parser = new SpelExpressionParser();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    // On commit: rename the polled file into the processed directory
    syncProcessor.setAfterCommitExpression(parser.parseExpression(
            "payload.renameTo(new java.io.File(@inboundProcessedDirectory.path "
            + " + T(java.io.File).separator + payload.name))"));
    // On rollback: rename the polled file into the failed directory
    syncProcessor.setAfterRollbackExpression(parser.parseExpression(
            "payload.renameTo(new java.io.File(@inboundFailedDirectory.path "
            + " + T(java.io.File).separator + payload.name))"));
    return new DefaultTransactionSynchronizationFactory(syncProcessor);
}
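Stripped of the SpEL, the two expressions amount to "move the file to one directory if processing succeeded and to another if it threw". A plain-Java sketch of that logic follows. AfterCompletionMover is a hypothetical name, and two choices are mine rather than the sample project's: the sketch uses Files.move (which throws on failure, whereas File.renameTo only returns false) and passes REPLACE_EXISTING.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Consumer;

class AfterCompletionMover {

    private final Path processedDir;
    private final Path failedDir;

    AfterCompletionMover(Path processedDir, Path failedDir) {
        this.processedDir = processedDir;
        this.failedDir = failedDir;
    }

    // Runs the processing step, then moves the file according to the outcome.
    // Returns the file's new location.
    Path process(Path file, Consumer<Path> processor) {
        Path targetDir;
        try {
            processor.accept(file);
            targetDir = processedDir; // the "afterCommit" case
        } catch (RuntimeException e) {
            targetDir = failedDir;    // the "afterRollback" case
        }
        try {
            return Files.move(file, targetDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Processes one file successfully and one with a forced failure,
    // then reports whether each landed in the expected directory.
    static boolean[] demo() {
        try {
            Path in = Files.createTempDirectory("in");
            Path processed = Files.createTempDirectory("processed");
            Path failed = Files.createTempDirectory("failed");
            AfterCompletionMover mover = new AfterCompletionMover(processed, failed);
            Path good = Files.write(in.resolve("good.txt"), "ok".getBytes());
            Path bad = Files.write(in.resolve("bad.txt"), "boom".getBytes());
            mover.process(good, f -> { });
            mover.process(bad, f -> { throw new IllegalStateException("forced failure"); });
            return new boolean[] {
                Files.exists(processed.resolve("good.txt")),
                Files.exists(failed.resolve("bad.txt"))
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("good.txt in processed: " + result[0]);
        System.out.println("bad.txt in failed: " + result[1]);
    }
}
```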
Spring Integration Flow
The flow configures a poller and task executor, adds in the transaction components and converts the file content to a string before sending it to the inbound channel.
@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {
    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)
                    .transactionSynchronizationFactory(transactionSynchronizationFactory())
                    .transactional(transactionManager())))
            .transform(Files.toStringTransformer())
            .channel(ApplicationConfiguration.INBOUND_CHANNEL)
            .get();
}
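To make the two poller settings concrete, here is a plain-Java sketch of how I read them: maxMessagesPerPoll caps how many messages one poll cycle may take from the source, and the fixed delay is the pause between the end of one cycle and the start of the next. BoundedPoller and its in-memory queue are hypothetical stand-ins, not Spring Integration classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class BoundedPoller<T> {

    private final Queue<T> source;
    private final int maxMessagesPerPoll;
    private final Consumer<T> handler;

    BoundedPoller(Queue<T> source, int maxMessagesPerPoll, Consumer<T> handler) {
        this.source = source;
        this.maxMessagesPerPoll = maxMessagesPerPoll;
        this.handler = handler;
    }

    // One poll cycle: handle at most maxMessagesPerPoll items and
    // stop early if the source runs dry. Returns the number handled.
    int pollOnce() {
        int handled = 0;
        while (handled < maxMessagesPerPoll) {
            T item = source.poll();
            if (item == null) {
                break;
            }
            handler.accept(item);
            handled++;
        }
        return handled;
    }

    // Repeats pollOnce; the delay runs from the end of one cycle to the start of the next.
    void start(ScheduledExecutorService scheduler, long fixedDelayMillis) {
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, fixedDelayMillis, TimeUnit.MILLISECONDS);
    }

    // Five queued items with a cap of two per poll: returns the count handled in each of three cycles.
    static List<Integer> demo() {
        Queue<String> files = new ConcurrentLinkedQueue<>(
                Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt", "e.txt"));
        List<String> handled = new ArrayList<>();
        BoundedPoller<String> poller = new BoundedPoller<>(files, 2, handled::add);
        return Arrays.asList(poller.pollOnce(), poller.pollOnce(), poller.pollOnce());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 2, 1]
    }
}
```

A low cap keeps one slow batch from starving other work on the task executor, at the cost of draining a busy directory more slowly.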
Testing
There are four scenarios that we want to test:
- Polling the directory results in a successful poll when the file name matches the regex pattern
- Files that don't match the regex pattern are ignored
- Files that have been seen once are not processed again
- Transactional behaviour is configured so that files go to the correct directory after processing
The test class is at com.porterhead.integration.file.FilePollingTest
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class FilePollingTest {

    @ClassRule
    public final static TemporaryFolder tempFolder = new TemporaryFolder();

    @Autowired
    @Qualifier("inboundReadDirectory")
    public File inboundReadDirectory;

    @Autowired
    @Qualifier("inboundProcessedDirectory")
    public File inboundProcessedDirectory;

    @Autowired
    @Qualifier("inboundFailedDirectory")
    public File inboundFailedDirectory;

    @Autowired
    @Qualifier("inboundOutDirectory")
    public File inboundOutDirectory;

    @After
    public void tearDown() throws Exception {
        TestUtils.deleteRecursive(inboundReadDirectory);
        TestUtils.deleteRecursive(inboundProcessedDirectory);
        TestUtils.deleteRecursive(inboundFailedDirectory);
        TestUtils.deleteRecursive(inboundOutDirectory);
    }

    @Autowired
    @Qualifier(ApplicationConfiguration.INBOUND_CHANNEL)
    public DirectChannel filePollingChannel;

    @EnableAutoConfiguration
    @ComponentScan(basePackages = "com.porterhead.integration.file, com.porterhead.integration.configuration")
    public static class TestConfig {

        @Bean
        public File inboundReadDirectory() throws IOException {
            return tempFolder.newFolder("in");
        }

        @Bean
        public File inboundProcessedDirectory() throws IOException {
            return tempFolder.newFolder("processed");
        }

        @Bean
        public File inboundFailedDirectory() throws IOException {
            return tempFolder.newFolder("failed");
        }

        @Bean
        public File inboundOutDirectory() throws IOException {
            return tempFolder.newFolder("out");
        }

        @Bean
        public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL) MessageChannel inChannel) {
            return IntegrationFlows.from(inChannel)
                    .handle(this.loggingHandler())
                    .get();
        }

        @Bean
        public MessageHandler loggingHandler() {
            LoggingHandler logger = new LoggingHandler("INFO");
            logger.setShouldLogFullMessage(true);
            return logger;
        }
    }

    @Test
    public void pollFindsValidFile() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void pollIgnoresInvalidFile() throws Exception {
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME + ".tmp"));
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
    }

    @Test
    public void pollIgnoresFileAlreadySeen() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
        // put file with same name in directory
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void rollbackMovesFileToFailed() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                throw new RuntimeException("Forcing an Exception to trigger rollback");
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundFailedDirectory, 1);
    }
}
The test is run with the usual Spring runner:
@RunWith(SpringJUnit4ClassRunner.class)
Rather than load the entire application we only want to load the components that we are interested in testing. We can configure this in a static inner class:
@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)
This allows us to specify the packages that we want Spring Boot to scan on startup:
@EnableAutoConfiguration
@ComponentScan(basePackages = "com.porterhead.integration.file, com.porterhead.integration.configuration")
Note that the writer package is excluded. The consequence of this is that the Inbound channel does not have any subscribers so we have to define one in the TestConfig:
@Bean
public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL) MessageChannel inChannel) {
    return IntegrationFlows.from(inChannel)
            .handle(this.loggingHandler())
            .get();
}
The test methods do what their names suggest and make use of channel interceptors to assert that messages arrived, or, in the case of the transaction test, force a rollback.
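The latch-plus-interceptor pattern is worth isolating, since it is what lets a test wait for work that happens on the poller's thread. A minimal sketch in plain Java, with a hypothetical LatchedChannel standing in for the real message channel and its interceptor API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchedChannel {

    // Hypothetical stand-in for channel interceptors: callbacks run on every send.
    private final List<Consumer<String>> interceptors = new CopyOnWriteArrayList<>();

    void addInterceptor(Consumer<String> interceptor) {
        interceptors.add(interceptor);
    }

    void send(String payload) {
        interceptors.forEach(i -> i.accept(payload));
    }

    // Returns true if a message reached the channel within the timeout.
    static boolean demo() {
        LatchedChannel channel = new LatchedChannel();
        CountDownLatch latch = new CountDownLatch(1);
        channel.addInterceptor(payload -> latch.countDown());

        // The message is delivered on another thread, as it would be by a poller.
        new Thread(() -> channel.send("file content")).start();

        try {
            // Block the test thread until the interceptor fires or the timeout expires.
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("message arrived: " + demo());
    }
}
```

The timeout matters: without it, a test for a message that never arrives would hang instead of failing.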
Runtime Testing
Using Maven, execute the following command to build the application:
$ mvn clean install
Once built, run the application with the Spring Boot Maven plugin:
$ mvn spring-boot:run
Once the application is running, copy the file src/test/resources/data/foo.txt to the created inbound/read directory in the project root. This should result in the file being processed and removed from the read directory.
The processed directory should now have a copy of the original file and the out directory will have a version of the file with the contents reversed.
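If you want to check the out file by hand, the reversal is a one-liner built on StringBuilder. A small sketch, with a made-up input since the actual content of foo.txt is not shown here; ContentReverser is a hypothetical name:

```java
class ContentReverser {

    // Reverses the whole content, so a trailing newline ends up at the start.
    static String reverse(String content) {
        return new StringBuilder(content).reverse().toString();
    }

    public static void main(String[] args) {
        System.out.println(reverse("hello world")); // prints "dlrow olleh"
    }
}
```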
I see out.filename.suffix property never being used. How to use this property to change the fileName when saving the response in the directory defined by property inbound.outgoing.path
Good question.
I added a FileNameGenerator that uses that property.
See https://github.com/iainporter/spring-file-poller/blob/master/src/main/java/com/porterhead/integration/writer/MessageProcessingIntegrationFlow.java
I want to read large file, can you tell me how to read file using splitter and aggregator, or there is any other way of doing that?
Thanks.
One method is to use Spring Batch
See example I wrote here https://github.com/C24-Technologies/c24-demo-springone-2015
I loved your article! I'd love to be able to point at a root folder and have it scanned recursively. I've seen there's a RecursiveDirectoryScanner now in v5, but when I change the version of the dependencies in your project, it all gets broken. Do you have any enlightenment on how to achieve this goal?
Thanks.
Hi Enrico,
You can pull this branch https://github.com/iainporter/spring-file-poller/tree/SPRING-5
which makes use of the RecursiveDirectoryScanner
In place of RecursiveDirectoryScanner, can we not use a folder pattern, so that it will poll faster?
ex: app/dir/123/abc/test.txt, app/dir/234/abc/test.txt
we can do app/dir/*/abc/test.txt
Do you have any idea?
Excellent Post :-) My major takeaway was how you tested things.
Thanks!
Hi, could you please help with file content validation? Below is my requirement.
I want to read JSON files and validate them against a jsonExpression; if validation fails, push the file to the failed directory, otherwise push it to the processed directory.
you could just write a handler and wire it in place of the transformer in the following code
public IntegrationFlow writeToFile(@Qualifier("fileWritingMessageHandler") MessageHandler fileWritingMessageHandler) {
    return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
            .transform(m -> new StringBuilder((String)m).reverse().toString())
            .handle(fileWritingMessageHandler)
            .handle(loggingHandler())
            .get();
}
i.e.
public IntegrationFlow writeToFile(@Qualifier("fileWritingMessageHandler") MessageHandler fileWritingMessageHandler) {
    return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
            .handle((p, h) -> validateJson(p))
            .handle(fileWritingMessageHandler)
            .handle(loggingHandler())
            .get();
}
Hi,
I have a list of JSON files in an input directory, i.e. /etc/input/*.json
I want to read only UTF-8 encoded JSON files using IntegrationFlow,
because I am splitting the JSON file on a key called "records" using ExpressionEvaluatingSplitter and publishing each record as one message to another channel.
But I am getting PathNotFoundException for non UTF-8 files.
com.jayway.jsonpath.PathNotFoundException: Expected to find an object with property ['records'] in path $ but found 'java.lang.String'. This is not a json object according to the JsonProvider: 'com.jayway.jsonpath.spi.json.JsonSmartJsonProvider'.
My JSON file content looks like below.
{
  "records": [
    {
      "id":"1",
      "name":"Admin"
    },
    {
      "id":"2",
      "name":"Sys"
    },
    {
      "id":"3",
      "name":"System"
    },
    {
      "id":"4",
      "name":"User"
    }
  ]
}