This article is brought to you by Sergei Aksenenko, EPAM Lead Software Engineer
In most projects, Hybris is tightly integrated with different systems in the e-commerce landscape. Most SAP Hybris Commerce projects use Spring Integration for communication with external systems, Enterprise Service Buses, and ETL tools.
Spring Integration is a lightweight implementation of Enterprise Integration Patterns that becomes part of an application and works very nicely with projects based on the Spring Framework. It brings event-driven design to the application level and leads to well-structured, decoupled implementations.
SAP Hybris provides a module called "hotfolders" (hybris wiki) as part of the acceleratorservices extension. It enables easy CSV file import through folders accessible through the OS file system layer.
Beyond the use cases mentioned above, Spring Integration provides much more functionality. With declarative configuration, we can integrate Hybris with many other systems using different protocols and technologies, such as messaging; file integrations using multiple protocols, including FTP/FTPS, SFTP, AWS S3, and many others; direct calls; SOAP; and REST.
However, problems appear when you want to run Spring Integration in a cluster environment and achieve high availability and fault tolerance. I recommend watching Gary Russell and David Turanski's presentation, where they show the problems with a naive implementation when it is run in a cluster environment, and how to create HA architectures with Spring Integration mainly using configuration and existing SI components.
The main problems you may face if you run Spring Integration in a cluster:
- Order of messages. For example, Node 1 takes the first message, and Node 2 takes the second. However, Node 1 is busy and the message is queued, which results in the second message being processed before the first one. If the order of messages is important, this may create issues.
- Race conditions and concurrent use of resources. There can be conflicts with models and the DB even without ordering issues. For example, you may receive two messages in which the same entity is created and then get a conflict when trying to save models.
- Shared data. Using read-only file system channels requires a shared metadata implementation.
There are two main approaches to clustering with Spring Integration:
- Active-Passive (failing over to another node when the master node fails)
- Active-Active (concurrent consumers; messages from dependent channels within one poll are still received by a single node)
In both cases, we need something shared among nodes to synchronize processing. One of the simplest ways is using a shared DB through the Hybris out-of-the-box persistence layer. However, it isn't quite enough to guarantee node synchronization — the right solution should use JDBC directly. We can find an example of that implementation in the Hybris Task Engine (hybris wiki), which guarantees that only a single node runs one particular task or CronJob (by running the task TriggerTaskRunner) at a time.
Let's put everything together and make Spring Integration fit well with a Hybris cluster. For that, we need to:
- Use the Hybris task engine instead of pollers for inbound channels.
- Create a custom metadata store based on Hybris models.
- Make the system resistant to any failures during message processing.
Spring Integration within Hybris Cluster
Let's take a closer look at how the Spring Integration FTP inbound channel is implemented.
A self-descriptive XML configuration is used for initialization:
<int-ftp:inbound-channel-adapter
    id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
This XML creates several beans. The main one is SourcePollingChannelAdapter, which creates a poller thread and orchestrates the whole polling process. Since we want to replace the poller with the Hybris task engine, we need to construct FtpInboundFileSynchronizingMessageSource ourselves; it can then be used for receiving messages directly.
<bean
    id="ftpInboundFileSynchronizer"
    class="org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer">
    <constructor-arg ref="ftpSessionFactory"/>
    <property name="remoteDirectory" value="some/remote/path"/>
    <property name="remoteFileSeparator" value="/"/>
    <property name="filter" ref="myRemoteFilter"/>
    <property name="preserveTimestamp" value="true"/>
    <property name="temporaryFileSuffix" value=".writing"/>
</bean>
<bean
    id="ftpSynchronizingMessageSource"
    class="org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizingMessageSource">
    <constructor-arg ref="ftpInboundFileSynchronizer"/>
    <property name="autoCreateLocalDirectory" value="true"/>
    <property name="localFilter" ref="myFilter"/>
    <property name="localDirectory" value="."/>
</bean>
Now the only thing we need to do is call the receive method on ftpSynchronizingMessageSource and then send a message to the channel. The simplest solution is to inject these beans into a cron job performable or a new task runner, and call receive until it returns null, resending messages to the relevant channel.
Message<File> message;
while ((message = messageSource.receive()) != null) {
    messageChannel.send(message);
}
Metadata Store
Another problem can arise if you work with read-only remote sources, such as FTP, where you need to monitor new files but cannot move them to an archive folder or remove them. To track processed files, Spring Integration uses metadata (essentially key-value pairs, such as file name and timestamp) and a MetadataStore. Out of the box, Spring Integration provides the following implementations: property-based, Redis, Mongo, ZooKeeper, and GemFire. Spring Integration 5.0 will support a JDBC metadata store. It may work better with Hybris, but it will probably take time before SAP upgrades the product to the new Spring version. With the current version, we can make a custom implementation based on the Hybris persistence layer.
For that, we need two things:
- Define a new type in items.xml:
<itemtype code="HybrisMetadata" generate="true" autocreate="true">
    <deployment table="hybris_metadata" typecode="14444"/>
    <attributes>
        <attribute qualifier="key" type="java.lang.String">
            <persistence type="property"/>
            <description>Metadata key</description>
            <modifiers optional="false" unique="true" initial="true"/>
        </attribute>
        <attribute qualifier="value" type="java.lang.String">
            <persistence type="property"/>
            <description>Metadata value</description>
            <modifiers optional="false" initial="true"/>
        </attribute>
    </attributes>
    <indexes>
        <index name="Metadata_Key" unique="true">
            <key attribute="key"/>
        </index>
    </indexes>
</itemtype>
- Create an implementation of the ConcurrentMetadataStore interface based on the model service and DAO for working with Hybris models. There is nothing special here; it only requires implementing several methods: put, putIfAbsent, replace, get, and remove.
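To make the shape of such a store concrete, here is a minimal sketch, not the author's implementation. It assumes a hypothetical `MetadataDao` that would wrap the model service and a FlexibleSearch lookup over `HybrisMetadata` items; in this sketch the DAO is replaced by an in-memory stand-in, and the `ConcurrentMetadataStore` interface is re-declared locally (with the same method signatures as the Spring Integration one) so that the example runs without Spring or Hybris on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HybrisMetadataStoreSketch {

    /** Local copy of the ConcurrentMetadataStore method signatures (assumption: kept local so the sketch compiles alone). */
    public interface ConcurrentMetadataStore {
        void put(String key, String value);
        String get(String key);
        String remove(String key);
        String putIfAbsent(String key, String value);
        boolean replace(String key, String oldValue, String newValue);
    }

    /** Hypothetical DAO; a real one would read and write HybrisMetadata models. */
    public interface MetadataDao {
        String findValue(String key);            // returns null when no item exists for the key
        void saveValue(String key, String value);
        void delete(String key);
    }

    /** In-memory stand-in for the DAO, used only to make the sketch runnable. */
    public static class InMemoryMetadataDao implements MetadataDao {
        private final Map<String, String> rows = new ConcurrentHashMap<>();
        @Override public String findValue(String key) { return rows.get(key); }
        @Override public void saveValue(String key, String value) { rows.put(key, value); }
        @Override public void delete(String key) { rows.remove(key); }
    }

    public static class HybrisMetadataStore implements ConcurrentMetadataStore {
        private final MetadataDao dao;

        public HybrisMetadataStore(MetadataDao dao) { this.dao = dao; }

        // 'synchronized' only protects a single JVM. Across cluster nodes, the
        // unique index on "key" and a database transaction would have to do that job.
        @Override public synchronized void put(String key, String value) { dao.saveValue(key, value); }

        @Override public synchronized String get(String key) { return dao.findValue(key); }

        @Override public synchronized String remove(String key) {
            String old = dao.findValue(key);
            if (old != null) {
                dao.delete(key);
            }
            return old;
        }

        @Override public synchronized String putIfAbsent(String key, String value) {
            String existing = dao.findValue(key);
            if (existing == null) {
                dao.saveValue(key, value);
            }
            return existing; // null means this caller stored the value
        }

        @Override public synchronized boolean replace(String key, String oldValue, String newValue) {
            String current = dao.findValue(key);
            if (current == null || !current.equals(oldValue)) {
                return false; // someone else changed or removed the entry
            }
            dao.saveValue(key, newValue);
            return true;
        }
    }

    public static void main(String[] args) {
        HybrisMetadataStore store = new HybrisMetadataStore(new InMemoryMetadataDao());
        System.out.println(store.putIfAbsent("orders.txt", "1000")); // first writer wins
        System.out.println(store.putIfAbsent("orders.txt", "2000")); // returns the existing value
        System.out.println(store.replace("orders.txt", "1000", "3000"));
    }
}
```

The important methods for file filters are putIfAbsent and replace: they let a node claim a file only if no other node has already recorded it with the same value.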
As an additional bonus, we can easily view and manage our metadata using Backoffice. To trigger reprocessing of some consumed files, remove the metadata objects. However, even if we use a metadata store for our FTP/SFTP/file channels, it isn't fault-tolerant by design. There is a good article by Maksym Bruner that describes this in detail.
Transactions and Hybris fault-tolerance metadata
The MetadataStore keeps its records even if file processing fails or an application node is shut down, so a file that was never fully processed is still treated as consumed. One solution is to wrap the whole processing flow in a transaction. If we open a transaction in our custom poller (task or cron job), we can guarantee that either everything is processed and the metadata is stored in the database, or everything is rolled back.
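The all-or-nothing behavior can be illustrated with a small simulation. This is only a sketch under stated assumptions: `TxMetadataTable` is a hypothetical in-memory stand-in for the metadata table and its transaction, and `pollOnce` plays the role of the custom poller; in a real project the transaction would come from the platform's transaction support rather than from this class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TransactionalPollSketch {

    /** Hypothetical stand-in for a database table with begin/commit/rollback. */
    public static class TxMetadataTable {
        private Map<String, String> committed = new HashMap<>();
        private Map<String, String> working;

        public void begin() { working = new HashMap<>(committed); }
        public void put(String key, String value) { working.put(key, value); }
        public void commit() { committed = working; working = null; }
        public void rollback() { working = null; } // uncommitted changes are discarded
        public boolean isProcessed(String key) { return committed.containsKey(key); }
    }

    /**
     * Processes all files of one poll inside a single transaction.
     * Returns true if everything was committed, false if the poll was rolled back.
     */
    public static boolean pollOnce(List<String> files, TxMetadataTable table, Consumer<String> handler) {
        table.begin();
        try {
            for (String file : files) {
                table.put(file, "processed"); // the metadata write joins the transaction
                handler.accept(file);         // downstream processing may throw
            }
            table.commit();
            return true;
        } catch (RuntimeException e) {
            table.rollback();                 // no metadata survives a failed poll
            return false;
        }
    }

    public static void main(String[] args) {
        TxMetadataTable table = new TxMetadataTable();
        List<String> files = Arrays.asList("a.csv", "b.csv");

        boolean first = pollOnce(files, table, f -> {
            if (f.equals("b.csv")) {
                throw new IllegalStateException("simulated processing failure");
            }
        });
        System.out.println("first poll committed: " + first);
        System.out.println("a.csv recorded: " + table.isProcessed("a.csv"));

        boolean second = pollOnce(files, table, f -> { });
        System.out.println("second poll committed: " + second);
    }
}
```

Because the failed poll leaves no metadata behind, the next poll sees both files as new and picks them up again.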
To make this approach work in practice, we also need the Circuit Breaker pattern to avoid constant reprocessing of a failing message, plus monitoring and alerting to react to failures. Another approach is soft markers: we extend the Hybris metadata model with an additional attribute that tracks processing status and set it once processing completes. In this case, a problem with a single message does not block processing; we can continue and return later to reprocess all deltas once the message or the processing is fixed.
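The soft-marker idea can be sketched as follows. The `Status` values, the class name, and the in-memory map are assumptions made for illustration; in a real project the status would be an extra attribute on the metadata item type and would be persisted through the model service.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class SoftMarkerSketch {

    /** Hypothetical processing states stored next to each metadata key. */
    public enum Status { RECEIVED, PROCESSED, FAILED }

    // Stand-in for metadata items carrying an additional status attribute.
    private final Map<String, Status> statusByKey = new LinkedHashMap<>();

    /** Processes every key that is not yet PROCESSED; one failure does not stop the rest. */
    public void process(List<String> keys, Consumer<String> handler) {
        for (String key : keys) {
            if (statusByKey.get(key) == Status.PROCESSED) {
                continue; // already done, nothing to reprocess
            }
            statusByKey.put(key, Status.RECEIVED);
            try {
                handler.accept(key);
                statusByKey.put(key, Status.PROCESSED);
            } catch (RuntimeException e) {
                statusByKey.put(key, Status.FAILED); // keep going with the next message
            }
        }
    }

    /** Returns the keys that still need (re)processing, in arrival order. */
    public List<String> pending() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Status> entry : statusByKey.entrySet()) {
            if (entry.getValue() != Status.PROCESSED) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public Status statusOf(String key) { return statusByKey.get(key); }

    public static void main(String[] args) {
        SoftMarkerSketch marker = new SoftMarkerSketch();
        marker.process(Arrays.asList("a.csv", "b.csv", "c.csv"), key -> {
            if (key.equals("b.csv")) {
                throw new IllegalStateException("simulated bad message");
            }
        });
        System.out.println("pending after first run: " + marker.pending());

        // After the message or the processing is fixed, reprocess only the deltas.
        marker.process(marker.pending(), key -> { });
        System.out.println("pending after retry: " + marker.pending());
    }
}
```

Compared with the transactional variant, this trades atomicity for progress: healthy messages are not held back by a broken one, at the cost of having to query and retry the non-processed entries explicitly.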
Sergei Aksenenko,
EPAM Lead Software Engineer