Skip to content

First pass - trivial synchronized blocks #8652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.ReturnedMessage;
Expand Down Expand Up @@ -58,6 +60,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Christian Tzolov
*
* @since 4.3
*
Expand Down Expand Up @@ -115,6 +118,8 @@ public abstract class AbstractAmqpOutboundEndpoint extends AbstractReplyProducin

private volatile ScheduledFuture<?> confirmChecker;

private final Lock lock = new ReentrantLock();

/**
* Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers.
* Defaults to {@link DefaultAmqpHeaderMapper#outboundMapper()}.
Expand Down Expand Up @@ -336,8 +341,14 @@ public void setConfirmTimeout(long confirmTimeout) {
this.confirmTimeout = Duration.ofMillis(confirmTimeout); // NOSONAR sync inconsistency
}

protected final synchronized void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
protected final void setConnectionFactory(ConnectionFactory connectionFactory) {
this.lock.lock();
try {
this.connectionFactory = connectionFactory;
}
finally {
this.lock.unlock();
}
}

protected String getExchangeName() {
Expand Down Expand Up @@ -487,26 +498,33 @@ protected void endpointInit() {
}

@Override
public synchronized void start() {
if (!this.running) {
if (!this.lazyConnect && this.connectionFactory != null) {
try {
Connection connection = this.connectionFactory.createConnection(); // NOSONAR (close)
if (connection != null) {
connection.close();
public void start() {
this.lock.lock();
try {
if (!this.running) {
if (!this.lazyConnect && this.connectionFactory != null) {
try {
Connection connection = this.connectionFactory.createConnection(); // NOSONAR (close)
if (connection != null) {
connection.close();
}
}
catch (RuntimeException ex) {
logger.error(ex, "Failed to eagerly establish the connection.");
}
}
catch (RuntimeException ex) {
logger.error(ex, "Failed to eagerly establish the connection.");
doStart();
if (this.confirmTimeout != null && getConfirmNackChannel() != null && getRabbitTemplate() != null) {
this.confirmChecker = getTaskScheduler()
.scheduleAtFixedRate(checkUnconfirmed(), this.confirmTimeout.dividedBy(2L));
}
this.running = true;
}
doStart();
if (this.confirmTimeout != null && getConfirmNackChannel() != null && getRabbitTemplate() != null) {
this.confirmChecker = getTaskScheduler()
.scheduleAtFixedRate(checkUnconfirmed(), this.confirmTimeout.dividedBy(2L));
}
this.running = true;
}
finally {
this.lock.unlock();
}

}

private Runnable checkUnconfirmed() {
Expand All @@ -526,14 +544,20 @@ private Runnable checkUnconfirmed() {
protected abstract RabbitTemplate getRabbitTemplate();

@Override
public synchronized void stop() {
if (this.running) {
doStop();
}
this.running = false;
if (this.confirmChecker != null) {
this.confirmChecker.cancel(false);
this.confirmChecker = null;
public void stop() {
this.lock.lock();
try {
if (this.running) {
doStop();
}
this.running = false;
if (this.confirmChecker != null) {
this.confirmChecker.cancel(false);
this.confirmChecker = null;
}
}
finally {
this.lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.micrometer.observation.ObservationRegistry;

Expand Down Expand Up @@ -70,6 +72,7 @@
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
* @author Christian Tzolov
*/
@IntegrationManagedResource
public abstract class AbstractMessageChannel extends IntegrationObjectSupport
Expand Down Expand Up @@ -475,6 +478,8 @@ public void destroy() {
*/
protected static class ChannelInterceptorList {

private final Lock lock = new ReentrantLock();

protected final List<ChannelInterceptor> interceptors = new CopyOnWriteArrayList<>(); // NOSONAR

private final LogAccessor logger;
Expand All @@ -486,11 +491,15 @@ public ChannelInterceptorList(LogAccessor logger) {
}

public boolean set(List<ChannelInterceptor> interceptors) {
synchronized (this.interceptors) {
this.lock.lock();
try {
this.interceptors.clear();
this.size = interceptors.size();
return this.interceptors.addAll(interceptors);
}
finally {
this.lock.unlock();
}
}

public int getSize() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
Expand All @@ -44,6 +46,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Trung Pham
* @author Christian Tzolov
*
* @since 3.0
*
Expand All @@ -69,6 +72,8 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport

private volatile boolean explicitlyStopped;

private final Lock lock = new ReentrantLock();

/**
* Construct a registry with the default delay for channel expiry.
*/
Expand Down Expand Up @@ -120,25 +125,37 @@ protected void onInit() {
}

@Override
public synchronized void start() {
if (!this.running) {
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
this.reaperScheduledFuture =
getTaskScheduler()
.schedule(this, Instant.now().plusMillis(this.reaperDelay));

this.running = true;
public void start() {
this.lock.lock();
try {
if (!this.running) {
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
this.reaperScheduledFuture = getTaskScheduler()
.schedule(this, Instant.now().plusMillis(this.reaperDelay));

this.running = true;
}
}
finally {
this.lock.unlock();
}
}

@Override
public synchronized void stop() {
this.running = false;
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
this.reaperScheduledFuture = null;
public void stop() {
this.lock.lock();
try {
this.running = false;
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
this.reaperScheduledFuture = null;
}
this.explicitlyStopped = true;
}
finally {
this.lock.unlock();
}
this.explicitlyStopped = true;

}

public void stop(Runnable callback) {
Expand Down Expand Up @@ -200,35 +217,45 @@ public MessageChannel channelNameToChannel(@Nullable String name) {
* Cancel the scheduled reap task and run immediately; then reschedule.
*/
@Override
public synchronized void runReaper() {
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
this.reaperScheduledFuture = null;
}
public void runReaper() {
this.lock.lock();
try {
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
this.reaperScheduledFuture = null;
}

run();
run();
}
finally {
this.lock.unlock();
}
}

@Override
public synchronized void run() {
logger.trace(() -> "Reaper started; channels size=" + this.channels.size());
Iterator<Entry<String, MessageChannelWrapper>> iterator = this.channels.entrySet().iterator();
long now = System.currentTimeMillis();
while (iterator.hasNext()) {
Entry<String, MessageChannelWrapper> entry = iterator.next();
if (entry.getValue().expireAt() < now) {
logger.debug(() -> "Expiring " + entry.getKey() + " (" + entry.getValue().channel() + ")");
iterator.remove();
public void run() {
this.lock.lock();
try {
logger.trace(() -> "Reaper started; channels size=" + this.channels.size());
Iterator<Entry<String, MessageChannelWrapper>> iterator = this.channels.entrySet().iterator();
long now = System.currentTimeMillis();
while (iterator.hasNext()) {
Entry<String, MessageChannelWrapper> entry = iterator.next();
if (entry.getValue().expireAt() < now) {
logger.debug(() -> "Expiring " + entry.getKey() + " (" + entry.getValue().channel() + ")");
iterator.remove();
}
}
}
this.reaperScheduledFuture =
getTaskScheduler()
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
this.reaperScheduledFuture = getTaskScheduler()
.schedule(this, Instant.now().plusMillis(this.reaperDelay));

logger.trace(() -> "Reaper completed; channels size=" + this.channels.size());
logger.trace(() -> "Reaper completed; channels size=" + this.channels.size());
}
finally {
this.lock.unlock();
}
}


protected record MessageChannelWrapper(MessageChannel channel, long expireAt) {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@
package org.springframework.integration.config;

import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -58,14 +60,15 @@
* @author Gary Russell
* @author Artem Bilan
* @author David Liu
* @author Christian Tzolov
*/
public abstract class AbstractSimpleMessageHandlerFactoryBean<H extends MessageHandler>
implements FactoryBean<MessageHandler>, ApplicationContextAware, BeanFactoryAware, BeanNameAware,
ApplicationEventPublisherAware {

protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR protected with final

private final Object initializationMonitor = new Object();
private final Lock initializationMonitor = new ReentrantLock();

private BeanFactory beanFactory;

Expand Down Expand Up @@ -192,7 +195,8 @@ public H getObject() {
}

protected final H createHandlerInternal() {
synchronized (this.initializationMonitor) {
this.initializationMonitor.lock();
try {
if (this.initialized) {
// There was a problem when this method was called already
return null;
Expand Down Expand Up @@ -228,6 +232,9 @@ protected final H createHandlerInternal() {
this.order, theOrder -> ((Orderable) this.handler).setOrder(theOrder));
this.initialized = true;
}
finally {
this.initializationMonitor.unlock();
}
initializingBean();
return this.handler;
}
Expand Down
Loading