Skip to content

DATAREDIS-285 - LettuceConnection.execute should fully read response. #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jredisVersion=06052013
jedisVersion=2.4.1
springVersion=3.2.8.RELEASE
log4jVersion=1.2.17
version=1.3.0.BUILD-SNAPSHOT
version=1.3.0.DATAREDIS-285-SNAPSHOT
srpVersion=0.7
jacksonVersion=1.8.8
fasterXmlJacksonVersion=2.2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@

import static com.lambdaworks.redis.protocol.CommandType.*;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.BeanUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
Expand All @@ -46,6 +50,7 @@
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;

import com.lambdaworks.redis.RedisAsyncConnection;
Expand All @@ -54,9 +59,22 @@
import com.lambdaworks.redis.SortArgs;
import com.lambdaworks.redis.ZStoreArgs;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.output.BooleanOutput;
import com.lambdaworks.redis.output.ByteArrayOutput;
import com.lambdaworks.redis.output.DateOutput;
import com.lambdaworks.redis.output.DoubleOutput;
import com.lambdaworks.redis.output.IntegerOutput;
import com.lambdaworks.redis.output.KeyListOutput;
import com.lambdaworks.redis.output.KeyValueOutput;
import com.lambdaworks.redis.output.MapOutput;
import com.lambdaworks.redis.output.MultiOutput;
import com.lambdaworks.redis.output.StatusOutput;
import com.lambdaworks.redis.output.ValueListOutput;
import com.lambdaworks.redis.output.ValueOutput;
import com.lambdaworks.redis.output.ValueSetOutput;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandOutput;
import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;

Expand All @@ -71,6 +89,7 @@
public class LettuceConnection implements RedisConnection {

static final RedisCodec<byte[], byte[]> CODEC = new BytesRedisCodec();
private static final TypeHints typeHints = new TypeHints();

private final com.lambdaworks.redis.RedisAsyncConnection<byte[], byte[]> asyncSharedConn;
private final com.lambdaworks.redis.RedisConnection<byte[], byte[]> sharedConn;
Expand Down Expand Up @@ -252,7 +271,23 @@ private Object await(Command cmd) {
return getAsyncConnection().await(cmd, timeout, TimeUnit.MILLISECONDS);
}

@Override
public Object execute(String command, byte[]... args) {
return execute(command, null, args);
}

/**
* 'Native' or 'raw' execution of the given command along-side the given arguments.
*
* @see RedisCommands#execute(String, byte[]...)
* @param command Command to execute
* @param commandOutputTypeHint Type of Output to use, may be (may be {@literal null}).
* @param args Possible command arguments (may be {@literal null})
* @return execution result.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public Object execute(String command, CommandOutput commandOutputTypeHint, byte[]... args) {

Assert.hasText(command, "a valid command needs to be specified");
try {
String name = command.trim().toUpperCase();
Expand All @@ -263,16 +298,15 @@ public Object execute(String command, byte[]... args) {
cmdArg.addKeys(args);
}

CommandOutput expectedOutput = commandOutputTypeHint != null ? commandOutputTypeHint : typeHints.getTypeHint(cmd);
if (isPipelined()) {
pipeline(new LettuceResult(getAsyncConnection().dispatch(cmd, new ByteArrayOutput<byte[], byte[]>(CODEC),
cmdArg)));
pipeline(new LettuceResult(getAsyncConnection().dispatch(cmd, expectedOutput, cmdArg)));
return null;
} else if (isQueueing()) {
transaction(new LettuceResult(getAsyncConnection().dispatch(cmd, new ByteArrayOutput<byte[], byte[]>(CODEC),
cmdArg)));
transaction(new LettuceResult(getAsyncConnection().dispatch(cmd, expectedOutput, cmdArg)));
return null;
} else {
return await(getAsyncConnection().dispatch(cmd, new ByteArrayOutput<byte[], byte[]>(CODEC), cmdArg));
return await(getAsyncConnection().dispatch(cmd, expectedOutput, cmdArg));
}
} catch (RedisException ex) {
throw convertLettuceAccessException(ex);
Expand Down Expand Up @@ -2938,4 +2972,202 @@ private ZStoreArgs zStoreArgs(Aggregate aggregate, int[] weights) {
return args;
}

/**
* {@link TypeHints} provide {@link CommandOutput} information for a given {@link CommandType}.
*
* @since 1.2.1
*/
static class TypeHints {

@SuppressWarnings("rawtypes")//
private static final Map<CommandType, Class<? extends CommandOutput>> COMMAND_OUTPUT_TYPE_MAPPING = new HashMap<CommandType, Class<? extends CommandOutput>>();

@SuppressWarnings("rawtypes")//
private static final Map<Class<?>, Constructor<CommandOutput>> CONSTRUCTORS = new ConcurrentHashMap<Class<?>, Constructor<CommandOutput>>();

{
// INTEGER
COMMAND_OUTPUT_TYPE_MAPPING.put(BITCOUNT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(BITOP, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DBSIZE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DECR, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DECRBY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DEL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GETBIT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HDEL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HINCRBY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INCR, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INCRBY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LINSERT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPUSH, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPUSHX, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LREM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PTTL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PUBLISH, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPUSH, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPUSHX, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SADD, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SCARD, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SDIFFSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETBIT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETRANGE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SINTERSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SREM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SUNIONSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(STRLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(TTL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZADD, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZCOUNT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZINTERSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZRANK, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREMRANGEBYRANK, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREMRANGEBYSCORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREVRANK, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZUNIONSTORE, IntegerOutput.class);

// DOUBLE
COMMAND_OUTPUT_TYPE_MAPPING.put(HINCRBYFLOAT, DoubleOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INCRBYFLOAT, DoubleOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MGET, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZINCRBY, DoubleOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZSCORE, DoubleOutput.class);

// MAP
COMMAND_OUTPUT_TYPE_MAPPING.put(HGETALL, MapOutput.class);

// KEY LIST
COMMAND_OUTPUT_TYPE_MAPPING.put(HKEYS, KeyListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(KEYS, KeyListOutput.class);

// KEY VALUE
COMMAND_OUTPUT_TYPE_MAPPING.put(BRPOP, KeyValueOutput.class);

// SINGLE VALUE
COMMAND_OUTPUT_TYPE_MAPPING.put(BRPOPLPUSH, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ECHO, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GET, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GETRANGE, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GETSET, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HGET, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LINDEX, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPOP, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RANDOMKEY, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RENAME, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPOP, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPOPLPUSH, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SPOP, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SRANDMEMBER, ValueOutput.class);

// STATUS VALUE
COMMAND_OUTPUT_TYPE_MAPPING.put(BGREWRITEAOF, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(BGSAVE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(CLIENT, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DEBUG, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DISCARD, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(FLUSHALL, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(FLUSHDB, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HMSET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INFO, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LSET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LTRIM, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MIGRATE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MSET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(QUIT, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RESTORE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SAVE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SELECT, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETEX, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SHUTDOWN, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SLAVEOF, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SYNC, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(TYPE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(WATCH, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(UNWATCH, StatusOutput.class);

// VALUE LIST
COMMAND_OUTPUT_TYPE_MAPPING.put(HMGET, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MGET, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HVALS, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LRANGE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SORT, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZRANGE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZRANGEBYSCORE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREVRANGE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREVRANGEBYSCORE, ValueListOutput.class);

// BOOLEAN
COMMAND_OUTPUT_TYPE_MAPPING.put(EXISTS, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(EXPIRE, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(EXPIREAT, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HEXISTS, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HSET, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HSETNX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MOVE, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MSETNX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PERSIST, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PEXPIRE, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PEXPIREAT, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RENAMENX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETNX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SISMEMBER, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SMOVE, BooleanOutput.class);

// MULTI
COMMAND_OUTPUT_TYPE_MAPPING.put(EXEC, MultiOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MULTI, MultiOutput.class);

// DATE
COMMAND_OUTPUT_TYPE_MAPPING.put(LASTSAVE, DateOutput.class);

// VALUE SET
COMMAND_OUTPUT_TYPE_MAPPING.put(SDIFF, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SINTER, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SMEMBERS, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SUNION, ValueSetOutput.class);
}

/**
* Returns the {@link CommandOutput} mapped for given {@link CommandType} or {@link ByteArrayOutput} as default.
*
* @param type
* @return {@link ByteArrayOutput} as default when no matching {@link CommandOutput} available.
*/
@SuppressWarnings("rawtypes")
public CommandOutput getTypeHint(CommandType type) {
return getTypeHint(type, new ByteArrayOutput<byte[], byte[]>(CODEC));
}

/**
* Returns the {@link CommandOutput} mapped for given {@link CommandType} given {@link CommandOutput} as default.
*
* @param type
* @return
*/
@SuppressWarnings("rawtypes")
public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) {

if (type == null || !COMMAND_OUTPUT_TYPE_MAPPING.containsKey(type)) {
return defaultType;
}
CommandOutput<?, ?, ?> outputType = instanciateCommandOutput(COMMAND_OUTPUT_TYPE_MAPPING.get(type));
return outputType != null ? outputType : defaultType;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private CommandOutput<?, ?, ?> instanciateCommandOutput(Class<? extends CommandOutput> type) {

Assert.notNull(type, "Cannot create instance for 'null' type.");
Constructor<CommandOutput> constructor = CONSTRUCTORS.get(type);
if (constructor == null) {
constructor = (Constructor<CommandOutput>) ClassUtils.getConstructorIfAvailable(type, RedisCodec.class);
CONSTRUCTORS.put(type, constructor);
}
return BeanUtils.instantiateClass(constructor, CODEC);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
import static org.junit.Assert.*;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.hamcrest.core.AllOf;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Test;
import org.junit.internal.matchers.IsCollectionContaining;
import org.junit.runner.RunWith;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.SettingsUtils;
Expand Down Expand Up @@ -313,4 +317,20 @@ public void testPoolNPE() {
factory2.getConnection().dbSize();
factory2.destroy();
}

/**
* @see DATAREDIS-285
*/
@SuppressWarnings("unchecked")
@Test
public void testExecuteShouldConvertArrayReplyCorrectly() {
connection.set("spring", "awesome");
connection.set("data", "cool");
connection.set("redis", "supercalifragilisticexpialidocious");

assertThat(
connection.execute("MGET", "spring".getBytes(), "data".getBytes(), "redis".getBytes()),
AllOf.allOf(IsInstanceOf.instanceOf(List.class), IsCollectionContaining.hasItems("awesome".getBytes(),
"cool".getBytes(), "supercalifragilisticexpialidocious".getBytes())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.hamcrest.core.IsInstanceOf;
import org.jredis.JRedis;
import org.jredis.protocol.BulkResponse;
import org.jredis.ri.alphazero.protocol.SyncProtocol.SyncMultiBulkResponse;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.internal.matchers.IsCollectionContaining;
import org.junit.runner.RunWith;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.RedisConnectionFailureException;
Expand Down Expand Up @@ -778,4 +782,23 @@ public void testGetTimeShouldRequestServerTime() {
super.testGetTimeShouldRequestServerTime();
}

/**
* @see DATAREDIS-285
*/
@Test
public void testExecuteShouldConvertArrayReplyCorrectly() {
connection.set("spring", "awesome");
connection.set("data", "cool");
connection.set("redis", "supercalifragilisticexpialidocious");

Object result = connection.execute("MGET", "spring".getBytes(), "data".getBytes(), "redis".getBytes());

assertThat(result, IsInstanceOf.instanceOf(SyncMultiBulkResponse.class));

List<byte[]> data = ((SyncMultiBulkResponse) result).getMultiBulkData();
assertThat(
data,
IsCollectionContaining.hasItems("awesome".getBytes(), "cool".getBytes(),
"supercalifragilisticexpialidocious".getBytes()));
}
}
Loading