From c0e4263a87fa06d29c27653f2f5deba61a08db0a Mon Sep 17 00:00:00 2001 From: eisbilir Date: Sat, 30 Sep 2023 01:58:12 +0300 Subject: [PATCH] feat: update inactivity handling --- src/main/java/com/topcoder/dal/DBAccessor.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/topcoder/dal/DBAccessor.java b/src/main/java/com/topcoder/dal/DBAccessor.java index ceddbf9..240bf52 100644 --- a/src/main/java/com/topcoder/dal/DBAccessor.java +++ b/src/main/java/com/topcoder/dal/DBAccessor.java @@ -326,9 +326,15 @@ public StreamObserver streamQuery(StreamObserver re AtomicLong lastTimerReset = new AtomicLong(System.nanoTime() - DEBOUNCE_INTERVAL.toNanos() - 1); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); AtomicReference> streamTimeoutFuture = new AtomicReference<>(scheduleStreamTimeout()); + private Boolean isStreamAlive = true; @Override public void onNext(QueryRequest request) { + if (!isStreamAlive) { + responseObserver.onError(Status.DEADLINE_EXCEEDED.withDescription("Stream closed due to inactivity") + .asRuntimeException()); + return; + } cancelStreamTimeout(); try { QueryResponse response = executeQuery(request.getQuery(), con); @@ -350,9 +356,11 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { - cancelStreamTimeout(); - commit(); - responseObserver.onCompleted(); + if (isStreamAlive) { + cancelStreamTimeout(); + commit(); + responseObserver.onCompleted(); + } } private void commit() { @@ -362,6 +370,7 @@ private void commit() { private void rollback() { logger.info("Rolling back transaction"); + isStreamAlive = false; jdbcTemplate.rollback(con); } @@ -390,7 +399,7 @@ private ScheduledFuture scheduleStreamTimeout() { logger.error(message); rollback(); cancelStreamTimeout(); - responseObserver.onError(Status.DEADLINE_EXCEEDED.withDescription(message).asRuntimeException()); + responseObserver.onCompleted(); }, streamTimeout.plus(DEBOUNCE_INTERVAL).toNanos(), TimeUnit.NANOSECONDS); } };