Skip to content

Commit 62c704d

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-491 - Add configuration option to tune KeyspaceEventMessageListener usage.
We now allow more fine grained setup for usage of Redis keyspace events. This can be done programmatically via the RedisKeyValueAdapter or using `@EnableRedisRepositories.enableKeyspaceEvents`. Original pull request: #193.
1 parent 46f07d7 commit 62c704d

File tree

6 files changed

+262
-38
lines changed

6 files changed

+262
-38
lines changed

src/main/java/org/springframework/data/redis/core/RedisKeyValueAdapter.java

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@
2121
import java.util.Map;
2222
import java.util.Map.Entry;
2323
import java.util.Set;
24+
import java.util.concurrent.atomic.AtomicReference;
2425

2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728
import org.springframework.beans.BeansException;
29+
import org.springframework.beans.factory.DisposableBean;
30+
import org.springframework.beans.factory.InitializingBean;
2831
import org.springframework.context.ApplicationContext;
2932
import org.springframework.context.ApplicationContextAware;
33+
import org.springframework.context.ApplicationEventPublisher;
3034
import org.springframework.context.ApplicationListener;
3135
import org.springframework.core.convert.ConversionService;
3236
import org.springframework.core.convert.ConverterNotFoundException;
@@ -50,22 +54,23 @@
5054
import org.springframework.data.redis.util.ByteUtils;
5155
import org.springframework.data.util.CloseableIterator;
5256
import org.springframework.util.Assert;
57+
import org.springframework.util.ObjectUtils;
5358

5459
/**
5560
* Redis specific {@link KeyValueAdapter} implementation. Uses binary codec to read/write data from/to Redis. Objects
5661
* are stored in a Redis Hash using the value of {@link RedisHash}, the {@link KeyspaceConfiguration} or just
5762
* {@link Class#getName()} as a prefix. <br />
5863
* <strong>Example</strong>
59-
*
64+
*
6065
* <pre>
6166
* <code>
6267
* &#64;RedisHash("persons")
6368
* class Person {
6469
* &#64;Id String id;
6570
* String name;
6671
* }
67-
*
68-
*
72+
*
73+
*
6974
* prefix ID
7075
* | |
7176
* V V
@@ -76,29 +81,33 @@
7681
* 4) Rand al'Thor
7782
* </code>
7883
* </pre>
79-
*
84+
*
8085
* <br />
8186
* The {@link KeyValueAdapter} is <strong>not</strong> intended to store simple types such as {@link String} values.
8287
* Please use {@link RedisTemplate} for this purpose.
83-
*
88+
*
8489
* @author Christoph Strobl
8590
* @author Mark Paluch
8691
* @since 1.7
8792
*/
8893
public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
89-
implements ApplicationContextAware, ApplicationListener<RedisKeyspaceEvent> {
94+
implements InitializingBean, ApplicationContextAware, ApplicationListener<RedisKeyspaceEvent> {
9095

9196
private static final Logger LOGGER = LoggerFactory.getLogger(RedisKeyValueAdapter.class);
9297

9398
private RedisOperations<?, ?> redisOps;
9499
private RedisConverter converter;
95100
private RedisMessageListenerContainer messageListenerContainer;
96-
private KeyExpirationEventMessageListener expirationListener;
101+
private AtomicReference<KeyExpirationEventMessageListener> expirationListener = new AtomicReference<KeyExpirationEventMessageListener>(
102+
null);
103+
private ApplicationEventPublisher eventPublisher;
104+
105+
private EnableKeyspaceEvents enableKeyspaceEvents = EnableKeyspaceEvents.ON_STARTUP;
97106

98107
/**
99108
* Creates new {@link RedisKeyValueAdapter} with default {@link RedisMappingContext} and default
100109
* {@link CustomConversions}.
101-
*
110+
*
102111
* @param redisOps must not be {@literal null}.
103112
*/
104113
public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps) {
@@ -107,7 +116,7 @@ public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps) {
107116

108117
/**
109118
* Creates new {@link RedisKeyValueAdapter} with default {@link CustomConversions}.
110-
*
119+
*
111120
* @param redisOps must not be {@literal null}.
112121
* @param mappingContext must not be {@literal null}.
113122
*/
@@ -117,7 +126,7 @@ public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisMappingContext
117126

118127
/**
119128
* Creates new {@link RedisKeyValueAdapter}.
120-
*
129+
*
121130
* @param redisOps must not be {@literal null}.
122131
* @param mappingContext must not be {@literal null}.
123132
* @param customConversions can be {@literal null}.
@@ -138,12 +147,12 @@ public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisMappingContext
138147
converter = mappingConverter;
139148
this.redisOps = redisOps;
140149

141-
initKeyExpirationListener();
150+
intiMessageListenerContainer();
142151
}
143152

144153
/**
145154
* Creates new {@link RedisKeyValueAdapter} with specific {@link RedisConverter}.
146-
*
155+
*
147156
* @param redisOps must not be {@literal null}.
148157
* @param mappingContext must not be {@literal null}.
149158
*/
@@ -156,7 +165,7 @@ public RedisKeyValueAdapter(RedisOperations<?, ?> redisOps, RedisConverter redis
156165
converter = redisConverter;
157166
this.redisOps = redisOps;
158167

159-
initKeyExpirationListener();
168+
intiMessageListenerContainer();
160169
}
161170

162171
/**
@@ -175,6 +184,14 @@ public Object put(final Serializable id, final Object item, final Serializable k
175184
converter.write(item, rdo);
176185
}
177186

187+
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_DEMAND, enableKeyspaceEvents)
188+
&& this.expirationListener.get() == null) {
189+
190+
if (rdo.getTimeToLive() != null && rdo.getTimeToLive().longValue() > 0) {
191+
initKeyExpirationListener();
192+
}
193+
}
194+
178195
if (rdo.getId() == null) {
179196

180197
rdo.setId(converter.getConversionService().convert(id, String.class));
@@ -397,7 +414,7 @@ public Long doInRedis(RedisConnection connection) throws DataAccessException {
397414

398415
/**
399416
* Execute {@link RedisCallback} via underlying {@link RedisOperations}.
400-
*
417+
*
401418
* @param callback must not be {@literal null}.
402419
* @see RedisOperations#execute(RedisCallback)
403420
* @return
@@ -408,7 +425,7 @@ public <T> T execute(RedisCallback<T> callback) {
408425

409426
/**
410427
* Get the {@link RedisConverter} in use.
411-
*
428+
*
412429
* @return never {@literal null}.
413430
*/
414431
public RedisConverter getConverter() {
@@ -430,7 +447,7 @@ public byte[] createKey(String keyspace, String id) {
430447

431448
/**
432449
* Convert given source to binary representation using the underlying {@link ConversionService}.
433-
*
450+
*
434451
* @param source
435452
* @return
436453
* @throws ConverterNotFoundException
@@ -444,13 +461,38 @@ public byte[] toBytes(Object source) {
444461
return converter.getConversionService().convert(source, byte[].class);
445462
}
446463

464+
/**
465+
* Configure usage of {@link KeyExpirationEventMessageListener}.
466+
*
467+
* @param enableKeyspaceEvents
468+
* @since 1.8
469+
*/
470+
public void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) {
471+
this.enableKeyspaceEvents = enableKeyspaceEvents;
472+
}
473+
474+
/**
475+
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
476+
* @since 1.8
477+
*/
478+
@Override
479+
public void afterPropertiesSet() {
480+
481+
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) {
482+
initKeyExpirationListener();
483+
}
484+
}
485+
447486
/*
448487
* (non-Javadoc)
449488
* @see org.springframework.beans.factory.DisposableBean#destroy()
450489
*/
451490
public void destroy() throws Exception {
452491

453-
this.expirationListener.destroy();
492+
if (this.expirationListener.get() != null) {
493+
this.expirationListener.get().destroy();
494+
}
495+
454496
this.messageListenerContainer.destroy();
455497
}
456498

@@ -490,26 +532,39 @@ public Void doInRedis(RedisConnection connection) throws DataAccessException {
490532
*/
491533
@Override
492534
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
493-
this.expirationListener.setApplicationEventPublisher(applicationContext);
535+
this.eventPublisher = applicationContext;
494536
}
495537

496-
private void initKeyExpirationListener() {
538+
private void intiMessageListenerContainer() {
497539

498540
this.messageListenerContainer = new RedisMessageListenerContainer();
499541
messageListenerContainer.setConnectionFactory(((RedisTemplate<?, ?>) redisOps).getConnectionFactory());
500542
messageListenerContainer.afterPropertiesSet();
501543
messageListenerContainer.start();
544+
}
545+
546+
private void initKeyExpirationListener() {
547+
548+
if (this.expirationListener.get() == null) {
549+
550+
MappingExpirationListener listener = new MappingExpirationListener(this.messageListenerContainer, this.redisOps,
551+
this.converter);
552+
553+
if (this.eventPublisher != null) {
554+
listener.setApplicationEventPublisher(this.eventPublisher);
555+
}
502556

503-
this.expirationListener = new MappingExpirationListener(this.messageListenerContainer, this.redisOps,
504-
this.converter);
505-
this.expirationListener.init();
557+
if (this.expirationListener.compareAndSet(null, listener)) {
558+
listener.init();
559+
}
560+
}
506561
}
507562

508563
/**
509564
* {@link MessageListener} implementation used to capture Redis keypspace notifications. Tries to read a previously
510565
* created phantom key {@code keyspace:id:phantom} to provide the expired object as part of the published
511566
* {@link RedisKeyExpiredEvent}.
512-
*
567+
*
513568
* @author Christoph Strobl
514569
* @since 1.7
515570
*/
@@ -520,7 +575,7 @@ static class MappingExpirationListener extends KeyExpirationEventMessageListener
520575

521576
/**
522577
* Creates new {@link MappingExpirationListener}.
523-
*
578+
*
524579
* @param listenerContainer
525580
* @param ops
526581
* @param converter
@@ -582,4 +637,26 @@ private boolean isKeyExpirationMessage(Message message) {
582637
}
583638
}
584639

640+
/**
641+
* @author Christoph Strobl
642+
* @since 1.8
643+
*/
644+
public static enum EnableKeyspaceEvents {
645+
646+
/**
647+
* Initializes the {@link KeyExpirationEventMessageListener} on startup.
648+
*/
649+
ON_STARTUP,
650+
651+
/**
652+
* Initializes the {@link KeyExpirationEventMessageListener} on first insert having expiration time set.
653+
*/
654+
ON_DEMAND,
655+
656+
/**
657+
* Turn {@link KeyExpirationEventMessageListener} usage off. No expiration events will be received.
658+
*/
659+
OFF
660+
}
661+
585662
}

src/main/java/org/springframework/data/redis/repository/configuration/EnableRedisRepositories.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors.
2+
* Copyright 2015-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,9 +27,11 @@
2727
import org.springframework.context.annotation.Import;
2828
import org.springframework.data.keyvalue.core.KeyValueOperations;
2929
import org.springframework.data.keyvalue.repository.config.QueryCreatorType;
30+
import org.springframework.data.redis.core.RedisKeyValueAdapter.EnableKeyspaceEvents;
3031
import org.springframework.data.redis.core.RedisOperations;
3132
import org.springframework.data.redis.core.convert.KeyspaceConfiguration;
3233
import org.springframework.data.redis.core.index.IndexConfiguration;
34+
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
3335
import org.springframework.data.redis.repository.query.RedisQueryCreator;
3436
import org.springframework.data.redis.repository.support.RedisRepositoryFactoryBean;
3537
import org.springframework.data.repository.config.DefaultRepositoryBaseClass;
@@ -53,7 +55,8 @@
5355

5456
/**
5557
* Alias for the {@link #basePackages()} attribute. Allows for more concise annotation declarations e.g.:
56-
* {@code @EnableRedisRepositories("org.my.pkg")} instead of {@code @EnableRedisRepositories(basePackages="org.my.pkg")}.
58+
* {@code @EnableRedisRepositories("org.my.pkg")} instead of
59+
* {@code @EnableRedisRepositories(basePackages="org.my.pkg")}.
5760
*/
5861
String[] value() default {};
5962

@@ -154,4 +157,12 @@
154157
*/
155158
Class<? extends KeyspaceConfiguration> keyspaceConfiguration() default KeyspaceConfiguration.class;
156159

160+
/**
161+
* Configure usage of {@link KeyExpirationEventMessageListener}.
162+
*
163+
* @return
164+
* @since 1.8
165+
*/
166+
EnableKeyspaceEvents enableKeyspaceEvents() default EnableKeyspaceEvents.ON_DEMAND;
167+
157168
}

src/main/java/org/springframework/data/redis/repository/configuration/RedisRepositoryConfigurationExtension.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors.
2+
* Copyright 2015-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -122,6 +122,11 @@ public void registerBeansForRoot(BeanDefinitionRegistry registry, RepositoryConf
122122
new RuntimeBeanReference(REDIS_CONVERTER_BEAN_NAME));
123123

124124
redisKeyValueAdapterDefinition.setConstructorArgumentValues(constructorArgumentValuesForRedisKeyValueAdapter);
125+
126+
DirectFieldAccessor dfa = new DirectFieldAccessor(configurationSource);
127+
AnnotationAttributes aa = (AnnotationAttributes) dfa.getPropertyValue("attributes");
128+
redisKeyValueAdapterDefinition.setAttribute("enableKeyspaceEvents", aa.getEnum("enableKeyspaceEvents"));
129+
125130
registerIfNotAlreadyRegistered(redisKeyValueAdapterDefinition, registry, REDIS_ADAPTER_BEAN_NAME,
126131
configurationSource);
127132

src/test/java/org/springframework/data/redis/core/RedisKeyValueAdapterTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void setUp() {
6969
mappingContext.afterPropertiesSet();
7070

7171
adapter = new RedisKeyValueAdapter(template, mappingContext);
72+
adapter.afterPropertiesSet();
7273

7374
template.execute(new RedisCallback<Void>() {
7475

0 commit comments

Comments
 (0)