LlamaIndex streaming ingestion treats your data as a constantly flowing river rather than a static lake: new records are folded into the index as they arrive instead of waiting for a full rebuild.
Let’s watch it work. Imagine we have a simple log file that’s being appended to in real time. We want LlamaIndex to ingest these new log entries as they appear, making them searchable as soon as they are ingested.
```python
import os
import time

from llama_index.core import (
    Document,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.readers.base import BaseReader
# OpenAIEmbedding lives in the separate llama-index-embeddings-openai package
from llama_index.embeddings.openai import OpenAIEmbedding


# Custom reader that returns only the lines appended since the previous call
class RealtimeLogReader(BaseReader):
    def __init__(self, file_path):
        self.file_path = file_path
        self.last_line_read = 0  # number of lines already returned

    def load_data(self, extra_info=None):
        documents = []
        current_line_num = 0
        try:
            with open(self.file_path, "r") as f:
                for line in f:
                    current_line_num += 1
                    # Skip lines that an earlier call already returned
                    if current_line_num > self.last_line_read:
                        documents.append(
                            Document(
                                text=f"Log Entry: {line.strip()}",
                                metadata=extra_info or {},
                            )
                        )
                        self.last_line_read = current_line_num
        except FileNotFoundError:
            print(f"Log file not found: {self.file_path}")
        return documents


# --- Setup ---
# Create a sample log file
log_file_path = "realtime.log"
if not os.path.exists(log_file_path):
    with open(log_file_path, "w") as f:
        f.write("Initial log entry 1\n")
        f.write("Initial log entry 2\n")

# Directory where the index is persisted
storage_path = "./storage"
os.makedirs(storage_path, exist_ok=True)

# Shared components (OpenAIEmbedding reads OPENAI_API_KEY from the environment)
embed_model = OpenAIEmbedding()
node_parser = SentenceSplitter(chunk_size=512, chunk_overlap=20)
pipeline = IngestionPipeline(
    transformations=[node_parser, embed_model],
    show_progress=True,
)

# --- Initial Ingestion ---
print("--- Performing initial ingestion ---")
reader = RealtimeLogReader(file_path=log_file_path)
documents = reader.load_data()  # Everything currently in the file

if os.listdir(storage_path):
    print("Loading existing index...")
    storage_context = StorageContext.from_defaults(persist_dir=storage_path)
    index = load_index_from_storage(storage_context, embed_model=embed_model)
    # `documents` is not inserted here: this assumes a previous run ingested them
else:
    print("Creating new index...")
    index = VectorStoreIndex.from_documents(
        documents,
        transformations=[node_parser, embed_model],
        embed_model=embed_model,
    )
    index.storage_context.persist(persist_dir=storage_path)

query_engine = index.as_query_engine()
print("Initial index created. You can query it.")

# --- Simulate Real-time Updates ---
print("\n--- Simulating real-time updates ---")
with open(log_file_path, "a") as f:
    f.write("New log entry 3 arriving now...\n")
    time.sleep(1)  # Simulate delay between writes
    f.write("Another log entry 4, very important\n")
    time.sleep(1)
    f.write("Final log entry 5 for this batch\n")
print("New log entries appended. Re-ingesting...")

# The reader returns only lines it hasn't seen before
new_documents = reader.load_data()
if new_documents:
    print(f"Found {len(new_documents)} new documents to ingest.")
    # Reload from disk, as a separate update process would
    # (reusing the in-memory `index` would also work)
    storage_context = StorageContext.from_defaults(persist_dir=storage_path)
    index = load_index_from_storage(storage_context, embed_model=embed_model)
    # Chunk and embed only the new documents, then add them to the index
    nodes = pipeline.run(documents=new_documents)
    index.insert_nodes(nodes)
    index.storage_context.persist(persist_dir=storage_path)
    print("New documents ingested and index updated.")
else:
    print("No new documents found to ingest.")

# --- Querying the Updated Index ---
print("\n--- Querying the updated index ---")
query_engine = index.as_query_engine()
for question in [
    "What was the very important log entry?",
    "Tell me about log entry 3.",
]:
    response = query_engine.query(question)
    print(f"\nQuery: {question}")
    print(response)
```
The core problem LlamaIndex streaming ingestion solves is keeping a vector index synchronized with data that changes after the initial ingestion. Traditional methods re-index the entire dataset on every change, which is slow and costly for large, frequently updated datasets. Streaming ingestion allows incremental updates: only the data that is new since the last run is processed and added.
Here’s how it works internally:
- Custom Reader: You need a way to detect and load only the new data, which is the job of a custom reader. In our example, `RealtimeLogReader` tracks the last line number it has read, so when `load_data()` is called again it returns only the lines beyond that point. This is what keeps each update cheap.
- Storage Context and Index Loading: To update an index, you first load the existing one with `StorageContext.from_defaults(persist_dir=storage_path)` and `load_index_from_storage()`. This gives you the current state of your knowledge base.
- Ingestion Pipeline: The `IngestionPipeline` processes the new documents returned by your reader. It applies the same transformations (`SentenceSplitter` and `OpenAIEmbedding`) used during the initial ingestion, so new chunks are split and embedded consistently with the existing ones.
- `insert_nodes()`: This is the key method for incremental updates. Instead of rebuilding the index, `index.insert_nodes(nodes)` takes the processed nodes (chunks of your new data with their embeddings already attached) and adds them to the existing vector store and index structures.
- Persistence: After inserting new nodes, you must persist the index again with `index.storage_context.persist()` to save the changes.
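The reader is where change detection lives. As a variation on the line-counting `RealtimeLogReader`, here is a minimal stdlib sketch that remembers a byte offset instead, so each call seeks past already-read data rather than rescanning the whole file. It also leaves a partially written last line for the next call, which the line-counting version would consume half-finished. `OffsetLogTailer` and `read_new_lines` are hypothetical names, not part of LlamaIndex; wrapping each returned line in a `Document` would feed it into the same pipeline.

```python
class OffsetLogTailer:
    """Hypothetical helper: return complete lines appended since the last call."""

    def __init__(self, file_path, offset=0):
        self.file_path = file_path
        self.offset = offset  # byte position just after the last complete line read

    def read_new_lines(self):
        new_lines = []
        try:
            # Binary mode so that len(raw) is an exact byte count for the offset
            with open(self.file_path, "rb") as f:
                f.seek(self.offset)
                for raw in f:
                    # A line without a trailing newline may still be mid-write;
                    # leave it unread so the next call picks up the finished line
                    if not raw.endswith(b"\n"):
                        break
                    self.offset += len(raw)
                    new_lines.append(raw.decode("utf-8").rstrip("\r\n"))
        except FileNotFoundError:
            pass  # No file yet means no new lines
        return new_lines
```

Because `offset` is a single integer, it is also easy to save and restore between runs.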
The levers you control are primarily within your custom reader and how you orchestrate the re-ingestion process. You decide when to check for new data (for example, on a timer or when triggered by an event) and how your reader identifies new or changed data. The `IngestionPipeline` configuration (chunking strategy, embedding model) should stay the same as in the initial ingestion so that old and new vectors remain comparable.
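The timer-driven option can be sketched as a plain polling loop. This is a minimal sketch under stated assumptions: `poll_and_ingest` is a hypothetical helper, `load_new` stands in for something like `reader.load_data()`, and `ingest` stands in for running the pipeline, calling `insert_nodes()`, and persisting.

```python
import time
from typing import Callable, List


def poll_and_ingest(
    load_new: Callable[[], List[str]],
    ingest: Callable[[List[str]], None],
    interval_s: float = 5.0,
    max_polls: int = 3,
) -> int:
    """Hypothetical loop: check for new data every interval, ingest only non-empty batches.

    Returns the total number of items handed to `ingest`.
    """
    total = 0
    for poll in range(max_polls):
        batch = load_new()
        if batch:
            # One call per batch, e.g. pipeline.run(...), insert_nodes(...), persist()
            ingest(batch)
            total += len(batch)
        if poll < max_polls - 1:
            time.sleep(interval_s)  # Wait before the next check
    return total
```

Handing over a whole batch per poll, rather than one line at a time, lets the embedding step process several new entries per call and keeps the number of `persist()` writes down.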
The one thing most people don’t realize is that `RealtimeLogReader` doesn’t magically know what’s new; it relies on its internal state (`self.last_line_read`) being preserved across calls. `load_index_from_storage()` restores the index, but the reader is a separate object, and its state is not part of what `persist()` saves. Within a single run, as in the example, this works because the same `reader` instance is reused: the second `load_data()` call picks up where the first one stopped. After a restart, however, a fresh reader starts again at line 0. The example avoids duplicate entries by discarding the initial documents whenever an index already exists, but that also silently skips any lines appended while the application was down. To handle restarts correctly, save the reader’s position alongside the index and restore it before the first `load_data()` call.
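Saving and restoring the reader’s position needs only a small JSON file. A minimal sketch, assuming a separate state file (the helper names and the `reader_state.json` path are hypothetical, not LlamaIndex APIs):

```python
import json
import os


def save_reader_state(state_path, last_line_read):
    """Write the reader's position to disk (hypothetical helper)."""
    tmp_path = state_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump({"last_line_read": last_line_read}, f)
    # Replace the old file in one step so readers never see a half-written file
    os.replace(tmp_path, state_path)


def load_reader_state(state_path):
    """Return the saved position, or 0 if no state has been saved yet."""
    try:
        with open(state_path) as f:
            return int(json.load(f)["last_line_read"])
    except FileNotFoundError:
        return 0
```

At startup, set `reader.last_line_read = load_reader_state("reader_state.json")` before the first `load_data()` call, and call `save_reader_state("reader_state.json", reader.last_line_read)` only after `persist()` succeeds. With that ordering, a crash between the two steps re-reads a few lines (possible duplicates) instead of skipping them.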
The next challenge you’ll face is managing data deletions or updates to existing entries, not just new additions.