LangChain’s rate limiting for LLM calls isn’t about stopping requests; it’s about gracefully handling the inevitable slowdowns from external services.
Let’s see this in action. Imagine you’re building a chatbot that summarizes long documents. You’ll be making many LLM calls, potentially in rapid succession. Without rate limiting, your application might hammer the LLM API, leading to errors and crashes.
Here’s a simplified Python snippet demonstrating the problem and then the solution:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
import time

# --- Problematic Code (No Rate Limiting) ---
print("--- Running without rate limiting ---")
llm_no_limit = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
prompt_text = "Summarize the following text: {text}"
prompt = PromptTemplate(template=prompt_text, input_variables=["text"])
# Compose prompt -> model -> plain string (replaces the deprecated LLMChain)
chain_no_limit = prompt | llm_no_limit | StrOutputParser()

# Simulate many calls: 20 requests sent back to back
texts_to_summarize = ["This is a very long document part 1.", "This is a very long document part 2."] * 10

start_time = time.time()
for i, text in enumerate(texts_to_summarize):
    try:
        result = chain_no_limit.invoke({"text": text})
        print(f"Summarized {i+1}/{len(texts_to_summarize)}: {result[:30]}...")
    except Exception as e:
        print(f"Error on call {i+1}: {e}")
        # In a real app, you'd likely stop or implement retry here
        break  # Stop on first error for demonstration
end_time = time.time()
print(f"Finished without rate limiting in {end_time - start_time:.2f} seconds.\n")
# --- Code with Rate Limiting ---
# Targets langchain-core 0.3+, whose models are Pydantic v2 classes.
import time
from typing import Any, List, Optional

from langchain_core.language_models.llms import LLM, BaseLLM
from langchain_core.prompts import PromptTemplate
from pydantic import PrivateAttr


# Custom LLM wrapper with rate limiting
class RateLimitedLLM(LLM):
    """
    A wrapper around an LLM that adds rate limiting.
    """

    # Declared as Pydantic fields, so no custom __init__ is needed
    base_llm: BaseLLM
    rate_limit_per_minute: int = 60
    # Private bookkeeping state (not validated, not serialized)
    _calls_in_current_minute: int = PrivateAttr(default=0)
    _minute_start_time: float = PrivateAttr(default_factory=time.time)

    @property
    def _llm_type(self) -> str:
        return self.base_llm._llm_type

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Any = None,
        **kwargs: Any,
    ) -> str:
        seconds_since_minute_start = time.time() - self._minute_start_time

        # Reset counters if a new minute has started
        if seconds_since_minute_start >= 60:
            self._calls_in_current_minute = 0
            self._minute_start_time = time.time()
            seconds_since_minute_start = 0.0

        # Check if we've hit the rate limit
        if self._calls_in_current_minute >= self.rate_limit_per_minute:
            time_to_wait = 60 - seconds_since_minute_start
            print(f"Rate limit hit. Waiting for {time_to_wait:.2f} seconds...")
            time.sleep(time_to_wait)
            # Reset for the new minute after waiting
            self._calls_in_current_minute = 0
            self._minute_start_time = time.time()

        # Record the call and make it through the wrapped model's public API
        self._calls_in_current_minute += 1
        return self.base_llm.invoke(prompt, stop=stop, **kwargs)

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Any = None,
        **kwargs: Any,
    ) -> str:
        # For simplicity this just calls the sync version, which blocks the
        # event loop while sleeping. A real implementation would need
        # careful async handling (for example asyncio.sleep).
        return self._call(prompt, stop, run_manager, **kwargs)


# --- Using the Rate Limited LLM ---
print("--- Running with rate limiting (60 calls/min) ---")
# In a real scenario, you'd wrap a completion-style LLM instance:
# from langchain_openai import OpenAI
# real_llm = OpenAI(model="gpt-3.5-turbo-instruct", temperature=0)
# rate_limited_llm = RateLimitedLLM(base_llm=real_llm, rate_limit_per_minute=60)
# Chat models such as ChatOpenAI are not BaseLLM subclasses, so this wrapper
# does not accept them as-is.


# --- Simulation of a slow LLM for demonstration ---
class MockSlowLLM(LLM):
    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        time.sleep(0.5)  # Simulate API latency
        return f"Mock summary for: {prompt[:20]}..."

    @property
    def _llm_type(self) -> str:
        return "mock_slow_llm"


mock_llm = MockSlowLLM()
rate_limited_llm = RateLimitedLLM(base_llm=mock_llm, rate_limit_per_minute=60)  # Allow 60 calls/min
# --- End Simulation ---

prompt_template_text = "Summarize the following text: {text}"
prompt_template = PromptTemplate(template=prompt_template_text, input_variables=["text"])
# An LLM returns a plain string, so no output parser is needed here
chain_with_limit = prompt_template | rate_limited_llm

texts_to_summarize_long = ["This is a very long document part 1."] * 100  # Simulate many calls

start_time = time.time()
for i, text in enumerate(texts_to_summarize_long):
    try:
        result = chain_with_limit.invoke({"text": text})
        if (i + 1) % 10 == 0:  # Print progress every 10 calls
            print(f"Summarized {i+1}/{len(texts_to_summarize_long)}: {result[:30]}...")
    except Exception as e:
        print(f"Error on call {i+1}: {e}")
        break  # Stop on first error for demonstration
end_time = time.time()
print(f"Finished with rate limiting in {end_time - start_time:.2f} seconds.")
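The `_acall` method above falls back to the synchronous path, which means `time.sleep` stalls every other coroutine while it waits. As a rough sketch of what an async-friendly version could look like, the following uses only the standard library. `AsyncWindowLimiter` and `fake_llm_call` are hypothetical names for illustration, not LangChain API.

```python
import asyncio
import time


class AsyncWindowLimiter:
    """Hypothetical helper: allow at most `max_calls` per `period` seconds."""

    def __init__(self, max_calls: int, period: float = 60.0):
        self.max_calls = max_calls
        self.period = period
        self._window_start = time.monotonic()
        self._calls = 0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        # The lock serializes bookkeeping; waiters queue up behind a sleeper.
        async with self._lock:
            now = time.monotonic()
            if now - self._window_start >= self.period:
                self._window_start = now
                self._calls = 0
            if self._calls >= self.max_calls:
                # asyncio.sleep yields to the event loop instead of blocking it
                await asyncio.sleep(self.period - (now - self._window_start))
                self._window_start = time.monotonic()
                self._calls = 0
            self._calls += 1


async def demo() -> float:
    """Run 3 fake calls against a 2-calls-per-0.2s limit; return elapsed time."""
    limiter = AsyncWindowLimiter(max_calls=2, period=0.2)

    async def fake_llm_call(i: int) -> str:
        await limiter.acquire()
        return f"summary {i}"

    start = time.monotonic()
    results = await asyncio.gather(*(fake_llm_call(i) for i in range(3)))
    assert results == ["summary 0", "summary 1", "summary 2"]
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"3 calls took {asyncio.run(demo()):.2f}s")
```

The third call has to wait for the window to roll over, but other tasks on the same event loop keep running in the meantime.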
The core problem LangChain’s rate limiting addresses is preventing your application from overwhelming an LLM API with too many requests in a short period. LLM providers (like OpenAI, Anthropic, Google) enforce limits on how many requests you can make per minute or per second to ensure fair usage, prevent abuse, and maintain service stability. Exceeding these limits typically results in HTTP 429 "Too Many Requests" errors, which would crash your application if not handled.
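To make the failure mode concrete, here is a minimal sketch of handling a 429 by hand. It assumes the provider's error carries an optional retry hint (many APIs send one in a `Retry-After` header). `RateLimitError` below is a hypothetical stand-in for whatever exception your SDK raises, and `call_with_429_handling` is an illustrative helper, not part of LangChain.

```python
import time
from typing import Callable, Optional


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider SDK's HTTP 429 exception."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("429 Too Many Requests")
        self.retry_after = retry_after  # seconds suggested by the server, if any


def call_with_429_handling(
    fn: Callable[[], str],
    max_attempts: int = 3,
    default_wait: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call fn, waiting and retrying when it raises RateLimitError."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RateLimitError as err:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            # Prefer the server's hint; fall back to a fixed wait otherwise
            wait = err.retry_after if err.retry_after is not None else default_wait
            sleep(wait)
    raise AssertionError("unreachable")
```

The `sleep` parameter exists only so the waiting behavior can be swapped out in tests.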
LangChain provides mechanisms to implement this handling, most commonly through:
- Retries: This is the most direct way. Chat models such as ChatOpenAI accept a max_retries argument that is passed to the provider SDK, and any runnable can be wrapped with .with_retry() to automatically retry failed calls with exponential backoff.
- Custom LLM Wrappers: As shown in the example above, you can wrap your LLM instance with a custom class that enforces delays before making a call if it detects it’s approaching a rate limit.
Retries are often simpler for basic rate limiting. You’d typically configure them when you initialize ChatOpenAI or another model class, or by wrapping the model afterwards:
from langchain_openai import ChatOpenAI
from openai import RateLimitError

# SDK-level retries: max_retries is handed to the underlying OpenAI client
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    max_retries=2,
)

# Application-level retries: wrap the model (or a whole chain) with .with_retry()
llm_with_retry = llm.with_retry(
    retry_if_exception_type=(RateLimitError,),  # Only retry HTTP 429 errors
    wait_exponential_jitter=True,  # Exponential backoff plus random jitter
    stop_after_attempt=3,  # Give up after 3 attempts in total
)

# Now, if an LLM call fails with a rate limit error,
# LangChain will automatically retry it with growing delays before re-raising.
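The waiting pattern behind exponential backoff is easy to compute by hand. The sketch below is a plain-Python illustration, not LangChain's internal implementation. `backoff_delays` and its parameters are hypothetical names, and the 1-second initial delay and 60-second cap are assumed values.

```python
import random
from typing import List, Optional


def backoff_delays(
    retries: int,
    initial: float = 1.0,
    multiplier: float = 2.0,
    max_delay: float = 60.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait before each retry: initial * multiplier**n, capped."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(retries):
        base = min(initial * multiplier ** attempt, max_delay)
        # Jitter spreads out clients that were all rate limited at once
        delays.append(base + rng.uniform(0, jitter))
    return delays


print(backoff_delays(3))  # [1.0, 2.0, 4.0]
```

Without jitter, many clients that hit the limit at the same moment would all retry at the same moment too, which is why retry libraries usually add a random component.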
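The custom wrapper approach, as demonstrated in the code above, gives you more explicit control. It counts the calls made within a fixed 60-second window and calls time.sleep() once the limit for that window is reached. This prevents the error from occurring in the first place, rather than reacting to it. This is generally more robust for strict rate limits because it doesn’t rely on the API returning a specific error code; it proactively manages the call rate. Recent versions of langchain-core also ship a built-in InMemoryRateLimiter (in langchain_core.rate_limiters) that chat models accept through their rate_limiter argument, which covers the same proactive use case without a custom class.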
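One caveat with the wrapper above: because its counter resets at window boundaries, a burst that straddles a boundary can briefly exceed the intended rate. A sliding-window variant tracks individual call timestamps instead. The following is a minimal standard-library sketch. `SlidingWindowLimiter` is a hypothetical name, and the injectable `clock` and `sleep` arguments are there only to make the behavior easy to test.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Hypothetical helper: at most `max_calls` in any `period`-second span."""

    def __init__(
        self,
        max_calls: int,
        period: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock
        self._sleep = sleep
        self._stamps: Deque[float] = deque()  # timestamps of recent calls

    def _evict(self, now: float) -> None:
        # Drop timestamps that have aged out of the window
        while self._stamps and now - self._stamps[0] >= self.period:
            self._stamps.popleft()

    def acquire(self) -> float:
        """Block until a call is allowed; return the seconds spent waiting."""
        waited = 0.0
        now = self._clock()
        self._evict(now)
        while len(self._stamps) >= self.max_calls:
            # Wait until the oldest recorded call leaves the window
            wait = self._stamps[0] + self.period - now
            self._sleep(wait)
            waited += wait
            now = self._clock()
            self._evict(now)
        self._stamps.append(now)
        return waited
```

Calling `limiter.acquire()` immediately before each LLM call gives the same proactive throttling as the wrapper, at the cost of storing up to `max_calls` timestamps.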
The mental model is that your application acts as a polite guest at a busy party. You can’t just barge in and demand attention from everyone at once. You have to wait your turn. Rate limiting is the mechanism that enforces this politeness. Retrying is like having a friend who nudges you and says, "Hey, wait a sec, they’re busy. Try again in 5 seconds." The custom wrapper is like you looking at the crowd, counting how many people are being served, and deciding yourself to wait a bit before approaching the counter.
The key levers you control are:
- max_retries (on the model) and stop_after_attempt (in .with_retry()): How many times a failed call is attempted before the error is raised.
- wait_exponential_jitter: Whether the wait between retries grows exponentially (roughly 1s, 2s, 4s, 8s) with some random jitter added.
- rate_limit_per_minute: In a custom wrapper, this is the absolute maximum calls allowed within a 60-second window.
- Specific errors to retry: With retry_if_exception_type, you can target specific exceptions, such as the SDK’s rate limit error raised on HTTP 429.
A crucial detail often missed is that many LLM providers also have per-request timeouts and internal retry mechanisms within their SDKs (like the openai Python package). LangChain’s rate limiting and retry logic work on top of these. Sometimes, you might configure retries both in LangChain (for example with .with_retry()) and in the underlying SDK (e.g., passing max_retries=5 to openai.OpenAI(...)). This can lead to a cascade of retries, because every application-level attempt triggers a full round of SDK-level attempts. It’s usually best to let LangChain handle the application-level rate limiting and retries for logical flow, and ensure the underlying SDK is configured reasonably (perhaps fewer retries there, or none if LangChain is managing it). Overlapping retry logic can make debugging difficult and introduce unexpected delays.
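The cost of stacked retries is easy to underestimate because the layers multiply rather than add. The sketch below simulates two nested retry layers around a call that always fails and counts the real requests made. `TransientError`, `with_retries`, and `count_attempts` are hypothetical names for this illustration only.

```python
from typing import Callable


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as HTTP 429."""


def with_retries(fn: Callable[[], str], retries: int) -> Callable[[], str]:
    """Return a wrapper that calls fn up to retries + 1 times."""

    def wrapped() -> str:
        for attempt in range(retries + 1):
            try:
                return fn()
            except TransientError:
                if attempt == retries:
                    raise  # this layer is out of retries
        raise ValueError("retries must be >= 0")

    return wrapped


def count_attempts(sdk_retries: int, app_retries: int) -> int:
    """Count real requests when both layers retry a call that always fails."""
    counter = {"n": 0}

    def always_rate_limited() -> str:
        counter["n"] += 1
        raise TransientError("429")

    sdk_layer = with_retries(always_rate_limited, sdk_retries)
    app_layer = with_retries(sdk_layer, app_retries)
    try:
        app_layer()
    except TransientError:
        pass
    return counter["n"]


print(count_attempts(sdk_retries=5, app_retries=3))  # 24
print(count_attempts(sdk_retries=0, app_retries=3))  # 4
```

With 5 SDK retries and 3 application retries, a single logical call can turn into (5 + 1) × (3 + 1) = 24 requests against an API that is already rate limiting you.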
The next concept you’ll likely encounter is handling LLM failures that are not rate limits, such as content moderation flags or outright API outages.