Skip to content

Commit ff69a3f

Browse files
author
Mark Kuhn
committed
add retry queue
1 parent dd7bffa commit ff69a3f

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.OutputStream;
2121
import java.net.InetSocketAddress;
2222
import java.net.Socket;
23+
import java.util.LinkedList;
2324
import lombok.extern.slf4j.Slf4j;
2425

2526
/** A client that would connect to a TCP socket. */
@@ -29,6 +30,8 @@ public class TCPClient implements SocketClient {
2930
private final Endpoint endpoint;
3031
private Socket socket;
3132
private boolean shouldConnect = true;
33+
private final LinkedList<String> retryQueue = new LinkedList<>();
34+
private String lastMessageSent;
3235

3336
public TCPClient(Endpoint endpoint) {
3437
this.endpoint = endpoint;
@@ -51,27 +54,45 @@ protected Socket createSocket() {
5154

5255
@Override
5356
public synchronized void sendMessage(String message) {
54-
if (socket == null || socket.isClosed() || shouldConnect) {
55-
connect();
56-
}
57+
checkConnection();
5758

5859
OutputStream os;
5960
try {
6061
os = socket.getOutputStream();
6162
} catch (IOException e) {
6263
shouldConnect = true;
64+
retryQueue.add(message);
6365
throw new RuntimeException(
6466
"Failed to write message to the socket. Failed to open output stream.", e);
6567
}
6668

6769
try {
70+
while (!retryQueue.isEmpty()) {
71+
String retryMessage = retryQueue.peek();
72+
os.write(retryMessage.getBytes());
73+
retryQueue.pop();
74+
}
75+
6876
os.write(message.getBytes());
69-
} catch (Exception e) {
77+
lastMessageSent = message;
78+
} catch (IOException e) {
79+
// For broken pipe exception, retry last sent message
80+
if (e.getMessage().contains("Broken pipe")) {
81+
retryQueue.add(lastMessageSent);
82+
}
83+
7084
shouldConnect = true;
85+
retryQueue.add(message);
7186
throw new RuntimeException("Failed to write message to the socket.", e);
7287
}
7388
}
7489

90+
private void checkConnection() {
91+
if (socket == null || socket.isClosed() || shouldConnect) {
92+
connect();
93+
}
94+
}
95+
7596
@Override
7697
public void close() throws IOException {
7798
if (socket != null) {

src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.ByteArrayOutputStream;
2424
import java.io.IOException;
25+
import java.net.ServerSocket;
2526
import java.net.Socket;
2627
import org.junit.Test;
2728

@@ -48,4 +49,20 @@ protected Socket createSocket() {
4849

4950
assertEquals(bos.toString(), message);
5051
}
52+
53+
@Test(timeout = 5000)
54+
public void testSendMessageWithSocketServer() throws IOException {
55+
TCPClient client = new TCPClient(new Endpoint("0.0.0.0", 9999, Protocol.TCP));
56+
ServerSocket server = new ServerSocket(9999);
57+
client.sendMessage("Test message");
58+
Socket socket = server.accept();
59+
60+
byte[] bytes = new byte[1024];
61+
int read = socket.getInputStream().read(bytes);
62+
String message = new String(bytes, 0, read);
63+
socket.close();
64+
server.close();
65+
66+
assertEquals("Test message", message);
67+
}
5168
}

0 commit comments

Comments
 (0)