RAG Module
RAG (Retrieval-Augmented Generation) enhances the ability of Large Language Models (LLMs) to generate responses by referencing external authoritative knowledge bases. This method allows the LLM to access domain-specific knowledge without retraining, improving the relevance, precision, and practicality of the output.
This article focuses on the RAG functions currently provided by MetaGPT:
- Data input: supports various file formats (including pdf/docx/md/csv/txt/ppt) as well as Python objects.
- Retrieval: supports Faiss/BM25/ChromaDB/ES, as well as hybrid retrieval.
- Post-retrieval: supports LLM Rerank/ColbertRerank/CohereRerank/BGERerank/ObjectRerank, re-ranking the retrieved content to obtain more accurate results.
- Data update: supports adding text and Python objects.
- Data storage and recovery: persists the index so that vectorization is not required each time.
For more examples, please see rag_pipeline and rag_search.
Prepare
- Installation
# from pypi
pip install metagpt[rag]
# from source code
pip install -e .[rag]
Note:
- Some modules are quite large and use lazy loading, so they must be installed manually. For example, to use ColbertRerank, you need to install llama-index-postprocessor-colbert-rerank, as shown below.
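For example, installing the ColbertRerank dependency (the package name is taken from the note above):

pip install llama-index-postprocessor-colbert-rerank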
- Set embedding
# openai
embedding:
  api_type: "openai"
  base_url: "YOUR_BASE_URL"
  api_key: "YOUR_API_KEY"
  dimensions: "YOUR_MODEL_DIMENSIONS" # output dimension of embedding model

# azure
embedding:
  api_type: "azure"
  base_url: "YOUR_BASE_URL"
  api_key: "YOUR_API_KEY"
  api_version: "YOUR_API_VERSION"
  dimensions: "YOUR_MODEL_DIMENSIONS" # output dimension of embedding model

# gemini
embedding:
  api_type: "gemini"
  api_key: "YOUR_API_KEY"
  dimensions: "YOUR_MODEL_DIMENSIONS" # output dimension of embedding model

# ollama
embedding:
  api_type: "ollama"
  base_url: "YOUR_BASE_URL"
  model: "YOUR_MODEL"
  dimensions: "YOUR_MODEL_DIMENSIONS" # output dimension of embedding model
Note:
- For backward compatibility, if the embedding is not set and the llm's api_type is either openai or azure, the llm's config will be used.
- If the llm is ollama, you might see the error "context size was not non-negative". In this case, set max_token in the llm config, for example, to 2048.
- If you need to use other types of embeddings, such as huggingface or bedrock, the from_docs and from_objs functions provide an embed_model field, which accepts any embedding supported by Llama Index as well as custom embeddings built on Llama Index, as shown in the sketch below.
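A minimal sketch of passing a custom embed_model (this assumes the llama-index-embeddings-huggingface package is installed; the model name is illustrative, not a recommendation):

# Minimal sketch: assumes llama-index-embeddings-huggingface is installed;
# the model name below is illustrative.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from metagpt.rag.engines import SimpleEngine
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
engine = SimpleEngine.from_docs(input_files=[DOC_PATH], embed_model=embed_model)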
- Set omniparse
omniparse:
  api_key: 'YOUR_API_KEY'
  base_url: 'YOUR_BASE_URL'
Note:
- omniparse is an optional configuration; its purpose is to optimize the parsing of pdf files.
- If omniparse is configured, only pdf files use omniparse; other files still use the built-in parsers of Llama Index.
1. Data input
Example 1.1: files or directory
import asyncio

from metagpt.rag.engines import SimpleEngine
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

async def main():
    engine = SimpleEngine.from_docs(input_files=[DOC_PATH])
    answer = await engine.aquery("What does Bob like?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, we use the simplest configuration: input a file, ask a question, and print the answer.
Example 1.2: Python objects
import asyncio

from pydantic import BaseModel

from metagpt.rag.engines import SimpleEngine

class Player(BaseModel):
    name: str
    goal: str

    def rag_key(self):
        return f"{self.name}'s goal is {self.goal}."

async def main():
    objs = [Player(name="Jeff", goal="Top One"), Player(name="Mike", goal="Top Three")]
    engine = SimpleEngine.from_objs(objs=objs)
    answer = await engine.aquery("What is Jeff's goal?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, with the simplest configuration, we define a Player object. The key point is that a custom object must implement the required interface, in particular rag_key (for details, refer to RAGObject).
2. Retrieval
Example 2.1: faiss
import asyncio

from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import FAISSRetrieverConfig
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

async def main():
    engine = SimpleEngine.from_docs(input_files=[DOC_PATH], retriever_configs=[FAISSRetrieverConfig()])
    answer = await engine.aquery("What does Bob like?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, we use faiss for retrieval; more parameters can be found in FAISSRetrieverConfig.
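As a hedged sketch, retrieval behavior can be tuned through the config's fields; similarity_top_k and dimensions are assumed field names here, so verify them in metagpt.rag.schema:

# Hedged sketch: similarity_top_k and dimensions are assumed field names;
# verify the actual fields in metagpt.rag.schema before relying on them.
from metagpt.rag.schema import FAISSRetrieverConfig

retriever_configs = [FAISSRetrieverConfig(similarity_top_k=4, dimensions=1536)]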
Example 2.2: faiss and bm25 hybrid retrieval
import asyncio

from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import FAISSRetrieverConfig, BM25RetrieverConfig
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

async def main():
    engine = SimpleEngine.from_docs(input_files=[DOC_PATH], retriever_configs=[FAISSRetrieverConfig(), BM25RetrieverConfig()])
    answer = await engine.aquery("What does Bob like?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, we use faiss and bm25 for hybrid retrieval, merging and deduplicating the results from both retrievers.
3. Post-retrieval
Example 3.1: LLM re-ranking
import asyncio

from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import FAISSRetrieverConfig, LLMRankerConfig
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

async def main():
    engine = SimpleEngine.from_docs(input_files=[DOC_PATH], retriever_configs=[FAISSRetrieverConfig()], ranker_configs=[LLMRankerConfig()])
    answer = await engine.aquery("What does Bob like?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, we first use faiss for retrieval, then apply LLMRanker to re-rank the retrieved results, and obtain the final search outcome.
Note:
- Because an LLM reranker is used, you may encounter IndexError: list index out of range if the LLM's answer is malformed.
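To control how many nodes the reranker keeps, the ranker config exposes a top_n-style parameter; a hedged sketch follows (top_n is an assumed field name, check metagpt.rag.schema):

# Hedged sketch: top_n is an assumed field name; check LLMRankerConfig in
# metagpt.rag.schema for the actual parameter.
from metagpt.rag.schema import LLMRankerConfig

ranker_configs = [LLMRankerConfig(top_n=3)]  # keep only the 3 highest-ranked nodes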
4. Data update
Example 4.1: Adding text and Python objects
import asyncio

from pydantic import BaseModel

from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import FAISSRetrieverConfig
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

class Player(BaseModel):
    name: str
    goal: str

    def rag_key(self):
        return f"{self.name}'s goal is {self.goal}"

async def main():
    engine = SimpleEngine.from_objs(retriever_configs=[FAISSRetrieverConfig()])

    engine.add_docs([DOC_PATH])
    answer = await engine.aquery("What does Bob like?")
    print(answer)

    engine.add_objs([Player(name="Jeff", goal="Top One")])
    answer = await engine.aquery("What is Jeff's goal?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, after creating the engine, we can add documents or objects. Most importantly, a custom retriever that supports additions must implement the ModifiableRAGRetriever interface, as the sketch below illustrates.
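A skeletal sketch of what such a retriever might look like; the real abstract methods are defined by ModifiableRAGRetriever in metagpt.rag, and add_nodes and _retrieve here are assumptions modeled on Llama Index retriever conventions:

# Skeletal sketch only: the real abstract interface is defined by
# ModifiableRAGRetriever in metagpt.rag; add_nodes and _retrieve are
# assumptions modeled on Llama Index retriever conventions.
from llama_index.core.schema import BaseNode, NodeWithScore, QueryBundle

class KeywordRetriever:
    """Toy retriever that supports in-place additions (illustration only)."""

    def __init__(self) -> None:
        self.nodes: list[BaseNode] = []

    def add_nodes(self, nodes: list[BaseNode]) -> None:
        # Assumed extension point used by data updates such as add_docs/add_objs.
        self.nodes.extend(nodes)

    def _retrieve(self, query_bundle: QueryBundle) -> list[NodeWithScore]:
        # Naive keyword-overlap scoring, purely for illustration.
        terms = set(query_bundle.query_str.lower().split())
        scored = [
            NodeWithScore(node=node, score=float(len(terms & set(node.get_content().lower().split()))))
            for node in self.nodes
        ]
        return sorted((s for s in scored if s.score), key=lambda s: s.score, reverse=True)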
5. Data storage and recovery
Example 5.1
import asyncio

from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import FAISSRetrieverConfig, FAISSIndexConfig
from metagpt.const import EXAMPLE_DATA_PATH

DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"

async def main():
    persist_dir = "./tmp_storage"
    retriever_configs = [FAISSRetrieverConfig()]

    # 1. save index
    SimpleEngine.from_docs(input_files=[DOC_PATH], retriever_configs=retriever_configs).persist(persist_dir)

    # 2. load index
    engine = SimpleEngine.from_index(index_config=FAISSIndexConfig(persist_path=persist_dir), retriever_configs=retriever_configs)

    # 3. query
    answer = await engine.aquery("What does Bob like?")
    print(answer)

if __name__ == "__main__":
    asyncio.run(main())
In this example, we first save the vectorized data to persist_dir, then restore it from persist_dir and query.
Note:
- Post-retrieval can yield better results, but if an LLM reranker is used, the uncertainty of LLM capabilities means the output is not always guaranteed to be parseable for reranking. Prefer gpt-4-turbo; otherwise you might encounter IndexError: list index out of range or ValueError: invalid literal for int() with base 10.