Skip to content

Commit be7c590

Browse files
committed
Filter null values from top-level Flux and Mono responses.
We now use mapNotNull to map Redis response values to avoid NullPointerExceptions caused by RedisElementReader returning null. This effectively turns RedisElementReader into a filter function that can suppress elements from being emitted.
1 parent 0e0fa47 commit be7c590

17 files changed

+155
-68
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.stream.Collectors;
2727

2828
import org.reactivestreams.Publisher;
29-
3029
import org.springframework.data.geo.Circle;
3130
import org.springframework.data.geo.Distance;
3231
import org.springframework.data.geo.GeoResult;
@@ -198,7 +197,7 @@ public Flux<GeoResult<GeoLocation<V>>> radius(K key, Circle within) {
198197
Assert.notNull(key, "Key must not be null");
199198
Assert.notNull(within, "Circle must not be null");
200199

201-
return createFlux(connection -> connection.geoRadius(rawKey(key), within).map(this::readGeoResult));
200+
return createFlux(connection -> connection.geoRadius(rawKey(key), within).mapNotNull(this::readGeoResult));
202201
}
203202

204203
@Override
@@ -209,7 +208,7 @@ public Flux<GeoResult<GeoLocation<V>>> radius(K key, Circle within, GeoRadiusCom
209208
Assert.notNull(args, "GeoRadiusCommandArgs must not be null");
210209

211210
return createFlux(connection -> connection.geoRadius(rawKey(key), within, args) //
212-
.map(this::readGeoResult));
211+
.mapNotNull(this::readGeoResult));
213212
}
214213

215214
@Override
@@ -219,7 +218,7 @@ public Flux<GeoResult<GeoLocation<V>>> radius(K key, V member, double radius) {
219218
Assert.notNull(member, "Member must not be null");
220219

221220
return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
222-
.map(this::readGeoResult));
221+
.mapNotNull(this::readGeoResult));
223222
}
224223

225224
@Override
@@ -230,7 +229,7 @@ public Flux<GeoResult<GeoLocation<V>>> radius(K key, V member, Distance distance
230229
Assert.notNull(distance, "Distance must not be null");
231230

232231
return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance) //
233-
.map(this::readGeoResult));
232+
.mapNotNull(this::readGeoResult));
234233
}
235234

236235
@Override
@@ -242,7 +241,7 @@ public Flux<GeoResult<GeoLocation<V>>> radius(K key, V member, Distance distance
242241
Assert.notNull(args, "GeoRadiusCommandArgs must not be null");
243242

244243
return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance, args))
245-
.map(this::readGeoResult);
244+
.mapNotNull(this::readGeoResult);
246245
}
247246

248247
@Override
@@ -276,7 +275,7 @@ public Flux<GeoResult<GeoLocation<V>>> search(K key, GeoReference<V> reference,
276275
GeoReference<ByteBuffer> rawReference = getGeoReference(reference);
277276

278277
return template.doCreateFlux(connection -> connection.geoCommands()
279-
.geoSearch(rawKey(key), rawReference, geoPredicate, args).map(this::readGeoResult));
278+
.geoSearch(rawKey(key), rawReference, geoPredicate, args).mapNotNull(this::readGeoResult));
280279
}
281280

282281
@Override

src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.function.Function;
2727

2828
import org.reactivestreams.Publisher;
29-
3029
import org.springframework.data.redis.connection.ReactiveHashCommands;
3130
import org.springframework.data.redis.connection.convert.Converters;
3231
import org.springframework.data.redis.serializer.RedisSerializationContext;
@@ -127,7 +126,7 @@ public Mono<HK> randomKey(H key) {
127126
Assert.notNull(key, "Key must not be null");
128127

129128
return template.doCreateMono(connection -> connection //
130-
.hashCommands().hRandField(rawKey(key))).map(this::readHashKey);
129+
.hashCommands().hRandField(rawKey(key))).mapNotNull(this::readHashKey);
131130
}
132131

133132
@Override
@@ -145,7 +144,7 @@ public Flux<HK> randomKeys(H key, long count) {
145144
Assert.notNull(key, "Key must not be null");
146145

147146
return template.doCreateFlux(connection -> connection //
148-
.hashCommands().hRandField(rawKey(key), count)).map(this::readHashKey);
147+
.hashCommands().hRandField(rawKey(key), count)).mapNotNull(this::readHashKey);
149148
}
150149

151150
@Override
@@ -163,7 +162,7 @@ public Flux<HK> keys(H key) {
163162
Assert.notNull(key, "Key must not be null");
164163

165164
return createFlux(connection -> connection.hKeys(rawKey(key)) //
166-
.map(this::readHashKey));
165+
.mapNotNull(this::readHashKey));
167166
}
168167

169168
@Override

src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Flux<V> range(K key, long start, long end) {
5858

5959
Assert.notNull(key, "Key must not be null");
6060

61-
return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readValue));
61+
return createFlux(connection -> connection.lRange(rawKey(key), start, end).mapNotNull(this::readValue));
6262
}
6363

6464
@Override
@@ -170,7 +170,8 @@ public Mono<V> move(K sourceKey, Direction from, K destinationKey, Direction to)
170170
Assert.notNull(to, "To direction must not be null");
171171

172172
return createMono(
173-
connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
173+
connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to)
174+
.mapNotNull(this::readValue));
174175
}
175176

176177
@Override
@@ -183,7 +184,7 @@ public Mono<V> move(K sourceKey, Direction from, K destinationKey, Direction to,
183184
Assert.notNull(timeout, "Timeout must not be null");
184185

185186
return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout)
186-
.map(this::readValue));
187+
.mapNotNull(this::readValue));
187188
}
188189

189190
@Override
@@ -208,7 +209,7 @@ public Mono<V> index(K key, long index) {
208209

209210
Assert.notNull(key, "Key must not be null");
210211

211-
return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readValue));
212+
return createMono(connection -> connection.lIndex(rawKey(key), index).mapNotNull(this::readValue));
212213
}
213214

214215
@Override
@@ -232,7 +233,7 @@ public Mono<V> leftPop(K key) {
232233

233234
Assert.notNull(key, "Key must not be null");
234235

235-
return createMono(connection -> connection.lPop(rawKey(key)).map(this::readValue));
236+
return createMono(connection -> connection.lPop(rawKey(key)).mapNotNull(this::readValue));
236237

237238
}
238239

@@ -244,15 +245,15 @@ public Mono<V> leftPop(K key, Duration timeout) {
244245
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
245246

246247
return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout)
247-
.map(popResult -> readValue(popResult.getValue())));
248+
.mapNotNull(popResult -> readValue(popResult.getValue())));
248249
}
249250

250251
@Override
251252
public Mono<V> rightPop(K key) {
252253

253254
Assert.notNull(key, "Key must not be null");
254255

255-
return createMono(connection -> connection.rPop(rawKey(key)).map(this::readValue));
256+
return createMono(connection -> connection.rPop(rawKey(key)).mapNotNull(this::readValue));
256257
}
257258

258259
@Override
@@ -263,7 +264,7 @@ public Mono<V> rightPop(K key, Duration timeout) {
263264
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
264265

265266
return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout)
266-
.map(popResult -> readValue(popResult.getValue())));
267+
.mapNotNull(popResult -> readValue(popResult.getValue())));
267268
}
268269

269270
@Override
@@ -273,7 +274,7 @@ public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey) {
273274
Assert.notNull(destinationKey, "Destination key must not be null");
274275

275276
return createMono(
276-
connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue));
277+
connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).mapNotNull(this::readValue));
277278
}
278279

279280
@Override
@@ -285,7 +286,8 @@ public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
285286
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
286287

287288
return createMono(
288-
connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
289+
connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout)
290+
.mapNotNull(this::readValue));
289291
}
290292

291293
@Override

src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ public Mono<V> pop(K key) {
8888

8989
Assert.notNull(key, "Key must not be null");
9090

91-
return createMono(connection -> connection.sPop(rawKey(key)).map(this::readValue));
91+
return createMono(connection -> connection.sPop(rawKey(key)).mapNotNull(this::readValue));
9292
}
9393

9494
@Override
9595
public Flux<V> pop(K key, long count) {
9696

9797
Assert.notNull(key, "Key must not be null");
9898

99-
return createFlux(connection -> connection.sPop(rawKey(key), count).map(this::readValue));
99+
return createFlux(connection -> connection.sPop(rawKey(key), count).mapNotNull(this::readValue));
100100
}
101101

102102
@Override
@@ -176,7 +176,7 @@ public Flux<V> intersect(Collection<K> keys) {
176176
.map(this::rawKey) //
177177
.collectList() //
178178
.flatMapMany(connection::sInter) //
179-
.map(this::readValue));
179+
.mapNotNull(this::readValue));
180180
}
181181

182182
@Override
@@ -238,7 +238,7 @@ public Flux<V> union(Collection<K> keys) {
238238
.map(this::rawKey) //
239239
.collectList() //
240240
.flatMapMany(connection::sUnion) //
241-
.map(this::readValue));
241+
.mapNotNull(this::readValue));
242242
}
243243

244244
@Override
@@ -300,7 +300,7 @@ public Flux<V> difference(Collection<K> keys) {
300300
.map(this::rawKey) //
301301
.collectList() //
302302
.flatMapMany(connection::sDiff) //
303-
.map(this::readValue));
303+
.mapNotNull(this::readValue));
304304
}
305305

306306
@Override
@@ -340,7 +340,7 @@ public Flux<V> members(K key) {
340340

341341
Assert.notNull(key, "Key must not be null");
342342

343-
return createFlux(connection -> connection.sMembers(rawKey(key)).map(this::readValue));
343+
return createFlux(connection -> connection.sMembers(rawKey(key)).mapNotNull(this::readValue));
344344
}
345345

346346
@Override
@@ -349,31 +349,31 @@ public Flux<V> scan(K key, ScanOptions options) {
349349
Assert.notNull(key, "Key must not be null");
350350
Assert.notNull(options, "ScanOptions must not be null");
351351

352-
return createFlux(connection -> connection.sScan(rawKey(key), options).map(this::readValue));
352+
return createFlux(connection -> connection.sScan(rawKey(key), options).mapNotNull(this::readValue));
353353
}
354354

355355
@Override
356356
public Mono<V> randomMember(K key) {
357357

358358
Assert.notNull(key, "Key must not be null");
359359

360-
return createMono(connection -> connection.sRandMember(rawKey(key)).map(this::readValue));
360+
return createMono(connection -> connection.sRandMember(rawKey(key)).mapNotNull(this::readValue));
361361
}
362362

363363
@Override
364364
public Flux<V> distinctRandomMembers(K key, long count) {
365365

366366
Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements");
367367

368-
return createFlux(connection -> connection.sRandMember(rawKey(key), count).map(this::readValue));
368+
return createFlux(connection -> connection.sRandMember(rawKey(key), count).mapNotNull(this::readValue));
369369
}
370370

371371
@Override
372372
public Flux<V> randomMembers(K key, long count) {
373373

374374
Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements");
375375

376-
return createFlux(connection -> connection.sRandMember(rawKey(key), -count).map(this::readValue));
376+
return createFlux(connection -> connection.sRandMember(rawKey(key), -count).mapNotNull(this::readValue));
377377
}
378378

379379
@Override

src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public Mono<V> get(Object key) {
146146
Assert.notNull(key, "Key must not be null");
147147

148148
return createMono(connection -> connection.get(rawKey((K) key)) //
149-
.map(this::readValue));
149+
.mapNotNull(this::readValue));
150150
}
151151

152152
@Override
@@ -155,7 +155,7 @@ public Mono<V> getAndDelete(K key) {
155155
Assert.notNull(key, "Key must not be null");
156156

157157
return createMono(connection -> connection.getDel(rawKey(key)) //
158-
.map(this::readValue));
158+
.mapNotNull(this::readValue));
159159
}
160160

161161
@Override
@@ -165,7 +165,7 @@ public Mono<V> getAndExpire(K key, Duration timeout) {
165165
Assert.notNull(timeout, "Timeout must not be null");
166166

167167
return createMono(connection -> connection.getEx(rawKey(key), Expiration.from(timeout)) //
168-
.map(this::readValue));
168+
.mapNotNull(this::readValue));
169169
}
170170

171171
@Override
@@ -174,15 +174,15 @@ public Mono<V> getAndPersist(K key) {
174174
Assert.notNull(key, "Key must not be null");
175175

176176
return createMono(connection -> connection.getEx(rawKey(key), Expiration.persistent()) //
177-
.map(this::readValue));
177+
.mapNotNull(this::readValue));
178178
}
179179

180180
@Override
181181
public Mono<V> getAndSet(K key, V value) {
182182

183183
Assert.notNull(key, "Key must not be null");
184184

185-
return createMono(connection -> connection.getSet(rawKey(key), rawValue(value)).map(value()::read));
185+
return createMono(connection -> connection.getSet(rawKey(key), rawValue(value)).mapNotNull(value()::read));
186186
}
187187

188188
@Override
@@ -250,7 +250,7 @@ public Mono<String> get(K key, long start, long end) {
250250
Assert.notNull(key, "Key must not be null");
251251

252252
return createMono(connection -> connection.getRange(rawKey(key), start, end) //
253-
.map(stringSerializationPair()::read));
253+
.mapNotNull(stringSerializationPair()::read));
254254
}
255255

256256
@Override

0 commit comments

Comments
 (0)