From 7e98b3a1f2015c25359ff7b2d62995c3f64f5548 Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 14 Mar 2023 18:50:43 -0400 Subject: [PATCH 1/2] Some `LockRegistryLeaderInitiator` improvements It is better to not go to the target lock provider at all if the current thread is already interrupted. * Check for the `Thread.currentThread().isInterrupted()` in the `while` loop and `restartSelectorBecauseOfError()` immediately without checking for a lock * Fix some other simple typos in the `LockRegistryLeaderInitiator` **Cherry-pick to `6.0.x`** --- .../leader/LockRegistryLeaderInitiator.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java index a646dfeeabc..a37230ee77d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -353,6 +353,10 @@ protected class LeaderSelector implements Callable { public Void call() { try { while (isRunning()) { + if (Thread.currentThread().isInterrupted()) { + restartSelectorBecauseOfError(new InterruptedException()); + return null; + } try { tryAcquireLock(); } @@ -373,7 +377,7 @@ public Void call() { LOGGER.debug("Could not unlock during stop for " + this.context + " - treat as broken. Revoking...", e); } - // We are stopping, therefore not leading any more + // We are stopping, therefore not leading anymore handleRevoked(); } } @@ -385,8 +389,8 @@ private void tryAcquireLock() throws InterruptedException { LOGGER.debug("Acquiring the lock for " + this.context); } // We always try to acquire the lock, in case it expired - boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, - TimeUnit.MILLISECONDS); + boolean acquired = + this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS); if (!this.locked) { if (acquired) { // Success: we are now leader @@ -398,8 +402,7 @@ else if (isPublishFailedEvents()) { } } else if (acquired) { - // If we were able to acquire it but we were already locked we - // should release it + // If we were able to acquire it, but we were already locked, we should release it this.lock.unlock(); if (isRunning()) { // Give it a chance to expire. @@ -408,7 +411,7 @@ else if (acquired) { } else { this.locked = false; - // We were not able to acquire it, therefore not leading any more + // We were not able to acquire it, therefore not leading anymore handleRevoked(); if (isRunning()) { // Try again quickly in case the lock holder dropped it @@ -446,7 +449,7 @@ private boolean handleLockException(Exception ex) { // NOSONAR Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis); } catch (InterruptedException e1) { - // Ignore interruption and let it to be caught on the next cycle. + // Ignore interruption and let it be caught on the next cycle. Thread.currentThread().interrupt(); } } From 4102aec6d060fe1f67e3e417a21dc33a04615654 Mon Sep 17 00:00:00 2001 From: abilan Date: Wed, 15 Mar 2023 09:38:30 -0400 Subject: [PATCH 2/2] * Introduce a `LeaderSelector.yielding` flag to revoke leader smoothly. Turns out just canceling the `Future` may lead to a broken lock where we cannot unlock it because the target lock repository may not work with interrupted threads. This way a new leader must wait until the lock is expired in the store --- .../leader/LockRegistryLeaderInitiator.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java index a37230ee77d..1e5323cdc9d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java @@ -344,6 +344,8 @@ protected class LeaderSelector implements Callable { private volatile boolean locked = false; + private volatile boolean yielding = false; + LeaderSelector(String lockKey) { this.lock = LockRegistryLeaderInitiator.this.locks.obtain(lockKey); this.lockKey = lockKey; @@ -354,14 +356,21 @@ public Void call() { try { while (isRunning()) { if (Thread.currentThread().isInterrupted()) { + // No need to try to lock in the interrupted thread, and we might not be able to unlock restartSelectorBecauseOfError(new InterruptedException()); return null; } + if (this.yielding) { + this.yielding = false; + // When yielding, we have to unlock and continue after busyWaitMillis to elect + unlockAndHandleException(null); + continue; + } try { tryAcquireLock(); } catch (Exception e) { - if (handleLockException(e)) { + if (unlockAndHandleException(e)) { return null; } } @@ -420,7 +429,7 @@ else if (acquired) { } } - private boolean handleLockException(Exception ex) { // NOSONAR + private boolean unlockAndHandleException(Exception ex) { // NOSONAR if (this.locked) { this.locked = false; try { @@ -537,9 +546,7 @@ public void yield() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Yielding leadership from " + this); } - if (LockRegistryLeaderInitiator.this.future != null) { - LockRegistryLeaderInitiator.this.future.cancel(true); - } + LockRegistryLeaderInitiator.this.leaderSelector.yielding = true; } @Override