# Rate Limiting

Defense that controls request frequency to AI systems, protecting against model extraction, denial of service, and systematic exploitation attempts.
## Definition
Rate limiting for AI systems controls how frequently users or applications can query the model, preventing abuse through excessive requests. While conceptually similar to traditional API rate limiting, AI-specific rate limiting must account for the unique threats against ML systems, including extraction attacks that require many queries to succeed.
Effective rate limiting doesn't just prevent denial of service—it raises the economic cost of attacks that rely on systematic querying, making model extraction and training data extraction significantly harder.
## AI-Specific Threats Addressed

### Model Extraction

Stealing model functionality requires thousands to millions of queries to train a surrogate model. Rate limiting makes such attacks:
- Time-consuming (weeks/months instead of hours)
- Expensive (API costs for distributed attacks)
- Detectable (sustained high-volume patterns)
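To make the economics concrete, a back-of-envelope calculation shows how a modest per-account limit stretches an extraction campaign. The query count and limit below are illustrative assumptions, not measured figures:

```python
# Assumed figures: 1M queries for a usable surrogate, 100 requests/minute limit
queries_needed = 1_000_000
limit_per_minute = 100

minutes_required = queries_needed / limit_per_minute   # 10,000 minutes
days_required = minutes_required / (60 * 24)
print(f"{days_required:.1f} days at the limit, per account")  # → 6.9 days
```

The attacker must then either wait, pay for many accounts, or generate distributed traffic that is itself detectable, which is exactly the trade-off described above.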
### Training Data Extraction

Extracting memorized training data requires diverse prompts and repeated sampling. Effective defenses place limits on:
- Requests per time window
- High-temperature generations
- Completion length
### Systematic Jailbreak Attempts
Automated jailbreak discovery requires testing many prompt variations. Rate limiting slows adversarial prompt search.
### Cost Amplification (DoS)
AI inference is computationally expensive. Attackers can cause financial damage through:
- Long input/output sequences consuming GPU time
- Complex reasoning tasks requiring multiple inference passes
- Batch requests overwhelming infrastructure
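One direct mitigation for these cost vectors is capping request size before inference ever runs. A minimal sketch, where the token budgets and the chars-per-token heuristic are assumptions rather than a real tokenizer:

```python
MAX_INPUT_TOKENS = 4_096    # assumed input budget
MAX_OUTPUT_TOKENS = 1_024   # assumed output budget

def clamp_request(prompt: str, requested_output_tokens: int) -> tuple:
    """Reject oversized inputs and clamp the output token budget."""
    estimated_input = len(prompt) // 4  # crude chars-per-token estimate
    if estimated_input > MAX_INPUT_TOKENS:
        raise ValueError("input exceeds maximum token budget")
    return prompt, min(requested_output_tokens, MAX_OUTPUT_TOKENS)
```

Bounding both sides of a request bounds the worst-case GPU time any single call can consume, independent of how many calls the user is allowed.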
## Rate Limiting Strategies

### Request-Based Limits
```python
# Simple request counting
import time
from collections import defaultdict

class RequestRateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def allow_request(self, user_id: str) -> bool:
        now = time.time()
        window_start = now - self.window_seconds
        # Drop timestamps that have fallen outside the window
        self.requests[user_id] = [
            t for t in self.requests[user_id]
            if t > window_start
        ]
        if len(self.requests[user_id]) >= self.max_requests:
            return False
        self.requests[user_id].append(now)
        return True

# Example: 100 requests per minute
limiter = RequestRateLimiter(max_requests=100, window_seconds=60)
```

### Token-Based Limits
More granular control accounting for request complexity:
```python
import time
from collections import defaultdict

class TokenRateLimiter:
    def __init__(self, max_tokens: int, window_seconds: int):
        self.max_tokens = max_tokens
        self.window_seconds = window_seconds
        self.usage = defaultdict(list)

    def allow_request(self, user_id: str, estimated_tokens: int) -> bool:
        now = time.time()
        window_start = now - self.window_seconds
        # Keep only usage inside the window (also prunes stale entries
        # so the list does not grow without bound)
        recent_usage = [
            (t, tokens) for t, tokens in self.usage[user_id]
            if t > window_start
        ]
        self.usage[user_id] = recent_usage
        total_tokens = sum(tokens for _, tokens in recent_usage)
        if total_tokens + estimated_tokens > self.max_tokens:
            return False
        self.usage[user_id].append((now, estimated_tokens))
        return True

# Example: 100K tokens per hour
token_limiter = TokenRateLimiter(max_tokens=100_000, window_seconds=3600)
```

### Cost-Based Limits
```python
from collections import defaultdict

class CostBasedLimiter:
    """Limit based on actual compute cost."""

    def __init__(self, max_cost_per_day: float):
        self.max_cost = max_cost_per_day
        # Reset externally, e.g. by a daily scheduled job
        self.daily_costs = defaultdict(float)

    def track_cost(self, user_id: str, input_tokens: int, output_tokens: int):
        # Example pricing (adjust to actual costs)
        cost = (input_tokens * 0.00001) + (output_tokens * 0.00003)
        self.daily_costs[user_id] += cost

    def allow_request(self, user_id: str) -> bool:
        return self.daily_costs[user_id] < self.max_cost
```

### Behavioral Limits
Limits based on usage patterns rather than raw counts:
- Burst detection — Flag sudden increases in request rate
- Diversity scoring — Limit users making highly similar requests
- Session limits — Cap total queries per conversation/session
- Entropy monitoring — Flag suspiciously systematic query patterns
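As one sketch of the burst-detection idea, compare a user's short-term request rate against their long-term average. The window sizes, burst factor, and warm-up threshold below are all illustrative assumptions; `now` is passed explicitly to keep the example deterministic, where production code would use the wall clock:

```python
from collections import deque

class BurstDetector:
    def __init__(self, short_window=10.0, long_window=600.0, burst_factor=5.0):
        self.short_window = short_window
        self.long_window = long_window
        self.burst_factor = burst_factor
        self.history = {}

    def record_and_check(self, user_id: str, now: float) -> bool:
        """Record a request; return True if it looks like part of a burst."""
        q = self.history.setdefault(user_id, deque())
        q.append(now)
        # Discard requests outside the long window
        while q and q[0] < now - self.long_window:
            q.popleft()
        if len(q) < 30:  # not enough history to judge yet
            return False
        short_count = sum(1 for t in q if t > now - self.short_window)
        long_rate = len(q) / self.long_window
        short_rate = short_count / self.short_window
        return short_rate > self.burst_factor * long_rate
```

A user making one request every 10 seconds never trips the detector, while the same user suddenly firing 20 requests in one second does.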
## Implementation Architecture

### Layered Rate Limiting
```
┌─────────────────────────────────────────────────────────┐
│                     CDN/Edge Layer                      │
│           Global rate limits, DDoS protection           │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                    API Gateway Layer                    │
│              Per-user/API-key rate limits               │
│           Authentication, request validation            │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                    Application Layer                    │
│                 Token/cost-based limits                 │
│                   Behavioral analysis                   │
│                  Model-specific limits                  │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                       Model Layer                       │
│                    Queue management                     │
│                   Priority scheduling                   │
└─────────────────────────────────────────────────────────┘
```

### Distributed Rate Limiting
```python
# Redis-based distributed rate limiter (sliding window over a sorted set)
import time

import redis

class DistributedRateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def allow_request(self, user_id: str, limit: int, window: int) -> bool:
        key = f"ratelimit:{user_id}"
        now = time.time()
        window_start = now - window
        pipe = self.redis.pipeline()
        # Remove entries older than the window
        pipe.zremrangebyscore(key, 0, window_start)
        # Count entries still inside the window
        pipe.zcard(key)
        # Record this request
        pipe.zadd(key, {str(now): now})
        # Expire the key once the window has fully passed
        pipe.expire(key, window)
        _, count, _, _ = pipe.execute()
        return count < limit
```

## Evasion and Countermeasures
### Common Evasion Techniques
| Technique | Method | Countermeasure |
|---|---|---|
| Account rotation | Multiple accounts | Device fingerprinting, payment verification |
| IP rotation | Proxies, VPNs | Account-based limits, behavior analysis |
| Slow and low | Stay under thresholds | Aggregate limits, anomaly detection |
| Distributed attacks | Botnet distribution | Global rate limits, ML detection |
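For the "slow and low" and distributed rows, one concrete countermeasure is an aggregate ceiling enforced alongside per-user limits, so traffic spread across many accounts still hits a shared cap. A minimal in-memory sketch, where both limits are illustrative assumptions and a production deployment would use a shared store such as Redis:

```python
from collections import defaultdict

class AggregateLimiter:
    def __init__(self, per_user_per_min: int = 60, global_per_min: int = 10_000):
        self.per_user = per_user_per_min
        self.global_limit = global_per_min
        self.user_hits = defaultdict(list)
        self.all_hits = []

    def allow(self, user_id: str, now: float) -> bool:
        cutoff = now - 60
        # Prune per-user and global histories to the last minute
        self.user_hits[user_id] = [t for t in self.user_hits[user_id] if t > cutoff]
        self.all_hits = [t for t in self.all_hits if t > cutoff]
        if len(self.user_hits[user_id]) >= self.per_user:
            return False
        if len(self.all_hits) >= self.global_limit:
            return False
        self.user_hits[user_id].append(now)
        self.all_hits.append(now)
        return True
```

An attacker rotating accounts stays under every per-user limit but still exhausts the global budget, at which point the sustained aggregate volume is itself a detection signal.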
### Adaptive Rate Limiting
```python
from collections import defaultdict

class AdaptiveRateLimiter:
    def __init__(self):
        self.base_limits = {"requests_per_min": 60}
        self.user_scores = defaultdict(lambda: 1.0)

    def get_limit(self, user_id: str) -> int:
        """Adjust limits based on trust score."""
        score = self.user_scores[user_id]
        return int(self.base_limits["requests_per_min"] * score)

    def update_score(self, user_id: str, behavior: dict):
        """Adjust trust based on behavior."""
        if behavior.get("suspicious_patterns"):
            self.user_scores[user_id] *= 0.5  # Halve the effective limit
        elif behavior.get("good_standing"):
            self.user_scores[user_id] = min(2.0, self.user_scores[user_id] * 1.1)
```

## Best Practices
- Multiple limit types — Combine requests, tokens, and cost limits
- Graceful degradation — Queue requests rather than hard reject where possible
- Clear communication — Return rate limit headers (X-RateLimit-*)
- Tier-based limits — Different limits for different user tiers
- Monitor and adjust — Track legitimate vs. abusive patterns
- Log everything — Enable forensic analysis of attacks
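The "clear communication" practice can be sketched as a helper that builds conventional rate-limit response headers. The header names follow common API practice; the function and its parameters are illustrative, and `now` is passed in rather than read from the clock to keep the example deterministic:

```python
import math

def rate_limit_headers(limit: int, used: int, reset_epoch: float, now: float) -> dict:
    """Build conventional X-RateLimit-* headers for an API response."""
    remaining = max(0, limit - used)
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(remaining),
        "X-RateLimit-Reset": str(math.ceil(reset_epoch)),
    }
    if remaining == 0:
        # Tell well-behaved clients how long to back off
        headers["Retry-After"] = str(max(1, math.ceil(reset_epoch - now)))
    return headers
```

Returning `Retry-After` alongside a 429 lets legitimate clients back off gracefully instead of retrying blindly.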
## References
- MITRE (2023). "AML.M0004: Restrict Number of ML Model Queries." ATLAS Framework.
- OWASP (2023). "Rate Limiting." API Security Top 10.
- Tramèr, F. et al. (2016). "Stealing Machine Learning Models via Prediction APIs." USENIX Security.
## Framework Mappings
| Framework | Reference |
|---|---|
| MITRE ATLAS | AML.M0004: Restrict Number of ML Model Queries |
| OWASP LLM Top 10 | LLM04: Model Denial of Service (Mitigation) |
| NIST CSF | PR.PT-4: Communications protection |
## Related Entries

## Citation
Aizen, K. (2025). "Rate Limiting." AI Security Wiki, snailsploit.com. Retrieved from https://snailsploit.com/ai-security/wiki/defenses/rate-limiting/