🪣 Token Bucket Problem - Rate Limiting in Concurrency
🔹 Why in Concurrency?
In multithreading, multiple threads may try to perform operations at the same time:
- Without control, they can overload the system (e.g., spam requests, write too much data, overwhelm CPU)
- The Token Bucket algorithm provides a thread-safe way to throttle these operations
🔹 Real-world Examples
- API rate limiting: Prevents a user from sending more than 100 requests per minute
- Network bandwidth control: Limit upload/download speed
- Resource protection: Stop many threads from sending queries to a database faster than it can handle
- Database connection pooling: Limit how fast new database connections are opened
- File I/O throttling: Control disk write operations per second
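API rate limiting usually means one bucket per client rather than one global bucket. The following is a minimal sketch, assuming users are identified by a string ID. `PerUserRateLimiter`, `Bucket`, and `allow` are hypothetical names, and the refill rate is a `double` (unlike the `int` used in the classes below) so that "100 requests per minute" can be expressed as 100/60 tokens per second.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-user limiter: one token bucket per user ID.
public class PerUserRateLimiter {

    // Minimal token bucket; the refill rate is a double so fractional rates work.
    static final class Bucket {
        private final double capacity;
        private final double refillPerSecond;
        private double tokens;
        private long lastRefill = System.nanoTime();

        Bucket(double capacity, double refillPerSecond) {
            this.capacity = capacity;
            this.refillPerSecond = refillPerSecond;
            this.tokens = capacity; // start with a full bucket
        }

        synchronized boolean tryConsume() {
            long now = System.nanoTime();
            // Refill based on elapsed time, never exceeding capacity
            tokens = Math.min(capacity, tokens + (now - lastRefill) / 1e9 * refillPerSecond);
            lastRefill = now;
            if (tokens >= 1) {
                tokens -= 1;
                return true;
            }
            return false;
        }
    }

    private final ConcurrentMap<String, Bucket> buckets = new ConcurrentHashMap<>();
    private final double capacity;
    private final double refillPerSecond;

    public PerUserRateLimiter(double requestsPerMinute) {
        this.capacity = requestsPerMinute;               // burst of up to one minute's quota
        this.refillPerSecond = requestsPerMinute / 60.0; // sustained rate
    }

    public boolean allow(String userId) {
        // computeIfAbsent creates each user's bucket atomically on first use
        return buckets.computeIfAbsent(userId, id -> new Bucket(capacity, refillPerSecond))
                      .tryConsume();
    }

    public static void main(String[] args) {
        PerUserRateLimiter limiter = new PerUserRateLimiter(100);
        int allowedAlice = 0;
        for (int i = 0; i < 150; i++) {
            if (limiter.allow("alice")) allowedAlice++;
        }
        System.out.println("alice allowed: " + allowedAlice);
        // A different user has an independent bucket
        System.out.println("bob allowed: " + limiter.allow("bob"));
    }
}
```

Because capacity equals the per-minute quota, a new user can burst all 100 requests at once and is then held to about 1.67 requests per second. The map is never pruned, so a long-running service would also need to evict the buckets of idle users.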
🔹 Core Algorithm
Parameters:
- capacity → max number of tokens in the bucket
- refillRate → tokens added per second
Steps:
- Track last refill time
- Before allowing an operation, refill tokens based on elapsed time
- If at least 1 token is available → consume it and proceed
- Else → wait or reject
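The refill step reduces to one update rule. Writing $b$ for the current token count, $C$ for `capacity`, $r$ for `refillRate`, and $\Delta t$ for the seconds elapsed since the last refill (symbols introduced here for illustration):

$$
b \leftarrow \min\bigl(C,\; b + r\,\Delta t\bigr), \qquad \text{allow iff } b \ge 1 \text{ (then } b \leftarrow b - 1\text{)}
$$

If a request is rejected, the earliest time at which a whole token becomes available is

$$
t_{\text{wait}} = \frac{1 - b}{r}
$$

Worked example with the demo's parameters ($C = 5$, $r = 2$): if $b = 0.2$ and $\Delta t = 0.3$ s, then $b = \min(5,\, 0.2 + 0.6) = 0.8 < 1$, so the request is rejected, and a waiting caller would need another $(1 - 0.8)/2 = 0.1$ s.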
💻 Code (with comments)
```java
import java.util.concurrent.locks.ReentrantLock;

public class TokenBucket {
    private final int capacity;        // Maximum tokens in bucket
    private final int refillRate;      // Tokens added per second
    private double tokens;             // Current available tokens
    private long lastRefillTimestamp;  // Last time tokens were refilled
    private final ReentrantLock lock = new ReentrantLock(); // Thread safety

    public TokenBucket(int capacity, int refillRate) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.tokens = capacity; // Start with full bucket
        this.lastRefillTimestamp = System.nanoTime();
    }

    public boolean tryConsume() {
        lock.lock();
        try {
            refill(); // Refill tokens based on elapsed time
            if (tokens >= 1) {
                tokens -= 1; // Consume one token
                return true;
            }
            return false; // No tokens available
        } finally {
            lock.unlock();
        }
    }

    private void refill() {
        long now = System.nanoTime();
        // Calculate tokens to add based on elapsed time
        double tokensToAdd = ((now - lastRefillTimestamp) / 1e9) * refillRate;
        if (tokensToAdd > 0) {
            // Add tokens, but don't exceed capacity
            tokens = Math.min(capacity, tokens + tokensToAdd);
            lastRefillTimestamp = now;
        }
    }

    public double getAvailableTokens() {
        lock.lock();
        try {
            refill(); // Update tokens before returning
            return tokens;
        } finally {
            lock.unlock();
        }
    }
}
```
Enhanced Implementation with Waiting Support
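This variant blocks until a token is available. Tokens accrue with the passage of time rather than being handed back by another thread, so a waiting thread cannot rely on someone else calling `signalAll()`: if it is the only caller, nobody else ever refills the bucket. Instead, each waiter sleeps only until the next whole token is due and then re-checks.
```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TokenBucketWithWait {
    private final int capacity;
    private final int refillRate; // Tokens added per second
    private double tokens;
    private long lastRefillTimestamp;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition tokenAvailable = lock.newCondition();

    public TokenBucketWithWait(int capacity, int refillRate) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.tokens = capacity;
        this.lastRefillTimestamp = System.nanoTime();
    }

    // Blocks until a token is available
    public void consume() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                refill();
                if (tokens >= 1) {
                    tokens -= 1;
                    return; // Token consumed successfully
                }
                // An untimed await() could sleep forever; wait only until the next token is due
                tokenAvailable.awaitNanos(nanosUntilNextToken());
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks for at most the given timeout; returns false if no token arrived in time
    public boolean tryConsume(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (true) {
                refill();
                if (tokens >= 1) {
                    tokens -= 1;
                    return true; // Token consumed successfully
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false; // Timeout expired
                }
                // Wait until the next token is due or the deadline, whichever comes first
                tokenAvailable.awaitNanos(Math.min(remaining, nanosUntilNextToken()));
            }
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock
    private void refill() {
        long now = System.nanoTime();
        double tokensToAdd = ((now - lastRefillTimestamp) / 1e9) * refillRate;
        if (tokensToAdd > 0) {
            tokens = Math.min(capacity, tokens + tokensToAdd);
            lastRefillTimestamp = now;
            // Wake other waiting threads so they re-check the bucket
            if (tokens >= 1) {
                tokenAvailable.signalAll();
            }
        }
    }

    // Nanoseconds until the bucket holds one whole token (at least 1 ns)
    private long nanosUntilNextToken() {
        double missing = 1 - tokens;
        return Math.max(1L, (long) Math.ceil(missing / refillRate * 1e9));
    }
}
```
🧪 Client Test Example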
```java
public class Main {
    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(5, 2); // 5 tokens max, 2 tokens/sec

        Runnable task = () -> {
            for (int i = 0; i < 5; i++) {
                if (bucket.tryConsume()) {
                    System.out.println(Thread.currentThread().getName() + " got a token ✅");
                } else {
                    System.out.println(Thread.currentThread().getName() + " blocked ❌");
                }
                try {
                    Thread.sleep(300); // simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        // Multiple threads competing
        new Thread(task, "Thread-1").start();
        new Thread(task, "Thread-2").start();
    }
}
```
Output Behavior (conceptual)
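```
Thread-1 got a token ✅
Thread-2 got a token ✅
Thread-1 got a token ✅
Thread-2 got a token ✅
Thread-1 got a token ✅
Thread-2 got a token ✅
Thread-1 blocked ❌
Thread-2 blocked ❌
Thread-2 got a token ✅   (after refill)
Thread-1 blocked ❌
```
The initial 5 tokens, plus about 0.6 tokens refilled every 300 ms, let both threads through for roughly three rounds. After that the bucket only sustains 2 tokens/sec, so requests are blocked until enough time has passed to refill a whole token. The exact interleaving varies from run to run with thread scheduling.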
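Because the demo depends on real sleeps and thread scheduling, its output is only approximate. One way to check the arithmetic is to make the clock injectable. This is a sketch, not part of `TokenBucket` above: the `ClockedTokenBucket` name and its `LongSupplier` clock parameter are assumptions, and the schedule (two requests per round, five rounds, 300 ms apart) mirrors the `Main` demo without using real threads.

```java
import java.util.function.LongSupplier;

// Hypothetical variant of TokenBucket with an injectable nanosecond clock,
// so the demo's timing can be replayed deterministically.
public class ClockedTokenBucket {
    private final int capacity;
    private final int refillRate;     // Tokens added per second
    private final LongSupplier clock; // Returns nanoseconds, like System.nanoTime()
    private double tokens;
    private long lastRefillTimestamp;

    public ClockedTokenBucket(int capacity, int refillRate, LongSupplier clock) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.clock = clock;
        this.tokens = capacity; // Start with full bucket
        this.lastRefillTimestamp = clock.getAsLong();
    }

    public synchronized boolean tryConsume() {
        long now = clock.getAsLong();
        double tokensToAdd = ((now - lastRefillTimestamp) / 1e9) * refillRate;
        if (tokensToAdd > 0) {
            tokens = Math.min(capacity, tokens + tokensToAdd);
            lastRefillTimestamp = now;
        }
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L}; // Manually advanced fake clock
        ClockedTokenBucket bucket = new ClockedTokenBucket(5, 2, () -> fakeNow[0]);
        int granted = 0;
        for (int round = 0; round < 5; round++) {
            StringBuilder line = new StringBuilder("round " + round + ":");
            for (int request = 0; request < 2; request++) { // two "threads" per round
                boolean ok = bucket.tryConsume();
                if (ok) granted++;
                line.append(ok ? " ok" : " blocked");
            }
            System.out.println(line);
            fakeNow[0] += 300_000_000L; // Advance the clock by 300 ms
        }
        System.out.println("granted: " + granted);
    }
}
```

With a fake clock the result is deterministic: rounds 0 to 2 grant both requests, round 3 rejects both (only 0.8 tokens have accumulated), round 4 grants one of the two, and the program ends with `granted: 7`.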