From b34187f3af63e0e5b628b62ff9f8169b978e0178 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Thu, 12 Mar 2020 11:27:59 +0800 Subject: [PATCH] fix issue #DATAREDIS-1081 --- .../connection/ReactiveHashCommands.java | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java index 8d11405156..d71e1da4ea 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.reactivestreams.Publisher; import org.springframework.dao.InvalidDataAccessApiUsageException; @@ -162,7 +163,9 @@ default Mono hSet(ByteBuffer key, ByteBuffer field, ByteBuffer value) { Assert.notNull(field, "Field must not be null!"); Assert.notNull(value, "Value must not be null!"); - return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key))).next().map(BooleanResponse::getOutput); + return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key))) + .next() + .flatMap(response->Mono.justOrEmpty(response.getOutput())); } /** @@ -180,8 +183,9 @@ default Mono hSetNX(ByteBuffer key, ByteBuffer field, ByteBuffer value) Assert.notNull(field, "Field must not be null!"); Assert.notNull(value, "Value must not be null!"); - return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key).ifValueNotExists())).next() - .map(BooleanResponse::getOutput); + return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key).ifValueNotExists())) + .next() + .flatMap(response->Mono.justOrEmpty(response.getOutput())); } /** @@ -282,7 +286,7 @@ public List getFields() { * @see Redis Documentation: HGET */ default Mono hGet(ByteBuffer key, ByteBuffer field) { - return hMGet(key, Collections.singletonList(field)).map(val -> val.isEmpty() ? null : val.iterator().next()); + return hMGet(key, Collections.singletonList(field)).flatMapIterable(Function.identity()).next(); } /** @@ -298,7 +302,9 @@ default Mono> hMGet(ByteBuffer key, Collection fiel Assert.notNull(key, "Key must not be null!"); Assert.notNull(fields, "Fields must not be null!"); - return hMGet(Mono.just(HGetCommand.fields(fields).from(key))).next().map(MultiValueResponse::getOutput); + return hMGet(Mono.just(HGetCommand.fields(fields).from(key))) + .next() + .flatMap(res->Mono.justOrEmpty(res.getOutput())); } /** @@ -374,7 +380,9 @@ default Mono hExists(ByteBuffer key, ByteBuffer field) { Assert.notNull(key, "Key must not be null!"); Assert.notNull(field, "Field must not be null!"); - return hExists(Mono.just(HExistsCommand.field(field).in(key))).next().map(BooleanResponse::getOutput); + return hExists(Mono.just(HExistsCommand.field(field).in(key))) + .next() + .flatMap(response->Mono.justOrEmpty(response.getOutput())); } /** @@ -476,7 +484,9 @@ default Mono hDel(ByteBuffer key, Collection fields) { Assert.notNull(key, "Key must not be null!"); Assert.notNull(fields, "Fields must not be null!"); - return hDel(Mono.just(HDelCommand.fields(fields).from(key))).next().map(NumericResponse::getOutput); + return hDel(Mono.just(HDelCommand.fields(fields).from(key))) + .next() + .flatMap(response->Mono.justOrEmpty(response.getOutput())); } /** @@ -499,7 +509,9 @@ default Mono hLen(ByteBuffer key) { Assert.notNull(key, "Key must not be null!"); - return hLen(Mono.just(new KeyCommand(key))).next().map(NumericResponse::getOutput); + return hLen(Mono.just(new KeyCommand(key))) + .next() + .flatMap(response->Mono.justOrEmpty(response.getOutput())); } /** @@ -522,7 +534,8 @@ default Flux hKeys(ByteBuffer key) { Assert.notNull(key, "Key must not be null!"); - return hKeys(Mono.just(new KeyCommand(key))).flatMap(CommandResponse::getOutput); + return hKeys(Mono.just(new KeyCommand(key))) + .flatMap(CommandResponse::getOutput); } /**