From 45d0a19c037fe251df545060346749da54034efd Mon Sep 17 00:00:00 2001 From: lsenta Date: Fri, 13 Nov 2015 15:26:25 +0100 Subject: [PATCH 01/26] Make tests/ folder a python package Add __init__.py in tests/ folder so that nosetests can run individual tests. Signed-off-by: lsenta --- tests/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/__init__.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 From 47917c5ec21d3040cbaa9233ce75161a23a8b817 Mon Sep 17 00:00:00 2001 From: lsenta Date: Fri, 13 Nov 2015 17:24:42 +0100 Subject: [PATCH 02/26] Remove the current retry policy based on urllib3 retry Signed-off-by: lsenta --- hubstorage/client.py | 8 ++------ tests/test_client.py | 13 ------------- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 41f7c97..8f826c5 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -25,18 +25,14 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None, self.auth = xauth(auth) self.endpoint = endpoint or self.DEFAULT_ENDPOINT self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT - self.session = self._create_session(max_retries) + self.session = self._create_session() self.jobq = JobQ(self, None) self.projects = Projects(self, None) self.root = ResourceType(self, None) self._batchuploader = None - def _create_session(self, max_retries): + def _create_session(self): s = session() - if max_retries > 0: - a = adapters.HTTPAdapter(max_retries=max_retries) - s.mount('http://', a) - s.mount('https://', a) s.headers.update({'User-Agent': self.USERAGENT}) return s diff --git a/tests/test_client.py b/tests/test_client.py index 9e04ea9..2eb8a66 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -8,19 +8,6 @@ class ClientTest(HSTestCase): - def test_connect_retry(self): - c = HubstorageClient(auth=self.auth, endpoint=self.endpoint, - max_retries=2) - c.push_job(self.projectid, self.spidername) - job = c.start_job(projectid=self.projectid) - m = job.metadata - self.assertEqual(m.get('state'), u'running', dict(m)) - m.expire() - retries = c.session.adapters['http://'].max_retries - if not isinstance(retries, int): - retries = retries.total - self.assertEqual(retries, 2) - def test_push_job(self): c = self.hsclient c.push_job(self.projectid, self.spidername, From 54a508720b332f97e594e87b03a1fadf52137920 Mon Sep 17 00:00:00 2001 From: lsenta Date: Fri, 13 Nov 2015 19:57:59 +0100 Subject: [PATCH 03/26] Add retry based on HTTP status code This patch add the retrier system for all calls to the api (even unsafe ones). Signed-off-by: lsenta --- hubstorage/client.py | 31 +++++++++++++++++++++++++++++-- hubstorage/resourcetype.py | 5 +---- tests/test_client.py | 37 ++++++++++++++++++++++++++++++++++++- 3 files changed, 66 insertions(+), 7 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 8f826c5..40ff060 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -1,8 +1,10 @@ """ High level Hubstorage client """ +import logging import pkgutil -from requests import session, adapters +from requests import session, adapters, HTTPError +from retrying import Retrying from .utils import xauth, urlpathjoin from .project import Project from .job import Job @@ -14,6 +16,19 @@ __version__ = pkgutil.get_data('hubstorage', 'VERSION').strip() +logger = logging.getLogger('HubstorageClient') + +_ERROR_CODES_TO_RETRY = (429, 503, 504) + + +def _hc_retry_on_exception(err): + """Callback used by the client to restrict the retry to acceptable errors""" + if (isinstance(err, HTTPError) + and err.response.status_code in _ERROR_CODES_TO_RETRY): + logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,)) + return True + return False + class HubstorageClient(object): DEFAULT_ENDPOINT = 'http://storage.scrapinghub.com/' @@ -21,16 +36,28 @@ class HubstorageClient(object): DEFAULT_TIMEOUT = 60.0 def __init__(self, auth=None, endpoint=None, connection_timeout=None, - max_retries=0): + max_retries=3): self.auth = xauth(auth) self.endpoint = endpoint or self.DEFAULT_ENDPOINT self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT self.session = self._create_session() + self.retrier = Retrying(stop_max_attempt_number=max_retries + 1, retry_on_exception=_hc_retry_on_exception) self.jobq = JobQ(self, None) self.projects = Projects(self, None) self.root = ResourceType(self, None) self._batchuploader = None + def request(self, *args, **kwargs): + def invoke_req(): + r = self.session.request(*args, **kwargs) + + if not r.ok: + logger.debug('%s: %s', r, r.content) + r.raise_for_status() + return r + + return self.retrier.call(invoke_req) + def _create_session(self): s = session() s.headers.update({'User-Agent': self.USERAGENT}) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index f681b60..a05a130 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -24,10 +24,7 @@ def _iter_lines(self, _path, **kwargs): kwargs.setdefault('timeout', self.client.connection_timeout) if 'jl' in kwargs: kwargs['data'] = jlencode(kwargs.pop('jl')) - r = self.client.session.request(**kwargs) - if not r.ok: - logger.debug('%s: %s', r, r.content) - r.raise_for_status() + r = self.client.request(**kwargs) return r.iter_lines() def apirequest(self, _path=None, **kwargs): diff --git a/tests/test_client.py b/tests/test_client.py index 2eb8a66..d00eeea 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,13 +1,48 @@ """ Test Client """ +import requests from hstestcase import HSTestCase from hubstorage import HubstorageClient from hubstorage.utils import millitime, apipoll - +import responses +import json +import re class ClientTest(HSTestCase): + @responses.activate + def test_retry_get_job(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + + # setup connector that fails on 2 calls + attempts = [0] # use a list for nonlocal mutability used in request_callback + def request_callback(request): + attempts[0] += 1 + + print request, attempts + + if attempts[0] < 3: + return (504, {}, "Timeout") + else: + resp_body = dict(job_metadata) + return (200, {}, json.dumps(resp_body)) + + responses.add_callback( + responses.GET, re.compile(self.endpoint + '/.*'), + callback=request_callback, + content_type='application/json', + ) + + # Act + job2 = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + + # Assert + self.assertEqual(dict(job_metadata), dict(job2.metadata)) + self.assertEqual(attempts[0], 3) + def test_push_job(self): c = self.hsclient c.push_job(self.projectid, self.spidername, From 7f19194db5a9f9609fcfcf3703e00c41fe946dbf Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 09:29:42 +0100 Subject: [PATCH 04/26] Move tests on retry policy to their own package Signed-off-by: lsenta --- tests/test_client.py | 37 ------------------------------------- tests/test_retry.py | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 37 deletions(-) create mode 100644 tests/test_retry.py diff --git a/tests/test_client.py b/tests/test_client.py index d00eeea..3f6b745 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,48 +1,11 @@ """ Test Client """ -import requests from hstestcase import HSTestCase -from hubstorage import HubstorageClient from hubstorage.utils import millitime, apipoll -import responses -import json -import re class ClientTest(HSTestCase): - @responses.activate - def test_retry_get_job(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - - # setup connector that fails on 2 calls - attempts = [0] # use a list for nonlocal mutability used in request_callback - def request_callback(request): - attempts[0] += 1 - - print request, attempts - - if attempts[0] < 3: - return (504, {}, "Timeout") - else: - resp_body = dict(job_metadata) - return (200, {}, json.dumps(resp_body)) - - responses.add_callback( - responses.GET, re.compile(self.endpoint + '/.*'), - callback=request_callback, - content_type='application/json', - ) - - # Act - job2 = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - - # Assert - self.assertEqual(dict(job_metadata), dict(job2.metadata)) - self.assertEqual(attempts[0], 3) - def test_push_job(self): c = self.hsclient c.push_job(self.projectid, self.spidername, diff --git a/tests/test_retry.py b/tests/test_retry.py new file mode 100644 index 0000000..132adf9 --- /dev/null +++ b/tests/test_retry.py @@ -0,0 +1,41 @@ +""" +Test Retry Policy +""" +from hstestcase import HSTestCase +from hubstorage import HubstorageClient +import responses +import json +import re + + +class RetryTest(HSTestCase): + @responses.activate + def test_retry_get_job(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + + # setup connector that fails on 2 calls + attempts = [0] # use a list for nonlocal mutability used in request_callback + + def request_callback(request): + attempts[0] += 1 + + if attempts[0] < 3: + return (504, {}, "Timeout") + else: + resp_body = dict(job_metadata) + return (200, {}, json.dumps(resp_body)) + + responses.add_callback( + responses.GET, re.compile(self.endpoint + '/.*'), + callback=request_callback, + content_type='application/json', + ) + + # Act + job2 = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + + # Assert + self.assertEqual(dict(job_metadata), dict(job2.metadata)) + self.assertEqual(attempts[0], 3) From b2e6b6e635f8627bd03b9c5ade10f6e6b5af743d Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 09:36:01 +0100 Subject: [PATCH 05/26] Test retry fails on too many attempts Signed-off-by: lsenta --- tests/test_retry.py | 91 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 77 insertions(+), 14 deletions(-) diff --git a/tests/test_retry.py b/tests/test_retry.py index 132adf9..9e5f083 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -1,6 +1,7 @@ """ Test Retry Policy """ +from requests import HTTPError from hstestcase import HSTestCase from hubstorage import HubstorageClient import responses @@ -10,32 +11,94 @@ class RetryTest(HSTestCase): @responses.activate - def test_retry_get_job(self): + def test_get_job_does_retry(self): # Prepare client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback, attempts_count = self.make_request_callback(2, job_metadata) - # setup connector that fails on 2 calls - attempts = [0] # use a list for nonlocal mutability used in request_callback + responses.add_callback( + responses.GET, re.compile(self.endpoint + '/.*'), + callback=callback, + content_type='application/json', + ) - def request_callback(request): - attempts[0] += 1 + # Act + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - if attempts[0] < 3: - return (504, {}, "Timeout") - else: - resp_body = dict(job_metadata) - return (200, {}, json.dumps(resp_body)) + # Assert + self.assertEqual(dict(job_metadata), dict(job.metadata)) + self.assertEqual(attempts_count[0], 3) + + @responses.activate + def test_get_job_does_fails_if_no_retries(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback, attempts_count = self.make_request_callback(2, job_metadata) + + responses.add_callback( + responses.GET, re.compile(self.endpoint + '/.*'), + callback=callback, + content_type='application/json', + ) + + # Act + job, metadata, err = None, None, None + try: + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + metadata = dict(job.metadata) + except HTTPError as e: + err = e + + # Assert + self.assertIsNone(metadata, None) + self.assertEqual(err.response.status_code, 504) + self.assertEqual(attempts_count[0], 1) + + @responses.activate + def test_get_job_does_fails_on_too_many_retries(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=2) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback, attempts_count = self.make_request_callback(3, job_metadata) responses.add_callback( responses.GET, re.compile(self.endpoint + '/.*'), - callback=request_callback, + callback=callback, content_type='application/json', ) # Act - job2 = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + job, metadata, err = None, None, None + try: + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + metadata = dict(job.metadata) + except HTTPError as e: + err = e # Assert - self.assertEqual(dict(job_metadata), dict(job2.metadata)) - self.assertEqual(attempts[0], 3) + self.assertIsNone(metadata, None) + self.assertEqual(err.response.status_code, 504) + self.assertEqual(attempts_count[0], 3) + + def make_request_callback(self, timeout_count, body_on_success): + """ + Make a request callback that timeout a couple of time before returning body_on_success + + Returns: + A tuple (request_callback, attempts), attempts is an array of size one that contains the number of time + request_callback has been called. + """ + attempts = [0] # use a list for nonlocal mutability used in request_callback + + def request_callback(request): + attempts[0] += 1 + + if attempts[0] <= timeout_count: + return (504, {}, "Timeout") + else: + resp_body = dict(body_on_success) + return (200, {}, json.dumps(resp_body)) + + return (request_callback, attempts) From 74d3315d52b0eb97f8451cdb1fd5d3ab8aebbc74 Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 10:15:54 +0100 Subject: [PATCH 06/26] Allow retry only for GET requests Signed-off-by: lsenta --- hubstorage/client.py | 20 +++++++++++++++++++- hubstorage/resourcetype.py | 9 ++++++++- tests/test_retry.py | 24 ++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 40ff060..0b6ffe0 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -47,7 +47,12 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None, self.root = ResourceType(self, None) self._batchuploader = None - def request(self, *args, **kwargs): + def request_idempotent(self, *args, **kwargs): + """ + Execute an HTTP request with the current client session. + + Use the retry policy configured in the client. + """ def invoke_req(): r = self.session.request(*args, **kwargs) @@ -58,6 +63,19 @@ def invoke_req(): return self.retrier.call(invoke_req) + def request_nonidempotent(self, *args, **kwargs): + """ + Execute an HTTP request with the current client session + + Do not use the retry policy to avoid side-effects. + """ + r = self.session.request(*args, **kwargs) + + if not r.ok: + logger.debug('%s: %s', r, r.content) + r.raise_for_status() + return r + def _create_session(self): s = session() s.headers.update({'User-Agent': self.USERAGENT}) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index a05a130..04d0b28 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -19,12 +19,19 @@ def __init__(self, client, key, auth=None): self.url = urlpathjoin(client.endpoint, self.key) def _iter_lines(self, _path, **kwargs): + is_idempotent = kwargs.get('is_idempotent', False) or kwargs['method'] == 'GET' + kwargs['url'] = urlpathjoin(self.url, _path) kwargs.setdefault('auth', self.auth) kwargs.setdefault('timeout', self.client.connection_timeout) if 'jl' in kwargs: kwargs['data'] = jlencode(kwargs.pop('jl')) - r = self.client.request(**kwargs) + + if is_idempotent: + r = self.client.request_idempotent(**kwargs) + else: + r = self.client.request_nonidempotent(**kwargs) + return r.iter_lines() def apirequest(self, _path=None, **kwargs): diff --git a/tests/test_retry.py b/tests/test_retry.py index 9e5f083..a3c4c65 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -10,6 +10,30 @@ class RetryTest(HSTestCase): + @responses.activate + def test_push_job_does_not_retry(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + callback, attempts_count = self.make_request_callback(2, {'key': '1/2/3'}) + + responses.add_callback( + responses.POST, re.compile(self.endpoint + '/.*'), + callback=callback, + content_type='application/json', + ) + + # Act + job, err = None, None + try: + job = client.push_job(self.projectid, self.spidername) + except HTTPError as e: + err = e + + # Assert + self.assertIsNone(job, None) + self.assertEqual(err.response.status_code, 504) + self.assertEqual(attempts_count[0], 1) + @responses.activate def test_get_job_does_retry(self): # Prepare From 8d2aad552c1536bbe16c3a94e1a2bb52ae6dbbbf Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 10:27:31 +0100 Subject: [PATCH 07/26] Allow retry for mapping ressources save. Signed-off-by: lsenta --- hubstorage/resourcetype.py | 7 ++++--- tests/test_retry.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index 04d0b28..8b2152c 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -19,7 +19,7 @@ def __init__(self, client, key, auth=None): self.url = urlpathjoin(client.endpoint, self.key) def _iter_lines(self, _path, **kwargs): - is_idempotent = kwargs.get('is_idempotent', False) or kwargs['method'] == 'GET' + is_idempotent = kwargs.pop('is_idempotent', False) or kwargs['method'] == 'GET' kwargs['url'] = urlpathjoin(self.url, _path) kwargs.setdefault('auth', self.auth) @@ -190,10 +190,11 @@ def save(self): self._deleted.clear() if self._cached: if not self.ignore_fields: - self.apipost(jl=self._data) + self.apipost(jl=self._data, is_idempotent=True) else: self.apipost(jl=dict((k, v) for k, v in self._data.iteritems() - if k not in self.ignore_fields)) + if k not in self.ignore_fields), + is_idempotent=True) def __getitem__(self, key): return self._data[key] diff --git a/tests/test_retry.py b/tests/test_retry.py index a3c4c65..c2cc1d9 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -10,6 +10,37 @@ class RetryTest(HSTestCase): + @responses.activate + def test_metadata_save_does_retry(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback_get, attempts_count_get = self.make_request_callback(0, job_metadata) + callback_post, attempts_count_post = self.make_request_callback(2, job_metadata) + + responses.add_callback( + responses.GET, re.compile(self.endpoint + '/.*'), + callback=callback_get, + content_type='application/json', + ) + + responses.add_callback( + responses.POST, re.compile(self.endpoint + '/.*'), + callback=callback_post, + content_type='application/json', + ) + + # Act + err = None + + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + job.metadata['foo'] = 'bar' + job.metadata.save() + + # Assert + self.assertIsNone(err, None) + self.assertEqual(attempts_count_post[0], 3) + @responses.activate def test_push_job_does_not_retry(self): # Prepare From ae62add7085bfa16c2785d1b5fdac447c0215f72 Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 10:45:29 +0100 Subject: [PATCH 08/26] Refactor retry tests: add mock_api function to simplify testing code Signed-off-by: lsenta --- tests/test_retry.py | 55 ++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/tests/test_retry.py b/tests/test_retry.py index c2cc1d9..5f16b0e 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -8,6 +8,9 @@ import json import re +GET = responses.GET +POST = responses.POST + class RetryTest(HSTestCase): @responses.activate @@ -18,17 +21,8 @@ def test_metadata_save_does_retry(self): callback_get, attempts_count_get = self.make_request_callback(0, job_metadata) callback_post, attempts_count_post = self.make_request_callback(2, job_metadata) - responses.add_callback( - responses.GET, re.compile(self.endpoint + '/.*'), - callback=callback_get, - content_type='application/json', - ) - - responses.add_callback( - responses.POST, re.compile(self.endpoint + '/.*'), - callback=callback_post, - content_type='application/json', - ) + self.mock_api(method=GET, callback=callback_get) + self.mock_api(method=POST, callback=callback_post) # Act err = None @@ -47,11 +41,7 @@ def test_push_job_does_not_retry(self): client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) callback, attempts_count = self.make_request_callback(2, {'key': '1/2/3'}) - responses.add_callback( - responses.POST, re.compile(self.endpoint + '/.*'), - callback=callback, - content_type='application/json', - ) + self.mock_api(POST, callback=callback) # Act job, err = None, None @@ -72,11 +62,7 @@ def test_get_job_does_retry(self): job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} callback, attempts_count = self.make_request_callback(2, job_metadata) - responses.add_callback( - responses.GET, re.compile(self.endpoint + '/.*'), - callback=callback, - content_type='application/json', - ) + self.mock_api(callback=callback) # Act job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) @@ -92,11 +78,7 @@ def test_get_job_does_fails_if_no_retries(self): job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} callback, attempts_count = self.make_request_callback(2, job_metadata) - responses.add_callback( - responses.GET, re.compile(self.endpoint + '/.*'), - callback=callback, - content_type='application/json', - ) + self.mock_api(callback=callback) # Act job, metadata, err = None, None, None @@ -118,11 +100,7 @@ def test_get_job_does_fails_on_too_many_retries(self): job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} callback, attempts_count = self.make_request_callback(3, job_metadata) - responses.add_callback( - responses.GET, re.compile(self.endpoint + '/.*'), - callback=callback, - content_type='application/json', - ) + self.mock_api(callback=callback) # Act job, metadata, err = None, None, None @@ -137,6 +115,21 @@ def test_get_job_does_fails_on_too_many_retries(self): self.assertEqual(err.response.status_code, 504) self.assertEqual(attempts_count[0], 3) + def mock_api(self, method=GET, callback=None, url_match='/.*'): + """ + Mock an API URL using the responses library. + + Args: + method (Optional[str]): The HTTP method to mock. Defaults to responses.GET + callback (function(request) -> response): + url_match (Optional[str]): The API URL regexp. Defaults to '/.*'. + """ + responses.add_callback( + method, re.compile(self.endpoint + url_match), + callback=callback, + content_type='application/json', + ) + def make_request_callback(self, timeout_count, body_on_success): """ Make a request callback that timeout a couple of time before returning body_on_success From c575f843a0a3c5cc3e65581b5ebbd2808a41898d Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 11:37:22 +0100 Subject: [PATCH 09/26] Use only **kwargs in client requests (catch malformed call sooner) Signed-off-by: lsenta --- hubstorage/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 0b6ffe0..e8febbd 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -47,14 +47,14 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None, self.root = ResourceType(self, None) self._batchuploader = None - def request_idempotent(self, *args, **kwargs): + def request_idempotent(self, **kwargs): """ Execute an HTTP request with the current client session. Use the retry policy configured in the client. """ def invoke_req(): - r = self.session.request(*args, **kwargs) + r = self.session.request(**kwargs) if not r.ok: logger.debug('%s: %s', r, r.content) @@ -63,13 +63,13 @@ def invoke_req(): return self.retrier.call(invoke_req) - def request_nonidempotent(self, *args, **kwargs): + def request_nonidempotent(self, **kwargs): """ Execute an HTTP request with the current client session Do not use the retry policy to avoid side-effects. """ - r = self.session.request(*args, **kwargs) + r = self.session.request(**kwargs) if not r.ok: logger.debug('%s: %s', r, r.content) From accc697e80b54734818a2072b66e1d9b40e87b76 Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 11:41:43 +0100 Subject: [PATCH 10/26] Simple retry for DELETE requests Right now the API does not raises 404 errors when we delete a resource that does not exists. Thus we can retry delete requests until we get a 200. A correct implementation would be to catch 404 errors as "delete success", which is not needed now and complexify the retry implementation (special retry case only for delete requests). A test `test_delete_on_hubstorage_api_does_not_404` will be triggered when the assumption does not hold anymore and the retry policy should be fixed. Signed-off-by: lsenta --- hubstorage/resourcetype.py | 2 +- tests/test_retry.py | 44 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index 8b2152c..a5c7e4b 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -44,7 +44,7 @@ def apiget(self, _path=None, **kwargs): return self.apirequest(_path, method='GET', **kwargs) def apidelete(self, _path=None, **kwargs): - return self.apirequest(_path, method='DELETE', **kwargs) + return self.apirequest(_path, method='DELETE', is_idempotent=True, **kwargs) class DownloadableResource(ResourceType): diff --git a/tests/test_retry.py b/tests/test_retry.py index 5f16b0e..b450a50 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -10,9 +10,53 @@ GET = responses.GET POST = responses.POST +DELETE = responses.DELETE class RetryTest(HSTestCase): + def test_delete_on_hubstorage_api_does_not_404(self): + # NOTE: The current Hubstorage API does not raise 404 errors on deleting resources that do not exist, + # Thus the retry policy does not catch 404 errors when retrying deletes (simplify implementation A LOT). + # This test checks that this assumption holds. + + # Check frontier delete + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0) + + project = client.get_project(projectid=self.projectid) + result = project.frontier.delete_slot('frontier_non_existing', 'slot_non_existing') + + # Check metadata delete + job = client.push_job(self.projectid, self.spidername) + job.metadata['foo'] = 'bar' # Add then delete key, this will trigger an api delete for item foo + del job.metadata['foo'] + job.metadata.save() + + self.assertTrue(True, "No error have been triggered by calling a delete on resources that do not exist") + + @responses.activate + def test_delete_requests_are_retried(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback_getpost, attempts_count_getpost = self.make_request_callback(0, job_metadata) + callback_delete, attempts_count_delete = self.make_request_callback(2, job_metadata) + + self.mock_api(method=GET, callback=callback_getpost) + self.mock_api(method=POST, callback=callback_getpost) + self.mock_api(method=DELETE, callback=callback_delete) + + # Act + err = None + + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + job.metadata['foo'] = 'bar' + del job.metadata['foo'] + job.metadata.save() + + # Assert + self.assertIsNone(err, None) + self.assertEqual(attempts_count_delete[0], 3) + @responses.activate def test_metadata_save_does_retry(self): # Prepare From c24086fa0a96ce29e0776250ce63f4788b5af64c Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 12:02:03 +0100 Subject: [PATCH 11/26] Update the requirements for the library and testing it. Signed-off-by: lsenta --- README.rst | 7 +++++++ requirements.txt | 1 + tox.ini | 4 +++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 39d6d30..035cd04 100644 --- a/README.rst +++ b/README.rst @@ -52,3 +52,10 @@ Example getting job data later:: [{'title': 'my first item'}] ... + + +Testing +------- + +Running the tests require the hubstorage backend to be running, +and the python `responses` library (see `tox.ini`). \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 1ff50f7..2fd91e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ requests>=1.0 +retrying==1.3.3 \ No newline at end of file diff --git a/tox.ini b/tox.ini index fc2bf93..0428973 100644 --- a/tox.ini +++ b/tox.ini @@ -4,10 +4,12 @@ # and then run "tox" from this directory. [tox] -envlist = py27, pypy, py33 +envlist = py27, pypy, py3 [testenv] deps = -rrequirements.txt + responses==0.5.0 nose + commands = nosetests [] From b00f3a7666f912d2c14a18de7e8b73c9cf6f7b70 Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 12:44:56 +0100 Subject: [PATCH 12/26] Add BadStatusLine error to retried exceptions Signed-off-by: lsenta --- hubstorage/client.py | 12 +++++++++--- tests/test_retry.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index e8febbd..c4c35a5 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -1,9 +1,10 @@ """ High level Hubstorage client """ +from httplib import BadStatusLine import logging import pkgutil -from requests import session, adapters, HTTPError +from requests import session, adapters, HTTPError, ConnectionError from retrying import Retrying from .utils import xauth, urlpathjoin from .project import Project @@ -23,10 +24,15 @@ def _hc_retry_on_exception(err): """Callback used by the client to restrict the retry to acceptable errors""" - if (isinstance(err, HTTPError) - and err.response.status_code in _ERROR_CODES_TO_RETRY): + if (isinstance(err, HTTPError) and err.response.status_code in _ERROR_CODES_TO_RETRY): logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,)) return True + + if (isinstance(err, ConnectionError) and err.args[0] == 'Connection aborted.' and + isinstance(err.args[1], BadStatusLine) and err.args[1][0] == repr('')): + logger.warning("Protocol failed with BadStatusLine, retrying (maybe)") + return True + return False class HubstorageClient(object): diff --git a/tests/test_retry.py b/tests/test_retry.py index b450a50..605283f 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -1,7 +1,8 @@ """ Test Retry Policy """ -from requests import HTTPError +from httplib import BadStatusLine +from requests import HTTPError, ConnectionError from hstestcase import HSTestCase from hubstorage import HubstorageClient import responses @@ -33,6 +34,34 @@ def test_delete_on_hubstorage_api_does_not_404(self): self.assertTrue(True, "No error have been triggered by calling a delete on resources that do not exist") + @responses.activate + def test_retrier_catches_badstatusline_and_429(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + + attempts_count = [0] # use a list for nonlocal mutability used in request_callback + + def request_callback(request): + attempts_count[0] += 1 + + if attempts_count[0] <= 2: + raise ConnectionError("Connection aborted.", BadStatusLine("''")) + if attempts_count[0] == 3: + return (429, {}, {}) + else: + resp_body = dict(job_metadata) + return (200, {}, json.dumps(resp_body)) + + self.mock_api(callback=request_callback) + + # Act + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + + # Assert + self.assertEqual(dict(job_metadata), dict(job.metadata)) + self.assertEqual(attempts_count[0], 4) + @responses.activate def test_delete_requests_are_retried(self): # Prepare From b67dbf29a17d7911e35b5b3884a86fa14358e9e1 Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 14:33:29 +0100 Subject: [PATCH 13/26] Add no-404 check for store.delete in test retry Signed-off-by: lsenta --- tests/test_retry.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/test_retry.py b/tests/test_retry.py index 605283f..7b79527 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -20,11 +20,11 @@ def test_delete_on_hubstorage_api_does_not_404(self): # Thus the retry policy does not catch 404 errors when retrying deletes (simplify implementation A LOT). # This test checks that this assumption holds. - # Check frontier delete client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0) - project = client.get_project(projectid=self.projectid) - result = project.frontier.delete_slot('frontier_non_existing', 'slot_non_existing') + + # Check frontier delete + project.frontier.delete_slot('frontier_non_existing', 'slot_non_existing') # Check metadata delete job = client.push_job(self.projectid, self.spidername) @@ -32,6 +32,11 @@ def test_delete_on_hubstorage_api_does_not_404(self): del job.metadata['foo'] job.metadata.save() + # Check collections delete + store = project.collections.new_store('foo') + store.set({'_key': 'foo'}) + store.delete('bar') + self.assertTrue(True, "No error have been triggered by calling a delete on resources that do not exist") @responses.activate From fb09bf66a6b8f6f61e15cbcc3602c000099a1af4 Mon Sep 17 00:00:00 2001 From: lsenta Date: Mon, 16 Nov 2015 14:54:56 +0100 Subject: [PATCH 14/26] Add retry for collections store and delete Signed-off-by: lsenta --- hubstorage/collectionsrt.py | 4 ++-- tests/test_retry.py | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/hubstorage/collectionsrt.py b/hubstorage/collectionsrt.py index 9071543..f842947 100644 --- a/hubstorage/collectionsrt.py +++ b/hubstorage/collectionsrt.py @@ -21,7 +21,7 @@ def get(self, _type, _name, _key=None, **params): def set(self, _type, _name, _values): try: - return self.apipost((_type, _name), jl=_values) + return self.apipost((_type, _name), is_idempotent=True, jl=_values) except HTTPError as exc: if exc.response.status_code in (400, 413): raise ValueError(exc.response.text) @@ -29,7 +29,7 @@ def set(self, _type, _name, _values): raise def delete(self, _type, _name, _keys): - return self.apipost((_type, _name, 'deleted'), jl=_keys) + return self.apipost((_type, _name, 'deleted'), is_idempotent=True, jl=_keys) def iter_json(self, _type, _name, requests_params=None, **apiparams): return DownloadableResource.iter_json(self, (_type, _name), diff --git a/tests/test_retry.py b/tests/test_retry.py index 7b79527..9e7fc48 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -67,6 +67,27 @@ def request_callback(request): self.assertEqual(dict(job_metadata), dict(job.metadata)) self.assertEqual(attempts_count[0], 4) + @responses.activate + def test_collection_store_and_delete_are_retried(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + + callback_post, attempts_count_post = self.make_request_callback(2, []) + callback_delete, attempts_count_delete = self.make_request_callback(2, []) + + self.mock_api(method=POST, callback=callback_delete, url_match='/.*/deleted') + self.mock_api(method=POST, callback=callback_post) # /!\ default regexp matches all paths, has to be added last + + # Act + project = client.get_project(self.projectid) + store = project.collections.new_store('foo') + store.set({'_key': 'bar', 'content': 'value'}) + store.delete('baz') + + # Assert + self.assertEqual(attempts_count_post[0], 3) + self.assertEqual(attempts_count_delete[0], 3) + @responses.activate def test_delete_requests_are_retried(self): # Prepare From 7c5c9384de11ebe74e09808133fa701fd9f02b2d Mon Sep 17 00:00:00 2001 From: lsenta Date: Tue, 17 Nov 2015 11:11:21 +0100 Subject: [PATCH 15/26] Set is_idempotent as a default value in apidelete instead of forcing it Allows calls to apidelete to override the default is_idempotent behavior. Signed-off-by: lsenta --- hubstorage/resourcetype.py | 3 ++- tests/test_retry.py | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index a5c7e4b..787cd8c 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -44,7 +44,8 @@ def apiget(self, _path=None, **kwargs): return self.apirequest(_path, method='GET', **kwargs) def apidelete(self, _path=None, **kwargs): - return self.apirequest(_path, method='DELETE', is_idempotent=True, **kwargs) + kwargs.setdefault('is_idempotent', True) + return self.apirequest(_path, method='DELETE', **kwargs) class DownloadableResource(ResourceType): diff --git a/tests/test_retry.py b/tests/test_retry.py index 9e7fc48..aa107ab 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -67,6 +67,28 @@ def request_callback(request): self.assertEqual(dict(job_metadata), dict(job.metadata)) self.assertEqual(attempts_count[0], 4) + @responses.activate + def test_api_delete_can_be_set_to_non_idempotent(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback_delete, attempts_count_delete = self.make_request_callback(2, job_metadata) + + self.mock_api(method=DELETE, callback=callback_delete) + + # Act + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + + err = None + try: + job.metadata.apidelete('/my/non/idempotent/delete/', is_idempotent=False) + except HTTPError as e: + err = e + + # Assert + self.assertEqual(attempts_count_delete[0], 1) + self.assertIsNotNone(err) + @responses.activate def test_collection_store_and_delete_are_retried(self): # Prepare From 3d591b1c31efef09584c5d2c5dc37244c8388f0b Mon Sep 17 00:00:00 2001 From: lsenta Date: Tue, 17 Nov 2015 11:15:18 +0100 Subject: [PATCH 16/26] Fix incorrect/useless checks in test_retry Signed-off-by: lsenta --- tests/test_retry.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/test_retry.py b/tests/test_retry.py index aa107ab..c55109e 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -123,15 +123,12 @@ def test_delete_requests_are_retried(self): self.mock_api(method=DELETE, callback=callback_delete) # Act - err = None - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) job.metadata['foo'] = 'bar' del job.metadata['foo'] job.metadata.save() # Assert - self.assertIsNone(err, None) self.assertEqual(attempts_count_delete[0], 3) @responses.activate @@ -146,14 +143,11 @@ def test_metadata_save_does_retry(self): self.mock_api(method=POST, callback=callback_post) # Act - err = None - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) job.metadata['foo'] = 'bar' job.metadata.save() # Assert - self.assertIsNone(err, None) self.assertEqual(attempts_count_post[0], 3) @responses.activate @@ -172,7 +166,8 @@ def test_push_job_does_not_retry(self): err = e # Assert - self.assertIsNone(job, None) + self.assertIsNone(job) + self.assertIsNotNone(err) self.assertEqual(err.response.status_code, 504) self.assertEqual(attempts_count[0], 1) @@ -210,7 +205,8 @@ def test_get_job_does_fails_if_no_retries(self): err = e # Assert - self.assertIsNone(metadata, None) + self.assertIsNone(metadata) + self.assertIsNotNone(err) self.assertEqual(err.response.status_code, 504) self.assertEqual(attempts_count[0], 1) @@ -232,7 +228,8 @@ def test_get_job_does_fails_on_too_many_retries(self): err = e # Assert - self.assertIsNone(metadata, None) + self.assertIsNone(metadata) + self.assertIsNotNone(err) self.assertEqual(err.response.status_code, 504) self.assertEqual(attempts_count[0], 3) From 25f9767d58238aacbaaa378a0d5337fb257df970 Mon Sep 17 00:00:00 2001 From: lsenta Date: Tue, 17 Nov 2015 14:09:49 +0100 Subject: [PATCH 17/26] Turn retrier creation into a method with exponential backoff and jitter Signed-off-by: lsenta --- hubstorage/client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index c4c35a5..ae2a9b2 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -39,7 +39,10 @@ class HubstorageClient(object): DEFAULT_ENDPOINT = 'http://storage.scrapinghub.com/' USERAGENT = 'python-hubstorage/{0}'.format(__version__) + DEFAULT_TIMEOUT = 60.0 + RETRY_EXPONENTIAL_BACKOFF_MS = 500 + RETRY_JITTER_MS = 500 def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries=3): @@ -47,7 +50,7 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None, self.endpoint = endpoint or self.DEFAULT_ENDPOINT self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT self.session = self._create_session() - self.retrier = Retrying(stop_max_attempt_number=max_retries + 1, retry_on_exception=_hc_retry_on_exception) + self.retrier = self._create_retrier(max_retries, self.RETRY_EXPONENTIAL_BACKOFF_MS, self.RETRY_JITTER_MS) self.jobq = JobQ(self, None) self.projects = Projects(self, None) self.root = ResourceType(self, None) @@ -82,6 +85,12 @@ def request_nonidempotent(self, **kwargs): r.raise_for_status() return r + def _create_retrier(self, max_retries, exponential_backoff, jitter): + return Retrying(stop_max_attempt_number=max_retries + 1, + retry_on_exception=_hc_retry_on_exception, + wait_exponential_multiplier=exponential_backoff, + wait_jitter_max=jitter) + def _create_session(self): s = session() s.headers.update({'User-Agent': self.USERAGENT}) From fa8dce29081b10c49945ec53a7590377a3be212f Mon Sep 17 00:00:00 2001 From: lsenta Date: Tue, 17 Nov 2015 18:51:22 +0100 Subject: [PATCH 18/26] Turn retrying requirement into a >= range Signed-off-by: lsenta --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 2fd91e1..0e8deb5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ requests>=1.0 -retrying==1.3.3 \ No newline at end of file +retrying>=1.3.3 \ No newline at end of file From cd6b318dbee681e9e304516749961cb8010034e4 Mon Sep 17 00:00:00 2001 From: lsenta Date: Tue, 17 Nov 2015 19:08:55 +0100 Subject: [PATCH 19/26] Test that retry does not catch unhandled exceptions Signed-off-by: lsenta --- tests/test_retry.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/tests/test_retry.py b/tests/test_retry.py index c55109e..424dcf4 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -39,6 +39,29 @@ def test_delete_on_hubstorage_api_does_not_404(self): self.assertTrue(True, "No error have been triggered by calling a delete on resources that do not exist") + @responses.activate + def test_retrier_does_not_catch_unwanted_exception(self): + # Prepare + client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=2) + job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} + callback, attempts_count = self.make_request_callback(3, job_metadata, http_error_status=403) + + self.mock_api(callback=callback) + + # Act + job, metadata, err = None, None, None + try: + job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) + metadata = dict(job.metadata) + except HTTPError as e: + err = e + + # Assert + self.assertIsNone(metadata) + self.assertIsNotNone(err) + self.assertEqual(err.response.status_code, 403) + self.assertEqual(attempts_count[0], 1) + @responses.activate def test_retrier_catches_badstatusline_and_429(self): # Prepare @@ -248,7 +271,7 @@ def mock_api(self, method=GET, callback=None, url_match='/.*'): content_type='application/json', ) - def make_request_callback(self, timeout_count, body_on_success): + def make_request_callback(self, timeout_count, body_on_success, http_error_status=504): """ Make a request callback that timeout a couple of time before returning body_on_success @@ -262,7 +285,7 @@ def request_callback(request): attempts[0] += 1 if attempts[0] <= timeout_count: - return (504, {}, "Timeout") + return (http_error_status, {}, "Timeout") else: resp_body = dict(body_on_success) return (200, {}, json.dumps(resp_body)) From c484269846aaa675f29274d5969ebde330356da6 Mon Sep 17 00:00:00 2001 From: lsenta Date: Tue, 17 Nov 2015 19:58:27 +0100 Subject: [PATCH 20/26] Add note on python3 compatibility for retrier Signed-off-by: lsenta --- hubstorage/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hubstorage/client.py b/hubstorage/client.py index ae2a9b2..0808b1e 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -28,6 +28,7 @@ def _hc_retry_on_exception(err): logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,)) return True + # TODO: python3 compatibility: BadStatusLine error are wrapped differently if (isinstance(err, ConnectionError) and err.args[0] == 'Connection aborted.' and isinstance(err.args[1], BadStatusLine) and err.args[1][0] == repr('')): logger.warning("Protocol failed with BadStatusLine, retrying (maybe)") From dd80468f8b91cdb4360486397b22f939c531ed34 Mon Sep 17 00:00:00 2001 From: lsenta Date: Wed, 18 Nov 2015 09:42:48 +0100 Subject: [PATCH 21/26] Add retry on timeout exceptions Fixes #14 examples (504 and timeouts) Signed-off-by: lsenta --- hubstorage/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 0808b1e..afb17b2 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -4,7 +4,7 @@ from httplib import BadStatusLine import logging import pkgutil -from requests import session, adapters, HTTPError, ConnectionError +from requests import session, adapters, HTTPError, ConnectionError, Timeout from retrying import Retrying from .utils import xauth, urlpathjoin from .project import Project @@ -34,6 +34,10 @@ def _hc_retry_on_exception(err): logger.warning("Protocol failed with BadStatusLine, retrying (maybe)") return True + if isinstance(err, Timeout): + logger.warning("Server connection timeout, retrying (maybe)") + return True + return False class HubstorageClient(object): From cb3b0a45c49a000bd951e7a2fdfd6257a60dd5d5 Mon Sep 17 00:00:00 2001 From: lsenta Date: Wed, 18 Nov 2015 10:53:03 +0100 Subject: [PATCH 22/26] Move idempotence dispatch to client and GET default to apiget Change where the dispatch occurs so that _iter_lines simply forward the request configuration to the client. The client decides whether to apply the retry policy depending on request args. Move the default GET behavior to apiget so that iter_json method is not altered by this change. (it already implements a retry that may be incompatible). Signed-off-by: lsenta --- hubstorage/client.py | 25 ++++++++----------------- hubstorage/resourcetype.py | 8 ++------ 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index afb17b2..2000c82 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -61,13 +61,14 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None, self.root = ResourceType(self, None) self._batchuploader = None - def request_idempotent(self, **kwargs): + def request(self, is_idempotent=False, **kwargs): """ Execute an HTTP request with the current client session. - Use the retry policy configured in the client. + Use the retry policy configured in the client when is_idempotent is True """ - def invoke_req(): + + def invoke_request(): r = self.session.request(**kwargs) if not r.ok: @@ -75,20 +76,10 @@ def invoke_req(): r.raise_for_status() return r - return self.retrier.call(invoke_req) - - def request_nonidempotent(self, **kwargs): - """ - Execute an HTTP request with the current client session - - Do not use the retry policy to avoid side-effects. - """ - r = self.session.request(**kwargs) - - if not r.ok: - logger.debug('%s: %s', r, r.content) - r.raise_for_status() - return r + if is_idempotent: + return self.retrier.call(invoke_request) + else: + return invoke_request() def _create_retrier(self, max_retries, exponential_backoff, jitter): return Retrying(stop_max_attempt_number=max_retries + 1, diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index 787cd8c..3df8bcf 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -19,18 +19,13 @@ def __init__(self, client, key, auth=None): self.url = urlpathjoin(client.endpoint, self.key) def _iter_lines(self, _path, **kwargs): - is_idempotent = kwargs.pop('is_idempotent', False) or kwargs['method'] == 'GET' - kwargs['url'] = urlpathjoin(self.url, _path) kwargs.setdefault('auth', self.auth) kwargs.setdefault('timeout', self.client.connection_timeout) if 'jl' in kwargs: kwargs['data'] = jlencode(kwargs.pop('jl')) - if is_idempotent: - r = self.client.request_idempotent(**kwargs) - else: - r = self.client.request_nonidempotent(**kwargs) + r = self.client.request(**kwargs) return r.iter_lines() @@ -41,6 +36,7 @@ def apipost(self, _path=None, **kwargs): return self.apirequest(_path, method='POST', **kwargs) def apiget(self, _path=None, **kwargs): + kwargs.setdefault('is_idempotent', True) return self.apirequest(_path, method='GET', **kwargs) def apidelete(self, _path=None, **kwargs): From d3701789f627e9c9e80223ebcca3c0078023af11 Mon Sep 17 00:00:00 2001 From: lsenta Date: Wed, 18 Nov 2015 10:56:27 +0100 Subject: [PATCH 23/26] Add 408 to retried http errors Signed-off-by: lsenta --- hubstorage/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 2000c82..58497d9 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -19,12 +19,12 @@ logger = logging.getLogger('HubstorageClient') -_ERROR_CODES_TO_RETRY = (429, 503, 504) +_HTTP_ERROR_CODES_TO_RETRY = (408, 429, 503, 504) def _hc_retry_on_exception(err): """Callback used by the client to restrict the retry to acceptable errors""" - if (isinstance(err, HTTPError) and err.response.status_code in _ERROR_CODES_TO_RETRY): + if (isinstance(err, HTTPError) and err.response.status_code in _HTTP_ERROR_CODES_TO_RETRY): logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,)) return True From ea9d1652d231c1a65b99928041729dec24ac2557 Mon Sep 17 00:00:00 2001 From: lsenta Date: Wed, 18 Nov 2015 16:33:11 +0100 Subject: [PATCH 24/26] Add max_retry_time to the client options A client may define a max_retries and max_retry_time. Setting only max_retry=N means that the client will retry N times, no matter the time it takes. Signed-off-by: lsenta --- hubstorage/client.py | 62 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 58497d9..19790d1 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -45,17 +45,36 @@ class HubstorageClient(object): DEFAULT_ENDPOINT = 'http://storage.scrapinghub.com/' USERAGENT = 'python-hubstorage/{0}'.format(__version__) - DEFAULT_TIMEOUT = 60.0 - RETRY_EXPONENTIAL_BACKOFF_MS = 500 - RETRY_JITTER_MS = 500 + DEFAULT_CONNECTION_TIMEOUT_S = 60.0 + RETRY_DEFAUT_MAX_RETRY_TIME_S = 60.0 - def __init__(self, auth=None, endpoint=None, connection_timeout=None, - max_retries=3): + RETRY_DEFAULT_MAX_RETRIES = 3 + RETRY_DEFAULT_JITTER_MS = 500 + RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS = 500 + + def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries=None, max_retry_time=None): + """ + Note: + max_retries and max_retry_time change how the client attempt to retry failing requests that are + idempotent (safe to execute multiple time). + + HubstorageClient(max_retries=3) will retry requests 3 times, no matter the time it takes. + Use max_retry_time if you want to bound the time spent in retrying. + + By default, requests are retried at most 3 times, during 60 seconds. + + Args: + auth (str): The client authentication token + endpoint (str): The API root address + connection_timeout (int): The connection timeout for a _single request_ + max_retries (int): The number of time idempotent requests may be retried + max_retry_time (int): The time, in seconds, during which the client can retry a request + """ self.auth = xauth(auth) self.endpoint = endpoint or self.DEFAULT_ENDPOINT - self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT + self.connection_timeout = connection_timeout or self.DEFAULT_CONNECTION_TIMEOUT_S self.session = self._create_session() - self.retrier = self._create_retrier(max_retries, self.RETRY_EXPONENTIAL_BACKOFF_MS, self.RETRY_JITTER_MS) + self.retrier = self._create_retrier(max_retries, max_retry_time) self.jobq = JobQ(self, None) self.projects = Projects(self, None) self.root = ResourceType(self, None) @@ -81,11 +100,32 @@ def invoke_request(): else: return invoke_request() - def _create_retrier(self, max_retries, exponential_backoff, jitter): - return Retrying(stop_max_attempt_number=max_retries + 1, + def _create_retrier(self, max_retries, max_retry_time): + """ + Create the Retrier object used to process idempotent client requests. + + If only max_retries is set, the default max_retry_time is ignored. + + Args: + max_retries (int): the number of retries to be attempted + max_retry_time (int): the number of time, in seconds, to retry for. + Returns: + A Retrying instance, that implements a call(func) method. + """ + + # Client sets max_retries only + if max_retries is not None and max_retry_time is None: + stop_max_delay = None + stop_max_attempt_number = max_retries + 1 + else: + stop_max_delay = (max_retry_time or self.RETRY_DEFAUT_MAX_RETRY_TIME_S) * 1000.0 + stop_max_attempt_number = (max_retries or self.RETRY_DEFAULT_MAX_RETRIES) + 1 + + return Retrying(stop_max_attempt_number=stop_max_attempt_number, + stop_max_delay=stop_max_delay, retry_on_exception=_hc_retry_on_exception, - wait_exponential_multiplier=exponential_backoff, - wait_jitter_max=jitter) + wait_exponential_multiplier=self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS, + wait_jitter_max=self.RETRY_DEFAULT_JITTER_MS) def _create_session(self): s = session() From 7180fd87c90d661e21da4b390517e47ce670adb0 Mon Sep 17 00:00:00 2001 From: lsenta Date: Wed, 18 Nov 2015 16:39:46 +0100 Subject: [PATCH 25/26] Move set request timeout from resource to client Signed-off-by: lsenta --- hubstorage/client.py | 1 + hubstorage/resourcetype.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index 19790d1..c2e60ef 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -86,6 +86,7 @@ def request(self, is_idempotent=False, **kwargs): Use the retry policy configured in the client when is_idempotent is True """ + kwargs.setdefault('timeout', self.connection_timeout) def invoke_request(): r = self.session.request(**kwargs) diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index 3df8bcf..3156946 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -21,7 +21,6 @@ def __init__(self, client, key, auth=None): def _iter_lines(self, _path, **kwargs): kwargs['url'] = urlpathjoin(self.url, _path) kwargs.setdefault('auth', self.auth) - kwargs.setdefault('timeout', self.client.connection_timeout) if 'jl' in kwargs: kwargs['data'] = jlencode(kwargs.pop('jl')) From 33768c88c2869a35db0a33a5d44e62ebbb6439c7 Mon Sep 17 00:00:00 2001 From: lsenta Date: Thu, 19 Nov 2015 15:51:19 +0100 Subject: [PATCH 26/26] Add automatic computation for exponential backoff in retrier Signed-off-by: lsenta --- hubstorage/client.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hubstorage/client.py b/hubstorage/client.py index c2e60ef..157afc3 100644 --- a/hubstorage/client.py +++ b/hubstorage/client.py @@ -118,14 +118,20 @@ def _create_retrier(self, max_retries, max_retry_time): if max_retries is not None and max_retry_time is None: stop_max_delay = None stop_max_attempt_number = max_retries + 1 + wait_exponential_multiplier = self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS else: stop_max_delay = (max_retry_time or self.RETRY_DEFAUT_MAX_RETRY_TIME_S) * 1000.0 stop_max_attempt_number = (max_retries or self.RETRY_DEFAULT_MAX_RETRIES) + 1 + # Compute the backoff to allow for max_retries queries during the allowed delay + # Solves the following formula (assumes requests are immediate): + # max_retry_time = sum(exp_multiplier * 2 ** i) for i from 1 to max_retries + 1 + wait_exponential_multiplier = stop_max_delay / ((2 ** (stop_max_attempt_number + 1)) - 2) + return Retrying(stop_max_attempt_number=stop_max_attempt_number, stop_max_delay=stop_max_delay, retry_on_exception=_hc_retry_on_exception, - wait_exponential_multiplier=self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS, + wait_exponential_multiplier=wait_exponential_multiplier, wait_jitter_max=self.RETRY_DEFAULT_JITTER_MS) def _create_session(self):