Skip to content

Commit 9c8be70

Browse files
nickgayaandymccurdy
authored andcommitted
Clear pipeline watch state after exec
1 parent 07fec7e commit 9c8be70

File tree

6 files changed

+66
-22
lines changed

6 files changed

+66
-22
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ vagrant/.vagrant
99
.python-version
1010
.cache
1111
.eggs
12-
.idea
12+
.idea
13+
.coverage

CHANGES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
deprecated now. Thanks to @laixintao #1271
1919
* Don't manually DISCARD when encountering an ExecAbortError.
2020
Thanks @nickgaya, #1300/#1301
21+
* Reset the watched state of pipelines after calling exec. This saves
22+
a roundtrip to the server by not having to call UNWATCH within
23+
Pipeline.reset(). Thanks @nickgaya, #1299/#1302
2124
* 3.4.1
2225
* Move the username argument in the Redis and Connection classes to the
2326
end of the argument list. This helps those poor souls that specify all

redis/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3902,6 +3902,9 @@ def _execute_transaction(self, connection, commands, raise_on_error):
39023902
raise errors[0][1]
39033903
raise sys.exc_info()[1]
39043904

3905+
# EXEC clears any watched keys
3906+
self.watching = False
3907+
39053908
if response is None:
39063909
raise WatchError("Watched variable changed.")
39073910

tests/conftest.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import random
2+
13
import pytest
24
import redis
35
from mock import Mock
@@ -146,3 +148,22 @@ def mock_cluster_resp_slaves(request, **kwargs):
146148
"slave 19efe5a631f3296fdf21a5441680f893e8cc96ec 0 "
147149
"1447836789290 3 connected']")
148150
return _gen_cluster_mock_resp(r, response)
151+
152+
153+
def wait_for_command(client, monitor, command):
154+
# issue a command with a key name that's local to this process.
155+
# if we find a command with our key before the command we're waiting
156+
# for, something went wrong
157+
redis_version = REDIS_INFO["version"]
158+
if StrictVersion(redis_version) >= StrictVersion('5.0.0'):
159+
id_str = str(client.client_id())
160+
else:
161+
id_str = '%08x' % random.randrange(2**32)
162+
key = '__REDIS-PY-%s__' % id_str
163+
client.get(key)
164+
while True:
165+
monitor_response = monitor.next_command()
166+
if command in monitor_response['command']:
167+
return monitor_response
168+
if key in monitor_response['command']:
169+
return None

tests/test_monitor.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,15 @@
11
from __future__ import unicode_literals
22
from redis._compat import unicode
3-
from .conftest import skip_if_server_version_lt
3+
from .conftest import wait_for_command
44

55

6-
def wait_for_command(client, monitor, command):
7-
# issue a command with a key name that's local to this process.
8-
# if we find a command with our key before the command we're waiting
9-
# for, something went wrong
10-
key = '__REDIS-PY-%s__' % str(client.client_id())
11-
client.get(key)
12-
while True:
13-
monitor_response = monitor.next_command()
14-
if command in monitor_response['command']:
15-
return monitor_response
16-
if key in monitor_response['command']:
17-
return None
18-
19-
20-
class TestPipeline(object):
21-
@skip_if_server_version_lt('5.0.0')
6+
class TestMonitor(object):
227
def test_wait_command_not_found(self, r):
238
"Make sure the wait_for_command func works when command is not found"
249
with r.monitor() as m:
2510
response = wait_for_command(r, m, 'nothing')
2611
assert response is None
2712

28-
@skip_if_server_version_lt('5.0.0')
2913
def test_response_values(self, r):
3014
with r.monitor() as m:
3115
r.ping()
@@ -37,22 +21,19 @@ def test_response_values(self, r):
3721
assert isinstance(response['client_port'], unicode)
3822
assert response['command'] == 'PING'
3923

40-
@skip_if_server_version_lt('5.0.0')
4124
def test_command_with_quoted_key(self, r):
4225
with r.monitor() as m:
4326
r.get('foo"bar')
4427
response = wait_for_command(r, m, 'GET foo"bar')
4528
assert response['command'] == 'GET foo"bar'
4629

47-
@skip_if_server_version_lt('5.0.0')
4830
def test_command_with_binary_data(self, r):
4931
with r.monitor() as m:
5032
byte_string = b'foo\x92'
5133
r.get(byte_string)
5234
response = wait_for_command(r, m, 'GET foo\\x92')
5335
assert response['command'] == 'GET foo\\x92'
5436

55-
@skip_if_server_version_lt('5.0.0')
5637
def test_lua_script(self, r):
5738
with r.monitor() as m:
5839
script = 'return redis.call("GET", "foo")'

tests/test_pipeline.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import redis
55
from redis._compat import unichr, unicode
6+
from .conftest import wait_for_command
67

78

89
class TestPipeline(object):
@@ -243,6 +244,40 @@ def test_unwatch(self, r):
243244
pipe.get('a')
244245
assert pipe.execute() == [b'1']
245246

247+
def test_watch_exec_no_unwatch(self, r):
248+
r['a'] = 1
249+
r['b'] = 2
250+
251+
with r.monitor() as m:
252+
with r.pipeline() as pipe:
253+
pipe.watch('a', 'b')
254+
assert pipe.watching
255+
a_value = pipe.get('a')
256+
b_value = pipe.get('b')
257+
assert a_value == b'1'
258+
assert b_value == b'2'
259+
pipe.multi()
260+
pipe.set('c', 3)
261+
assert pipe.execute() == [True]
262+
assert not pipe.watching
263+
264+
unwatch_command = wait_for_command(r, m, 'UNWATCH')
265+
assert unwatch_command is None, "should not send UNWATCH"
266+
267+
def test_watch_reset_unwatch(self, r):
268+
r['a'] = 1
269+
270+
with r.monitor() as m:
271+
with r.pipeline() as pipe:
272+
pipe.watch('a')
273+
assert pipe.watching
274+
pipe.reset()
275+
assert not pipe.watching
276+
277+
unwatch_command = wait_for_command(r, m, 'UNWATCH')
278+
assert unwatch_command is not None
279+
assert unwatch_command['command'] == 'UNWATCH'
280+
246281
def test_transaction_callable(self, r):
247282
r['a'] = 1
248283
r['b'] = 2

0 commit comments

Comments
 (0)