|
18 | 18 | import java.io.Serializable;
|
19 | 19 | import java.util.ArrayList;
|
20 | 20 | import java.util.Collection;
|
| 21 | +import java.util.LinkedHashSet; |
21 | 22 | import java.util.List;
|
22 | 23 | import java.util.Map;
|
23 | 24 | import java.util.Map.Entry;
|
@@ -411,58 +412,28 @@ public void update(final PartialUpdate<?> update) {
|
411 | 412 | @Override
|
412 | 413 | public Void doInRedis(RedisConnection connection) throws DataAccessException {
|
413 | 414 |
|
414 |
| - List<byte[]> pathsToRemove = new ArrayList<byte[]>(update.getPropertyUpdates().size()); |
| 415 | + RedisUpdateObject redisUpdateObject = new RedisUpdateObject(redisKey, keyspace, id); |
415 | 416 |
|
416 | 417 | for (PropertyUpdate pUpdate : update.getPropertyUpdates()) {
|
417 | 418 |
|
418 | 419 | String propertyPath = pUpdate.getPropertyPath();
|
419 | 420 |
|
420 |
| - if (UpdateCommand.DEL.equals(pUpdate.getCmd())) { |
421 |
| - |
422 |
| - byte[] existingValue = connection.hGet(redisKey, toBytes(propertyPath)); |
423 |
| - pathsToRemove.add(toBytes(propertyPath)); |
424 |
| - |
425 |
| - byte[] existingValueIndexKey = existingValue != null |
426 |
| - ? ByteUtils.concatAll(toBytes(keyspace), (":" + propertyPath).getBytes(), ":".getBytes(), existingValue) |
427 |
| - : null; |
428 |
| - |
429 |
| - if (existingValue != null) { |
430 |
| - |
431 |
| - if (connection.exists(existingValueIndexKey)) { |
432 |
| - connection.sRem(existingValueIndexKey, toBytes(id)); |
433 |
| - } |
434 |
| - } |
435 |
| - } |
436 |
| - |
437 |
| - if (pUpdate.getValue() instanceof Collection || pUpdate.getValue() instanceof Map |
| 421 | + if (UpdateCommand.DEL.equals(pUpdate.getCmd()) || pUpdate.getValue() instanceof Collection |
| 422 | + || pUpdate.getValue() instanceof Map |
438 | 423 | || (pUpdate.getValue() != null && pUpdate.getValue().getClass().isArray()) || (pUpdate.getValue() != null
|
439 | 424 | && !converter.getConversionService().canConvert(pUpdate.getValue().getClass(), byte[].class))) {
|
440 | 425 |
|
441 |
| - Set<byte[]> existingFields = connection.hKeys(redisKey); |
442 |
| - |
443 |
| - for (byte[] hkey : existingFields) { |
444 |
| - |
445 |
| - if (asString(hkey).startsWith(pUpdate.getPropertyPath() + ".")) { |
446 |
| - pathsToRemove.add(hkey); |
447 |
| - |
448 |
| - byte[] existingValue = connection.hGet(redisKey, toBytes(hkey)); |
449 |
| - byte[] existingValueIndexKey = existingValue != null ? ByteUtils.concatAll(toBytes(keyspace), |
450 |
| - (":" + propertyPath).getBytes(), ":".getBytes(), existingValue) : null; |
451 |
| - |
452 |
| - if (existingValue != null) { |
453 |
| - |
454 |
| - if (connection.exists(existingValueIndexKey)) { |
455 |
| - connection.sRem(existingValueIndexKey, toBytes(id)); |
456 |
| - } |
457 |
| - } |
458 |
| - } |
459 |
| - } |
460 |
| - |
| 426 | + redisUpdateObject = fetchDeletePathsFromHashAndUpdateIndex(redisUpdateObject, propertyPath, connection); |
461 | 427 | }
|
462 | 428 | }
|
463 | 429 |
|
464 |
| - if (!pathsToRemove.isEmpty()) { |
465 |
| - connection.hDel(redisKey, pathsToRemove.toArray(new byte[pathsToRemove.size()][])); |
| 430 | + if (!redisUpdateObject.fieldsToRemove.isEmpty()) { |
| 431 | + connection.hDel(redisKey, |
| 432 | + redisUpdateObject.fieldsToRemove.toArray(new byte[redisUpdateObject.fieldsToRemove.size()][])); |
| 433 | + } |
| 434 | + |
| 435 | + for (byte[] index : redisUpdateObject.indexesToUpdate) { |
| 436 | + connection.sRem(index, toBytes(redisUpdateObject.targetId)); |
466 | 437 | }
|
467 | 438 |
|
468 | 439 | if (!rdo.getBucket().isEmpty()) {
|
@@ -497,6 +468,48 @@ public Void doInRedis(RedisConnection connection) throws DataAccessException {
|
497 | 468 | });
|
498 | 469 | }
|
499 | 470 |
|
| 471 | + private RedisUpdateObject fetchDeletePathsFromHashAndUpdateIndex(RedisUpdateObject redisUpdateObject, String path, |
| 472 | + RedisConnection connection) { |
| 473 | + |
| 474 | + redisUpdateObject.addFieldToRemove(toBytes(path)); |
| 475 | + byte[] value = connection.hGet(redisUpdateObject.targetKey, toBytes(path)); |
| 476 | + |
| 477 | + if (value != null && value.length > 0) { |
| 478 | + |
| 479 | + byte[] existingValueIndexKey = value != null |
| 480 | + ? ByteUtils.concatAll(toBytes(redisUpdateObject.keyspace), toBytes((":" + path)), toBytes(":"), value) : null; |
| 481 | + |
| 482 | + if (connection.exists(existingValueIndexKey)) { |
| 483 | + redisUpdateObject.addIndexToUpdate(existingValueIndexKey); |
| 484 | + } |
| 485 | + return redisUpdateObject; |
| 486 | + } |
| 487 | + |
| 488 | + Set<byte[]> existingFields = connection.hKeys(redisUpdateObject.targetKey); |
| 489 | + |
| 490 | + for (byte[] field : existingFields) { |
| 491 | + |
| 492 | + if (asString(field).startsWith(path + ".")) { |
| 493 | + |
| 494 | + redisUpdateObject.addFieldToRemove(field); |
| 495 | + value = connection.hGet(redisUpdateObject.targetKey, toBytes(field)); |
| 496 | + |
| 497 | + if (value != null) { |
| 498 | + |
| 499 | + byte[] existingValueIndexKey = value != null |
| 500 | + ? ByteUtils.concatAll(toBytes(redisUpdateObject.keyspace), toBytes(":"), field, toBytes(":"), value) |
| 501 | + : null; |
| 502 | + |
| 503 | + if (connection.exists(existingValueIndexKey)) { |
| 504 | + redisUpdateObject.addIndexToUpdate(existingValueIndexKey); |
| 505 | + } |
| 506 | + } |
| 507 | + } |
| 508 | + } |
| 509 | + |
| 510 | + return redisUpdateObject; |
| 511 | + } |
| 512 | + |
500 | 513 | /**
|
501 | 514 | * Execute {@link RedisCallback} via underlying {@link RedisOperations}.
|
502 | 515 | *
|
@@ -684,4 +697,33 @@ private boolean isKeyExpirationMessage(Message message) {
|
684 | 697 | }
|
685 | 698 | }
|
686 | 699 |
|
| 700 | + /** |
| 701 | + * Container holding update information like fields to remove from the Redis Hash. |
| 702 | + * |
| 703 | + * @author Christoph Strobl |
| 704 | + */ |
| 705 | + private static class RedisUpdateObject { |
| 706 | + |
| 707 | + private final String keyspace; |
| 708 | + private final Object targetId; |
| 709 | + private final byte[] targetKey; |
| 710 | + |
| 711 | + private Set<byte[]> fieldsToRemove = new LinkedHashSet<byte[]>(); |
| 712 | + private Set<byte[]> indexesToUpdate = new LinkedHashSet<byte[]>(); |
| 713 | + |
| 714 | + RedisUpdateObject(byte[] targetKey, String keyspace, Object targetId) { |
| 715 | + |
| 716 | + this.targetKey = targetKey; |
| 717 | + this.keyspace = keyspace; |
| 718 | + this.targetId = targetId; |
| 719 | + } |
| 720 | + |
| 721 | + void addFieldToRemove(byte[] field) { |
| 722 | + fieldsToRemove.add(field); |
| 723 | + } |
| 724 | + |
| 725 | + void addIndexToUpdate(byte[] indexName) { |
| 726 | + indexesToUpdate.add(indexName); |
| 727 | + } |
| 728 | + } |
687 | 729 | }
|
0 commit comments