Introduction
One of the most common use cases for messaging applications is to retrieve files from a directory and process the results. Spring Integration has great support for this and the DSL makes it very easy to set up.
This post will demonstrate how to set up a transactional file poller using the DSL and offer strategies for unit testing it.
The sample application will
- Poll a directory for files that match a regex pattern.
- On reading in a file, transform the content to a String and send it to an Inbound message channel.
- Another integration flow will listen in on the Inbound channel and process the incoming message by simply reversing the string and writing the result out to another directory.
The whole message flow is transactional. On a successful commit the processed file is moved to a processed directory; if there is a failure, then on rollback the file is moved to a failed directory. The file polling integration acts as a conduit for message processing downstream. The actual processing in this sample project is unimportant; the main point is to demonstrate the management of the file endpoint.
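To make the downstream step concrete, here is a minimal plain-Java sketch of "reverse the content and write it to an out directory". It does not use Spring Integration, and the class and method names (ReverseProcessorSketch, reverse, process) are hypothetical rather than taken from the sample project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for the downstream processing step; not the project's writer class. */
final class ReverseProcessorSketch {

    /** Reverses the characters of the message payload. */
    static String reverse(String content) {
        return new StringBuilder(content).reverse().toString();
    }

    /** Reads a file as UTF-8, reverses it, and writes the result into outDir under the same file name. */
    static Path process(Path inputFile, Path outDir) {
        try {
            String content = new String(Files.readAllBytes(inputFile), StandardCharsets.UTF_8);
            Files.createDirectories(outDir);
            Path target = outDir.resolve(inputFile.getFileName());
            Files.write(target, reverse(content).getBytes(StandardCharsets.UTF_8));
            return target;
        } catch (IOException e) {
            // Wrapped so callers do not need to declare a checked exception
            throw new UncheckedIOException(e);
        }
    }
}
```

In the real application this logic would sit behind the Inbound channel as a message handler, so a failure here is what would trigger the rollback path.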
File Polling Components
The directories are configured in the class: com.porterhead.integration.file.FilePollingConfiguration
@Configuration
public class FilePollingConfiguration {

    @Bean(name = "inboundReadDirectory")
    public File inboundReadDirectory(@Value("${inbound.read.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name = "inboundProcessedDirectory")
    public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name = "inboundFailedDirectory")
    public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name = "inboundOutDirectory")
    public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
        return makeDirectory(path);
    }

    // Creates the directory (and any missing parents) if it does not already exist
    private File makeDirectory(String path) {
        File file = new File(path);
        file.mkdirs();
        return file;
    }
}
The Spring Integration components are in the class: com.porterhead.integration.file.FilePollingIntegrationFlow
Message Source
The file reading endpoint needs a configured MessageSource, in this case FileReadingMessageSource, which is provided by the spring-integration-file library:
@Bean
public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(this.inboundReadDirectory);
    source.setScanner(directoryScanner);
    source.setAutoCreateDirectory(true);
    return source;
}
New since Spring Integration 5.0
Spring Integration 5.0 introduced a RecursiveDirectoryScanner class that makes it easy to poll subdirectories.
The bean for this is:
@Bean
public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
    DirectoryScanner scanner = new RecursiveDirectoryScanner();
    CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(
            Arrays.asList(new AcceptOnceFileListFilter<>(), new RegexPatternFileListFilter(regex)));
    scanner.setFilter(filter);
    return scanner;
}
The component is configured to filter files based on a regex pattern defined in the property inbound.filename.regex.
Using the AcceptOnceFileListFilter ensures that duplicate files don't get processed.
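The combined effect of the two filters can be illustrated with a small plain-Java sketch. This is a simplification under stated assumptions, not Spring's implementation: it works on file names rather than File objects, it checks the regex before recording a name as seen, and the class name AcceptOnceRegexFilterSketch and the pattern used in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical illustration of "match a regex and accept each name only once". */
final class AcceptOnceRegexFilterSketch {

    private final Pattern pattern;
    // Names that have already been accepted; kept in memory only
    private final Set<String> seen = new HashSet<>();

    AcceptOnceRegexFilterSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    /** Returns the names that match the pattern and have not been accepted before. */
    synchronized List<String> filter(List<String> fileNames) {
        List<String> accepted = new ArrayList<>();
        for (String name : fileNames) {
            // Set.add returns false when the name is already present
            if (pattern.matcher(name).matches() && seen.add(name)) {
                accepted.add(name);
            }
        }
        return accepted;
    }
}
```

With an assumed pattern of `.*\.txt`, a first call with foo.txt and foo.txt.tmp accepts only foo.txt, and a later call that presents foo.txt again rejects it. Because the seen set lives in memory, it is lost on restart, which is worth keeping in mind when relying on this kind of duplicate detection.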
Transaction Management
Transactions are handled using the Spring-provided PseudoTransactionManager. The Javadoc for this class explains how and why to use it:
 * An implementation of {@link PlatformTransactionManager} that provides transaction-like semantics to
 * {@link MessageSource}s that are not inherently transactional. It does <b>not</b> make such
 * sources transactional; rather, together with a {@link TransactionSynchronizationFactory}, it provides
 * the ability to synchronize operations after a flow completes, via beforeCommit, afterCommit and
 * afterRollback operations.
The afterCommit expression moves the file to a processed directory. The afterRollback expression moves the file to a failed directory:
@Bean
PseudoTransactionManager transactionManager() {
    return new PseudoTransactionManager();
}

@Bean
TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionParser parser = new SpelExpressionParser();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    // On commit, rename the polled file into the processed directory
    syncProcessor.setAfterCommitExpression(parser.parseExpression(
            "payload.renameTo(new java.io.File(@inboundProcessedDirectory.path "
                    + " + T(java.io.File).separator + payload.name))"));
    // On rollback, rename the polled file into the failed directory
    syncProcessor.setAfterRollbackExpression(parser.parseExpression(
            "payload.renameTo(new java.io.File(@inboundFailedDirectory.path "
                    + " + T(java.io.File).separator + payload.name))"));
    return new DefaultTransactionSynchronizationFactory(syncProcessor);
}
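The commit and rollback behaviour can be pictured without any Spring classes. The following plain-Java sketch runs a handler and then moves the file to a processed or failed directory depending on whether the handler threw. It is only an analogy for the synchronization described above: the names AfterCompletionMoveSketch, handle and demo are hypothetical, and swallowing the exception is a simplification made for the sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Consumer;

/** Hypothetical analogy for afterCommit / afterRollback file moves. */
final class AfterCompletionMoveSketch {

    /** Runs the handler, then moves the file to processedDir on success or failedDir on failure. */
    static Path handle(Path file, Path processedDir, Path failedDir, Consumer<Path> handler) {
        boolean committed;
        try {
            handler.accept(file);
            committed = true;   // corresponds to the afterCommit branch
        } catch (RuntimeException e) {
            committed = false;  // corresponds to the afterRollback branch
        }
        Path targetDir = committed ? processedDir : failedDir;
        try {
            Files.createDirectories(targetDir);
            return Files.move(file, targetDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Self-check helper: returns the name of the directory the file ended up in. */
    static String demo(boolean failHandler) {
        try {
            Path root = Files.createTempDirectory("poller-sketch");
            Path file = Files.write(root.resolve("foo.txt"), "hello".getBytes(StandardCharsets.UTF_8));
            Path moved = handle(file, root.resolve("processed"), root.resolve("failed"), p -> {
                if (failHandler) {
                    throw new IllegalStateException("forced failure");
                }
            });
            return moved.getParent().getFileName().toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

As the Javadoc quoted earlier points out, nothing here is truly transactional: any work the handler completed before failing is not undone, and only the final location of the file changes.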
Spring Integration Flow
The flow configures a poller and task executor, adds in the transaction components, and converts the file content to a string before sending it to the Inbound channel.
@Bean
public IntegrationFlow inboundFileIntegration(
        @Value("${inbound.file.poller.fixed.delay}") long period,
        @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
        TaskExecutor taskExecutor,
        MessageSource<File> fileReadingMessageSource) {
    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)
                    .transactionSynchronizationFactory(transactionSynchronizationFactory())
                    .transactional(transactionManager())))
            .transform(Files.toStringTransformer())
            .channel(ApplicationConfiguration.INBOUND_CHANNEL)
            .get();
}
Testing
There are four scenarios that we want to test:
- Polling the directory results in a successful poll when the file name matches the regex pattern
- Files that don't match the regex pattern are ignored
- Files that have been seen once are not processed again
- Transactional behaviour is configured so that files go to the correct directory after processing
The test class is at com.porterhead.integration.file.FilePollingTest
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class FilePollingTest {

    @ClassRule
    public final static TemporaryFolder tempFolder = new TemporaryFolder();

    @Autowired
    @Qualifier("inboundReadDirectory")
    public File inboundReadDirectory;

    @Autowired
    @Qualifier("inboundProcessedDirectory")
    public File inboundProcessedDirectory;

    @Autowired
    @Qualifier("inboundFailedDirectory")
    public File inboundFailedDirectory;

    @Autowired
    @Qualifier("inboundOutDirectory")
    public File inboundOutDirectory;

    @After
    public void tearDown() throws Exception {
        TestUtils.deleteRecursive(inboundReadDirectory);
        TestUtils.deleteRecursive(inboundProcessedDirectory);
        TestUtils.deleteRecursive(inboundFailedDirectory);
        TestUtils.deleteRecursive(inboundOutDirectory);
    }

    @Autowired
    @Qualifier(ApplicationConfiguration.INBOUND_CHANNEL)
    public DirectChannel filePollingChannel;

    @EnableAutoConfiguration
    @ComponentScan(basePackages = "com.porterhead.integration.file, com.porterhead.integration.configuration")
    public static class TestConfig {

        @Bean
        public File inboundReadDirectory() throws IOException {
            return tempFolder.newFolder("in");
        }

        @Bean
        public File inboundProcessedDirectory() throws IOException {
            return tempFolder.newFolder("processed");
        }

        @Bean
        public File inboundFailedDirectory() throws IOException {
            return tempFolder.newFolder("failed");
        }

        @Bean
        public File inboundOutDirectory() throws IOException {
            return tempFolder.newFolder("out");
        }

        @Bean
        public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL) MessageChannel inChannel) {
            return IntegrationFlows.from(inChannel)
                    .handle(this.loggingHandler())
                    .get();
        }

        @Bean
        public MessageHandler loggingHandler() {
            LoggingHandler logger = new LoggingHandler("INFO");
            logger.setShouldLogFullMessage(true);
            return logger;
        }
    }

    @Test
    public void pollFindsValidFile() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void pollIgnoresInvalidFile() throws Exception {
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME + ".tmp"));
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
    }

    @Test
    public void pollIgnoresFileAlreadySeen() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
        // put file with same name in directory
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void rollbackMovesFileToFailed() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                throw new RuntimeException("Forcing an Exception to trigger rollback");
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(TestUtils.FILE_FIXTURE_PATH),
                new File(inboundReadDirectory, TestUtils.FILE_FIXTURE_NAME));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundFailedDirectory, 1);
    }
}
The test is run with the usual Spring runner:
@RunWith(SpringJUnit4ClassRunner.class)
Rather than load the entire application we only want to load the components that we are interested in testing. We can configure this in a static inner class:
@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)
This allows us to specify the packages that we want Spring Boot to scan on startup:
@EnableAutoConfiguration
@ComponentScan(basePackages = "com.porterhead.integration.file, com.porterhead.integration.configuration")
Note that the writer package is excluded. The consequence of this is that the Inbound channel does not have any subscribers so we have to define one in the TestConfig:
@Bean
public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL) MessageChannel inChannel) {
    return IntegrationFlows.from(inChannel)
            .handle(this.loggingHandler())
            .get();
}
The test methods do what their names suggest and make use of channel interceptors to assert that messages arrived, or, in the case of the transaction test, force a rollback.
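The latch technique used by the interceptors can be shown in isolation. The sketch below is plain Java with no Spring types: a background task stands in for the poller thread and counts the latch down where the interceptor's postSend would, while the caller waits with a timeout. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of waiting on an asynchronous event with a CountDownLatch. */
final class LatchWaitSketch {

    /** Returns true if the background task signalled the latch within the timeout. */
    static boolean messageArrives(boolean producerSends, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> {
                if (producerSends) {
                    // Stands in for the countDown() call inside the interceptor's postSend
                    latch.countDown();
                }
            });
            // Blocks until the latch reaches zero or the timeout elapses
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Waiting on the latch before asserting on directory contents avoids racing the poller thread. A test that expects nothing to happen has no event to wait for, so it can only show that nothing arrived within the chosen timeout.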
Runtime Testing
Using Maven, execute the following command to build the application:
$ mvn clean install
Once built, run the application with the Spring Boot Maven plugin:
$ mvn spring-boot:run
Once running, copy the file src/test/resources/data/foo.txt to the created inbound/read directory in the project root. This should result in the file being processed and removed from the read directory.
The processed directory should now have a copy of the original file and the out directory will have a version of the file with the contents reversed.