Skip to content

Commit d53f5e9

Browse files
author
Mark Kuhn
committed
replace retry queue with empty stream write
1 parent ff69a3f commit d53f5e9

File tree

2 files changed

+5
-17
lines changed

2 files changed

+5
-17
lines changed

src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ public class TCPClient implements SocketClient {
3030
private final Endpoint endpoint;
3131
private Socket socket;
3232
private boolean shouldConnect = true;
33-
private final LinkedList<String> retryQueue = new LinkedList<>();
34-
private String lastMessageSent;
3533

3634
public TCPClient(Endpoint endpoint) {
3735
this.endpoint = endpoint;
@@ -61,28 +59,17 @@ public synchronized void sendMessage(String message) {
6159
os = socket.getOutputStream();
6260
} catch (IOException e) {
6361
shouldConnect = true;
64-
retryQueue.add(message);
6562
throw new RuntimeException(
6663
"Failed to write message to the socket. Failed to open output stream.", e);
6764
}
6865

6966
try {
70-
while (!retryQueue.isEmpty()) {
71-
String retryMessage = retryQueue.peek();
72-
os.write(retryMessage.getBytes());
73-
retryQueue.pop();
74-
}
67+
// Write a space to the socket to verify connection before sending event
68+
os.write(32);
7569

7670
os.write(message.getBytes());
77-
lastMessageSent = message;
7871
} catch (IOException e) {
79-
// For broken pipe exception, retry last sent message
80-
if (e.getMessage().contains("Broken pipe")) {
81-
retryQueue.add(lastMessageSent);
82-
}
83-
8472
shouldConnect = true;
85-
retryQueue.add(message);
8673
throw new RuntimeException("Failed to write message to the socket.", e);
8774
}
8875
}

src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ protected Socket createSocket() {
4646

4747
String message = "Test message";
4848
client.sendMessage(message);
49+
client.close();
4950

50-
assertEquals(bos.toString(), message);
51+
assertEquals(message, bos.toString().trim());
5152
}
5253

5354
@Test(timeout = 5000)
@@ -63,6 +64,6 @@ public void testSendMessageWithSocketServer() throws IOException {
6364
socket.close();
6465
server.close();
6566

66-
assertEquals("Test message", message);
67+
assertEquals("Test message", message.trim());
6768
}
6869
}

0 commit comments

Comments
 (0)