From 26aa662a4156d7d8d5a382d7892bf9f11614f9ed Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 20 Mar 2014 14:54:41 +0100 Subject: [PATCH 1/2] DATAREDIS-73 - Add support for spring managed transactions. Prepare issue branch. --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index f09fb8eb53..373a3b8efb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ jredisVersion=06052013 jedisVersion=2.4.1 springVersion=3.2.8.RELEASE log4jVersion=1.2.17 -version=1.3.0.BUILD-SNAPSHOT +version=1.3.0.DATAREDIS-73-SNAPSHOT srpVersion=0.7 jacksonVersion=1.8.8 fasterXmlJacksonVersion=2.2.0 From 4ef7fdcbf50e8555da74a88d055f796c6baa26e3 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 20 Mar 2014 14:55:24 +0100 Subject: [PATCH 2/2] DATAREDIS-73 - Add support for spring managed transactions. RedisTemplate now supports enabling of transaction support, which is disabled by default. In case of enabled transaction support the RedisConnections will be bound to the current thread during ongoing outer transactions. We call MULTI at the beginning and depending on transaction state EXEC or DISCARD at its end. To support read operations during the transaction we wrap a proxy around the bound connection piping read operations to a new (non thread bound) connection obtained by the underlying RedisConnectionFactory. Transaction support is available for jedis, lettuce and srp, while had to be skipped for jredis due to the lack of support for MULTI. Original pull request: #64. --- build.gradle | 1 + .../data/redis/core/RedisCommand.java | 365 ++++++++++++++++++ .../data/redis/core/RedisConnectionUtils.java | 210 +++++++++- .../data/redis/core/RedisTemplate.java | 17 +- .../AbstractTransactionalTestBase.java | 158 ++++++++ .../TransactionalJedisItegrationTests.java | 45 +++ .../TransactionalLettuceItegrationTests.java | 45 +++ .../srp/TransactionalSrpItegrationTests.java | 45 +++ ...nnectionSplittingInterceptorUnitTests.java | 98 +++++ .../redis/core/RedisCommandUnitTests.java | 110 ++++++ template.mf | 2 + 11 files changed, 1080 insertions(+), 16 deletions(-) create mode 100644 src/main/java/org/springframework/data/redis/core/RedisCommand.java create mode 100644 src/test/java/org/springframework/data/redis/connection/AbstractTransactionalTestBase.java create mode 100644 src/test/java/org/springframework/data/redis/connection/jedis/TransactionalJedisItegrationTests.java create mode 100644 src/test/java/org/springframework/data/redis/connection/lettuce/TransactionalLettuceItegrationTests.java create mode 100644 src/test/java/org/springframework/data/redis/connection/srp/TransactionalSrpItegrationTests.java create mode 100644 src/test/java/org/springframework/data/redis/core/ConnectionSplittingInterceptorUnitTests.java create mode 100644 src/test/java/org/springframework/data/redis/core/RedisCommandUnitTests.java diff --git a/build.gradle b/build.gradle index 83aefbee6a..2a285805e4 100644 --- a/build.gradle +++ b/build.gradle @@ -51,6 +51,7 @@ dependencies { compile "org.springframework:spring-context:$springVersion" compile "org.springframework:spring-tx:$springVersion" compile("org.springframework:spring-oxm:$springVersion", optional) + compile "org.springframework:spring-aop:$springVersion" // Redis Drivers compile("redis.clients:jedis:$jedisVersion", optional) diff --git a/src/main/java/org/springframework/data/redis/core/RedisCommand.java b/src/main/java/org/springframework/data/redis/core/RedisCommand.java new file mode 100644 index 0000000000..be629b33ea --- /dev/null +++ b/src/main/java/org/springframework/data/redis/core/RedisCommand.java @@ -0,0 +1,365 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.jredis.Redis; +import org.springframework.util.StringUtils; + +/** + * @author Christoph Strobl + * @author Thomas Darimont + * @since 1.3 + * @see Redis command list: + * https://github.com/antirez/redis/blob/93e7a130fc9594e41ccfc996b5eca7626ae5356a/src/redis.c#L119 + */ +enum RedisCommand { + // -- A + APPEND("rw", 2, 2), // + AUTH("rw", 1, 1), // + // -- B + BGREWRITEAOF("r", 0, 0, "bgwriteaof"), // + BGSAVE("r", 0, 0), // + BITCOUNT("r", 1, 3), // + BITOP("rw", 3), // + BITPOS("r", 2, 4), // + BLPOP("rw", 2), // + BRPOP("rw", 2), // + BRPOPLPUSH("rw", 3), // + // -- C + CLIENT_KILL("rw", 1, 1), // + CLIENT_LIST("r", 0, 0), // + CLIENT_GETNAME("r", 0, 0), // + CLIENT_PAUSE("rw", 1, 1), // + CLIENT_SETNAME("w", 1, 1), // + CONFIG_GET("r", 1, 1, "getconfig"), // + CONFIG_REWRITE("rw", 0, 0), // + CONFIG_SET("w", 2, 2, "setconfig"), // + CONFIG_RESETSTAT("w", 0, 0, "resetconfigstats"), // + // -- D + DBSIZE("r", 0, 0), // + DECR("w", 1, 1), // + DECRBY("w", 2, 2), // + DEL("rw", 1), // + DISCARD("rw", 0, 0), // + DUMP("r", 1, 1), // + // -- E + ECHO("r", 1, 1), // + EVAL("rw", 2), // + EVALSHA("rw", 2), // + EXEC("rw", 0, 0), // + EXISTS("r", 1, 1), // + EXPIRE("rw", 2, 2), // + EXPIREAT("rw", 2, 2), // + // -- F + FLUSHALL("w", 0, 0), // + FLUSHDB("w", 0, 0), // + // -- G + GET("r", 1, 1), // + GETBIT("r", 2, 2), // + GETRANGE("r", 3, 3), // + GETSET("rw", 2, 2), // + // -- H + HDEL("rw", 2), // + HEXISTS("r", 2, 2), // + HGET("r", 2, 2), // + HGETALL("r", 1, 1), // + HINCRBY("rw", 3, 3), // + HINCBYFLOAT("rw", 3, 3), // + HKEYS("r", 1), // + HLEN("r", 1), // + HMGET("r", 2), // + HMSET("w", 3), // + HSET("w", 3, 3), // + HSETNX("w", 3, 3), // + HVALS("r", 1, 1), // + // -- I + INCR("rw", 1), // + INCRBYFLOAT("rw", 2, 2), // + INFO("r", 0), // + // -- K + KEYS("r", 1), // + // -- L + LASTSAVE("r", 0), // + LINDEX("r", 2, 2), // + LINSERT("rw", 4, 4), // + LLEN("r", 1, 1), // + LPOP("rw", 1, 1), // + LPUSH("rw", 2), // + LPUSHX("rw", 2), // + LRANGE("r", 3, 3), // + LREM("rw", 3, 3), // + LSET("w", 3, 3), // + LTRIM("w", 3, 3), // + // -- M + MGET("r", 1), // + MIGRATE("rw", 0), // + MONITOR("rw", 0, 0), // + MOVE("rw", 2, 2), // + MSET("w", 2), // + MSETNX("w", 2), // + MULTI("rw", 0, 0), // + // -- P + PERSIST("rw", 1, 1), // + PEXPIRE("rw", 2, 2), // + PEXPIREAT("rw", 2, 2), // + PING("r", 0, 0), // + PSETEX("w", 3), // + PSUBSCRIBE("r", 1), // + PTTL("r", 1, 1), // + // -- Q + QUIT("rw", 0, 0), // + // -- R + RANDOMKEY("r", 0, 0), // + RANAME("w", 2, 2), // + RENAMENX("w", 2, 2), // + RESTORE("w", 3, 3), // + RPOP("rw", 1, 1), // + RPOPLPUSH("rw", 2, 2), // + RPUSH("rw", 2), // + RPUSHX("rw", 2, 2), // + // -- S + SADD("rw", 2), // + SAVE("rw", 0, 0), // + SCARD("r", 1, 1), // + SCRIPT_EXISTS("r", 1), // + SCRIPT_FLUSH("rw", 0, 0), // + SCRIPT_KILL("rw", 0, 0), // + SCRIPT_LOAD("rw", 1, 1), // + SDIFF("r", 1), // + SDIFFSTORE("rw", 2), // + SELECT("rw", 0, 0), // + SET("w", 2), // + SETBIT("rw", 3, 3), // + SETEX("w", 3, 3), // + SETNX("w", 2, 2), // + SETRANGE("rw", 3, 3), // + SHUTDOWN("rw", 0), // + SINTER("r", 1), // + SINTERSTORE("rw", 2), // + SISMEMBER("r", 2), // + SLAVEOF("w", 2), // + SLOWLOG("rw", 1), // + SMEMBERS("r", 1, 1), // + SMOVE("rw", 3, 3), // + SORT("rw", 1), // + SPOP("rw", 1, 1), // + SRANDMEMBER("r", 1, 1), // + SREM("rw", 2), // + STRLEN("r", 1, 1), // + SUBSCRIBE("rw", 1), // + SUNION("r", 1), // + SUNIONSTORE("rw ", 2), // + SYNC("rw", 0, 0), // + // -- T + TIME("r", 0, 0), // + TTL("r", 1, 1), // + TYPE("r", 1, 1), // + // -- U + UNSUBSCRIBE("rw", 0), // + UNWATCH("rw", 0, 0), // + // -- W + WATCH("rw", 1), // + // -- Z + ZADD("rw", 3), // + ZCARD("r", 1), // + ZCOUNT("r", 3, 3), // + ZINCRBY("rw", 3), // + ZINTERSTORE("rw", 3), // + ZRANGE("r", 3), // + ZRANGEBYSCORE("r", 3), // + ZRANK("r", 2, 2), // + ZREM("rw", 2), // + ZREMRANGEBYRANK("rw", 3, 3), // + ZREMRANGEBYSCORE("rm", 3, 3), // + ZREVRANGE("r", 3), // + ZREVRANGEBYSCORE("r", 3), // + ZREVRANK("r", 2, 2), // + ZSCORE("r", 2, 2), // + ZUNIONSTORE("rw", 3), // + SCAN("r", 1), // + SSCAN("r", 2), // + HSCAN("r", 2), // + ZSCAN("r", 2), // + // -- UNKNOWN / DEFAULT + UNKNOWN("rw", -1); + + private boolean read = true; + private boolean write = true; + private Set alias = new HashSet(1); + + private int minArgs = -1; + private int maxArgs = -1; + + private final static Map commandLookup; + + static { + commandLookup = buildCommandLookupTable(); + } + + private static Map buildCommandLookupTable() { + + RedisCommand[] cmds = RedisCommand.values(); + Map map = new HashMap(cmds.length, 1.0F); + + for (RedisCommand cmd : cmds) { + + map.put(cmd.name().toLowerCase(), cmd); + + for (String alias : cmd.alias) { + map.put(alias, cmd); + } + } + + return Collections.unmodifiableMap(map); + } + + private RedisCommand(String mode, int minArgs) { + this(mode, minArgs, -1); + } + + private RedisCommand(String mode, int minArgs, int maxArgs) { + + if (StringUtils.hasText(mode)) { + this.read = mode.toLowerCase().indexOf('r') > -1; + this.write = mode.toLowerCase().indexOf('w') > -1; + } + + this.minArgs = minArgs; + this.maxArgs = maxArgs; + } + + /** + * Creates a new {@link RedisCommand}. + * + * @param mode + * @param minArgs + * @param maxArgs + * @param alias + */ + private RedisCommand(String mode, int minArgs, int maxArgs, String... alias) { + + this(mode, minArgs, maxArgs); + + if (alias != null && alias.length > 0) { + this.alias.addAll(Arrays.asList(alias)); + } + } + + /** + * @return {@literal true} if the command requires arguments + */ + public boolean requiresArguments() { + return minArgs >= 0; + } + + /** + * @return {@literal true} if an exact number of arguments is expected + */ + public boolean requiresExactNumberOfArguments() { + return maxArgs >= 0; + } + + /** + * @return {@literal true} if the command triggers a read operation + */ + public boolean isRead() { + return read; + } + + /** + * @return {@literal true} if the command triggers a write operation + */ + public boolean isWrite() { + return write; + } + + /** + * @return {@literal true} if values are read but not written + */ + public boolean isReadonly() { + return read && !write; + } + + /** + * {@link String#equalsIgnoreCase(String)} compare the given string representation of {@literal command} against the + * {@link #toString()} representation of the command as well as its given {@link #alias}. + * + * @param command + * @return true if positive match. + */ + public boolean isRepresentedBy(String command) { + + if (!StringUtils.hasText(command)) { + return false; + } + + if (toString().equalsIgnoreCase(command)) { + return true; + } + + return alias.contains(command.toLowerCase()); + } + + /** + * Validates given argument count against expected ones. + * + * @param nrArguments + * @throws {@link IllegalArgumentException} in case argument count does not match expected. + */ + public void validateArgumentCount(int nrArguments) { + + if (requiresArguments()) { + if (requiresExactNumberOfArguments()) { + if (nrArguments != maxArgs) { + throw new IllegalArgumentException(String.format("%s command requires %s arguments.", this.name(), + this.maxArgs)); + } + } + if (nrArguments < minArgs) { + throw new IllegalArgumentException(String.format("%s command requires at least %s arguments.", this.name(), + this.minArgs)); + } + } + } + + /** + * Returns the command represented by the given {@code key}. Returns {@link #UNKNOWN} if no matching command could be + * found. + * + * @param key + * @return + */ + public static RedisCommand failsafeCommandLookup(String key) { + + if (!StringUtils.hasText(key)) { + return RedisCommand.UNKNOWN; + } + + RedisCommand cmd = commandLookup.get(key.toLowerCase()); + if (cmd != null) { + return cmd; + } + + return RedisCommand.UNKNOWN; + } +} diff --git a/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java b/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java index 08ae0bc02b..c4f08aab57 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java +++ b/src/main/java/org/springframework/data/redis/core/RedisConnectionUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2013 the original author or authors. + * Copyright 2011-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,11 +15,20 @@ */ package org.springframework.data.redis.core; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.cglib.proxy.MethodProxy; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.transaction.support.ResourceHolder; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; @@ -28,6 +37,8 @@ * 'transactions'/scopes. * * @author Costin Leau + * @author Christoph Strobl + * @author Thomas Darimont */ public abstract class RedisConnectionUtils { @@ -37,10 +48,22 @@ public abstract class RedisConnectionUtils { * Binds a new Redis connection (from the given factory) to the current thread, if none is already bound. * * @param factory connection factory - * @return a new Redis connection + * @return a new Redis connection without transaction support. */ public static RedisConnection bindConnection(RedisConnectionFactory factory) { - return doGetConnection(factory, true, true); + return bindConnection(factory, false); + } + + /** + * Binds a new Redis connection (from the given factory) to the current thread, if none is already bound and enables + * transaction support if {@code enableTranactionSupport} is set to {@literal true}. + * + * @param factory connection factory + * @param enableTranactionSupport + * @return a new Redis connection with transaction support if requested. + */ + public static RedisConnection bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport) { + return doGetConnection(factory, true, true, enableTranactionSupport); } /** @@ -49,10 +72,23 @@ public static RedisConnection bindConnection(RedisConnectionFactory factory) { * otherwise. * * @param factory connection factory for creating the connection - * @return an active Redis connection + * @return an active Redis connection without transaction management. */ public static RedisConnection getConnection(RedisConnectionFactory factory) { - return doGetConnection(factory, true, false); + return getConnection(factory, false); + } + + /** + * Gets a Redis connection from the given factory. Is aware of and will return any existing corresponding connections + * bound to the current thread, for example when using a transaction manager. Will always create a new connection + * otherwise. + * + * @param factory connection factory for creating the connection + * @param enableTranactionSupport + * @return an active Redis connection with transaction management if requested. + */ + public static RedisConnection getConnection(RedisConnectionFactory factory, boolean enableTranactionSupport) { + return doGetConnection(factory, true, false, enableTranactionSupport); } /** @@ -64,34 +100,100 @@ public static RedisConnection getConnection(RedisConnectionFactory factory) { * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the * current thread * @param bind binds the connection to the thread, in case one was created + * @param enableTransactionSupport * @return an active Redis connection */ - public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind) { + public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, + boolean enableTransactionSupport) { + Assert.notNull(factory, "No RedisConnectionFactory specified"); RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); - // TODO: investigate tx synchronization - if (connHolder != null) + if (connHolder != null) { + if (enableTransactionSupport) { + potentiallyRegisterTransactionSynchronisation(connHolder, factory); + } return connHolder.getConnection(); + } if (!allowCreate) { throw new IllegalArgumentException("No connection found and allowCreate = false"); } - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Opening RedisConnection"); + } RedisConnection conn = factory.getConnection(); if (bind) { - connHolder = new RedisConnectionHolder(conn); + + RedisConnection connectionToBind = conn; + if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) { + connectionToBind = createConnectionProxy(conn, factory); + } + + connHolder = new RedisConnectionHolder(connectionToBind); + TransactionSynchronizationManager.bindResource(factory, connHolder); + if (enableTransactionSupport) { + potentiallyRegisterTransactionSynchronisation(connHolder, factory); + } + return connHolder.getConnection(); } + return conn; } + private static void potentiallyRegisterTransactionSynchronisation(final RedisConnectionHolder connHolder, + RedisConnectionFactory factory) { + + if (isActualNonReadonlyTransactionActive()) { + + final RedisConnection connection = connHolder.getConnection(); + + if (!connHolder.isTransactionSyncronisationActive()) { + connHolder.setTransactionSyncronisationActive(true); + connection.multi(); + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { + @Override + public void afterCompletion(int status) { + + switch (status) { + + case TransactionSynchronization.STATUS_COMMITTED: + connection.exec(); + break; + + case TransactionSynchronization.STATUS_ROLLED_BACK: + case TransactionSynchronization.STATUS_UNKNOWN: + default: + connection.discard(); + } + + connHolder.setTransactionSyncronisationActive(false); + } + }); + } + } + } + + private static boolean isActualNonReadonlyTransactionActive() { + return TransactionSynchronizationManager.isActualTransactionActive() + && !TransactionSynchronizationManager.isCurrentTransactionReadOnly(); + } + + private static RedisConnection createConnectionProxy(RedisConnection connection, RedisConnectionFactory factory) { + + ProxyFactory proxyFactory = new ProxyFactory(connection); + proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory)); + + return RedisConnection.class.cast(proxyFactory.getProxy()); + } + /** * Closes the given connection, created via the given factory if not managed externally (i.e. not bound to the * thread). @@ -143,17 +245,83 @@ public static boolean isConnectionTransactional(RedisConnection conn, RedisConne return (connHolder != null && conn == connHolder.getConnection()); } + /** + * @author Christoph Strobl + * @since 1.3 + */ + static class ConnectionSplittingInterceptor implements MethodInterceptor, + org.springframework.cglib.proxy.MethodInterceptor { + + private final RedisConnectionFactory factory; + + public ConnectionSplittingInterceptor(RedisConnectionFactory factory) { + this.factory = factory; + } + + @Override + public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable { + + RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName()); + + if (isPotentiallyThreadBoundCommand(commandToExecute)) { + + if (log.isDebugEnabled()) { + log.debug(String.format("Invoke '%s' on bound conneciton", method.getName())); + } + + return invoke(method, obj, args); + } + + if (log.isDebugEnabled()) { + log.debug(String.format("Invoke '%s' on unbound conneciton", method.getName())); + } + + RedisConnection connection = factory.getConnection(); + + try { + return invoke(method, connection, args); + } finally { + // properly close the unbound connection after executing command + if (!connection.isClosed()) { + connection.close(); + } + } + } + + private Object invoke(Method method, Object target, Object[] args) throws Throwable { + + try { + return method.invoke(target, args); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + return intercept(invocation.getThis(), invocation.getMethod(), invocation.getArguments(), null); + } + + private boolean isPotentiallyThreadBoundCommand(RedisCommand command) { + return RedisCommand.UNKNOWN.equals(command) || !command.isReadonly(); + } + } + + /** + * @author Christoph Strobl + */ private static class RedisConnectionHolder implements ResourceHolder { - private boolean isVoid = false; + private boolean unbound; private final RedisConnection conn; + private boolean transactionSyncronisationActive; public RedisConnectionHolder(RedisConnection conn) { this.conn = conn; } public boolean isVoid() { - return isVoid; + return unbound; } public RedisConnection getConnection() { @@ -165,7 +333,23 @@ public void reset() { } public void unbound() { - this.isVoid = true; + this.unbound = true; + } + + /** + * @return + * @since 1.3 + */ + public boolean isTransactionSyncronisationActive() { + return transactionSyncronisationActive; + } + + /** + * @param transactionSyncronisationActive + * @since 1.3 + */ + public void setTransactionSyncronisationActive(boolean transactionSyncronisationActive) { + this.transactionSyncronisationActive = transactionSyncronisationActive; } } } diff --git a/src/main/java/org/springframework/data/redis/core/RedisTemplate.java b/src/main/java/org/springframework/data/redis/core/RedisTemplate.java index b96c14a95a..6eb3c2ac38 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/RedisTemplate.java @@ -77,6 +77,7 @@ */ public class RedisTemplate extends RedisAccessor implements RedisOperations { + private boolean enableTransactionSupport = false; private boolean exposeConnection = false; private boolean initialized = false; private boolean enableDefaultSerializer = true; @@ -168,7 +169,7 @@ public T execute(RedisCallback action, boolean exposeConnection, boolean RedisConnectionFactory factory = getConnectionFactory(); RedisConnection conn = null; try { - conn = RedisConnectionUtils.getConnection(factory); + conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); @@ -200,7 +201,7 @@ public T execute(SessionCallback session) { RedisConnectionFactory factory = getConnectionFactory(); // bind connection - RedisConnectionUtils.bindConnection(factory); + RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); try { return session.execute(this); } finally { @@ -218,7 +219,7 @@ public List executePipelined(final SessionCallback session, final Red RedisConnectionFactory factory = getConnectionFactory(); // bind connection - RedisConnectionUtils.bindConnection(factory); + RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); try { return execute(new RedisCallback>() { public List doInRedis(RedisConnection connection) throws DataAccessException { @@ -1053,4 +1054,14 @@ public Void doInRedis(RedisConnection connection) throws DataAccessException { } }); } + + /** + * If set to {@code true} {@link RedisTemplate} will use {@literal MULTI...EXEC|DISCARD} to keep track of operations. + * + * @param enableTransactionSupport + * @since 1.3 + */ + public void setEnableTransactionSupport(boolean enableTransactionSupport) { + this.enableTransactionSupport = enableTransactionSupport; + } } diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractTransactionalTestBase.java b/src/test/java/org/springframework/data/redis/connection/AbstractTransactionalTestBase.java new file mode 100644 index 0000000000..22f27c4078 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/AbstractTransactionalTestBase.java @@ -0,0 +1,158 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import javax.sql.DataSource; + +import org.hamcrest.core.Is; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.test.annotation.Rollback; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.transaction.AfterTransaction; +import org.springframework.test.context.transaction.TransactionConfiguration; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.Transactional; + +@RunWith(SpringJUnit4ClassRunner.class) +@Transactional +@TransactionConfiguration(transactionManager = "transactionManager") +public abstract class AbstractTransactionalTestBase { + + @Configuration + public abstract static class RedisContextConfiguration { + + @Bean + public StringRedisTemplate redisTemplate() { + + StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory()); + + // explicitly enable transaction support + template.setEnableTransactionSupport(true); + return template; + } + + @Bean + public abstract RedisConnectionFactory redisConnectionFactory(); + + @Bean + public PlatformTransactionManager transactionManager() throws SQLException { + return new DataSourceTransactionManager(dataSource()); + } + + @Bean + public DataSource dataSource() throws SQLException { + + DataSource ds = Mockito.mock(DataSource.class); + Mockito.when(ds.getConnection()).thenReturn(Mockito.mock(Connection.class)); + return ds; + } + } + + private @Autowired StringRedisTemplate template; + + private @Autowired RedisConnectionFactory factory; + + private List KEYS = Arrays.asList("spring", "data", "redis"); + private boolean valuesShouldHaveBeenPersisted = false; + + @Before + public void setUp() { + valuesShouldHaveBeenPersisted = false; + cleanDataStore(); + } + + private void cleanDataStore() { + + RedisConnection connection = factory.getConnection(); + connection.flushDb(); + connection.close(); + } + + @AfterTransaction + public void verifyTransactionResult() { + + RedisConnection connection = factory.getConnection(); + for (String key : KEYS) { + Assert.assertThat("Values for " + key + " should " + (valuesShouldHaveBeenPersisted ? "" : "NOT ") + + "have been found.", connection.exists(key.getBytes()), Is.is(valuesShouldHaveBeenPersisted)); + } + connection.close(); + } + + /** + * @see DATAREDIS-73 + */ + @Rollback(true) + @Test + public void valueOperationSetShouldBeRolledBackCorrectly() { + + for (String key : KEYS) { + template.opsForValue().set(key, key + "-value"); + } + } + + /** + * @see DATAREDIS-73 + */ + @Rollback(false) + @Test + public void valueOperationSetShouldBeCommittedCorrectly() { + + this.valuesShouldHaveBeenPersisted = true; + for (String key : KEYS) { + template.opsForValue().set(key, key + "-value"); + } + } + + /** + * @see DATAREDIS-73 + */ + @Rollback(true) + @Test + public void listOperationLPushShoudBeRolledBackCorrectly() { + + for (String key : KEYS) { + template.opsForList().leftPushAll(key, (String[]) KEYS.toArray()); + } + } + + /** + * @see DATAREDIS-73 + */ + @Rollback(false) + @Test + public void listOperationLPushShoudBeCommittedCorrectly() { + + this.valuesShouldHaveBeenPersisted = true; + for (String key : KEYS) { + template.opsForList().leftPushAll(key, (String[]) KEYS.toArray()); + } + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/TransactionalJedisItegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/TransactionalJedisItegrationTests.java new file mode 100644 index 0000000000..d876022904 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/jedis/TransactionalJedisItegrationTests.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.jedis; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.AbstractTransactionalTestBase; +import org.springframework.data.redis.connection.jedis.TransactionalJedisItegrationTests.JedisContextConfiguration; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Christoph Strobl + */ +@ContextConfiguration(classes = { JedisContextConfiguration.class }) +public class TransactionalJedisItegrationTests extends AbstractTransactionalTestBase { + + @Configuration + public static class JedisContextConfiguration extends RedisContextConfiguration { + + @Override + @Bean + public JedisConnectionFactory redisConnectionFactory() { + + JedisConnectionFactory factory = new JedisConnectionFactory(); + factory.setHostName("localhost"); + factory.setPort(6379); + return factory; + } + + } + +} diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/TransactionalLettuceItegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/TransactionalLettuceItegrationTests.java new file mode 100644 index 0000000000..1880e28728 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/TransactionalLettuceItegrationTests.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.AbstractTransactionalTestBase; +import org.springframework.data.redis.connection.lettuce.TransactionalLettuceItegrationTests.LettuceContextConfiguration; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Christoph Strobl + */ +@ContextConfiguration(classes = { LettuceContextConfiguration.class }) +public class TransactionalLettuceItegrationTests extends AbstractTransactionalTestBase { + + @Configuration + public static class LettuceContextConfiguration extends RedisContextConfiguration { + + @Override + @Bean + public LettuceConnectionFactory redisConnectionFactory() { + + LettuceConnectionFactory factory = new LettuceConnectionFactory(); + factory.setHostName("localhost"); + factory.setPort(6379); + return factory; + } + + } + +} diff --git a/src/test/java/org/springframework/data/redis/connection/srp/TransactionalSrpItegrationTests.java b/src/test/java/org/springframework/data/redis/connection/srp/TransactionalSrpItegrationTests.java new file mode 100644 index 0000000000..0ba9434ede --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/srp/TransactionalSrpItegrationTests.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.srp; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.AbstractTransactionalTestBase; +import org.springframework.data.redis.connection.srp.TransactionalSrpItegrationTests.SrpContextConfiguration; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Christoph Strobl + */ +@ContextConfiguration(classes = { SrpContextConfiguration.class }) +public class TransactionalSrpItegrationTests extends AbstractTransactionalTestBase { + + @Configuration + public static class SrpContextConfiguration extends RedisContextConfiguration { + + @Override + @Bean + public SrpConnectionFactory redisConnectionFactory() { + + SrpConnectionFactory factory = new SrpConnectionFactory(); + factory.setHostName("localhost"); + factory.setPort(6379); + return factory; + } + + } + +} diff --git a/src/test/java/org/springframework/data/redis/core/ConnectionSplittingInterceptorUnitTests.java b/src/test/java/org/springframework/data/redis/core/ConnectionSplittingInterceptorUnitTests.java new file mode 100644 index 0000000000..6d64b58253 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/core/ConnectionSplittingInterceptorUnitTests.java @@ -0,0 +1,98 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import java.lang.reflect.Method; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisConnectionUtils.ConnectionSplittingInterceptor; +import org.springframework.util.ClassUtils; + +/** + * @author Christoph Strobl + */ +@RunWith(MockitoJUnitRunner.class) +public class ConnectionSplittingInterceptorUnitTests { + + private static final Method WRITE_METHOD, READONLY_METHOD; + + private ConnectionSplittingInterceptor interceptor; + + private @Mock RedisConnectionFactory connectionFactoryMock; + + private @Mock RedisConnection freshConnectionMock; + + private @Mock RedisConnection boundConnectionMock; + + static { + try { + WRITE_METHOD = ClassUtils.getMethod(RedisConnection.class, "expire", byte[].class, long.class); + READONLY_METHOD = ClassUtils.getMethod(RedisConnection.class, "keys", byte[].class); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Before + public void setUp() { + interceptor = new ConnectionSplittingInterceptor(connectionFactoryMock); + Mockito.when(connectionFactoryMock.getConnection()).thenReturn(freshConnectionMock); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void interceptorShouldRequestFreshConnectionForReadonlyCommand() throws Throwable { + + interceptor.intercept(boundConnectionMock, READONLY_METHOD, new Object[] { new byte[] {} }, null); + Mockito.verify(connectionFactoryMock, Mockito.times(1)).getConnection(); + Mockito.verifyZeroInteractions(boundConnectionMock); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void interceptorShouldUseBoundConnectionForWriteOperations() throws Throwable { + + interceptor.intercept(boundConnectionMock, WRITE_METHOD, new Object[] { new byte[] {}, 0L }, null); + Mockito.verify(boundConnectionMock, Mockito.times(1)).expire(Matchers.any(byte[].class), Matchers.anyLong()); + Mockito.verifyZeroInteractions(connectionFactoryMock); + } + + /** + * @see DATAREDIS-73 + */ + @SuppressWarnings("unchecked") + @Test(expected = InvalidDataAccessApiUsageException.class) + public void interceptorShouldNotWrapException() throws Throwable { + + Mockito.when(freshConnectionMock.keys(Mockito.any(byte[].class))).thenThrow( + InvalidDataAccessApiUsageException.class); + interceptor.intercept(boundConnectionMock, READONLY_METHOD, new Object[] { new byte[] {} }, null); + } + +} diff --git a/src/test/java/org/springframework/data/redis/core/RedisCommandUnitTests.java b/src/test/java/org/springframework/data/redis/core/RedisCommandUnitTests.java new file mode 100644 index 0000000000..a67d5ac7a4 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/core/RedisCommandUnitTests.java @@ -0,0 +1,110 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * @author Christoph Strobl + * @author Thomas Darimont + */ +public class RedisCommandUnitTests { + + public @Rule ExpectedException expectedException = ExpectedException.none(); + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldIdentifyAliasCorrectly() { + assertThat(RedisCommand.CONFIG_SET.isRepresentedBy("setconfig"), equalTo(true)); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldIdentifyAliasCorrectlyWhenNamePassedInMixedCase() { + assertThat(RedisCommand.CONFIG_SET.isRepresentedBy("SetConfig"), equalTo(true)); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldNotThrowExceptionWhenUsingNullKeyForRepresentationCheck() { + assertThat(RedisCommand.CONFIG_SET.isRepresentedBy(null), equalTo(false)); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldIdentifyAliasCorrectlyViaLookup() { + assertThat(RedisCommand.failsafeCommandLookup("setconfig"), is(RedisCommand.CONFIG_SET)); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldIdentifyAliasCorrectlyWhenNamePassedInMixedCaseViaLookup() { + assertThat(RedisCommand.failsafeCommandLookup("SetConfig"), is(RedisCommand.CONFIG_SET)); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldReturnUnknownCommandForUnknownCommandString() { + assertThat(RedisCommand.failsafeCommandLookup("strangecommand"), is(RedisCommand.UNKNOWN)); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldNotThrowExceptionOnValidArgumentCount() { + RedisCommand.AUTH.validateArgumentCount(1); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldThrowExceptionOnInvalidArgumentCountWhenExpectedExcatMatch() { + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("AUTH command requires 1 arguments"); + RedisCommand.AUTH.validateArgumentCount(2); + } + + /** + * @see DATAREDIS-73 + */ + @Test + public void shouldThrowExceptionOnInvalidArgumentCountWhenExpectedMinimalMatch() { + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("DEL command requires at least 1 arguments"); + RedisCommand.DEL.validateArgumentCount(0); + } +} diff --git a/template.mf b/template.mf index 623a15f183..e66bd4cd4b 100644 --- a/template.mf +++ b/template.mf @@ -13,6 +13,8 @@ Import-Template: org.springframework.scheduling.*;resolution:="optional";version="[3.2.8, 4.0.2)", org.springframework.scripting.*;resolution:="optional";version="[3.2.8, 4.0.2)", org.springframework.util.*;version="[3.2.8, 4.0.2)", + org.springframework.aop.*;version="[3.2.8, 4.0.2)", + org.springframework.cglib.*;version="[3.2.8, 4.0.2)", org.springframework.oxm.*;resolution:="optional";version="[3.2.8, 4.0.2)", org.springframework.transaction.support.*;version="[3.2.8, 4.0.2)", org.aopalliance.*;version="[1.0.0, 2.0.0)";resolution:=optional,