Skip to content

Commit 456b115

Browse files
committed
Merge pull request #41 from scrapinghub/retry-when-safe
Retry when safe
2 parents abd39be + 33768c8 commit 456b115

File tree

9 files changed

+424
-36
lines changed

9 files changed

+424
-36
lines changed

README.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,10 @@ Example getting job data later::
5252
[{'title': 'my first item'}]
5353

5454
...
55+
56+
57+
Testing
58+
-------
59+
60+
Running the tests require the hubstorage backend to be running,
61+
and the python `responses` library (see `tox.ini`).

hubstorage/client.py

Lines changed: 110 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
"""
22
High level Hubstorage client
33
"""
4+
from httplib import BadStatusLine
5+
import logging
46
import pkgutil
5-
from requests import session, adapters
7+
from requests import session, adapters, HTTPError, ConnectionError, Timeout
8+
from retrying import Retrying
69
from .utils import xauth, urlpathjoin
710
from .project import Project
811
from .job import Job
@@ -14,29 +17,125 @@
1417
__version__ = pkgutil.get_data('hubstorage', 'VERSION').strip()
1518

1619

20+
logger = logging.getLogger('HubstorageClient')
21+
22+
_HTTP_ERROR_CODES_TO_RETRY = (408, 429, 503, 504)
23+
24+
25+
def _hc_retry_on_exception(err):
26+
"""Callback used by the client to restrict the retry to acceptable errors"""
27+
if (isinstance(err, HTTPError) and err.response.status_code in _HTTP_ERROR_CODES_TO_RETRY):
28+
logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,))
29+
return True
30+
31+
# TODO: python3 compatibility: BadStatusLine error are wrapped differently
32+
if (isinstance(err, ConnectionError) and err.args[0] == 'Connection aborted.' and
33+
isinstance(err.args[1], BadStatusLine) and err.args[1][0] == repr('')):
34+
logger.warning("Protocol failed with BadStatusLine, retrying (maybe)")
35+
return True
36+
37+
if isinstance(err, Timeout):
38+
logger.warning("Server connection timeout, retrying (maybe)")
39+
return True
40+
41+
return False
42+
1743
class HubstorageClient(object):
1844

1945
DEFAULT_ENDPOINT = 'http://storage.scrapinghub.com/'
2046
USERAGENT = 'python-hubstorage/{0}'.format(__version__)
21-
DEFAULT_TIMEOUT = 60.0
2247

23-
def __init__(self, auth=None, endpoint=None, connection_timeout=None,
24-
max_retries=0):
48+
DEFAULT_CONNECTION_TIMEOUT_S = 60.0
49+
RETRY_DEFAUT_MAX_RETRY_TIME_S = 60.0
50+
51+
RETRY_DEFAULT_MAX_RETRIES = 3
52+
RETRY_DEFAULT_JITTER_MS = 500
53+
RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS = 500
54+
55+
def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries=None, max_retry_time=None):
56+
"""
57+
Note:
58+
max_retries and max_retry_time change how the client attempt to retry failing requests that are
59+
idempotent (safe to execute multiple time).
60+
61+
HubstorageClient(max_retries=3) will retry requests 3 times, no matter the time it takes.
62+
Use max_retry_time if you want to bound the time spent in retrying.
63+
64+
By default, requests are retried at most 3 times, during 60 seconds.
65+
66+
Args:
67+
auth (str): The client authentication token
68+
endpoint (str): The API root address
69+
connection_timeout (int): The connection timeout for a _single request_
70+
max_retries (int): The number of time idempotent requests may be retried
71+
max_retry_time (int): The time, in seconds, during which the client can retry a request
72+
"""
2573
self.auth = xauth(auth)
2674
self.endpoint = endpoint or self.DEFAULT_ENDPOINT
27-
self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT
28-
self.session = self._create_session(max_retries)
75+
self.connection_timeout = connection_timeout or self.DEFAULT_CONNECTION_TIMEOUT_S
76+
self.session = self._create_session()
77+
self.retrier = self._create_retrier(max_retries, max_retry_time)
2978
self.jobq = JobQ(self, None)
3079
self.projects = Projects(self, None)
3180
self.root = ResourceType(self, None)
3281
self._batchuploader = None
3382

34-
def _create_session(self, max_retries):
83+
def request(self, is_idempotent=False, **kwargs):
84+
"""
85+
Execute an HTTP request with the current client session.
86+
87+
Use the retry policy configured in the client when is_idempotent is True
88+
"""
89+
kwargs.setdefault('timeout', self.connection_timeout)
90+
91+
def invoke_request():
92+
r = self.session.request(**kwargs)
93+
94+
if not r.ok:
95+
logger.debug('%s: %s', r, r.content)
96+
r.raise_for_status()
97+
return r
98+
99+
if is_idempotent:
100+
return self.retrier.call(invoke_request)
101+
else:
102+
return invoke_request()
103+
104+
def _create_retrier(self, max_retries, max_retry_time):
105+
"""
106+
Create the Retrier object used to process idempotent client requests.
107+
108+
If only max_retries is set, the default max_retry_time is ignored.
109+
110+
Args:
111+
max_retries (int): the number of retries to be attempted
112+
max_retry_time (int): the number of time, in seconds, to retry for.
113+
Returns:
114+
A Retrying instance, that implements a call(func) method.
115+
"""
116+
117+
# Client sets max_retries only
118+
if max_retries is not None and max_retry_time is None:
119+
stop_max_delay = None
120+
stop_max_attempt_number = max_retries + 1
121+
wait_exponential_multiplier = self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS
122+
else:
123+
stop_max_delay = (max_retry_time or self.RETRY_DEFAUT_MAX_RETRY_TIME_S) * 1000.0
124+
stop_max_attempt_number = (max_retries or self.RETRY_DEFAULT_MAX_RETRIES) + 1
125+
126+
# Compute the backoff to allow for max_retries queries during the allowed delay
127+
# Solves the following formula (assumes requests are immediate):
128+
# max_retry_time = sum(exp_multiplier * 2 ** i) for i from 1 to max_retries + 1
129+
wait_exponential_multiplier = stop_max_delay / ((2 ** (stop_max_attempt_number + 1)) - 2)
130+
131+
return Retrying(stop_max_attempt_number=stop_max_attempt_number,
132+
stop_max_delay=stop_max_delay,
133+
retry_on_exception=_hc_retry_on_exception,
134+
wait_exponential_multiplier=wait_exponential_multiplier,
135+
wait_jitter_max=self.RETRY_DEFAULT_JITTER_MS)
136+
137+
def _create_session(self):
35138
s = session()
36-
if max_retries > 0:
37-
a = adapters.HTTPAdapter(max_retries=max_retries)
38-
s.mount('http://', a)
39-
s.mount('https://', a)
40139
s.headers.update({'User-Agent': self.USERAGENT})
41140
return s
42141

hubstorage/collectionsrt.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ def get(self, _type, _name, _key=None, **params):
2121

2222
def set(self, _type, _name, _values):
2323
try:
24-
return self.apipost((_type, _name), jl=_values)
24+
return self.apipost((_type, _name), is_idempotent=True, jl=_values)
2525
except HTTPError as exc:
2626
if exc.response.status_code in (400, 413):
2727
raise ValueError(exc.response.text)
2828
else:
2929
raise
3030

3131
def delete(self, _type, _name, _keys):
32-
return self.apipost((_type, _name, 'deleted'), jl=_keys)
32+
return self.apipost((_type, _name, 'deleted'), is_idempotent=True, jl=_keys)
3333

3434
def iter_json(self, _type, _name, requests_params=None, **apiparams):
3535
return DownloadableResource.iter_json(self, (_type, _name),

hubstorage/resourcetype.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@ def __init__(self, client, key, auth=None):
2121
def _iter_lines(self, _path, **kwargs):
2222
kwargs['url'] = urlpathjoin(self.url, _path)
2323
kwargs.setdefault('auth', self.auth)
24-
kwargs.setdefault('timeout', self.client.connection_timeout)
2524
if 'jl' in kwargs:
2625
kwargs['data'] = jlencode(kwargs.pop('jl'))
27-
r = self.client.session.request(**kwargs)
28-
if not r.ok:
29-
logger.debug('%s: %s', r, r.content)
30-
r.raise_for_status()
26+
27+
r = self.client.request(**kwargs)
28+
3129
return r.iter_lines()
3230

3331
def apirequest(self, _path=None, **kwargs):
@@ -37,9 +35,11 @@ def apipost(self, _path=None, **kwargs):
3735
return self.apirequest(_path, method='POST', **kwargs)
3836

3937
def apiget(self, _path=None, **kwargs):
38+
kwargs.setdefault('is_idempotent', True)
4039
return self.apirequest(_path, method='GET', **kwargs)
4140

4241
def apidelete(self, _path=None, **kwargs):
42+
kwargs.setdefault('is_idempotent', True)
4343
return self.apirequest(_path, method='DELETE', **kwargs)
4444

4545

@@ -186,10 +186,11 @@ def save(self):
186186
self._deleted.clear()
187187
if self._cached:
188188
if not self.ignore_fields:
189-
self.apipost(jl=self._data)
189+
self.apipost(jl=self._data, is_idempotent=True)
190190
else:
191191
self.apipost(jl=dict((k, v) for k, v in self._data.iteritems()
192-
if k not in self.ignore_fields))
192+
if k not in self.ignore_fields),
193+
is_idempotent=True)
193194

194195
def __getitem__(self, key):
195196
return self._data[key]

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
requests>=1.0
2+
retrying>=1.3.3

tests/__init__.py

Whitespace-only changes.

tests/test_client.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,10 @@
22
Test Client
33
"""
44
from hstestcase import HSTestCase
5-
from hubstorage import HubstorageClient
65
from hubstorage.utils import millitime, apipoll
76

8-
97
class ClientTest(HSTestCase):
108

11-
def test_connect_retry(self):
12-
c = HubstorageClient(auth=self.auth, endpoint=self.endpoint,
13-
max_retries=2)
14-
c.push_job(self.projectid, self.spidername)
15-
job = c.start_job(projectid=self.projectid)
16-
m = job.metadata
17-
self.assertEqual(m.get('state'), u'running', dict(m))
18-
m.expire()
19-
retries = c.session.adapters['http://'].max_retries
20-
if not isinstance(retries, int):
21-
retries = retries.total
22-
self.assertEqual(retries, 2)
23-
249
def test_push_job(self):
2510
c = self.hsclient
2611
c.push_job(self.projectid, self.spidername,

0 commit comments

Comments
 (0)