A production-grade Retrieval-Augmented Generation system with a multi-step LLM pipeline, hybrid search, and real-time trace visibility.
- Overview
- Architecture
- Key Features
- Technology Stack
- Getting Started
- Pipeline Deep Dive
- Pipeline Observability & SQLite Logging
- Document Ingestion
- Frontend
- Testing
- Project Structure
This is not a basic RAG system. It implements a multi-step agentic pipeline that intelligently routes queries, evaluates retrieval quality, and retries with improved queries when results are insufficient, all while streaming live pipeline traces to the frontend in real time.
Built for accuracy over speed: the system will loop and self-correct rather than hallucinate an answer.
The system processes every user query through a structured pipeline of LLM calls and application logic:
User Query
    │
    ▼
Load Conversation History
    │
    ▼
[LLM Call 1] Query Rewriter ──► Rewrite query using conversation context
    │
    ▼
[LLM Call 2] Orchestrator ──► Does this query need RAG?
    │                          │
    │ No                       │ Yes
    ▼                          ▼
[LLM] Direct Response     ChromaDB Vector Search
                               │
                               ▼
                          RRF Re-ranking (BM25 + Semantic)
                               │
                               ▼
                          [LLM Call 3] Relevance Evaluator
                          Are documents sufficient?
                               │                 │
                               │ Yes             │ No
                               ▼                 ▼
                          [LLM] Grounded     Retry Limit Check
                          Response             │          │
                               │               │ No       │ Yes
                               │               ▼          ▼
                               │       Safe Response   [LLM] Improve Query
                               │                          │
                               │                          └──► Retry Retrieval
                               ▼
                          Return Response to User
                               │
                               ▼
                          Save to Conversation Memory
Four distinct LLM interactions, each with a specific role:
| Step | Role | Description |
|---|---|---|
| LLM Call 1 | Query Rewriter | Rephrases the user's message into a standalone search query using conversation history |
| LLM Call 2 | Orchestrator / Router | Decides whether the query requires knowledge base retrieval or a direct response |
| LLM Call 3 | Relevance Evaluator | Assesses whether retrieved documents actually answer the query |
| LLM Call 4 | Grounded Response Generator | Synthesizes a final answer strictly from retrieved context with source citations |
When the Relevance Evaluator determines retrieved documents are insufficient, the system:
- Feeds evaluator feedback back to the Query Rewriter
- Generates an improved search query
- Re-runs retrieval with the refined query
- Repeats until documents are relevant or the retry limit is hit
This guards against hallucination: when the retry limit is reached, the system returns a safe "not enough information" response rather than fabricating an answer.
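The retry loop described above can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: the function name `answer_with_retries`, the injected callables, the `(sufficient, feedback)` verdict shape, and the wording of `SAFE_RESPONSE` are all assumptions made for the example.

```python
from typing import Callable, List, Tuple

# Hypothetical wording; the real safe response text is not shown in this README.
SAFE_RESPONSE = "I don't have enough information to answer that."


def answer_with_retries(
    query: str,
    retrieve: Callable[[str], List[str]],
    evaluate: Callable[[str, List[str]], Tuple[bool, str]],
    improve: Callable[[str, str], str],
    generate: Callable[[str, List[str]], str],
    max_retries: int = 3,
) -> Tuple[str, int]:
    """Return (response, retries_used). The callables stand in for LLM and search calls."""
    current = query
    retries = 0
    while True:
        docs = retrieve(current)
        sufficient, feedback = evaluate(current, docs)
        if sufficient:
            # Answer the user's original question from the documents that passed.
            return generate(query, docs), retries
        if retries >= max_retries:
            # Retry limit hit: refuse rather than guess.
            return SAFE_RESPONSE, retries
        current = improve(current, feedback)  # evaluator feedback drives the rewrite
        retries += 1
```

Passing the LLM and search steps in as callables keeps the control flow testable with simple stubs, which is one plausible way to unit-test a loop like this without calling a model.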
Implements Reciprocal Rank Fusion (RRF) from scratch to blend two retrieval signals:
- Semantic Search: ChromaDB vector similarity using Gemini embeddings
- Keyword Search: BM25 (rank-bm25) for precise term matching
The fused ranking consistently outperforms either signal in isolation.
A unified ingestion pipeline normalizes all file types into a standard { text, metadata, doc_id } format:
| Format | Loader |
|---|---|
| .txt, .md | Direct reading |
| .pdf | Page-by-page extraction via PyMuPDF; falls back to Gemini Vision for scanned PDFs |
| .csv, .xlsx | Pandas row-to-text conversion with header context |
| .html | BeautifulSoup4 structural extraction |
| .docx | python-docx heading and paragraph extraction |
| Images | Gemini Vision LLM for diagrams, charts, and embedded text |
The frontend's unique two-column layout includes a real-time Pipeline Trace panel powered by Server-Sent Events (SSE):
- Displays each backend step as it executes (routing → retrieving → evaluating → generating)
- Shows execution time in milliseconds per step
- Gives full visibility into what the system is doing behind the scenes
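As a rough sketch of how per-step trace events could be encoded for SSE, the snippet below times each step and yields one message per step. The helper names `format_sse` and `trace_stream`, the event names, and the payload fields are assumptions for illustration; the README does not show the backend's actual event format.

```python
import json
import time
from typing import Callable, Iterator, List, Tuple


def format_sse(event: str, data: dict) -> str:
    """Encode one Server-Sent Events message: an event name plus a JSON payload."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def trace_stream(steps: List[Tuple[str, Callable[[], None]]]) -> Iterator[str]:
    """Run each step in order and yield an SSE message with its duration in ms."""
    for name, run in steps:
        start = time.perf_counter()
        run()
        elapsed_ms = round((time.perf_counter() - start) * 1000)
        yield format_sse("step", {"step": name, "duration_ms": elapsed_ms})
    # Hypothetical terminal event so the client knows the run has finished.
    yield format_sse("done", {})
```

In a FastAPI app, a generator like this could be returned through `StreamingResponse` with `media_type="text/event-stream"`, and the browser side could consume it with `EventSource`.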
Session-based memory with JSON file persistence and a token budget strategy:
- Retains the most relevant recent interactions within a configurable token window
- Gracefully discards older context without crashing
- All history is passed to the Query Rewriter and final response generator
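A token budget of this kind can be approximated by walking the history from newest to oldest and stopping once the budget is spent. The sketch below is recency-only and uses whitespace splitting as a crude stand-in for a real tokenizer; `trim_history`, the message shape, and the default counter are assumptions, not the project's implementation.

```python
from typing import Callable, Dict, List


def trim_history(
    messages: List[Dict[str, str]],
    budget: int,
    count_tokens: Callable[[str], int] = lambda text: len(text.split()),
) -> List[Dict[str, str]]:
    """Keep the newest messages whose combined token count fits within the budget."""
    kept: List[Dict[str, str]] = []
    used = 0
    for message in reversed(messages):  # walk from newest to oldest
        cost = count_tokens(message["content"])
        if used + cost > budget:
            break  # older context is dropped instead of raising an error
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept
```

With `MEMORY_TOKEN_BUDGET=4000`, a call such as `trim_history(history, 4000)` would return the most recent turns that fit, in their original order.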
Every step of every pipeline run is written to a normalized SQLite database in real time. This creates a full audit trail across three linked tables (sessions, pipeline runs, and individual step logs), enabling latency analysis, debugging, and replay of any past query.
- Isolated Prompts: All LLM system prompts live in /prompts, decoupled from application code, so behavior can change without touching logic
- Strategy Pattern: Document loaders use a dispatch dictionary, eliminating if/elif chains and making it trivial to add new file types
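To make the dispatch-dictionary idea concrete, here is a small sketch that maps file extensions to loader functions and returns the normalized `{ text, metadata, doc_id }` record. The registry name `LOADERS`, the loader functions, the metadata keys, and the use of the file stem as `doc_id` are all hypothetical; the real loaders live in `loaders/` and are not reproduced here.

```python
from pathlib import Path
from typing import Callable, Dict


def load_text(path: Path) -> str:
    """Plain-text formats are read directly."""
    return path.read_text(encoding="utf-8")


def load_csv(path: Path) -> str:
    # Placeholder only: the README describes the real CSV loader as Pandas-based.
    return path.read_text(encoding="utf-8")


# Hypothetical registry: file extension -> loader function.
LOADERS: Dict[str, Callable[[Path], str]] = {
    ".txt": load_text,
    ".md": load_text,
    ".csv": load_csv,
}


def load_document(path: Path) -> dict:
    """Dispatch on the file extension and return the normalized record."""
    suffix = path.suffix.lower()
    loader = LOADERS.get(suffix)
    if loader is None:
        raise ValueError(f"Unsupported file type: {path.suffix}")
    return {
        "text": loader(path),
        "metadata": {"source": path.name, "extension": suffix},
        "doc_id": path.stem,
    }
```

Supporting a new format then means writing one function and adding one dictionary entry, with no change to `load_document` itself.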
| Layer | Technology |
|---|---|
| Language | Python 3.9+ |
| Framework | FastAPI (async, high-performance) |
| Vector Database | ChromaDB |
| Relational Database | SQLite (pipeline step logging & observability) |
| Keyword Search | BM25 via rank-bm25 |
| Role | Provider |
|---|---|
| Primary inference | Groq (LLaMA 3.3) |
| Embeddings & Vision | Google Gemini (google-genai) |
| Fallback | OpenAI, Anthropic |
| Layer | Technology |
|---|---|
| Framework | React 18+ |
| Styling | Tailwind CSS 3.0+ |
| Build Tool | Vite with TypeScript |
| Format | Library |
|---|---|
| PDF | PyMuPDF (pymupdf) |
| Excel / CSV | Pandas + OpenPyXL |
| HTML | BeautifulSoup4 |
| Word | python-docx |
| Images | Pillow + Gemini Vision |
- Python 3.9+
- Node.js 18+
- API keys for Groq and Google Gemini (minimum); OpenAI/Anthropic optional
# Clone the repository
cd advanced-rag-chatbot
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Configure environment
cp .env.example .env
# Edit .env with your API keys
# Start the API server
uvicorn main:app --reload
cd frontend
# Install dependencies
npm install
# Start development server
npm run dev
The app will be available at http://localhost:5173, with the API running at http://localhost:8000.
# Required
GROQ_API_KEY=your_groq_api_key
GOOGLE_API_KEY=your_google_gemini_api_key
# Optional fallbacks
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
# Configuration
MAX_RETRIES=3
CHROMA_PERSIST_DIR=./chroma_db
MEMORY_TOKEN_BUDGET=4000
The Query Rewriter takes the raw user message and conversation history and outputs a self-contained search query stripped of pronouns and references to previous turns. This ensures ChromaDB receives a query that makes sense in isolation.
The Orchestrator is a lightweight routing LLM that classifies the rewritten query:
- Needs RAG: triggers document retrieval
- Direct answer: skips retrieval (e.g., greetings, simple factual questions the model can answer without context)
Semantic score (ChromaDB) ──┐
                            ├──► RRF Fusion ──► Unified ranked list
Keyword score (BM25)      ──┘
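RRF formula: score(d) = Σ 1 / (k + rank(d)), where k = 60, rank(d) is the 1-based position of document d in a ranked list, and the sum runs over every list that contains d.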
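The formula translates into only a few lines of Python. The sketch below is an illustrative version, not the contents of `retrieval/rrf.py`; the function name `rrf_fuse`, the document IDs, and the alphabetical tie-break are assumptions.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def rrf_fuse(rankings: Sequence[Sequence[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of document IDs with Reciprocal Rank Fusion."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):  # ranks are 1-based
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by document ID for determinism.
    return sorted(scores, key=lambda d: (-scores[d], d))


semantic = ["doc_a", "doc_b", "doc_c"]  # e.g. order returned by vector search
keyword = ["doc_b", "doc_d", "doc_a"]   # e.g. order returned by BM25
fused = rrf_fuse([semantic, keyword])
print(fused)  # ['doc_b', 'doc_a', 'doc_d', 'doc_c']
```

Here `doc_b` wins because it ranks near the top of both lists (1/62 + 1/61), edging out `doc_a` (1/61 + 1/63), while documents found by only one retriever fall to the bottom.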
The Relevance Evaluator checks retrieved documents against both the original and rewritten query, returning a structured verdict:
- Sufficient: proceed to response generation
- Insufficient + feedback: trigger retry loop with improvement hints
Every pipeline execution is persisted to a local SQLite database (pipeline_logs.db) using a normalized three-table schema. This gives you a full, queryable audit trail of every query the system has ever processed, including latency per step, retry counts, routing decisions, and final outcomes.
sessions
├── session_id         TEXT PRIMARY KEY
├── created_at         TEXT
└── metadata           TEXT (JSON)
pipeline_runs
├── run_id             TEXT PRIMARY KEY
├── session_id         TEXT → sessions.session_id
├── original_query     TEXT
├── rewritten_query    TEXT
├── routed_to_rag      INTEGER (0 / 1)
├── retry_count        INTEGER
├── final_outcome      TEXT ('grounded_response' | 'direct_response' | 'safe_response')
├── total_duration_ms  INTEGER
└── created_at         TEXT
pipeline_steps
├── step_id            INTEGER PRIMARY KEY AUTOINCREMENT
├── run_id             TEXT → pipeline_runs.run_id
├── step_name          TEXT ('query_rewriter' | 'orchestrator' | 'retrieval' |
│                            'rrf_rerank' | 'relevance_evaluator' | 'response_generator')
├── step_order         INTEGER
├── status             TEXT ('success' | 'skipped' | 'retry' | 'failed')
├── duration_ms        INTEGER
├── input_summary      TEXT (JSON, truncated snapshot of step input)
├── output_summary     TEXT (JSON, truncated snapshot of step output)
└── created_at         TEXT
Each pipeline step writes a row to pipeline_steps the moment it completes, with the step's input and output captured as normalized JSON snapshots. This means you can reconstruct exactly what happened at any point in any pipeline run:
| Step | Logged Input | Logged Output |
|---|---|---|
| Query Rewriter | Original query + conversation history | Rewritten query |
| Orchestrator | Rewritten query | Routing decision (rag / direct) |
| Retrieval | Rewritten query | Top-N document IDs + scores |
| RRF Re-rank | Semantic + BM25 ranked lists | Fused ranked list |
| Relevance Evaluator | Query + retrieved doc summaries | Verdict + feedback string |
| Response Generator | Full context bundle | Final response text |
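A step logger along these lines can be written with the standard-library `sqlite3` module. The sketch below covers only the `pipeline_steps` table and uses an in-memory database; the helper name `log_step`, the `max_chars` cutoff, and the sample values are assumptions, and the foreign key to `pipeline_runs` is omitted for brevity.

```python
import json
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_steps (
    step_id        INTEGER PRIMARY KEY AUTOINCREMENT,
    run_id         TEXT,
    step_name      TEXT,
    step_order     INTEGER,
    status         TEXT,
    duration_ms    INTEGER,
    input_summary  TEXT,
    output_summary TEXT,
    created_at     TEXT
);
"""


def log_step(conn, run_id, step_name, step_order, status, duration_ms,
             step_input, step_output, max_chars=500):
    """Insert one pipeline_steps row; summaries are JSON strings cut to max_chars."""
    conn.execute(
        "INSERT INTO pipeline_steps (run_id, step_name, step_order, status, "
        "duration_ms, input_summary, output_summary, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        (run_id, step_name, step_order, status, duration_ms,
         json.dumps(step_input)[:max_chars],
         json.dumps(step_output)[:max_chars],
         datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()  # commit per step so the row is visible as soon as the step ends


conn = sqlite3.connect(":memory:")  # in-memory database for the example only
conn.executescript(SCHEMA)
log_step(conn, "run-1", "query_rewriter", 1, "success", 212,
         {"query": "what about pricing?"}, {"rewritten": "product pricing tiers"})
```

Note that slicing a JSON string can leave it unparseable when the payload exceeds `max_chars`; a real implementation would more likely truncate the values before serializing.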
-- Average latency per pipeline step across all runs
SELECT step_name, AVG(duration_ms) AS avg_ms
FROM pipeline_steps
GROUP BY step_name
ORDER BY avg_ms DESC;
-- All runs that triggered a retry
SELECT run_id, original_query, retry_count, final_outcome
FROM pipeline_runs
WHERE retry_count > 0;
-- Full step-by-step trace for a specific run
SELECT step_order, step_name, status, duration_ms, output_summary
FROM pipeline_steps
WHERE run_id = 'your-run-id'
ORDER BY step_order;
-- Sessions with the highest average total pipeline duration
SELECT s.session_id, AVG(p.total_duration_ms) AS avg_duration
FROM sessions s
JOIN pipeline_runs p ON s.session_id = p.session_id
GROUP BY s.session_id
ORDER BY avg_duration DESC;
SQLITE_DB_PATH=./pipeline_logs.db
Upload documents to populate the knowledge base via the /ingest endpoint:
curl -X POST http://localhost:8000/ingest \
  -F "file=@document.pdf"
Supported formats: .pdf, .txt, .md, .csv, .xlsx, .html, .docx, .png, .jpg, .jpeg, .webp
All documents are chunked, embedded via Gemini, and stored in ChromaDB. BM25 indices are rebuilt automatically after ingestion.
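The README does not specify the chunking strategy, so the following is only one plausible approach: fixed-size character windows with overlap, so that sentences cut at a boundary still appear whole in a neighboring chunk. The function name `chunk_text` and the default sizes are assumptions.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> List[str]:
    """Split text into character windows that overlap by `overlap` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks
```

Each chunk would then be embedded and stored alongside its source metadata, and the same chunk texts could feed the BM25 index so both retrievers rank identical units.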
The React frontend features a two-column layout:
┌─────────────────────┬──────────────────────┐
│                     │  Pipeline Trace      │
│  Chat Interface     │                      │
│                     │  ✓ Query rewritten   │
│  User: ...          │  ✓ Routed to RAG     │
│  Assistant: ...     │  ✓ Retrieved 5 docs  │
│                     │  ✓ Evaluated: pass   │
│  [Input box]        │  ✓ Response generated│
│                     │  Total: 1,243ms      │
└─────────────────────┴──────────────────────┘
Pipeline trace steps update in real time via SSE as the backend processes each stage.
# Run all tests
pytest
# Run with async support
pytest --asyncio-mode=auto
# Run specific test file
pytest tests/test_retrieval.py -v
# Run with coverage
pytest --cov=. --cov-report=html
Tests use pytest, pytest-asyncio, and httpx for async API testing.
advanced-rag-chatbot/
├── main.py                     # FastAPI app entry point
├── prompts/                    # All LLM system prompts (externalized)
│   ├── query_rewriter.txt
│   ├── orchestrator.txt
│   ├── relevance_evaluator.txt
│   └── response_generator.txt
├── loaders/                    # Document loaders (strategy pattern)
│   ├── pdf_loader.py
│   ├── csv_loader.py
│   ├── html_loader.py
│   └── image_loader.py
├── retrieval/                  # Hybrid search & RRF
│   ├── chroma_search.py
│   ├── bm25_search.py
│   └── rrf.py
├── memory/                     # Conversation memory management
│   └── session_store.py
├── observability/              # SQLite pipeline logging
│   ├── db.py                   # Schema init & connection management
│   ├── logger.py               # Step logging helpers
│   └── pipeline_logs.db        # Auto-created SQLite database
├── tests/                      # pytest test suite
├── frontend/                   # React + Vite + Tailwind
│   ├── src/
│   │   ├── components/
│   │   │   ├── ChatInterface.tsx
│   │   │   └── PipelineTrace.tsx
│   │   └── App.tsx
│   └── package.json
├── requirements.txt
├── .env.example
└── README.md
Built with care to prioritize accuracy over speed and transparency over black-box magic.