-
Notifications
You must be signed in to change notification settings - Fork 1.1k
INT-2856:Add support for adding/removing individual recipients to the Re... #1215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1e0174b
e15d680
010d98d
401851c
b006eac
4e194fe
868b1ce
bb1fdee
89dfa83
0c2e081
0c8e5ff
aabe058
47f3115
c6cee14
53affa5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,25 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.Properties; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
|
||
import org.springframework.beans.factory.InitializingBean; | ||
import org.springframework.integration.core.MessageSelector; | ||
import org.springframework.integration.filter.ExpressionEvaluatingSelector; | ||
import org.springframework.jmx.export.annotation.ManagedAttribute; | ||
import org.springframework.jmx.export.annotation.ManagedOperation; | ||
import org.springframework.messaging.Message; | ||
import org.springframework.messaging.MessageChannel; | ||
import org.springframework.util.Assert; | ||
import org.springframework.util.StringUtils; | ||
|
||
|
||
/** | ||
* <pre class="code"> | ||
|
@@ -55,11 +67,12 @@ | |
* @author Mark Fisher | ||
* @author Oleg Zhurakousky | ||
* @author Artem Bilan | ||
* @author Liujiong | ||
*/ | ||
public class RecipientListRouter extends AbstractMessageRouter implements InitializingBean { | ||
|
||
private volatile List<Recipient> recipients; | ||
public class RecipientListRouter extends AbstractMessageRouter | ||
implements InitializingBean, RecipientListRouterManagement { | ||
|
||
private final ConcurrentLinkedQueue<Recipient> recipients = new ConcurrentLinkedQueue<Recipient>(); | ||
|
||
/** | ||
* Set the channels for this router. Either call this method or | ||
|
@@ -82,32 +95,140 @@ public void setChannels(List<MessageChannel> channels) { | |
*/ | ||
public void setRecipients(List<Recipient> recipients) { | ||
Assert.notEmpty(recipients, "recipients must not be empty"); | ||
this.recipients = recipients; | ||
ConcurrentLinkedQueue<Recipient> originalRecipients = this.recipients; | ||
this.recipients.clear(); | ||
this.recipients.addAll(recipients); | ||
if (logger.isDebugEnabled()) { | ||
logger.debug("Channel Recipients:" + originalRecipients | ||
+ " replaced with:" + this.recipients); | ||
} | ||
} | ||
|
||
/** | ||
* Set the recipients for this router. | ||
* @param recipientMappings, map contains channelName and expression | ||
*/ | ||
@Override | ||
public String getComponentType() { | ||
return "recipient-list-router"; | ||
@ManagedAttribute | ||
public void setRecipientMappings(Map<String, String> recipientMappings) { | ||
Assert.notEmpty(recipientMappings, "recipientMappings must not be empty"); | ||
ConcurrentLinkedQueue<Recipient> originalRecipients = this.recipients; | ||
this.recipients.clear(); | ||
for(Iterator<Entry<String, String>> it = recipientMappings.entrySet().iterator(); it.hasNext();) { | ||
Entry<String, String> next = it.next(); | ||
if(StringUtils.hasText(next.getValue())) { | ||
this.addRecipient(next.getKey(), next.getValue()); | ||
} | ||
else { | ||
this.addRecipient(next.getKey()); | ||
} | ||
} | ||
if (logger.isDebugEnabled()) { | ||
logger.debug("Channel Recipients:" + originalRecipients | ||
+ " replaced with:" + this.recipients); | ||
} | ||
} | ||
|
||
@Override | ||
public void onInit() throws Exception { | ||
Assert.notEmpty(this.recipients, "recipient list must not be empty"); | ||
super.onInit(); | ||
public String getComponentType() { | ||
return "recipient-list-router"; | ||
} | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blank line in the end of each class, even if it is an inner one |
||
@Override | ||
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { | ||
List<MessageChannel> channels = new ArrayList<MessageChannel>(); | ||
List<Recipient> recipientList = this.recipients; | ||
for (Recipient recipient : recipientList) { | ||
for (Recipient recipient : this.recipients) { | ||
if (recipient.accept(message)) { | ||
channels.add(recipient.getChannel()); | ||
} | ||
} | ||
return channels; | ||
} | ||
|
||
@Override | ||
@ManagedOperation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's true: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if they are new methods they should be before inner class. |
||
public void addRecipient(String channelName, String selectorExpression) { | ||
Assert.notNull(channelName, "channelName can't be null."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
Assert.hasText(channelName, "channelName can't be empty."); | ||
Assert.notNull(selectorExpression, "selectorExpression can't be null."); | ||
Assert.hasText(selectorExpression, "selectorExpression can't be empty."); | ||
MessageChannel channel = this.getBeanFactory().getBean(channelName, MessageChannel.class); | ||
ExpressionEvaluatingSelector expressionEvaluatingSelector = new ExpressionEvaluatingSelector(selectorExpression); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same for |
||
expressionEvaluatingSelector.setBeanFactory(this.getBeanFactory()); | ||
this.recipients.add(new Recipient(channel, expressionEvaluatingSelector)); | ||
} | ||
|
||
@Override | ||
@ManagedOperation | ||
public void addRecipient(String channelName) { | ||
Assert.notNull(channelName, "channelName can't be null."); | ||
MessageChannel channel = this.getBeanFactory().getBean(channelName, MessageChannel.class); | ||
this.recipients.add(new Recipient(channel)); | ||
} | ||
|
||
@Override | ||
@ManagedOperation | ||
public int removeRecipient(String channelName) { | ||
int counter = 0; | ||
MessageChannel channel = this.getBeanFactory().getBean(channelName, MessageChannel.class); | ||
for (Iterator<Recipient> it = this.recipients.iterator(); it.hasNext();) { | ||
if (it.next().getChannel() == channel) { | ||
it.remove(); | ||
counter++; | ||
} | ||
} | ||
return counter; | ||
} | ||
|
||
@Override | ||
@ManagedOperation | ||
public int removeRecipient(String channelName, String selectorExpression) { | ||
int counter = 0; | ||
MessageChannel targetChannel = this.getBeanFactory().getBean(channelName, MessageChannel.class); | ||
for (Iterator<Recipient> it = this.recipients.iterator();it.hasNext();) { | ||
Recipient next = it.next(); | ||
MessageSelector selector = next.getSelector(); | ||
MessageChannel channel = next.getChannel(); | ||
if(selector instanceof ExpressionEvaluatingSelector && | ||
channel == targetChannel && | ||
((ExpressionEvaluatingSelector) selector).getExpressionString().equals(selectorExpression)) { | ||
it.remove(); | ||
counter++; | ||
} | ||
} | ||
return counter; | ||
} | ||
|
||
@Override | ||
@ManagedAttribute | ||
public Collection<Recipient> getRecipients() { | ||
return Collections.unmodifiableCollection(this.recipients); | ||
} | ||
|
||
@Override | ||
@ManagedOperation | ||
public void replaceRecipients(Properties recipientMappings) { | ||
Assert.notNull(recipientMappings, "'recipientMappings' must not be null"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again |
||
Assert.notEmpty(recipientMappings, "'recipientMappings' must not be empty"); | ||
Set<String> keys = recipientMappings.stringPropertyNames(); | ||
ConcurrentLinkedQueue<Recipient> originalRecipients = this.recipients; | ||
this.recipients.clear(); | ||
for (String key : keys) { | ||
Assert.notNull(key, "channelName can't be null."); | ||
if(StringUtils.hasText(recipientMappings.getProperty(key))) { | ||
this.addRecipient(key, recipientMappings.getProperty(key)); | ||
} | ||
else { | ||
this.addRecipient(key); | ||
} | ||
} | ||
if (logger.isDebugEnabled()) { | ||
logger.debug("Channel Recipients:" + originalRecipients | ||
+ " replaced with:" + this.recipients); | ||
} | ||
} | ||
|
||
|
||
public static class Recipient { | ||
|
||
|
@@ -125,6 +246,9 @@ public Recipient(MessageChannel channel, MessageSelector selector) { | |
this.selector = selector; | ||
} | ||
|
||
private MessageSelector getSelector() { | ||
return selector; | ||
} | ||
|
||
public MessageChannel getChannel() { | ||
return this.channel; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright 2014 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.springframework.integration.router; | ||
|
||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
|
||
import org.springframework.integration.router.RecipientListRouter.Recipient; | ||
import org.springframework.jmx.export.annotation.ManagedAttribute; | ||
import org.springframework.jmx.export.annotation.ManagedOperation; | ||
import org.springframework.jmx.export.annotation.ManagedResource; | ||
|
||
/** | ||
* Exposes adding/removing individual recipients operations for | ||
* RecipientListRouter. This can be used with a control-bus and JMX. | ||
* | ||
* @author Liujiong | ||
* @since 4.1 | ||
* | ||
*/ | ||
@ManagedResource | ||
public interface RecipientListRouterManagement { | ||
|
||
/** | ||
* Add a recipient with channelName and expression. | ||
* @param channelName The channel name. | ||
* @param selectorExpression The expression to filter the incoming message. | ||
*/ | ||
@ManagedOperation | ||
void addRecipient(String channelName, String selectorExpression); | ||
|
||
/** | ||
* Add a recipient with channelName. | ||
* @param channelName The channel name. | ||
*/ | ||
@ManagedOperation | ||
void addRecipient(String channelName); | ||
|
||
/** | ||
* Remove all recipients that match the channelName. | ||
* @param channelName The channel name. | ||
*/ | ||
@ManagedOperation | ||
int removeRecipient(String channelName); | ||
|
||
/** | ||
* Remove all recipients that match the channelName and expression. | ||
* @param channelName The channel name. | ||
* @param selectorExpression The expression to filter the incoming message | ||
*/ | ||
@ManagedOperation | ||
int removeRecipient(String channelName, String selectorExpression); | ||
|
||
/** | ||
* @return an unmodifiable collection of recipients. | ||
*/ | ||
@ManagedAttribute | ||
Collection<Recipient> getRecipients(); | ||
|
||
/** | ||
* Replace recipient. | ||
* @param recipientMappings contain channelName and expression. | ||
*/ | ||
@ManagedOperation | ||
void replaceRecipients(Properties recipientMappings); | ||
|
||
/** | ||
* Set recipients. | ||
* @param recipientMappings contain channelName and expression. | ||
*/ | ||
@ManagedAttribute | ||
void setRecipientMappings(Map<String, String> recipientMappings); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2923,7 +2923,7 @@ | |
<xsd:extension base="commonRecipientListRouterType"> | ||
<xsd:sequence> | ||
<xsd:element ref="poller" minOccurs="0" maxOccurs="1"/> | ||
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="1" maxOccurs="unbounded"> | ||
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="0" maxOccurs="unbounded"> | ||
<xsd:annotation> | ||
<xsd:documentation> | ||
An expression to be evaluated to determine if this recipient | ||
|
@@ -2944,7 +2944,7 @@ | |
<xsd:complexContent> | ||
<xsd:extension base="commonRecipientListRouterType"> | ||
<xsd:sequence> | ||
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="1" maxOccurs="unbounded"> | ||
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="0" maxOccurs="unbounded"> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we revise it and move these duplications to the common type ? |
||
<xsd:annotation> | ||
<xsd:documentation> | ||
An expression to be evaluated to determine if this recipient | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you have this one, there is no more reason in the
equals()
andhashCode()