Skip to content

Fix spelling in SynchronizingMessageListener.SubscriptionSynchronizion #2657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0-GH-2656-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.io.Serializable;

import org.springframework.lang.Nullable;

/**
* Class encapsulating a Redis message body and its properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ else if (topic instanceof PatternTopic) {
if (wasListening) {
CompletableFuture<Void> future = new CompletableFuture<>();

getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns,
getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns,
channels, () -> future.complete(null)));
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
Expand Down Expand Up @@ -1212,7 +1212,7 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {

addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));

doSubscribe(connection, patterns, channels);
Expand Down Expand Up @@ -1240,7 +1240,7 @@ void doSubscribe(RedisConnection connection, Collection<byte[]> patterns, Collec
}
}

void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronizion synchronizer) {
void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronization synchronizer) {
this.synchronizingMessageListener.addSynchronization(synchronizer);
}

Expand Down Expand Up @@ -1413,7 +1413,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
initiallySubscribeToChannels = Collections.emptySet();
// perform channel subscription later as the first call to (p)subscribe blocks the client
addSynchronization(
new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, Collections.emptySet(), () -> {
new SynchronizingMessageListener.SubscriptionSynchronization(patterns, Collections.emptySet(), () -> {
try {
subscribeChannel(channels.toArray(new byte[0][]));
} catch (Exception e) {
Expand All @@ -1424,7 +1424,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
initiallySubscribeToChannels = channels;
}

addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));

executor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
Expand All @@ -34,7 +33,7 @@

/**
* Synchronizing {@link MessageListener} and {@link SubscriptionListener} that allows notifying a {@link Runnable}
* (through {@link SubscriptionSynchronizion}) upon completing subscriptions to channels or patterns.
* (through {@link SubscriptionSynchronization}) upon completing subscriptions to channels or patterns.
*
* @author Mark Paluch
* @since 3.0
Expand All @@ -43,19 +42,19 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe

private final MessageListener messageListener;
private final SubscriptionListener subscriptionListener;
private final List<SubscriptionSynchronizion> synchronizations = new CopyOnWriteArrayList<>();
private final List<SubscriptionSynchronization> synchronizations = new CopyOnWriteArrayList<>();

public SynchronizingMessageListener(MessageListener messageListener, SubscriptionListener subscriptionListener) {
this.messageListener = messageListener;
this.subscriptionListener = subscriptionListener;
}

/**
* Register a {@link SubscriptionSynchronizion}.
* Register a {@link SubscriptionSynchronization}.
*
* @param synchronization must not be {@literal null}.
*/
public void addSynchronization(SubscriptionSynchronizion synchronization) {
public void addSynchronization(SubscriptionSynchronization synchronization) {
this.synchronizations.add(synchronization);
}

Expand All @@ -68,7 +67,7 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
public void onChannelSubscribed(byte[] channel, long count) {

subscriptionListener.onChannelSubscribed(channel, count);
handleSubscription(channel, SubscriptionSynchronizion::onChannelSubscribed);
handleSubscription(channel, SubscriptionSynchronization::onChannelSubscribed);
}

@Override
Expand All @@ -80,7 +79,7 @@ public void onChannelUnsubscribed(byte[] channel, long count) {
public void onPatternSubscribed(byte[] pattern, long count) {

subscriptionListener.onPatternSubscribed(pattern, count);
handleSubscription(pattern, SubscriptionSynchronizion::onPatternSubscribed);
handleSubscription(pattern, SubscriptionSynchronization::onPatternSubscribed);
}

@Override
Expand All @@ -89,16 +88,16 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
}

void handleSubscription(byte[] topic,
BiFunction<SubscriptionSynchronizion, ByteArrayWrapper, Boolean> synchronizerCallback) {
BiFunction<SubscriptionSynchronization, ByteArrayWrapper, Boolean> synchronizerCallback) {

if (synchronizations.isEmpty()) {
return;
}

ByteArrayWrapper binaryChannel = new ByteArrayWrapper(topic);
List<SubscriptionSynchronizion> finalized = new ArrayList<>(synchronizations.size());
List<SubscriptionSynchronization> finalized = new ArrayList<>(synchronizations.size());

for (SubscriptionSynchronizion synchronizer : synchronizations) {
for (SubscriptionSynchronization synchronizer : synchronizations) {

if (synchronizerCallback.apply(synchronizer, binaryChannel)) {
finalized.add(synchronizer);
Expand All @@ -111,37 +110,38 @@ void handleSubscription(byte[] topic,
/**
* Synchronization to await subscriptions for channels and patterns.
*/
static class SubscriptionSynchronizion {
static class SubscriptionSynchronization {

private static final AtomicIntegerFieldUpdater<SubscriptionSynchronizion> DONE = AtomicIntegerFieldUpdater
.newUpdater(SubscriptionSynchronizion.class, "done");
private static final AtomicIntegerFieldUpdater<SubscriptionSynchronization> DONE = AtomicIntegerFieldUpdater
.newUpdater(SubscriptionSynchronization.class, "done");

private static final int NOT_DONE = 0;
private static final int DONE_DONE = 0;

private volatile int done = NOT_DONE;
private final Set<ByteArrayWrapper> remainingPatterns;
private final Set<ByteArrayWrapper> remainingChannels;

private final Runnable doneCallback;

public SubscriptionSynchronizion(Collection<byte[]> remainingPatterns, Collection<byte[]> remainingChannels,
private final Set<ByteArrayWrapper> remainingPatterns;
private final Set<ByteArrayWrapper> remainingChannels;

public SubscriptionSynchronization(Collection<byte[]> remainingPatterns, Collection<byte[]> remainingChannels,
Runnable doneCallback) {

if (remainingPatterns.isEmpty()) {
this.remainingPatterns = Collections.emptySet();
} else {
this.remainingPatterns = ConcurrentHashMap.newKeySet(remainingPatterns.size());
this.remainingPatterns
.addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
.addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).toList());
}

if (remainingChannels.isEmpty()) {
this.remainingChannels = Collections.emptySet();
} else {
this.remainingChannels = ConcurrentHashMap.newKeySet(remainingChannels.size());
this.remainingChannels
.addAll(remainingChannels.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
.addAll(remainingChannels.stream().map(ByteArrayWrapper::new).toList());
}

this.doneCallback = doneCallback;
Expand Down