Retrieval-Augmented Generation (RAG) is one of the keystones of efficient data processing and search in the age of AI agents. The goal is to surface
information from a knowledge base that is relevant to a specific user query and provide that curated context to the LLM. This is
a complex topic with many variants; we will focus on the fundamental building blocks that any RAG pipeline needs.
The document processing pipeline:
- text extraction - process complex document formats (PDF, CSV, etc.)
- text splitting - create meaningful chunks out of long pages of text
- embedding - vectorize chunks (extract semantic meaning)
- store - insert chunks into a specialized database (vector store)
Retrieval:
- embedding - vectorize user query
- search - retrieve the document chunks most similar to the user query
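Composed end to end, the flow looks roughly like the sketch below. It uses the helper functions that the rest of this guide builds step by step, so treat it as a preview rather than runnable code.
# Rough sketch of the full flow, using helpers defined later in this guide.
# Ingestion: extract -> split -> embed -> store
await extract_file(file)                       # text extraction (docling)
async with file.load_text_content() as content:
    chunks = chunk_markdown(content.text)      # text splitting
items = await embed_chunks(file, chunks, embedding_client, embedding_model)  # embedding
await vector_store.add_documents(items=items)  # store

# Retrieval: embed the user query and run a similarity search
results = await search_vector_store(vector_store, query, embedding_client, embedding_model)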
Building blocks
Let’s break down how each step can be implemented with the Agent Stack API. But first, make sure you have the
Platform API and embedding service extensions enabled in your agent:
from typing import Annotated

from a2a.types import Message
from agentstack_sdk.server import Server
from agentstack_sdk.a2a.extensions import (
    PlatformApiExtensionServer,
    PlatformApiExtensionSpec,
    EmbeddingServiceExtensionServer,
    EmbeddingServiceExtensionSpec,
)
from agentstack_sdk.server.context import RunContext

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]

server = Server()


@server.agent(
    default_input_modes=default_input_modes, default_output_modes=["text/plain"]
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
): ...  # Agent code
Agent Stack uses docling to extract text from documents in the various supported formats. To select which formats the
agent can accept, use the default_input_modes parameter of the agent decorator.
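For example, an agent that should only accept PDF and Markdown uploads could narrow the list (a minimal sketch):
# Sketch: accept only PDF and Markdown uploads.
default_input_modes = ["application/pdf", "text/markdown"]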
First, let’s build a set of functions to process the documents, which we will then use in the agent.
To extract text from a File uploaded to the Platform API, simply call file.create_extraction() and wait for
the result. Once the extraction has completed, the extraction object will contain
extracted_files, a list of the extracted files in different formats.
from agentstack_sdk.platform import File, Extraction
import asyncio


async def extract_file(file: File) -> Extraction:
    extraction = await file.create_extraction()
    while extraction.status in {"pending", "in_progress"}:
        await asyncio.sleep(1)
        extraction = await file.get_extraction()
    if extraction.status != "completed":
        raise ValueError(f"Extraction failed with status: {extraction.status}")
    return extraction
Text extraction produces two output formats; you can request a subset by passing formats to create_extraction (e.g., ["markdown"] if you only need plain text):
- markdown: The extracted text formatted as Markdown (file.load_text_content())
- vendor_specific_json: The Docling-specific JSON format containing the document structure (file.load_json_content())
WARNING:
The vendor_specific_json format is not generated for plain text or markdown files, as Docling does not support these formats as input.
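For instance, if you only need the Markdown output, you can request just that format and read the extracted text back once the extraction has finished. A minimal sketch, reusing the polling loop from extract_file above:
# Sketch: request only the Markdown format, then read the extracted text.
extraction = await file.create_extraction(formats=["markdown"])
while extraction.status in {"pending", "in_progress"}:
    await asyncio.sleep(1)
    extraction = await file.get_extraction()

async with file.load_text_content() as content:
    markdown_text = content.text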
Text Splitting
In this example we will use MarkdownTextSplitter from the
langchain-text-splitters package.
It splits a long document into reasonably sized chunks along its Markdown structure (headers and other separators).
from langchain_text_splitters import MarkdownTextSplitter


def chunk_markdown(markdown_text: str) -> list[str]:
    return MarkdownTextSplitter().split_text(markdown_text)
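MarkdownTextSplitter also accepts the usual chunk_size and chunk_overlap parameters (measured in characters), so you can tune chunk sizes to your embedding model. A sketch, where the values are arbitrary assumptions:
# Sketch: tune chunk size and overlap (the defaults below are arbitrary assumptions).
def chunk_markdown(markdown_text: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> list[str]:
    splitter = MarkdownTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return splitter.split_text(markdown_text)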
Embedding
Now we need to embed each chunk using the embedding service. As with LLMs, Agent Stack exposes an
OpenAI-compatible embedding API. You can use any client you prefer; in this example we will use the embedding extension
to create an AsyncOpenAI client:
from openai import AsyncOpenAI
from agentstack_sdk.a2a.extensions import EmbeddingServiceExtensionServer


def get_embedding_client(
    embedding: EmbeddingServiceExtensionServer,
) -> tuple[AsyncOpenAI, str]:
    if not embedding or not embedding.data:
        raise ValueError("Embedding extension not provided")
    embedding_config = embedding.data.embedding_fulfillments.get("default")
    if not embedding_config:
        raise ValueError("Default embedding configuration not found")
    embedding_client = AsyncOpenAI(
        api_key=embedding_config.api_key, base_url=embedding_config.api_base
    )
    embedding_model = embedding_config.api_model
    return embedding_client, embedding_model
Now we can use this client to embed our chunks and create vector store items:
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStoreItem, File


async def embed_chunks(
    file: File, chunks: list[str], embedding_client: AsyncOpenAI, embedding_model: str
) -> list[VectorStoreItem]:
    vector_store_items = []
    embedding_result = await embedding_client.embeddings.create(
        input=chunks,
        model=embedding_model,
        encoding_format="float",
    )
    for i, embedding_data in enumerate(embedding_result.data):
        item = VectorStoreItem(
            document_id=file.id,
            document_type="platform_file",
            model_id=embedding_model,
            text=chunks[i],
            embedding=embedding_data.embedding,
            metadata={"chunk_index": str(i)},  # add arbitrary string metadata
        )
        vector_store_items.append(item)
    return vector_store_items
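Note that this embeds all chunks in a single request. Embedding endpoints usually limit how many inputs (and tokens) a single request may contain, so for large documents you may want to batch the calls. A sketch, where the batch size is an arbitrary assumption rather than a documented platform limit:
# Sketch: embed chunks in batches (BATCH_SIZE is an assumption, not a platform limit).
BATCH_SIZE = 64


async def embed_chunks_batched(
    file: File, chunks: list[str], embedding_client: AsyncOpenAI, embedding_model: str
) -> list[VectorStoreItem]:
    items: list[VectorStoreItem] = []
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start : start + BATCH_SIZE]
        result = await embedding_client.embeddings.create(
            input=batch, model=embedding_model, encoding_format="float"
        )
        for offset, data in enumerate(result.data):
            i = start + offset
            items.append(
                VectorStoreItem(
                    document_id=file.id,
                    document_type="platform_file",
                    model_id=embedding_model,
                    text=chunks[i],
                    embedding=data.embedding,
                    metadata={"chunk_index": str(i)},
                )
            )
    return items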
Store
Finally, to insert the prepared items, we need a function that creates a vector store. For this we need to know
the embedding dimension and the model_id. Because the model is chosen by the embedding extension and is not known
in advance, we issue a test embedding request to determine the dimension:
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStore


async def create_vector_store(embedding_client: AsyncOpenAI, embedding_model: str):
    embedding_response = await embedding_client.embeddings.create(
        input="test", model=embedding_model
    )
    dimension = len(embedding_response.data[0].embedding)
    return await VectorStore.create(
        name="rag-example",
        dimension=dimension,
        model_id=embedding_model,
    )
We can then add the prepared items using vector_store.add_documents, as sketched below and shown in context in the final example.
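A minimal sketch of that call, using the items produced by embed_chunks:
# Sketch: create the store once, then insert the items prepared by embed_chunks.
vector_store = await create_vector_store(embedding_client, embedding_model)
await vector_store.add_documents(items=items)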
Query vector store
Assuming our knowledge base of documents is prepared, we can now search the store based on the user
query. The following function retrieves the five document chunks most similar to the query embedding:
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStore, VectorStoreSearchResult


async def search_vector_store(
    vector_store: VectorStore,
    query: str,
    embedding_client: AsyncOpenAI,
    embedding_model: str,
) -> list[VectorStoreSearchResult]:
    embedding_response = await embedding_client.embeddings.create(
        input=query, model=embedding_model
    )
    query_vector = embedding_response.data[0].embedding
    return await vector_store.search(query_vector=query_vector, limit=5)
Putting it all together
With all the pieces in place, we can now build the agent.
Simple agent
This is a simplified agent that expects a message with one or more files attached as FilePart and a
user query as TextPart. A new vector store is created for each message.
@server.agent(
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")
    if not files or not query:
        raise ValueError("No files or query provided")

    # Create vector store
    vector_store = await create_vector_store(embedding_client, embedding_model)

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    results = await search_vector_store(
        vector_store, query, embedding_client, embedding_model
    )

    # TODO: You can add LLM result processing here
    snippet = [res.model_dump() for res in results]
    yield f"# Results:\n```\n{json.dumps(snippet, indent=2)}\n```"
Instead of simply returning the output of the vector store, you would typically plug this as a tool into your favorite
agentic framework.
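For example, a framework-agnostic tool function could simply wrap search_vector_store and return the raw matches; how you register it as a tool depends on your framework, and the JSON dump is used because the exact result shape is platform-defined. A sketch:
import json


# Sketch: a tool-style wrapper around search_vector_store that an agent framework could call.
async def search_documents(query: str) -> str:
    results = await search_vector_store(
        vector_store, query, embedding_client, embedding_model
    )
    return json.dumps([res.model_dump() for res in results], indent=2)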
Conversational agent
Creating a new vector store for each message is not good practice. Typically, you want to search across
all documents uploaded in the conversation. Below is a version of the agent that reuses the vector store across
messages, so you can ask multiple queries or add more documents later on.
@server.agent(
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    # Check if vector store exists
    vector_store = None
    async for message in context.load_history():
        match message:
            case Message(parts=[Part(root=DataPart(data=data))]):
                vector_store = await VectorStore.get(data["vector_store_id"])

    # Create vector store if it does not exist
    if not vector_store:
        vector_store = await create_vector_store(embedding_client, embedding_model)
        # store vector store id in context for future messages
        data_part = DataPart(data={"vector_store_id": vector_store.id})
        await context.store(AgentMessage(parts=[data_part]))

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    if query:
        results = await search_vector_store(
            vector_store, query, embedding_client, embedding_model
        )
        snippet = [res.model_dump() for res in results]
        # TODO: You can add LLM result processing here
        yield f"# Results:\n```\n{json.dumps(snippet, indent=2)}\n```"
    elif files:
        yield f"{len(files)} file(s) processed"
    else:
        yield "Nothing to do"
Next steps
To further improve the agent, learn how to use other parts of the platform, such as LLMs,
file uploads, and conversations.