7
7
import com .topcoder .dal .util .StreamJdbcTemplate ;
8
8
import com .topcoder .dal .util .ParameterizedExpression ;
9
9
10
+ import io .grpc .Status ;
10
11
import io .grpc .stub .StreamObserver ;
11
12
import jdk .jshell .spi .ExecutionControl ;
12
13
import net .devh .boot .grpc .server .service .GrpcService ;
@@ -317,6 +318,7 @@ public void query(QueryRequest request, StreamObserver<QueryResponse> responseOb
317
318
318
319
@ Override
319
320
public StreamObserver <QueryRequest > streamQuery (StreamObserver <QueryResponse > responseObserver ) {
321
+ logger .info ("Stream started" );
320
322
return new StreamObserver <>() {
321
323
Connection con = jdbcTemplate .getConnection ();
322
324
private final Duration streamTimeout = Duration .ofSeconds (20 );
@@ -327,10 +329,11 @@ public StreamObserver<QueryRequest> streamQuery(StreamObserver<QueryResponse> re
327
329
328
330
@ Override
329
331
public void onNext (QueryRequest request ) {
330
- resetStreamTimeout ();
332
+ cancelStreamTimeout ();
331
333
try {
332
334
QueryResponse response = executeQuery (request .getQuery (), con );
333
335
responseObserver .onNext (response );
336
+ resetStreamTimeout ();
334
337
} catch (Exception e ) {
335
338
rollback ();
336
339
cancelStreamTimeout ();
@@ -377,7 +380,7 @@ private boolean debounce() {
377
380
378
381
private boolean cancelStreamTimeout () {
379
382
ScheduledFuture <?> currentFuture = streamTimeoutFuture .get ();
380
- return currentFuture == null || currentFuture .cancel (false );
383
+ return currentFuture == null || currentFuture .isCancelled () || currentFuture . cancel (false );
381
384
}
382
385
383
386
private ScheduledFuture <?> scheduleStreamTimeout () {
@@ -387,7 +390,7 @@ private ScheduledFuture<?> scheduleStreamTimeout() {
387
390
logger .error (message );
388
391
rollback ();
389
392
cancelStreamTimeout ();
390
- responseObserver .onCompleted ( );
393
+ responseObserver .onError ( Status . DEADLINE_EXCEEDED . withDescription ( message ). asRuntimeException () );
391
394
}, streamTimeout .plus (DEBOUNCE_INTERVAL ).toNanos (), TimeUnit .NANOSECONDS );
392
395
}
393
396
};
0 commit comments