diff --git a/build.gradle b/build.gradle index fdbb9b207..db4f14529 100644 --- a/build.gradle +++ b/build.gradle @@ -1368,6 +1368,32 @@ project('kafka-dsl') { } } +project('synchronous-udp-multicast') { + description = 'Java DSL synchronous UDP multicast' + + apply plugin: 'org.springframework.boot' + + dependencies { + compile 'org.springframework.boot:spring-boot-starter-web' + compile 'org.springframework.boot:spring-boot-starter-integration' + compile 'org.springframework.integration:spring-integration-ip' + compile 'org.springframework.integration:spring-integration-http' + testCompile 'org.springframework.boot:spring-boot-starter-test' + } + bootRun { + main = 'org.springframework.integration.samples.dsl.synchronous.multicast.Application' + } + + task run(type: JavaExec) { + main 'org.springframework.integration.samples.dsl.synchronous.multicast.Application' + classpath = sourceSets.main.runtimeClasspath + } + + tasks.withType(JavaExec) { + standardInput = System.in + } +} + project('file-split-ftp') { description = 'File Split FTP' diff --git a/dsl/synchronous-udp-multicast/README.md b/dsl/synchronous-udp-multicast/README.md new file mode 100644 index 000000000..b8defab09 --- /dev/null +++ b/dsl/synchronous-udp-multicast/README.md @@ -0,0 +1,25 @@ +Spring Integration Java DSL synchronous UDP multicast +============== + +This example demonstrates the use of `Http Inbound Components`, `Http Outbound Components`, `UDP Adapters` and `IntegrationFlowContext` class to create a UDP multicast synchronous gateway. + +## Flow + +The idea is to send a UDP multicast message in an synchronous way, so, it waits until a response arrieves from any of the UDP nodes joined to the multicast group. + + Client Server + + udpMulticastOutbound -->> udpMulticastInbound + | + | + v + v + httpInbound <<-- httpOutbound + + + +## Running the sample + +Run the test example. + + $ gradlew :synchronous-udp-multicast:test diff --git a/dsl/synchronous-udp-multicast/pom.xml b/dsl/synchronous-udp-multicast/pom.xml new file mode 100644 index 000000000..c193cad9a --- /dev/null +++ b/dsl/synchronous-udp-multicast/pom.xml @@ -0,0 +1,134 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.0.2.RELEASE + + org.springframework.integration.samples + synchronous-udp-multicast + 5.0.0.BUILD-SNAPSHOT + Java DSL synchronous UDP multicast + Java DSL synchronous UDP multicast + http://projects.spring.io/spring-integration + + SpringIO + https://spring.io + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + estigma88 + Daniel Andres Pelaez Lopez + estigma88@gmail.com + + project lead + + + + + scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git + scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git + https://github.com/spring-projects/spring-integration-samples + + + + org.springframework.boot + spring-boot-starter-integration + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-ip + 5.0.4.RELEASE + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-http + 5.0.4.RELEASE + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.boot + spring-boot-starter-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + + + repo.spring.io.milestone + Spring Framework Maven Milestone Repository + https://repo.spring.io/libs-milestone + + + repo.spring.io.snapshot + Spring Framework Maven Snapshot Repository + https://repo.spring.io/libs-snapshot + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + org.springframework.boot + spring-boot-dependencies + 2.0.2.RELEASE + import + pom + + + org.springframework + spring-framework-bom + 5.0.5.RELEASE + import + pom + + + org.springframework.integration + spring-integration-bom + 5.0.5.RELEASE + import + pom + + + + diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java new file mode 100644 index 000000000..d2ee9a306 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.samples.dsl.synchronous.multicast.starter.SynchronousUDPStarter; + +/** + * @author Daniel Andres Pelaez Lopez + */ +@SpringBootApplication +public class Application { + + public static void main(String[] args) throws Exception { + SpringApplication.run(Application.class, args); + } + + @Bean + public SynchronousUDPStarter synchronousUDPStarter(IntegrationFlowContext flowContext, + @Value("${synchronous.multicast.group}") String group, + @Value("${synchronous.multicast.port}") Integer port) { + SynchronousUDPStarter synchronousUDPStarter = new SynchronousUDPStarter(flowContext, group, port); + + synchronousUDPStarter.init(); + + return synchronousUDPStarter; + } + +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java new file mode 100644 index 000000000..1bcbaf72b --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java @@ -0,0 +1,24 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.gateway; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public interface UDPMulticastGateway { + String send(String request); +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java new file mode 100644 index 000000000..dd8141e0c --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.handler; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class UDPMulticastHandler { + public String handle(String request) { + return "Response for '" + request + "'"; + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java new file mode 100644 index 000000000..4e26f8e1c --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java @@ -0,0 +1,50 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.starter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class ExtendedMessage { + private final String replyOriginChannelId; + private final String errorOriginChannelId; + private final String data; + + @JsonCreator + ExtendedMessage(@JsonProperty("replyOriginChannelId") String replyOriginChannelId, + @JsonProperty("errorOriginChannelId") String errorOriginChannelId, + @JsonProperty("data") String data) { + this.replyOriginChannelId = replyOriginChannelId; + this.errorOriginChannelId = errorOriginChannelId; + this.data = data; + } + + public String getReplyOriginChannelId() { + return replyOriginChannelId; + } + + public String getErrorOriginChannelId() { + return errorOriginChannelId; + } + + public String getData() { + return data; + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java new file mode 100644 index 000000000..b00a12240 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java @@ -0,0 +1,37 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.starter; + +import org.springframework.integration.transformer.Transformer; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class ExtendedMessageTransformer implements Transformer { + @Override + public Message transform(Message message) { + return MessageBuilder + .withPayload(new ExtendedMessage((String) message.getHeaders().get(MessageHeaders.REPLY_CHANNEL), + (String) message.getHeaders().get(MessageHeaders.ERROR_CHANNEL), + (String) message.getPayload())) + .copyHeaders(message.getHeaders()) + .build(); + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java new file mode 100644 index 000000000..e6f59c9da --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java @@ -0,0 +1,156 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.starter; + +import org.springframework.http.HttpMethod; +import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.dsl.*; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.http.dsl.Http; +import org.springframework.integration.ip.IpHeaders; +import org.springframework.integration.ip.dsl.Udp; +import org.springframework.integration.samples.dsl.synchronous.multicast.gateway.UDPMulticastGateway; +import org.springframework.integration.samples.dsl.synchronous.multicast.handler.UDPMulticastHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.MimeTypeUtils; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class SynchronousUDPStarter { + private static final String MESSAGES_PATH = "messages/"; + private static final String REPLY_ORIGINAL_CHANNEL_ID_HEADER = "replyOriginChannelId"; + private static final String ERROR_ORIGINAL_CHANNEL_ID_HEADER = "errorOriginChannelId"; + private static final String BASE_URL = "http://{host}:8080/"; + private static final String HTTP_OUTBOUND_CHANNEL = "httpOutbound"; + private final IntegrationFlowContext flowContext; + private final String group; + private final Integer port; + + public SynchronousUDPStarter(IntegrationFlowContext flowContext, String group, Integer port) { + this.flowContext = flowContext; + this.group = group; + this.port = port; + } + + public void init() { + StandardIntegrationFlow httpInbound = getHttpInboundFlow(); + + StandardIntegrationFlow httpOutbound = getHttpOutboundFlow(); + + StandardIntegrationFlow udpInbound = getUDPInboundFlow(); + + StandardIntegrationFlow udpOutbound = getUDPOutboundFlow(); + + flowContext.registration(httpInbound).id("httpInboundFlow").register(); + flowContext.registration(httpOutbound).id("httpOutboundFlow").register(); + flowContext.registration(udpInbound).id("udpInboundFlow").register(); + flowContext.registration(udpOutbound).id("udpOutboundFlow").register(); + + httpInbound.start(); + httpOutbound.start(); + udpInbound.start(); + udpOutbound.start(); + } + + private StandardIntegrationFlow getUDPOutboundFlow() { + //UDPMulticastGateway is the gateway through we are going to consume this flow + return IntegrationFlows.from(UDPMulticastGateway.class) + .enrichHeaders(m -> m + .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + //Persist the replayChannel and errorChannel headers, getting IDs + .enrichHeaders(HeaderEnricherSpec::headerChannelsToString) + //Transform the message adding the replayChannel and errorChannel IDs as a part of the payload + //To retrieve later after the response arrives + .transform(new ExtendedMessageTransformer()) + .transform(Transformers.toJson()) + .handle(Udp.outboundMulticastAdapter(group, port)) + .get(); + } + + private StandardIntegrationFlow getUDPInboundFlow() { + return IntegrationFlows.from(Udp.inboundMulticastAdapter(port, group)) + .enrichHeaders(m -> m + .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + .transform(Transformers.fromJson(ExtendedMessage.class)) + //Add new headers to the Message named replyOriginChannelId and errorOriginChannelId from the payload we receive + .enrichHeaders(h -> h + .headerFunction(REPLY_ORIGINAL_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getReplyOriginChannelId()) + .headerFunction(ERROR_ORIGINAL_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getErrorOriginChannelId())) + //Get the read data we want to handle + .transform(ExtendedMessage::getData) + .handle(new UDPMulticastHandler(), "handle") + //Publish a Message response using the httpOutbound channel + .publishSubscribeChannel(p -> p + .subscribe(s -> s + //The inboundMulticastAdapter does not have replyChannel and errorChannel by default + //So, we add a NullChannel where httpOutbound is going to respond + //We do not care about if this request is successful, this communication flow is still unreliable + .enrichHeaders(h -> h + .header(MessageHeaders.REPLY_CHANNEL, new NullChannel(), true) + .header(MessageHeaders.ERROR_CHANNEL, new NullChannel(), true)) + //Send the message to httpOutbound channel + .channel(HTTP_OUTBOUND_CHANNEL))) + .get(); + } + + private StandardIntegrationFlow getHttpOutboundFlow() { + return IntegrationFlows.from(HTTP_OUTBOUND_CHANNEL) + .enrichHeaders(m -> m + .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + .handle(Http.outboundGateway(BASE_URL + MESSAGES_PATH) + .httpMethod(HttpMethod.POST) + //The headers we created in the UDPInboundFlow are mapped here to headers in the HTTP Request + //So, we can retrieve those from the request origin side + .mappedRequestHeaders(REPLY_ORIGINAL_CHANNEL_ID_HEADER, ERROR_ORIGINAL_CHANNEL_ID_HEADER) + .expectedResponseType(String.class) + //Get the request origin IP to response + .uriVariable("host", m -> m.getHeaders().get(IpHeaders.HOSTNAME))) + .get(); + } + + private StandardIntegrationFlow getHttpInboundFlow() { + return IntegrationFlows.from( + Http.inboundGateway(MESSAGES_PATH) + .requestMapping(m -> m.methods(HttpMethod.POST) + .consumes(MimeTypeUtils.APPLICATION_JSON_VALUE) + .produces(MimeTypeUtils.APPLICATION_JSON_VALUE)) + //The headers we created in the HttpOutboundFlow in the HTTP Request are mapped here to headers in the Message + //The headers names are in lower case + .mappedRequestHeaders(REPLY_ORIGINAL_CHANNEL_ID_HEADER, ERROR_ORIGINAL_CHANNEL_ID_HEADER) + .requestPayloadType(String.class)) + //Publish the Message to two subscribers + .publishSubscribeChannel(p -> p + //First subscriber: response to HttpInboundFlow + //The Message with the default replyChannel and errorChannel are redirect using a bridge + //The bridge resolves those headers and uses those channels + .subscribe(IntegrationFlowDefinition::bridge) + + //Second subscriber: response to UDPMulticastGateway (UDPOutboundFlow) + //The replyChannel and errorChannel headers are replaced with the replyOriginChannelId and errorOriginChannelId we sent at the beginning + //The bridge resolves those headers and uses those channels to unblock the UDPMulticastGateway + .subscribe(sub -> sub + .enrichHeaders(h -> h + .headerFunction(MessageHeaders.REPLY_CHANNEL, m -> m + .getHeaders().get(REPLY_ORIGINAL_CHANNEL_ID_HEADER.toLowerCase()), true) + .headerFunction(MessageHeaders.ERROR_CHANNEL, m -> m + .getHeaders().get(ERROR_ORIGINAL_CHANNEL_ID_HEADER.toLowerCase()), true)) + .bridge()) + ) + .get(); + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/resources/application.yml b/dsl/synchronous-udp-multicast/src/main/resources/application.yml new file mode 100644 index 000000000..799752258 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/resources/application.yml @@ -0,0 +1,4 @@ +synchronous: + multicast: + group: 224.0.0.1 + port: 2000 diff --git a/dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java b/dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java new file mode 100644 index 000000000..ad74f2f3a --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.integration.samples.dsl.synchronous.multicast.gateway.UDPMulticastGateway; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Daniel Andres Pelaez Lopez + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +public class ApplicationTest { + @Autowired + private UDPMulticastGateway udpMulticastGateway; + + public ApplicationTest() { + } + + @Test(timeout = 5000) + public void testUDPMulticastGateway() { + String response = udpMulticastGateway.send("request"); + + assertThat(response, is("Response for 'request'")); + } +}