Skip to content
This repository was archived by the owner on Jul 9, 2022. It is now read-only.

OrderProcessing sample

olegz edited this page Jan 31, 2012 · 39 revisions

This example demonstrates a simple flow for imaginary Order Processing system You can see the flow diagram below.

Order Processing

(If image doesn't show follow this link: http://i1205.photobucket.com/albums/bb431/z_oleg/Screenshot2011-06-23at94026AM.png)

User submits the order via MessagingGateway which needs to be:

  1. validated (proceed or fail)
  2. split if validation is successful (basically separate 'bikes' order from 'books' order)
  3. route order to specific processors ('bikeProcessor' vs 'bookProcessor')
  4. aggregate processing results and send reply back to the user

With Scala DSL we don't need to define an explicit Gateway since we already have a reference to the integration flow:

val orderProcessingFlow = 
      filter.using{p:PurchaseOrder => !p.items.isEmpty}.where(exceptionOnRejection = true) -->
      split.using{p:PurchaseOrder => p.items} -->
      Channel.withDispatcher(taskExecutor = new SimpleAsyncTaskExecutor) -->
      route.using{pi:PurchaseOrderItem => pi.itemType}(
        when("books") {
          handle.using{m:Message[_] => println("Processing bikes order: " + m); m} // prints Message and returns it
        },
        when("bikes") {
          handle.using{
            m:Message[_] => println("Processing books order: " + m); Thread.sleep(new Random().nextInt(2000)); m
          } // prints Message, delays it randomly and returns it
        }
      ) -->
      aggregate()

val errorFlow = handle.using{m:Message[_] => println("Received ERROR: " + m); "ERROR processing order"}

As you can see form above, whenever the EIP endpoint required to perform some custom logic it is provided via using(..) method. You can use both Spring Expression Language or Scala function (as in the above example). Also note that it is not required to explicitly define channels to connect endpoints if such connectivity is relying on default channel (synchronous point-to-point channel). Spring Integration Scala DSL will simply auto-create one, thus greatly simplifying the configuration. So you can see that , filter and splitter do not have an explicit channel configuration connecting them. However you DO see a channel definition that connects splitter and router. We need to define such channel explicitly since we want the message exchange between splitter and router to be asynchronous, thus we have configured this channel with Executor (default Thread executor: Executors.newCachedThreadPool)

You also see that routed Message flows are later aggregated and since the Aggregator is the last component in the Message flow but does not define an output Channel the reply is sent to an implicit reply channel provided by the sendAndRecieve method.

Now we simply interact with this Message flow by sending it valid or invalid order:

    val result = orderProcessingFlow.sendAndReceive[Any](validOrder, errorFlow = errorFlow)
    //val result = orderProcessingFlow.sendAndReceive[Any](invalidOrder, errorFlow = errorFlow)

... and print a reply

    println("Reply: " + result)

First you'll see the log statements showing order processing by the order processors

Processing books order: [Payload=PurchaseOrderItem(books,Spring Integration in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891276, id=c2219cb0-1999-42ed-a0e2-963a4bc7401c, sequenceSize=3}]
Processing bikes order: [Payload=PurchaseOrderItem(bikes,Canyon Torque FRX)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=3, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=f098e556-04d0-4839-b548-0d8cbe0ff058, sequenceSize=3}]
Processing books order: [Payload=PurchaseOrderItem(books,DSLs in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=a7be1699-6bcb-4a1b-bfea-13b158dd29ab, sequenceSize=3}]

.. and then a reply after a short delay

Reply: [PurchaseOrderItem(bikes,Canyon Torque FRX), PurchaseOrderItem(books,Spring Integration in Action), PurchaseOrderItem(books,DSLs in Action)]

That is pretty much it. The full source code for this sample available here: OrderProcessing.scala

Clone this wiki locally