 Test Project
 """
 import time
+import pytest
 from six.moves import range
 from collections import defaultdict
-from .hstestcase import HSTestCase
 
 from scrapinghub.hubstorage import ValueTooLarge
+from .conftest import TEST_SPIDER_NAME, TEST_AUTH
 
 
-class BatchUploaderTest(HSTestCase):
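+# helper: push a test job, start it, and attach a batch writer to its items endpoint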
+def _job_and_writer(hsclient, hsproject, **writerargs):
+    job = hsproject.push_job(TEST_SPIDER_NAME)
+    hsproject.jobq.start(job)
+    batch_uploader = hsclient.batchuploader
+    writer = batch_uploader.create_writer(
+        job.items.url, auth=TEST_AUTH, **writerargs)
+    return job, writer
 
-    def _job_and_writer(self, **writerargs):
-        self.project.push_job(self.spidername)
-        job = self.start_job()
-        bu = self.hsclient.batchuploader
-        w = bu.create_writer(job.items.url, auth=self.auth, **writerargs)
-        return job, w
 
-    def test_writer_batchsize(self):
-        job, w = self._job_and_writer(size=10)
-        for x in range(111):
-            w.write({'x': x})
-        w.close()
-        # this works only for small batches (previous size=10 and small data)
-        # as internally HS may commit a single large request as many smaller
-        # commits, each with different timestamps
-        groups = defaultdict(int)
-        for doc in job.items.list(meta=['_ts']):
-            groups[doc['_ts']] += 1
-
-        self.assertEqual(len(groups), 12)
-
-    def test_writer_maxitemsize(self):
-        job, w = self._job_and_writer()
-        m = w.maxitemsize
-        self.assertRaisesRegexp(
-            ValueTooLarge,
-            'Value exceeds max encoded size of 1048576 bytes:'
-            ' \'{"b": "x+\\.\\.\\.\'',
-            w.write, {'b': 'x' * m})
-        self.assertRaisesRegexp(
-            ValueTooLarge,
-            'Value exceeds max encoded size of 1048576 bytes:'
-            ' \'{"b+\\.\\.\\.\'',
-            w.write, {'b'*m: 'x'})
-        self.assertRaisesRegexp(
-            ValueTooLarge,
-            'Value exceeds max encoded size of 1048576 bytes:'
-            ' \'{"b+\\.\\.\\.\'',
-            w.write, {'b'*(m//2): 'x'*(m//2)})
-
-    def test_writer_contentencoding(self):
-        for ce in ('identity', 'gzip'):
-            job, w = self._job_and_writer(content_encoding=ce)
-            for x in range(111):
-                w.write({'x': x})
-            w.close()
-            self.assertEqual(job.items.stats()['totals']['input_values'], 111)
-
-    def test_writer_interval(self):
-        job, w = self._job_and_writer(size=1000, interval=1)
+def test_writer_batchsize(hsclient, hsproject):
+    job, writer = _job_and_writer(hsclient, hsproject, size=10)
+    for x in range(111):
+        writer.write({'x': x})
+    writer.close()
+    # this check only works for small batches (size=10 above, small items),
+    # as internally HS may commit a single large request as many smaller
+    # commits, each with different timestamps
+    groups = defaultdict(int)
+    for doc in job.items.list(meta=['_ts']):
+        groups[doc['_ts']] += 1
+
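+    # ceil(111 / 10) == 12 batches, each committed with its own '_ts'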
+    assert len(groups) == 12
+
+
+def test_writer_maxitemsize(hsclient, hsproject):
+    _, writer = _job_and_writer(hsclient, hsproject)
+    max_size = writer.maxitemsize
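+    # maxitemsize caps a single encoded item at 1048576 bytes (1 MiB)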
+    with pytest.raises(ValueTooLarge) as excinfo1:
+        writer.write({'b': 'x' * max_size})
+    excinfo1.match(
+        r'Value exceeds max encoded size of 1048576 bytes:'
+        ' \'{"b": "x+\\.\\.\\.\'')
+
+    with pytest.raises(ValueTooLarge) as excinfo2:
+        writer.write({'b'*max_size: 'x'})
+    excinfo2.match(
+        r'Value exceeds max encoded size of 1048576 bytes:'
+        ' \'{"b+\\.\\.\\.\'')
+
+    with pytest.raises(ValueTooLarge) as excinfo3:
+        writer.write({'b'*(max_size//2): 'x'*(max_size//2)})
+    excinfo3.match(
+        r'Value exceeds max encoded size of 1048576 bytes:'
+        ' \'{"b+\\.\\.\\.\'')
+
+
+def test_writer_contentencoding(hsclient, hsproject):
+    for ce in ('identity', 'gzip'):
+        job, writer = _job_and_writer(hsclient, hsproject,
+                                      content_encoding=ce)
         for x in range(111):
-            w.write({'x': x})
-            if x == 50:
-                time.sleep(2)
+            writer.write({'x': x})
+        writer.close()
+        assert job.items.stats()['totals']['input_values'] == 111
+
+
+def test_writer_interval(hsclient, hsproject):
+    job, writer = _job_and_writer(hsclient, hsproject,
+                                  size=1000, interval=1)
+    for x in range(111):
+        writer.write({'x': x})
+        if x == 50:
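+            # pause longer than interval=1 so the periodic flush fires mid-run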
+            time.sleep(2)
 
-        w.close()
-        groups = defaultdict(int)
-        for doc in job.items.list(meta=['_ts']):
-            groups[doc['_ts']] += 1
+    writer.close()
+    groups = defaultdict(int)
+    for doc in job.items.list(meta=['_ts']):
+        groups[doc['_ts']] += 1
 
-        self.assertEqual(len(groups), 2)
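+    # one flush from the interval timer during the sleep, one from close()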
+    assert len(groups) == 2
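
The refactored tests rely on the `hsclient` and `hsproject` pytest fixtures and the `TEST_SPIDER_NAME` / `TEST_AUTH` constants imported from the package's conftest, which is not part of this diff. A minimal sketch of what those definitions might look like, assuming `scrapinghub.HubstorageClient` is the client in use; all names and values below are illustrative placeholders, not the project's actual conftest:

# conftest.py -- illustrative sketch only; the real fixtures are outside this diff
import pytest
from scrapinghub import HubstorageClient

TEST_PROJECT_ID = '2222222'          # assumed placeholder project id
TEST_SPIDER_NAME = 'hs-test-spider'  # assumed placeholder spider name
TEST_AUTH = 'f' * 32                 # assumed placeholder API key


@pytest.fixture(scope='session')
def hsclient():
    # one Hubstorage client per test session, closed on teardown
    client = HubstorageClient(auth=TEST_AUTH)
    yield client
    client.close()


@pytest.fixture(scope='session')
def hsproject(hsclient):
    # project handle whose push_job()/jobq the tests drive
    return hsclient.get_project(TEST_PROJECT_ID)

Session scope keeps a single authenticated client alive across all tests, which matches how the `_job_and_writer` helper reuses `hsclient.batchuploader` for every test function.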