Skip to content

Commit 3d2b5a0

Browse files
odrotbohmmp911de
authored andcommitted
DATAES-470 - Fixed parsing of cluster nodes in TransportClientFactoryBean.
Extracted ClusterNodes value object to capture the parsing logic and actually properly test it. Added unit tests to verify the proper rejection and the two cases outlined in the ticket. Related tickets: DATAES-283.
1 parent 2497fcd commit 3d2b5a0

File tree

3 files changed

+204
-31
lines changed

3 files changed

+204
-31
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.client;
17+
18+
import java.net.InetAddress;
19+
import java.net.UnknownHostException;
20+
import java.util.Arrays;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.stream.Collectors;
24+
25+
import org.elasticsearch.common.transport.InetSocketTransportAddress;
26+
import org.elasticsearch.common.transport.TransportAddress;
27+
import org.springframework.data.util.Streamable;
28+
import org.springframework.util.Assert;
29+
import org.springframework.util.StringUtils;
30+
31+
/**
32+
* Value object to represent a list of cluster nodes.
33+
*
34+
* @author Oliver Gierke
35+
* @since 3.0.9
36+
*/
37+
class ClusterNodes implements Streamable<TransportAddress> {
38+
39+
public static ClusterNodes DEFAULT = ClusterNodes.of("127.0.0.1:9300");
40+
41+
private static final String COLON = ":";
42+
private static final String COMMA = ",";
43+
44+
private final List<TransportAddress> clusterNodes;
45+
46+
/**
47+
* Creates a new {@link ClusterNodes} by parsing the given source.
48+
*
49+
* @param source must not be {@literal null} or empty.
50+
*/
51+
private ClusterNodes(String source) {
52+
53+
Assert.hasText(source, "Cluster nodes source must not be null or empty!");
54+
55+
String[] nodes = StringUtils.delimitedListToStringArray(source, COMMA);
56+
57+
this.clusterNodes = Arrays.stream(nodes).map(node -> {
58+
59+
String[] segments = StringUtils.delimitedListToStringArray(node, COLON);
60+
61+
Assert.isTrue(segments.length == 2,
62+
() -> String.format("Invalid cluster node %s in %s! Must be in the format host:port!", node, source));
63+
64+
String host = segments[0].trim();
65+
String port = segments[1].trim();
66+
67+
Assert.hasText(host, () -> String.format("No host name given cluster node %s!", node));
68+
Assert.hasText(port, () -> String.format("No port given in cluster node %s!", node));
69+
70+
return new InetSocketTransportAddress(toInetAddress(host), Integer.valueOf(port));
71+
72+
}).collect(Collectors.toList());
73+
}
74+
75+
/**
76+
* Creates a new {@link ClusterNodes} by parsing the given source. The expected format is a comma separated list of
77+
* host-port-combinations separated by a colon: {@code host:port,host:port,…}.
78+
*
79+
* @param source must not be {@literal null} or empty.
80+
* @return
81+
*/
82+
public static ClusterNodes of(String source) {
83+
return new ClusterNodes(source);
84+
}
85+
86+
/*
87+
* (non-Javadoc)
88+
* @see java.lang.Iterable#iterator()
89+
*/
90+
@Override
91+
public Iterator<TransportAddress> iterator() {
92+
return clusterNodes.iterator();
93+
}
94+
95+
private static InetAddress toInetAddress(String host) {
96+
97+
try {
98+
return InetAddress.getByName(host);
99+
} catch (UnknownHostException o_O) {
100+
throw new IllegalArgumentException(o_O);
101+
}
102+
}
103+
}

src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.netty.util.ThreadDeathWatcher;
1919
import io.netty.util.concurrent.GlobalEventExecutor;
2020

21-
import java.net.InetAddress;
2221
import java.util.ArrayList;
2322
import java.util.Arrays;
2423
import java.util.Collection;
@@ -31,7 +30,6 @@
3130
import org.elasticsearch.common.SuppressForbidden;
3231
import org.elasticsearch.common.network.NetworkModule;
3332
import org.elasticsearch.common.settings.Settings;
34-
import org.elasticsearch.common.transport.InetSocketTransportAddress;
3533
import org.elasticsearch.index.reindex.ReindexPlugin;
3634
import org.elasticsearch.join.ParentJoinPlugin;
3735
import org.elasticsearch.percolator.PercolatorPlugin;
@@ -46,7 +44,6 @@
4644
import org.springframework.beans.factory.InitializingBean;
4745
import org.springframework.util.Assert;
4846
import org.springframework.util.ClassUtils;
49-
import org.springframework.util.StringUtils;
5047

5148
/**
5249
* TransportClientFactoryBean
@@ -55,21 +52,19 @@
5552
* @author Mohsin Husen
5653
* @author Jakub Vavrik
5754
* @author Piotr Betkier
55+
* @author Oliver Gierke
5856
*/
59-
6057
public class TransportClientFactoryBean implements FactoryBean<TransportClient>, InitializingBean, DisposableBean {
6158

6259
private static final Logger logger = LoggerFactory.getLogger(TransportClientFactoryBean.class);
63-
private String clusterNodes = "127.0.0.1:9300";
60+
private ClusterNodes clusterNodes = ClusterNodes.of("127.0.0.1:9300");
6461
private String clusterName = "elasticsearch";
6562
private Boolean clientTransportSniff = true;
6663
private Boolean clientIgnoreClusterName = Boolean.FALSE;
6764
private String clientPingTimeout = "5s";
6865
private String clientNodesSamplerInterval = "5s";
6966
private TransportClient client;
7067
private Properties properties;
71-
static final String COLON = ":";
72-
static final String COMMA = ",";
7368

7469
@Override
7570
public void destroy() throws Exception {
@@ -106,39 +101,26 @@ public void afterPropertiesSet() throws Exception {
106101
protected void buildClient() throws Exception {
107102

108103
client = new SpringDataTransportClient(settings());
109-
Assert.hasText(clusterNodes, "[Assertion failed] clusterNodes settings missing.");
110-
String[] clusterNodesArray = StringUtils.split(clusterNodes, COMMA);
111-
if (clusterNodesArray != null) {
112-
for (String clusterNode : clusterNodesArray) {
113-
if (clusterNode != null) {
114-
int colonPosition = clusterName.lastIndexOf(COLON);
115-
String hostName = colonPosition != -1 ? clusterNode.substring(0, colonPosition) : clusterNode;
116-
String port = colonPosition != -1 ? clusterNode.substring(colonPosition, clusterNode.length()) : "";
117-
Assert.hasText(hostName, "[Assertion failed] missing host name in 'clusterNodes'");
118-
Assert.hasText(port, "[Assertion failed] missing port in 'clusterNodes'");
119-
logger.info("adding transport node : " + clusterNode);
120-
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostName), Integer.valueOf(port)));
121-
}
122-
}
123-
}
104+
105+
clusterNodes.stream() //
106+
.peek(it -> logger.info("Adding transport node : " + it.toString())) //
107+
.forEach(client::addTransportAddress);
108+
124109
client.connectedNodes();
125110
}
126111

127112
private Settings settings() {
128113
if (properties != null) {
129114
return Settings.builder().put(properties).build();
130115
}
131-
return Settings.builder()
132-
.put("cluster.name", clusterName)
133-
.put("client.transport.sniff", clientTransportSniff)
116+
return Settings.builder().put("cluster.name", clusterName).put("client.transport.sniff", clientTransportSniff)
134117
.put("client.transport.ignore_cluster_name", clientIgnoreClusterName)
135118
.put("client.transport.ping_timeout", clientPingTimeout)
136-
.put("client.transport.nodes_sampler_interval", clientNodesSamplerInterval)
137-
.build();
119+
.put("client.transport.nodes_sampler_interval", clientNodesSamplerInterval).build();
138120
}
139121

140122
public void setClusterNodes(String clusterNodes) {
141-
this.clusterNodes = clusterNodes;
123+
this.clusterNodes = ClusterNodes.of(clusterNodes);
142124
}
143125

144126
public void setClusterName(String clusterName) {
@@ -176,7 +158,7 @@ public void setClientIgnoreClusterName(Boolean clientIgnoreClusterName) {
176158
public void setProperties(Properties properties) {
177159
this.properties = properties;
178160
}
179-
161+
180162
/**
181163
* Pretty exact copy of {@link PreBuiltTransportClient} except that we're inspecting the classpath for Netty
182164
* dependencies to only include the ones available. {@link PreBuiltTransportClient} expects both Netty 3 and Netty 4
@@ -236,8 +218,9 @@ private static void setSystemPropertyIfUnset(final String key, final String valu
236218
found = true;
237219
} catch (ClassNotFoundException | LinkageError e) {}
238220
}
239-
240-
Assert.state(found, "Neither Netty 3 or Netty 4 plugin found on the classpath. One of them is required to run the transport client!");
221+
222+
Assert.state(found,
223+
"Neither Netty 3 or Netty 4 plugin found on the classpath. One of them is required to run the transport client!");
241224

242225
plugins.add(ReindexPlugin.class);
243226
plugins.add(PercolatorPlugin.class);
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.client;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import org.junit.Test;
21+
22+
/**
23+
* Unit tests for {@link ClusterNodes}.
24+
*
25+
* @author Oliver Gierke
26+
*/
27+
public class ClusterNodesUnitTests {
28+
29+
@Test // DATAES-470
30+
public void parsesSingleClusterNode() {
31+
32+
ClusterNodes nodes = ClusterNodes.DEFAULT;
33+
34+
assertThat(nodes).hasSize(1) //
35+
.first().satisfies(it -> {
36+
assertThat(it.getAddress()).isEqualTo("127.0.0.1");
37+
assertThat(it.getPort()).isEqualTo(9300);
38+
});
39+
}
40+
41+
@Test // DATAES-470
42+
public void parsesMultiClusterNode() {
43+
44+
ClusterNodes nodes = ClusterNodes.of("127.0.0.1:1234,10.1.0.1:5678");
45+
46+
assertThat(nodes.stream()).hasSize(2); //
47+
assertThat(nodes.stream()).element(0).satisfies(it -> {
48+
assertThat(it.getAddress()).isEqualTo("127.0.0.1");
49+
assertThat(it.getPort()).isEqualTo(1234);
50+
});
51+
assertThat(nodes.stream()).element(1).satisfies(it -> {
52+
assertThat(it.getAddress()).isEqualTo("10.1.0.1");
53+
assertThat(it.getPort()).isEqualTo(5678);
54+
});
55+
}
56+
57+
@Test // DATAES-470
58+
public void rejectsEmptyHostName() {
59+
60+
assertThatExceptionOfType(IllegalArgumentException.class) //
61+
.isThrownBy(() -> ClusterNodes.of(":8080")) //
62+
.withMessageContaining("host");
63+
}
64+
65+
@Test // DATAES-470
66+
public void rejectsEmptyPort() {
67+
68+
assertThatExceptionOfType(IllegalArgumentException.class) //
69+
.isThrownBy(() -> ClusterNodes.of("localhost:")) //
70+
.withMessageContaining("port");
71+
}
72+
73+
@Test // DATAES-470
74+
public void rejectsMissingPort() {
75+
76+
assertThatExceptionOfType(IllegalArgumentException.class) //
77+
.isThrownBy(() -> ClusterNodes.of("localhost")) //
78+
.withMessageContaining("host:port");
79+
}
80+
81+
@Test // DATAES-470
82+
public void rejectsUnresolvableHost() {
83+
84+
assertThatExceptionOfType(IllegalArgumentException.class) //
85+
.isThrownBy(() -> ClusterNodes.of("mylocalhost:80"));
86+
}
87+
}

0 commit comments

Comments
 (0)