diff --git a/README.rst b/README.rst
index 39d6d30..035cd04 100644
--- a/README.rst
+++ b/README.rst
@@ -52,3 +52,10 @@ Example getting job data later::
 
     [{'title': 'my first item'}]
     ...
+
+
+Testing
+-------
+
+Running the tests requires the hubstorage backend to be running,
+and the Python `responses` library (see `tox.ini`).
\ No newline at end of file
diff --git a/hubstorage/client.py b/hubstorage/client.py
index 41f7c97..157afc3 100644
--- a/hubstorage/client.py
+++ b/hubstorage/client.py
@@ -1,8 +1,11 @@
 """
 High level Hubstorage client
 """
+from httplib import BadStatusLine
+import logging
 import pkgutil
-from requests import session, adapters
+from requests import session, adapters, HTTPError, ConnectionError, Timeout
+from retrying import Retrying
 from .utils import xauth, urlpathjoin
 from .project import Project
 from .job import Job
@@ -14,29 +17,125 @@
 
 __version__ = pkgutil.get_data('hubstorage', 'VERSION').strip()
 
+logger = logging.getLogger('HubstorageClient')
+
+_HTTP_ERROR_CODES_TO_RETRY = (408, 429, 503, 504)
+
+
+def _hc_retry_on_exception(err):
+    """Callback used by the client to restrict retries to acceptable errors"""
+    if (isinstance(err, HTTPError) and err.response.status_code in _HTTP_ERROR_CODES_TO_RETRY):
+        logger.warning("Server failed with %d status code, retrying (maybe)" % (err.response.status_code,))
+        return True
+
+    # TODO: python3 compatibility: BadStatusLine errors are wrapped differently
+    if (isinstance(err, ConnectionError) and err.args[0] == 'Connection aborted.' and
+            isinstance(err.args[1], BadStatusLine) and err.args[1][0] == repr('')):
+        logger.warning("Protocol failed with BadStatusLine, retrying (maybe)")
+        return True
+
+    if isinstance(err, Timeout):
+        logger.warning("Server connection timeout, retrying (maybe)")
+        return True
+
+    return False
+
 class HubstorageClient(object):
 
     DEFAULT_ENDPOINT = 'http://storage.scrapinghub.com/'
     USERAGENT = 'python-hubstorage/{0}'.format(__version__)
-    DEFAULT_TIMEOUT = 60.0
 
-    def __init__(self, auth=None, endpoint=None, connection_timeout=None,
-                 max_retries=0):
+    DEFAULT_CONNECTION_TIMEOUT_S = 60.0
+    RETRY_DEFAUT_MAX_RETRY_TIME_S = 60.0
+
+    RETRY_DEFAULT_MAX_RETRIES = 3
+    RETRY_DEFAULT_JITTER_MS = 500
+    RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS = 500
+
+    def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries=None, max_retry_time=None):
+        """
+        Note:
+            max_retries and max_retry_time change how the client attempts to retry failing requests that are
+            idempotent (safe to execute multiple times).
+
+            HubstorageClient(max_retries=3) will retry requests 3 times, no matter how long that takes.
+            Use max_retry_time if you want to bound the time spent retrying.
+
+            By default, requests are retried at most 3 times, within a window of 60 seconds.
+
+        Args:
+            auth (str): The client authentication token
+            endpoint (str): The API root address
+            connection_timeout (int): The connection timeout for a _single request_
+            max_retries (int): The number of times idempotent requests may be retried
+            max_retry_time (int): The time, in seconds, during which the client can retry a request
+        """
         self.auth = xauth(auth)
         self.endpoint = endpoint or self.DEFAULT_ENDPOINT
-        self.connection_timeout = connection_timeout or self.DEFAULT_TIMEOUT
-        self.session = self._create_session(max_retries)
+        self.connection_timeout = connection_timeout or self.DEFAULT_CONNECTION_TIMEOUT_S
+        self.session = self._create_session()
+        self.retrier = self._create_retrier(max_retries, max_retry_time)
         self.jobq = JobQ(self, None)
         self.projects = Projects(self, None)
         self.root = ResourceType(self, None)
         self._batchuploader = None
 
-    def _create_session(self, max_retries):
+    def request(self, is_idempotent=False, **kwargs):
+        """
+        Execute an HTTP request with the current client session.
+
+        Use the retry policy configured in the client when is_idempotent is True.
+        """
+        kwargs.setdefault('timeout', self.connection_timeout)
+
+        def invoke_request():
+            r = self.session.request(**kwargs)
+
+            if not r.ok:
+                logger.debug('%s: %s', r, r.content)
+                r.raise_for_status()
+            return r
+
+        if is_idempotent:
+            return self.retrier.call(invoke_request)
+        else:
+            return invoke_request()
+
+    def _create_retrier(self, max_retries, max_retry_time):
+        """
+        Create the retrier object used to process idempotent client requests.
+
+        If only max_retries is set, the default max_retry_time is ignored.
+
+        Args:
+            max_retries (int): the number of retries to be attempted
+            max_retry_time (int): the time, in seconds, during which to keep retrying
+        Returns:
+            A Retrying instance that implements a call(func) method.
+        """
+
+        # Client sets max_retries only
+        if max_retries is not None and max_retry_time is None:
+            stop_max_delay = None
+            stop_max_attempt_number = max_retries + 1
+            wait_exponential_multiplier = self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS
+        else:
+            stop_max_delay = (max_retry_time or self.RETRY_DEFAUT_MAX_RETRY_TIME_S) * 1000.0
+            stop_max_attempt_number = (max_retries or self.RETRY_DEFAULT_MAX_RETRIES) + 1
+
+            # Compute the backoff to allow for max_retries requests within the allowed delay
+            # Solves the following formula (assumes requests are immediate):
+            #   max_retry_time = sum(exp_multiplier * 2 ** i) for i from 1 to max_retries + 1
+            wait_exponential_multiplier = stop_max_delay / ((2 ** (stop_max_attempt_number + 1)) - 2)
+
+        return Retrying(stop_max_attempt_number=stop_max_attempt_number,
+                        stop_max_delay=stop_max_delay,
+                        retry_on_exception=_hc_retry_on_exception,
+                        wait_exponential_multiplier=wait_exponential_multiplier,
+                        wait_jitter_max=self.RETRY_DEFAULT_JITTER_MS)
+
+    def _create_session(self):
         s = session()
-        if max_retries > 0:
-            a = adapters.HTTPAdapter(max_retries=max_retries)
-            s.mount('http://', a)
-            s.mount('https://', a)
         s.headers.update({'User-Agent': self.USERAGENT})
         return s
 
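With the defaults above (3 retries within a 60-second window), the exponential backoff multiplier works out to 60000 / (2**5 - 2) = 2000 ms. For reference, a minimal usage sketch of the new retry options; it is not part of the patch, and the auth token and job key below are placeholders::

    from hubstorage import HubstorageClient

    # Retry idempotent requests up to 5 times, but stop retrying after 30 seconds overall.
    client = HubstorageClient(auth='APIKEY', max_retries=5, max_retry_time=30)

    # GET requests go through the retry policy by default (apiget sets is_idempotent=True),
    # so transient 408/429/503/504 responses and timeouts are retried with backoff and jitter.
    job = client.get_job('1111111/1/1')
    print(dict(job.metadata))
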
+ """ + + # Client sets max_retries only + if max_retries is not None and max_retry_time is None: + stop_max_delay = None + stop_max_attempt_number = max_retries + 1 + wait_exponential_multiplier = self.RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS + else: + stop_max_delay = (max_retry_time or self.RETRY_DEFAUT_MAX_RETRY_TIME_S) * 1000.0 + stop_max_attempt_number = (max_retries or self.RETRY_DEFAULT_MAX_RETRIES) + 1 + + # Compute the backoff to allow for max_retries queries during the allowed delay + # Solves the following formula (assumes requests are immediate): + # max_retry_time = sum(exp_multiplier * 2 ** i) for i from 1 to max_retries + 1 + wait_exponential_multiplier = stop_max_delay / ((2 ** (stop_max_attempt_number + 1)) - 2) + + return Retrying(stop_max_attempt_number=stop_max_attempt_number, + stop_max_delay=stop_max_delay, + retry_on_exception=_hc_retry_on_exception, + wait_exponential_multiplier=wait_exponential_multiplier, + wait_jitter_max=self.RETRY_DEFAULT_JITTER_MS) + + def _create_session(self): s = session() - if max_retries > 0: - a = adapters.HTTPAdapter(max_retries=max_retries) - s.mount('http://', a) - s.mount('https://', a) s.headers.update({'User-Agent': self.USERAGENT}) return s diff --git a/hubstorage/collectionsrt.py b/hubstorage/collectionsrt.py index 9071543..f842947 100644 --- a/hubstorage/collectionsrt.py +++ b/hubstorage/collectionsrt.py @@ -21,7 +21,7 @@ def get(self, _type, _name, _key=None, **params): def set(self, _type, _name, _values): try: - return self.apipost((_type, _name), jl=_values) + return self.apipost((_type, _name), is_idempotent=True, jl=_values) except HTTPError as exc: if exc.response.status_code in (400, 413): raise ValueError(exc.response.text) @@ -29,7 +29,7 @@ def set(self, _type, _name, _values): raise def delete(self, _type, _name, _keys): - return self.apipost((_type, _name, 'deleted'), jl=_keys) + return self.apipost((_type, _name, 'deleted'), is_idempotent=True, jl=_keys) def iter_json(self, _type, _name, requests_params=None, **apiparams): return DownloadableResource.iter_json(self, (_type, _name), diff --git a/hubstorage/resourcetype.py b/hubstorage/resourcetype.py index f681b60..3156946 100644 --- a/hubstorage/resourcetype.py +++ b/hubstorage/resourcetype.py @@ -21,13 +21,11 @@ def __init__(self, client, key, auth=None): def _iter_lines(self, _path, **kwargs): kwargs['url'] = urlpathjoin(self.url, _path) kwargs.setdefault('auth', self.auth) - kwargs.setdefault('timeout', self.client.connection_timeout) if 'jl' in kwargs: kwargs['data'] = jlencode(kwargs.pop('jl')) - r = self.client.session.request(**kwargs) - if not r.ok: - logger.debug('%s: %s', r, r.content) - r.raise_for_status() + + r = self.client.request(**kwargs) + return r.iter_lines() def apirequest(self, _path=None, **kwargs): @@ -37,9 +35,11 @@ def apipost(self, _path=None, **kwargs): return self.apirequest(_path, method='POST', **kwargs) def apiget(self, _path=None, **kwargs): + kwargs.setdefault('is_idempotent', True) return self.apirequest(_path, method='GET', **kwargs) def apidelete(self, _path=None, **kwargs): + kwargs.setdefault('is_idempotent', True) return self.apirequest(_path, method='DELETE', **kwargs) @@ -186,10 +186,11 @@ def save(self): self._deleted.clear() if self._cached: if not self.ignore_fields: - self.apipost(jl=self._data) + self.apipost(jl=self._data, is_idempotent=True) else: self.apipost(jl=dict((k, v) for k, v in self._data.iteritems() - if k not in self.ignore_fields)) + if k not in self.ignore_fields), + is_idempotent=True) def __getitem__(self, 
diff --git a/requirements.txt b/requirements.txt
index 1ff50f7..0e8deb5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
 requests>=1.0
+retrying>=1.3.3
\ No newline at end of file
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_client.py b/tests/test_client.py
index 9e04ea9..3f6b745 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -2,25 +2,10 @@
 Test Client
 """
 from hstestcase import HSTestCase
-from hubstorage import HubstorageClient
 from hubstorage.utils import millitime, apipoll
-
 
 class ClientTest(HSTestCase):
 
-    def test_connect_retry(self):
-        c = HubstorageClient(auth=self.auth, endpoint=self.endpoint,
-                             max_retries=2)
-        c.push_job(self.projectid, self.spidername)
-        job = c.start_job(projectid=self.projectid)
-        m = job.metadata
-        self.assertEqual(m.get('state'), u'running', dict(m))
-        m.expire()
-        retries = c.session.adapters['http://'].max_retries
-        if not isinstance(retries, int):
-            retries = retries.total
-        self.assertEqual(retries, 2)
-
     def test_push_job(self):
         c = self.hsclient
         c.push_job(self.projectid, self.spidername,
diff --git a/tests/test_retry.py b/tests/test_retry.py
new file mode 100644
index 0000000..424dcf4
--- /dev/null
+++ b/tests/test_retry.py
@@ -0,0 +1,293 @@
+"""
+Test Retry Policy
+"""
+from httplib import BadStatusLine
+from requests import HTTPError, ConnectionError
+from hstestcase import HSTestCase
+from hubstorage import HubstorageClient
+import responses
+import json
+import re
+
+GET = responses.GET
+POST = responses.POST
+DELETE = responses.DELETE
+
+
+class RetryTest(HSTestCase):
+    def test_delete_on_hubstorage_api_does_not_404(self):
+        # NOTE: The current Hubstorage API does not raise 404 errors on deleting resources that do not exist,
+        # thus the retry policy does not have to catch 404 errors when retrying deletes (which simplifies the implementation a lot).
+        # This test checks that this assumption holds.
+
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0)
+        project = client.get_project(projectid=self.projectid)
+
+        # Check frontier delete
+        project.frontier.delete_slot('frontier_non_existing', 'slot_non_existing')
+
+        # Check metadata delete
+        job = client.push_job(self.projectid, self.spidername)
+        job.metadata['foo'] = 'bar'  # Add then delete the key, this will trigger an api delete for item foo
+        del job.metadata['foo']
+        job.metadata.save()
+
+        # Check collections delete
+        store = project.collections.new_store('foo')
+        store.set({'_key': 'foo'})
+        store.delete('bar')
+
+        self.assertTrue(True, "No error has been triggered by calling a delete on resources that do not exist")
+
+    @responses.activate
+    def test_retrier_does_not_catch_unwanted_exception(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=2)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback, attempts_count = self.make_request_callback(3, job_metadata, http_error_status=403)
+
+        self.mock_api(callback=callback)
+
+        # Act
+        job, metadata, err = None, None, None
+        try:
+            job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+            metadata = dict(job.metadata)
+        except HTTPError as e:
+            err = e
+
+        # Assert
+        self.assertIsNone(metadata)
+        self.assertIsNotNone(err)
+        self.assertEqual(err.response.status_code, 403)
+        self.assertEqual(attempts_count[0], 1)
+
+    @responses.activate
+    def test_retrier_catches_badstatusline_and_429(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+
+        attempts_count = [0]  # use a list for nonlocal mutability used in request_callback
+
+        def request_callback(request):
+            attempts_count[0] += 1
+
+            if attempts_count[0] <= 2:
+                raise ConnectionError("Connection aborted.", BadStatusLine("''"))
+            if attempts_count[0] == 3:
+                return (429, {}, {})
+            else:
+                resp_body = dict(job_metadata)
+                return (200, {}, json.dumps(resp_body))
+
+        self.mock_api(callback=request_callback)
+
+        # Act
+        job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+
+        # Assert
+        self.assertEqual(dict(job_metadata), dict(job.metadata))
+        self.assertEqual(attempts_count[0], 4)
+
+    @responses.activate
+    def test_api_delete_can_be_set_to_non_idempotent(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback_delete, attempts_count_delete = self.make_request_callback(2, job_metadata)
+
+        self.mock_api(method=DELETE, callback=callback_delete)
+
+        # Act
+        job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+
+        err = None
+        try:
+            job.metadata.apidelete('/my/non/idempotent/delete/', is_idempotent=False)
+        except HTTPError as e:
+            err = e
+
+        # Assert
+        self.assertEqual(attempts_count_delete[0], 1)
+        self.assertIsNotNone(err)
+
+    @responses.activate
+    def test_collection_store_and_delete_are_retried(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+
+        callback_post, attempts_count_post = self.make_request_callback(2, [])
+        callback_delete, attempts_count_delete = self.make_request_callback(2, [])
+
+        self.mock_api(method=POST, callback=callback_delete, url_match='/.*/deleted')
+        self.mock_api(method=POST, callback=callback_post)  # /!\ the default regexp matches all paths, so it has to be added last
+
+        # Act
+        project = client.get_project(self.projectid)
+        store = project.collections.new_store('foo')
+        store.set({'_key': 'bar', 'content': 'value'})
+        store.delete('baz')
+
+        # Assert
+        self.assertEqual(attempts_count_post[0], 3)
+        self.assertEqual(attempts_count_delete[0], 3)
+
+    @responses.activate
+    def test_delete_requests_are_retried(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback_getpost, attempts_count_getpost = self.make_request_callback(0, job_metadata)
+        callback_delete, attempts_count_delete = self.make_request_callback(2, job_metadata)
+
+        self.mock_api(method=GET, callback=callback_getpost)
+        self.mock_api(method=POST, callback=callback_getpost)
+        self.mock_api(method=DELETE, callback=callback_delete)
+
+        # Act
+        job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+        job.metadata['foo'] = 'bar'
+        del job.metadata['foo']
+        job.metadata.save()
+
+        # Assert
+        self.assertEqual(attempts_count_delete[0], 3)
+
+    @responses.activate
+    def test_metadata_save_does_retry(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback_get, attempts_count_get = self.make_request_callback(0, job_metadata)
+        callback_post, attempts_count_post = self.make_request_callback(2, job_metadata)
+
+        self.mock_api(method=GET, callback=callback_get)
+        self.mock_api(method=POST, callback=callback_post)
+
+        # Act
+        job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+        job.metadata['foo'] = 'bar'
+        job.metadata.save()
+
+        # Assert
+        self.assertEqual(attempts_count_post[0], 3)
+
+    @responses.activate
+    def test_push_job_does_not_retry(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+        callback, attempts_count = self.make_request_callback(2, {'key': '1/2/3'})
+
+        self.mock_api(POST, callback=callback)
+
+        # Act
+        job, err = None, None
+        try:
+            job = client.push_job(self.projectid, self.spidername)
+        except HTTPError as e:
+            err = e
+
+        # Assert
+        self.assertIsNone(job)
+        self.assertIsNotNone(err)
+        self.assertEqual(err.response.status_code, 504)
+        self.assertEqual(attempts_count[0], 1)
+
+    @responses.activate
+    def test_get_job_does_retry(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback, attempts_count = self.make_request_callback(2, job_metadata)
+
+        self.mock_api(callback=callback)
+
+        # Act
+        job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+
+        # Assert
+        self.assertEqual(dict(job_metadata), dict(job.metadata))
+        self.assertEqual(attempts_count[0], 3)
+
+    @responses.activate
+    def test_get_job_does_fails_if_no_retries(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback, attempts_count = self.make_request_callback(2, job_metadata)
+
+        self.mock_api(callback=callback)
+
+        # Act
+        job, metadata, err = None, None, None
+        try:
+            job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+            metadata = dict(job.metadata)
+        except HTTPError as e:
+            err = e
+
+        # Assert
+        self.assertIsNone(metadata)
+        self.assertIsNotNone(err)
+        self.assertEqual(err.response.status_code, 504)
+        self.assertEqual(attempts_count[0], 1)
+
+    @responses.activate
+    def test_get_job_does_fails_on_too_many_retries(self):
+        # Prepare
+        client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=2)
+        job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'}
+        callback, attempts_count = self.make_request_callback(3, job_metadata)
+
+        self.mock_api(callback=callback)
+
+        # Act
+        job, metadata, err = None, None, None
+        try:
+            job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42))
+            metadata = dict(job.metadata)
+        except HTTPError as e:
+            err = e
+
+        # Assert
+        self.assertIsNone(metadata)
+        self.assertIsNotNone(err)
+        self.assertEqual(err.response.status_code, 504)
+        self.assertEqual(attempts_count[0], 3)
+
+    def mock_api(self, method=GET, callback=None, url_match='/.*'):
+        """
+        Mock an API URL using the responses library.
+
+        Args:
+            method (Optional[str]): The HTTP method to mock. Defaults to responses.GET
+            callback (function(request) -> response): The callback invoked for matching requests
+            url_match (Optional[str]): The API URL regexp. Defaults to '/.*'.
+        """
+        responses.add_callback(
+            method, re.compile(self.endpoint + url_match),
+            callback=callback,
+            content_type='application/json',
+        )
+
+    def make_request_callback(self, timeout_count, body_on_success, http_error_status=504):
+        """
+        Make a request callback that times out a couple of times before returning body_on_success.
+
+        Returns:
+            A tuple (request_callback, attempts); attempts is a list of size one that contains the number of times
+            request_callback has been called.
+        """
+        attempts = [0]  # use a list for nonlocal mutability used in request_callback
+
+        def request_callback(request):
+            attempts[0] += 1
+
+            if attempts[0] <= timeout_count:
+                return (http_error_status, {}, "Timeout")
+            else:
+                resp_body = dict(body_on_success)
+                return (200, {}, json.dumps(resp_body))
+
+        return (request_callback, attempts)
diff --git a/tox.ini b/tox.ini
index fc2bf93..0428973 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,10 +4,12 @@
 # and then run "tox" from this directory.
 
 [tox]
-envlist = py27, pypy, py33
+envlist = py27, pypy, py3
 
 [testenv]
 deps =
     -rrequirements.txt
+    responses==0.5.0
     nose
+
 commands = nosetests []