Spring Integration in Hybris Cluster


This article is brought to you by Sergei Aksenenko, EPAM Lead Software Engineer.
In most projects, Hybris is tightly integrated with various systems in the e-commerce landscape. Most SAP Hybris Commerce projects use Spring Integration for communication with external systems, Enterprise Service Buses and ETL tools. Spring Integration is a lightweight implementation of the Enterprise Integration patterns that becomes part of the application itself and plays very nicely with projects based on the Spring Framework. It brings event-driven design to the application level and leads to well-structured, decoupled implementations.

SAP Hybris provides a module called ‘hotfolders’ (hybris wiki) as part of the acceleratorservices extension. It enables easy CSV file import through folders accessible via the OS file system layer. Beyond this use case, Spring Integration provides much more functionality. With declarative configuration we can integrate Hybris with many other systems using different protocols and technologies: messaging, file integration over FTP/FTPS, SFTP, AWS S3 and many others, direct calls, SOAP and REST.

However, problems appear when you want to run Spring Integration in a cluster environment and need high availability and fault tolerance. I recommend watching Gary Russell and David Turanski’s presentation, where they show the problems of a naive implementation running in a cluster environment and how to create HA architectures with Spring Integration, mainly using configuration and existing SI components. The main problems you may face when running Spring Integration in a cluster:
  1. Message ordering. For example, Node 1 takes the first message and Node 2 takes the second; however, Node 1 is busy, so its message is queued, and the second message is processed before the first one. If message order is important, this creates issues.
  2. Race conditions and concurrent use of resources. Conflicts with models and the DB can occur even without an ordering issue: simply receive two messages that create the same entity and you get a conflict when trying to save the models.
  3. Shared data. Using read-only file system channels requires a shared metadata store implementation.
There are two main approaches to running Spring Integration in a cluster:
  1. Active – Passive (failing over to another node if the master node fails)
  2. Active – Active (concurrent consumers; messages from dependent channels within one poll are still received by a single node)
In both, we need something shared among the nodes to synchronize processing. One of the simplest ways is a shared DB accessed through the hybris out-of-the-box persistence layer. However, that alone isn’t quite enough to guarantee node synchronization; the right solution should use JDBC directly. We can find an example of such an implementation in the hybris Task Engine (hybris wiki), which guarantees that only a single node runs a particular task or CronJob (by running the TriggerTaskRunner task) at a time. Let’s put it all together and make Spring Integration fit well with a hybris cluster. For that we need to:
  1. Use the hybris Task Engine instead of pollers for inbound channels (see the sketch just after this list),
  2. Create a custom metadata store based on hybris models,
  3. Make the system resilient to any failures during message processing.
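
As an illustration of the first point, here is a minimal sketch of a self-rescheduling task runner built on the hybris Task Engine. The bean id ftpPollingTaskRunner and the polling delays are assumptions of this sketch; the actual receive loop it would call is shown in the next section.

import java.util.Date;
import de.hybris.platform.servicelayer.model.ModelService;
import de.hybris.platform.task.TaskModel;
import de.hybris.platform.task.TaskRunner;
import de.hybris.platform.task.TaskService;

public class FtpPollingTaskRunner implements TaskRunner<TaskModel> {

    private ModelService modelService;

    @Override
    public void run(final TaskService taskService, final TaskModel task) {
        // poll the inbound channel here (the receive loop is shown in the next section);
        // the Task Engine guarantees that only one cluster node executes this task
        reschedule(taskService, 1000L);
    }

    @Override
    public void handleError(final TaskService taskService, final TaskModel task, final Throwable error) {
        // back off after a failure so we do not hammer a broken remote source
        reschedule(taskService, 60000L);
    }

    private void reschedule(final TaskService taskService, final long delayMillis) {
        final TaskModel next = modelService.create(TaskModel.class);
        next.setRunnerBean("ftpPollingTaskRunner"); // hypothetical bean id
        next.setExecutionDate(new Date(System.currentTimeMillis() + delayMillis));
        taskService.scheduleTask(next);
    }

    public void setModelService(final ModelService modelService) {
        this.modelService = modelService;
    }
}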

Spring Integration within Hybris Cluster

Let’s take a closer look at how the Spring Integration FTP inbound channel adapter is implemented. There is a self-descriptive XML configuration used for initialization:
<int-ftp:inbound-channel-adapter id="ftpInbound"
       channel="ftpChannel"
       session-factory="ftpSessionFactory"
       auto-create-local-directory="true"
       delete-remote-files="true"
       filename-pattern="*.txt"
       remote-directory="some/remote/path"
       remote-file-separator="/"
       preserve-timestamp="true"
       local-filename-generator-expression="#this.toUpperCase() + '.a'"
       local-filter="myFilter"
       temporary-file-suffix=".writing"
       local-directory=".">
       <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
This XML creates several beans; the main one is SourcePollingChannelAdapter, which creates a poller thread and orchestrates the whole polling process. We need to construct an FtpInboundFileSynchronizingMessageSource that can be used for receiving messages directly:
<bean id="ftpInboundFileSynchronizer"    class="org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer">
   <constructor-arg ref="ftpSessionFactory"/>
   <property name="remoteDirectory"                             value="some/remote/path"/>
   <property name="remoteFileSeparator"               value="/"/>
   <property name="filter" ref="myRemoteFilter"/>
   <property name="preserveTimestamp" value="true"/>
   <property name="temporaryFileSuffix" value=".writing"/>
</bean>
<bean id="ftpSynchronizingMessageSource"   class="org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizingMessageSource">
   <constructor-arg ref="ftpInboundFileSynchronizer"/>
   <property name="autoCreateLocalDirectory" value="true"/>
   <property name="localFilter" ref="myFilter"/>
   <property name="localDirectory" value="."/>
</bean>
Now the only thing we need to do is call the receive method on ftpSynchronizingMessageSource and then send the message to a channel. The simplest solution is to inject these beans into a cron job performable or a new task runner and call receive until it returns null, resending the messages to the relevant channel:
Message<File> message;
while ((message = messageSource.receive()) != null) {
    messageChannel.send(message);
}
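
For completeness, here is a minimal sketch of the cron job variant. The class name FtpPollingJob is hypothetical, the fields are assumed to be wired to the beans defined above, and the package names assume Spring Integration 4.x:

import java.io.File;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import de.hybris.platform.cronjob.enums.CronJobResult;
import de.hybris.platform.cronjob.enums.CronJobStatus;
import de.hybris.platform.cronjob.model.CronJobModel;
import de.hybris.platform.servicelayer.cronjob.AbstractJobPerformable;
import de.hybris.platform.servicelayer.cronjob.PerformResult;

public class FtpPollingJob extends AbstractJobPerformable<CronJobModel> {

    private final MessageSource<File> messageSource; // ftpSynchronizingMessageSource
    private final MessageChannel messageChannel;     // the channel the messages go to

    public FtpPollingJob(final MessageSource<File> messageSource, final MessageChannel messageChannel) {
        this.messageSource = messageSource;
        this.messageChannel = messageChannel;
    }

    @Override
    public PerformResult perform(final CronJobModel cronJob) {
        // drain the remote source: receive() returns null when no new files remain
        Message<File> message;
        while ((message = messageSource.receive()) != null) {
            messageChannel.send(message);
        }
        return new PerformResult(CronJobResult.SUCCESS, CronJobStatus.FINISHED);
    }
}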

Metadata Store

Another problem arises when you work with read-only remote sources, such as an FTP server where you need to monitor for new files but have no ability to move them to an archive folder or delete them. To track the processed files, Spring Integration uses metadata (essentially key-value pairs: file name and timestamp) and a MetadataStore. Out of the box, Spring Integration provides the following implementations: property-file-based, Redis, MongoDB, Zookeeper and Gemfire. Spring Integration 5.0 will add a JDBC metadata store, which may play better with hybris, but it will likely take a while before SAP upgrades the product to the new Spring version. With the current version, we can build a custom implementation based on the hybris persistence layer. For that we need two things:
  • define a new type in items.xml:
    <itemtype code="HybrisMetadata" generate="true" autocreate="true">
      <deployment table="hybris_metadata" typecode="14444"/>
      <attributes>
         <attribute qualifier="key" type="java.lang.String">
           <persistence type="property"/>
           <description>Metadata key</description>
           <modifiers optional="false" unique="true" initial="true"/>
         </attribute>
         <attribute qualifier="value" type="java.lang.String">
           <persistence type="property"/>
           <description>Metadata value</description>
           <modifiers optional="false" initial="true"/>
         </attribute>
     </attributes>
    <indexes>
      <index name="Metadata_Key" unique="true">
        <key attribute="key"/>
      </index>
    </indexes>
    </itemtype>
  • create an implementation of the ConcurrentMetadataStore interface based on the model service and a DAO for working with hybris models. There is nothing special about it, just several methods to implement: put, putIfAbsent, replace, get and remove (a sketch follows below).
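
A minimal sketch of such an implementation might look like this. HybrisMetadataModel is the model class the hybris build generates from the itemtype above; HybrisMetadataDao is an assumed flexible-search DAO whose findByKey returns null when no record exists:

import de.hybris.platform.servicelayer.model.ModelService;
import org.springframework.integration.metadata.ConcurrentMetadataStore;

public class HybrisMetadataStore implements ConcurrentMetadataStore {

    private final ModelService modelService;
    private final HybrisMetadataDao metadataDao; // assumed flexible-search DAO

    public HybrisMetadataStore(final ModelService modelService, final HybrisMetadataDao metadataDao) {
        this.modelService = modelService;
        this.metadataDao = metadataDao;
    }

    @Override
    public void put(final String key, final String value) {
        HybrisMetadataModel model = metadataDao.findByKey(key);
        if (model == null) {
            model = modelService.create(HybrisMetadataModel.class);
            model.setKey(key);
        }
        model.setValue(value);
        modelService.save(model);
    }

    @Override
    public String get(final String key) {
        final HybrisMetadataModel model = metadataDao.findByKey(key);
        return model == null ? null : model.getValue();
    }

    @Override
    public String putIfAbsent(final String key, final String value) {
        final HybrisMetadataModel existing = metadataDao.findByKey(key);
        if (existing != null) {
            return existing.getValue(); // another node already stored this key
        }
        // the unique index on 'key' guards against concurrent inserts from other nodes
        put(key, value);
        return null;
    }

    @Override
    public boolean replace(final String key, final String oldValue, final String newValue) {
        final HybrisMetadataModel model = metadataDao.findByKey(key);
        if (model == null || !model.getValue().equals(oldValue)) {
            return false;
        }
        model.setValue(newValue);
        modelService.save(model);
        return true;
    }

    @Override
    public String remove(final String key) {
        final HybrisMetadataModel model = metadataDao.findByKey(key);
        if (model == null) {
            return null;
        }
        final String value = model.getValue();
        modelService.remove(model);
        return value;
    }
}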
As an additional bonus, we can easily view and manage our metadata in Backoffice; to trigger reprocessing of some already consumed files, just remove the corresponding metadata objects. However, even if we use a metadata store for our ftp/sftp/file channels, the setup isn’t fault-tolerant by design. There is a good article by Maksym Bruner which describes this in detail.

Transactions and fault-tolerant hybris metadata

So, Spring’s MetadataStore keeps its records even if file processing fails or an application node is shut down. One solution to that problem is to wire the whole processing into a transaction: if we open a transaction in our custom poller (task or cron job), we can guarantee that either everything is processed and the metadata is stored in the database, or everything is rolled back. To make this approach work in practice we also need the Circuit Breaker pattern to avoid constant reprocessing of messages, plus monitoring and alerting in place to react to failures. Another approach is soft markers: we can extend the HybrisMetadata model with an additional attribute that tracks the processing status and set it after processing completes. In this case we can continue processing when there is a problem with a single message, then go back and reprocess all the deltas once the message or the processing code is fixed.
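
As a rough sketch of the transactional variant, assuming the same message source and channel wiring as before (the class name TransactionalFtpPoller is hypothetical; the hybris Transaction API commits the processed models and the metadata records together, or rolls them all back):

import java.io.File;
import de.hybris.platform.tx.Transaction;
import de.hybris.platform.tx.TransactionBody;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class TransactionalFtpPoller {

    private final MessageSource<File> messageSource;
    private final MessageChannel messageChannel;

    public TransactionalFtpPoller(final MessageSource<File> messageSource, final MessageChannel messageChannel) {
        this.messageSource = messageSource;
        this.messageChannel = messageChannel;
    }

    public void poll() throws Exception {
        // either every received file is processed and its metadata committed,
        // or the whole poll is rolled back as one unit
        Transaction.current().execute(new TransactionBody() {
            @Override
            public Object execute() throws Exception {
                Message<File> message;
                while ((message = messageSource.receive()) != null) {
                    messageChannel.send(message);
                }
                return null;
            }
        });
    }
}

With the soft-marker approach, the transaction would instead be opened per message, and a status attribute on the metadata record would be updated after each successful send.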
