2
2
3
3
import base64
4
4
import json
5
+ import logging
5
6
import re
6
7
import time
7
8
import warnings
34
35
wait_fixed ,
35
36
)
36
37
38
+ from graphdatascience .retry_utils .retry_utils import before_log
39
+
37
40
from ..semantic_version .semantic_version import SemanticVersion
38
41
from ..version import __version__
39
42
from .arrow_endpoint_version import ArrowEndpointVersion
40
43
from .arrow_info import ArrowInfo
41
44
45
+ _arrow_client_logger = logging .getLogger ("gds_arrow_client" )
46
+
42
47
43
48
class GdsArrowClient :
44
49
@staticmethod
@@ -148,6 +153,7 @@ def connection_info(self) -> tuple[str, int]:
148
153
return self ._host , self ._port
149
154
150
155
@retry (
156
+ before = before_log ("Request token" , _arrow_client_logger , logging .DEBUG ),
151
157
retry = retry_any (retry_if_exception_type (FlightTimedOutError ), retry_if_exception_type (FlightUnavailableError )),
152
158
stop = stop_after_attempt (3 ),
153
159
wait = wait_fixed (1 ),
@@ -593,6 +599,7 @@ def _client(self) -> flight.FlightClient:
593
599
return self ._flight_client
594
600
595
601
@retry (
602
+ before = before_log ("Send action" , _arrow_client_logger , logging .DEBUG ),
596
603
retry = retry_any (retry_if_exception_type (FlightTimedOutError ), retry_if_exception_type (FlightUnavailableError )),
597
604
stop = (stop_after_delay (10 ) | stop_after_attempt (5 )),
598
605
wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ),
@@ -614,6 +621,7 @@ def _send_action(self, action_type: str, meta_data: dict[str, Any]) -> dict[str,
614
621
raise e # unreachable
615
622
616
623
@retry (
624
+ before = before_log ("Do put" , _arrow_client_logger , logging .DEBUG ),
617
625
retry = retry_any (retry_if_exception_type (FlightTimedOutError ), retry_if_exception_type (FlightUnavailableError )),
618
626
stop = (stop_after_delay (10 ) | stop_after_attempt (5 )),
619
627
wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ),
@@ -644,6 +652,7 @@ def _upload_data(
644
652
put_stream , ack_stream = self ._safe_do_put (upload_descriptor , batches [0 ].schema )
645
653
646
654
@retry (
655
+ before = before_log ("Upload batch" , _arrow_client_logger , logging .DEBUG ),
647
656
stop = (stop_after_delay (10 ) | stop_after_attempt (5 )),
648
657
wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ),
649
658
retry = (
@@ -665,6 +674,7 @@ def upload_batch(p: RecordBatch) -> None:
665
674
GdsArrowClient .handle_flight_error (e )
666
675
667
676
@retry (
677
+ before = before_log ("Do get" , _arrow_client_logger , logging .DEBUG ),
668
678
retry = retry_any (retry_if_exception_type (FlightTimedOutError ), retry_if_exception_type (FlightUnavailableError )),
669
679
stop = (stop_after_delay (10 ) | stop_after_attempt (5 )),
670
680
wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ),
0 commit comments