Sunday, July 10, 2016

File Polling using the Spring Integration DSL

Get the Code:  https://github.com/iainporter/spring-file-poller

Introduction

One of the most common use cases for messaging applications is to retrieve files from a directory and process the results.
Spring Integration has great support for this and the DSL makes it very easy to set up.

This post will demonstrate how to set up a transactional file poller using the DSL and offer strategies for unit testing it.

The sample application will

  1.  Poll a directory for files that match a regex pattern.
  2.  On reading in a file, transform the content to a String and send it to an Inbound message channel.
  3.  Have another integration flow listen on the Inbound channel and process the incoming message by simply reversing the string and writing the result out to another directory.

The whole message flow is transactional: on a successful commit the processed file is moved to a processed directory, and on rollback after a failure it is moved to a failed directory. The file polling integration acts as a conduit for message processing downstream. The actual processing in this sample project is unimportant; the main point is to demonstrate the management of the file endpoint.
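
To make the processing step concrete, here is a minimal plain-Java sketch of "read the content as a String, reverse it, write it to an out directory". It deliberately uses no Spring classes; the names ReverseStep, reverse and process are hypothetical and do not come from the sample project, which wires the same idea through integration flows.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only: not a class from the sample project.
final class ReverseStep {

    // Reverses the content, mirroring the trivial "processing" in the sample.
    static String reverse(String content) {
        return new StringBuilder(content).reverse().toString();
    }

    // Reads inputFile as UTF-8, reverses it, and writes the result to outDir
    // under the same file name. Returns the path that was written.
    static Path process(Path inputFile, Path outDir) {
        try {
            String content = new String(Files.readAllBytes(inputFile), StandardCharsets.UTF_8);
            Files.createDirectories(outDir);
            Path target = outDir.resolve(inputFile.getFileName());
            Files.write(target, reverse(content).getBytes(StandardCharsets.UTF_8));
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path in = Files.createTempDirectory("in");
        Path out = Files.createTempDirectory("out");
        Path file = Files.write(in.resolve("foo.txt"), "hello".getBytes(StandardCharsets.UTF_8));
        Path written = process(file, out);
        // Prints the reversed content of the input file.
        System.out.println(new String(Files.readAllBytes(written), StandardCharsets.UTF_8));
    }
}
```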




File Polling Components

The directories are configured in the class:  com.porterhead.integration.file.FilePollingConfiguration


@Configuration
public class FilePollingConfiguration {

    @Bean(name="inboundReadDirectory")
    public File inboundReadDirectory(@Value("${inbound.read.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundProcessedDirectory")
    public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundFailedDirectory")
    public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundOutDirectory")
    public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
        return makeDirectory(path);
    }

    private File makeDirectory(String path) {
        File file = new File(path);
        file.mkdirs();
        return file;
    }

}


The paths are configured in application.yml and can be easily overridden with external properties.

The Spring Integration components are in the class: com.porterhead.integration.file.FilePollingIntegrationFlow


Message Source

The file reading endpoint needs a configured MessageSource, in this case the one provided as part of the spring-integration-file library, FileReadingMessageSource:

    @Bean
    public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(this.inboundReadDirectory);
        source.setScanner(directoryScanner);
        source.setAutoCreateDirectory(true);
        return source;
    }

New since Spring Integration 5.0
Spring Integration 5.0 introduced a RecursiveDirectoryScanner class that makes it easy to poll subdirectories.
The bean for this is:

    @Bean
    public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
        DirectoryScanner scanner = new RecursiveDirectoryScanner();
        CompositeFileListFilter filter = new CompositeFileListFilter<>(
                Arrays.asList(new AcceptOnceFileListFilter<>(),
                        new RegexPatternFileListFilter(regex))
        );
        scanner.setFilter(filter);
        return scanner;
    }
The component filters files based on the regex pattern defined in the property inbound.filename.regex.
The AcceptOnceFileListFilter ensures that a file that has already been picked up is not processed again.
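
The combined effect of the two filters can be sketched in plain Java: a file is accepted only if its name matches the regex and it has not been seen before. This is a simplified stand-in that reproduces the acceptance outcome only, not the Spring Integration classes themselves; the class name AcceptOnceRegexFilter and the pattern .*\.txt are assumptions made for illustration.

```java
import java.io.File;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-in for the accept-once + regex filter combination.
final class AcceptOnceRegexFilter {

    private final Pattern pattern;
    // In-memory record of files already accepted, keyed on File equality (the path).
    private final Set<File> seen = new HashSet<>();

    AcceptOnceRegexFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Accepts a file only if its name matches the regex AND this is the
    // first time the file has been offered.
    synchronized boolean accept(File file) {
        if (!pattern.matcher(file.getName()).matches()) {
            return false;
        }
        return seen.add(file); // add() returns false if the file was already present
    }

    public static void main(String[] args) {
        AcceptOnceRegexFilter filter = new AcceptOnceRegexFilter(".*\\.txt");
        System.out.println(filter.accept(new File("in/foo.txt")));     // first sighting, name matches
        System.out.println(filter.accept(new File("in/foo.txt")));     // same path offered again
        System.out.println(filter.accept(new File("in/foo.txt.tmp"))); // name does not match
    }
}
```

Because the record of seen files lives in memory and is keyed on the path, a new file dropped in later under the same name is also ignored, which is the behaviour the pollIgnoresFileAlreadySeen test relies on.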

Transaction Management

Transactions are handled using the Spring-provided PseudoTransactionManager. The Javadoc for this class explains how and why to use it:

 * An implementation of {@link PlatformTransactionManager} that provides transaction-like semantics to
 * {@link MessageSource}s that are not inherently transactional. It does <b>not</b> make such
 * sources transactional; rather, together with a {@link TransactionSynchronizationFactory}, it provides
 * the ability to synchronize operations after a flow completes, via beforeCommit, afterCommit and
 * afterRollback operations.
The afterCommit expression moves the file to a processed directory. The afterRollback expression moves the file to a failed directory:


    @Bean
    PseudoTransactionManager transactionManager() {
        return new PseudoTransactionManager();
    }

    @Bean
    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        syncProcessor.setAfterCommitExpression(parser.parseExpression("payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " +
                " + T(java.io.File).separator + payload.name))"));
        syncProcessor.setAfterRollbackExpression(parser.parseExpression("payload.renameTo(new java.io.File(@inboundFailedDirectory.path " +
                " + T(java.io.File).separator + payload.name))"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }
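
For readers less familiar with SpEL, the two expressions amount to a File.renameTo call whose target directory depends on whether the flow completed or threw. The following plain-Java sketch shows that behaviour under simplifying assumptions: the class PseudoTransactionSketch and its methods are hypothetical, and a try/catch stands in for the transaction synchronization callbacks.

```java
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical sketch of the afterCommit / afterRollback behaviour.
final class PseudoTransactionSketch {

    // Plain-Java equivalent of the SpEL expression:
    // payload.renameTo(new File(directory.path + File.separator + payload.name))
    static boolean moveTo(File payload, File directory) {
        return payload.renameTo(new File(directory.getPath() + File.separator + payload.getName()));
    }

    // Runs the handler. If it completes normally the file is moved to
    // processedDir; if it throws a RuntimeException the file is moved to
    // failedDir. Returns true when the "commit" path was taken.
    static boolean handle(File payload, Consumer<File> handler, File processedDir, File failedDir) {
        try {
            handler.accept(payload);
        } catch (RuntimeException e) {
            moveTo(payload, failedDir);    // corresponds to afterRollback
            return false;
        }
        moveTo(payload, processedDir);     // corresponds to afterCommit
        return true;
    }

    public static void main(String[] args) throws IOException {
        File root = java.nio.file.Files.createTempDirectory("poller").toFile();
        File in = new File(root, "in");
        File processed = new File(root, "processed");
        File failed = new File(root, "failed");
        in.mkdirs();
        processed.mkdirs();
        failed.mkdirs();

        File file = new File(in, "foo.txt");
        file.createNewFile();
        boolean committed = handle(file, f -> { /* downstream processing succeeds */ }, processed, failed);
        System.out.println("committed=" + committed
                + ", in processed=" + new File(processed, "foo.txt").exists());
    }
}
```

Note that renameTo only reports failure through its return value and may not work across file systems, so this approach assumes the read, processed and failed directories live on the same volume.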

Spring Integration Flow

The flow configures a poller and task executor, adds in the transaction components and converts the file content to a String before sending it to the Inbound channel.


    @Bean
    public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                                  @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                                  TaskExecutor taskExecutor,
                                                  MessageSource fileReadingMessageSource) {
        return IntegrationFlows.from(fileReadingMessageSource,
                c -> c.poller(Pollers.fixedDelay(period)
                        .taskExecutor(taskExecutor)
                        .maxMessagesPerPoll(maxMessagesPerPoll)
                        .transactionSynchronizationFactory(transactionSynchronizationFactory())
                        .transactional(transactionManager())))
                .transform(Files.toStringTransformer())
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();
    }
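
The effect of maxMessagesPerPoll can be illustrated with a small plain-Java sketch of a single poll cycle: the source is asked for a message up to the configured number of times, and the cycle ends early once the source has nothing left. This is a simplified model rather than the framework's implementation, and the class PollCycleSketch and method pollOnce are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical model of one poll cycle with a per-poll message limit.
final class PollCycleSketch {

    // Calls the source up to maxMessagesPerPoll times and stops early as
    // soon as it returns null (nothing left to read in this cycle).
    static <T> List<T> pollOnce(Supplier<T> source, int maxMessagesPerPoll) {
        List<T> received = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            T message = source.get();
            if (message == null) {
                break;
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        // Three pending "files"; Queue.poll() returns null once the queue is empty.
        Queue<String> pending = new ConcurrentLinkedQueue<>(Arrays.asList("a.txt", "b.txt", "c.txt"));
        System.out.println(pollOnce(pending::poll, 2)); // first cycle takes at most two
        System.out.println(pollOnce(pending::poll, 2)); // second cycle takes what is left
    }
}
```

With a fixed delay between cycles, a backlog of files is therefore worked off in batches of at most maxMessagesPerPoll, each batch starting one delay period after the previous cycle finished.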

Testing

There are four scenarios that we want to test:
  1. Polling the directory results in a successful poll when the file name matches the regex pattern
  2. Files that don't match the regex pattern are ignored
  3. Files that have been seen once are not processed again
  4. Transactional behaviour is configured so that files go to the correct directory after processing

The test class is at com.porterhead.integration.file.FilePollingTest


@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class FilePollingTest  {

    @ClassRule
    public final static TemporaryFolder tempFolder = new TemporaryFolder();

    @Autowired
    @Qualifier("inboundReadDirectory")
    public File inboundReadDirectory;

    @Autowired
    @Qualifier("inboundProcessedDirectory")
    public File inboundProcessedDirectory;

    @Autowired
    @Qualifier("inboundFailedDirectory")
    public File inboundFailedDirectory;

    @Autowired
    @Qualifier("inboundOutDirectory")
    public File inboundOutDirectory;

    @After
    public void tearDown() throws Exception {
        TestUtils.deleteRecursive(inboundReadDirectory);
        TestUtils.deleteRecursive(inboundProcessedDirectory);
        TestUtils.deleteRecursive(inboundFailedDirectory);
        TestUtils.deleteRecursive(inboundOutDirectory);
    }

    @Autowired
    @Qualifier(ApplicationConfiguration.INBOUND_CHANNEL)
    public DirectChannel filePollingChannel;

    @EnableAutoConfiguration
    @ComponentScan(basePackages = {"com.porterhead.integration.file",
                                   "com.porterhead.integration.configuration"})
    public static class TestConfig {

        @Bean
        public File inboundReadDirectory() throws IOException {
            return tempFolder.newFolder("in");
        }

        @Bean
        public File inboundProcessedDirectory() throws IOException {
            return tempFolder.newFolder("processed");
        }

        @Bean
        public File inboundFailedDirectory() throws IOException {
            return tempFolder.newFolder("failed");
        }

        @Bean
        public File inboundOutDirectory() throws IOException {
            return tempFolder.newFolder("out");
        }


        @Bean
        public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL)
                                          MessageChannel inChannel) {
            return IntegrationFlows.from(inChannel)
                    .handle(this.loggingHandler())
                    .get();
        }

        @Bean
        public MessageHandler loggingHandler() {
            LoggingHandler logger = new LoggingHandler("INFO");
            logger.setShouldLogFullMessage(true);
            return logger;
        }
    }

    @Test
    public void pollFindsValidFile() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
              TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
              TestUtils.FILE_FIXTURE_NAME ));
        assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void pollIgnoresInvalidFile() throws Exception {
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
               TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
               TestUtils.FILE_FIXTURE_NAME + ".tmp" ));
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
    }

    @Test
    public void pollIgnoresFileAlreadySeen() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
               TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
               TestUtils.FILE_FIXTURE_NAME ));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
        //put file with same name in directory
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
               TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory,
               TestUtils.FILE_FIXTURE_NAME ));
        TestUtils.assertThatDirectoryIsEmpty(inboundFailedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundReadDirectory, 1);
        TestUtils.assertThatDirectoryHasFiles(inboundProcessedDirectory, 1);
    }

    @Test
    public void rollbackMovesFileToFailed() throws Exception {
        final CountDownLatch stopLatch = new CountDownLatch(1);
        filePollingChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message message, MessageChannel channel, boolean sent) {
                stopLatch.countDown();
                throw new RuntimeException("Forcing an Exception to trigger rollback");
            }
        });
        FileCopyUtils.copy(TestUtils.locateClasspathResource(
                 TestUtils.FILE_FIXTURE_PATH), new File(inboundReadDirectory, 
                 TestUtils.FILE_FIXTURE_NAME ));
        assertThat(stopLatch.await(5, TimeUnit.SECONDS), is(true));
        TestUtils.assertThatDirectoryIsEmpty(inboundReadDirectory);
        TestUtils.assertThatDirectoryIsEmpty(inboundProcessedDirectory);
        TestUtils.assertThatDirectoryHasFiles(inboundFailedDirectory, 1);
    }

}

The test is run with the usual Spring runner:

@RunWith(SpringJUnit4ClassRunner.class)

Rather than load the entire application we only want to load the components that we are interested in testing. We can configure this in a static inner class:

@SpringApplicationConfiguration(classes = FilePollingTest.TestConfig.class)

This allows us to specify the packages that we want Spring Boot to scan on startup:


    @EnableAutoConfiguration
    @ComponentScan(basePackages = {"com.porterhead.integration.file",
                                   "com.porterhead.integration.configuration"})

Note that the writer package is excluded. The consequence of this is that the Inbound channel does not have any subscribers so we have to define one in the TestConfig:

        @Bean
        public IntegrationFlow loggingFlow(@Qualifier(ApplicationConfiguration.INBOUND_CHANNEL) MessageChannel inChannel) {
            return IntegrationFlows.from(inChannel)
                    .handle(this.loggingHandler())
                    .get();
        }

The test methods do what their names suggest and make use of channel interceptors to assert that messages arrived, or, in the case of the transaction test, force a rollback.
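
The latch-plus-interceptor technique is independent of Spring and can be sketched in plain Java: the test thread blocks on a CountDownLatch with a timeout, and a callback invoked on the polling thread counts it down. In this sketch a Consumer stands in for the channel interceptor and a background thread stands in for the poller; the class LatchSketch and method awaitDelivery are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of waiting for an asynchronous delivery in a test.
final class LatchSketch {

    // Optionally delivers one message on a background thread, then waits up
    // to timeoutMillis for the callback to fire. Returns true if it fired.
    static boolean awaitDelivery(boolean deliver, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        // Stands in for the interceptor's postSend callback.
        Consumer<String> interceptor = message -> latch.countDown();
        if (deliver) {
            Thread poller = new Thread(() -> interceptor.accept("file content"));
            poller.setDaemon(true);
            poller.start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitDelivery(true, 2000));  // a message arrives before the timeout
        System.out.println(awaitDelivery(false, 100));  // nothing arrives, so the wait times out
    }
}
```

Asserting on the boolean returned by await, as the tests above do, turns a missing message into a test failure after the timeout instead of a test that hangs.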

Runtime Testing

Using Maven, execute the following command to build the application:

$ mvn clean install

Once built, run the application using the Spring Boot Maven plugin:

$ mvn spring-boot:run

Once it is running, copy the file src/test/resources/data/foo.txt to the inbound/read directory created in the project root. The file should be processed and removed from the read directory.
The processed directory should now contain the original file, and the out directory should contain a version of the file with its contents reversed.



12 comments:

  1. I see that the out.filename.suffix property is never used. How can I use this property to change the file name when saving the response in the directory defined by the property inbound.outgoing.path?

    Replies
    1. Good question.
      I added a FileNameGenerator that uses that property.

      See https://github.com/iainporter/spring-file-poller/blob/master/src/main/java/com/porterhead/integration/writer/MessageProcessingIntegrationFlow.java

  2. I want to read a large file. Can you tell me how to read a file using a splitter and aggregator, or is there another way of doing that?
    Thanks.

    Replies
    1. One method is to use Spring Batch
      See example I wrote here https://github.com/C24-Technologies/c24-demo-springone-2015

  3. I loved your article! I'd love to be able to point to a root folder and have it scanned recursively. I've seen there's a RecursiveDirectoryScanner now in v5, but when I change the version of the dependencies in your project, it all gets broken. Do you have any enlightenment on how to achieve this goal?
    Thanks.

    Replies
    1. Hi Enrico,

      You can pull this branch https://github.com/iainporter/spring-file-poller/tree/SPRING-5

      which makes use of the RecursiveDirectoryScanner

  4. In place of RecursiveDirectoryScanner, can't we use a folder pattern so that it polls faster?
    e.g. for app/dir/123/abc/test.txt and app/dir/234/abc/test.txt
    we could use app/dir/*/abc/test.txt
    Do you have any idea?

  5. Excellent Post :-) My major takeaway was how you tested things.

    Thanks!

  6. Hi, could you please help with file content validation? Below is my requirement.

    I want to read JSON files and validate each one against a jsonExpression; if validation fails, push the file to the failed directory, otherwise push it to the processed directory.

    Replies
    1. You could just write a handler and wire it in place of the transformer in the following code

      public IntegrationFlow writeToFile(@Qualifier("fileWritingMessageHandler") MessageHandler fileWritingMessageHandler) {
          return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
                  .transform(m -> new StringBuilder((String)m).reverse().toString())
                  .handle(fileWritingMessageHandler)
                  .handle(loggingHandler())
                  .get();
      }

      i.e.

      public IntegrationFlow writeToFile(@Qualifier("fileWritingMessageHandler") MessageHandler fileWritingMessageHandler) {
          return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
                  .handle((p, h) -> validateJson(p))
                  .handle(fileWritingMessageHandler)
                  .handle(loggingHandler())
                  .get();
      }

  7. Hi,
    I have a list of JSON files in an input directory, i.e. /etc/input/*.json
    I want to read only UTF-8 encoded JSON files using IntegrationFlow,
    because I am splitting each JSON file on a key called "records" using ExpressionEvaluatingSplitter and publishing each record as one message to another channel.
    But I am getting a PathNotFoundException for non-UTF-8 files.

    com.jayway.jsonpath.PathNotFoundException: Expected to find an object with property ['records'] in path $ but found 'java.lang.String'. This is not a json object according to the JsonProvider: 'com.jayway.jsonpath.spi.json.JsonSmartJsonProvider'.

    My JSON file content looks like this:
    {
    "records": [
    {
    "id":"1",
    "name":"Admin"
    },
    {
    "id":"2",
    "name":"Sys"
    },
    {
    "id":"3",
    "name":"System"
    },
    {
    "id":"4",
    "name":"User"
    }
    ]
    }
