Skip to content

Commit e5bf1ac

Browse files
committed
Add support for multiple hosts configuration
- Allow to use Mono for user and password - Add multiple hosts connection strategy - Add HA protocol support for multiple hosts - Add DNS SRV driver for HA protocol
1 parent 52bc71f commit e5bf1ac

31 files changed

+2416
-760
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
21+
import io.netty.channel.ChannelOption;
22+
import io.netty.resolver.AddressResolver;
23+
import io.netty.resolver.AddressResolverGroup;
24+
import io.netty.resolver.DefaultNameResolver;
25+
import io.netty.resolver.RoundRobinInetAddressResolver;
26+
import io.netty.util.concurrent.EventExecutor;
27+
import io.netty.util.internal.logging.InternalLogger;
28+
import io.netty.util.internal.logging.InternalLoggerFactory;
29+
import reactor.core.publisher.Mono;
30+
import reactor.netty.resources.LoopResources;
31+
import reactor.netty.tcp.TcpClient;
32+
33+
import java.net.InetSocketAddress;
34+
import java.time.Duration;
35+
import java.time.ZoneId;
36+
37+
/**
38+
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
39+
*
40+
* @since 1.2.0
41+
*/
42+
@FunctionalInterface
43+
interface ConnectionStrategy {
44+
45+
InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionStrategy.class);
46+
47+
/**
48+
* Establish a connection to a target server that is determined by this connection strategy.
49+
*
50+
* @return a logged-in {@link Client} object.
51+
*/
52+
Mono<Client> connect();
53+
54+
/**
55+
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
56+
* <p>
57+
* Note: Unix Domain Socket also uses this method to create a general-purpose {@link TcpClient client}.
58+
*
59+
* @param configuration socket client configuration.
60+
* @return a general-purpose {@link TcpClient client}.
61+
*/
62+
static TcpClient createTcpClient(SocketClientConfiguration configuration, boolean balancedDns) {
63+
LoopResources loopResources = configuration.getLoopResources();
64+
Duration connectTimeout = configuration.getConnectTimeout();
65+
TcpClient client = TcpClient.newConnection();
66+
67+
if (loopResources != null) {
68+
client = client.runOn(loopResources);
69+
}
70+
71+
if (connectTimeout != null) {
72+
client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
73+
}
74+
75+
if (balancedDns) {
76+
client = client.resolver(BalancedResolverGroup.INSTANCE);
77+
}
78+
79+
return client;
80+
}
81+
82+
/**
83+
* Logins to a MySQL server with the given {@link TcpClient}, {@link Credential} and configurations.
84+
*
85+
* @param tcpClient a TCP client to connect to a MySQL server.
86+
* @param credential user and password to log in to a MySQL server.
87+
* @param configuration a configuration that affects login behavior.
88+
* @return a logged-in {@link Client} object.
89+
*/
90+
static Mono<Client> connectWithInit(
91+
TcpClient tcpClient,
92+
Credential credential,
93+
MySqlConnectionConfiguration configuration
94+
) {
95+
return Mono.fromSupplier(() -> {
96+
String timeZone = configuration.getConnectionTimeZone();
97+
ZoneId connectionTimeZone;
98+
if ("LOCAL".equalsIgnoreCase(timeZone)) {
99+
connectionTimeZone = ZoneId.systemDefault().normalized();
100+
} else if ("SERVER".equalsIgnoreCase(timeZone)) {
101+
connectionTimeZone = null;
102+
} else {
103+
connectionTimeZone = StringUtils.parseZoneId(timeZone);
104+
}
105+
106+
return new ConnectionContext(
107+
configuration.getZeroDateOption(),
108+
configuration.getLoadLocalInfilePath(),
109+
configuration.getLocalInfileBufferSize(),
110+
configuration.isPreserveInstants(),
111+
connectionTimeZone
112+
);
113+
}).flatMap(context -> Client.connect(tcpClient, configuration.getSsl(), context)).flatMap(client -> {
114+
// Lazy init database after handshake/login
115+
MySqlSslConfiguration ssl = configuration.getSsl();
116+
String loginDb = configuration.isCreateDatabaseIfNotExist() ? "" : configuration.getDatabase();
117+
118+
return InitFlow.initHandshake(
119+
client,
120+
ssl.getSslMode(),
121+
loginDb,
122+
credential.getUser(),
123+
credential.getPassword(),
124+
configuration.getCompressionAlgorithms(),
125+
configuration.getZstdCompressionLevel()
126+
).then(Mono.just(client)).onErrorResume(e -> client.forceClose().then(Mono.error(e)));
127+
});
128+
}
129+
}
130+
131+
/**
132+
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
133+
*
134+
* @since 1.2.0
135+
*/
136+
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {
137+
138+
BalancedResolverGroup() {
139+
}
140+
141+
public static final BalancedResolverGroup INSTANCE;
142+
143+
static {
144+
INSTANCE = new BalancedResolverGroup();
145+
Runtime.getRuntime().addShutdownHook(new Thread(
146+
INSTANCE::close,
147+
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
148+
));
149+
}
150+
151+
@Override
152+
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
153+
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
154+
}
155+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
import org.jetbrains.annotations.Nullable;
20+
21+
import java.util.Objects;
22+
23+
/**
24+
* A value object representing a user with an optional password.
25+
*/
26+
final class Credential {
27+
28+
private final String user;
29+
30+
@Nullable
31+
private final CharSequence password;
32+
33+
Credential(String user, @Nullable CharSequence password) {
34+
this.user = user;
35+
this.password = password;
36+
}
37+
38+
String getUser() {
39+
return user;
40+
}
41+
42+
@Nullable
43+
CharSequence getPassword() {
44+
return password;
45+
}
46+
47+
@Override
48+
public boolean equals(Object o) {
49+
if (this == o) {
50+
return true;
51+
}
52+
if (!(o instanceof Credential)) {
53+
return false;
54+
}
55+
56+
Credential that = (Credential) o;
57+
58+
return user.equals(that.user) && Objects.equals(password, that.password);
59+
}
60+
61+
@Override
62+
public int hashCode() {
63+
return 31 * user.hashCode() + Objects.hashCode(password);
64+
}
65+
66+
@Override
67+
public String toString() {
68+
return "Credential{user=" + user + ", password=REDACTED}";
69+
}
70+
}

0 commit comments

Comments
 (0)