diff --git a/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/BodyCaptureAsyncListener.java b/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/BodyCaptureAsyncListener.java index d9b108277..e92a6a2d2 100644 --- a/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/BodyCaptureAsyncListener.java +++ b/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/BodyCaptureAsyncListener.java @@ -18,18 +18,24 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import java.io.BufferedReader; import java.io.PrintWriter; import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; +import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; +import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.hypertrace.agent.config.Config.AgentConfig; import org.hypertrace.agent.core.config.HypertraceConfig; import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream; import org.hypertrace.agent.core.instrumentation.buffer.BoundedCharArrayWriter; +import org.hypertrace.agent.core.instrumentation.buffer.ByteBufferSpanPair; +import org.hypertrace.agent.core.instrumentation.buffer.CharBufferSpanPair; import org.hypertrace.agent.core.instrumentation.utils.ContentTypeUtils; public class BodyCaptureAsyncListener implements AsyncListener { @@ -39,30 +45,39 @@ public class BodyCaptureAsyncListener implements AsyncListener { private final ContextStore streamContextStore; private final ContextStore writerContextStore; + private final ContextStore inputStreamContext; + private final ContextStore readerContext; + private final AgentConfig agentConfig = HypertraceConfig.get(); public BodyCaptureAsyncListener( AtomicBoolean responseHandled, Span span, ContextStore streamContextStore, - ContextStore writerContextStore) { + ContextStore writerContextStore, + ContextStore inputStreamContext, + ContextStore readerContext) { this.responseHandled = responseHandled; this.span = span; this.streamContextStore = streamContextStore; this.writerContextStore = writerContextStore; + this.inputStreamContext = inputStreamContext; + this.readerContext = readerContext; } @Override public void onComplete(AsyncEvent event) { if (responseHandled.compareAndSet(false, true)) { - captureResponseData(event.getSuppliedResponse()); + captureResponseDataAndClearRequestBuffer( + event.getSuppliedResponse(), event.getSuppliedRequest()); } } @Override public void onError(AsyncEvent event) { if (responseHandled.compareAndSet(false, true)) { - captureResponseData(event.getSuppliedResponse()); + captureResponseDataAndClearRequestBuffer( + event.getSuppliedResponse(), event.getSuppliedRequest()); } } @@ -72,7 +87,8 @@ public void onTimeout(AsyncEvent event) {} @Override public void onStartAsync(AsyncEvent event) {} - private void captureResponseData(ServletResponse servletResponse) { + private void captureResponseDataAndClearRequestBuffer( + ServletResponse servletResponse, ServletRequest servletRequest) { if (servletResponse instanceof HttpServletResponse) { HttpServletResponse httpResponse = (HttpServletResponse) servletResponse; @@ -89,5 +105,14 @@ private void captureResponseData(ServletResponse servletResponse) { } } } + if (servletRequest instanceof HttpServletRequest) { + HttpServletRequest httpRequest = (HttpServletRequest) servletRequest; + + // remove request body buffers from context stores, otherwise they might get reused + if (agentConfig.getDataCapture().getHttpBody().getRequest().getValue() + && ContentTypeUtils.shouldCapture(httpRequest.getContentType())) { + Utils.resetRequestBodyBuffers(inputStreamContext, readerContext, httpRequest); + } + } } } diff --git a/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet31NoWrappingInstrumentation.java b/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet31NoWrappingInstrumentation.java index 39145c36f..48d2856ca 100644 --- a/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet31NoWrappingInstrumentation.java +++ b/instrumentation/servlet/servlet-3.0-no-wrapping/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet31NoWrappingInstrumentation.java @@ -157,15 +157,11 @@ public static void exit( ContextStore writerContext = InstrumentationContext.get(PrintWriter.class, BoundedCharArrayWriter.class); - // remove request body buffers from context stores, otherwise they might get reused - if (agentConfig.getDataCapture().getHttpBody().getRequest().getValue() - && ContentTypeUtils.shouldCapture(httpRequest.getContentType())) { - ContextStore inputStreamContext = - InstrumentationContext.get(ServletInputStream.class, ByteBufferSpanPair.class); - ContextStore readerContext = - InstrumentationContext.get(BufferedReader.class, CharBufferSpanPair.class); - Utils.resetRequestBodyBuffers(inputStreamContext, readerContext, httpRequest); - } + // request context to clear body buffer + ContextStore inputStreamContext = + InstrumentationContext.get(ServletInputStream.class, ByteBufferSpanPair.class); + ContextStore readerContext = + InstrumentationContext.get(BufferedReader.class, CharBufferSpanPair.class); AtomicBoolean responseHandled = new AtomicBoolean(false); if (request.isAsyncStarted()) { @@ -174,7 +170,12 @@ public static void exit( .getAsyncContext() .addListener( new BodyCaptureAsyncListener( - responseHandled, currentSpan, outputStreamContext, writerContext)); + responseHandled, + currentSpan, + outputStreamContext, + writerContext, + inputStreamContext, + readerContext)); } catch (IllegalStateException e) { // org.eclipse.jetty.server.Request may throw an exception here if request became // finished after check above. We just ignore that exception and move on. @@ -195,6 +196,12 @@ public static void exit( && ContentTypeUtils.shouldCapture(httpResponse.getContentType())) { Utils.captureResponseBody(currentSpan, outputStreamContext, writerContext, httpResponse); } + + // remove request body buffers from context stores, otherwise they might get reused + if (agentConfig.getDataCapture().getHttpBody().getRequest().getValue() + && ContentTypeUtils.shouldCapture(httpRequest.getContentType())) { + Utils.resetRequestBodyBuffers(inputStreamContext, readerContext, httpRequest); + } } } } diff --git a/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30NoWrappingInstrumentationTest.java b/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30NoWrappingInstrumentationTest.java index aecab2970..fcec81541 100644 --- a/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30NoWrappingInstrumentationTest.java +++ b/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30NoWrappingInstrumentationTest.java @@ -16,6 +16,8 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping; +import io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping.TestServlets.EchoAsyncResponse_stream; +import io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping.TestServlets.EchoAsyncResponse_writer; import io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping.TestServlets.EchoStream_arr; import io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping.TestServlets.EchoStream_arr_offset; import io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping.TestServlets.EchoStream_readLine_print; @@ -63,12 +65,14 @@ public static void startServer() throws Exception { handler.addServlet(TestServlets.EchoWriter_arr.class, "/echo_writer_arr"); handler.addServlet(TestServlets.EchoWriter_arr_offset.class, "/echo_writer_arr_offset"); handler.addServlet(TestServlets.EchoWriter_readLine_write.class, "/echo_writer_readLine_write"); + handler.addServlet(TestServlets.EchoWriter_readLines.class, "/echo_writer_readLines"); handler.addServlet( TestServlets.EchoWriter_readLine_print_str.class, "/echo_writer_readLine_print_str"); handler.addServlet( TestServlets.EchoWriter_readLine_print_arr.class, "/echo_writer_readLine_print_arr"); handler.addServlet(TestServlets.Forward_to_post.class, "/forward_to_echo"); - handler.addServlet(TestServlets.EchoAsyncResponse.class, "/echo_async_response"); + handler.addServlet(EchoAsyncResponse_stream.class, "/echo_async_response_stream"); + handler.addServlet(EchoAsyncResponse_writer.class, "/echo_async_response_writer"); server.setHandler(handler); server.start(); serverPort = server.getConnectors()[0].getLocalPort(); @@ -85,8 +89,13 @@ public void forward_to_post() throws Exception { } @Test - public void echo_async_response() throws Exception { - postJson(String.format("http://localhost:%d/echo_async_response", serverPort)); + public void echo_async_response_stream() throws Exception { + postJson(String.format("http://localhost:%d/echo_async_response_stream", serverPort)); + } + + @Test + public void echo_async_response_writer() throws Exception { + postJson(String.format("http://localhost:%d/echo_async_response_writer", serverPort)); } @Test @@ -134,6 +143,11 @@ public void postJson_writer_readLine_print_str() throws Exception { postJson(String.format("http://localhost:%d/echo_writer_readLine_print_str", serverPort)); } + @Test + public void postJson_writer_readLines() throws Exception { + postJson(String.format("http://localhost:%d/echo_writer_readLines", serverPort)); + } + @Test public void postJson_writer_readLine_print_arr() throws Exception { postJson(String.format("http://localhost:%d/echo_writer_readLine_print_arr", serverPort)); diff --git a/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java b/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java index 279c6533d..fc01547fb 100644 --- a/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java +++ b/instrumentation/servlet/servlet-3.0-no-wrapping/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java @@ -17,6 +17,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping; import java.io.IOException; +import java.util.stream.Stream; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -147,6 +148,20 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws } } + public static class EchoWriter_readLines extends HttpServlet { + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException { + Stream lines = req.getReader().lines(); + lines.forEach(s -> {}); + + resp.setStatus(200); + resp.setContentType("application/json"); + resp.setHeader(RESPONSE_HEADER, RESPONSE_HEADER_VALUE); + + resp.getWriter().write(RESPONSE_BODY); + } + } + public static class EchoWriter_readLine_print_str extends HttpServlet { @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException { @@ -173,14 +188,56 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws } } - public static class EchoAsyncResponse extends HttpServlet { + public static class EchoAsyncResponse_stream extends HttpServlet { @Override - protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException { - while (req.getReader().readLine() != null) {} + protected void service(HttpServletRequest req, HttpServletResponse resp) { AsyncContext asyncContext = req.startAsync(); asyncContext.start( () -> { + while (true) { + try { + if (!(req.getInputStream().read() != -1)) break; + } catch (IOException e) { + e.printStackTrace(); + } + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + HttpServletResponse httpServletResponse = + (HttpServletResponse) asyncContext.getResponse(); + httpServletResponse.setStatus(200); + httpServletResponse.setContentType("application/json"); + httpServletResponse.setHeader(RESPONSE_HEADER, RESPONSE_HEADER_VALUE); + try { + httpServletResponse.getOutputStream().print(RESPONSE_BODY); + } catch (IOException e) { + e.printStackTrace(); + } + asyncContext.complete(); + }); + } + } + + public static class EchoAsyncResponse_writer extends HttpServlet { + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) { + + AsyncContext asyncContext = req.startAsync(); + asyncContext.start( + () -> { + while (true) { + try { + if (!(req.getReader().read() != -1)) break; + } catch (IOException e) { + e.printStackTrace(); + } + } + try { Thread.sleep(100); } catch (InterruptedException e) { @@ -192,7 +249,7 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws httpServletResponse.setContentType("application/json"); httpServletResponse.setHeader(RESPONSE_HEADER, RESPONSE_HEADER_VALUE); try { - httpServletResponse.getWriter().print(RESPONSE_BODY.toCharArray()); + httpServletResponse.getWriter().print(RESPONSE_BODY); } catch (IOException e) { e.printStackTrace(); }