OrderProcessing sample
This example demonstrates a simple flow for an imaginary order processing system. You can see the flow diagram below.
(If image doesn't show follow this link: http://i1205.photobucket.com/albums/bb431/z_oleg/Screenshot2011-06-23at94026AM.png)
A user submits an order via a MessagingGateway. The order then needs to be:
- validated (proceed or fail)
- split if validation is successful (basically separating the 'bikes' order from the 'books' order)
- routed to specific processors ('bikeProcessor' vs 'bookProcessor')
- aggregated, with the processing results sent back to the user as a reply
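To make the data flow behind these four steps concrete, here is a minimal plain-Scala sketch that uses no Spring Integration at all. Everything in it is an assumption made for illustration: the object name `OrderStepsSketch`, the field names of `PurchaseOrder` and `PurchaseOrderItem` (the real types live in the sample's source), and the pass-through processors.

```scala
object OrderStepsSketch {
  // Hypothetical stand-ins for the sample's domain types.
  final case class PurchaseOrderItem(itemType: String, title: String)
  final case class PurchaseOrder(items: List[PurchaseOrderItem])

  // Step 1, validate: an empty order fails, anything else proceeds.
  def validate(order: PurchaseOrder): Either[String, PurchaseOrder] =
    if (order.items.nonEmpty) Right(order) else Left("ERROR processing order")

  // Step 2, split: one unit of work per order item.
  def split(order: PurchaseOrder): List[PurchaseOrderItem] = order.items

  // Hypothetical processors; here they simply return the item unchanged.
  def bikeProcessor(item: PurchaseOrderItem): PurchaseOrderItem = item
  def bookProcessor(item: PurchaseOrderItem): PurchaseOrderItem = item

  // Step 3, route: choose a processor based on the item type.
  def route(item: PurchaseOrderItem): PurchaseOrderItem = item.itemType match {
    case "bikes" => bikeProcessor(item)
    case "books" => bookProcessor(item)
    case other   => throw new IllegalArgumentException("No processor for item type: " + other)
  }

  // Step 4, aggregate: collect the processed items into a single reply.
  def process(order: PurchaseOrder): Either[String, List[PurchaseOrderItem]] =
    validate(order).map(valid => split(valid).map(route))
}
```

Calling `OrderStepsSketch.process` with a non-empty order yields a `Right` holding the processed items, while an empty order yields a `Left` with the error text. The sketch runs everything sequentially on the calling thread, which is the main thing the messaging flow changes.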
With the Scala DSL we don't need to define an explicit Gateway, since we already have a reference to the integration flow:
val orderProcessingFlow =
  filter.using{p:PurchaseOrder => !p.items.isEmpty}.where(exceptionOnRejection = true) -->
  split.using{p:PurchaseOrder => p.items} -->
  Channel.withDispatcher(taskExecutor = new SimpleAsyncTaskExecutor) -->
  route.using{pi:PurchaseOrderItem => pi.itemType}(
    when("bikes") {
      handle.using{m:Message[_] => println("Processing bikes order: " + m); m} // prints Message and returns it
    },
    when("books") {
      handle.using{
        m:Message[_] => println("Processing books order: " + m); Thread.sleep(new Random().nextInt(2000)); m
      } // prints Message, delays it randomly and returns it
    }
  ) -->
  aggregate()
val errorFlow = handle.using{m:Message[_] => println("Received ERROR: " + m); "ERROR processing order"}
val result = orderProcessingFlow.sendAndReceive[Any](validOrder, errorFlow = errorFlow)
As you can see from the above, whenever an EIP endpoint needs to perform some custom logic, that logic is provided via the using(..) method. You can use either a Spring Expression Language expression or a Scala function (as in the example above). Also note that you do not have to define channels explicitly to connect endpoints when the default channel (a synchronous point-to-point channel) is sufficient. The Spring Integration Scala DSL will simply auto-create one, which greatly simplifies the configuration. That is why the gateway, filter and splitter have no explicit channel configuration connecting them. However, you DO see a channel definition that connects the splitter and the router. We need to define this channel explicitly because we want the message exchange between splitter and router to be asynchronous, so we have configured it with a task executor (a SimpleAsyncTaskExecutor in the code above).
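The difference between the default synchronous hand-off and an executor-backed one can be illustrated outside Spring Integration. The sketch below is only an analogy under stated assumptions: `AsyncHandOffSketch`, `direct` and `dispatched` are hypothetical names, and a plain JDK cached thread pool stands in for the task executor used by the sample.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncHandOffSketch {
  // Direct hand-off: each handler call runs on the caller's thread, one item after another.
  def direct[A, B](items: List[A])(handler: A => B): List[B] =
    items.map(handler)

  // Executor-backed hand-off: each item is submitted to a thread pool,
  // so slow handlers do not hold up the other items.
  def dispatched[A, B](items: List[A])(handler: A => B): List[B] = {
    val pool = Executors.newCachedThreadPool() // stand-in for the sample's task executor
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = items.map(item => Future(handler(item)))
      // Future.sequence keeps the original item order in the result.
      Await.result(Future.sequence(futures), 10.seconds)
    } finally pool.shutdown()
  }
}
```

Passing a handler that returns `Thread.currentThread().getName` makes the difference visible: `direct` reports the caller's thread for every item, while `dispatched` reports pool threads.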
You also see 4 more message flows. Remember that our order is split and routed to two different processing flows (one for 'bikes', which starts with bikesChannel, and another for 'books', which starts with booksChannel). You can also see that we artificially delay the book processing flow to emulate long processing while demonstrating that all 3 order items are processed concurrently. We also have an error flow, which deals with any exception thrown back to the Messaging Gateway (starts with errorFlowChannel). And we have a flow that aggregates the processing results from the 'books' and 'bikes' processors (starts with aggregationChannel).
The only remaining question is: what happens after the aggregator has aggregated the Messages? Those familiar with Spring Integration know that whenever you define a non-void method on a Messaging Gateway strategy (trait), Spring Integration auto-creates a reply channel that is passed along in the Message headers ('replyChannel'). So the aggregator sends its Message to this channel, thus returning the reply to the Messaging Gateway.
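The reply-channel mechanism can be pictured with a small model. This is a sketch of the idea only, not Spring Integration's implementation: `ReplyChannelSketch`, `Msg`, `replyTo` and this stripped-down `TemporaryReplyChannel` are all hypothetical, and the temporary channel is assumed to behave like a one-slot blocking queue.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object ReplyChannelSketch {
  // A stripped-down message: a payload plus a header map.
  final case class Msg(payload: Any, headers: Map[String, Any])

  // Models a temporary reply channel as a queue that holds at most one reply.
  final class TemporaryReplyChannel {
    private val queue = new LinkedBlockingQueue[Any](1)
    def send(reply: Any): Boolean = queue.offer(reply)
    def receive(timeoutMs: Long): Option[Any] =
      Option(queue.poll(timeoutMs, TimeUnit.MILLISECONDS)) // None on timeout
  }

  // Gateway side: attach a fresh reply channel as a header, send, then wait for the reply.
  def sendAndReceive(payload: Any, timeoutMs: Long = 5000)(flow: Msg => Unit): Option[Any] = {
    val replyChannel = new TemporaryReplyChannel
    flow(Msg(payload, Map("replyChannel" -> replyChannel)))
    replyChannel.receive(timeoutMs)
  }

  // Last endpoint in the flow: with no explicit output channel,
  // it looks up the 'replyChannel' header and sends its result there.
  def replyTo(request: Msg, result: Any): Boolean =
    request.headers.get("replyChannel") match {
      case Some(channel: TemporaryReplyChannel) => channel.send(result)
      case _ => throw new IllegalStateException("No replyChannel header on message")
    }
}
```

In this model the caller never names the reply destination. It travels with the message, which is why the final endpoint of a flow needs no explicit output channel.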
Now we simply invoke the Messaging Gateway method:
val reply = orderGateway.processOrder(validOrder)
... and print the reply:
println("Reply: " + reply)
First you'll see the log statements showing the order items being handled by the order processors:
Processing books order: [Payload=PurchaseOrderItem(books,Spring Integration in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891276, id=c2219cb0-1999-42ed-a0e2-963a4bc7401c, sequenceSize=3}]
Processing bikes order: [Payload=PurchaseOrderItem(bikes,Canyon Torque FRX)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=3, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=f098e556-04d0-4839-b548-0d8cbe0ff058, sequenceSize=3}]
Processing books order: [Payload=PurchaseOrderItem(books,DSLs in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=a7be1699-6bcb-4a1b-bfea-13b158dd29ab, sequenceSize=3}]
... and then, after a short delay, the reply:
Reply: [PurchaseOrderItem(bikes,Canyon Torque FRX), PurchaseOrderItem(books,Spring Integration in Action), PurchaseOrderItem(books,DSLs in Action)]
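The correlationId, sequenceNumber and sequenceSize headers in the log output are what allow the split items to be put back together. Here is a minimal sketch of that idea, assuming the group is released as soon as the number of collected items equals sequenceSize. `AggregatorSketch`, `Item` and `Aggregator` are hypothetical names, not the library's classes.

```scala
object AggregatorSketch {
  // A split item together with the three headers used for aggregation.
  final case class Item(payload: String, correlationId: String, sequenceNumber: Int, sequenceSize: Int)

  final class Aggregator {
    // Incomplete groups, keyed by correlationId.
    private var groups = Map.empty[String, Vector[Item]]

    // Returns the aggregated payloads (in arrival order) once the group is complete, otherwise None.
    def receive(item: Item): Option[List[String]] = synchronized {
      val group = groups.getOrElse(item.correlationId, Vector.empty) :+ item
      if (group.size == item.sequenceSize) {
        groups -= item.correlationId // group is complete: release it and forget it
        Some(group.map(_.payload).toList)
      } else {
        groups += (item.correlationId -> group) // keep waiting for the remaining items
        None
      }
    }
  }
}
```

Because items are collected in arrival order, a quickly processed item can appear ahead of slower ones in the aggregated result, regardless of its sequenceNumber.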
That is pretty much it. The full source code for this sample is available here: OrderProcessing.scala