Implement randomizationFactor in ExponentialBackOffWithMaxRetries #3849

Open
@JoosJuliet

Description

hello

Expected Behavior

The ExponentialBackOffWithMaxRetries class should allow for randomized backoff intervals to prevent simultaneous retries from multiple instances, which can overload servers. This randomness should be adjustable via a randomizationFactor to provide flexibility in how the backoff intervals are calculated.

Current Behavior

Currently, the ExponentialBackOffWithMaxRetries class derives every backoff interval deterministically from the initial interval, the multiplier, and the maximum interval, with no randomness. Instances that fail at the same moment therefore retry at the same moments, which can cause synchronized retries and load spikes in distributed systems.
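
To illustrate the synchronization, here is a minimal self-contained sketch of a fixed exponential schedule. FixedSchedule and its delays method are hypothetical names used only for this example; they are not part of Spring. The loop mirrors the usual "multiply, then cap at the maximum" rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring: lists the delays that a plain
// exponential backoff (no randomness) produces for a given configuration.
final class FixedSchedule {

    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxRetries) {
        List<Long> result = new ArrayList<>();
        // The first delay is the initial interval, capped at the maximum.
        long current = Math.min(initialInterval, maxInterval);
        for (int i = 0; i < maxRetries; i++) {
            result.add(current);
            // Each following delay grows by the multiplier, capped at the maximum.
            current = Math.min((long) (current * multiplier), maxInterval);
        }
        return result;
    }
}
```

With an initial interval of 1000 ms, a multiplier of 2.0, a maximum interval of 10000 ms, and 5 retries, every instance gets the same delays: 1000, 2000, 4000, 8000, 10000. Two pods that fail together will retry together at each step.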

Context

To enhance the stability and fairness of our Kafka message processing system, we need to address a key challenge: consumer pod starvation caused by synchronized retries when using RecoveringBatchErrorHandler. The predictable nature of the current retry intervals is the root cause of this synchronization, leading to uneven load distribution.

The core solution is to introduce a randomizationFactor in the ExponentialBackOffWithMaxRetries class. The resulting jitter varies each pod's retry interval, so retries are spread over time instead of firing simultaneously.

Consequently, this enhancement will effectively prevent pod starvation and ensure message consumption opportunities are distributed more evenly across all nodes. This stabilizes system load and significantly improves the resilience of our Kafka processing architecture.

Proposed Code Changes

Here is the proposed enhancement to the ExponentialBackOffWithMaxRetries class:

import org.springframework.util.backoff.ExponentialBackOff;

public class ExponentialBackOffWithMaxRetries extends ExponentialBackOff {

    private final int maxRetries;
    private double randomizationFactor = 0.0; // Default to no randomization for backward compatibility

    public ExponentialBackOffWithMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
        calculateMaxElapsed();
    }

    // Sets the jitter range: 0 disables randomization, 1 allows each interval
    // to be scaled by a multiplier between 0 and 2.
    public void setRandomizationFactor(double randomizationFactor) {
        if (randomizationFactor < 0 || randomizationFactor > 1) {
            throw new IllegalArgumentException("Randomization factor must be between 0 and 1");
        }
        this.randomizationFactor = randomizationFactor;
        calculateMaxElapsed();
    }

    // Derives the max elapsed time by summing the (randomized) intervals
    // for maxRetries attempts.
    private void calculateMaxElapsed() {
        long maxInterval = getMaxInterval();
        long maxElapsed = Math.min(getInitialInterval(), maxInterval);
        long current = maxElapsed;
        for (int i = 1; i < this.maxRetries; i++) {
            long next = Math.min((long) (current * getMultiplier()), maxInterval);
            current = applyRandomization(next);
            maxElapsed += current;
        }
        super.setMaxElapsedTime(maxElapsed);
    }

    // Scales the interval by a multiplier drawn uniformly from
    // [1 - randomizationFactor, 1 + randomizationFactor).
    private long applyRandomization(long interval) {
        double randomMultiplier = (1 - randomizationFactor) + Math.random() * 2 * randomizationFactor;
        return (long) (interval * randomMultiplier);
    }
}
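
One point that may be worth discussing: in the snippet above the random multiplier is applied while computing the max elapsed time, so the retry budget itself becomes random. A possible alternative is to jitter each delay as it is handed out and to count attempts explicitly. The sketch below illustrates that idea only. JitteredBackOffSketch, its constructor parameters, and its STOP constant are hypothetical stand-ins written for this example; they are not the Spring BackOff API, and wiring this into ExponentialBackOffWithMaxRetries would need more work.

```java
import java.util.Random;

// Hypothetical stand-in for a backoff execution; NOT the Spring API.
final class JitteredBackOffSketch {

    static final long STOP = -1; // returned once the retries are exhausted

    private final double multiplier;
    private final long maxInterval;
    private final int maxRetries;
    private final double randomizationFactor;
    private final Random random;

    private long base;    // un-jittered interval for the next attempt
    private int attempts; // number of delays handed out so far

    JitteredBackOffSketch(long initialInterval, double multiplier, long maxInterval,
            int maxRetries, double randomizationFactor, Random random) {
        if (randomizationFactor < 0 || randomizationFactor > 1) {
            throw new IllegalArgumentException("Randomization factor must be between 0 and 1");
        }
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
        this.maxRetries = maxRetries;
        this.randomizationFactor = randomizationFactor;
        this.random = random;
        this.base = Math.min(initialInterval, maxInterval);
    }

    long nextBackOff() {
        // The retry count is fixed by counting attempts, not by elapsed time.
        if (attempts >= maxRetries) {
            return STOP;
        }
        attempts++;
        // Jitter multiplier drawn uniformly from [1 - factor, 1 + factor).
        double jitter = (1 - randomizationFactor) + random.nextDouble() * 2 * randomizationFactor;
        long delay = (long) (base * jitter);
        // Grow the un-jittered base so the randomness does not compound.
        base = Math.min((long) (base * multiplier), maxInterval);
        return delay;
    }
}
```

With a randomizationFactor of 0 this reproduces the fixed schedule, which keeps the default backward compatible. Note that in this sketch a jittered delay can exceed maxInterval by up to the randomization factor; whether to cap it is a design choice left open here.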
