@@ -326,9 +326,15 @@ public StreamObserver<QueryRequest> streamQuery(StreamObserver<QueryResponse> re
326
326
AtomicLong lastTimerReset = new AtomicLong (System .nanoTime () - DEBOUNCE_INTERVAL .toNanos () - 1 );
327
327
private final ScheduledExecutorService scheduler = Executors .newScheduledThreadPool (1 );
328
328
AtomicReference <ScheduledFuture <?>> streamTimeoutFuture = new AtomicReference <>(scheduleStreamTimeout ());
329
+ private Boolean isStreamAlive = true ;
329
330
330
331
@ Override
331
332
public void onNext (QueryRequest request ) {
333
+ if (!isStreamAlive ) {
334
+ responseObserver .onError (Status .DEADLINE_EXCEEDED .withDescription ("Stream closed due to inactivity" )
335
+ .asRuntimeException ());
336
+ return ;
337
+ }
332
338
cancelStreamTimeout ();
333
339
try {
334
340
QueryResponse response = executeQuery (request .getQuery (), con );
@@ -350,9 +356,11 @@ public void onError(Throwable throwable) {
350
356
351
357
@ Override
352
358
public void onCompleted () {
353
- cancelStreamTimeout ();
354
- commit ();
355
- responseObserver .onCompleted ();
359
+ if (isStreamAlive ) {
360
+ cancelStreamTimeout ();
361
+ commit ();
362
+ responseObserver .onCompleted ();
363
+ }
356
364
}
357
365
358
366
private void commit () {
@@ -362,6 +370,7 @@ private void commit() {
362
370
363
371
private void rollback () {
364
372
logger .info ("Rolling back transaction" );
373
+ isStreamAlive = false ;
365
374
jdbcTemplate .rollback (con );
366
375
}
367
376
@@ -390,7 +399,7 @@ private ScheduledFuture<?> scheduleStreamTimeout() {
390
399
logger .error (message );
391
400
rollback ();
392
401
cancelStreamTimeout ();
393
- responseObserver .onError ( Status . DEADLINE_EXCEEDED . withDescription ( message ). asRuntimeException () );
402
+ responseObserver .onCompleted ( );
394
403
}, streamTimeout .plus (DEBOUNCE_INTERVAL ).toNanos (), TimeUnit .NANOSECONDS );
395
404
}
396
405
};
0 commit comments