|
17 | 17 |
|
18 | 18 | import java.io.Serializable;
|
19 | 19 | import java.util.ArrayList;
|
| 20 | +import java.util.Collection; |
20 | 21 | import java.util.List;
|
21 | 22 | import java.util.Map;
|
22 | 23 | import java.util.Map.Entry;
|
|
25 | 26 | import org.slf4j.Logger;
|
26 | 27 | import org.slf4j.LoggerFactory;
|
27 | 28 | import org.springframework.beans.BeansException;
|
28 |
| -import org.springframework.beans.factory.DisposableBean; |
29 | 29 | import org.springframework.context.ApplicationContext;
|
30 | 30 | import org.springframework.context.ApplicationContextAware;
|
31 | 31 | import org.springframework.context.ApplicationListener;
|
|
38 | 38 | import org.springframework.data.redis.connection.Message;
|
39 | 39 | import org.springframework.data.redis.connection.MessageListener;
|
40 | 40 | import org.springframework.data.redis.connection.RedisConnection;
|
41 |
| -import org.springframework.data.redis.connection.RedisConnectionFactory; |
| 41 | +import org.springframework.data.redis.core.PartialUpdate.PropertyUpdate; |
| 42 | +import org.springframework.data.redis.core.PartialUpdate.UpdateCommand; |
42 | 43 | import org.springframework.data.redis.core.convert.CustomConversions;
|
43 | 44 | import org.springframework.data.redis.core.convert.KeyspaceConfiguration;
|
44 | 45 | import org.springframework.data.redis.core.convert.MappingRedisConverter;
|
|
47 | 48 | import org.springframework.data.redis.core.convert.RedisData;
|
48 | 49 | import org.springframework.data.redis.core.convert.ReferenceResolverImpl;
|
49 | 50 | import org.springframework.data.redis.core.mapping.RedisMappingContext;
|
| 51 | +import org.springframework.data.redis.core.mapping.RedisPersistentEntity; |
50 | 52 | import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
51 | 53 | import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
52 | 54 | import org.springframework.data.redis.util.ByteUtils;
|
@@ -163,8 +165,7 @@ public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisConverter redis
|
163 | 165 | /**
|
164 | 166 | * Default constructor.
|
165 | 167 | */
|
166 |
| - protected RedisKeyValueAdapter() { |
167 |
| - } |
| 168 | + protected RedisKeyValueAdapter() {} |
168 | 169 |
|
169 | 170 | /*
|
170 | 171 | * (non-Javadoc)
|
@@ -392,6 +393,109 @@ public Long doInRedis(RedisConnection connection) throws DataAccessException {
|
392 | 393 | return count != null ? count.longValue() : 0;
|
393 | 394 | }
|
394 | 395 |
|
| 396 | + public void update(final PartialUpdate<?> update) { |
| 397 | + |
| 398 | + final RedisPersistentEntity<?> entity = this.converter.getMappingContext().getPersistentEntity(update.getTarget()); |
| 399 | + |
| 400 | + final String keyspace = entity.getKeySpace(); |
| 401 | + final Object id = update.getId(); |
| 402 | + |
| 403 | + final byte[] redisKey = createKey(keyspace, converter.getConversionService().convert(id, String.class)); |
| 404 | + |
| 405 | + final RedisData rdo = new RedisData(); |
| 406 | + this.converter.write(update, rdo); |
| 407 | + |
| 408 | + redisOps.execute(new RedisCallback<Void>() { |
| 409 | + |
| 410 | + @Override |
| 411 | + public Void doInRedis(RedisConnection connection) throws DataAccessException { |
| 412 | + |
| 413 | + List<byte[]> pathsToRemove = new ArrayList<byte[]>(update.getPropertyUpdates().size()); |
| 414 | + |
| 415 | + for (PropertyUpdate pUpdate : update.getPropertyUpdates()) { |
| 416 | + |
| 417 | + String propertyPath = pUpdate.getPropertyPath(); |
| 418 | + |
| 419 | + if (UpdateCommand.DEL.equals(pUpdate.getCmd())) { |
| 420 | + |
| 421 | + byte[] existingValue = connection.hGet(redisKey, toBytes(propertyPath)); |
| 422 | + pathsToRemove.add(toBytes(propertyPath)); |
| 423 | + |
| 424 | + byte[] existingValueIndexKey = existingValue != null |
| 425 | + ? ByteUtils.concatAll(toBytes(keyspace), (":" + propertyPath).getBytes(), ":".getBytes(), existingValue) |
| 426 | + : null; |
| 427 | + |
| 428 | + if (existingValue != null) { |
| 429 | + |
| 430 | + if (connection.exists(existingValueIndexKey)) { |
| 431 | + connection.sRem(existingValueIndexKey, toBytes(id)); |
| 432 | + } |
| 433 | + } |
| 434 | + } |
| 435 | + |
| 436 | + if (pUpdate.getValue() instanceof Collection || pUpdate.getValue() instanceof Map |
| 437 | + || (pUpdate.getValue() != null && pUpdate.getValue().getClass().isArray()) || (pUpdate.getValue() != null |
| 438 | + && !converter.getConversionService().canConvert(pUpdate.getValue().getClass(), byte[].class))) { |
| 439 | + |
| 440 | + Set<byte[]> existingFields = connection.hKeys(redisKey); |
| 441 | + |
| 442 | + for (byte[] hkey : existingFields) { |
| 443 | + |
| 444 | + if (asString(hkey).startsWith(pUpdate.getPropertyPath() + ".")) { |
| 445 | + pathsToRemove.add(hkey); |
| 446 | + |
| 447 | + byte[] existingValue = connection.hGet(redisKey, toBytes(hkey)); |
| 448 | + byte[] existingValueIndexKey = existingValue != null ? ByteUtils.concatAll(toBytes(keyspace), |
| 449 | + (":" + propertyPath).getBytes(), ":".getBytes(), existingValue) : null; |
| 450 | + |
| 451 | + if (existingValue != null) { |
| 452 | + |
| 453 | + if (connection.exists(existingValueIndexKey)) { |
| 454 | + connection.sRem(existingValueIndexKey, toBytes(id)); |
| 455 | + } |
| 456 | + } |
| 457 | + } |
| 458 | + } |
| 459 | + |
| 460 | + } |
| 461 | + } |
| 462 | + |
| 463 | + if (!pathsToRemove.isEmpty()) { |
| 464 | + connection.hDel(redisKey, pathsToRemove.toArray(new byte[pathsToRemove.size()][])); |
| 465 | + } |
| 466 | + |
| 467 | + if (!rdo.getBucket().isEmpty()) { |
| 468 | + if (rdo.getBucket().size() > 1 |
| 469 | + || (rdo.getBucket().size() == 1 && !rdo.getBucket().asMap().containsKey("_class"))) { |
| 470 | + connection.hMSet(redisKey, rdo.getBucket().rawMap()); |
| 471 | + } |
| 472 | + } |
| 473 | + |
| 474 | + if (update.isRefreshTtl()) { |
| 475 | + |
| 476 | + if (rdo.getTimeToLive() != null && rdo.getTimeToLive().longValue() > 0) { |
| 477 | + |
| 478 | + connection.expire(redisKey, rdo.getTimeToLive().longValue()); |
| 479 | + |
| 480 | + // add phantom key so values can be restored |
| 481 | + byte[] phantomKey = ByteUtils.concat(redisKey, toBytes(":phantom")); |
| 482 | + connection.hMSet(phantomKey, rdo.getBucket().rawMap()); |
| 483 | + connection.expire(phantomKey, rdo.getTimeToLive().longValue() + 300); |
| 484 | + |
| 485 | + } else { |
| 486 | + |
| 487 | + connection.persist(redisKey); |
| 488 | + connection.persist(ByteUtils.concat(redisKey, toBytes(":phantom"))); |
| 489 | + } |
| 490 | + } |
| 491 | + |
| 492 | + new IndexWriter(connection, converter).updateIndexes(toBytes(id), rdo.getIndexedData()); |
| 493 | + return null; |
| 494 | + } |
| 495 | + |
| 496 | + }); |
| 497 | + } |
| 498 | + |
395 | 499 | /**
|
396 | 500 | * Execute {@link RedisCallback} via underlying {@link RedisOperations}.
|
397 | 501 | *
|
@@ -447,13 +551,6 @@ public byte[] toBytes(Object source) {
|
447 | 551 | */
|
448 | 552 | public void destroy() throws Exception {
|
449 | 553 |
|
450 |
| - if (redisOps instanceof RedisTemplate) { |
451 |
| - RedisConnectionFactory connectionFactory = ((RedisTemplate<?, ?>) redisOps).getConnectionFactory(); |
452 |
| - if (connectionFactory instanceof DisposableBean) { |
453 |
| - ((DisposableBean) connectionFactory).destroy(); |
454 |
| - } |
455 |
| - } |
456 |
| - |
457 | 554 | this.expirationListener.destroy();
|
458 | 555 | this.messageListenerContainer.destroy();
|
459 | 556 | }
|
|
0 commit comments