Retry when safe #41

Merged
merged 26 commits into from
Nov 19, 2015
Changes from all commits (26 commits)
45d0a19
Make tests/ folder a python package
laurentsenta Nov 13, 2015
47917c5
Remove the current retry policy based on urllib3 retry
laurentsenta Nov 13, 2015
54a5087
Add retry based on HTTP status code
laurentsenta Nov 13, 2015
7f19194
Move tests on retry policy to their own package
laurentsenta Nov 16, 2015
b2e6b6e
Test retry fails on too many attempts
laurentsenta Nov 16, 2015
74d3315
Allow retry only for GET requests
laurentsenta Nov 16, 2015
8d2aad5
Allow retry for mapping resources save.
laurentsenta Nov 16, 2015
ae62add
Refactor retry tests: add mock_api function to simplify testing code
laurentsenta Nov 16, 2015
c575f84
Use only **kwargs in client requests (catch malformed call sooner)
laurentsenta Nov 16, 2015
accc697
Simple retry for DELETE requests
laurentsenta Nov 16, 2015
c24086f
Update the requirements for the library and for testing it.
laurentsenta Nov 16, 2015
b00f3a7
Add BadStatusLine error to retried exceptions
laurentsenta Nov 16, 2015
b67dbf2
Add no-404 check for store.delete in test retry
laurentsenta Nov 16, 2015
fb09bf6
Add retry for collections store and delete
laurentsenta Nov 16, 2015
7c5c938
Set is_idempotent as a default value in apidelete instead of forcing it
laurentsenta Nov 17, 2015
3d591b1
Fix incorrect/useless checks in test_retry
laurentsenta Nov 17, 2015
25f9767
Turn retrier creation into a method with exponential backoff and jitter
laurentsenta Nov 17, 2015
fa8dce2
Turn retrying requirement into a >= range
laurentsenta Nov 17, 2015
cd6b318
Test that retry does not catch unhandled exceptions
laurentsenta Nov 17, 2015
c484269
Add note on python3 compatibility for retrier
laurentsenta Nov 17, 2015
dd80468
Add retry on timeout exceptions
laurentsenta Nov 18, 2015
cb3b0a4
Move idempotence dispatch to client and GET default to apiget
laurentsenta Nov 18, 2015
d370178
Add 408 to retried http errors
laurentsenta Nov 18, 2015
ea9d165
Add max_retry_time to the client options
laurentsenta Nov 18, 2015
7180fd8
Move set request timeout from resource to client
laurentsenta Nov 18, 2015
33768c8
Add automatic computation for exponential backoff in retrier
laurentsenta Nov 19, 2015
7 changes: 7 additions & 0 deletions README.rst
@@ -52,3 +52,10 @@ Example getting job data later::
[{'title': 'my first item'}]

...


Testing
-------

Running the tests requires the hubstorage backend to be running,
and the python `responses` library (see `tox.ini`).
121 changes: 110 additions & 11 deletions hubstorage/client.py
@@ -1,8 +1,11 @@
"""
High level Hubstorage client
"""
from httplib import BadStatusLine
import logging
import pkgutil
from requests import session, adapters
from requests import session, adapters, HTTPError, ConnectionError, Timeout
from retrying import Retrying
from .utils import xauth, urlpathjoin
from .project import Project
from .job import Job
@@ -14,29 +17,125 @@
__version__ = pkgutil.get_data('hubstorage', 'VERSION').strip()


logger = logging.getLogger('HubstorageClient')

_HTTP_ERROR_CODES_TO_RETRY = (408, 429, 503, 504)


def _hc_retry_on_exception(err):
"""Callback used by the client to restrict the retry to acceptable errors"""
if (isinstance(err, HTTPError) and err.response.status_code in _HTTP_ERROR_CODES_TO_RETRY):
logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,))
return True

# TODO: python3 compatibility: BadStatusLine error are wrapped differently
if (isinstance(err, ConnectionError) and err.args[0] == 'Connection aborted.' and
isinstance(err.args[1], BadStatusLine) and err.args[1][0] == repr('')):
logger.warning("Protocol failed with BadStatusLine, retrying (maybe)")
return True
Contributor:

Can this conditional run into an updated urllib3 (or httplib) that got fixed with Python 3.5? Do we need to handle the RemoteDisconnected error, too?

Contributor Author:

RemoteDisconnected inherits from BadStatusLine, so there should be no difference; but after testing the code in python3, it appears that the same error triggers a different structure. Catching it would look like:

                isinstance(err, ConnectionError)
                and isinstance(err.args[0], ProtocolError)
                and err.args[0].args[0] == 'Connection aborted.'
                and isinstance(err.args[0].args[1], BadStatusLine))

This is terrible; I'm looking for an idiomatic way to check the list of chained exceptions, something like:

if BadStatusLineError in all_chained_errors(err)

Thanks for the note!
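A hypothetical helper in the spirit of the `all_chained_errors` idea discussed above could walk both explicit exception chaining (`__cause__`/`__context__`) and exceptions nested inside `args` (the way urllib3/requests wrap them). This is a sketch only: `iter_chained_errors` and the stand-in `BadStatusLine` class are assumptions, not part of this PR.

```python
def iter_chained_errors(err):
    """Yield err and every exception reachable from it via chaining or args."""
    seen = set()
    stack = [err]
    while stack:
        e = stack.pop()
        if e is None or id(e) in seen:
            continue
        seen.add(id(e))
        yield e
        # Python 3 explicit (raise ... from) and implicit chaining
        stack.append(getattr(e, '__cause__', None))
        stack.append(getattr(e, '__context__', None))
        # Exceptions wrapped positionally, as urllib3/requests do
        stack.extend(a for a in getattr(e, 'args', ()) if isinstance(a, BaseException))


class BadStatusLine(Exception):
    """Stand-in for httplib's BadStatusLine (assumption for this sketch)."""


wrapped = ConnectionError('Connection aborted.', BadStatusLine("''"))
print(any(isinstance(e, BadStatusLine) for e in iter_chained_errors(wrapped)))  # True
```

A predicate like `if any(isinstance(e, BadStatusLine) for e in iter_chained_errors(err))` would then cover both the python2 and python3 wrapping shapes without hard-coding either nesting.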


if isinstance(err, Timeout):
logger.warning("Server connection timeout, retrying (maybe)")
return True

return False
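The status-code branch of the predicate above can be exercised in isolation. In this minimal sketch, `FakeResponse` and `FakeHTTPError` are stand-ins for the requests objects the real callback receives; the tuple of codes is copied from the PR.

```python
_HTTP_ERROR_CODES_TO_RETRY = (408, 429, 503, 504)


class FakeResponse(object):
    def __init__(self, status_code):
        self.status_code = status_code


class FakeHTTPError(Exception):
    """Stand-in for requests.HTTPError, which carries a .response attribute."""
    def __init__(self, status_code):
        self.response = FakeResponse(status_code)


def should_retry(err):
    # Mirrors the status-code branch of _hc_retry_on_exception
    return (isinstance(err, FakeHTTPError)
            and err.response.status_code in _HTTP_ERROR_CODES_TO_RETRY)


print(should_retry(FakeHTTPError(503)))  # True: transient server error
print(should_retry(FakeHTTPError(404)))  # False: retrying will not help
```

The predicate deliberately whitelists only errors that indicate a transient condition (timeout, throttling, gateway failure); a 4xx like 404 is stable and retrying it would just repeat the failure.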

class HubstorageClient(object):

DEFAULT_ENDPOINT = 'http://storage.scrapinghub.com/'
USERAGENT = 'python-hubstorage/{0}'.format(__version__)
DEFAULT_TIMEOUT = 60.0

def __init__(self, auth=None, endpoint=None, connection_timeout=None,
max_retries=0):
DEFAULT_CONNECTION_TIMEOUT_S = 60.0
RETRY_DEFAUT_MAX_RETRY_TIME_S = 60.0

RETRY_DEFAULT_MAX_RETRIES = 3
RETRY_DEFAULT_JITTER_MS = 500
RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS = 500

def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries=None, max_retry_time=None):
"""
Note:
max_retries and max_retry_time change how the client attempt to retry failing requests that are
idempotent (safe to execute multiple time).

HubstorageClient(max_retries=3) will retry requests 3 times, no matter the time it takes.
Use max_retry_time if you want to bound the time spent in retrying.

By default, requests are retried at most 3 times, during 60 seconds.

Args:
auth (str): The client authentication token
endpoint (str): The API root address
connection_timeout (int): The connection timeout for a _single request_
max_retries (int): The number of time idempotent requests may be retried
max_retry_time (int): The time, in seconds, during which the client can retry a request
"""
self.auth = xauth(auth)
self.endpoint = endpoint or self.DEFAULT_ENDPOINT
self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT
self.session = self._create_session(max_retries)
self.connection_timeout = connection_timeout or self.DEFAULT_CONNECTION_TIMEOUT_S
self.session = self._create_session()
self.retrier = self._create_retrier(max_retries, max_retry_time)
self.jobq = JobQ(self, None)
self.projects = Projects(self, None)
self.root = ResourceType(self, None)
self._batchuploader = None

def _create_session(self, max_retries):
def request(self, is_idempotent=False, **kwargs):
"""
Execute an HTTP request with the current client session.

Use the retry policy configured in the client when is_idempotent is True
"""
kwargs.setdefault('timeout', self.connection_timeout)

def invoke_request():
r = self.session.request(**kwargs)

if not r.ok:
logger.debug('%s: %s', r, r.content)
r.raise_for_status()
return r

if is_idempotent:
return self.retrier.call(invoke_request)
else:
return invoke_request()
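The dispatch in `request()` — retry only when the caller marks the call idempotent — can be sketched independently of the retrying library. Here `run_with_optional_retry`, `naive_retrier` and `make_flaky` are illustrative stand-ins, not code from this PR.

```python
def run_with_optional_retry(call, retrier, is_idempotent=False):
    # Only calls flagged safe-to-repeat go through the retrier,
    # mirroring the is_idempotent branch in HubstorageClient.request
    if is_idempotent:
        return retrier(call)
    return call()


def naive_retrier(fn, max_attempts=4):
    # Fixed-attempt stand-in for retrying.Retrying.call (no backoff/jitter)
    for attempt in range(max_attempts):
        try:
            return fn()
        except RuntimeError:
            if attempt == max_attempts - 1:
                raise


def make_flaky(fail_times):
    # Returns a callable that fails `fail_times` times, then succeeds
    state = {'calls': 0}
    def flaky():
        state['calls'] += 1
        if state['calls'] <= fail_times:
            raise RuntimeError('transient error')
        return 'ok'
    return flaky


print(run_with_optional_retry(make_flaky(2), naive_retrier, is_idempotent=True))  # ok
```

A non-idempotent call takes the plain path, so its first transient failure propagates to the caller instead of being silently replayed — exactly the behaviour you want for unsafe POSTs.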

def _create_retrier(self, max_retries, max_retry_time):
"""
Create the Retrier object used to process idempotent client requests.

If only max_retries is set, the default max_retry_time is ignored.

Args:
max_retries (int): the number of retries to be attempted
max_retry_time (int): the amount of time, in seconds, to retry for.
Returns:
A Retrying instance, that implements a call(func) method.
"""

# Client sets max_retries only
if max_retries is not None and max_retry_time is None:
stop_max_delay = None
stop_max_attempt_number = max_retries + 1
wait_exponential_multiplier = self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS
else:
stop_max_delay = (max_retry_time or self.RETRY_DEFAUT_MAX_RETRY_TIME_S) * 1000.0
stop_max_attempt_number = (max_retries or self.RETRY_DEFAULT_MAX_RETRIES) + 1

# Compute the backoff to allow for max_retries queries during the allowed delay
# Solves the following formula (assumes requests are immediate):
# max_retry_time = sum(exp_multiplier * 2 ** i) for i from 1 to max_retries + 1
wait_exponential_multiplier = stop_max_delay / ((2 ** (stop_max_attempt_number + 1)) - 2)

return Retrying(stop_max_attempt_number=stop_max_attempt_number,
stop_max_delay=stop_max_delay,
retry_on_exception=_hc_retry_on_exception,
wait_exponential_multiplier=wait_exponential_multiplier,
wait_jitter_max=self.RETRY_DEFAULT_JITTER_MS)
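The backoff formula in the comment above can be sanity-checked with this PR's defaults (a 60-second budget and 3 retries, i.e. 4 attempts). The values below are arithmetic only, not an API call:

```python
# Defaults taken from the constants in this PR
max_retry_time_s = 60.0
max_retries = 3

stop_max_attempt_number = max_retries + 1    # initial try + retries
stop_max_delay = max_retry_time_s * 1000.0   # retrying works in milliseconds

# multiplier = budget / (2^(attempts+1) - 2) = 60000 / 30
multiplier = stop_max_delay / ((2 ** (stop_max_attempt_number + 1)) - 2)
print(multiplier)  # 2000.0

# The waits multiplier * 2**i for i = 1..attempts exactly fill the budget
total_wait = sum(multiplier * 2 ** i for i in range(1, stop_max_attempt_number + 1))
print(total_wait)  # 60000.0
```

With jitter (up to 500 ms added per wait) the sum can overshoot slightly, but `stop_max_delay` still caps the overall retry time.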

def _create_session(self):
s = session()
if max_retries > 0:
a = adapters.HTTPAdapter(max_retries=max_retries)
s.mount('http://', a)
s.mount('https://', a)
s.headers.update({'User-Agent': self.USERAGENT})
return s

4 changes: 2 additions & 2 deletions hubstorage/collectionsrt.py
@@ -21,15 +21,15 @@ def get(self, _type, _name, _key=None, **params):

def set(self, _type, _name, _values):
try:
return self.apipost((_type, _name), jl=_values)
return self.apipost((_type, _name), is_idempotent=True, jl=_values)
except HTTPError as exc:
if exc.response.status_code in (400, 413):
raise ValueError(exc.response.text)
else:
raise

def delete(self, _type, _name, _keys):
return self.apipost((_type, _name, 'deleted'), jl=_keys)
return self.apipost((_type, _name, 'deleted'), is_idempotent=True, jl=_keys)

def iter_json(self, _type, _name, requests_params=None, **apiparams):
return DownloadableResource.iter_json(self, (_type, _name),
15 changes: 8 additions & 7 deletions hubstorage/resourcetype.py
@@ -21,13 +21,11 @@ def __init__(self, client, key, auth=None):
def _iter_lines(self, _path, **kwargs):
kwargs['url'] = urlpathjoin(self.url, _path)
kwargs.setdefault('auth', self.auth)
kwargs.setdefault('timeout', self.client.connection_timeout)
if 'jl' in kwargs:
kwargs['data'] = jlencode(kwargs.pop('jl'))
r = self.client.session.request(**kwargs)
if not r.ok:
logger.debug('%s: %s', r, r.content)
r.raise_for_status()

r = self.client.request(**kwargs)

return r.iter_lines()

def apirequest(self, _path=None, **kwargs):
@@ -37,9 +35,11 @@
def apipost(self, _path=None, **kwargs):
return self.apirequest(_path, method='POST', **kwargs)

def apiget(self, _path=None, **kwargs):
kwargs.setdefault('is_idempotent', True)
return self.apirequest(_path, method='GET', **kwargs)
Contributor:

I'd set is_idempotent for GET requests here using kwargs.setdefault('is_idempotent', True). Same for apidelete(), instead of preventing callers from overriding the is_idempotent keyword argument.

Contributor Author:

I disagree; we should set the GET default as late as possible. Direct calls to _iter_lines also benefit from the retrying without the user duplicating the setdefault code (in this case, iter_json).

I fixed apidelete() with kwargs.setdefault, thanks!

Contributor:

> I disagree; we should set the GET default as late as possible. Direct calls to _iter_lines also benefit from the retrying without the user duplicating the setdefault code (in this case, iter_json).

Interesting thinking: you accept setting idempotent on apidelete() for the DELETE method, but not on apiget() for GET.

For me, "as late as possible" is apirequest(); _iter_lines() should transparently forward the idempotent kwarg to self.client.request() (a single method, not two, using the same API dialect as the others: a keyword argument), and callers of _iter_lines() should choose to set idempotent if needed.

iter_json is that case, and it has its own retry logic that may not play nice with the global retrier, so for a first iteration I think it is better to leave it out of this PR.

Contributor Author:

DELETE is a special case: it should use a specific & more complex retrier that turns 404 into "everything went fine" (whether to accept 404 on the first attempt is worth a discussion, since it would require changing/not using the retrying library).

But the current use of DELETE, from apidelete, never returns 404 (except for a very specific case that looks like a bug, see SC-338). That's why I handled it as a special case of idempotence (there's a test_delete_on_hubstorage_api_does_not_404 to verify that).

I do see the thinking behind "let _iter_lines just forward the request parameters to client.request". My only last, tiny concern, if we think like that: we should set the request timeout in client.request, not in _iter_lines; it's not the role of the resource to set up this value.

I'll do as you say; we'll come back later to iter_json with more tests.

def apidelete(self, _path=None, **kwargs):
kwargs.setdefault('is_idempotent', True)
return self.apirequest(_path, method='DELETE', **kwargs)


@@ -186,10 +186,11 @@
def save(self):
self._deleted.clear()
if self._cached:
if not self.ignore_fields:
self.apipost(jl=self._data)
self.apipost(jl=self._data, is_idempotent=True)
else:
self.apipost(jl=dict((k, v) for k, v in self._data.iteritems()
if k not in self.ignore_fields))
if k not in self.ignore_fields),
is_idempotent=True)

def __getitem__(self, key):
return self._data[key]
1 change: 1 addition & 0 deletions requirements.txt
@@ -1 +1,2 @@
requests>=1.0
retrying>=1.3.3
Empty file added tests/__init__.py
15 changes: 0 additions & 15 deletions tests/test_client.py
@@ -2,25 +2,10 @@
Test Client
"""
from hstestcase import HSTestCase
from hubstorage import HubstorageClient
from hubstorage.utils import millitime, apipoll


class ClientTest(HSTestCase):

def test_connect_retry(self):
c = HubstorageClient(auth=self.auth, endpoint=self.endpoint,
max_retries=2)
c.push_job(self.projectid, self.spidername)
job = c.start_job(projectid=self.projectid)
m = job.metadata
self.assertEqual(m.get('state'), u'running', dict(m))
m.expire()
retries = c.session.adapters['http://'].max_retries
if not isinstance(retries, int):
retries = retries.total
self.assertEqual(retries, 2)

def test_push_job(self):
c = self.hsclient
c.push_job(self.projectid, self.spidername,