18
18
19
19
import java .util .ArrayList ;
20
20
import java .util .Collection ;
21
+ import java .util .Collections ;
22
+ import java .util .Iterator ;
21
23
import java .util .List ;
24
+ import java .util .Map ;
25
+ import java .util .Map .Entry ;
26
+ import java .util .Properties ;
27
+ import java .util .Set ;
28
+ import java .util .concurrent .ConcurrentLinkedQueue ;
22
29
23
30
import org .springframework .beans .factory .InitializingBean ;
24
31
import org .springframework .integration .core .MessageSelector ;
32
+ import org .springframework .integration .filter .ExpressionEvaluatingSelector ;
33
+ import org .springframework .jmx .export .annotation .ManagedAttribute ;
34
+ import org .springframework .jmx .export .annotation .ManagedOperation ;
25
35
import org .springframework .messaging .Message ;
26
36
import org .springframework .messaging .MessageChannel ;
27
37
import org .springframework .util .Assert ;
38
+ import org .springframework .util .StringUtils ;
28
39
29
40
/**
30
41
* <pre class="code">
55
66
* @author Mark Fisher
56
67
* @author Oleg Zhurakousky
57
68
* @author Artem Bilan
69
+ * @author Liujiong
58
70
*/
59
- public class RecipientListRouter extends AbstractMessageRouter implements InitializingBean {
60
-
61
- private volatile List <Recipient > recipients ;
71
+ public class RecipientListRouter extends AbstractMessageRouter
72
+ implements InitializingBean , RecipientListRouterManagement {
62
73
74
+ private final ConcurrentLinkedQueue <Recipient > recipients = new ConcurrentLinkedQueue <Recipient >();
63
75
64
76
/**
65
77
* Set the channels for this router. Either call this method or
@@ -82,32 +94,134 @@ public void setChannels(List<MessageChannel> channels) {
82
94
*/
83
95
public void setRecipients (List <Recipient > recipients ) {
84
96
Assert .notEmpty (recipients , "recipients must not be empty" );
85
- this .recipients = recipients ;
97
+ ConcurrentLinkedQueue <Recipient > originalRecipients = this .recipients ;
98
+ this .recipients .clear ();
99
+ this .recipients .addAll (recipients );
100
+ if (logger .isDebugEnabled ()) {
101
+ logger .debug ("Channel Recipients:" + originalRecipients + " replaced with:" + this .recipients );
102
+ }
86
103
}
87
104
105
+ /**
106
+ * Set the recipients for this router.
107
+ * @param recipientMappings, map contains channelName and expression
108
+ */
88
109
@ Override
89
- public String getComponentType () {
90
- return "recipient-list-router" ;
110
+ @ ManagedAttribute
111
+ public void setRecipientMappings (Map <String , String > recipientMappings ) {
112
+ Assert .notEmpty (recipientMappings , "recipientMappings must not be empty" );
113
+ Assert .noNullElements (recipientMappings .keySet ().toArray (), "'recipientMappings' cannot have null keys." );
114
+ ConcurrentLinkedQueue <Recipient > originalRecipients = this .recipients ;
115
+ this .recipients .clear ();
116
+ for (Entry <String , String > next : recipientMappings .entrySet ()) {
117
+ if (StringUtils .hasText (next .getValue ())) {
118
+ this .addRecipient (next .getKey (), next .getValue ());
119
+ }
120
+ else {
121
+ this .addRecipient (next .getKey ());
122
+ }
123
+ }
124
+ if (logger .isDebugEnabled ()) {
125
+ logger .debug ("Channel Recipients:" + originalRecipients + " replaced with:" + this .recipients );
126
+ }
91
127
}
92
128
93
129
@ Override
94
- public void onInit () throws Exception {
95
- Assert .notEmpty (this .recipients , "recipient list must not be empty" );
96
- super .onInit ();
130
+ public String getComponentType () {
131
+ return "recipient-list-router" ;
97
132
}
98
133
134
+
99
135
@ Override
100
136
protected Collection <MessageChannel > determineTargetChannels (Message <?> message ) {
101
137
List <MessageChannel > channels = new ArrayList <MessageChannel >();
102
- List <Recipient > recipientList = this .recipients ;
103
- for (Recipient recipient : recipientList ) {
138
+ for (Recipient recipient : this .recipients ) {
104
139
if (recipient .accept (message )) {
105
140
channels .add (recipient .getChannel ());
106
141
}
107
142
}
108
143
return channels ;
109
144
}
110
145
146
+ @ Override
147
+ @ ManagedOperation
148
+ public void addRecipient (String channelName , String selectorExpression ) {
149
+ Assert .hasText (channelName , "'channelName' must not be empty." );
150
+ Assert .hasText (selectorExpression , "'selectorExpression' must not be empty." );
151
+ MessageChannel channel = this .getBeanFactory ().getBean (channelName , MessageChannel .class );
152
+ ExpressionEvaluatingSelector expressionEvaluatingSelector = new ExpressionEvaluatingSelector (selectorExpression );
153
+ expressionEvaluatingSelector .setBeanFactory (this .getBeanFactory ());
154
+ this .recipients .add (new Recipient (channel , expressionEvaluatingSelector ));
155
+ }
156
+
157
+ @ Override
158
+ @ ManagedOperation
159
+ public void addRecipient (String channelName ) {
160
+ Assert .hasText (channelName , "'channelName' must not be empty." );
161
+ MessageChannel channel = this .getBeanFactory ().getBean (channelName , MessageChannel .class );
162
+ this .recipients .add (new Recipient (channel ));
163
+ }
164
+
165
+ @ Override
166
+ @ ManagedOperation
167
+ public int removeRecipient (String channelName ) {
168
+ int counter = 0 ;
169
+ MessageChannel channel = this .getBeanFactory ().getBean (channelName , MessageChannel .class );
170
+ for (Iterator <Recipient > it = this .recipients .iterator (); it .hasNext (); ) {
171
+ if (it .next ().getChannel () == channel ) {
172
+ it .remove ();
173
+ counter ++;
174
+ }
175
+ }
176
+ return counter ;
177
+ }
178
+
179
+ @ Override
180
+ @ ManagedOperation
181
+ public int removeRecipient (String channelName , String selectorExpression ) {
182
+ int counter = 0 ;
183
+ MessageChannel targetChannel = this .getBeanFactory ().getBean (channelName , MessageChannel .class );
184
+ for (Iterator <Recipient > it = this .recipients .iterator (); it .hasNext (); ) {
185
+ Recipient next = it .next ();
186
+ MessageSelector selector = next .getSelector ();
187
+ MessageChannel channel = next .getChannel ();
188
+ if (selector instanceof ExpressionEvaluatingSelector &&
189
+ channel == targetChannel &&
190
+ ((ExpressionEvaluatingSelector ) selector ).getExpressionString ().equals (selectorExpression )) {
191
+ it .remove ();
192
+ counter ++;
193
+ }
194
+ }
195
+ return counter ;
196
+ }
197
+
198
+ @ Override
199
+ @ ManagedAttribute
200
+ public Collection <Recipient > getRecipients () {
201
+ return Collections .unmodifiableCollection (this .recipients );
202
+ }
203
+
204
+ @ Override
205
+ @ ManagedOperation
206
+ public void replaceRecipients (Properties recipientMappings ) {
207
+ Assert .notEmpty (recipientMappings , "'recipientMappings' must not be empty" );
208
+ Set <String > keys = recipientMappings .stringPropertyNames ();
209
+ ConcurrentLinkedQueue <Recipient > originalRecipients = this .recipients ;
210
+ this .recipients .clear ();
211
+ for (String key : keys ) {
212
+ Assert .notNull (key , "channelName can't be null." );
213
+ if (StringUtils .hasText (recipientMappings .getProperty (key ))) {
214
+ this .addRecipient (key , recipientMappings .getProperty (key ));
215
+ }
216
+ else {
217
+ this .addRecipient (key );
218
+ }
219
+ }
220
+ if (logger .isDebugEnabled ()) {
221
+ logger .debug ("Channel Recipients:" + originalRecipients + " replaced with:" + this .recipients );
222
+ }
223
+ }
224
+
111
225
112
226
public static class Recipient {
113
227
@@ -125,6 +239,9 @@ public Recipient(MessageChannel channel, MessageSelector selector) {
125
239
this .selector = selector ;
126
240
}
127
241
242
+ private MessageSelector getSelector () {
243
+ return selector ;
244
+ }
128
245
129
246
public MessageChannel getChannel () {
130
247
return this .channel ;
@@ -133,6 +250,7 @@ public MessageChannel getChannel() {
133
250
public boolean accept (Message <?> message ) {
134
251
return (this .selector == null || this .selector .accept (message ));
135
252
}
253
+
136
254
}
137
255
138
256
}
0 commit comments