LlamaIndex Query Pipelines: Quickstart Guide to the Declarative Query API
Author(s): Eduardo Muñoz
Originally published on Towards AI.
Introduction
Query Pipelines is a new declarative API for orchestrating simple-to-advanced workflows over your data within LlamaIndex. Other frameworks have built similar approaches as an easier way to construct LLM workflows over your data, such as RAG systems, querying unstructured data, or structured data extraction.
It's based on the QueryPipeline abstraction. You can load in an extensive variety of modules (from LLMs to prompts to retrievers to query engines to other pipelines), connect them all together into a sequential chain or DAG, and run the whole thing end to end.
You can compose both sequential chains and directed acyclic graphs (DAGs) of arbitrary complexity in a more expressive, declarative style: instead of wiring LlamaIndex modules together imperatively, Query Pipelines lets you build the same workflow more efficiently and with fewer lines of code.
So what are the advantages of QueryPipeline?
- Express common workflows with fewer lines of code/boilerplate
- Greater readability
- Greater parity / better integration points with common low-code / no-code solutions (e.g. LangFlow)
- [In the future] A declarative interface allows easy serializability of pipeline components, providing portability of pipelines/easier deployment to different systems.
An Introduction to LlamaIndex Query Pipelines, in the LlamaIndex docs [1]
You can find detailed information in the LlamaIndex documentation [2] or in the article by Jerry Liu, LlamaIndex founder, Introducing Query Pipelines [3].
This post is a quickstart guide showing how to work with the Query Pipelines API through a collection of use cases, so that you can quickly get down to work and run your first experiments.
How to use it?
There are two main ways to use it:
- As a sequential chain (easiest/most concise):
Some basic pipelines are linear in nature: the output of one module goes directly into the input of the next module.
prompt -> LLM
prompt -> LLM -> prompt -> LLM
retriever -> response synthesizer
- As a full DAG (more expressive):
Use this when you need to set up a complete DAG, for instance a Retrieval Augmented Generation (RAG) pipeline. Here, the lower-level API lets you add modules along with their keys and define links between the outputs of previous modules and the inputs of the next ones.
We'll cover some of these use cases in the following sections. Some of them have been extracted from the LlamaIndex documentation [1], while others have been designed for this post:
- Simple Chain: Prompt Query + LLM
- Query Rewriting Workflow with Retrieval
- Simple RAG pipeline
- RAG pipeline with Query Rewriting
- RAG pipeline with Sentence Window Retrieval
- RAG pipeline with Auto Merging Retrieval
Here, we will show only the code to define and build the pipelines. You can inspect the code for data ingestion, index creation, etc. in the notebook in my GitHub repository llamaindex-RAG-techniques.
Sequential Chain
Simple Chain: Prompt Query + LLM
The simplest approach is to define a sequential chain: a query pipeline where the components run one after another, and the library converts each output into the right format for the next module's input.
from llama_index.llms import OpenAI
from llama_index.prompts import PromptTemplate
from llama_index.query_pipeline import QueryPipeline

# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
# Define the query pipeline
p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)
# Run the pipeline
output = p.run(movie_name="The Departed")
print(str(output))
Chain together multiple prompts: Query Rewriting Workflow with Retrieval
In this scenario, we send the input through two prompts before initiating retrieval. One prompt applies query rewriting, and the other produces a hallucinated answer to enrich the retrieval (a technique called HyDE). We can build it as a sequential chain because each prompt only takes in one input: the QueryPipeline will automatically chain each LLM output into the next prompt and then into the LLM.
# First prompt: generate question regarding topic
prompt_str1 = "Please generate a concise question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl1 = PromptTemplate(prompt_str1)
# Second prompt: use HyDE to hallucinate answer.
prompt_str2 = (
    "Please write a passage to answer the question\n"
    "Try to include as many key details as possible.\n"
    "\n"
    "\n"
    "{query_str}\n"
    "\n"
    "\n"
    'Passage:"""\n'
)
prompt_tmpl2 = PromptTemplate(prompt_str2)
# Define the LLM and the retriever
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=5)
# Build the pipeline
p = QueryPipeline(
    chain=[prompt_tmpl1, llm, prompt_tmpl2, llm, retriever], verbose=True
)
# Run the pipeline
nodes = p.run(topic="college")
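The output of this chain is the list of retrieved nodes. As a quick, purely illustrative way to peek at them (assuming the usual NodeWithScore objects returned by LlamaIndex retrievers):
# Inspect the retrieved nodes: each item is a NodeWithScore
for node_with_score in nodes:
    print(node_with_score.score, node_with_score.node.get_content()[:100])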
Define a DAG to build a full RAG pipeline
These are more complex scenarios where we want to chain together a full RAG pipeline consisting of query rewriting, retrieval, reranking, post-processing, and response synthesis.
Some of the modules depend on multiple inputs, so we need to construct a DAG and explicitly define the modules and the links between them.
Simple RAG pipeline
Here, we build the classical approach: query, retrieval, reranking, and response synthesis. In this case, we summarize the retrieved nodes or docs by applying a Tree Summarizer.
First, we declare the modules:
from llama_index import ServiceContext
from llama_index.llms import OpenAI
from llama_index.postprocessor import CohereRerank
from llama_index.query_pipeline import InputComponent
from llama_index.response_synthesizers import TreeSummarize

# define modules
retriever = index.as_retriever(similarity_top_k=5)
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(
        llm=OpenAI(model="gpt-3.5-turbo")
    )
)
reranker = CohereRerank()
Now, we can build our pipeline by adding the modules and drawing the links between them with the add_link function. add_link takes in the source and destination module names, and optionally a source_key and a dest_key. Specify source_key or dest_key when a module has multiple outputs or inputs, respectively.
Here we explicitly specify dest_key for the reranker and summarizer modules because they take in two inputs (query_str and nodes).
# define query pipeline
p = QueryPipeline(verbose=True)
# Adding the modules to the pipeline
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "reranker": reranker,
        "summarizer": summarizer,
    }
)
# Define the links between modules
p.add_link("input", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
You can view the set of input/output keys for each module through module.as_query_component().input_keys and module.as_query_component().output_keys.
# look at summarizer input keys
print(summarizer.as_query_component().input_keys)
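Similarly, you can inspect the output keys, for instance for the retriever:
# look at retriever output keys
print(retriever.as_query_component().output_keys)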
Finally, we run the pipeline:
# Run the pipeline
response = p.run(topic="Positional encoding")
print(str(response))
Or you can even run it asynchronously:
response = await p.arun(topic="Positional encoding")
print(str(response))
RAG pipeline with Query Rewriting
As you can imagine, you can apply query rewriting by adding an initial prompt -> LLM step to the DAG from the previous section and then joining the inputs and outputs properly.
# define modules
prompt_str = "Please generate a question about the Transformer model regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(llm=llm)
)
# define query pipeline
p = QueryPipeline(verbose=True)
# Adding the modules to the pipeline
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)
# Define the links between modules
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")
# Run the pipeline
response = p.run(topic="Positional encoding")
RAG pipeline with Sentence Window Retrieval
In this section, we'll apply sentence window retrieval to our RAG pipeline. Sentence window retrieval relies on a post-processing step that we have to place at the right position in the pipeline.
This technique parses documents into one sentence per node. Each node also stores a "window", a richer context made up of the sentences on either side of the node's sentence. During retrieval, before the retrieved content is passed to the LLM, each single sentence is replaced by its window of surrounding sentences, providing that richer context. This is most useful for large documents/indexes, as it helps retrieve more fine-grained details.
For this example, the index and service context are different from the ones used in the previous cases. In my notebook, you can find how to apply these changes.
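As a rough, illustrative sketch (the parameter values and service context settings here are assumptions; the exact setup is in the notebook), the index for this technique is built with a SentenceWindowNodeParser along these lines:
from llama_index import ServiceContext, VectorStoreIndex
from llama_index.llms import OpenAI
from llama_index.node_parser import SentenceWindowNodeParser

# Parse documents into one-sentence nodes; each node keeps a "window" of the
# sentences around it in its metadata (values chosen for illustration only)
node_parser = SentenceWindowNodeParser.from_defaults(
    window_size=3,
    window_metadata_key="window",
    original_text_metadata_key="original_text",
)
sentence_context = ServiceContext.from_defaults(
    llm=OpenAI(model="gpt-3.5-turbo", temperature=0.2),
    node_parser=node_parser,
)
index = VectorStoreIndex.from_documents(docs, service_context=sentence_context)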
Let's define the components:
from llama_index.indices.postprocessor import MetadataReplacementPostProcessor
from llama_index.response_synthesizers import TreeSummarize
from llama_index.query_pipeline import InputComponent
# Set the retriever
retriever = index.as_retriever(similarity_top_k=3)
# Define the summarizer
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(
        llm=OpenAI(
            model="gpt-3.5-turbo",
            temperature=0.2,
            max_tokens=512,
        )
    )
)
# Define the post-processor that replaces each sentence with its window
postprocessor = MetadataReplacementPostProcessor(
    target_metadata_key="window"
)
The following steps add the modules and the links between them:
# Define the query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "postprocessor": postprocessor,
        "summarizer": summarizer,
    }
)
# Set the links between components
p.add_link("input", "retriever")
p.add_link("input", "postprocessor", dest_key="query_str")
p.add_link("retriever", "postprocessor", dest_key="nodes")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("postprocessor", "summarizer", dest_key="nodes")
# Run the pipeline
output = p.run(input="what is the purpose of positional encoding in the Transformer architecture?")
print(str(output))
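If you want to verify what the post-processor did, you can inspect the metadata of the source nodes attached to the response; the "window" key is the one configured in the MetadataReplacementPostProcessor above (this assumes the summarizer returns a standard Response object with source_nodes):
# Show the full window that was passed to the LLM for the top retrieved sentence
window = output.source_nodes[0].node.metadata["window"]
print(window)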
You can see that this technique is applied as a post-processor, so the example is very similar to adding a reranker.
RAG pipeline with Auto Merging Retrieval
The idea here is very similar to the Sentence Window Retriever: search for more granular pieces of information and then extend the context window before feeding that context to the LLM for reasoning.
Documents are split into smaller child chunks that refer to larger parent chunks. The smaller chunks are fetched first during retrieval; then, if more than n chunks among the top-k retrieved chunks are linked to the same parent node (the larger chunk), the context fed to the LLM is replaced by this parent node. It works like auto-merging a few retrieved chunks into a larger parent chunk, hence the method name.
Here, we showcase the AutoMergingRetriever, which looks at a set of leaf nodes and recursively "merges" subsets of leaf nodes that reference a parent node beyond a given threshold. This allows us to consolidate potentially disparate, smaller contexts into a larger context that might help synthesis. You can make use of the HierarchicalNodeParser, which takes in a candidate set of documents and outputs an entire hierarchy of nodes, from "coarse to fine".
We need to define our hierarchy of nodes:
from llama_index.node_parser import HierarchicalNodeParser, get_leaf_nodes

# create the hierarchical node parser w/ default settings
node_parser = HierarchicalNodeParser.from_defaults(
    chunk_sizes=[2048, 512, 128]
)
# Get the nodes
nodes = node_parser.get_nodes_from_documents(docs)
print(len(nodes))
# Get the leaf nodes
leaf_nodes = get_leaf_nodes(nodes)
Then, you create an appropriate Service Context and an Index using those nodes (you can check it in the notebook mentioned previously).
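As a rough sketch of what that looks like (the LLM settings are illustrative assumptions; the exact setup is in the notebook), all nodes go into the docstore so parents can be looked up at merge time, while only the leaf nodes are embedded in the index:
from llama_index import ServiceContext, StorageContext, VectorStoreIndex
from llama_index.llms import OpenAI
from llama_index.storage.docstore import SimpleDocumentStore

# The docstore holds the whole hierarchy (parents + leaves) for the merge step
docstore = SimpleDocumentStore()
docstore.add_documents(nodes)
storage_context = StorageContext.from_defaults(docstore=docstore)

automerging_context = ServiceContext.from_defaults(
    llm=OpenAI(model="gpt-3.5-turbo", temperature=0.2)
)
# Only the leaf nodes are embedded and indexed
automerging_index = VectorStoreIndex(
    leaf_nodes,
    storage_context=storage_context,
    service_context=automerging_context,
)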
And now, we are ready to define our pipeline and its modules. This time, we'll use Refine as the response synthesizer:
from llama_index.retrievers import AutoMergingRetriever
from llama_index.response_synthesizers import Refine

# Base retriever over the index built on the leaf nodes
automerging_retriever = automerging_index.as_retriever(
    similarity_top_k=6
)
# Wrap it so retrieved leaf nodes can be merged into their parent nodes
retriever = AutoMergingRetriever(
    automerging_retriever,
    automerging_index.storage_context,
    verbose=True
)
# Reranker
reranker = CohereRerank()
# Define the summarizer
summarizer = Refine(
    service_context=ServiceContext.from_defaults(
        llm=OpenAI(
            model="gpt-3.5-turbo",
            temperature=0.2,
            max_tokens=512,
        )
    )
)
# Define the query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "reranker": reranker,
        "summarizer": summarizer,
    }
)
# Set the links between components
p.add_link("input", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
And you can run it.
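As in the earlier examples, this is a single p.run call; the query string below is just an illustration over the same index:
# Run the pipeline
response = p.run(input="what is the purpose of positional encoding in the Transformer architecture?")
print(str(response))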
That's all, folks! I hope you enjoyed this post and find it useful for your future experiments. Follow me on LinkedIn or GitHub to support my work!
References
[1] An Introduction to LlamaIndex Query Pipelines, LlamaIndex documentation
[2] Query Pipeline, LlamaIndex documentation
[3] Introducing Query Pipelines, by Jerry Liu
[4] Query Pipelines Usage Pattern, LlamaIndex documentation
Published via Towards AI