Implementing distributed asynchronous crash-resilient pipelines with Terracotta DSO.

In these notes I want to share some tricks I’ve used during the last two years in the project I’m working on. I decided to publish them in the hope that they may be useful, since I have not seen these tricks described before.

Problem description.

Imagine a Web service which has to sustain a heavy load 24x365. To fulfill most of the requests it has to make a sequence of calls to some external legacy subsystem. These calls are pretty heavy-weight and can take from tens of seconds to tens of minutes to finish, so they are invoked asynchronously and use a callback to report the result. An additional requirement is that in most cases these sequences of calls should be transactional, i.e. if some step fails, all previous steps should be “undone”.

High-level architecture.

It is natural to use a clustered service to achieve horizontal scalability and crash resilience. We use Java as the implementation platform, so I decided to build the solution on top of Terracotta DSO. However, while it is a solid foundation, this framework does not solve all of our problems out of the box, so some tricks had to be invented.

For those readers who are unfamiliar with Terracotta DSO, here is a brief description of the basic abstractions of this framework used in the algorithms I want to discuss. DSO (Distributed Shared Objects) transparently clusters plain Java objects: any object graph reachable from a configured “root” field is visible on every JVM in the cluster, and synchronized blocks on shared objects act as cluster-wide locks. Which classes are instrumented for sharing is specified declaratively in tc-config.xml, without changes to the application code.

Asynchronous Pipelines.

Initially I’ve used a Finite State Machine pattern to implement the execution of the sequence of asynchronous operations described in section 1. However very soon it becomes clear that this approach results in a messy and error-prone code. In addition it was very difficult (if possible at all) to achieve a desired level of scalability and crash resilience. Indeed, albeit it  is fairly easy to use load balancer in front of the cluster to distribute the incoming requests between the nodes, different requests require different sequences of operations to be executed so the load will not be distributed evenly. In addition if some node goes down all the compound operations initiated by it will be left in an unknown state.

To deal with the above-mentioned issues I decided to use another approach. It was inspired by Haskell-style monads, but it can also be considered a specialized version of the PipelineProcessing and AsynchronousComposition patterns. With this approach each step involving an asynchronous call to an external subsystem is wrapped into a “task”. These tasks extend a base abstract class and must implement “exec” and “undo” methods (both performing asynchronous calls) as well as the callback methods “execDone” and “undoDone” for “exec” and “undo” respectively (for the sake of brevity I present a slightly simplified picture; in particular, I assume here that all callbacks have the same signature). There is a “context” object shared between all tasks in the sequence. The context object and all tasks are shareable in the Terracotta sense.
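
To make this more concrete, here is a minimal sketch of what such a task abstraction might look like. The names Context, Callback and setContext are illustrative only; real tasks would of course carry step-specific state and error information.

    // Shared "context" object passed between all tasks of one pipeline
    // (instrumented as a Terracotta-shared class).
    class Context {
        private final java.util.Map<String, Object> values =
                new java.util.HashMap<String, Object>();

        public synchronized void put(String key, Object value) { values.put(key, value); }
        public synchronized Object get(String key) { return values.get(key); }
    }

    // Callback used by a task to report completion of its asynchronous call;
    // the pipeline itself plays this role (see below).
    interface Callback {
        void done(Object result, Throwable failure);
    }

    // Base abstract class for a single asynchronous step.
    abstract class Task {
        protected Context context;

        public void setContext(Context context) { this.context = context; }

        // Starts the asynchronous call; the result is reported via the callback.
        public abstract void exec(Callback callback);

        // Starts the asynchronous compensating ("undo") call.
        public abstract void undo(Callback callback);

        // Invoked by the pipeline with the stored callback arguments.
        public abstract void execDone(Object result, Throwable failure);
        public abstract void undoDone(Object result, Throwable failure);
    }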

To serve an external request the application creates a wrapper for the sequence of tasks (a “pipeline”); in our case the sequence is known for each external request. This pipeline is an instance of a shareable class which implements the Runnable interface. In addition, the Pipeline class is a Task itself, so one can create a pipeline of pipelines, so we indeed have a monad in the sense of category theory. After creation the pipeline is submitted to a cluster-wide shared ExecutorService (Terracotta does not allow sharing instances of the ExecutorService class, but one can have a shared LinkedBlockingQueue and create local instances of the ExecutorService on each node of the cluster backed by this queue).
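
One way to set this up is sketched below, assuming the queue field is declared as a Terracotta root in tc-config.xml; the PipelineExecutor class and its names are illustrative, not the actual code.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class PipelineExecutor {
        // Declared as a Terracotta root, so every node in the cluster
        // sees the same queue instance.
        private static final LinkedBlockingQueue<Runnable> sharedQueue =
                new LinkedBlockingQueue<Runnable>();

        // The executor itself is purely local: each node creates its own
        // thread pool backed by the cluster-wide queue.
        private final ThreadPoolExecutor localExecutor;

        PipelineExecutor(int threads) {
            localExecutor = new ThreadPoolExecutor(
                    threads, threads, 0L, TimeUnit.MILLISECONDS, sharedQueue);
            // Start the worker threads right away so that they pick up
            // pipelines put into the queue by any node of the cluster.
            localExecutor.prestartAllCoreThreads();
        }

        // Submitting a pipeline means putting it directly on the shared queue;
        // whichever node takes it first will execute the next step.
        void submit(Runnable pipeline) throws InterruptedException {
            sharedQueue.put(pipeline);
        }
    }

Putting the work directly on the shared queue, rather than going through execute(), keeps the distribution decision entirely on the queue: whichever node’s worker thread happens to take the Runnable runs that step.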

The pipeline maintains an internal state which contains the current task, the direction of the execution flow (direct or reverse) and a mode: EXEC or CALLBACK. The initial mode is EXEC and the initial direction is direct. At each step the pipeline invokes the “exec” or “undo” method of the current task (depending on the direction of the execution flow), providing itself as the callback object. When the callback is invoked, the pipeline stores its arguments, changes its mode to CALLBACK and resubmits itself to the shared executor. At the next run it invokes the “execDone” or “undoDone” callback of the current task with the stored arguments, changes its mode back to EXEC and resubmits itself again. If at some point a failure is detected, the direction of the execution flow is changed to reverse and the pipeline tries to perform a rollback by invoking the “undo” methods of the previously executed tasks. Once again, for the sake of brevity I omit some minor details which are pretty obvious.
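
Roughly, this stepping logic could be sketched as follows. The sketch builds on the hypothetical Task, Callback and PipelineExecutor classes above and reflects my reading of the description, with error handling and the propagation of results into the context omitted.

    import java.util.List;

    class Pipeline extends Task implements Runnable, Callback {

        private enum Mode { EXEC, CALLBACK }
        private enum Direction { DIRECT, REVERSE }

        private final List<Task> tasks;
        private final PipelineExecutor executor;

        private int current = 0;
        private Mode mode = Mode.EXEC;
        private Direction direction = Direction.DIRECT;

        // Callback arguments stored between two consecutive runs.
        private Object lastResult;
        private Throwable lastFailure;

        Pipeline(List<Task> tasks, PipelineExecutor executor) {
            this.tasks = tasks;
            this.executor = executor;
        }

        public void run() {
            Task task = tasks.get(current);
            if (mode == Mode.EXEC) {
                // Start the asynchronous call of the current step,
                // passing the pipeline itself as the callback object.
                if (direction == Direction.DIRECT) {
                    task.exec(this);
                } else {
                    task.undo(this);
                }
            } else {
                // Deliver the stored callback arguments to the current task.
                if (direction == Direction.DIRECT) {
                    task.execDone(lastResult, lastFailure);
                } else {
                    task.undoDone(lastResult, lastFailure);
                }
                if (lastFailure != null && direction == Direction.DIRECT) {
                    // A step failed: reverse the direction and start rolling
                    // back the previously executed tasks.
                    direction = Direction.REVERSE;
                    current--;
                } else {
                    current += (direction == Direction.DIRECT) ? 1 : -1;
                }
                lastResult = null;
                lastFailure = null;
                mode = Mode.EXEC;
                if (current >= 0 && current < tasks.size()) {
                    resubmit();   // more steps to execute or to undo
                }
            }
        }

        // Invoked when the asynchronous call of the current step finishes.
        public void done(Object result, Throwable failure) {
            lastResult = result;
            lastFailure = failure;
            mode = Mode.CALLBACK;
            resubmit();
        }

        private void resubmit() {
            try {
                executor.submit(this);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // A Pipeline is itself a Task, which is what makes pipelines of
        // pipelines possible; these implementations are omitted here.
        public void exec(Callback callback) { /* ... */ }
        public void undo(Callback callback) { /* ... */ }
        public void execDone(Object result, Throwable failure) { /* ... */ }
        public void undoDone(Object result, Throwable failure) { /* ... */ }
    }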

Please note that each step (including the actual processing of a callback) can potentially be executed on a different node of the cluster. In particular, this makes it possible to implement a graceful shutdown of a particular node without having to wait until all pipelines running on this node are completely finished: the application only has to wait for the completion of the current step (an accurate algorithm for such a shutdown is not completely straightforward, but the details are out of the scope of this text).

Crash resilience.

So far so good. However, we still have a problem. If a particular node crashes in the middle of processing a step of a particular pipeline, there will be no way to recover or even to report such a failure. Imagine, for example, that a node dies right after it has taken a pipeline from the shared queue. In this case it will be very difficult to figure out what happened to this pipeline. To overcome this problem I have implemented the trick described below.

When a pipeline terminates it removes itself from the shared map.
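
The bookkeeping around this shared map is only hinted at here, so the following sketch is mostly my reading of it: pipelines register themselves in a cluster-wide map while they are in flight and remove themselves on termination, so that surviving nodes have enough information to detect and resubmit pipelines orphaned by a crashed node. The keying scheme and the recovery scan are assumptions, not part of the original description.

    import java.util.HashMap;
    import java.util.Map;

    class PipelineRegistry {
        // A cluster-wide registry of in-flight pipelines; the map would be
        // declared as a Terracotta root. Keying by an arbitrary pipeline id
        // is an assumption made for illustration.
        private static final Map<String, Pipeline> inFlight =
                new HashMap<String, Pipeline>();

        static void register(String pipelineId, Pipeline pipeline) {
            synchronized (inFlight) {   // a cluster-wide lock under DSO
                inFlight.put(pipelineId, pipeline);
            }
        }

        // When a pipeline terminates it removes itself from the shared map.
        static void unregister(String pipelineId) {
            synchronized (inFlight) {
                inFlight.remove(pipelineId);
            }
        }

        // A surviving node that learns about a crashed peer could scan this
        // map and resubmit the orphaned pipelines to the shared queue; the
        // details of that recovery step are not covered here.
    }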

It is worth mentioning that the algorithm described above can be implemented in a less tricky way with the explicit use of the Terracotta-specific API. However, I decided to avoid using this API for as long as possible and to rely only on declarative instrumentation (via tc-config.xml). So far this appears to be feasible. In particular, it seems that the same trick works (maybe with some minor modifications) with other Java clustering frameworks (such as Hazelcast).