diff --git a/requirements-test.txt b/requirements-test.txt index e89378fc..a1171b1d 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,6 +1,7 @@ -r requirements-pypy.txt mock +vcrpy==1.10.3 pytest pytest-cov -responses==0.5.0 \ No newline at end of file +responses==0.5.0 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..b1685dd4 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- + + +def pytest_addoption(parser): + parser.addoption( + "--update-cassettes", action="store_true", default=False, + help="test with real services rewriting existing vcr cassettes") + parser.addoption( + "--ignore-cassettes", action="store_true", default=False, + help="test with real services skipping existing vcr cassettes") diff --git a/tests/hubstorage/conftest.py b/tests/hubstorage/conftest.py new file mode 100644 index 00000000..63d09604 --- /dev/null +++ b/tests/hubstorage/conftest.py @@ -0,0 +1,185 @@ +import os +import zlib +import base64 +import pickle + +import vcr +import pytest +import shutil +import requests +from requests import HTTPError + +from scrapinghub import HubstorageClient +from scrapinghub.hubstorage.utils import urlpathjoin + + +TEST_PROJECT_ID = "2222222" +TEST_SPIDER_NAME = 'hs-test-spider' +TEST_FRONTIER_NAME = 'test' +TEST_FRONTIER_SLOT = 'site.com' +TEST_BOTGROUP = 'python-hubstorage-test' +TEST_COLLECTION_NAME = "test_collection_123" +TEST_AUTH = os.getenv('HS_AUTH', 'f' * 32) +TEST_ENDPOINT = os.getenv('HS_ENDPOINT', 'http://storage.vm.scrapinghub.com') + +# vcrpy creates the cassetes automatically under VCR_CASSETES_DIR +VCR_CASSETES_DIR = 'tests/hubstorage/cassetes' + + +class VCRGzipSerializer(object): + """Custom ZIP serializer for VCR.py.""" + + def serialize(self, cassette_dict): + # receives a dict, must return a string + # there can be binary data inside some of the requests, + # so it's impossible to use json for serialization to string + compressed = zlib.compress(pickle.dumps(cassette_dict, protocol=2)) + return base64.b64encode(compressed).decode('utf8') + + def deserialize(self, cassette_string): + # receives a string, must return a dict + decoded = base64.b64decode(cassette_string.encode('utf8')) + return pickle.loads(zlib.decompress(decoded)) + + +my_vcr = vcr.VCR(cassette_library_dir=VCR_CASSETES_DIR, record_mode='once') +my_vcr.register_serializer('gz', VCRGzipSerializer()) +my_vcr.serializer = 'gz' + + +def pytest_configure(config): + if config.option.update_cassettes: + # there's vcr `all` mode to update cassettes but it doesn't delete + # or clear existing records, so its size will always only grow + if os.path.exists(VCR_CASSETES_DIR): + shutil.rmtree(VCR_CASSETES_DIR) + elif config.option.ignore_cassettes: + # simple hack to just ignore vcr cassettes: + # - all record_mode means recording new interactions + no replay + # - before_record returning None means skipping all the requests + global my_vcr + my_vcr.record_mode = 'all' + my_vcr.before_record_request = lambda request: None + + +def is_using_real_services(request): + return (request.config.option.update_cassettes or + request.config.option.ignore_cassettes) + + +@pytest.fixture(scope='session') +def hsclient(): + return HubstorageClient(auth=TEST_AUTH, endpoint=TEST_ENDPOINT) + + +@pytest.fixture(scope='session') +def hsproject(hsclient): + return hsclient.get_project(TEST_PROJECT_ID) + + +@my_vcr.use_cassette() +@pytest.fixture(scope='session') +def hsspiderid(hsproject): + return 
str(hsproject.ids.spider(TEST_SPIDER_NAME, create=1)) + + +@pytest.fixture(scope='session') +def hscollection(hsproject, request): + collection = get_test_collection(hsproject) + if is_using_real_services(request): + clean_collection(collection) + yield collection + + +@pytest.fixture(autouse=True, scope='session') +def setup_session(hsclient, hsproject, hscollection, request): + if is_using_real_services(request): + set_testbotgroup(hsproject) + remove_all_jobs(hsproject) + yield + hsclient.close() + + +@pytest.fixture(autouse=True) +def setup_vcrpy(request, hsproject): + # generates names like "test_module/test_function.yaml" + # otherwise it uses current function name (setup_vcrpy) for all tests + # other option is to add vcr decorator to each test separately + cassette_name = '{}/{}.gz'.format( + request.function.__module__.split('.')[-1], + request.function.__name__ + ) + if is_using_real_services(request): + remove_all_jobs(hsproject) + with my_vcr.use_cassette(cassette_name): + yield + + +# ---------------------------------------------------------------------------- + + +def start_job(hsproject, **startparams): + jobdata = hsproject.jobq.start(**startparams) + if jobdata: + jobkey = jobdata.pop('key') + jobauth = (jobkey, jobdata['auth']) + return hsproject.get_job(jobkey, jobauth=jobauth, metadata=jobdata) + + +# Clean environment section + + +def remove_all_jobs(hsproject): + for k in list(hsproject.settings.keys()): + if k != 'botgroups': + del hsproject.settings[k] + hsproject.settings.save() + + # Cleanup JobQ: run 2 times to ensure we covered all jobs + for queuename in ('pending', 'running', 'finished')*2: + info = hsproject.jobq.summary(queuename) + for summary in info['summary']: + _remove_job(hsproject, summary['key']) + + +def _remove_job(hsproject, jobkey): + hsproject.jobq.finish(jobkey) + hsproject.jobq.delete(jobkey) + # delete job + assert jobkey.startswith(TEST_PROJECT_ID), jobkey + hsproject.jobs.apidelete(jobkey.partition('/')[2]) + +# Collection helpers section + + +def get_test_collection(project): + return project.collections.new_store(TEST_COLLECTION_NAME) + + +def clean_collection(collection): + try: + for item in collection.iter_values(): + collection.delete(item['_key']) + except HTTPError as e: + # if collection doesn't exist yet service responds 404 + if e.response.status_code != 404: + raise + + +# Botgroups helpers section + + +def set_testbotgroup(hsproject): + hsproject.settings.apipost(jl={'botgroups': [TEST_BOTGROUP]}) + # Additional step to populate JobQ's botgroups table + url = urlpathjoin(TEST_ENDPOINT, 'botgroups', TEST_BOTGROUP, 'max_running') + requests.post(url, auth=hsproject.auth, data='null') + hsproject.settings.expire() + + +def unset_testbotgroup(hsproject): + hsproject.settings.apidelete('botgroups') + hsproject.settings.expire() + # Additional step to delete botgroups in JobQ + url = urlpathjoin(TEST_ENDPOINT, 'botgroups', TEST_BOTGROUP) + requests.delete(url, auth=hsproject.auth) diff --git a/tests/hubstorage/hstestcase.py b/tests/hubstorage/hstestcase.py deleted file mode 100644 index 8b09c449..00000000 --- a/tests/hubstorage/hstestcase.py +++ /dev/null @@ -1,102 +0,0 @@ -import os -import unittest -import random -import requests - -from scrapinghub import HubstorageClient -from scrapinghub.hubstorage.utils import urlpathjoin - - -class HSTestCase(unittest.TestCase): - - projectid = str(random.randint(2222000, 2223000)) - spidername = 'hs-test-spider' - endpoint = os.getenv('HS_ENDPOINT', 'http://storage.vm.scrapinghub.com') - auth 
= os.getenv('HS_AUTH', 'f' * 32) - frontier = 'test' - slot = 'site.com' - testbotgroups = ['python-hubstorage-test', 'g1'] - - @classmethod - def setUpClass(cls): - cls.hsclient = HubstorageClient(auth=cls.auth, endpoint=cls.endpoint) - cls.project = cls.hsclient.get_project(cls.projectid) - cls.spiderid = str(cls.project.ids.spider(cls.spidername, create=1)) - cls._set_testbotgroup() - - def setUp(self): - self._set_testbotgroup() - self._remove_all_jobs() - - def tearDown(self): - self._remove_all_jobs() - - @classmethod - def tearDownClass(cls): - cls.hsclient.close() - cls._remove_all_jobs() - cls._unset_testbotgroup() - - @classmethod - def _remove_all_jobs(cls): - project = cls.project - for k in list(project.settings.keys()): - if k != 'botgroups': - del project.settings[k] - project.settings.save() - - # Cleanup JobQ - jobq = project.jobq - for queuename in ('pending', 'running', 'finished'): - info = {'summary': [None]} # poor-guy do...while - while info['summary']: - info = jobq.summary(queuename) - for summary in info['summary']: - cls._remove_job(summary['key']) - - @classmethod - def _remove_job(cls, jobkey): - cls.project.jobq.finish(jobkey) - cls.project.jobq.delete(jobkey) - cls._delete_job(jobkey) - - @classmethod - def _delete_job(cls, jobkey): - assert jobkey.startswith(cls.projectid), jobkey - cls.project.jobs.apidelete(jobkey.partition('/')[2]) - - @classmethod - def _set_testbotgroup(cls): - cls.project.settings.apipost(jl={'botgroups': [cls.testbotgroups[0]]}) - # Additional step to populate JobQ's botgroups table - for botgroup in cls.testbotgroups: - url = urlpathjoin(cls.endpoint, 'botgroups', - botgroup, 'max_running') - requests.post(url, auth=cls.project.auth, data='null') - cls.project.settings.expire() - - @classmethod - def _unset_testbotgroup(cls): - cls.project.settings.apidelete('botgroups') - cls.project.settings.expire() - # Additional step to delete botgroups in JobQ - for botgroup in cls.testbotgroups: - url = urlpathjoin(cls.endpoint, 'botgroups', botgroup) - requests.delete(url, auth=cls.project.auth) - - def start_job(self, **startparams): - jobdata = self.project.jobq.start(**startparams) - if jobdata: - jobkey = jobdata.pop('key') - jobauth = (jobkey, jobdata['auth']) - return self.project.get_job(jobkey, jobauth=jobauth, metadata=jobdata) - - -class NopTest(HSTestCase): - - def test_nop(self): - pass # hooray! 
- - -if __name__ == '__main__': - unittest.main() diff --git a/tests/hubstorage/test_activity.py b/tests/hubstorage/test_activity.py index 302cf0af..84284845 100644 --- a/tests/hubstorage/test_activity.py +++ b/tests/hubstorage/test_activity.py @@ -1,34 +1,33 @@ """ Test Activty """ -from .hstestcase import HSTestCase from six.moves import range -class ActivityTest(HSTestCase): +def test_post_and_reverse_get(hsproject): + # make some sample data + orig_data = [{u'foo': 42, u'counter': i} for i in range(20)] + data1 = orig_data[:10] + data2 = orig_data[10:] - def test_post_and_reverse_get(self): - # make some sample data - orig_data = [{u'foo': 42, u'counter': i} for i in range(20)] - data1 = orig_data[:10] - data2 = orig_data[10:] + # put ordered data in 2 separate posts + hsproject.activity.post(data1) + hsproject.activity.post(data2) - # put ordered data in 2 separate posts - self.project.activity.post(data1) - self.project.activity.post(data2) + # read them back in reverse chronological order + result = list(hsproject.activity.list(count=20)) + assert len(result) == 20 + assert orig_data[::-1] == result - # read them back in reverse chronological order - result = list(self.project.activity.list(count=20)) - self.assertEqual(len(result), 20) - self.assertEqual(orig_data[::-1], result) - def test_filters(self): - self.project.activity.post({'c': i} for i in range(10)) - r = list(self.project.activity.list(filter='["c", ">", [5]]', count=2)) - self.assertEqual(r, [{'c': 9}, {'c': 8}]) +def test_filters(hsproject): + hsproject.activity.post({'c': i} for i in range(10)) + r = list(hsproject.activity.list(filter='["c", ">", [5]]', count=2)) + assert r == [{'c': 9}, {'c': 8}] - def test_timestamp(self): - self.project.activity.add({'foo': 'bar'}, baz='qux') - entry = next(self.project.activity.list(count=1, meta='_ts')) - self.assertTrue(entry.pop('_ts', None)) - self.assertEqual(entry, {'foo': 'bar', 'baz': 'qux'}) + +def test_timestamp(hsproject): + hsproject.activity.add({'foo': 'bar'}, baz='qux') + entry = next(hsproject.activity.list(count=1, meta='_ts')) + assert entry.pop('_ts', None) + assert entry == {'foo': 'bar', 'baz': 'qux'} diff --git a/tests/hubstorage/test_batchuploader.py b/tests/hubstorage/test_batchuploader.py index b42e6732..3210936d 100644 --- a/tests/hubstorage/test_batchuploader.py +++ b/tests/hubstorage/test_batchuploader.py @@ -2,73 +2,82 @@ Test Project """ import time +import pytest from six.moves import range from collections import defaultdict -from .hstestcase import HSTestCase from scrapinghub.hubstorage import ValueTooLarge +from .conftest import TEST_SPIDER_NAME, TEST_AUTH +from .conftest import start_job -class BatchUploaderTest(HSTestCase): +def _job_and_writer(hsclient, hsproject, **writerargs): + hsproject.push_job(TEST_SPIDER_NAME) + job = start_job(hsproject) + batch_uploader = hsclient.batchuploader + writer = batch_uploader.create_writer( + job.items.url, auth=TEST_AUTH, **writerargs) + return job, writer - def _job_and_writer(self, **writerargs): - self.project.push_job(self.spidername) - job = self.start_job() - bu = self.hsclient.batchuploader - w = bu.create_writer(job.items.url, auth=self.auth, **writerargs) - return job, w - def test_writer_batchsize(self): - job, w = self._job_and_writer(size=10) - for x in range(111): - w.write({'x': x}) - w.close() - # this works only for small batches (previous size=10 and small data) - # as internally HS may commit a single large request as many smaller - # commits, each with different timestamps - groups = 
defaultdict(int) - for doc in job.items.list(meta=['_ts']): - groups[doc['_ts']] += 1 - - self.assertEqual(len(groups), 12) - - def test_writer_maxitemsize(self): - job, w = self._job_and_writer() - m = w.maxitemsize - self.assertRaisesRegexp( - ValueTooLarge, - 'Value exceeds max encoded size of 1048576 bytes:' - ' \'{"b": "x+\\.\\.\\.\'', - w.write, {'b': 'x' * m}) - self.assertRaisesRegexp( - ValueTooLarge, - 'Value exceeds max encoded size of 1048576 bytes:' - ' \'{"b+\\.\\.\\.\'', - w.write, {'b'*m: 'x'}) - self.assertRaisesRegexp( - ValueTooLarge, - 'Value exceeds max encoded size of 1048576 bytes:' - ' \'{"b+\\.\\.\\.\'', - w.write, {'b'*(m//2): 'x'*(m//2)}) - - def test_writer_contentencoding(self): - for ce in ('identity', 'gzip'): - job, w = self._job_and_writer(content_encoding=ce) - for x in range(111): - w.write({'x': x}) - w.close() - self.assertEqual(job.items.stats()['totals']['input_values'], 111) - - def test_writer_interval(self): - job, w = self._job_and_writer(size=1000, interval=1) +def test_writer_batchsize(hsclient, hsproject): + job, writer = _job_and_writer(hsclient, hsproject, size=10) + for x in range(111): + writer.write({'x': x}) + writer.close() + # this works only for small batches (previous size=10 and small data) + # as internally HS may commit a single large request as many smaller + # commits, each with different timestamps + groups = defaultdict(int) + for doc in job.items.list(meta=['_ts']): + groups[doc['_ts']] += 1 + + assert len(groups) == 12 + + +def test_writer_maxitemsize(hsclient, hsproject): + _, writer = _job_and_writer(hsclient, hsproject) + max_size = writer.maxitemsize + with pytest.raises(ValueTooLarge) as excinfo1: + writer.write({'b': 'x' * max_size}) + excinfo1.match( + r'Value exceeds max encoded size of 1048576 bytes:' + ' \'{"b": "x+\\.\\.\\.\'') + + with pytest.raises(ValueTooLarge) as excinfo2: + writer.write({'b'*max_size: 'x'}) + excinfo2.match( + r'Value exceeds max encoded size of 1048576 bytes:' + ' \'{"b+\\.\\.\\.\'') + + with pytest.raises(ValueTooLarge) as excinfo3: + writer.write({'b'*(max_size//2): 'x'*(max_size//2)}) + excinfo3.match( + r'Value exceeds max encoded size of 1048576 bytes:' + ' \'{"b+\\.\\.\\.\'') + + +def test_writer_contentencoding(hsclient, hsproject): + for ce in ('identity', 'gzip'): + job, writer = _job_and_writer(hsclient, hsproject, + content_encoding=ce) for x in range(111): - w.write({'x': x}) - if x == 50: - time.sleep(2) + writer.write({'x': x}) + writer.close() + assert job.items.stats()['totals']['input_values'] == 111 + + +def test_writer_interval(hsclient, hsproject): + job, writer = _job_and_writer(hsclient, hsproject, + size=1000, interval=1) + for x in range(111): + writer.write({'x': x}) + if x == 50: + time.sleep(2) - w.close() - groups = defaultdict(int) - for doc in job.items.list(meta=['_ts']): - groups[doc['_ts']] += 1 + writer.close() + groups = defaultdict(int) + for doc in job.items.list(meta=['_ts']): + groups[doc['_ts']] += 1 - self.assertEqual(len(groups), 2) + assert len(groups) == 2 diff --git a/tests/hubstorage/test_client.py b/tests/hubstorage/test_client.py index f2b93071..532d9472 100644 --- a/tests/hubstorage/test_client.py +++ b/tests/hubstorage/test_client.py @@ -1,53 +1,59 @@ """ Test Client """ -from .hstestcase import HSTestCase from scrapinghub import HubstorageClient from scrapinghub.hubstorage.utils import apipoll -class ClientTest(HSTestCase): - - def test_default_ua(self): - self.assertEqual(self.hsclient.user_agent, - HubstorageClient.DEFAULT_USER_AGENT) - - 
def test_custom_ua(self): - client = HubstorageClient(auth=HSTestCase.auth, - endpoint=HSTestCase.endpoint, - user_agent='testUA') - self.assertEqual(client.user_agent, 'testUA') - - def test_push_job(self): - c = self.hsclient - c.push_job(self.projectid, self.spidername, - priority=self.project.jobq.PRIO_LOW, - foo='baz') - job = self.start_job() - m = job.metadata - self.assertEqual(m.get('state'), u'running', c.auth) - self.assertEqual(m.get('foo'), u'baz') - self.project.jobq.finish(job) - self.project.jobq.delete(job) - - # job auth token is valid only while job is running - m = c.get_job(job.key).metadata - self.assertEqual(m.get('state'), u'deleted') - self.assertEqual(m.get('foo'), u'baz') - - def test_jobsummaries(self): - hsc = self.hsclient - # add at least one running or pending job to ensure summary is returned - hsc.push_job(self.projectid, self.spidername, state='running') - - def _get_summary(): - jss = hsc.projects.jobsummaries() - mjss = dict((str(js['project']), js) for js in jss) - return mjss.get(self.projectid) - summary = apipoll(_get_summary) - self.assertIsNotNone(summary) - - def test_timestamp(self): - ts1 = self.hsclient.server_timestamp() - ts2 = self.hsclient.server_timestamp() - self.assertGreater(ts1, 0) - self.assertLessEqual(ts1, ts2) +from .conftest import TEST_AUTH, TEST_ENDPOINT +from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME +from .conftest import start_job + + +def test_default_ua(hsclient): + assert hsclient.user_agent == HubstorageClient.DEFAULT_USER_AGENT + + +def test_custom_ua(): + client = HubstorageClient(auth=TEST_AUTH, + endpoint=TEST_ENDPOINT, + user_agent='testUA') + assert client.user_agent == 'testUA' + + +def test_push_job(hsclient, hsproject): + hsclient.push_job( + TEST_PROJECT_ID, TEST_SPIDER_NAME, + priority=hsproject.jobq.PRIO_LOW, + foo='baz', + ) + job = start_job(hsproject) + meta = job.metadata + assert meta.get('state') == u'running', hsclient.auth + assert meta.get('foo') == u'baz' + hsproject.jobq.finish(job) + hsproject.jobq.delete(job) + + # job auth token is valid only while job is running + meta = hsclient.get_job(job.key).metadata + assert meta.get('state') == u'deleted' + assert meta.get('foo') == u'baz' + + +def test_jobsummaries(hsclient): + # add at least one running or pending job to ensure summary is returned + hsclient.push_job(TEST_PROJECT_ID, TEST_SPIDER_NAME, state='running') + + def _get_summary(): + jss = hsclient.projects.jobsummaries() + mjss = dict((str(js['project']), js) for js in jss) + return mjss.get(TEST_PROJECT_ID) + + summary = apipoll(_get_summary) + assert summary is not None + + +def test_timestamp(hsclient): + ts1 = hsclient.server_timestamp() + ts2 = hsclient.server_timestamp() + assert ts1 > 0 + assert ts1 <= ts2 diff --git a/tests/hubstorage/test_collections.py b/tests/hubstorage/test_collections.py index 50cb9454..291289f2 100644 --- a/tests/hubstorage/test_collections.py +++ b/tests/hubstorage/test_collections.py @@ -8,123 +8,130 @@ from scrapinghub import HubstorageClient from six.moves import range -from .hstestcase import HSTestCase +from .conftest import TEST_COLLECTION_NAME from .testutil import failing_downloader def _mkitem(): - return dict(field1='value1', field2=['value2a', 'value2b'], field3=3, field4={'v4k': 'v4v'}) - -def random_collection_name(): - return "test_collection_%s" % random.randint(1, 1000000) - -class CollectionsTest(HSTestCase): - - # For fixed tests (test_errors, test_data_download) - test_collection_name = random_collection_name() - - def 
test_simple_count(self): - coll_name = random_collection_name() - test_item = dict(_mkitem()) - test_item['_key'] = 'a' - - collection = self.project.collections.new_store(coll_name) - collection.set(test_item) - assert collection.count() == 1 - - def post_get_delete_test(self): - test_item = _mkitem() - item_to_send = dict(test_item) - item_to_send['_key'] = test_key = 'insert_test_key' - - test_collections = [ - self.project.collections.new_store(self.test_collection_name), - self.project.collections.new_cached_store(self.test_collection_name), - self.project.collections.new_versioned_store(self.test_collection_name), - self.project.collections.new_versioned_cached_store(self.test_collection_name), - ] - - test_collections.extend( - self.project.collections.new_collection(t, self.test_collection_name + 'b') - for t in ('s', 'vs', 'cs', 'vcs')) - - for col in test_collections: - col.set(item_to_send) - returned_item = col.get(test_key) - self.assertEqual(test_item, returned_item) - col.delete(test_key) - self.assertRaises(KeyError, col.get, test_key) - - def post_scan_test(self): - col = self.project.collections.new_store(self.test_collection_name) - - # populate with 20 items - test_item = _mkitem() - last_key = None - with closing(col.create_writer()) as writer: - for i in range(20): - test_item['_key'] = last_key = "post_scan_test%d" % i - test_item['counter'] = i - writer.write(test_item) - - # check last value is as expected - returned_item = col.get(last_key) - del test_item['_key'] - self.assertEqual(test_item, returned_item) - - # get all values starting with 1 - result = list(col.get(prefix='post_scan_test1')) - # 1 & 10-19 = 11 items - self.assertEqual(len(result), 11) - - # combining with normal filters - result = list(col.get(filter='["counter", ">", [5]]', prefix='post_scan_test1')) - # 10-19 - self.assertEqual(len(result), 10) - - # bulk delete - col.delete('post_scan_test%d' % i for i in range(20)) - - # test items removed (check first and last) - self.assertRaises(KeyError, col.get, 'post_scan_test0') - self.assertRaises(KeyError, col.get, last_key) - - def test_errors(self): - col = self.project.collections.new_store(self.test_collection_name) - self.assertRaises(KeyError, col.get, 'does_not_exist') - self.assertRaises(ValueError, col.set, {'foo': 42}) - self.assertRaises(ValueError, col.set, {'_key': []}) - self.assertRaises(ValueError, col.set, - {'_key': 'large_test', 'value': 'x' * 1024 ** 2}) - - def test_data_download(self): - col = self.project.collections.new_store(self.test_collection_name) - items = [] - with closing(col.create_writer()) as writer: - for i in range(20): - test_item = _mkitem() - test_item['_key'] = "test_data_download%d" % i - test_item['counter'] = i - writer.write(test_item) - items.append(test_item) - - # check parameters are passed correctly - downloaded = list(col.iter_values(prefix='test_data_download1')) - self.assertEqual(len(downloaded), 11) - - # simulate network timeouts and download data - with failing_downloader(self.project.collections): - downloaded = list(col.iter_values(start='test_data_download1')) - self.assertEqual(len(downloaded), 19) - - def test_invalid_collection_name(self): - cols = self.project.collections - self.assertRaises(ValueError, cols.new_collection, 'invalidtype', 'n') - self.assertRaises(ValueError, cols.new_store, 'foo-bar') - self.assertRaises(ValueError, cols.new_store, 'foo/bar') - self.assertRaises(ValueError, cols.new_store, '/foo') - self.assertRaises(ValueError, cols.create_writer, 'invalidtype', 'n') 
- self.assertRaises(ValueError, cols.create_writer, 's', 'foo-bar') + return dict(field1='value1', field2=['value2a', 'value2b'], + field3=3, field4={'v4k': 'v4v'}) + + +def test_simple_count(hsproject, hscollection): + test_item = dict(_mkitem()) + test_item['_key'] = 'a' + + hscollection.set(test_item) + assert hscollection.count() == 1 + + +def post_get_delete_test(hsproject): + test_item = _mkitem() + item_to_send = dict(test_item) + item_to_send['_key'] = test_key = 'insert_test_key' + + test_collections = [ + hsproject.collections.new_store(TEST_COLLECTION_NAME), + hsproject.collections.new_cached_store(TEST_COLLECTION_NAME), + hsproject.collections.new_versioned_store(TEST_COLLECTION_NAME), + hsproject.collections.new_versioned_cached_store(TEST_COLLECTION_NAME), + ] + + test_collections.extend( + hsproject.collections.new_collection(t, TEST_COLLECTION_NAME + 'b') + for t in ('s', 'vs', 'cs', 'vcs')) + + for col in test_collections: + col.set(item_to_send) + returned_item = col.get(test_key) + assert test_item == returned_item + col.delete(test_key) + with pytest.raises(KeyError): + col.get(test_key) + + +def post_scan_test(hsproject, hscollection): + # populate with 20 items + test_item = _mkitem() + last_key = None + with closing(hscollection.create_writer()) as writer: + for i in range(20): + test_item['_key'] = last_key = "post_scan_test%d" % i + test_item['counter'] = i + writer.write(test_item) + + # check last value is as expected + returned_item = hscollection.get(last_key) + del test_item['_key'] + assert test_item == returned_item + + # get all values starting with 1 + result = list(hscollection.get(prefix='post_scan_test1')) + # 1 & 10-19 = 11 items + assert len(result) == 11 + + # combining with normal filters + result = list(hscollection.get(filter='["counter", ">", [5]]', + prefix='post_scan_test1')) + # 10-19 + assert len(result) == 10 + + # bulk delete + hscollection.delete('post_scan_test%d' % i for i in range(20)) + + # test items removed (check first and last) + with pytest.raises(KeyError): + hscollection.get('post_scan_test0') + with pytest.raises(KeyError): + hscollection.get(last_key) + + +def test_errors_bad_key(hscollection): + with pytest.raises(KeyError): + hscollection.get('does_not_exist') + + +@pytest.mark.parametrize('testarg', [ + {'foo': 42}, + {'_key': []}, + {'_key': 'large_test', 'value': 'x' * 1024 ** 2}, +]) +def test_errors(hscollection, testarg): + with pytest.raises(ValueError): + hscollection.set(testarg) + + +def test_data_download(hsproject, hscollection): + items = [] + with closing(hscollection.create_writer()) as writer: + for i in range(20): + test_item = _mkitem() + test_item['_key'] = "test_data_download%d" % i + test_item['counter'] = i + writer.write(test_item) + items.append(test_item) + + # check parameters are passed correctly + downloaded = list(hscollection.iter_values(prefix='test_data_download1')) + assert len(downloaded) == 11 + + # simulate network timeouts and download data + with failing_downloader(hsproject.collections): + downloaded = list(hscollection.iter_values(start='test_data_download1')) + assert len(downloaded) == 19 + + +def test_invalid_collection_name(hsproject): + cols = hsproject.collections + for method, args in [ + (cols.new_collection, ('invalidtype', 'n')), + (cols.new_store, ('foo-bar',)), + (cols.new_store, ('foo/bar',)), + (cols.new_store, ('/foo',)), + (cols.create_writer, ('invalidtype', 'n')), + (cols.create_writer, ('s', 'foo-bar'))]: + with pytest.raises(ValueError): + method(*args) 
@pytest.mark.parametrize('msgpack_available', [True, False]) diff --git a/tests/hubstorage/test_frontier.py b/tests/hubstorage/test_frontier.py index e7813774..18504175 100644 --- a/tests/hubstorage/test_frontier.py +++ b/tests/hubstorage/test_frontier.py @@ -1,128 +1,128 @@ """ Test Frontier """ -from .hstestcase import HSTestCase +import pytest +from .conftest import TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT -class FrontierTest(HSTestCase): - def setUp(self): - self._delete_slot() +@pytest.fixture(autouse=True) +def delete_frontier_slot(hsproject): + frontier = hsproject.frontier + frontier.delete_slot(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT) - def tearDown(self): - self._delete_slot() - def _delete_slot(self): - frontier = self.project.frontier - frontier.delete_slot(self.frontier, self.slot) +def _get_urls(batch): + return [r[0] for r in batch['requests']] - def _remove_all_ids(self): - frontier = self.project.frontier - ids = [batch['id'] for batch in frontier.read(self.frontier, self.slot)] - frontier.delete(self.frontier, self.slot, ids) - def _get_urls(self, batch): - return [r[0] for r in batch['requests']] +def test_add_read(hsproject): + frontier = hsproject.frontier - def test_add_read(self): - frontier = self.project.frontier + fps = [{'fp': '/'}] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps) + fps = [{'fp': '/index.html'}, {'fp': '/index2.html'}] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps) + frontier.flush() - fps = [{'fp': '/'}] - frontier.add(self.frontier, self.slot, fps) - fps = [{'fp': '/index.html'}, {'fp': '/index2.html'}] - frontier.add(self.frontier, self.slot, fps) - frontier.flush() + urls = [_get_urls(batch) for batch + in frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT)] + expected_urls = [[u'/', u'/index.html', u'/index2.html']] + assert urls == expected_urls - urls = [self._get_urls(batch) for batch - in frontier.read(self.frontier, self.slot)] - expected_urls = [[u'/', u'/index.html', u'/index2.html']] - self.assertEqual(urls, expected_urls) - def test_add_multiple_chunks(self): - frontier = self.project.frontier - old_count = frontier.newcount +def test_add_multiple_chunks(hsproject): + frontier = hsproject.frontier + old_count = frontier.newcount - batch_size = 50 - fps1 = [{'fp': '/index_%s.html' % fp} for fp in range(0, batch_size)] - frontier.add(self.frontier, self.slot, fps1) + batch_size = 50 + fps1 = [{'fp': '/index_%s.html' % fp} for fp in range(0, batch_size)] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps1) - fps2 = [{'fp': '/index_%s.html' % fp} for fp in range(batch_size, batch_size * 2)] - frontier.add(self.frontier, self.slot, fps2) + fps2 = [{'fp': '/index_%s.html' % fp} + for fp in range(batch_size, batch_size * 2)] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps2) - fps3 = [{'fp': '/index_%s.html' % fp} for fp in range(batch_size * 2, batch_size * 3)] - frontier.add(self.frontier, self.slot, fps3) - frontier.flush() + fps3 = [{'fp': '/index_%s.html' % fp} + for fp in range(batch_size * 2, batch_size * 3)] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps3) + frontier.flush() - self.assertEqual(frontier.newcount, 150 + old_count) + assert frontier.newcount == 150 + old_count - # insert repeated fingerprints - fps4 = [{'fp': '/index_%s.html' % fp} for fp in range(0, batch_size)] - frontier.add(self.frontier, self.slot, fps3) - frontier.flush() + # insert repeated fingerprints + fps4 = [{'fp': '/index_%s.html' % fp} for fp in range(0, batch_size)] + frontier.add(TEST_FRONTIER_NAME, 
TEST_FRONTIER_SLOT, fps3) + frontier.flush() - # new count is the same - self.assertEqual(frontier.newcount, 150 + old_count) + # new count is the same + assert frontier.newcount == 150 + old_count - # get first 100 - batches = list(frontier.read(self.frontier, self.slot, mincount=100)) - urls = [self._get_urls(batch) for batch in batches] - expected_urls = [[fp['fp'] for fp in fps1 + fps2]] - self.assertEqual(urls, expected_urls) + # get first 100 + batches = list(frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, + mincount=100)) + urls = [_get_urls(batch) for batch in batches] + expected_urls = [[fp['fp'] for fp in fps1 + fps2]] + assert urls == expected_urls - # delete first 100 - ids = [batch['id'] for batch in batches] - frontier.delete(self.frontier, self.slot, ids) + # delete first 100 + ids = [batch['id'] for batch in batches] + frontier.delete(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, ids) - # get remaining 50 - batches = list(frontier.read(self.frontier, self.slot)) - urls = [self._get_urls(batch) for batch in batches] - expected_urls = [[fp['fp'] for fp in fps3]] - self.assertEqual(urls, expected_urls) + # get remaining 50 + batches = list(frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT)) + urls = [_get_urls(batch) for batch in batches] + expected_urls = [[fp['fp'] for fp in fps3]] + assert urls == expected_urls - def test_add_big_chunk(self): - frontier = self.project.frontier - batch_size = 300 - fps1 = [{'fp': '/index_%s.html' % fp} for fp in range(0, batch_size)] - frontier.add(self.frontier, self.slot, fps1) - frontier.flush() +def test_add_big_chunk(hsproject): + frontier = hsproject.frontier - # get first 100 - batches = list(frontier.read(self.frontier, self.slot, mincount=100)) - urls = [self._get_urls(batch) for batch in batches] - expected_urls = [[fp['fp'] for fp in fps1[:100]]] - self.assertEqual(urls, expected_urls) + batch_size = 300 + fps1 = [{'fp': '/index_%s.html' % fp} for fp in range(0, batch_size)] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps1) + frontier.flush() - # delete first 100 - ids = [batch['id'] for batch in batches] - frontier.delete(self.frontier, self.slot, ids) + # get first 100 + batches = list(frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, + mincount=100)) + urls = [_get_urls(batch) for batch in batches] + expected_urls = [[fp['fp'] for fp in fps1[:100]]] + assert urls == expected_urls - # get next 100 - batches = list(frontier.read(self.frontier, self.slot, mincount=100)) - urls = [self._get_urls(batch) for batch in batches] - expected_urls = [[fp['fp'] for fp in fps1[100:200]]] - self.assertEqual(urls, expected_urls) + # delete first 100 + ids = [batch['id'] for batch in batches] + frontier.delete(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, ids) - # delete next 100 - ids = [batch['id'] for batch in batches] - frontier.delete(self.frontier, self.slot, ids) - - # get next 100 - batches = list(frontier.read(self.frontier, self.slot, mincount=100)) - urls = [self._get_urls(batch) for batch in batches] - expected_urls = [[fp['fp'] for fp in fps1[200:300]]] - self.assertEqual(urls, expected_urls) - - def test_add_extra_params(self): - frontier = self.project.frontier - - qdata = {"a": 1, "b": 2, "c": 3} - fps = [{'fp': '/', "qdata": qdata}] - frontier.add(self.frontier, self.slot, fps) - frontier.flush() - - expected_request = [[u'/', {u'a': 1, u'c': 3, u'b': 2}]] - batches = list(frontier.read(self.frontier, self.slot)) - request = batches[0]['requests'] - self.assertEqual(request, expected_request) + # get next 100 + 
batches = list(frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, + mincount=100)) + urls = [_get_urls(batch) for batch in batches] + expected_urls = [[fp['fp'] for fp in fps1[100:200]]] + assert urls == expected_urls + # delete next 100 + ids = [batch['id'] for batch in batches] + frontier.delete(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, ids) + + # get next 100 + batches = list(frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, + mincount=100)) + urls = [_get_urls(batch) for batch in batches] + expected_urls = [[fp['fp'] for fp in fps1[200:300]]] + assert urls == expected_urls + + +def test_add_extra_params(hsproject): + frontier = hsproject.frontier + + qdata = {"a": 1, "b": 2, "c": 3} + fps = [{'fp': '/', "qdata": qdata}] + frontier.add(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT, fps) + frontier.flush() + + expected_request = [[u'/', {u'a': 1, u'c': 3, u'b': 2}]] + batches = list(frontier.read(TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT)) + request = batches[0]['requests'] + assert request == expected_request diff --git a/tests/hubstorage/test_jobq.py b/tests/hubstorage/test_jobq.py index 1d48a635..7781887d 100644 --- a/tests/hubstorage/test_jobq.py +++ b/tests/hubstorage/test_jobq.py @@ -1,357 +1,338 @@ """ Test JobQ """ -import os, unittest - +import os import six +import pytest from six.moves import range from scrapinghub.hubstorage.jobq import DuplicateJobError from scrapinghub.hubstorage.utils import apipoll -from .hstestcase import HSTestCase - - -EXCLUSIVE = os.environ.get('EXCLUSIVE_STORAGE') - - -class JobqTest(HSTestCase): - - def test_push(self): - jobq = self.project.jobq - qjob = jobq.push(self.spidername) - self.assertTrue('key' in qjob, qjob) - self.assertTrue('auth' in qjob, qjob) - - job = self.hsclient.get_job(qjob['key']) - self.assertEqual(job.metadata.get('state'), u'pending') - self.assertEqual(job.metadata.get('spider'), self.spidername) - self.assertEqual(job.metadata.get('auth'), qjob['auth']) - - jobq.start(job) - job.metadata.expire() - self.assertEqual(job.metadata.get('state'), u'running') - - jobq.finish(job) - job.metadata.expire() - self.assertEqual(job.metadata.get('state'), u'finished') - - jobq.delete(job) - job.metadata.expire() - self.assertEqual(job.metadata.get('state'), u'deleted') - - def test_push_with_extras(self): - qjob = self.project.jobq.push(self.spidername, foo='bar', baz='fuu') - job = self.hsclient.get_job(qjob['key']) - self.assertEqual(job.metadata.get('foo'), u'bar') - self.assertEqual(job.metadata.get('baz'), u'fuu') - - def test_push_with_priority(self): - jobq = self.project.jobq - qjob = jobq.push(self.spidername, priority=jobq.PRIO_HIGHEST) - self.assertTrue('key' in qjob, qjob) - self.assertTrue('auth' in qjob, qjob) - - def test_push_with_state(self): - qjob = self.project.jobq.push(self.spidername, state='running') - self.assertTrue('key' in qjob, qjob) - self.assertTrue('auth' in qjob, qjob) - job = self.hsclient.get_job(qjob['key']) - self.assertEqual(job.metadata.get('state'), u'running') - - def test_push_with_unique(self): - jobq = self.project.jobq - # no unique key - jobq.push(self.spidername) - jobq.push(self.spidername) - jobq.push(self.spidername, unique=None) - jobq.push(self.spidername, unique=None) - - # unique key - q1 = jobq.push(self.spidername, unique='h1') - jobq.push(self.spidername, unique='h2') - self.assertRaises(DuplicateJobError, jobq.push, self.spidername, unique='h1') - jobq.finish(q1) - self.assertRaises(DuplicateJobError, jobq.push, self.spidername, unique='h2') - jobq.push(self.spidername, unique='h1') 
- - def test_startjob(self): - jobq = self.project.jobq - qj = jobq.push(self.spidername) - nj = jobq.start() - self.assertTrue(nj.pop('pending_time', None), nj) - self.assertTrue(nj.pop('running_time', None), nj) - self.assertTrue(nj.pop('auth', None), nj) - self.assertEqual(nj[u'key'], qj['key']) - self.assertEqual(nj[u'spider'], self.spidername) - self.assertEqual(nj[u'state'], u'running') - self.assertEqual(nj[u'priority'], jobq.PRIO_NORMAL) - - def test_startjob_with_extras(self): - jobq = self.project.jobq - pushextras = { - 'string': 'foo', - 'integer': 1, - 'float': 3.2, - 'mixedarray': ['b', 1, None, True, False, {'k': 'c'}], - 'emptyarray': [], - 'mapping': {'alpha': 5, 'b': 'B', 'cama': []}, - 'emptymapping': {}, - 'true': True, - 'false': False, - 'nil': None, - } - qj = jobq.push(self.spidername, **pushextras) - startextras = dict(('s_' + k, v) for k, v in six.iteritems(pushextras)) - nj = jobq.start(**startextras) - self.assertEqual(qj['key'], nj['key']) - for k, v in six.iteritems(dict(pushextras, **startextras)): - if type(v) is float: - self.assertAlmostEqual(nj.get(k), v) - else: - self.assertEqual(nj.get(k), v) - - def test_startjob_order(self): - jobq = self.project.jobq - q1 = jobq.push(self.spidername) - q2 = jobq.push(self.spidername) - q3 = jobq.push(self.spidername) - self.assertEqual(jobq.start()['key'], q1['key']) - self.assertEqual(jobq.start()['key'], q2['key']) - self.assertEqual(jobq.start()['key'], q3['key']) - - def test_summary(self): - jobq = self.project.jobq - # push at least one job per state - jobq.push(self.spidername) - jobq.push(self.spidername, state='running') - jobq.push(self.spidername, state='finished') - summaries = dict((s['name'], s) for s in jobq.summary()) - self.assertEqual(set(summaries), set(['pending', 'running', 'finished'])) - self.assertTrue(jobq.summary('pending')) - self.assertTrue(jobq.summary('running')) - self.assertTrue(jobq.summary('finished')) - - def test_summary_jobmeta(self): - jobq = self.project.jobq - jobq.push(self.spidername, foo='bar', caz='fuu') - pendings = jobq.summary('pending', jobmeta='foo')['summary'] - p1 = pendings[0] - self.assertEqual(p1.get('foo'), 'bar') - self.assertFalse('caz' in p1) - - pendings = jobq.summary('pending', jobmeta=['foo', 'caz'])['summary'] - p1 = pendings[0] - self.assertEqual(p1.get('foo'), 'bar') - self.assertEqual(p1.get('caz'), 'fuu') - - def test_summary_countstart(self): - # push more than 5 jobs into same queue - N = 6 - jobq = self.project.jobq - for state in ('pending', 'running', 'finished'): - for idx in range(N): - jobq.push(self.spidername, state=state, idx=idx) - - s1 = jobq.summary(state) - self.assertEqual(s1['count'], N) - self.assertEqual(len(s1['summary']), 5) - - s2 = jobq.summary(state, count=N) - self.assertEqual(len(s2['summary']), N) - - s3 = jobq.summary(state, start=N - 6, count=3) - self.assertEqual([o['key'] for o in s3['summary']], - [o['key'] for o in s2['summary'][-6:-3]]) - - def test_summaries_and_state_changes(self): - jobq = self.project.jobq - j1 = jobq.push(self.spidername) - j2 = jobq.push(self.spidername) - j3 = jobq.push(self.spidername) - j4 = jobq.push(self.spidername, state='running') - # check queue summaries - self._assert_queue('pending', [j3, j2, j1]) - self._assert_queue('running', [j4]) - self._assert_queue('finished', []) - # change job states - jobq.start(j1) - jobq.finish(j2) - jobq.finish(j4) - # check summaries again - self._assert_queue('pending', [j3]) - self._assert_queue('running', [j1]) - self._assert_queue('finished', [j4, 
j2]) - # delete all jobs and check for empty summaries - jobq.finish(j1) - jobq.finish(j3) - jobq.delete(j1) - jobq.delete(j2) - jobq.delete(j3) - jobq.delete(j4) - self._assert_queue('pending', []) - self._assert_queue('running', []) - self._assert_queue('finished', []) - - def test_list_with_state(self): - jobq = self.project.jobq - j1 = jobq.push(self.spidername, state='finished') - j2 = jobq.push(self.spidername, state='running') - j3 = jobq.push(self.spidername, state='pending') - j4 = jobq.push(self.spidername, state='finished') - # Only finished jobs by default - assert _keys(jobq.list()) == _keys([j4, j1]) - assert _keys(jobq.list(state='finished')) == _keys([j4, j1]) - assert _keys(jobq.list(state='running')) == _keys([j2]) - assert _keys(jobq.list(state=['running', 'pending'])) == _keys([j3, j2]) - - def test_list_with_count(self): - jobq = self.project.jobq - j1 = jobq.push(self.spidername, state='finished') # NOQA - j2 = jobq.push(self.spidername, state='finished') # NOQA - j3 = jobq.push(self.spidername, state='finished') - j4 = jobq.push(self.spidername, state='finished') - # fetch only the 2 most recent jobs - assert _keys(jobq.list(count=2)) == _keys([j4, j3]) - - def test_list_with_stop(self): - jobq = self.project.jobq - j1 = jobq.push(self.spidername, state='finished') - j2 = jobq.push(self.spidername, state='finished') - j3 = jobq.push(self.spidername, state='finished') - j4 = jobq.push(self.spidername, state='finished') - # test "stop" parameter - # we should stop before the 4th finished job - assert _keys(jobq.list(stop=j1['key'])) == _keys([j4, j3, j2]) - - def test_list_with_tags(self): - jobq = self.project.jobq - j1 = jobq.push(self.spidername, state='finished', tags=['t1']) - j2 = jobq.push(self.spidername, state='finished', tags=['t2']) - j3 = jobq.push(self.spidername, state='finished', tags=['t1', 't2']) - j4 = jobq.push(self.spidername, state='finished') - assert _keys(jobq.list(has_tag='t1')) == _keys([j3, j1]) - assert _keys(jobq.list(has_tag=['t2', 't1'])) == _keys([j3, j2, j1]) - assert _keys(jobq.list(has_tag='t2', lacks_tag='t1')) == _keys([j2]) - assert _keys(jobq.list(lacks_tag=['t1', 't2'])) == _keys([j4]) - - # endts is not implemented - @unittest.expectedFailure - def test_list_with_startts_endts(self): - jobq = self.project.jobq - j1 = jobq.push(self.spidername, state='finished') # NOQA - j2 = jobq.push(self.spidername, state='finished') - j3 = jobq.push(self.spidername, state='finished') - j4 = jobq.push(self.spidername, state='finished') # NOQA - # test "startts/endts" parameters - # endts is not inclusive - # so we should get the 2 in the middle out of 4 - timestamps = [j['ts'] for j in jobq.list()] - jobs = jobq.list(startts=timestamps[2], endts=timestamps[0]) - assert _keys(jobs) == _keys([j3, j2]) - - def _assert_queue(self, qname, jobs): - summary = self.project.jobq.summary(qname, spiderid=self.spiderid) - self.assertEqual(summary['name'], qname) - self.assertEqual(summary['count'], len(jobs)) - self.assertEqual(len(summary['summary']), len(jobs)) - # Most recent jobs first - self.assertEqual([s['key'] for s in summary['summary']], - [j['key'] for j in jobs]) - - def test_simple_botgroups(self): - self.project.settings['botgroups'] = ['g1'] - self.project.settings.save() - pq = self.project.jobq - hq = self.hsclient.jobq - q1 = pq.push(self.spidername) - self.assertEqual(hq.start(botgroup='g3'), None) - self.assertEqual(apipoll(hq.start, botgroup='g1')['key'], q1['key']) - - @unittest.skipUnless(EXCLUSIVE, "test requires exclusive" - " 
(without any active bots) access to HS. Set EXCLUSIVE_STORAGE" - " env. var to activate") - def test_botgroups(self): - self.project.settings['botgroups'] = ['g1', 'g2'] - self.project.settings.save() - pq = self.project.jobq - hq = self.hsclient.jobq - q1 = pq.push(self.spidername) - q2 = pq.push(self.spidername) - q3 = pq.push(self.spidername) - self.assertEqual(hq.start(), None) - self.assertEqual(hq.start(botgroup='g3'), None) - self.assertEqual(apipoll(hq.start, botgroup='g1')['key'], q1['key']) - self.assertEqual(apipoll(hq.start, botgroup='g2')['key'], q2['key']) - - # cleanup project botgroups, q3 must be polled only by generic bots - del self.project.settings['botgroups'] - self.project.settings.save() - q4 = pq.push(self.spidername) - self.assertEqual(hq.start(botgroup='g1'), None) - self.assertEqual(hq.start(botgroup='g2'), None) - self.assertEqual(hq.start(botgroup='g3'), None) - self.assertEqual(hq.start()['key'], q3['key']) - self.assertEqual(hq.start()['key'], q4['key']) - - self.project.settings['botgroups'] = ['python-hubstorage-test'] - self.project.settings.save() - - def test_spider_updates(self): - jobq = self.project.jobq - spiderkey = '%s/%s' % (self.projectid, self.spiderid) - - def finish_and_delete_jobs(): - for job in jobq.finish(spiderkey): - yield job - jobq.delete(spiderkey) - - q1 = jobq.push(self.spidername) - q2 = jobq.push(self.spidername, state='running') - q3 = jobq.push(self.spidername, state='finished') - q4 = jobq.push(self.spidername, state='deleted') - - r = dict((x['key'], x['prevstate']) for x in finish_and_delete_jobs()) - self.assertEqual(r.get(q1['key']), 'pending', r) - self.assertEqual(r.get(q2['key']), 'running', r) - self.assertEqual(r.get(q3['key']), 'finished', r) - self.assertTrue(q4['key'] not in r) - - # Empty result set - self.assertFalse(list(jobq.delete(spiderkey))) - - def test_multiple_job_update(self): - jobq = self.project.jobq - q1 = jobq.push(self.spidername) - q2 = jobq.push(self.spidername) - q3 = jobq.push(self.spidername) - ids = [q1, q2['key'], self.project.get_job(q3['key'])] - self.assertTrue([x['prevstate'] for x in jobq.start(ids)], - ['pending', 'pending', 'pending']) - self.assertTrue([x['prevstate'] for x in jobq.finish(ids)], - ['running', 'running', 'running']) - self.assertTrue([x['prevstate'] for x in jobq.delete(ids)], - ['finished', 'finished', 'finished']) - - def test_update(self): - job = self.project.push_job(self.spidername) - self.assertEqual(job.metadata['state'], 'pending') - self.project.jobq.update(job, state='running', foo='bar') - job = self.project.get_job(job.key) - self.assertEqual(job.metadata['state'], 'running') - self.assertEqual(job.metadata['foo'], 'bar') - - def test_jobsummary(self): - jobs = [self.project.push_job(self.spidername, foo=i) - for i in range(5)] - jobmetas = list(self.project.jobq.jobsummary( - jobkeys=[j.key for j in jobs], jobmeta=['key', 'foo'])) - jobmeta_dict = {jm['key']: jm['foo'] for jm in jobmetas} - assert jobmeta_dict == { - jobs[i].key: i - for i in range(5) - } +from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME +from .conftest import hsspiderid def _keys(lst): return [x['key'] for x in lst] + + +def test_push(hsclient, hsproject): + jobq = hsproject.jobq + qjob = jobq.push(TEST_SPIDER_NAME) + assert 'key' in qjob, qjob + assert 'auth' in qjob, qjob + + job = hsclient.get_job(qjob['key']) + assert job.metadata.get('state') == u'pending' + assert job.metadata.get('spider') == TEST_SPIDER_NAME + assert job.metadata.get('auth') == qjob['auth'] + + 
jobq.start(job) + job.metadata.expire() + assert job.metadata.get('state') == u'running' + + jobq.finish(job) + job.metadata.expire() + assert job.metadata.get('state') == u'finished' + + jobq.delete(job) + job.metadata.expire() + assert job.metadata.get('state') == u'deleted' + + +def test_push_with_extras(hsclient, hsproject): + qjob = hsproject.jobq.push(TEST_SPIDER_NAME, foo='bar', baz='fuu') + job = hsclient.get_job(qjob['key']) + assert job.metadata.get('foo') == u'bar' + assert job.metadata.get('baz') == u'fuu' + + +def test_push_with_priority(hsclient, hsproject): + jobq = hsproject.jobq + qjob = jobq.push(TEST_SPIDER_NAME, priority=jobq.PRIO_HIGHEST) + assert 'key' in qjob, qjob + assert 'auth' in qjob, qjob + + +def test_push_with_state(hsclient, hsproject): + qjob = hsproject.jobq.push(TEST_SPIDER_NAME, state='running') + assert 'key' in qjob, qjob + assert 'auth' in qjob, qjob + job = hsclient.get_job(qjob['key']) + assert job.metadata.get('state') == u'running' + + +def test_push_with_unique(hsproject): + jobq = hsproject.jobq + # no unique key + jobq.push(TEST_SPIDER_NAME) + jobq.push(TEST_SPIDER_NAME) + jobq.push(TEST_SPIDER_NAME, unique=None) + jobq.push(TEST_SPIDER_NAME, unique=None) + + # unique key + q1 = jobq.push(TEST_SPIDER_NAME, unique='h1') + jobq.push(TEST_SPIDER_NAME, unique='h2') + with pytest.raises(DuplicateJobError): + jobq.push(TEST_SPIDER_NAME, unique='h1') + jobq.finish(q1) + with pytest.raises(DuplicateJobError): + jobq.push(TEST_SPIDER_NAME, unique='h2') + jobq.push(TEST_SPIDER_NAME, unique='h1') + + +def test_startjob(hsproject): + jobq = hsproject.jobq + qj = jobq.push(TEST_SPIDER_NAME) + nj = jobq.start() + assert nj.pop('pending_time', None), nj + assert nj.pop('running_time', None), nj + assert nj.pop('auth', None), nj + assert nj[u'key'] == qj['key'] + assert nj[u'spider'] == TEST_SPIDER_NAME + assert nj[u'state'] == u'running' + assert nj[u'priority'] == jobq.PRIO_NORMAL + + +def test_startjob_with_extras(hsproject): + jobq = hsproject.jobq + pushextras = { + 'string': 'foo', + 'integer': 1, + 'float': 3.2, + 'mixedarray': ['b', 1, None, True, False, {'k': 'c'}], + 'emptyarray': [], + 'mapping': {'alpha': 5, 'b': 'B', 'cama': []}, + 'emptymapping': {}, + 'true': True, + 'false': False, + 'nil': None, + } + qj = jobq.push(TEST_SPIDER_NAME, **pushextras) + startextras = dict(('s_' + k, v) for k, v in six.iteritems(pushextras)) + nj = jobq.start(**startextras) + assert qj['key'] == nj['key'] + for k, v in six.iteritems(dict(pushextras, **startextras)): + if type(v) is float: + assert abs(nj.get(k) - v) < 0.0001 + else: + assert nj.get(k) == v + + +def test_startjob_order(hsproject): + jobq = hsproject.jobq + q1 = jobq.push(TEST_SPIDER_NAME) + q2 = jobq.push(TEST_SPIDER_NAME) + q3 = jobq.push(TEST_SPIDER_NAME) + assert jobq.start()['key'] == q1['key'] + assert jobq.start()['key'] == q2['key'] + assert jobq.start()['key'] == q3['key'] + + +def test_summary(hsproject): + jobq = hsproject.jobq + # push at least one job per state + jobq.push(TEST_SPIDER_NAME) + jobq.push(TEST_SPIDER_NAME, state='running') + jobq.push(TEST_SPIDER_NAME, state='finished') + summaries = dict((s['name'], s) for s in jobq.summary()) + assert set(summaries), set(['pending', 'running', 'finished']) + assert jobq.summary('pending') + assert jobq.summary('running') + assert jobq.summary('finished') + + +def test_summary_jobmeta(hsproject): + jobq = hsproject.jobq + jobq.push(TEST_SPIDER_NAME, foo='bar', caz='fuu') + pendings = jobq.summary('pending', jobmeta='foo')['summary'] + p1 = 
pendings[0] + assert p1.get('foo') == 'bar' + assert 'caz' not in p1 + + pendings = jobq.summary('pending', jobmeta=['foo', 'caz'])['summary'] + p1 = pendings[0] + assert p1.get('foo') == 'bar' + assert p1.get('caz') == 'fuu' + + +def test_summary_countstart(hsproject): + # push more than 5 jobs into same queue + N = 6 + jobq = hsproject.jobq + for state in ('pending', 'running', 'finished'): + for idx in range(N): + jobq.push(TEST_SPIDER_NAME, state=state, idx=idx) + + s1 = jobq.summary(state) + assert s1['count'] == N + assert len(s1['summary']) == 5 + + s2 = jobq.summary(state, count=N) + assert len(s2['summary']) == N + + s3 = jobq.summary(state, start=N - 6, count=3) + assert ([o['key'] for o in s3['summary']] == + [o['key'] for o in s2['summary'][-6:-3]]) + + +def test_summaries_and_state_changes(hsproject, hsspiderid): + jobq = hsproject.jobq + j1 = jobq.push(TEST_SPIDER_NAME) + j2 = jobq.push(TEST_SPIDER_NAME) + j3 = jobq.push(TEST_SPIDER_NAME) + j4 = jobq.push(TEST_SPIDER_NAME, state='running') + # check queue summaries + _assert_queue(hsproject, hsspiderid, 'pending', [j3, j2, j1]) + _assert_queue(hsproject, hsspiderid, 'running', [j4]) + _assert_queue(hsproject, hsspiderid, 'finished', []) + # change job states + jobq.start(j1) + jobq.finish(j2) + jobq.finish(j4) + # check summaries again + _assert_queue(hsproject, hsspiderid, 'pending', [j3]) + _assert_queue(hsproject, hsspiderid, 'running', [j1]) + _assert_queue(hsproject, hsspiderid, 'finished', [j4, j2]) + # delete all jobs and check for empty summaries + jobq.finish(j1) + jobq.finish(j3) + jobq.delete(j1) + jobq.delete(j2) + jobq.delete(j3) + jobq.delete(j4) + _assert_queue(hsproject, hsspiderid, 'pending', []) + _assert_queue(hsproject, hsspiderid, 'running', []) + _assert_queue(hsproject, hsspiderid, 'finished', []) + + +def test_list_with_state(hsproject): + jobq = hsproject.jobq + j1 = jobq.push(TEST_SPIDER_NAME, state='finished') + j2 = jobq.push(TEST_SPIDER_NAME, state='running') + j3 = jobq.push(TEST_SPIDER_NAME, state='pending') + j4 = jobq.push(TEST_SPIDER_NAME, state='finished') + # Only finished jobs by default + assert _keys(jobq.list()) == _keys([j4, j1]) + assert _keys(jobq.list(state='finished')) == _keys([j4, j1]) + assert _keys(jobq.list(state='running')) == _keys([j2]) + assert _keys(jobq.list(state=['running', 'pending'])) == _keys([j3, j2]) + + +def test_list_with_count(hsproject): + jobq = hsproject.jobq + j1 = jobq.push(TEST_SPIDER_NAME, state='finished') # NOQA + j2 = jobq.push(TEST_SPIDER_NAME, state='finished') # NOQA + j3 = jobq.push(TEST_SPIDER_NAME, state='finished') + j4 = jobq.push(TEST_SPIDER_NAME, state='finished') + # fetch only the 2 most recent jobs + assert _keys(jobq.list(count=2)) == _keys([j4, j3]) + + +def test_list_with_stop(hsproject): + jobq = hsproject.jobq + j1 = jobq.push(TEST_SPIDER_NAME, state='finished') + j2 = jobq.push(TEST_SPIDER_NAME, state='finished') + j3 = jobq.push(TEST_SPIDER_NAME, state='finished') + j4 = jobq.push(TEST_SPIDER_NAME, state='finished') + # test "stop" parameter + # we should stop before the 4th finished job + assert _keys(jobq.list(stop=j1['key'])) == _keys([j4, j3, j2]) + + +def test_list_with_tags(hsproject): + jobq = hsproject.jobq + j1 = jobq.push(TEST_SPIDER_NAME, state='finished', tags=['t1']) + j2 = jobq.push(TEST_SPIDER_NAME, state='finished', tags=['t2']) + j3 = jobq.push(TEST_SPIDER_NAME, state='finished', tags=['t1', 't2']) + j4 = jobq.push(TEST_SPIDER_NAME, state='finished') + assert _keys(jobq.list(has_tag='t1')) == _keys([j3, j1]) + 
assert _keys(jobq.list(has_tag=['t2', 't1'])) == _keys([j3, j2, j1]) + assert _keys(jobq.list(has_tag='t2', lacks_tag='t1')) == _keys([j2]) + assert _keys(jobq.list(lacks_tag=['t1', 't2'])) == _keys([j4]) + + +# endts is not implemented +@pytest.mark.xfail +def test_list_with_startts_endts(hsproject): + jobq = hsproject.jobq + j1 = jobq.push(TEST_SPIDER_NAME, state='finished') # NOQA + j2 = jobq.push(TEST_SPIDER_NAME, state='finished') + j3 = jobq.push(TEST_SPIDER_NAME, state='finished') + j4 = jobq.push(TEST_SPIDER_NAME, state='finished') # NOQA + # test "startts/endts" parameters + # endts is not inclusive + # so we should get the 2 in the middle out of 4 + timestamps = [j['ts'] for j in jobq.list()] + jobs = jobq.list(startts=timestamps[2], endts=timestamps[3]) + assert _keys(jobs) == _keys([j3, j2]) + + +def test_spider_updates(hsproject, hsspiderid): + jobq = hsproject.jobq + spiderkey = '%s/%s' % (TEST_PROJECT_ID, hsspiderid) + + def finish_and_delete_jobs(): + for job in jobq.finish(spiderkey): + yield job + jobq.delete(spiderkey) + + q1 = jobq.push(TEST_SPIDER_NAME) + q2 = jobq.push(TEST_SPIDER_NAME, state='running') + q3 = jobq.push(TEST_SPIDER_NAME, state='finished') + q4 = jobq.push(TEST_SPIDER_NAME, state='deleted') + + r = dict((x['key'], x['prevstate']) for x in finish_and_delete_jobs()) + assert r.get(q1['key']) == 'pending', r + assert r.get(q2['key']) == 'running', r + assert r.get(q3['key']) == 'finished', r + assert q4['key'] not in r + + # Empty result set + assert not list(jobq.delete(spiderkey)) + + +def test_multiple_job_update(hsproject): + jobq = hsproject.jobq + q1 = jobq.push(TEST_SPIDER_NAME) + q2 = jobq.push(TEST_SPIDER_NAME) + q3 = jobq.push(TEST_SPIDER_NAME) + ids = [q1, q2['key'], hsproject.get_job(q3['key'])] + assert ([x['prevstate'] for x in jobq.start(ids)] == + ['pending', 'pending', 'pending']) + assert ([x['prevstate'] for x in jobq.finish(ids)] == + ['running', 'running', 'running']) + assert ([x['prevstate'] for x in jobq.delete(ids)] == + ['finished', 'finished', 'finished']) + + +def test_update(hsproject): + job = hsproject.push_job(TEST_SPIDER_NAME) + assert job.metadata['state'] == 'pending' + hsproject.jobq.update(job, state='running', foo='bar') + job = hsproject.get_job(job.key) + assert job.metadata['state'] == 'running' + assert job.metadata['foo'] == 'bar' + + +def test_jobsummary(hsproject): + jobs = [hsproject.push_job(TEST_SPIDER_NAME, foo=i) + for i in range(5)] + jobmetas = list(hsproject.jobq.jobsummary( + jobkeys=[j.key for j in jobs], jobmeta=['key', 'foo'])) + jobmeta_dict = {jm['key']: jm['foo'] for jm in jobmetas} + assert jobmeta_dict == { + jobs[i].key: i + for i in range(5) + } + + +def _assert_queue(hsproject, hsspiderid, qname, jobs): + summary = hsproject.jobq.summary(qname, spiderid=hsspiderid) + assert summary['name'] == qname + assert summary['count'] == len(jobs) + assert len(summary['summary']) == len(jobs) + # Most recent jobs first + assert ([s['key'] for s in summary['summary']] == + [j['key'] for j in jobs]) diff --git a/tests/hubstorage/test_jobsmeta.py b/tests/hubstorage/test_jobsmeta.py index c2bfbe6e..d17c0825 100644 --- a/tests/hubstorage/test_jobsmeta.py +++ b/tests/hubstorage/test_jobsmeta.py @@ -3,103 +3,107 @@ System tests for operations on stored job metadata """ -from .hstestcase import HSTestCase - - -class JobsMetadataTest(HSTestCase): - - def _assertMetadata(self, meta1, meta2): - def _clean(m): - return dict((k, v) for k, v in m.items() if k != 'updated_time') - - meta1 = _clean(meta1) - meta2 = 
_clean(meta2) - self.assertEqual(meta1, meta2) - - def test_basic(self): - job = self.project.push_job(self.spidername) - self.assertTrue('auth' not in job.metadata) - self.assertTrue('state' in job.metadata) - self.assertEqual(job.metadata['spider'], self.spidername) - - # set some metadata and forget it - job.metadata['foo'] = 'bar' - self.assertEqual(job.metadata['foo'], 'bar') - job.metadata.expire() - self.assertTrue('foo' not in job.metadata) - - # set it again and persist it - job.metadata['foo'] = 'bar' - self.assertEqual(job.metadata['foo'], 'bar') - job.metadata.save() - self.assertEqual(job.metadata['foo'], 'bar') - job.metadata.expire() - self.assertEqual(job.metadata['foo'], 'bar') - - # refetch the job and compare its metadata - job2 = self.hsclient.get_job(job.key) - self._assertMetadata(job2.metadata, job.metadata) - - # delete foo but do not persist it - del job.metadata['foo'] - self.assertTrue('foo' not in job.metadata) - job.metadata.expire() - self.assertEqual(job.metadata.get('foo'), 'bar') - # persist it to be sure it is not removed - job.metadata.save() - self.assertEqual(job.metadata.get('foo'), 'bar') - # and finally delete again and persist it - del job.metadata['foo'] - self.assertTrue('foo' not in job.metadata) - job.metadata.save() - self.assertTrue('foo' not in job.metadata) - job.metadata.expire() - self.assertTrue('foo' not in job.metadata) - - job2 = self.hsclient.get_job(job.key) - self._assertMetadata(job.metadata, job2.metadata) - - def test_updating(self): - job = self.project.push_job(self.spidername) - self.assertIsNone(job.metadata.get('foo')) - job.update_metadata({'foo': 'bar'}) - # metadata attr should change - self.assertEqual(job.metadata.get('foo'), 'bar') - # as well as actual metadata - job = self.project.get_job(job.key) - self.assertEqual(job.metadata.get('foo'), 'bar') - job.update_metadata({'foo': None}) - self.assertFalse(job.metadata.get('foo', False)) - - # there are ignored fields like: auth, _key, state - state = job.metadata['state'] - job.update_metadata({'state': 'running'}) - self.assertEqual(job.metadata['state'], state) - - def test_representation(self): - job = self.project.push_job(self.spidername) - meta = job.metadata - self.assertNotEqual(str(meta), repr(meta)) - self.assertEqual(meta, eval(str(meta))) - self.assertTrue(meta.__class__.__name__ in repr(meta)) - self.assertFalse(meta.__class__.__name__ in str(meta)) - - def test_jobauth(self): - job = self.project.push_job(self.spidername) - self.assertIsNone(job.jobauth) - self.assertEqual(job.auth, self.project.auth) - self.assertEqual(job.items.auth, self.project.auth) - - samejob = self.hsclient.get_job(job.key) - self.assertIsNone(samejob.auth) - self.assertIsNone(samejob.jobauth) - self.assertEqual(samejob.items.auth, self.project.auth) - - def test_authtoken(self): - pendingjob = self.project.push_job(self.spidername) - runningjob = self.start_job() - self.assertEqual(pendingjob.key, runningjob.key) - self.assertTrue(runningjob.jobauth) - self.assertEqual(runningjob.jobauth, runningjob.auth) - self.assertEqual(runningjob.auth[0], runningjob.key) - self.assertTrue(runningjob.auth[1]) +from .conftest import TEST_SPIDER_NAME +from .conftest import start_job + + +def _assertMetadata(meta1, meta2): + def _clean(m): + return dict((k, v) for k, v in m.items() if k != 'updated_time') + + meta1 = _clean(meta1) + meta2 = _clean(meta2) + assert meta1 == meta2 + + +def test_basic(hsclient, hsproject): + job = hsproject.push_job(TEST_SPIDER_NAME) + assert 'auth' not in 
job.metadata + assert 'state' in job.metadata + assert job.metadata['spider'] == TEST_SPIDER_NAME + + # set some metadata and forget it + job.metadata['foo'] = 'bar' + assert job.metadata['foo'] == 'bar' + job.metadata.expire() + assert 'foo' not in job.metadata + + # set it again and persist it + job.metadata['foo'] = 'bar' + assert job.metadata['foo'] == 'bar' + job.metadata.save() + assert job.metadata['foo'] == 'bar' + job.metadata.expire() + assert job.metadata['foo'] == 'bar' + + # refetch the job and compare its metadata + job2 = hsclient.get_job(job.key) + _assertMetadata(job2.metadata, job.metadata) + + # delete foo but do not persist it + del job.metadata['foo'] + assert 'foo' not in job.metadata + job.metadata.expire() + assert job.metadata.get('foo') == 'bar' + # persist it to be sure it is not removed + job.metadata.save() + assert job.metadata.get('foo') == 'bar' + # and finally delete again and persist it + del job.metadata['foo'] + assert 'foo' not in job.metadata + job.metadata.save() + assert 'foo' not in job.metadata + job.metadata.expire() + assert 'foo' not in job.metadata + + job2 = hsclient.get_job(job.key) + _assertMetadata(job.metadata, job2.metadata) + + +def test_updating(hsproject): + job = hsproject.push_job(TEST_SPIDER_NAME) + assert job.metadata.get('foo') is None + job.update_metadata({'foo': 'bar'}) + # metadata attr should change + assert job.metadata.get('foo') == 'bar' + # as well as actual metadata + job = hsproject.get_job(job.key) + assert job.metadata.get('foo') == 'bar' + job.update_metadata({'foo': None}) + assert not job.metadata.get('foo', False) + + # there are ignored fields like: auth, _key, state + state = job.metadata['state'] + job.update_metadata({'state': 'running'}) + assert job.metadata['state'] == state + + +def test_representation(hsproject): + job = hsproject.push_job(TEST_SPIDER_NAME) + meta = job.metadata + assert str(meta) != repr(meta) + assert meta == eval(str(meta)) + assert meta.__class__.__name__ in repr(meta) + assert meta.__class__.__name__ not in str(meta) + + +def test_jobauth(hsclient, hsproject): + job = hsproject.push_job(TEST_SPIDER_NAME) + assert job.jobauth is None + assert job.auth == hsproject.auth + assert job.items.auth == hsproject.auth + + samejob = hsclient.get_job(job.key) + assert samejob.auth is None + assert samejob.jobauth is None + assert samejob.items.auth == hsproject.auth + + +def test_authtoken(hsproject): + pendingjob = hsproject.push_job(TEST_SPIDER_NAME) + runningjob = start_job(hsproject) + assert pendingjob.key == runningjob.key + assert runningjob.jobauth + assert runningjob.jobauth == runningjob.auth + assert runningjob.auth[0] == runningjob.key + assert runningjob.auth[1] diff --git a/tests/hubstorage/test_project.py b/tests/hubstorage/test_project.py index 5a0e79a4..4a1a17bc 100644 --- a/tests/hubstorage/test_project.py +++ b/tests/hubstorage/test_project.py @@ -1,261 +1,282 @@ """ Test Project """ +import six import json -from random import randint, random - import pytest -import six from six.moves import range from requests.exceptions import HTTPError from scrapinghub import HubstorageClient -from scrapinghub.hubstorage.utils import millitime -from .hstestcase import HSTestCase +from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME +from .conftest import hsspiderid +from .conftest import start_job +from .conftest import set_testbotgroup, unset_testbotgroup from .testutil import failing_downloader -class ProjectTest(HSTestCase): - - def test_projectid(self): - p1 = 
self.hsclient.get_project(int(self.projectid)) - p2 = self.hsclient.get_project(str(self.projectid)) - self.assertEqual(p1.projectid, p2.projectid) - self.assertEqual(type(p1.projectid), six.text_type) - self.assertEqual(type(p2.projectid), six.text_type) - self.assertRaises(AssertionError, self.hsclient.get_project, '111/3') - - def test_get_job_from_key(self): - job = self.project.push_job(self.spidername) - parts = tuple(job.key.split('/')) - self.assertEqual(len(parts), 3) - self.assertEqual(parts[:2], (self.projectid, self.spiderid)) - samejob1 = self.hsclient.get_job(job.key) - samejob2 = self.project.get_job(job.key) - samejob3 = self.project.get_job(parts[1:]) - self.assertEqual(samejob1.key, job.key) - self.assertEqual(samejob2.key, job.key) - self.assertEqual(samejob3.key, job.key) - - def test_get_jobs(self): - p = self.project - j1 = p.push_job(self.spidername, testid=0) - j2 = p.push_job(self.spidername, testid=1) - j3 = p.push_job(self.spidername, testid=2) - # global list must list at least one job - self.assertTrue(list(p.get_jobs(count=1, state='pending'))) - # List all jobs for test spider - r = list(p.get_jobs(spider=self.spidername, state='pending')) - self.assertEqual([j.key for j in r], [j3.key, j2.key, j1.key]) - - def test_get_jobs_with_legacy_filter(self): - p = self.project - j1 = p.push_job(self.spidername, state='finished', - close_reason='finished', tags=['t2']) - j2 = p.push_job(self.spidername, state='finished', - close_reason='finished', tags=['t1']) - j3 = p.push_job(self.spidername, state='pending') - j4 = p.push_job(self.spidername, state='finished', - close_reason='failed', tags=['t1']) - j5 = p.push_job(self.spidername + 'skip', state='finished', - close_reason='failed', tags=['t1']) - - filters = [['spider', '=', [self.spidername]], - ['state', '=', ['finished']], - ['close_reason', '=', ['finished']], - ['tags', 'haselement', ['t1']], - ['tags', 'hasnotelement', ['t2']]] - jobs = p.get_jobs(filter=[json.dumps(x) for x in filters]) - assert [j.key for j in jobs] == [j2.key], jobs - - def test_push_job(self): - job = self.project.push_job(self.spidername, state='running', - priority=self.project.jobq.PRIO_HIGH, - foo=u'bar') - self.assertEqual(job.metadata.get('state'), u'running') - self.assertEqual(job.metadata.get('foo'), u'bar') - self.project.jobq.finish(job) - self.project.jobq.delete(job) - job.metadata.expire() - self.assertEqual(job.metadata.get('state'), u'deleted') - self.assertEqual(job.metadata.get('foo'), u'bar') - - def test_auth(self): - # client without global auth set - hsc = HubstorageClient(endpoint=self.hsclient.endpoint) - self.assertEqual(hsc.auth, None) - - # check no-auth access - try: - hsc.push_job(self.projectid, self.spidername) - except HTTPError as exc: - self.assertTrue(exc.response.status_code, 401) - else: - self.assertTrue(False, '401 not raised') - - try: - hsc.get_project(self.projectid).push_job(self.spidername) - except HTTPError as exc: - self.assertTrue(exc.response.status_code, 401) - else: - self.assertTrue(False, '401 not raised') - - try: - hsc.get_job((self.projectid, 1, 1)).items.list() - except HTTPError as exc: - self.assertTrue(exc.response.status_code, 401) - else: - self.assertTrue(False, '401 not raised') - - try: - hsc.get_project(self.projectid).get_job((self.projectid, 1, 1)).items.list() - except HTTPError as exc: - self.assertTrue(exc.response.status_code, 401) - else: - self.assertTrue(False, '401 not raised') - - # create project with auth - auth = self.hsclient.auth - project = 
hsc.get_project(self.projectid, auth) - self.assertEqual(project.auth, auth) - job = project.push_job(self.spidername) - samejob = project.get_job(job.key) - self.assertEqual(samejob.key, job.key) - - def test_broad(self): - project = self.hsclient.get_project(self.projectid) - # populate project with at least one job - job = project.push_job(self.spidername) - self.assertEqual(job.metadata.get('state'), 'pending') - job = self.start_job() - self.assertEqual(job.metadata.get('state'), 'running') - job.items.write({'title': 'bar'}) - job.logs.info('nice to meet you') - job.samples.write([1, 2, 3]) - job.close_writers() - job.jobq.finish(job) - - # keep a jobid for get_job and unreference job - jobid = job.key - jobauth = job.auth - del job - - self.assertTrue(list(project.items.list(self.spiderid, count=1))) - self.assertTrue(list(project.logs.list(self.spiderid, count=1))) - self.assertTrue(list(project.samples.list(self.spiderid, count=1))) - - job = project.client.get_job(jobid, jobauth=jobauth) - job.purged() - - def test_settings(self): - project = self.hsclient.get_project(self.projectid) - settings = dict(project.settings) - settings.pop('botgroups', None) # ignore testsuite botgroups - self.assertEqual(settings, {}) - project.settings['created'] = created = millitime() - project.settings['botgroups'] = ['g1'] - project.settings.save() - self.assertEqual(project.settings.liveget('created'), created) - self.assertEqual(project.settings.liveget('botgroups'), ['g1']) - project.settings.expire() - self.assertEqual(dict(project.settings), { - 'created': created, - 'botgroups': ['g1'], - }) - - def test_requests(self): - ts = millitime() - job = self.project.push_job(self.spidername, state='running') - # top parent - r1 = job.requests.add(url='http://test.com/', status=200, method='GET', - rs=1337, duration=5, parent=None, ts=ts) - # first child - r2 = job.requests.add(url='http://test.com/2', status=400, method='POST', - rs=0, duration=1, parent=r1, ts=ts + 1) - # another child with fingerprint set - r3 = job.requests.add(url='http://test.com/3', status=400, method='PUT', - rs=0, duration=1, parent=r1, ts=ts + 2, fp='1234') - - job.requests.close() - rr = job.requests.list() - self.assertEqual(next(rr), - {u'status': 200, u'rs': 1337, - u'url': u'http://test.com/', u'time': ts, - u'duration': 5, u'method': u'GET'}) - self.assertEqual(next(rr), - {u'status': 400, u'parent': 0, u'rs': 0, - u'url': u'http://test.com/2', u'time': ts + 1, - u'duration': 1, u'method': u'POST'}) - self.assertEqual(next(rr), - {u'status': 400, u'fp': u'1234', u'parent': 0, - u'rs': 0, u'url': u'http://test.com/3', - u'time': ts + 2, u'duration': 1, - u'method': u'PUT'}) - - self.assertRaises(StopIteration, next, rr) - - def test_samples(self): - # no samples stored - j1 = self.project.push_job(self.spidername, state='running') - self.assertEqual(list(j1.samples.list()), []) - # simple fill - ts = millitime() - j1.samples.write([ts, 1, 2, 3]) - j1.samples.write([ts + 1, 5, 9, 4]) - j1.samples.flush() - o = list(j1.samples.list()) - self.assertEqual(len(o), 2) - self.assertEqual(o[0], [ts, 1, 2, 3]) - self.assertEqual(o[1], [ts + 1, 5, 9, 4]) - - # random fill - j2 = self.project.push_job(self.spidername, state='running') - samples = [] - ts = millitime() - count = int(j2.samples.batch_size * (random() + randint(1, 5))) - for _ in range(count): - ts += randint(1, 2**16) - row = [ts] + list(randint(0, 2**16) for _ in range(randint(0, 100))) - samples.append(row) - j2.samples.write(row) - j2.samples.flush() - o = 
list(j2.samples.list()) - self.assertEqual(len(o), count) - for r1, r2 in zip(samples, o): - self.assertEqual(r1, r2) - - def test_jobsummary(self): - js = self.project.jobsummary() - self.assertEqual(js.get('project'), int(self.project.projectid), js) - self.assertEqual(js.get('has_capacity'), True, js) - self.assertTrue('pending' in js, js) - self.assertTrue('running' in js, js) - - def test_bulkdata(self): - j = self.project.push_job(self.spidername, state='running') - for i in range(20): - j.logs.info("log line %d" % i) - j.items.write(dict(field1="item%d" % i)) - j.requests.add("http://test.com/%d" % i, - 200, 'GET', 10, None, 10, 120) - for resourcename in ('logs', 'items', 'requests'): - resource = getattr(j, resourcename) - resource.flush() - - # downloading resource, with simulated failures - with failing_downloader(resource): - downloaded = list(resource.iter_values()) - self.assertEqual(len(downloaded), 20) - - def test_output_string(self): - project = self.hsclient.get_project(self.projectid) - project.push_job(self.spidername) - job = self.start_job() - job.items.write({'foo': 'bar'}) - job.close_writers() - items = self.hsclient.get_job(job.key).items.iter_json() - self.assertEqual(type(next(items)), str) +def test_projectid(hsclient): + p1 = hsclient.get_project(int(TEST_PROJECT_ID)) + p2 = hsclient.get_project(str(TEST_PROJECT_ID)) + assert p1.projectid == p2.projectid + assert isinstance(p1.projectid, six.text_type) + assert isinstance(p2.projectid, six.text_type) + with pytest.raises(AssertionError): + hsclient.get_project('111/3') + + +def test_get_job_from_key(hsclient, hsproject, hsspiderid): + job = hsproject.push_job(TEST_SPIDER_NAME) + parts = tuple(job.key.split('/')) + assert len(parts) == 3 + assert parts[:2] == (TEST_PROJECT_ID, hsspiderid) + samejob1 = hsclient.get_job(job.key) + samejob2 = hsproject.get_job(job.key) + samejob3 = hsproject.get_job(parts[1:]) + assert samejob1.key == job.key + assert samejob2.key == job.key + assert samejob3.key == job.key + + +def test_get_jobs(hsproject): + p = hsproject + j1 = p.push_job(TEST_SPIDER_NAME, testid=0) + j2 = p.push_job(TEST_SPIDER_NAME, testid=1) + j3 = p.push_job(TEST_SPIDER_NAME, testid=2) + # global list must list at least one job + assert list(p.get_jobs(count=1, state='pending')) + # List all jobs for test spider + r = list(p.get_jobs(spider=TEST_SPIDER_NAME, state='pending')) + assert [j.key for j in r] == [j3.key, j2.key, j1.key] + + +def test_get_jobs_with_legacy_filter(hsproject): + p = hsproject + j1 = p.push_job(TEST_SPIDER_NAME, state='finished', + close_reason='finished', tags=['t2']) + j2 = p.push_job(TEST_SPIDER_NAME, state='finished', + close_reason='finished', tags=['t1']) + j3 = p.push_job(TEST_SPIDER_NAME, state='pending') + j4 = p.push_job(TEST_SPIDER_NAME, state='finished', + close_reason='failed', tags=['t1']) + j5 = p.push_job(TEST_SPIDER_NAME + 'skip', state='finished', + close_reason='failed', tags=['t1']) + filters = [ + ['spider', '=', [TEST_SPIDER_NAME]], + ['state', '=', ['finished']], + ['close_reason', '=', ['finished']], + ['tags', 'haselement', ['t1']], + ['tags', 'hasnotelement', ['t2']], + ] + jobs = p.get_jobs(filter=[json.dumps(x) for x in filters]) + assert [j.key for j in jobs] == [j2.key], jobs + + +def test_push_job(hsproject): + job = hsproject.push_job(TEST_SPIDER_NAME, state='running', + priority=hsproject.jobq.PRIO_HIGH, + foo=u'bar') + assert job.metadata.get('state') == u'running' + assert job.metadata.get('foo') == u'bar' + hsproject.jobq.finish(job) + 
hsproject.jobq.delete(job) + job.metadata.expire() + assert job.metadata.get('state') == u'deleted' + assert job.metadata.get('foo') == u'bar' + + +def test_auth(hsclient): + # client without global auth set + hsc = HubstorageClient(endpoint=hsclient.endpoint) + assert hsc.auth is None + + # check no-auth access + try: + hsc.push_job(TEST_PROJECT_ID, TEST_SPIDER_NAME) + except HTTPError as exc: + assert exc.response.status_code == 401 + else: + raise AssertionError('401 not raised') + + try: + hsc.get_project(TEST_PROJECT_ID).push_job(TEST_SPIDER_NAME) + except HTTPError as exc: + assert exc.response.status_code == 401 + else: + raise AssertionError('401 not raised') + + try: + hsc.get_job((TEST_PROJECT_ID, 1, 1)).items.list() + except HTTPError as exc: + assert exc.response.status_code == 401 + else: + raise AssertionError('401 not raised') + + try: + hsc.get_project(TEST_PROJECT_ID).get_job( + (TEST_PROJECT_ID, 1, 1)).items.list() + except HTTPError as exc: + assert exc.response.status_code == 401 + else: + raise AssertionError('401 not raised') + + # create project with auth + auth = hsclient.auth + project = hsc.get_project(TEST_PROJECT_ID, auth) + assert project.auth == auth + job = project.push_job(TEST_SPIDER_NAME) + samejob = project.get_job(job.key) + assert samejob.key == job.key + + +def test_broad(hsproject, hsspiderid): + # populate project with at least one job + job = hsproject.push_job(TEST_SPIDER_NAME) + assert job.metadata.get('state') == 'pending' + job = start_job(hsproject) + job.metadata.expire() + assert job.metadata.get('state') == 'running' + job.items.write({'title': 'bar'}) + job.logs.info('nice to meet you') + job.samples.write([1, 2, 3]) + job.close_writers() + job.jobq.finish(job) + + # keep a jobid for get_job and unreference job + jobid = job.key + jobauth = job.auth + del job + + assert list(hsproject.items.list(hsspiderid, count=1)) + assert list(hsproject.logs.list(hsspiderid, count=1)) + assert list(hsproject.samples.list(hsspiderid, count=1)) + + job = hsproject.client.get_job(jobid, jobauth=jobauth) + job.purged() + + +@pytest.fixture +def unset_botgroup(hsproject): + unset_testbotgroup(hsproject) + yield + set_testbotgroup(hsproject) + + +def test_settings(hsproject, unset_botgroup): + settings = dict(hsproject.settings) + assert settings == {} + # use some fixed timestamp to represent current time + hsproject.settings['created'] = created = 1476803148638 + hsproject.settings['botgroups'] = ['g1'] + hsproject.settings.save() + assert hsproject.settings.liveget('created') == created + assert hsproject.settings.liveget('botgroups') == ['g1'] + hsproject.settings.expire() + assert dict(hsproject.settings) == { + 'created': created, + 'botgroups': ['g1'], + } + + +def test_requests(hsproject): + # use some fixed timestamp to represent current time + ts = 1476803148638 + job = hsproject.push_job(TEST_SPIDER_NAME, state='running') + # top parent + r1 = job.requests.add(url='http://test.com/', status=200, method='GET', + rs=1337, duration=5, parent=None, ts=ts) + # first child + r2 = job.requests.add(url='http://test.com/2', status=400, method='POST', + rs=0, duration=1, parent=r1, ts=ts + 1) + # another child with fingerprint set + r3 = job.requests.add(url='http://test.com/3', status=400, method='PUT', + rs=0, duration=1, parent=r1, ts=ts + 2, fp='1234') + + job.requests.close() + rr = job.requests.list() + assert next(rr) == { + u'status': 200, u'rs': 1337, + u'url': u'http://test.com/', u'time': ts, + u'duration': 5, u'method': u'GET', + } + assert 
next(rr) == { + u'status': 400, u'parent': 0, u'rs': 0, + u'url': u'http://test.com/2', u'time': ts + 1, + u'duration': 1, u'method': u'POST', + } + assert next(rr) == { + u'status': 400, u'fp': u'1234', u'parent': 0, + u'rs': 0, u'url': u'http://test.com/3', + u'time': ts + 2, u'duration': 1, + u'method': u'PUT', + } + with pytest.raises(StopIteration): + next(rr) + + +def test_samples(hsproject): + # use some fixed timestamp to represent current time + ts = 1476803148638 + # no samples stored + j1 = hsproject.push_job(TEST_SPIDER_NAME, state='running') + assert list(j1.samples.list()) == [] + # simple fill + j1.samples.write([ts, 1, 2, 3]) + j1.samples.write([ts + 1, 5, 9, 4]) + j1.samples.flush() + o = list(j1.samples.list()) + assert len(o) == 2 + assert o[0] == [ts, 1, 2, 3] + assert o[1] == [ts + 1, 5, 9, 4] + + # random fill + j2 = hsproject.push_job(TEST_SPIDER_NAME, state='running') + samples = [] + count = int(j2.samples.batch_size * 3) + for i in range(count): + ts += i + row = [ts] + list(val*i for val in range(10)) + samples.append(row) + j2.samples.write(row) + j2.samples.flush() + o = list(j2.samples.list()) + assert len(o) == count + for r1, r2 in zip(samples, o): + assert r1 == r2 + + +def test_jobsummary(hsproject): + js = hsproject.jobsummary() + assert js.get('project') == int(hsproject.projectid), js + assert js.get('has_capacity') is True, js + assert 'pending' in js, js + assert 'running' in js, js + + +def test_bulkdata(hsproject): + j = hsproject.push_job(TEST_SPIDER_NAME, state='running') + for i in range(20): + j.logs.info("log line %d" % i) + j.items.write(dict(field1="item%d" % i)) + j.requests.add("http://test.com/%d" % i, 200, 'GET', 10, None, 10, 120) + for resourcename in ('logs', 'items', 'requests'): + resource = getattr(j, resourcename) + resource.flush() + + # downloading resource, with simulated failures + with failing_downloader(resource): + downloaded = list(resource.iter_values()) + assert len(downloaded) == 20 + + +def test_output_string(hsclient, hsproject): + hsproject.push_job(TEST_SPIDER_NAME) + job = start_job(hsproject) + job.items.write({'foo': 'bar'}) + job.close_writers() + items = hsclient.get_job(job.key).items.iter_json() + assert isinstance(next(items), str) @pytest.mark.parametrize('msgpack_available', [True, False]) diff --git a/tests/hubstorage/test_retry.py b/tests/hubstorage/test_retry.py index 873796a9..fb3d2fa1 100644 --- a/tests/hubstorage/test_retry.py +++ b/tests/hubstorage/test_retry.py @@ -4,292 +4,366 @@ import json import re +from mock import patch import responses from requests import HTTPError, ConnectionError from scrapinghub import HubstorageClient from six.moves.http_client import BadStatusLine -from .hstestcase import HSTestCase +from .conftest import TEST_AUTH, TEST_ENDPOINT +from .conftest import TEST_PROJECT_ID, TEST_SPIDER_NAME + GET = responses.GET POST = responses.POST DELETE = responses.DELETE -class RetryTest(HSTestCase): - def test_delete_on_hubstorage_api_does_not_404(self): - # NOTE: The current Hubstorage API does not raise 404 errors on deleting resources that do not exist, - # Thus the retry policy does not catch 404 errors when retrying deletes (simplify implementation A LOT). - # This test checks that this assumption holds. 
- - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0) - project = client.get_project(projectid=self.projectid) - - # Check frontier delete - project.frontier.delete_slot('frontier_non_existing', 'slot_non_existing') - - # Check metadata delete - job = client.push_job(self.projectid, self.spidername) - job.metadata['foo'] = 'bar' # Add then delete key, this will trigger an api delete for item foo - del job.metadata['foo'] - job.metadata.save() - - # Check collections delete - store = project.collections.new_store('foo') - store.set({'_key': 'foo'}) - store.delete('bar') - - self.assertTrue(True, "No error have been triggered by calling a delete on resources that do not exist") - - @responses.activate - def test_retrier_does_not_catch_unwanted_exception(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=2, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback, attempts_count = self.make_request_callback(3, job_metadata, http_error_status=403) - - self.mock_api(callback=callback) - - # Act - job, metadata, err = None, None, None - try: - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - metadata = dict(job.metadata) - except HTTPError as e: - err = e - - # Assert - self.assertIsNone(metadata) - self.assertIsNotNone(err) - self.assertEqual(err.response.status_code, 403) - self.assertEqual(attempts_count[0], 1) - - @responses.activate - def test_retrier_catches_badstatusline_and_429(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - - attempts_count = [0] # use a list for nonlocal mutability used in request_callback - - def request_callback(request): - attempts_count[0] += 1 - - if attempts_count[0] <= 2: - raise ConnectionError("Connection aborted.", BadStatusLine("''")) - if attempts_count[0] == 3: - return (429, {}, u'') - else: - resp_body = dict(job_metadata) - return (200, {}, json.dumps(resp_body)) - - self.mock_api(callback=request_callback) - - # Act - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - - # Assert - self.assertEqual(dict(job_metadata), dict(job.metadata)) - self.assertEqual(attempts_count[0], 4) - - @responses.activate - def test_api_delete_can_be_set_to_non_idempotent(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback_delete, attempts_count_delete = self.make_request_callback(2, job_metadata) - - self.mock_api(method=DELETE, callback=callback_delete) - - # Act - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - - err = None - try: - job.metadata.apidelete('/my/non/idempotent/delete/', is_idempotent=False) - except HTTPError as e: - err = e - - # Assert - self.assertEqual(attempts_count_delete[0], 1) - self.assertIsNotNone(err) - - @responses.activate - def test_collection_store_and_delete_are_retried(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3, max_retry_time=1) - - callback_post, attempts_count_post = self.make_request_callback(2, []) - callback_delete, attempts_count_delete = self.make_request_callback(2, []) - - self.mock_api(method=POST, callback=callback_delete, 
url_match='/.*/deleted') - self.mock_api(method=POST, callback=callback_post) # /!\ default regexp matches all paths, has to be added last - - # Act - project = client.get_project(self.projectid) - store = project.collections.new_store('foo') - store.set({'_key': 'bar', 'content': 'value'}) - store.delete('baz') - - # Assert - self.assertEqual(attempts_count_post[0], 3) - self.assertEqual(attempts_count_delete[0], 3) - - @responses.activate - def test_delete_requests_are_retried(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback_getpost, attempts_count_getpost = self.make_request_callback(0, job_metadata) - callback_delete, attempts_count_delete = self.make_request_callback(2, job_metadata) - - self.mock_api(method=GET, callback=callback_getpost) - self.mock_api(method=POST, callback=callback_getpost) - self.mock_api(method=DELETE, callback=callback_delete) - - # Act - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - job.metadata['foo'] = 'bar' - del job.metadata['foo'] - job.metadata.save() - - # Assert - self.assertEqual(attempts_count_delete[0], 3) - - @responses.activate - def test_metadata_save_does_retry(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback_get, attempts_count_get = self.make_request_callback(0, job_metadata) - callback_post, attempts_count_post = self.make_request_callback(2, job_metadata) - - self.mock_api(method=GET, callback=callback_get) - self.mock_api(method=POST, callback=callback_post) - - # Act - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - job.metadata['foo'] = 'bar' - job.metadata.save() - - # Assert - self.assertEqual(attempts_count_post[0], 3) - - @responses.activate - def test_push_job_does_not_retry(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3) - callback, attempts_count = self.make_request_callback(2, {'key': '1/2/3'}) - - self.mock_api(POST, callback=callback) - - # Act - job, err = None, None - try: - job = client.push_job(self.projectid, self.spidername) - except HTTPError as e: - err = e - - # Assert - self.assertIsNone(job) - self.assertIsNotNone(err) - self.assertEqual(err.response.status_code, 504) - self.assertEqual(attempts_count[0], 1) - - @responses.activate - def test_get_job_does_retry(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=3, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback, attempts_count = self.make_request_callback(2, job_metadata) - - self.mock_api(callback=callback) - - # Act - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - - # Assert - self.assertEqual(dict(job_metadata), dict(job.metadata)) - self.assertEqual(attempts_count[0], 3) - - @responses.activate - def test_get_job_does_fails_if_no_retries(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=0) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback, attempts_count = self.make_request_callback(2, job_metadata) - - self.mock_api(callback=callback) - - # Act - job, metadata, err = None, 
None, None - try: - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - metadata = dict(job.metadata) - except HTTPError as e: - err = e - - # Assert - self.assertIsNone(metadata) - self.assertIsNotNone(err) - self.assertEqual(err.response.status_code, 504) - self.assertEqual(attempts_count[0], 1) - - @responses.activate - def test_get_job_does_fails_on_too_many_retries(self): - # Prepare - client = HubstorageClient(auth=self.auth, endpoint=self.endpoint, max_retries=2, max_retry_time=1) - job_metadata = {'project': self.projectid, 'spider': self.spidername, 'state': 'pending'} - callback, attempts_count = self.make_request_callback(3, job_metadata) - - self.mock_api(callback=callback) - - # Act - job, metadata, err = None, None, None - try: - job = client.get_job('%s/%s/%s' % (self.projectid, self.spiderid, 42)) - metadata = dict(job.metadata) - except HTTPError as e: - err = e - - # Assert - self.assertIsNone(metadata) - self.assertIsNotNone(err) - self.assertEqual(err.response.status_code, 504) - self.assertEqual(attempts_count[0], 3) - - def mock_api(self, method=GET, callback=None, url_match='/.*'): - """ - Mock an API URL using the responses library. - - Args: - method (Optional[str]): The HTTP method to mock. Defaults to responses.GET - callback (function(request) -> response): - url_match (Optional[str]): The API URL regexp. Defaults to '/.*'. - """ - responses.add_callback( - method, re.compile(self.endpoint + url_match), - callback=callback, - content_type='application/json', - ) - - def make_request_callback(self, timeout_count, body_on_success, http_error_status=504): - """ - Make a request callback that timeout a couple of time before returning body_on_success - - Returns: - A tuple (request_callback, attempts), attempts is an array of size one that contains the number of time - request_callback has been called. - """ - attempts = [0] # use a list for nonlocal mutability used in request_callback - - def request_callback(request): - attempts[0] += 1 - - if attempts[0] <= timeout_count: - return (http_error_status, {}, "Timeout") - else: - resp_body = dict(body_on_success) - return (200, {}, json.dumps(resp_body)) - - return (request_callback, attempts) +@patch.multiple('scrapinghub.HubstorageClient', + RETRY_DEFAULT_JITTER_MS=1, + RETRY_DEFAULT_EXPONENTIAL_BACKOFF_MS=1) +def hsclient_with_retries(max_retries=3, max_retry_time=1): + return HubstorageClient( + auth=TEST_AUTH, endpoint=TEST_ENDPOINT, + max_retries=max_retries, max_retry_time=max_retry_time, + ) + + +@patch('scrapinghub.hubstorage.client.Retrying') +def test_hsclient_with_retries_no_wait(retrying_class): + hsclient_with_retries() + assert retrying_class.call_count == 1 + call = retrying_class.call_args_list[0] + assert call[1]['wait_jitter_max'] == 1 + assert int(call[1]['wait_exponential_multiplier']) == 33 + + +def mock_api(method=GET, callback=None, url_match='/.*'): + """ + Mock an API URL using the responses library. + + Args: + method (Optional[str]): The HTTP method to mock. Defaults to + responses.GET callback (function(request) -> response): + url_match (Optional[str]): The API URL regexp. Defaults to '/.*'. + """ + responses.add_callback( + method, re.compile(TEST_ENDPOINT + url_match), + callback=callback, + content_type='application/json', + ) + + +def make_request_callback(timeout_count, body_on_success, + http_error_status=504): + """Make a request callback that timeout a couple of time before returning + body_on_success. 
+ + Returns: + A tuple (request_callback, attempts), attempts is an array of size one + that contains the number of time request_callback has been called. + """ + # use a list for nonlocal mutability used in request_callback + attempts = [0] + + def request_callback(request): + attempts[0] += 1 + + if attempts[0] <= timeout_count: + return (http_error_status, {}, "Timeout") + else: + resp_body = dict(body_on_success) + return (200, {}, json.dumps(resp_body)) + + return (request_callback, attempts) + + +def test_delete_on_hubstorage_api_does_not_404(): + """Delete a non-existing resource without exceptions. + + The current Hubstorage API does not raise 404 errors on deleting resources + that do not exist, thus the retry policy does not catch 404 errors when + retrying deletes (simplify implementation A LOT). This test checks that + this assumption holds. + """ + client = hsclient_with_retries(max_retries=0, max_retry_time=None) + project = client.get_project(projectid=TEST_PROJECT_ID) + + # Check frontier delete + project.frontier.delete_slot('frontier_non_existing', 'slot_non_existing') + + # Check metadata delete + job = client.push_job(TEST_PROJECT_ID, TEST_SPIDER_NAME) + # Add then delete key, this will trigger an api delete for item foo + job.metadata['foo'] = 'bar' + del job.metadata['foo'] + job.metadata.save() + + # Check collections delete + store = project.collections.new_store('foo') + store.set({'_key': 'foo'}) + store.delete('bar') + + +@responses.activate +def test_retrier_does_not_catch_unwanted_exception(hsspiderid): + # Prepare + client = hsclient_with_retries(max_retries=2) + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback, attempts_count = make_request_callback( + 3, job_metadata, http_error_status=403) + mock_api(callback=callback) + + # Act + job, metadata, err = None, None, None + try: + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + metadata = dict(job.metadata) + except HTTPError as e: + err = e + + # Assert + assert metadata is None + assert err is not None + assert err.response.status_code == 403 + assert attempts_count[0] == 1 + + +@responses.activate +def test_retrier_catches_badstatusline_and_429(hsspiderid): + # Prepare + client = hsclient_with_retries() + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + + # use a list for nonlocal mutability used in request_callback + attempts_count = [0] + + def request_callback(request): + attempts_count[0] += 1 + + if attempts_count[0] <= 2: + raise ConnectionError("Connection aborted.", BadStatusLine("''")) + if attempts_count[0] == 3: + return (429, {}, u'') + else: + resp_body = dict(job_metadata) + return (200, {}, json.dumps(resp_body)) + + mock_api(callback=request_callback) + + # Act + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + + # Assert + assert dict(job_metadata) == dict(job.metadata) + assert attempts_count[0] == 4 + + +@responses.activate +def test_api_delete_can_be_set_to_non_idempotent(hsspiderid): + # Prepare + client = hsclient_with_retries() + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback_delete, attempts_count_delete = make_request_callback( + 2, job_metadata) + + mock_api(method=DELETE, callback=callback_delete) + + # Act + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + + err = None + try: + job.metadata.apidelete('/my/non/idempotent/delete/', 
+ is_idempotent=False) + except HTTPError as e: + err = e + + # Assert + assert attempts_count_delete[0] == 1 + assert err is not None + + +@responses.activate +def test_collection_store_and_delete_are_retried(): + # Prepare + client = hsclient_with_retries() + + callback_post, attempts_count_post = make_request_callback(2, []) + callback_delete, attempts_count_delete = make_request_callback(2, []) + + mock_api(method=POST, callback=callback_delete, url_match='/.*/deleted') + # /!\ default regexp matches all paths, has to be added last + mock_api(method=POST, callback=callback_post) + # Act + project = client.get_project(TEST_PROJECT_ID) + store = project.collections.new_store('foo') + store.set({'_key': 'bar', 'content': 'value'}) + store.delete('baz') + + # Assert + assert attempts_count_post[0] == 3 + assert attempts_count_delete[0] == 3 + + +@responses.activate +def test_delete_requests_are_retried(hsspiderid): + # Prepare + client = hsclient_with_retries() + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback_getpost, attempts_count_getpost = make_request_callback( + 0, job_metadata) + callback_delete, attempts_count_delete = make_request_callback( + 2, job_metadata) + + mock_api(method=GET, callback=callback_getpost) + mock_api(method=POST, callback=callback_getpost) + mock_api(method=DELETE, callback=callback_delete) + + # Act + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + job.metadata['foo'] = 'bar' + del job.metadata['foo'] + job.metadata.save() + + # Assert + assert attempts_count_delete[0] == 3 + + +@responses.activate +def test_metadata_save_does_retry(hsspiderid): + # Prepare + client = hsclient_with_retries() + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback_get, attempts_count_get = make_request_callback(0, job_metadata) + callback_post, attempts_count_post = make_request_callback(2, job_metadata) + + mock_api(method=GET, callback=callback_get) + mock_api(method=POST, callback=callback_post) + + # Act + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + job.metadata['foo'] = 'bar' + job.metadata.save() + + # Assert + assert attempts_count_post[0] == 3 + + +@responses.activate +def test_push_job_does_not_retry(): + # Prepare + client = hsclient_with_retries() + callback, attempts_count = make_request_callback(2, {'key': '1/2/3'}) + + mock_api(POST, callback=callback) + + # Act + job, err = None, None + try: + job = client.push_job(TEST_PROJECT_ID, TEST_SPIDER_NAME) + except HTTPError as e: + err = e + + # Assert + assert job is None + assert err is not None + assert err.response.status_code == 504 + assert attempts_count[0] == 1 + + +@responses.activate +def test_get_job_does_retry(hsspiderid): + # Prepare + client = hsclient_with_retries() + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback, attempts_count = make_request_callback(2, job_metadata) + + mock_api(callback=callback) + + # Act + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + + # Assert + assert dict(job_metadata) == dict(job.metadata) + assert attempts_count[0] == 3 + + +@responses.activate +def test_get_job_does_fails_if_no_retries(hsspiderid): + # Prepare + client = hsclient_with_retries(max_retries=0, max_retry_time=None) + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback, attempts_count = 
make_request_callback(2, job_metadata) + + mock_api(callback=callback) + + # Act + job, metadata, err = None, None, None + try: + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + metadata = dict(job.metadata) + except HTTPError as e: + err = e + + # Assert + assert metadata is None + assert err is not None + assert err.response.status_code == 504 + assert attempts_count[0] == 1 + + +@responses.activate +def test_get_job_does_fails_on_too_many_retries(hsspiderid): + # Prepare + client = hsclient_with_retries(max_retries=2, max_retry_time=1) + job_metadata = { + 'project': TEST_PROJECT_ID, + 'spider': TEST_SPIDER_NAME, + 'state': 'pending', + } + callback, attempts_count = make_request_callback(3, job_metadata) + + mock_api(callback=callback) + + # Act + job, metadata, err = None, None, None + try: + job = client.get_job('%s/%s/%s' % (TEST_PROJECT_ID, hsspiderid, 42)) + metadata = dict(job.metadata) + except HTTPError as e: + err = e + + # Assert + assert metadata is None + assert err is not None + assert err.response.status_code == 504 + assert attempts_count[0] == 3 diff --git a/tests/hubstorage/test_system.py b/tests/hubstorage/test_system.py index fe475391..c67d373c 100644 --- a/tests/hubstorage/test_system.py +++ b/tests/hubstorage/test_system.py @@ -1,112 +1,134 @@ import random from contextlib import closing +import pytest from six.moves import range from scrapinghub import HubstorageClient from scrapinghub.hubstorage.utils import millitime -from .hstestcase import HSTestCase - - -class SystemTest(HSTestCase): - - MAGICN = 1211 - - def setUp(self): - super(HSTestCase, self).setUp() - self.endpoint = self.hsclient.endpoint - # Panel - no client auth, only project auth using user auth token - self.panelclient = HubstorageClient(endpoint=self.endpoint) - self.panelproject = self.panelclient.get_project(self.projectid, auth=self.auth) - - def tearDown(self): - super(HSTestCase, self).tearDown() - self.panelclient.close() - - def test_succeed_with_close_reason(self): - self._do_test_success('all-good', 'all-good') - - def test_succeed_without_close_reason(self): - self._do_test_success(None, 'no_reason') - - def test_scraper_failure(self): - job = self._do_test_job(IOError('no more resources, ha!'), 'failed') - # MAGICN per log level messages plus one of last failure - stats = job.logs.stats() - self.assertTrue(stats) - self.assertEqual(stats['totals']['input_values'], self.MAGICN * 4 + 1) - - def _do_test_success(self, job_close_reason, expected_close_reason): - job = self._do_test_job(job_close_reason, expected_close_reason) - self.assertEqual(job.items.stats()['totals']['input_values'], self.MAGICN) - self.assertEqual(job.logs.stats()['totals']['input_values'], self.MAGICN * 4) - self.assertEqual(job.requests.stats()['totals']['input_values'], self.MAGICN) - - def _do_test_job(self, job_close_reason, expected_close_reason): - p = self.panelproject - pushed = p.jobq.push(self.spidername) - # check pending state - job = p.get_job(pushed['key']) - self.assertEqual(job.metadata.get('state'), 'pending') - # consume msg from runner - self._run_runner(pushed, close_reason=job_close_reason) - # query again from panel - job = p.get_job(pushed['key']) - self.assertEqual(job.metadata.get('state'), 'finished') - self.assertEqual(job.metadata.get('close_reason'), expected_close_reason) - return job - - def _run_runner(self, pushed, close_reason): - client = HubstorageClient(endpoint=self.endpoint, auth=self.auth) - with closing(client) as runnerclient: - job = self.start_job() 
- self.assertFalse(job.metadata.get('stop_requested')) - job.metadata.update(host='localhost', slot=1) - self.assertEqual(job.metadata.get('state'), 'running') - # run scraper - try: - self._run_scraper(job.key, job.jobauth, close_reason=close_reason) - except Exception as exc: - job.logs.error(message=str(exc), appendmode=True) - job.close_writers() - job.jobq.finish(job, close_reason='failed') - # logging from runner must append and never remove messages logged - # by scraper - self.assertTrue(job.logs.batch_append) - else: - job.jobq.finish(job, close_reason=close_reason or 'no_reason') - - def _run_scraper(self, jobkey, jobauth, close_reason=None): - httpmethods = 'GET PUT POST DELETE HEAD OPTIONS TRACE CONNECT'.split() - # Scraper - uses job level auth, no global or project auth available - client = HubstorageClient(endpoint=self.endpoint) - with closing(client) as scraperclient: - job = scraperclient.get_job(jobkey, auth=jobauth) - for idx in range(self.MAGICN): - iid = job.items.write({'uuid': idx}) - job.logs.debug('log debug %s' % idx, idx=idx) - job.logs.info('log info %s' % idx, idx=idx) - job.logs.warn('log warn %s' % idx, idx=idx) - job.logs.error('log error %s' % idx, idx=idx) - sid = job.samples.write([idx, idx, idx]) - rid = job.requests.add( - url='http://test.com/%d' % idx, - status=random.randint(100, 1000), - method=random.choice(httpmethods), - rs=random.randint(0, 100000), - duration=random.randint(0, 1000), - parent=random.randrange(0, idx + 1) if idx > 10 else None, - ts=millitime() + random.randint(100, 100000), - ) - self.assertEqual(iid, idx) - self.assertEqual(sid, idx) - self.assertEqual(rid, idx) - - if isinstance(close_reason, Exception): - raise close_reason - - if close_reason: - job.metadata['close_reason'] = close_reason - - job.metadata.save() +from .conftest import TEST_ENDPOINT, TEST_SPIDER_NAME +from .conftest import TEST_PROJECT_ID, TEST_AUTH +from .conftest import start_job + + +MAGICN = 1211 + + +@pytest.fixture +def panelclient(): + # Panel - no client auth, only project auth using user auth token + return HubstorageClient(endpoint=TEST_ENDPOINT) + + +@pytest.fixture +def panelproject(panelclient): + return panelclient.get_project(TEST_PROJECT_ID, auth=TEST_AUTH) + + +@pytest.fixture(autouse=True) +def close_panelclient(panelclient): + yield + panelclient.close() + + +def test_succeed_with_close_reason(hsproject, panelproject): + _do_test_success(hsproject, panelproject, 'all-good', 'all-good') + + +def test_succeed_without_close_reason(hsproject, panelproject): + _do_test_success(hsproject, panelproject, None, 'no_reason') + + +def _do_test_success(*args): + """Simple wrapper around _do_test_job with additonal checks""" + job = _do_test_job(*args) + assert job.items.stats()['totals']['input_values'] == MAGICN + assert job.logs.stats()['totals']['input_values'] == MAGICN * 4 + assert job.requests.stats()['totals']['input_values'] == MAGICN + + +def test_scraper_failure(hsproject, panelproject): + job = _do_test_job( + hsproject, + panelproject, + IOError('no more resources, ha!'), + 'failed', + ) + # MAGICN per log level messages plus one of last failure + stats = job.logs.stats() + assert stats + assert stats['totals']['input_values'] == MAGICN * 4 + 1 + + +def _do_test_job(hsproject, panelproject, + job_close_reason, expected_close_reason): + pushed = panelproject.jobq.push(TEST_SPIDER_NAME) + # check pending state + job = panelproject.get_job(pushed['key']) + assert job.metadata.get('state') == 'pending' + # consume msg from runner + 
_run_runner(hsproject, pushed, close_reason=job_close_reason) + # query again from panel + job = panelproject.get_job(pushed['key']) + assert job.metadata.get('state') == 'finished' + assert job.metadata.get('close_reason') == expected_close_reason + return job + + +def _run_runner(hsproject, pushed, close_reason): + client = HubstorageClient(endpoint=TEST_ENDPOINT, auth=TEST_AUTH) + with closing(client) as runnerclient: + job = start_job(hsproject) + assert not job.metadata.get('stop_requested') + job.metadata.update(host='localhost', slot=1) + assert job.metadata.get('state') == 'running' + # run scraper + try: + _run_scraper(job.key, job.jobauth, close_reason=close_reason) + except Exception as exc: + job.logs.error(message=str(exc), appendmode=True) + job.close_writers() + job.jobq.finish(job, close_reason='failed') + # logging from runner must append and never remove messages logged + # by scraper + assert job.logs.batch_append + else: + job.jobq.finish(job, close_reason=close_reason or 'no_reason') + + +def _run_scraper(jobkey, jobauth, close_reason=None): + httpmethods = 'GET PUT POST DELETE HEAD OPTIONS TRACE CONNECT'.split() + # Scraper - uses job level auth, no global or project auth available + client = HubstorageClient(endpoint=TEST_ENDPOINT) + # use some fixed timestamp to represent current time + now_ts = 1476803148638 + with closing(client) as scraperclient: + job = scraperclient.get_job(jobkey, auth=jobauth) + for idx in range(MAGICN): + iid = job.items.write({'uuid': idx}) + job.logs.debug('log debug %s' % idx, idx=idx) + job.logs.info('log info %s' % idx, idx=idx) + job.logs.warn('log warn %s' % idx, idx=idx) + job.logs.error('log error %s' % idx, idx=idx) + sid = job.samples.write([idx, idx, idx]) + rid = job.requests.add( + url='http://test.com/%d' % idx, + status=random.randint(100, 1000), + method=random.choice(httpmethods), + rs=random.randint(0, 100000), + duration=random.randint(0, 1000), + parent=random.randrange(0, idx + 1) if idx > 10 else None, + ts=now_ts + 100 + idx, + ) + assert iid == idx + assert sid == idx + assert rid == idx + + if isinstance(close_reason, Exception): + raise close_reason + + if close_reason: + job.metadata['close_reason'] = close_reason + + job.metadata.save() diff --git a/tests/legacy/test_connection.py b/tests/legacy/test_connection.py index 3b75362d..2f88e14e 100644 --- a/tests/legacy/test_connection.py +++ b/tests/legacy/test_connection.py @@ -15,7 +15,8 @@ def test_connection_class_attrs(): assert isinstance(Connection.API_METHODS, dict) -def test_connection_init_fail_wo_apikey(): +def test_connection_init_fail_wo_apikey(monkeypatch): + monkeypatch.delenv('SH_APIKEY', raising=False) with pytest.raises(RuntimeError): Connection()
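
Note: the retry tests in this change build on a small `responses` callback pattern — a callback that fails a fixed number of times before succeeding, with a one-element list used as a mutable attempt counter. The snippet below is a minimal, self-contained sketch of that pattern using plain `requests` and `responses` only; the endpoint URL, payload, and the `make_flaky_callback` helper name are illustrative and are not part of the test suite itself.

import json
import re

import requests
import responses


def make_flaky_callback(fail_times, success_body, error_status=504):
    """Build a callback that fails `fail_times` times, then returns success_body."""
    attempts = [0]  # a list so the nested callback can mutate the counter

    def callback(request):
        attempts[0] += 1
        if attempts[0] <= fail_times:
            return (error_status, {}, "Timeout")
        return (200, {}, json.dumps(success_body))

    return callback, attempts


@responses.activate
def demo_flaky_endpoint():
    callback, attempts = make_flaky_callback(2, {"state": "pending"})
    responses.add_callback(
        responses.GET,
        re.compile(r"http://storage\.example\.com/.*"),  # illustrative endpoint
        callback=callback,
        content_type="application/json",
    )
    # the first two calls hit the simulated 504, the third one succeeds
    assert requests.get("http://storage.example.com/jobs/1/2/3").status_code == 504
    assert requests.get("http://storage.example.com/jobs/1/2/3").status_code == 504
    resp = requests.get("http://storage.example.com/jobs/1/2/3")
    assert resp.status_code == 200
    assert resp.json() == {"state": "pending"}
    assert attempts[0] == 3


if __name__ == "__main__":
    demo_flaky_endpoint()

Keeping the attempt counter in a one-element list sidesteps the absence of `nonlocal` on Python 2, which is why the request callbacks in this change use the same trick.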