From 82baf9e7349838ae754db480a8fdfecff7f996cb Mon Sep 17 00:00:00 2001 From: Mark Kuhn Date: Tue, 2 Aug 2022 16:55:36 -0700 Subject: [PATCH 1/2] add retry queue --- .../cloudwatchlogs/emf/sinks/TCPClient.java | 29 ++++++++++++++++--- .../emf/sinks/TCPClientTest.java | 17 +++++++++++ 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java index 3ae7f43a..39ce7cb9 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java @@ -20,6 +20,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.LinkedList; import lombok.extern.slf4j.Slf4j; /** A client that would connect to a TCP socket. */ @@ -29,6 +30,8 @@ public class TCPClient implements SocketClient { private final Endpoint endpoint; private Socket socket; private boolean shouldConnect = true; + private final LinkedList retryQueue = new LinkedList<>(); + private String lastMessageSent; public TCPClient(Endpoint endpoint) { this.endpoint = endpoint; @@ -51,27 +54,45 @@ protected Socket createSocket() { @Override public synchronized void sendMessage(String message) { - if (socket == null || socket.isClosed() || shouldConnect) { - connect(); - } + checkConnection(); OutputStream os; try { os = socket.getOutputStream(); } catch (IOException e) { shouldConnect = true; + retryQueue.add(message); throw new RuntimeException( "Failed to write message to the socket. Failed to open output stream.", e); } try { + while (!retryQueue.isEmpty()) { + String retryMessage = retryQueue.peek(); + os.write(retryMessage.getBytes()); + retryQueue.pop(); + } + os.write(message.getBytes()); - } catch (Exception e) { + lastMessageSent = message; + } catch (IOException e) { + // For broken pipe exception, retry last sent message + if (e.getMessage().contains("Broken pipe")) { + retryQueue.add(lastMessageSent); + } + shouldConnect = true; + retryQueue.add(message); throw new RuntimeException("Failed to write message to the socket.", e); } } + private void checkConnection() { + if (socket == null || socket.isClosed() || shouldConnect) { + connect(); + } + } + @Override public void close() throws IOException { if (socket != null) { diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java index ccbf4a44..a7f32d9f 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.ServerSocket; import java.net.Socket; import org.junit.Test; @@ -48,4 +49,20 @@ protected Socket createSocket() { assertEquals(bos.toString(), message); } + + @Test(timeout = 5000) + public void testSendMessageWithSocketServer() throws IOException { + TCPClient client = new TCPClient(new Endpoint("0.0.0.0", 9999, Protocol.TCP)); + ServerSocket server = new ServerSocket(9999); + client.sendMessage("Test message"); + Socket socket = server.accept(); + + byte[] bytes = new byte[1024]; + int read = socket.getInputStream().read(bytes); + String message = new String(bytes, 0, read); + socket.close(); + server.close(); + + assertEquals("Test message", message); + } } From 48d2ff1248e0134fba6017995d0dbbf819742503 Mon Sep 17 00:00:00 2001 From: Mark Kuhn Date: Thu, 4 Aug 2022 16:21:53 -0700 Subject: [PATCH 2/2] replace retry queue with empty stream write --- .../cloudwatchlogs/emf/sinks/TCPClient.java | 18 ++---------------- .../emf/sinks/TCPClientTest.java | 5 +++-- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java index 39ce7cb9..1805f551 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java @@ -20,7 +20,6 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.LinkedList; import lombok.extern.slf4j.Slf4j; /** A client that would connect to a TCP socket. */ @@ -30,8 +29,6 @@ public class TCPClient implements SocketClient { private final Endpoint endpoint; private Socket socket; private boolean shouldConnect = true; - private final LinkedList retryQueue = new LinkedList<>(); - private String lastMessageSent; public TCPClient(Endpoint endpoint) { this.endpoint = endpoint; @@ -61,28 +58,17 @@ public synchronized void sendMessage(String message) { os = socket.getOutputStream(); } catch (IOException e) { shouldConnect = true; - retryQueue.add(message); throw new RuntimeException( "Failed to write message to the socket. Failed to open output stream.", e); } try { - while (!retryQueue.isEmpty()) { - String retryMessage = retryQueue.peek(); - os.write(retryMessage.getBytes()); - retryQueue.pop(); - } + // Write a space to the socket to verify connection before sending event + os.write(32); os.write(message.getBytes()); - lastMessageSent = message; } catch (IOException e) { - // For broken pipe exception, retry last sent message - if (e.getMessage().contains("Broken pipe")) { - retryQueue.add(lastMessageSent); - } - shouldConnect = true; - retryQueue.add(message); throw new RuntimeException("Failed to write message to the socket.", e); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java index a7f32d9f..e22048cc 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java @@ -46,8 +46,9 @@ protected Socket createSocket() { String message = "Test message"; client.sendMessage(message); + client.close(); - assertEquals(bos.toString(), message); + assertEquals(message, bos.toString().trim()); } @Test(timeout = 5000) @@ -63,6 +64,6 @@ public void testSendMessageWithSocketServer() throws IOException { socket.close(); server.close(); - assertEquals("Test message", message); + assertEquals("Test message", message.trim()); } }