-
Notifications
You must be signed in to change notification settings - Fork 33
OrderProcessing sample
This example demonstrates a simple flow for imaginary Order Processing system You can see the flow diagram below.
User submits the order via MessagingGateway which needs to be:
- validated (proceed or fail)
- split if validation is successful (basically separate 'bikes' order from 'books' order)
- route order to specific processors ('bikeProcessor' vs 'bookProcessor')
- aggregate processing results and send reply back to the user
We begin from defining a MessagingGateway. Just like in Java this MessagingGateway is identified by a strategy interface (or 'trait' in Scala terms) giving you simple POSO (Plain Old Scala Object) access to the Messaging system without direct dependency on Spring Integration API Note that there is no implementation for such trait since it will be represented as Proxy.
val orderGateway = gateway.withErrorChannel("errorFlowChannel").using(classOf[OrderProcessingGateway])
As you can see from above we use Spring Integration Scala fluent API to configure the gateway with errorChannel and user defined trait.
The OrderProcessingGateway is a simple Scala trait:
trait OrderProcessingGateway {
def processOrder(order:PurchaseOrder): Object
}
We also define an aggregationChannel since its going to be reused in the flow configuration and we want to make sure we have a shared reference instead of relying on channel name (e.g., String)
val aggregationChannel = channel("aggregationChannel")
Than we create an SpringIntegrationContext passing the flow configuration as a constructor argument while configuring the rest of the components in-line and composing the Message flow using '->' operator defined in Spring Integration Scala DSL.
val integrationContext = SpringIntegrationContext(
{
orderGateway ->
filter.withName("orderValidator").andErrorOnRejection(true).using{p:PurchaseOrder => !p.items.isEmpty} ->
split.using{p:PurchaseOrder => JavaConversions.asList(p.items)} ->
channel.withExecutor ->
route.withChannelMappings(Map("books" -> "booksChannel", "bikes" -> "bikesChannel")).using{pi:PurchaseOrderItem => pi.itemType}
},
{
channel("errorFlowChannel") ->
service.using{m:Message[_] => println("Received ERROR: " + m); "ERROR processing order"}
},
{
channel("bikesChannel") ->
service.using{m:Message[_] => println("Processing bikes order: " + m); m} ->
aggregationChannel
},
{
channel("booksChannel") ->
service.using{m:Message[_] => println("Processing books order: " + m); Thread.sleep(new Random().nextInt(2000)); m} ->
aggregationChannel
},
{
aggregationChannel ->
aggregate()
}
)
As you can see form above, whenever the EIP endpoint needs or required to perform some custom logic it is provided via using(..) method. You can use both Spring Expression Language or Scala function (as in the above example). Also note that it is not required to explicitly define channels to connect endpoints if such connectivity is relying on default channel (synchronous point-to-point channel). Spring Integration Scala DSL will simply auto-create one, thus greatly simplifying the configuration. So you can see that gateway, filter and splitter do not define channels to connect with one another. However you DO see a channel definition that connects splitter and router. We need to define such channel explicitly since we want the message exchange between splitter and router to be asynchronous, thus we have configured this channel with Executor (default Thread executor: Executors.newCachedThreadPool)
You also see 4 more message flows. Remember our order is split and routed to two different processing flow (one for 'bikes' which starts with bikesChannel and another for 'books' which starts with booksChannel). We also have an error flow which will deal with any exception thrown back to the Messaging Gateway (starts with errorFlowChannel). And we have a flow that deals with aggregation of the processing results from 'books' and 'bikes' processors (starts with aggregationChannel)
The only remaining question is what happens after aggregator aggregated Messages? Those familiar with Spring Integration know that whenever your Gateway strategy (trait) with non-void method Spring Integration will create a reply channel that wil be passed with Message Headers. So the aggregator will send its Message to this channel thus returning reply back to the Messaging Gateway.
Now we simply invoke Messaging Gateway method:
val reply = orderGateway.processOrder(validOrder)
... and print a reply
println("Reply: " + reply)
First you'll see the log statements showing order processing by the order processors
Processing books order: [Payload=PurchaseOrderItem(books,Spring Integration in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891276, id=c2219cb0-1999-42ed-a0e2-963a4bc7401c, sequenceSize=3}]
Processing bikes order: [Payload=PurchaseOrderItem(bikes,Canyon Torque FRX)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=3, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=f098e556-04d0-4839-b548-0d8cbe0ff058, sequenceSize=3}]
Processing books order: [Payload=PurchaseOrderItem(books,DSLs in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=a7be1699-6bcb-4a1b-bfea-13b158dd29ab, sequenceSize=3}]
.. and then a reply after a short delay
Reply: [PurchaseOrderItem(bikes,Canyon Torque FRX), PurchaseOrderItem(books,Spring Integration in Action), PurchaseOrderItem(books,DSLs in Action)]
That is pretty much it. The full source code for this sample available here: OrderProcessing.scala