Event-driven agent pipelines in LlamaIndex transform static LLM calls into dynamic, responsive systems that react to new information.
Let’s build a simple event-driven pipeline. Imagine we have a system that monitors incoming customer support tickets. When a new ticket arrives, we want an agent to classify its urgency and then, if it’s urgent, another agent to draft a response.
Here’s how we can set that up with LlamaIndex. First, we need our agents. We’ll use FunctionCallingAgent for simplicity, but you could swap in any LlamaIndex agent.
from llama_index.core.agent import FunctionCallingAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from llama_index.core.workflow import Workflow
# WorkflowSpec may not exist in every release; check your installed version
from llama_index.core.workflow.types import WorkflowSpec

# Assume you have your OpenAI API key set as an environment variable
llm = OpenAI(model="gpt-3.5-turbo")

# Agent 1: Classify ticket urgency
def classify_urgency(ticket_text: str) -> str:
    """Classifies the urgency of a customer support ticket."""
    prompt = f"Classify the urgency of the following ticket: '{ticket_text}'. Respond with 'Urgent', 'Medium', or 'Low'."
    # complete() takes a plain string prompt; chat() expects a list of ChatMessage objects
    response = llm.complete(prompt).text
    return response.strip()

urgency_classifier_tool = FunctionTool.from_defaults(
    fn=classify_urgency,
    name="urgency_classifier",
    description="Classifies the urgency of a customer support ticket."
)
urgency_agent = FunctionCallingAgent.from_tools([urgency_classifier_tool], llm=llm)

# Agent 2: Draft response (only if urgent)
def draft_response(ticket_text: str) -> str:
    """Drafts a polite response to a customer support ticket."""
    prompt = f"Draft a polite and helpful response to the following customer support ticket: '{ticket_text}'."
    response = llm.complete(prompt).text
    return response

response_drafter_tool = FunctionTool.from_defaults(
    fn=draft_response,
    name="response_drafter",
    description="Drafts a polite response to a customer support ticket."
)
response_agent = FunctionCallingAgent.from_tools([response_drafter_tool], llm=llm)
Now, we define the workflow. This workflow will take a ticket, pass it to the urgency classifier, and then conditionally pass it to the response drafter.
# Define the workflow specification
workflow_spec = WorkflowSpec(
    entry_point="classify_ticket",
    states={
        "classify_ticket": {
            "type": "agent",
            "agent": urgency_agent,
            "next_state": {
                "Urgent": "draft_response",
                "Medium": "handle_medium_priority",  # We'll define this later
                "Low": "handle_low_priority",  # And this
            },
            "input_key": "ticket_text",
        },
        "draft_response": {
            "type": "agent",
            "agent": response_agent,
            "next_state": {"response_generated": "end"},
            "input_key": "ticket_text",
        },
        "handle_medium_priority": {
            "type": "function",
            "function": lambda ticket_text: print(f"Handling medium priority ticket: {ticket_text}"),
            "next_state": {"done": "end"},
        },
        "handle_low_priority": {
            "type": "function",
            "function": lambda ticket_text: print(f"Handling low priority ticket: {ticket_text}"),
            "next_state": {"done": "end"},
        },
        "end": {"type": "end"}
    }
)

# Instantiate the workflow
workflow = Workflow(workflow_spec=workflow_spec)
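The WorkflowSpec is the blueprint. entry_point names the state where execution starts, and states defines the steps. There are two kinds of step here: agent states (type: "agent") and function states (type: "function"). Notice that classify_ticket has a next_state mapping that branches on the output of the urgency_agent: "Urgent" moves to draft_response, while "Medium" and "Low" go to simpler handling states. One caveat: in the llama_index releases we know of, workflows are usually written by subclassing Workflow and decorating async methods with @step, with Event objects carrying data between steps, so treat this dictionary-style spec as a declarative sketch of the routing and verify it against your installed version.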
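To make the routing semantics concrete, here is a minimal sketch of an interpreter for a spec of this shape, written in plain Python with no LlamaIndex dependency. Everything in it is an assumption for illustration: run_spec is a hypothetical helper, the "run" key is a simplification of the agent and function states, and the keyword-based classify function stands in for the LLM-backed urgency agent.

```python
from typing import Any, Dict, List


def classify(ticket_text: str) -> str:
    """Keyword stand-in for the LLM classifier (illustrative assumption)."""
    text = ticket_text.lower()
    if "down" in text or "critical" in text:
        return "Urgent"
    if "error" in text:
        return "Medium"
    return "Low"


def draft(ticket_text: str) -> str:
    """Stand-in for the response-drafting agent; returns its transition label."""
    return "response_generated"


# Hypothetical spec: each state has a "run" callable and a "next_state" map.
spec: Dict[str, Any] = {
    "entry_point": "classify_ticket",
    "states": {
        "classify_ticket": {
            "run": classify,
            "next_state": {
                "Urgent": "draft_response",
                "Medium": "handle_medium_priority",
                "Low": "handle_low_priority",
            },
        },
        "draft_response": {"run": draft, "next_state": {"response_generated": "end"}},
        "handle_medium_priority": {"run": lambda t: "done", "next_state": {"done": "end"}},
        "handle_low_priority": {"run": lambda t: "done", "next_state": {"done": "end"}},
        "end": {},
    },
}


def run_spec(spec: Dict[str, Any], ticket_text: str, max_steps: int = 20) -> List[str]:
    """Walk the spec from entry_point to 'end' and return the visited state names."""
    state = spec["entry_point"]
    visited: List[str] = []
    for _ in range(max_steps):
        visited.append(state)
        if state == "end":
            return visited
        node = spec["states"][state]
        output = node["run"](ticket_text)
        if output not in node["next_state"]:
            raise ValueError(f"State {state!r} produced unmapped output {output!r}")
        state = node["next_state"][output]
    raise RuntimeError("Exceeded max_steps; the spec may contain a cycle")
```

The explicit error on an unmapped output matters in practice: an LLM that replies with anything other than the three expected labels would otherwise stall the pipeline silently.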
To trigger this, we simulate an incoming ticket.
# Simulate an incoming urgent ticket
print("--- Processing Urgent Ticket ---")
urgent_ticket = "My website is down and I'm losing thousands of dollars per hour! This is a critical issue!"
output = workflow.run(ticket_text=urgent_ticket)
print(f"Workflow output for urgent ticket: {output}")
# Simulate an incoming low priority ticket
print("\n--- Processing Low Priority Ticket ---")
low_priority_ticket = "I have a question about your pricing page."
output = workflow.run(ticket_text=low_priority_ticket)
print(f"Workflow output for low priority ticket: {output}")
When you run this, you’ll see the agents execute their functions. For the urgent ticket, the urgency_agent should output "Urgent", triggering the response_agent. For the low priority ticket, it should output "Low", triggering the handle_low_priority function and ending the workflow. Because the labels come from an LLM, the exact text can vary between runs.
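Since the branch taken depends on an exact string match, it can help to normalize the classifier's reply before using it as a routing key. The sketch below is one possible approach, not part of LlamaIndex: normalize_urgency is a hypothetical helper that picks the first of the three labels appearing in the reply and falls back to a default when none is found. Note its limits: a reply such as "not urgent" would still be read as "Urgent".

```python
import re

# The three labels the classification prompt asks for.
_LABEL_PATTERN = re.compile(r"\b(urgent|medium|low)\b", re.IGNORECASE)


def normalize_urgency(raw: str, default: str = "Medium") -> str:
    """Map a free-form LLM reply onto 'Urgent', 'Medium', or 'Low'.

    Returns the first label found in the reply (case-insensitive),
    or `default` when no label is present.
    """
    match = _LABEL_PATTERN.search(raw)
    if match is None:
        return default
    return match.group(1).capitalize()
```

Choosing "Medium" as the default is a design decision made here for illustration; routing unrecognized replies to a human review step would be an equally reasonable choice.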
The core idea is that the output of one state becomes the input (or a branching condition) for the next state. This lets you define complex, multi-agent logic declaratively: you can chain agents, call external tools, and follow different branches of logic based on LLM outputs.
Here’s a crucial detail: the next_state mapping in agent states is designed to consume the direct string output of the agent’s primary tool or function. If your agent can return structured data (like a JSON object), you’d typically need an intermediate step or a more sophisticated next_state handler that can parse that structure to determine the next path. For simpler use cases like this, relying on a single, clear string output from the LLM is the most straightforward way to drive workflow transitions.
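As a sketch of such an intermediate step, the function below turns a JSON reply into one of the state names used in this example. It assumes the agent was prompted to return an object with an "urgency" field; that field name, the ROUTES table, and route_from_json itself are hypothetical and not part of any LlamaIndex API.

```python
import json

# Maps the parsed urgency label to the next state name.
ROUTES = {
    "Urgent": "draft_response",
    "Medium": "handle_medium_priority",
    "Low": "handle_low_priority",
}


def route_from_json(agent_output: str, fallback: str = "handle_medium_priority") -> str:
    """Parse a JSON agent reply and return the name of the next state.

    Any reply that is not valid JSON, is not an object, or carries an
    unknown or non-string 'urgency' value is sent to `fallback`.
    """
    try:
        payload = json.loads(agent_output)
    except json.JSONDecodeError:
        return fallback
    if not isinstance(payload, dict):
        return fallback
    label = payload.get("urgency")
    if not isinstance(label, str):
        return fallback
    return ROUTES.get(label, fallback)
```

Keeping the parsing in its own step means the rest of the pipeline still branches on a plain string, so the transition table does not need to know about the structured format.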
The next frontier is handling asynchronous events and managing complex state across multiple, independent workflow runs, which LlamaIndex is also evolving to support.
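To illustrate what asynchronous handling with isolated per-run state can look like, here is a small sketch using only Python's asyncio. It is not LlamaIndex code: the queue stands in for an event source, the keyword-based classify coroutine stands in for an LLM call, and all names are assumptions made for this example.

```python
import asyncio
from typing import Dict, List


async def classify(ticket_text: str) -> str:
    """Stand-in for an async LLM call (keyword check is an assumption)."""
    await asyncio.sleep(0)  # yield control, as a network call would
    return "Urgent" if "down" in ticket_text.lower() else "Low"


async def handle_ticket(ticket_id: str, ticket_text: str) -> Dict[str, str]:
    """Run one independent workflow instance with its own state dict."""
    state = {"ticket_id": ticket_id}
    state["urgency"] = await classify(ticket_text)
    state["route"] = (
        "draft_response" if state["urgency"] == "Urgent" else "handle_low_priority"
    )
    return state


async def worker(queue: asyncio.Queue, results: List[Dict[str, str]]) -> None:
    """Consume tickets from the queue until cancelled."""
    while True:
        ticket_id, text = await queue.get()
        try:
            results.append(await handle_ticket(ticket_id, text))
        finally:
            queue.task_done()


async def process(tickets: Dict[str, str], concurrency: int = 2) -> List[Dict[str, str]]:
    """Feed tickets through a pool of workers and return results sorted by id."""
    queue: asyncio.Queue = asyncio.Queue()
    results: List[Dict[str, str]] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(concurrency)]
    for item in tickets.items():
        queue.put_nowait(item)
    await queue.join()  # wait until every queued ticket has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results, key=lambda s: s["ticket_id"])
```

A caller would run it with asyncio.run(process({"t1": "My website is down", "t2": "Pricing question"})). Each ticket gets its own state dictionary, so concurrent runs cannot overwrite each other's data.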