Sunday, July 10, 2016

File Polling using the Spring Integration DSL

Get the Code:  https://github.com/iainporter/spring-file-poller

Introduction

One of the most common use cases for messaging applications is to retrieve files from a directory and process the results.
Spring Integration has great support for this, and the DSL makes it very easy to set up.

This post will demonstrate how to set up a transactional file poller using the DSL and offer strategies for unit testing it.

The sample application will:

  1.  Poll a directory for files that match a regex pattern.
  2.  On reading a file, transform its content to a String and send it to an Inbound message channel.
  3.  Another integration flow listens on the Inbound channel, processes each incoming message by simply reversing the string, and writes the result out to another directory.
The whole message flow is transactional and on successful commit the processed file will be moved to a processed directory. If there was a failure then on rollback the file will be moved to a failed directory. The file polling integration acts as a conduit for message processing downstream. The actual processing for this sample project is unimportant, the main point being to demonstrate the management of the file endpoint.




File Polling Components

The directories are configured in the class:  com.porterhead.integration.file.FilePollingConfiguration


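package com.porterhead.integration.file;

import java.io.File;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration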
public class FilePollingConfiguration {

    @Bean(name="inboundReadDirectory")
    public File inboundReadDirectory(@Value("${inbound.read.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundProcessedDirectory")
    public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundFailedDirectory")
    public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundOutDirectory")
    public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
        return makeDirectory(path);
    }

    private File makeDirectory(String path) {
        File file = new File(path);
        file.mkdirs();
        return file;
    }

}
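A related detail: File.mkdirs() returns false both when the directory already exists and when it cannot be created. makeDirectory ignores that result, so a path that cannot be created is not reported where the bean is defined. If you would rather fail at that point, one option is java.nio's Files.createDirectories. It does nothing for an existing directory and throws an IOException otherwise. Below is a sketch using a hypothetical Directories helper that is not part of the project:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical stricter variant of makeDirectory; not the project's code.
class Directories {

    static File makeDirectory(String path) {
        try {
            // Creates any missing parents; a no-op if the directory already exists.
            return Files.createDirectories(Paths.get(path)).toFile();
        } catch (IOException e) {
            // Fail while the context starts instead of silently continuing.
            throw new UncheckedIOException("Cannot create directory " + path, e);
        }
    }
}
```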


The paths are configured in application.yml and can easily be overridden with external properties.

The Spring Integration components are in the class: com.porterhead.integration.file.FilePollingIntegrationFlow


Message Source

The file-reading endpoint needs a configured MessageSource. Here that is FileReadingMessageSource, provided by the spring-integration-file library:

    @Bean
    public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(this.inboundReadDirectory);
        source.setScanner(directoryScanner);
        source.setAutoCreateDirectory(true);
        return source;
    }
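Conceptually, the source keeps a queue of files waiting to be received. When the queue is empty, it asks the scanner for a fresh listing, and each receive() returns at most one file. The hypothetical SimpleFileSource below is a simplified model of that behaviour, not Spring's implementation. It returns a File rather than a Message<File>, and it sorts the listing only to make the order predictable.

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Function;

// Simplified model of a file-reading message source; not the Spring class.
class SimpleFileSource {

    private final File directory;
    // Stands in for the DirectoryScanner: turns a directory into candidate files.
    private final Function<File, File[]> scanner;
    private final Deque<File> toBeReceived = new ArrayDeque<>();

    SimpleFileSource(File directory, Function<File, File[]> scanner) {
        this.directory = directory;
        this.scanner = scanner;
    }

    // Returns the next file, or null when nothing is waiting (an empty poll).
    File receive() {
        if (toBeReceived.isEmpty()) {
            File[] found = scanner.apply(directory);
            if (found != null) {
                Arrays.sort(found); // deterministic order, for the sketch only
                toBeReceived.addAll(Arrays.asList(found));
            }
        }
        return toBeReceived.poll();
    }
}
```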

New since Spring Integration 5.0
Spring Integration 5.0 introduced the RecursiveDirectoryScanner class, which makes it easy to poll subdirectories.
The bean for this is:

    @Bean
    public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
        DirectoryScanner scanner = new RecursiveDirectoryScanner();
        // Filters are applied in order: accept-once first, then the file-name regex
        CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(
                Arrays.asList(new AcceptOnceFileListFilter<File>(),
                        new RegexPatternFileListFilter(regex))
        );
        scanner.setFilter(filter);
        return scanner;
    }

The component filters files using the regex pattern defined in the property inbound.filename.regex.
The AcceptOnceFileListFilter ensures that a file is not picked up a second time. It records the files it has seen in memory, so that record is lost when the application restarts.
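Here is a plain-Java approximation of the configured scanner and filters; the RegexAcceptOnceScanner name is hypothetical. It walks the directory tree as a recursive scanner would and drops paths it has already seen. It then keeps only files whose name, not full path, matches the regex. As with the real filter, the seen set is held in memory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java approximation of a recursive scanner with accept-once + regex filters.
class RegexAcceptOnceScanner {

    private final Pattern pattern;
    // Every path seen by earlier scans; held in memory, so lost on restart.
    private final Set<Path> seen = new HashSet<>();

    RegexAcceptOnceScanner(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    List<Path> scan(Path root) throws IOException {
        List<Path> candidates;
        // Walk the whole tree, including subdirectories, keeping regular files only.
        try (Stream<Path> walk = Files.walk(root)) {
            candidates = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<Path> accepted = new ArrayList<>();
        for (Path path : candidates) {
            // Accept-once runs first, mirroring the order of the composite filter.
            if (!seen.add(path)) {
                continue;
            }
            // The regex must match the whole file name, not the full path.
            if (pattern.matcher(path.getFileName().toString()).matches()) {
                accepted.add(path);
            }
        }
        return accepted;
    }
}
```

Take a regex such as `.*\.txt`, an example value that is not necessarily what the project sets in inbound.filename.regex. With it, a .txt file in a subdirectory is found on the first scan, a .tmp file never is, and a second scan of the unchanged tree returns nothing.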

Transaction Management

Transactions are handled using the Spring-provided PseudoTransactionManager. Its Javadoc explains clearly how and why to use this class:

    "An implementation of PlatformTransactionManager that provides transaction-like semantics to
    MessageSources that are not inherently transactional. It does not make such sources
    transactional; rather, together with a TransactionSynchronizationFactory, it provides the
    ability to synchronize operations after a flow completes, via beforeCommit, afterCommit and
    afterRollback operations."

The afterCommit expression moves the file to the processed directory, and the afterRollback expression moves it to the failed directory:


    @Bean
    PseudoTransactionManager transactionManager() {
        return new PseudoTransactionManager();
    }

    @Bean
    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
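        // Lets the expressions resolve bean references such as @inboundProcessedDirectory
        syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        // After commit: move the polled File (the payload) into the processed directory
        syncProcessor.setAfterCommitExpression(parser.parseExpression("payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " +
                " + T(java.io.File).separator + payload.name))"));
        // After rollback: move it into the failed directory instead
        syncProcessor.setAfterRollbackExpression(parser.parseExpression("payload.renameTo(new java.io.File(@inboundFailedDirectory.path " +
                " + T(java.io.File).separator + payload.name))"));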
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }
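Without the framework, the commit and rollback hooks boil down to two steps: run the downstream work, then move the file according to the outcome. The sketch below models that with java.nio; PseudoTransactionalFileHandler and FileProcessor are hypothetical names. It uses Files.move rather than File.renameTo because renameTo reports failure only through a boolean return value, which nothing in the configuration above checks.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Framework-free model of the afterCommit / afterRollback file moves.
class PseudoTransactionalFileHandler {

    // Hypothetical callback standing in for the downstream flow.
    interface FileProcessor {
        void process(Path file) throws Exception;
    }

    private final Path processedDir;
    private final Path failedDir;

    PseudoTransactionalFileHandler(Path processedDir, Path failedDir) {
        this.processedDir = processedDir;
        this.failedDir = failedDir;
    }

    // Runs the processor, then moves the file the way the synchronization expressions do.
    Path handle(Path file, FileProcessor processor) throws IOException {
        boolean committed;
        try {
            processor.process(file);
            committed = true;
        } catch (Exception e) {
            // Any exception "rolls back". A real poller would also report the error;
            // this sketch simply swallows it.
            committed = false;
        }
        Path targetDir = committed ? processedDir : failedDir;
        // Files.move throws on failure (including an existing target),
        // unlike File.renameTo, which just returns false.
        return Files.move(file, targetDir.resolve(file.getFileName()));
    }
}
```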

Spring Integration Flow

The flow configures a poller and a task executor, adds the transaction components, and converts the file content to a String before sending it to the Inbound channel.


    @Bean
    public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                                  @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                                  TaskExecutor taskExecutor,
                                                  MessageSource fileReadingMessageSource) {
        return IntegrationFlows.from(fileReadingMessageSource,
                c -> c.poller(Pollers.fixedDelay(period)
                        .taskExecutor(taskExecutor)
                        .maxMessagesPerPoll(maxMessagesPerPoll)
                        .transactionSynchronizationFactory(transactionSynchronizationFactory())
                        .transactional(transactionManager())))
                .transform(Files.toStringTransformer())
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();
    }
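The poller settings are easier to picture in isolation. Ignoring the task executor and the transaction, a fixed-delay poller starts each poll a fixed delay after the previous one finishes. A single poll receives up to maxMessagesPerPoll messages, stopping early when the source is empty. The hypothetical FixedDelayPoller below is a rough stdlib model of that behaviour, not Spring's implementation.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Rough model of a fixed-delay poller with maxMessagesPerPoll; not Spring's implementation.
class FixedDelayPoller<T> {

    private final Supplier<T> source;   // returns null when nothing is available
    private final Consumer<T> handler;  // stands in for the rest of the flow
    private final int maxMessagesPerPoll;

    FixedDelayPoller(Supplier<T> source, Consumer<T> handler, int maxMessagesPerPoll) {
        this.source = source;
        this.handler = handler;
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    // One poll: receive until the source is empty or the per-poll limit is reached.
    int pollOnce() {
        int count = 0;
        while (count < maxMessagesPerPoll) {
            T message = source.get();
            if (message == null) {
                break;
            }
            handler.accept(message);
            count++;
        }
        return count;
    }

    // Each poll starts delayMillis after the previous one has finished.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long delayMillis) {
        return scheduler.scheduleWithFixedDelay(this::pollOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

If more files are waiting than maxMessagesPerPoll allows, the remainder are picked up on later polls.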

Testing

There are four scenarios that we want to test:
  1. Polling the directory results in a successful poll when the file name matches the regex pattern
  2. Files that don't match the regex pattern are ignored
  3. Files that have been seen once are not processed again
  4. Transactional behaviour is configured so that files go to the correct directory after processing
The test class is at com.porterhead.integration.file.FilePollingTest


@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class FilePollingTest  {

    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    @Autowired
    @Qualifier("inboundReadDirectory")
    public File inboundReadDirectory;

    @Autowired
    @Qualifier("inboundProcessedDirectory")
    public File inboundProcessedDirectory;

    @Autowired
    @Qualifier("inboundFailedDirectory")
    public File inboundFailedDirectory;

    @Autowired
    @Qualifier("inboundOutDirectory")
    public File inboundOutDirectory;

    @After
    public void tearDown() throws Exception {
        TestUtils.deleteRecursive(inboundReadDirectory);
        TestUtils.deleteRecursive(inboundProcessedDirectory);
        TestUtils.deleteRecursive(inboundFailedDirectory);
        TestUtils.deleteRecursive(inboundOutDirectory);
    }

    @Autowired
    @Qualifier(ApplicationConfiguration.INBOUND_CHANNEL)
    public DirectChannel filePollingChannel;

    @EnableAutoConfiguration
    @ComponentScan(basePackages = {"com.porterhead.integration.file",
                                   "com.porterhead.integration.configuration"})
    public static class TestConfig {

        @Bean
        public File inboundReadDirectory() throws IOException {
            return tempFolder.newFolder("in");
        }

        @Bean
        public File inboundProcessedDirectory() throws IOException {
            return tempFolder.newFolder("processed");
        }

        @Bean
        public File inboundFailedDirectory() throws IOException {
            return tempFolder.newFolder("failed");
        }

        @Bean
        public File inboundOutDirectory() throws IOException {
            return tempFolder.newFolder("out");
        }


        @Bean
        public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL)
                                          MessageChannel inChannel) {
            return IntegrationFlows.from(inChannel)
                    .handle(this.loggingHandler())
                    .get();
        }

        @Bean
        public MessageHandler loggingHandler() {
            LoggingHandler logger = new LoggingHandler("INFO");
            logger.setShouldLogFullMessage(true);
            return logger;
        }
    }

    @Test
    public void pollFindsValidFile() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
              TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
              TestUtils.FILE_FIXTURE_NAME ));
        assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void pollIgnoresInvalidFile() throws Exception {
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
               TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
               TestUtils.FILE_FIXTURE_NAME + ".tmp" ));
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
    }

    @Test
    public void pollIgnoresFileAlreadySeen() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
               TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
               TestUtils.FILE_FIXTURE_NAME ));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
        //put file with same name in directory
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
               TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
               TestUtils.FILE_FIXTURE_NAME ));
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void rollbackMovesFileToFailed() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                throw new RuntimeException("Forcing an Exception to trigger rollback");
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
                 TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory, 
                 TestUtils.FILE_FIXTURE_NAME ));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundFailedDirectory, 1);
    }

}

The test is run with the usual Spring runner:

@RunWith(SpringJUnit4ClassRunner.class)

Rather than loading the entire application, we only want to load the components we are interested in testing. We can configure this in a static inner class:

@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)

This lets us specify the packages that we want Spring Boot to scan on startup:


    @EnableAutoConfiguration
    @ComponentScan(basePackages = {"com.porterhead.integration.file",
                                   "com.porterhead.integration.configuration"})

Note that the writer package is excluded. As a consequence, the Inbound channel has no subscribers, so we have to define one in TestConfig:

        @Bean
        public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL) MessageChannel inChannel) {
            return IntegrationFlows.from(inChannel)
                    .handle(this.loggingHandler())
                    .get();
        }
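Why is a subscriber needed at all? A DirectChannel delivers each message to a subscriber on the sender's thread. When there are none, the send fails with a "Dispatcher has no subscribers" error. The hypothetical ToyDirectChannel below reproduces just that behaviour, without any Spring types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy point-to-point channel, loosely modelled on DirectChannel; not Spring's class.
class ToyDirectChannel<T> {

    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers on the caller's thread and fails when nobody is subscribed.
    // (DirectChannel round-robins across several subscribers; omitted here.)
    void send(T message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.get(0).accept(message);
    }
}
```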

The test methods do what their names suggest. They use channel interceptors to detect that a message has arrived (by counting down a latch that the test awaits) or, in the transaction test, to force a rollback by throwing an exception.
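The latch-and-interceptor pattern is what lets a synchronous JUnit test wait for the asynchronous poller. Isolated from Spring, it looks roughly like this; the plain executor thread standing in for the poller and the LatchAwait name are assumptions for illustration only. The callback counts the latch down, and the test waits with a timeout so that a broken flow fails the test instead of hanging it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Isolated version of the latch pattern used by the FilePollingTest interceptors.
class LatchAwait {

    // Runs work on another thread; returns true only if it completed within the timeout.
    static boolean runAndAwait(Runnable work, long timeoutMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> {
                work.run();
                latch.countDown(); // plays the role of postSend in the interceptor
            });
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow();
        }
    }
}
```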

Runtime Testing

Using Maven, run the following command to build the application:

$ mvn clean install

Once it is built, run the application with the Spring Boot Maven plugin:

$ mvn spring-boot:run

Once it is running, copy the file src/test/resources/data/foo.txt to the inbound/read directory created in the project root. The file should be processed and removed from the read directory.
The processed directory should now contain the original file, and the out directory will contain a version of the file with its contents reversed.