Skip to content

INT-2856:Add support for adding/removing individual recipients to the Re... #1215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.integration.router.RecipientListRouter;
import org.springframework.integration.router.RecipientListRouter.Recipient;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;

Expand All @@ -47,8 +46,6 @@ public class RecipientListRouterParser extends AbstractRouterParser {
protected BeanDefinition doParseRouter(Element element, ParserContext parserContext) {
BeanDefinitionBuilder recipientListRouterBuilder = BeanDefinitionBuilder.genericBeanDefinition(RecipientListRouter.class);
List<Element> childElements = DomUtils.getChildElementsByTagName(element, "recipient");
Assert.notEmpty(childElements,
"At least one recipient channel must be defined (e.g., <recipient channel=\"channel1\"/>).");
ManagedList recipientList = new ManagedList();
for (Element childElement : childElements) {
BeanDefinitionBuilder recipientBuilder = BeanDefinitionBuilder.genericBeanDefinition(Recipient.class);
Expand All @@ -62,7 +59,9 @@ protected BeanDefinition doParseRouter(Element element, ParserContext parserCont
}
recipientList.add(recipientBuilder.getBeanDefinition());
}
recipientListRouterBuilder.addPropertyValue("recipients", recipientList);
if(recipientList.size() > 0) {
recipientListRouterBuilder.addPropertyValue("recipients", recipientList);
}
return recipientListRouterBuilder.getBeanDefinition();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,34 @@
/**
* A {@link MessageSelector} implementation that evaluates a SpEL expression.
* The evaluation result of the expression must be a boolean value.
*
*
* @author Mark Fisher
* @author Liujiong
* @since 2.0
*/
public class ExpressionEvaluatingSelector extends AbstractMessageProcessingSelector {

private static final ExpressionParser expressionParser = new SpelExpressionParser(new SpelParserConfiguration(true, true));

private final String expressionString;

public ExpressionEvaluatingSelector(String expressionString) {
super(new ExpressionEvaluatingMessageProcessor<Boolean>(expressionParser.parseExpression(expressionString), Boolean.class));
this.expressionString = expressionString;
}

public ExpressionEvaluatingSelector(Expression expression) {
super(new ExpressionEvaluatingMessageProcessor<Boolean>(expression, Boolean.class));
this.expressionString = expression.getExpressionString();
}

public String getExpressionString() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have this one, there is no more reason in the equals() and hashCode()

return expressionString;
}

@Override
public String toString() {
return "ExpressionEvaluatingSelector for: [" + this.expressionString + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,25 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;


/**
* <pre class="code">
Expand Down Expand Up @@ -55,11 +67,12 @@
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Artem Bilan
* @author Liujiong
*/
public class RecipientListRouter extends AbstractMessageRouter implements InitializingBean {

private volatile List<Recipient> recipients;
public class RecipientListRouter extends AbstractMessageRouter
implements InitializingBean, RecipientListRouterManagement {

private final ConcurrentLinkedQueue<Recipient> recipients = new ConcurrentLinkedQueue<Recipient>();

/**
* Set the channels for this router. Either call this method or
Expand All @@ -82,32 +95,140 @@ public void setChannels(List<MessageChannel> channels) {
*/
public void setRecipients(List<Recipient> recipients) {
Assert.notEmpty(recipients, "recipients must not be empty");
this.recipients = recipients;
ConcurrentLinkedQueue<Recipient> originalRecipients = this.recipients;
this.recipients.clear();
this.recipients.addAll(recipients);
if (logger.isDebugEnabled()) {
logger.debug("Channel Recipients:" + originalRecipients
+ " replaced with:" + this.recipients);
}
}

/**
* Set the recipients for this router.
* @param recipientMappings, map contains channelName and expression
*/
@Override
public String getComponentType() {
return "recipient-list-router";
@ManagedAttribute
public void setRecipientMappings(Map<String, String> recipientMappings) {
Assert.notEmpty(recipientMappings, "recipientMappings must not be empty");
ConcurrentLinkedQueue<Recipient> originalRecipients = this.recipients;
this.recipients.clear();
for(Iterator<Entry<String, String>> it = recipientMappings.entrySet().iterator(); it.hasNext();) {
Entry<String, String> next = it.next();
if(StringUtils.hasText(next.getValue())) {
this.addRecipient(next.getKey(), next.getValue());
}
else {
this.addRecipient(next.getKey());
}
}
if (logger.isDebugEnabled()) {
logger.debug("Channel Recipients:" + originalRecipients
+ " replaced with:" + this.recipients);
}
}

@Override
public void onInit() throws Exception {
Assert.notEmpty(this.recipients, "recipient list must not be empty");
super.onInit();
public String getComponentType() {
return "recipient-list-router";
}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blank line in the end of each class, even if it is an inner one

@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
List<MessageChannel> channels = new ArrayList<MessageChannel>();
List<Recipient> recipientList = this.recipients;
for (Recipient recipient : recipientList) {
for (Recipient recipient : this.recipients) {
if (recipient.accept(message)) {
channels.add(recipient.getChannel());
}
}
return channels;
}

@Override
@ManagedOperation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true: @ManagedOperation isn't @Inherited, but be careful with that option. I mean try do not miss it, when you are going to copy/paste annotation from superclass

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if they are new methods they should be before inner class.
Please, read the code style doc: https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Code-Style

public void addRecipient(String channelName, String selectorExpression) {
Assert.notNull(channelName, "channelName can't be null.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert.hasText(channelName, "channelName can't be empty.");. Because an empty channelName is bad too

Assert.hasText(channelName, "channelName can't be empty.");
Assert.notNull(selectorExpression, "selectorExpression can't be null.");
Assert.hasText(selectorExpression, "selectorExpression can't be empty.");
MessageChannel channel = this.getBeanFactory().getBean(channelName, MessageChannel.class);
ExpressionEvaluatingSelector expressionEvaluatingSelector = new ExpressionEvaluatingSelector(selectorExpression);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same for selectorExpression: it isn't good that we try to create an Expression for an empty String

expressionEvaluatingSelector.setBeanFactory(this.getBeanFactory());
this.recipients.add(new Recipient(channel, expressionEvaluatingSelector));
}

@Override
@ManagedOperation
public void addRecipient(String channelName) {
Assert.notNull(channelName, "channelName can't be null.");
MessageChannel channel = this.getBeanFactory().getBean(channelName, MessageChannel.class);
this.recipients.add(new Recipient(channel));
}

@Override
@ManagedOperation
public int removeRecipient(String channelName) {
int counter = 0;
MessageChannel channel = this.getBeanFactory().getBean(channelName, MessageChannel.class);
for (Iterator<Recipient> it = this.recipients.iterator(); it.hasNext();) {
if (it.next().getChannel() == channel) {
it.remove();
counter++;
}
}
return counter;
}

@Override
@ManagedOperation
public int removeRecipient(String channelName, String selectorExpression) {
int counter = 0;
MessageChannel targetChannel = this.getBeanFactory().getBean(channelName, MessageChannel.class);
for (Iterator<Recipient> it = this.recipients.iterator();it.hasNext();) {
Recipient next = it.next();
MessageSelector selector = next.getSelector();
MessageChannel channel = next.getChannel();
if(selector instanceof ExpressionEvaluatingSelector &&
channel == targetChannel &&
((ExpressionEvaluatingSelector) selector).getExpressionString().equals(selectorExpression)) {
it.remove();
counter++;
}
}
return counter;
}

@Override
@ManagedAttribute
public Collection<Recipient> getRecipients() {
return Collections.unmodifiableCollection(this.recipients);
}

@Override
@ManagedOperation
public void replaceRecipients(Properties recipientMappings) {
Assert.notNull(recipientMappings, "'recipientMappings' must not be null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again Assert.notEmpty, because there is no reason to allow to do something for nothing

Assert.notEmpty(recipientMappings, "'recipientMappings' must not be empty");
Set<String> keys = recipientMappings.stringPropertyNames();
ConcurrentLinkedQueue<Recipient> originalRecipients = this.recipients;
this.recipients.clear();
for (String key : keys) {
Assert.notNull(key, "channelName can't be null.");
if(StringUtils.hasText(recipientMappings.getProperty(key))) {
this.addRecipient(key, recipientMappings.getProperty(key));
}
else {
this.addRecipient(key);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Channel Recipients:" + originalRecipients
+ " replaced with:" + this.recipients);
}
}


public static class Recipient {

Expand All @@ -125,6 +246,9 @@ public Recipient(MessageChannel channel, MessageSelector selector) {
this.selector = selector;
}

private MessageSelector getSelector() {
return selector;
}

public MessageChannel getChannel() {
return this.channel;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.router;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;

import org.springframework.integration.router.RecipientListRouter.Recipient;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

/**
* Exposes adding/removing individual recipients operations for
* RecipientListRouter. This can be used with a control-bus and JMX.
*
* @author Liujiong
* @since 4.1
*
*/
@ManagedResource
public interface RecipientListRouterManagement {

/**
* Add a recipient with channelName and expression.
* @param channelName The channel name.
* @param selectorExpression The expression to filter the incoming message.
*/
@ManagedOperation
void addRecipient(String channelName, String selectorExpression);

/**
* Add a recipient with channelName.
* @param channelName The channel name.
*/
@ManagedOperation
void addRecipient(String channelName);

/**
* Remove all recipients that match the channelName.
* @param channelName The channel name.
*/
@ManagedOperation
int removeRecipient(String channelName);

/**
* Remove all recipients that match the channelName and expression.
* @param channelName The channel name.
* @param selectorExpression The expression to filter the incoming message
*/
@ManagedOperation
int removeRecipient(String channelName, String selectorExpression);

/**
* @return an unmodifiable collection of recipients.
*/
@ManagedAttribute
Collection<Recipient> getRecipients();

/**
* Replace recipient.
* @param recipientMappings contain channelName and expression.
*/
@ManagedOperation
void replaceRecipients(Properties recipientMappings);

/**
* Set recipients.
* @param recipientMappings contain channelName and expression.
*/
@ManagedAttribute
void setRecipientMappings(Map<String, String> recipientMappings);

}
Original file line number Diff line number Diff line change
Expand Up @@ -2923,7 +2923,7 @@
<xsd:extension base="commonRecipientListRouterType">
<xsd:sequence>
<xsd:element ref="poller" minOccurs="0" maxOccurs="1"/>
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="1" maxOccurs="unbounded">
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
An expression to be evaluated to determine if this recipient
Expand All @@ -2944,7 +2944,7 @@
<xsd:complexContent>
<xsd:extension base="commonRecipientListRouterType">
<xsd:sequence>
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="1" maxOccurs="unbounded">
<xsd:element name="recipient" type="recipientSelectorExpressionChannelType" minOccurs="0" maxOccurs="unbounded">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we revise it and move these duplications to the common type ?

<xsd:annotation>
<xsd:documentation>
An expression to be evaluated to determine if this recipient
Expand Down
Loading