1
\$\begingroup\$

I want to validate my solution for a lock-free leaky bucket rate limiter. Given a queuing capacity and rate limit per second, it should queue requests till capacity is reached and it should allow only certain number of requests per second.

Extra constraints: No explicit queue. No locks. No background threads. I am just trying to see what is the most standalone solution that is possible to implement.

Ignoring thread interrupts, is there anything wrong with my algorithm that can cause issues?

How it works: I set a global request counter globalRequestIdTracker and another variable canProcessUpto to track which requests are valid at a given time. Threads that are queued go to sleep for 1sec, when they wake up they update the canProcessUpto and if they are valid threads, they move on.

public class LeakyBucket {

  int bucketCapacity;
  int leaksPerSecond;
  AtomicInteger lastLeakTime;
  long startTime = System.currentTimeMillis();

  AtomicInteger waitingThreads;
  AtomicInteger globalRequestIdTracker;
  AtomicInteger canProcessUpto;

  LeakyBucket(int bucketCapacity, int leaksPerSecond) {
    this.bucketCapacity = bucketCapacity;
    this.leaksPerSecond = leaksPerSecond;
    this.waitingThreads = new AtomicInteger(0);
    this.lastLeakTime = new AtomicInteger(0);
    this.globalRequestIdTracker = new AtomicInteger(1);
    this.canProcessUpto = new AtomicInteger(leaksPerSecond);
  }

  boolean allowRequest() {
    int myRequestId = -1;
    while (true) {
      if (waitingThreads.get() >= bucketCapacity) {
        logEvent("❌ Dropping");
        return false;
      }
      int currentQueueSize = waitingThreads.get();
      if (waitingThreads.compareAndSet(currentQueueSize, currentQueueSize + 1)) {
        myRequestId = globalRequestIdTracker.getAndIncrement();
        break;
      }
    }
    if (canProcessUpto.get() < myRequestId) {
      logEvent("➡️ Queuing");
    }
    while (canProcessUpto.get() < myRequestId) {
      try {
        Thread.sleep(1000);
        int currentCanProcessUpto = canProcessUpto.get();
        int currentLastLeakTime = lastLeakTime.get();
        int currentTime = getTime();
        if (currentTime > lastLeakTime.get()) { // at least 1 sec has passed.
          // First update last leak time to prevent other threads from reading
          // updated canProcessUpto, and un-updated lastLeakTime
          lastLeakTime.compareAndSet(currentLastLeakTime, currentTime);
          canProcessUpto.compareAndSet(
              currentCanProcessUpto, currentCanProcessUpto + leaksPerSecond);
        }
      } catch (InterruptedException e) {
        // Ignore
      }
    }
    waitingThreads.decrementAndGet();
    logEvent("✅ Serving");
    return true;
  }

  private int getTime() {
    return (int) (System.currentTimeMillis() - startTime) / 1000;
  }

  private void logEvent(String event) {
    System.out.println(
        "Time = " + getTime() + " " + event + " request by " + Thread.currentThread().getName());
  }

  public static void main(String[] args) throws Exception {
    LeakyBucket leakyBucket = new LeakyBucket(10, 2);

    for (int i = 0; i <= 20; i++) {
      // This will reject 10 requests, and accept 10 which will leak at 2req/sec.
      var t = new Thread(leakyBucket::allowRequest);
      t.setName("1st Phase Thread-" + i);
      t.start();
    }

    Thread.sleep(3100); // By now, 6 requests should have been served.
    for (int i = 0; i <= 20; i++) {
      // This will reject 14 requests. And queue 6,
      var t = new Thread(leakyBucket::allowRequest);
      t.setName("2nd Phase Thread-" + i);
      t.start();
    }
    Thread.sleep(10000);
  }
}
\$\endgroup\$
3
  • 1
    \$\begingroup\$ allowRequest is returning a boolean. I assumed it would just check if a request can be processed or whether it has reached the rate limit and return a boolean value accordingly. Why is there such complex logic inside? What's the purpose of that function? \$\endgroup\$ Commented Jul 15 at 8:20
  • \$\begingroup\$ Also, where's the queue that stores the request temporarily until they are flushed out? I noticed that you are not sending any request data, just using no. of requests. \$\endgroup\$ Commented Jul 15 at 8:34
  • \$\begingroup\$ Why would a second have passed? Xs999ms is X, then it goes to X+1s000ms (max) one ms later... \$\endgroup\$ Commented Aug 5 at 0:31

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.