Thursday, 23 February 2012

Spring Integration - Robust Splitter Aggregator

A Robust Splitter Aggregator Design Strategy - Messaging Gateway Adapter Pattern

What do we mean by robust?
In the context of this article, robustness refers to an ability to manage exception conditions within a flow without immediately returning to the caller. In some processing scenarios n of m responses is good enough to proceed to conclusion. Example processing scenarios that typically have these tendencies are:
  1. Quotations for finance, insurance and booking systems.
  2. Fan-out publishing systems.
Why do we need Robust Splitter Aggregator Designs?
First and foremost, an introduction to a typical Splitter Aggregator pattern may be necessary. The Splitter is an EIP pattern that describes a mechanism for breaking composite messages into parts in order that they can be processed individually. A Router is an EIP pattern that describes routing messages into channels - aiming them at specific messaging endpoints. The Aggregator is an EIP pattern that collates and stores a set of messages that belong to a group, and releases them when that group is complete.

Together, those three EIP constructs form a powerful mechanism for dividing processing into distinct units of work. Spring Integration (SI) uses the same pattern terminology as EIP and so readers of that methodology will be quite comfortable with Spring Integration Framework constructs. The SI Framework allows significant customisations of all three of those constructs and furthermore, by simply using asynchronous channels as you would in any other multi-threaded configuration, allows those units of work to be executed in parallel.

An interesting challenge working with SI Splitter Aggregator designs is building appropriately robust flows that operate predictably in a number of invocation scenarios. A simple splitter aggregator design can be used in many circumstances and operate without heavy customisation of the SI constructs. However, some service requirements demand a more robust processing strategy and therefore more complex configuration. The following sections describe and show what a Simple Splitter Aggregator design actually looks like and the type of processing your design must be able to deal with, and then go on to suggest candidate solutions for more robust processing.

A Simple Splitter Aggregator Design
The following Splitter Aggregator design shows a simple flow that receives document request messages into a messaging gateway, splits the message into two processing routes and then aggregates the responses. Note that the diagram has been built from EIP constructs in OmniGraffle rather than being an Integration Graph view from within STS; the channels are missing from the diagram for the sake of brevity.





SI Constructs in detail:


Messaging Gateways - there are three messaging gateways. A number of configurations are available for gateway specifications but, significantly, a gateway can return business objects, exceptions and nulls (following a timeout). The gateway to the far left is the service gateway for which we are defining the flow. The other two gateways, between the Router and Aggregator, are external systems that will be providing responses to business questions that our flow generates.

The Splitter - a single splitter exists and is responsible for consuming the document message and producing a collection of messages for onward processing. The Splitter is most often custom; its Java signature specifies a single object argument and a collection for return.
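As a minimal sketch of that signature, the class below shows a splitter method that takes one composite object and returns a java.util.List of parts. DocumentRequest, its fields and the DocumentRequestSplitter name are hypothetical, invented here for illustration; in an SI flow a bean of this shape would be referenced from the splitter definition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical composite request: one document bound for several services. */
final class DocumentRequest {
    final String documentId;
    final List<String> targetServices;

    DocumentRequest(String documentId, List<String> targetServices) {
        this.documentId = documentId;
        this.targetServices = targetServices;
    }
}

/** Hypothetical custom splitter: single object in, collection out. */
final class DocumentRequestSplitter {

    /** Returns one part per target service; an empty list means no work. */
    public List<String> split(DocumentRequest request) {
        List<String> parts = new ArrayList<>();
        for (String service : request.targetServices) {
            parts.add(service + ":" + request.documentId);
        }
        return parts;
    }
}
```

Each element of the returned list would become its own message on the splitter's output channel.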

The Recipient List Router - a single router exists. Any appropriate router can be used; choose the one that most closely matches your requirements - you can easily route by expression or payload type. The primary purpose of the router is to route the collection of messages supplied by the splitter. This is a pretty typical Splitter Aggregator configuration.

Aggregator - a single construct that is responsible for collecting messages together in a group in order that further processing can take place on the gateway responses. Although the Aggregator can be configured with attributes and bean definitions to provide alternative grouping and release strategies, most often the default aggregation strategy suffices.

Interesting Aspects of Splitter Aggregator Operation
  1. Gateway - the inbound gateway, the one on the far left, may or may not have an error handling bean reference defined on it. If it does, that bean will have an opportunity to handle any exceptions thrown within the flow to the right of that gateway. If not, any exception will be thrown straight out of the gateway.
  2. Gateway - an optional default-reply-timeout can be set on each of the gateways. There are significant implications for setting this value, so ensure that they're well understood. An expired timeout will result in a null being returned from the gateway. This is the very same condition that can lead to a thread getting parked if an upstream gateway also has no default-reply-timeout set.
  3. Splitter Input Channel - this can be a simple direct channel or a direct channel with a dispatcher defined on it. If the channel has a dispatcher specified the flow downstream of this point will be asynchronous, multi-threaded. This also changes the upstream gateway semantics as it usually means that an otherwise impotent default-reply-timeout becomes active.
  4. Splitter - the splitter must return a single object. The single object returned by the splitter is a collection, a java.util.List. The SI framework will take each member of that list and feed it into the output-channel of the Splitter - as with this example, usually straight into a router. The contract for Splitter List returns is as its use in Java - it may contain zero, one or more elements. If the splitter returns an empty list it's unlikely that the router will have any work to do and so the flow invocation will be complete. However, if the List contains one item, the SI framework will extract that item from the list and push it into the router, if this gets routed successfully, the flow will continue.
  5. Router - the router will simply route messages into one of two gateways in this example.
  6. Gateways - the two gateways that are used between the Splitter and Aggregator are interesting. In this example I have used the generic gateway EIP pattern to represent a message sub-system but not defined it explicitly - we could use an HTTP outbound gateway, another SI flow or any other external system. Of course, for each of those sub-systems, a number of outcomes is possible. Depending on the protocol and external system, the message request may fail to send, the response may fail to arrive, a long running process may be invoked, or a network error, timeout or general processing exception may occur.
  7. Aggregator - the single aggregator will wait for a number of responses depending on what's been created by the Splitter. In the case where the splitter return list is empty the Aggregator will not get invoked. In the case where the Splitter return list has one entry, the aggregator will be waiting for one gateway response to complete the group. In the case where the Splitter list has n entries the Aggregator will be waiting for n entries to complete the group. Custom correlation strategies, release strategies and message stores can be injected amongst a set of rich configuration aspects.
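The waiting behaviour described in item 7 can be pictured with a small model. The sketch below is a simplified, hypothetical stand-in for sequence-size based release, not the SI implementation: a group is released only once the number of collected replies equals the sequence size set by the splitter. The class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of sequence-size based group release. */
final class SimpleGroupAggregator {

    private final Map<String, List<String>> groups = new HashMap<>();

    /**
     * Adds a reply to its group. Returns the complete group once the number
     * of replies equals sequenceSize, otherwise null (group still waiting).
     */
    public List<String> add(String correlationId, int sequenceSize, String reply) {
        List<String> group = groups.get(correlationId);
        if (group == null) {
            group = new ArrayList<>();
            groups.put(correlationId, group);
        }
        group.add(reply);
        if (group.size() == sequenceSize) {
            return groups.remove(correlationId);
        }
        return null;
    }
}
```

In this model a group of three that only ever receives two replies is never released, which is the situation a failed gateway call creates.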
Interesting Aspects of Simple Splitter Aggregator Operation
The primary deciding factor for establishing whether this type of simple gateway is adequate for requirements is to understand what happens in the event of failure. If any exception occurring in your SI flow results in the flow invocation being abandoned and that suits your requirements, there's no need to read any further. If, however, you need to continue processing following failure in one of the gateways the remainder of this article may be of more interest.

Exceptions, from any source, generated between the splitter and aggregator, will result in an empty or partial group being discarded by the Aggregator. The exception will propagate back to the closest upstream gateway for either handling by a custom bean or re-throwing by the gateway. Note that a custom release strategy on the Aggregator is difficult to use and especially so alongside timeouts but would not help in this case as the exception will propagate back to the leftmost gateway before the aggregator is invoked.

It's also possible to configure exception handlers on the innermost gateways so that the exception message is caught. But how would you route messages from a custom exception handler into the aggregator to complete the group? By injecting the aggregator channel definition into the custom exception handler? This is a poor approach: it would involve unpacking the exception message payload, copying the original message headers into a new SI message and then adding the original payload - only four or five lines of code, but dirty it is.

Following exception generation, exception messages (without modification) cannot be routed into an Aggregator to complete the group. The original message, the one that carries the correlation and sequence ids identifying the group and the position within it, is buried inside the SI exception message's payload.

If processing needs to continue following exception generation, the following must take place:
  • the aggregation group needs to be completed,
  • any exceptions must be caught and handled before getting back to the closest upstream gateway,
  • the correlation and sequence identifiers that allow group completion in the aggregator, which are buried within the exception message payload, must be extracted and set on the message that's bound for the aggregator.
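The extraction in the last bullet amounts to something like the following dependency-free sketch. SimpleMessage and FailedMessageException are hypothetical stand-ins for an SI message and for an exception that carries the failed message; the header names are assumed to match the SI defaults (correlationId, sequenceNumber, sequenceSize). It is an illustration of the idea only, not code from the project.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an SI message: a payload plus headers. */
final class SimpleMessage {
    final Object payload;
    final Map<String, Object> headers;

    SimpleMessage(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = new HashMap<>(headers);
    }
}

/** Hypothetical stand-in for an exception that carries the failed message. */
final class FailedMessageException extends RuntimeException {
    final SimpleMessage failedMessage;

    FailedMessageException(SimpleMessage failedMessage, Throwable cause) {
        super(cause);
        this.failedMessage = failedMessage;
    }
}

final class GroupHeaderRecovery {

    // Header names assumed to match the SI defaults.
    static final String[] GROUP_HEADERS =
            {"correlationId", "sequenceNumber", "sequenceSize"};

    /** Builds an error reply that still carries the group headers. */
    static SimpleMessage toAggregatableReply(FailedMessageException e,
                                             Object errorPayload) {
        Map<String, Object> headers = new HashMap<>();
        for (String name : GROUP_HEADERS) {
            Object value = e.failedMessage.headers.get(name);
            if (value != null) {
                headers.put(name, value);
            }
        }
        return new SimpleMessage(errorPayload, headers);
    }
}
```

Only the group headers are carried over, which is enough for the reply to be counted against the right group and position.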


A More Robust Solution - Messaging Gateway Adapter Pattern
Dealing with exceptions and null returns from gateways naturally leads to a design that implements a wrapper around the messaging gateway. This affords a level of control that would otherwise be very difficult to establish.


This adapter technique allows all returns from messaging gateways to be caught and processed as the messaging gateway is injected into the Service Activator and called directly from that. The messaging gateway no longer responds to the aggregator directly, it responds to a custom Java code Spring bean configured in the Service Activator namespace definition.

As expected, processing that does not encounter an exception will continue as normal. Those flows that experience exception conditions or unexpected or missing responses from messaging gateways need to process messages in such a way that message groups bound for aggregation can be completed. If the Service Activator were to allow the exception to be propagated outside of its backing bean, the group would not complete. The same applies not just to exceptions but to any return object that does not carry the prerequisite group correlation id and sequence headers - this is where the adaptation is applied.

Exception messages or null responses from messaging gateways are caught and handled as shown in the following example code:

import com.l8mdv.sample.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;

public class AvsServiceImpl implements AvsService {

    private static final Logger logger
            = LoggerFactory.getLogger(AvsServiceImpl.class);
    
    public static final String MISSING_MANDATORY_ARG
            = "Mandatory argument is missing.";
    
    private AvsGateway avsGateway;

    public AvsServiceImpl(final AvsGateway avsGateway) {
        this.avsGateway = avsGateway;
    }

    public Message<AvsResponse> service(Message<AvsRequest> message) {

        Assert.notNull(message, MISSING_MANDATORY_ARG);
        Assert.notNull(message.getPayload(), MISSING_MANDATORY_ARG);

        MessageHeaders requestMessageHeaders = message.getHeaders();
        Message<AvsResponse> responseMessage = null;
        try {
            logger.debug("Entering AVS Gateway");
            responseMessage = avsGateway.send(message);
            if (responseMessage == null)
                responseMessage = buildNewResponse(requestMessageHeaders,
                        AvsResponseType.NULL_RESULT);
            logger.debug("Exited AVS Gateway");
            
            return responseMessage;
        } 
        catch (Exception e) {
            return buildNewResponse(responseMessage, requestMessageHeaders,
                    AvsResponseType.EXCEPTION_RESULT, e);
        }
    }

    private Message<AvsResponse> 
                     buildNewResponse(MessageHeaders requestMessageHeaders,
                     AvsResponseType avsResponseType) {

        Assert.notNull(requestMessageHeaders, MISSING_MANDATORY_ARG);
        Assert.notNull(avsResponseType, MISSING_MANDATORY_ARG);

        AvsResponse avsResponse = new AvsResponse();
        avsResponse.setError(avsResponseType);

        return MessageBuilder.withPayload(avsResponse)
                .copyHeadersIfAbsent(requestMessageHeaders).build();
    }

    private Message<AvsResponse> 
                     buildNewResponse(Message<AvsResponse> responseMessage,
                     MessageHeaders requestMessageHeaders,
                     AvsResponseType avsResponseType,
                     Exception e) {

        // responseMessage is usually null here: when avsGateway.send(..)
        // throws, the assignment in service(..) never happens, so asserting
        // on it would throw out of the catch block and break the group.
        Assert.notNull(requestMessageHeaders, MISSING_MANDATORY_ARG);
        Assert.notNull(avsResponseType, MISSING_MANDATORY_ARG);
        Assert.notNull(e, MISSING_MANDATORY_ARG);

        // Assumes setError(..) tolerates a null payload argument.
        AvsResponse avsResponse = new AvsResponse();
        avsResponse.setError(avsResponseType,
                responseMessage == null ? null : responseMessage.getPayload(),
                e);

        // Copy the correlation and sequence headers from the request so
        // that the aggregator can complete the group.
        return MessageBuilder.withPayload(avsResponse)
                .copyHeadersIfAbsent(requestMessageHeaders).build();
    }
}


Notice the call to buildNewResponse in the catch clause of the exception handling block. That method copies the correlation and sequence headers from the request into the response message using copyHeadersIfAbsent. This is mandatory if the aggregation group is going to be allowed to complete and will always be necessary following an exception as shown here.

Consequences of using this technique
There's no doubt that introducing a Messaging Gateway Adapter into SI config makes the configuration more complex to read and follow. The key factor here is that there is no longer a linear progression through the configuration file. This is because either the Service Activator must forward-reference a Gateway, or the Gateway must be defined before its adapting Service Activator - in both cases the result is the same.

Resources
Note:- The design for the software that drove creation of this meta-pattern was based on a requirement that a number of external risk assessment services would be accessed by a single, central Risk Assessment Service. In order to satisfy clients of the service, invocation had to take place in parallel and continue despite failure in any one of those external services. This requirement led to the design of the Messaging Gateway Adapter Pattern for the project.

  1. Spring Integration Reference Manual
  2. The solution approach for this problem was discussed directly with Mark Fisher (SpringSource) in the context of building Risk Assessment flows for a large US financial institution. Although the configuration and code is protected by NDA and copyright, it's acceptable to express the design intention and similar code in this article.



XML Schema to Java Binding - Large Enumerations

Occasionally it becomes necessary to convert XML Schema based enumeration definitions into Java objects for use in application software.

A number of different binding technologies exist to support this process, examples are JAXB (Java Architecture for XML Binding) and Castor to name but a few.

An interesting restriction in the generation of Java Enumeration objects when building source using JAXB is a size limit on the number of enumeration constants that can be declared within an enumeration declaration. An extract from the namespace definition is shown below.

<xs:attributeGroup name="typesafeEnumClassDefaults">
        <xs:attribute name="typesafeEnumMemberName"
                      default="skipGeneration" 
                      type="jaxb:typesafeEnumMemberNameType"/>
        <xs:attribute name="typesafeEnumBase" 
                      default="xs:string" 
                      type="jaxb:typesafeEnumBaseType"/>
        <xs:attribute name="typesafeEnumMaxMembers" 
                      type="xs:int" 
                      default="256"/>
</xs:attributeGroup>

As can be seen from the attribute typesafeEnumMaxMembers, the default limit is 256. Parsing some XML Schema models, such as those representing XSD exports of C24 IO SWIFT message models, will result in that limit being encountered. The XJC parser will produce the following WARNING output indicating that the 256 limit has been reached:

[WARNING] Error while parsing schema(s).Location [ file:/Users/mattvickery/projects/l8mdv-si-samples/sample-messaging/src/main/resources/SWIFT%20FIN%20November%202011%20Data%20Dictionary.xsd{12060,57}].
org.xml.sax.SAXParseException: Simple type "Field503LoginSelectErrorCode" was not mapped to Enum due to EnumMemberSizeCap limit. Facets count: 599, current limit: 256. You can use customization attribute "typesafeEnumMaxMembers" to extend the limit.
 at ...warning(ErrorReporter.java:88)
 at ...shouldBeMappedToTypeSafeEnumByDefault(SimpleTypeBuilder.java:459)
 at ...find(SimpleTypeBuilder.java:419)
 at ...compose(SimpleTypeBuilder.java:296)
 at ...build(SimpleTypeBuilder.java:172)

Although it's only a WARNING that's generated, the enumeration class is not generated at all; it is not partially generated with 256 enumeration constants.

In order to overcome this limitation, a change in configuration is necessary for Enumeration binding generation. This can be done by creating an entry such as the following in a custom JAXB bindings file and making this available to the build system of choice.

<jaxb:bindings 
        schemaLocation="file:SWIFT-FIN-November-2011-Data-Dictionary.xsd">
        <jaxb:bindings>
            <jaxb:globalBindings typesafeEnumMaxMembers="600"/>
        </jaxb:bindings>
</jaxb:bindings>

Following this change, Java Enumeration objects whose enumeration constants number more than the default limit should be generated without WARNING and result in an ordinary class file.

Wednesday, 8 February 2012

Spring Integration Gateways - Null Handling & Timeouts


Spring Integration (SI) Gateways

Spring Integration Gateways (<int:gateway>) provide a semantically rich interface to message sub-systems. Gateways are specified using namespace constructs that reference a specific Java interface (the service-interface attribute), which is backed by an object dynamically implemented at run-time by the Spring Integration framework. Furthermore, these Java interfaces can, if you so wish, be defined entirely independently of any Spring artefacts - that's both code and configuration.

One of the primary advantages of using the SI gateway as an interface to message sub-systems is that it's possible to automatically adopt the benefit of rich, default and customisable, gateway configuration. One such configuration attribute deserves further scrutiny and discussion, primarily because it's easy to misunderstand and misconfigure - default-reply-timeout.

Primary Motivator for Gateway Analysis


During recent consulting engagements, I've encountered a number of deployments that use Spring Integration Gateway specifications that may, in some circumstances, lead to production operational instability. This has often been in high-pressure environments or those where technology support is not backed by adequate training, testing, review or technology mentoring.

How do gateways behave in Spring Integration (R2.0.5)


One of the key sections, regarding gateways, in the Spring Integration manual clearly explains gateway semantics. Below is a 2-dimensional table of possible non-standard gateway returns for each of the scenarios that the SI Manual (r2.0.5) refers to.

Gateway Non-standard Responses

Runtime Events | default-reply-timeout=x, Single-threaded | default-reply-timeout=x, Multi-threaded | default-reply-timeout=null, Single-threaded | default-reply-timeout=null, Multi-threaded
1. Long Running Process | Thread Parked | null returned | Thread Parked | Thread Parked
2. Null Returned Downstream | null returned | null returned | Thread Parked | Thread Parked
3. void method Downstream | null returned | null returned | Thread Parked | Thread Parked
4. Runtime Exception | Error handler invoked or exception thrown. | Error handler invoked or exception thrown. | Error handler invoked or exception thrown. | Error handler invoked or exception thrown.

The key parts of this table are the conditions that lead to invoking threads being parked (noted in red), nulls returned (noted in orange) and exceptions (noted in green). Each contributor consists of configuration that is under the developer's control, deployed code that is under the developer's control and conditions that are usually not under the developer's control.

The column headings in the table above combine two gateway configuration aspects. Firstly, the default-reply-timeout is set by the SI configurer and is the amount of time that a client call is willing to wait for a response from the gateway. Secondly, synchronous flows are represented by Single-threaded flows, asynchronous by Multi-threaded flows.

A synchronous, or single-threaded flow, is one such as the following:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

    <import resource="common-beans.xml"/>

    <int:gateway id="enrollmentServiceGateway"
            service-interface="com.l8mdv.sample.EnrollmentServiceGateway"
            default-request-channel="gateway-request-channel"
            default-reply-channel="gateway-response-channel"
            default-reply-timeout="6000"/>

    <int:service-activator id="sleeperService" 
                           input-channel="gateway-request-channel"
                           output-channel="gateway-response-channel" 
                           ref="sleeper"/>

</beans>

The implicit input channel (gateway-request-channel) has no associated dispatcher configured. An asynchronous, or multi-threaded flow, is one such as the following:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

    <import resource="common-beans.xml"/>

    <int:gateway id="enrollmentServiceGateway"
                 service-interface="com.l8mdv.sample.EnrollmentServiceGateway"
                 default-request-channel="gateway-request-channel"
                 default-reply-channel="gateway-response-channel"
                 default-reply-timeout="6000"/>

    <int:channel id="gateway-request-channel">
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>
    <int:service-activator id="sleeperService" 
                           input-channel="gateway-request-channel"
                           output-channel="gateway-response-channel" 
                           ref="sleeper"/>

</beans>

The explicit input channel has a dispatcher configured ("taskExecutor"). This task executor specifies a thread pool that supplies threads for execution, and its configuration as above marks a thread boundary. Note: this is not the only way of making channels asynchronous.

The other configuration attribute referenced is default-reply-timeout; this is set on the gateway namespace configuration, as in the example above. Note that both of these runtime aspects are set by the configurer during SI flow design and implementation. They are entirely under developer control.

The 'Runtime Events' column indicates gateway relevant runtime events that have to be considered during gateway configuration - these are obviously not under developer control. Trigger conditions for these events are not as unusual as one may hope.

1. Long Running Processes


It's not uncommon for thread pools to become exhausted because all pooled threads are waiting for an external resource accessed through a socket. This may be a long running database query, a firewall keeping a connection open despite the server terminating, and so on. There is significant potential for these types of trigger. Some long-running processes terminate naturally; others never complete and an application restart is required.

2. Null returned downstream


A null may be returned from a downstream SI construct such as a Transformer, Service Activator or Gateway. A Gateway may return null in some circumstances such as following a gateway timeout event.

3. Void method downstream

Any custom code invoked during an SI flow may use a void method signature. This can also be caused by configuration in circumstances where flows are determined dynamically at runtime.

4. Runtime Exception

RuntimeExceptions can be triggered during normal operation and are generally handled by catching them at the gateway or allowing them to propagate through. The reason that they are coloured green in the table above is that they are generally much easier to handle than timeouts.

Gateway Timeout Handling Strategies

There are four possible outcomes from invoking a gateway with a request message, all of these as a result of specific runtime events: a) an ordinary message response, b) an exception message, c) a null or d) no-response.

Ordinary business responses and exceptions are straightforward to understand and will not be covered further in this article. The two significant outcomes that will be explored further are strategies for dealing with nulls and no-response.

Generally speaking, long running processes either terminate or not. Long running processes that terminate may eventually return a message through the invoked gateway or timeout depending on timeout configuration, in which case a null may be returned. The severity of this as a problem depends on throughput volume, length of long running process and system resources (thread-pool size).

Configuration exists for default-reply-timeout 

In the case where a long running process event is underway and a default-reply-timeout has been set, as long as the long running process completes before the default-reply-timeout expires, there is no problem to deal with. However, if the long running process does not complete before that timeout expires one of three outcomes will apply.

Firstly, if the long running process terminates subsequent to the reply timeout expiry, the gateway will have already returned null to the invoker so the null response needs handling by the invoker. The thread handling the long-running process will be returned to the pool.

Secondly, if the long running process does not terminate and a reply timeout has been set, the gateway will return null to the gateway invoker but the thread executing the long-running process will not get returned to the pool.

Thirdly, and most significantly, if a default-reply-timeout has been configured but the long running process is running on the same thread as the invoker, i.e. synchronous channels supply messages to that process, the thread will not return and the default-reply-timeout has no effect.
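The difference between these outcomes can be illustrated with a plain-JDK analogy. The sketch below is not Spring Integration code; ReplyTimeoutDemo and its method names are hypothetical. It assumes only that a timeout can be honoured when the work runs on another thread, whereas work running on the caller's own thread blocks the caller regardless of any timeout value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Plain-JDK analogy for reply timeouts; not Spring Integration code. */
final class ReplyTimeoutDemo {

    /** Simulated long running process that takes the given time. */
    static Supplier<String> slowWork(final long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
                return "done";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        };
    }

    /** Work runs on a pool thread, so the caller can give up and get null. */
    static String callAcrossThreadBoundary(Supplier<String> work, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = work::get;
            Future<String> reply = pool.submit(task);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // analogous to the gateway returning null
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the worker so the demo can exit
        }
    }

    /** Work runs on the caller's thread, so the timeout is never consulted. */
    static String callOnSameThread(Supplier<String> work, long timeoutMillis) {
        return work.get(); // blocks for as long as the work takes
    }

    public static void main(String[] args) {
        System.out.println(callAcrossThreadBoundary(slowWork(300), 50));
        System.out.println(callOnSameThread(slowWork(300), 50));
    }
}
```

With a 300 ms process and a 50 ms timeout, the first call gives up and returns null while the second simply waits for the process to finish.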

Assuming the most common processing scenario, a long running process completes either before or after the reply timeout expiry. When a null is returned by the gateway, the invoker is forced to deal with a null response. It's often unacceptable to force gateway consumers to deal with null responses and is not necessary as with a little additional configuration, this can be avoided.

Absent Configuration for default-reply-timeout


The most significant danger exists around gateways that have no default-reply-timeout configuration set.

A long running process or a null returned from downstream will mean that the invoking thread is parked. This is true for both synchronous and asynchronous flows and may ultimately force an application to be restarted because the invoker thread pool is likely to start on a depletion course if this continues to occur.

Spring Integration Timeout Handling Design Strategies


For those Spring Integration configuration designers who are comfortable with gateway invokers dealing with null responses and exceptions, and who set default-reply-timeouts on gateways, there's no need to read further. However, if you wish to provide clients of your gateway a more predictable response, a couple of strategies exist for handling null responses from gateways in order that invokers are protected from having to deal with them.

Firstly, the simplest solution is to wrap the gateway with a service activator. The gateway must have the default-reply-timeout attribute value set in order to avoid unnecessary parking of threads. In order to avoid the consequence of long-running threads it's also very prudent to use a dispatcher soon after entry to the gateway - this breaks the thread boundary.

Whilst this is a valid technical approach, the impact is that we have forced a different entry point to our message sub-system. Entry is now via a Service Activator rather than a Gateway. A side effect of this change is that the testing entry point changes. Integration tests that would normally reference a gateway to send a message now have to locate the backing implementation for the Service Activator, which is not ideal.

An alternative approach toward solving this problem would be to configure two gateways with a Service Activator between them. Only one of the gateways would be exposed to invokers, the outer one. Both Gateways would reference the same service interface. The outer gateway specification would not specify the default-reply-timeout but would specify the input and output channels in the same way that a single gateway would. The Service Activator between the Gateways would handle null gateway responses and possibly any exceptions if preferred to the gateway error handler approach.

An example is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www...org/schema/integration/spring-integration-2.1.xsd">

    <import resource="common-beans.xml"/>

    <int:gateway id="enrollmentServiceGateway"
                 service-interface="com.l8mdv.sample.EnrollmentServiceGateway"
                 default-request-channel="gateway-request-channel"/>

    <!-- This service activator has the adapted gateway injected as a bean and
    calls the adapted gateway from within the service method directly. -->
    <int:service-activator id="enrollmentServiceGatewayHandler"
                           input-channel="gateway-request-channel">
        <bean class="com.l8mdv.sample.EnrollmentServiceGatewayHandler">
            <constructor-arg name="enrollmentServiceAdaptedGateway"
                             ref="enrollmentServiceAdaptedGateway"/>
        </bean>
    </int:service-activator>

    <int:gateway id="enrollmentServiceAdaptedGateway"
                 service-interface="com.l8mdv.sample.EnrollmentServiceGateway"
                 default-request-channel="adapted-gateway-request-channel"
                 default-reply-timeout="6000"/>

    <int:channel id="adapted-gateway-request-channel" 
                 datatype="java.lang.String">
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

</beans>

The Service Activator bean (enrollmentServiceGatewayHandler) deals with both null and exception responses from the adapted gateway (enrollmentServiceAdaptedGateway). In the situation where these occur, a business response detailing the error is generated.
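A handler of that kind might look like the sketch below. The real EnrollmentServiceGateway methods are not shown in this article, so the single enroll(String) method, the String reply type and the error reply texts are all assumptions made for illustration; only the constructor argument name follows the configuration above.

```java
/** Hypothetical shape of the service interface; the real methods are not shown. */
interface EnrollmentServiceGateway {
    String enroll(String request);
}

/** Sketch of a handler that shields invokers from null replies and exceptions. */
final class EnrollmentServiceGatewayHandler {

    // Assumed error reply texts, for illustration only.
    static final String TIMEOUT_REPLY = "ERROR: no reply before timeout";
    static final String FAILURE_REPLY_PREFIX = "ERROR: ";

    private final EnrollmentServiceGateway enrollmentServiceAdaptedGateway;

    EnrollmentServiceGatewayHandler(
            EnrollmentServiceGateway enrollmentServiceAdaptedGateway) {
        this.enrollmentServiceAdaptedGateway = enrollmentServiceAdaptedGateway;
    }

    /** Always returns a business response, never null and never an exception. */
    public String handle(String request) {
        try {
            String reply = enrollmentServiceAdaptedGateway.enroll(request);
            // A null reply is what the adapted gateway returns on timeout.
            return reply != null ? reply : TIMEOUT_REPLY;
        } catch (RuntimeException e) {
            return FAILURE_REPLY_PREFIX + e.getMessage();
        }
    }
}
```

Because the outer gateway only ever sees the handler's return value, its invokers receive either the normal reply or an error response they can inspect.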

Spring Integration R2.1 Changes
async-executor on gateway spec