This repository was archived by the owner on Jul 9, 2022. It is now read-only.

OrderProcessing sample

olegz edited this page Jun 23, 2011 · 39 revisions

This example demonstrates a simple flow for an imaginary order processing system. The flow diagram is shown below.

[Image: flow diagram]

The user submits an order via a MessagingGateway. The order then needs to be:

  1. validated (proceed or fail)
  2. split if validation succeeds (separating the 'bikes' order from the 'books' order)
  3. routed to a specific processor ('bikeProcessor' or 'bookProcessor')
  4. aggregated, with the processing results sent back to the user as a reply
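
The four steps above can be sketched in plain Scala, without any Spring Integration types, to make the intended semantics concrete. This is only an illustration: the `title` field, the result strings, and the `OrderFlowSketch` object are assumptions made for the example, and the sample's real `PurchaseOrder` and `PurchaseOrderItem` classes may look different.

```scala
object OrderFlowSketch {
  // Hypothetical domain model; only `items` and `itemType` appear in the sample.
  final case class PurchaseOrderItem(itemType: String, title: String)
  final case class PurchaseOrder(items: List[PurchaseOrderItem])

  // 1. Validate: an order without items is rejected.
  def validate(order: PurchaseOrder): Either[String, PurchaseOrder] =
    if (order.items.nonEmpty) Right(order) else Left("ERROR processing order")

  // 2. Split: one unit of work per order item.
  def split(order: PurchaseOrder): List[PurchaseOrderItem] = order.items

  // 3. Route: choose a processor based on the item type.
  def route(item: PurchaseOrderItem): String = item.itemType match {
    case "books" => "processed book: " + item.title
    case "bikes" => "processed bike: " + item.title
    case other   => "unroutable item type: " + other
  }

  // 4. Aggregate: collect the per-item results into a single reply.
  def process(order: PurchaseOrder): Either[String, List[String]] =
    validate(order).map(valid => split(valid).map(route))
}
```

Calling `OrderFlowSketch.process` with an empty order yields a `Left` carrying the error reply, while a valid order yields a `Right` with one result per item.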

We begin by defining a MessagingGateway. Just as in Java, this MessagingGateway is identified by a strategy interface (a 'trait' in Scala terms), giving you simple POSO (Plain Old Scala Object) access to the messaging system without a direct dependency on the Spring Integration API. Note that the trait has no implementation, since it is represented by a proxy.
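
To illustrate how a trait without an implementation can still be called, here is a minimal sketch that backs a gateway trait with a JDK dynamic proxy. It is not the DSL's own code: the `sendOrder` method name, the simplified `PurchaseOrder` payload, and the `proxyFor` helper are all assumptions made for the example, and the `handle` function merely stands in for the messaging flow.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}

object GatewayProxySketch {
  // Hypothetical payload and gateway trait; the sample's real definitions may differ.
  final case class PurchaseOrder(items: List[String])

  trait OrderProcessingGateway {
    def sendOrder(order: PurchaseOrder): String
  }

  // Builds a proxy for the trait; `handle` stands in for the messaging flow.
  def proxyFor(handle: PurchaseOrder => String): OrderProcessingGateway = {
    val handler = new InvocationHandler {
      override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef =
        method.getName match {
          case "sendOrder" => handle(args(0).asInstanceOf[PurchaseOrder])
          case other       => throw new UnsupportedOperationException(other)
        }
    }
    Proxy.newProxyInstance(
      classOf[OrderProcessingGateway].getClassLoader,
      Array[Class[_]](classOf[OrderProcessingGateway]),
      handler
    ).asInstanceOf[OrderProcessingGateway]
  }
}
```

The caller only ever sees the trait, for example `GatewayProxySketch.proxyFor(o => "received " + o.items.size + " item(s)").sendOrder(...)`, which is the kind of API-free access the paragraph above describes.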

val orderGateway = gateway.withErrorChannel("errorFlowChannel").using(classOf[OrderProcessingGateway])

As you can see above, we use the Spring Integration Scala fluent API to configure the gateway with an errorChannel and a user-defined trait. We also define an aggregationChannel: it is reused in the flow configuration, and we want a shared reference rather than relying on the channel name (a String). Then we create a SpringIntegrationContext, passing the flow configuration to its constructor, configuring the rest of the components in-line, and composing the message flow with the '->' operator defined in the Spring Integration Scala DSL.

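val integrationContext = SpringIntegrationContext(
    // Main flow: validate the order, split it into items, hand off asynchronously, route by item type
    {
      orderGateway ->
      // Orders without items are rejected, and the rejection is reported as an error
      filter.withName("orderValidator").andErrorOnRejection(true).using{p:PurchaseOrder => !p.items.isEmpty} ->
      // One message per order item
      split.using{p:PurchaseOrder => JavaConversions.asList(p.items)} ->
      // Executor-backed channel: the splitter and router exchange messages asynchronously
      channel.withExecutor ->
      route.withChannelMappings(Map("books" -> "booksChannel", "bikes" -> "bikesChannel")).using{pi:PurchaseOrderItem => pi.itemType}
    },
    // Error flow: log the failed message and return an error reply
    {
      channel("errorFlowChannel") ->
      service.using{m:Message[_] => println("Received ERROR: " + m); "ERROR processing order"}
    },
    // Bikes flow: process the item and forward it for aggregation
    {
      channel("bikesChannel") ->
      service.using{m:Message[_] => println("Processing bikes order: " + m); m} ->
      aggregationChannel
    },
    // Books flow: same as bikes, with a random delay of up to 2 seconds to simulate work
    {
      channel("booksChannel") ->
      service.using{m:Message[_] => println("Processing books order: " + m); Thread.sleep(new Random().nextInt(2000)); m} ->
      aggregationChannel
    },
    // Aggregation flow: combine the processed items
    {
      aggregationChannel ->
      aggregate()
    }
)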

As you can see from above, whenever an EIP endpoint requires custom logic, it is provided via the using(..) method. You can use either a Spring Expression Language expression or a Scala function (as in the example above). Also note that you do not need to define channels explicitly to connect endpoints when the default channel (a synchronous point-to-point channel) is sufficient. The Spring Integration Scala DSL simply auto-creates one, which greatly simplifies the configuration. That is why the gateway, filter and splitter define no channels between one another. The splitter and router, however, are connected by an explicit channel definition: we want the message exchange between them to be asynchronous, so this channel is configured with an Executor (the default thread executor: Executors.newCachedThreadPool).
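
The effect of an executor-backed hand-off can be sketched with plain Scala futures running on the same kind of pool, `Executors.newCachedThreadPool`. This is a rough analogy rather than the DSL's implementation: the `AsyncHandoffSketch` object, the `processAll` helper, and the 10-second timeout are assumptions made for the example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncHandoffSketch {
  // Processes every item on a cached thread pool, then gathers the
  // results in input order, loosely mirroring split -> async channel -> aggregate.
  def processAll(items: List[String])(process: String => String): List[String] = {
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Each item is submitted to the pool, so slow items do not block the others.
      val futures = items.map(item => Future(process(item)))
      Await.result(Future.sequence(futures), 10.seconds)
    } finally pool.shutdown()
  }
}
```

With a synchronous hand-off, a slow 'books' item would delay every item behind it. With the pool, each item is processed on its own thread and only the final aggregation waits for all of them.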
