LangChain’s rate limiting for LLM calls isn’t about stopping requests; it’s about gracefully handling the inevitable slowdowns from external services.

Let’s see this in action. Imagine you’re building a chatbot that summarizes long documents. You’ll be making many LLM calls, potentially in rapid succession. Without rate limiting, your application might hammer the LLM API, leading to errors and crashes.

Here’s a simplified Python snippet demonstrating the problem and then the solution:

from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
import time

# --- Problematic Code (No Rate Limiting) ---
print("--- Running without rate limiting ---")
llm_no_limit = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
prompt_text = "Summarize the following text: {text}"
prompt = PromptTemplate(template=prompt_text, input_variables=["text"])
chain_no_limit = LLMChain(llm=llm_no_limit, prompt=prompt)

texts_to_summarize = ["This is a very long document part 1.", "This is a very long document part 2."] * 10 # Simulate many calls

start_time = time.time()
for i, text in enumerate(texts_to_summarize):
    try:
        result = chain_no_limit.run(text=text)
        print(f"Summarized {i+1}/{len(texts_to_summarize)}: {result[:30]}...")
    except Exception as e:
        print(f"Error on call {i+1}: {e}")
        # In a real app, you'd likely stop or implement retry here
        break # Stop on first error for demonstration
end_time = time.time()
print(f"Finished without rate limiting in {end_time - start_time:.2f} seconds.\n")


# --- Code with Rate Limiting ---
from langchain_core.language_models.llms import LLM  # base class for simple text-in, text-out LLMs

# Custom LLM wrapper with rate limiting
class RateLimitedLLM(LLM):
    """
    A wrapper around an LLM that caps calls per fixed 60-second window.
    """
    base_llm: LLM
    rate_limit_per_minute: int = 60
    # Bookkeeping lives in ordinary fields with defaults, so the constructor
    # generated by LangChain's pydantic base class can build the object.
    calls_in_window: int = 0
    window_start: float = 0.0  # 0.0 means no window has started yet

    @property
    def _llm_type(self) -> str:
        return self.base_llm._llm_type

    def _call(
        self,
        prompt: str,
        stop=None,
        run_manager=None,
        **kwargs,
    ) -> str:
        now = time.time()

        # Start a new window once the current one has expired
        if now - self.window_start >= 60:
            self.calls_in_window = 0
            self.window_start = now

        # Window is full: sleep until it ends, then start a fresh one
        if self.calls_in_window >= self.rate_limit_per_minute:
            time_to_wait = max(0.0, 60 - (now - self.window_start))
            print(f"Rate limit hit. Waiting for {time_to_wait:.2f} seconds...")
            time.sleep(time_to_wait)
            self.calls_in_window = 0
            self.window_start = time.time()

        # Record the call and make it
        self.calls_in_window += 1
        return self.base_llm._call(prompt, stop=stop, run_manager=run_manager, **kwargs)

    async def _acall(
        self,
        prompt: str,
        stop=None,
        run_manager=None,
        **kwargs,
    ) -> str:
        # Simplification: this reuses the blocking sync path, so time.sleep()
        # stalls the event loop while waiting. A real async implementation
        # would wait with asyncio.sleep() and call the base model's async API.
        return self._call(prompt, stop, run_manager, **kwargs)

# --- Using the Rate Limited LLM ---
print("--- Running with rate limiting (60 calls/min) ---")
# RateLimitedLLM calls base_llm._call, so it can only wrap models derived from
# LLM (the plain text-in, text-out base class). ChatOpenAI derives from
# BaseChatModel instead, so wrapping it would need a chat-model variant of
# this wrapper. For demonstration purposes, we'll simulate a slow LLM.

# --- Simulation of a slow LLM for demonstration ---
class MockSlowLLM(LLM):
    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        time.sleep(0.5) # Simulate API latency
        return f"Mock summary for: {prompt[:20]}..."
    @property
    def _llm_type(self) -> str:
        return "mock_slow_llm"

mock_llm = MockSlowLLM()
rate_limited_llm = RateLimitedLLM(base_llm=mock_llm, rate_limit_per_minute=60) # Allow 60 calls/min
# --- End Simulation ---


prompt_template_text = "Summarize the following text: {text}"
prompt_template = PromptTemplate(template=prompt_template_text, input_variables=["text"])
chain_with_limit = LLMChain(llm=rate_limited_llm, prompt=prompt_template)

texts_to_summarize_long = ["This is a very long document part 1."] * 100 # Simulate many calls

start_time = time.time()
for i, text in enumerate(texts_to_summarize_long):
    try:
        result = chain_with_limit.run(text=text)
        if (i + 1) % 10 == 0: # Print progress every 10 calls
            print(f"Summarized {i+1}/{len(texts_to_summarize_long)}: {result[:30]}...")
    except Exception as e:
        print(f"Error on call {i+1}: {e}")
        break # Stop on first error for demonstration
end_time = time.time()
print(f"Finished with rate limiting in {end_time - start_time:.2f} seconds.")

The core problem LangChain’s rate limiting addresses is preventing your application from overwhelming an LLM API with too many requests in a short period. LLM providers (like OpenAI, Anthropic, Google) enforce limits on how many requests you can make per minute or per second to ensure fair usage, prevent abuse, and maintain service stability. Exceeding these limits typically results in HTTP 429 "Too Many Requests" errors, which would crash your application if not handled.

LangChain provides mechanisms to implement this handling, most commonly through:

  1. Automatic retries: This is the most direct way. You configure the model to automatically retry calls that fail with rate limit errors, for example through ChatOpenAI's max_retries parameter or the with_retry() method available on LangChain models.
  2. Custom LLM Wrappers: As shown in the example above, you can wrap your LLM instance with a custom class that enforces delays before making a call if it detects it’s approaching a rate limit.

Retries are often the simpler option for basic rate limiting. With ChatOpenAI you can set max_retries, which is forwarded to the OpenAI SDK client, or wrap the model with LangChain's with_retry() method:

from langchain_openai import ChatOpenAI
from openai import RateLimitError

# Option 1: SDK-level retries. max_retries is passed to the OpenAI client,
# which retries rate limit (429) and other transient errors with backoff.
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    max_retries=3,
)

# Option 2: LangChain-level retries. with_retry() wraps the model and
# re-invokes it when one of the listed exceptions is raised.
llm_with_retry = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    max_retries=0,  # disable SDK retries so the two layers don't stack
).with_retry(
    retry_if_exception_type=(RateLimitError,),
    wait_exponential_jitter=True,  # exponential backoff with random jitter
    stop_after_attempt=3,          # total attempts, including the first
)

# Either way, a call that fails with a rate limit error is retried
# automatically before the exception reaches your code.

The custom wrapper approach, as demonstrated in the code block, gives you more explicit control. It counts the calls made within a fixed 60-second window and, once the limit is reached, calls time.sleep() until that window ends. This prevents most rate limit errors from occurring in the first place, rather than reacting to them. It is generally more robust for strict rate limits because it doesn’t rely on the API returning a specific error code; it proactively manages the call rate. One caveat: because the counter resets at window boundaries, a burst at the end of one window followed by a burst at the start of the next can briefly exceed the nominal per-minute rate.
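The wrapper above resets its counter once per window. A sliding-window variant, which tracks the timestamp of each call instead, can be sketched in plain Python as follows. This is an illustrative sketch rather than a LangChain API: SlidingWindowLimiter, acquire, and the injectable clock and sleep parameters are hypothetical names chosen for the example.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` calls in any `period`-second span."""

    def __init__(
        self,
        max_calls: int,
        period: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.max_calls = max_calls
        self.period = period
        self._clock = clock  # injectable so the limiter can be tested without waiting
        self._sleep = sleep
        self._calls: Deque[float] = deque()  # timestamps of recent calls

    def acquire(self) -> float:
        """Block until a call is allowed; return the seconds spent waiting."""
        waited = 0.0
        while True:
            now = self._clock()
            # Forget calls that have aged out of the window
            while self._calls and now - self._calls[0] >= self.period:
                self._calls.popleft()
            if len(self._calls) < self.max_calls:
                self._calls.append(now)
                return waited
            # Window is full: wait until the oldest call leaves it
            delay = self.period - (now - self._calls[0])
            self._sleep(delay)
            waited += delay
```

Calling limiter.acquire() immediately before each LLM call would pace requests the same way the wrapper does, without the burst at window boundaries. Like the wrapper, it only sees calls made by the current process.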

The mental model is that your application acts as a polite guest at a busy party. You can’t just barge in and demand attention from everyone at once. You have to wait your turn. Rate limiting is the mechanism that enforces this politeness. Retrying is like having a friend who nudges you and says, "Hey, wait a sec, they’re busy. Try again in 5 seconds." The custom wrapper is like you looking at the crowd, counting how many people are being served, and deciding yourself to wait a bit before approaching the counter.

The key levers you control are:

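  • max_retries: How many times a failed call is retried. On ChatOpenAI this is a constructor parameter that is forwarded to the OpenAI SDK client.
  • Backoff: How much longer to wait between each retry (e.g., 1s, 2s, 4s, 8s). with_retry() enables exponential backoff through wait_exponential_jitter and caps the total number of attempts with stop_after_attempt.
  • rate_limit_per_minute: In a custom wrapper, this is the absolute maximum calls allowed within a 60-second window.
  • Which errors to retry: with_retry() accepts retry_if_exception_type, so you can target rate limit exceptions (HTTP 429) rather than every failure.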
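How the retry count and the backoff multiplier interact can be sketched in plain Python. This is a minimal illustration, not LangChain's implementation: call_with_backoff, RateLimited, base_delay, and multiplier are hypothetical names, and the sleep function is injected so the example records the delays instead of actually waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 exception."""


def call_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    multiplier: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (RateLimited,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on the given exceptions with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            sleep(base_delay * multiplier ** attempt)  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")


# Usage: a fake API call that is rate limited twice, then succeeds.
attempts = {"count": 0}


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] <= 2:
        raise RateLimited("429 Too Many Requests")
    return "ok"


delays: list = []
result = call_with_backoff(flaky_call, sleep=delays.append)
print(result, delays)  # ok [1.0, 2.0]
```

Production retry helpers usually add random jitter to each delay so that many clients that were rate limited at the same moment don't all retry in lockstep.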

A crucial detail often missed is that many LLM providers' SDKs (like the openai Python package) have their own per-request timeouts and retry mechanisms. ChatOpenAI's max_retries configures exactly that SDK-level retry, while with_retry() and custom wrappers sit on top of it. If you configure retries in both layers (e.g., max_retries=5 on the client plus a LangChain-level retry around the model), the attempts multiply: every application-level attempt can trigger a full round of SDK retries. It’s usually best to let one layer own retries and keep the other minimal (fewer retries in the SDK, or none if the application layer is managing them). Overlapping retry logic makes debugging difficult and introduces unexpected delays.
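To see why stacked retry layers are worth avoiding, the following sketch counts how many requests are sent when a call that always fails passes through two layers. The names with_retries and always_rate_limited are hypothetical, and the retry counts (3 at the application level, 5 in the SDK) simply mirror the example values used in this article.

```python
from typing import Callable


def with_retries(fn: Callable[[], str], retries: int) -> Callable[[], str]:
    """Return a wrapper that calls fn up to retries + 1 times."""
    def wrapper() -> str:
        for attempt in range(retries + 1):
            try:
                return fn()
            except RuntimeError:
                if attempt == retries:
                    raise  # this layer has given up
        raise AssertionError("unreachable")
    return wrapper


requests_sent = 0


def always_rate_limited() -> str:
    """Hypothetical API call that is rejected every time."""
    global requests_sent
    requests_sent += 1
    raise RuntimeError("429 Too Many Requests")


sdk_call = with_retries(always_rate_limited, retries=5)  # SDK-level layer
app_call = with_retries(sdk_call, retries=3)             # application-level layer

try:
    app_call()
except RuntimeError:
    pass

print(requests_sent)  # 24 = (3 + 1) * (5 + 1)
```

With backoff delays added at both layers, the total wait before the error finally surfaces grows in the same multiplicative way.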

The next concept you’ll likely encounter is handling LLM failures that are not rate limits, such as content moderation flags or outright API outages.
