Standard RAG is a fixed pipeline: embed the query, search the vector database, stuff the top-K results into a prompt, and call the LLM. This works for simple factual questions but fails in practice because:
  • Not all queries need retrieval. “What is 2 + 2?” should skip the vector database entirely.
  • Different queries need different retrieval strategies. A factual lookup needs high-precision single-pass search. An exploratory question needs broad multistage retrieval across multiple document types.
  • Retrieval confidence varies. If the top result scores 0.62, the LLM probably has enough context. If the best score is 0.04, the system should try a different strategy.
  • User feedback should improve future retrieval. When a user marks a response as unhelpful, the system should learn which documents were not relevant.
An Adaptive RAG system solves these problems by making the retrieval pipeline dynamic. Instead of one fixed strategy, the system classifies each query, selects the appropriate retrieval approach, evaluates result quality, and adapts based on feedback. This tutorial builds a complete adaptive RAG pipeline on Actian VectorAI DB. By the end, you will have:
  • A knowledge base collection with payload indexes for routing, feedback, and analytics.
  • A keyword-signal query classifier that maps each query to one of four query types.
  • Three retrieval strategies (precise, broad multistage, and troubleshooting prefetch with fusion) plus an automatic fallback.
  • A confidence evaluator that decides whether results are good enough or a fallback is needed.
  • A user feedback loop that updates per-document usefulness scores over time.
  • A feedback-aware retrieval function that boosts historically helpful documents.
  • An analytics function that shows which documents are most retrieved and most useful.
  • A prompt-assembly step that packages context and confidence instructions for any LLM.

Environment setup

pip install actian-vectorai-client sentence-transformers

Step 1: Import dependencies and configure

import asyncio
from datetime import datetime, timezone
from dataclasses import dataclass
from enum import Enum
from sentence_transformers import SentenceTransformer

from actian_vectorai import (
    AsyncVectorAIClient,
    Distance,
    Field,
    FieldType,
    FilterBuilder,
    PointStruct,
    PrefetchQuery,
    SearchParams,
    VectorParams,
    reciprocal_rank_fusion,
)
from actian_vectorai.models.collections import HnswConfigDiff
from actian_vectorai.models.enums import Fusion

SERVER     = "localhost:6574"
COLLECTION = "Adaptive-RAG"
EMBED_DIM  = 384

model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_text(text: str) -> list[float]:
    return model.encode(text).tolist()

def embed_texts(texts: list[str]) -> list[list[float]]:
    return model.encode(texts).tolist()

def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

print(f"Server:     {SERVER}")
print(f"Collection: {COLLECTION}")
print(f"Embedding:  all-MiniLM-L6-v2 ({EMBED_DIM}-dim)")

Expected output

Server:     localhost:6574
Collection: Adaptive-RAG
Embedding:  all-MiniLM-L6-v2 (384-dim)

Step 2: Create the knowledge base collection

async def create_knowledge_base():
    async with AsyncVectorAIClient(url=SERVER) as client:
        try:
            await client.collections.delete(COLLECTION)
        except Exception:
            pass
        await client.collections.get_or_create(
            name=COLLECTION,
            vectors_config=VectorParams(size=EMBED_DIM, distance=Distance.Cosine),
            hnsw_config=HnswConfigDiff(m=16, ef_construct=128),
        )
        # get_info as sync barrier before ingestion
        await client.collections.get_info(COLLECTION)

    print(f"Knowledge base '{COLLECTION}' ready.")

asyncio.run(create_knowledge_base())
Each index serves a specific role:
| Field | Purpose |
| --- | --- |
| doc_type | Route different query types to different document categories |
| source | Filter by origin (SDK docs vs. tutorials vs. changelogs) |
| section | Narrow retrieval to specific documentation sections |
| retrieval_count | Track which documents are retrieved frequently |
| usefulness_score | Boost documents based on user feedback |
| created_at | Enable time-based range queries |
The combination of keyword, integer, float, and datetime indexes means the adaptive router can filter, sort, and range-query on any payload field without a full collection scan.

Expected output

Knowledge base 'Adaptive-RAG' ready.

Step 3: Ingest documents into the knowledge base

documents = [
    {"text": "The create_collection method accepts vectors_config, hnsw_config, wal_config, and quantization_config parameters to initialize a new collection.", "doc_type": "api_reference", "source": "sdk_docs", "section": "collections"},
    {"text": "points.search performs approximate nearest-neighbour search. It accepts vector, limit, filter, params, score_threshold, and offset parameters.", "doc_type": "api_reference", "source": "sdk_docs", "section": "search"},
    {"text": "points.query is the universal endpoint supporting vector search, fusion, order_by, and multistage prefetch queries.", "doc_type": "api_reference", "source": "sdk_docs", "section": "search"},
    {"text": "FilterBuilder supports must, should, must_not, and min_should for combining conditions. Field provides eq, any_of, except_of, gt, gte, lt, lte, between, and text methods.", "doc_type": "api_reference", "source": "sdk_docs", "section": "filters"},
    {"text": "SearchParams allows setting hnsw_ef for accuracy tuning, exact for brute-force search, and quantization for compressed vector search.", "doc_type": "api_reference", "source": "sdk_docs", "section": "search"},
    {"text": "To build a RAG pipeline, first create a collection, embed your documents with a sentence transformer, upsert the vectors with payload metadata, and search at query time.", "doc_type": "tutorial", "source": "academy", "section": "getting_started"},
    {"text": "Hybrid search combines dense vector similarity with sparse keyword matching. Use reciprocal_rank_fusion or distribution_based_score_fusion to merge results.", "doc_type": "tutorial", "source": "academy", "section": "hybrid_search"},
    {"text": "Named vectors allow storing multiple embedding spaces per collection. Use vectors_config as a dictionary to define each space with its own dimensionality and distance metric.", "doc_type": "tutorial", "source": "academy", "section": "multimodal"},
    {"text": "Prefetch queries retrieve candidates from multiple vector spaces or filter conditions, then a fusion stage merges and reranks the results.", "doc_type": "tutorial", "source": "academy", "section": "prefetch"},
    {"text": "Score thresholds discard low-confidence results. Set score_threshold on search or query to filter out results below a minimum similarity.", "doc_type": "tutorial", "source": "academy", "section": "search_tuning"},
    {"text": "HNSW is a graph-based index where each node connects to M neighbours. Higher M improves recall at the cost of memory. ef_construct controls build-time search width.", "doc_type": "concept", "source": "guides", "section": "indexing"},
    {"text": "Cosine distance measures the angle between vectors and is ideal for normalized embeddings. Dot product is equivalent to cosine for unit vectors.", "doc_type": "concept", "source": "guides", "section": "distance_metrics"},
    {"text": "Scalar quantization compresses 32-bit floats to 8-bit integers, reducing memory by 4x. Use rescore=True and oversampling to recover accuracy.", "doc_type": "concept", "source": "guides", "section": "quantization"},
    {"text": "Payload indexes accelerate filtered searches. Keyword indexes support exact match and any_of. Integer and float indexes support range queries.", "doc_type": "concept", "source": "guides", "section": "payload_indexes"},
    {"text": "If search returns empty results, check that the collection has vectors (vde.get_vector_count), that your filter is not too restrictive, and that you flushed after upserting.", "doc_type": "troubleshooting", "source": "faq", "section": "empty_results"},
    {"text": "If recall is low, increase hnsw_ef at search time or rebuild the index with higher m and ef_construct values.", "doc_type": "troubleshooting", "source": "faq", "section": "low_recall"},
    {"text": "If latency is high, reduce hnsw_ef, enable quantization, or decrease the limit parameter. Use connection pooling for concurrent access.", "doc_type": "troubleshooting", "source": "faq", "section": "high_latency"},
    {"text": "Version 2.5 added the universal query endpoint with prefetch, fusion, and order_by support.", "doc_type": "changelog", "source": "releases", "section": "v2.5"},
    {"text": "Version 2.4 introduced SmartBatcher for streaming ingestion with automatic size, byte, and time-based flush triggers.", "doc_type": "changelog", "source": "releases", "section": "v2.4"},
    {"text": "Version 2.3 added scalar quantization with rescore and oversampling for memory-efficient search.", "doc_type": "changelog", "source": "releases", "section": "v2.3"},
]

async def ingest_documents():
    texts   = [d["text"] for d in documents]
    vectors = embed_texts(texts)
    points  = [
        PointStruct(
            id=i,
            vector=vectors[i],
            payload={
                **documents[i],
                "created_at":       now_iso(),
                "retrieval_count":  0,
                "usefulness_score": 0.5,
                "feedback_count":   0,
            },
        )
        for i in range(len(documents))
    ]
    async with AsyncVectorAIClient(url=SERVER) as client:
        await client.points.upsert(COLLECTION, points=points)
        await client.vde.flush(COLLECTION)
        count = await client.vde.get_vector_count(COLLECTION)
    print(f"Ingested {len(points)} documents. Total: {count}")

asyncio.run(ingest_documents())

Expected output

Ingested 20 documents. Total: 20

Step 4: Build the query classifier

class QueryType(Enum):
    FACTUAL         = "factual"
    EXPLORATORY     = "exploratory"
    TROUBLESHOOTING = "troubleshooting"
    NO_RETRIEVAL    = "no_retrieval"

@dataclass
class ClassifiedQuery:
    original:         str
    query_type:       QueryType
    target_doc_types: list
    confidence:       float

def classify_query(query: str) -> ClassifiedQuery:
    """Classify a query to determine retrieval strategy.

    In production, replace this keyword-signal approach with an LLM-based
    classifier or a fine-tuned text classification model for higher accuracy.
    """
    q = query.lower()

    no_retrieval_signals = ["hello", "hi ", "thanks", "thank you",
                            "what is 2", "calculate", "what time"]
    if any(s in q for s in no_retrieval_signals):
        return ClassifiedQuery(query, QueryType.NO_RETRIEVAL, [], 0.95)

    troubleshooting_signals = ["error", "not working", "empty results", "slow",
                               "fails", "issue", "problem", "bug", "fix",
                               "why is", "how to fix", "doesn't work"]
    if any(s in q for s in troubleshooting_signals):
        return ClassifiedQuery(query, QueryType.TROUBLESHOOTING,
                               ["troubleshooting", "api_reference"], 0.85)

    factual_signals = ["what is", "what does", "how to use", "what parameters",
                       "which method", "api for", "syntax for", "default value"]
    if any(s in q for s in factual_signals):
        return ClassifiedQuery(query, QueryType.FACTUAL,
                               ["api_reference", "concept"], 0.80)

    return ClassifiedQuery(query, QueryType.EXPLORATORY,
                           ["tutorial", "concept", "api_reference"], 0.70)

for q in [
    "How to use the search method?",
    "How does the prefetch pipeline work with hybrid search?",
    "My search returns empty results, what's wrong?",
    "Hello, how are you?",
]:
    c = classify_query(q)
    print(f"  {c.query_type.value:>17}  conf={c.confidence:.2f}  {q}")

Expected output

            factual  conf=0.80  How to use the search method?
        exploratory  conf=0.70  How does the prefetch pipeline work with hybrid search?
    troubleshooting  conf=0.85  My search returns empty results, what's wrong?
       no_retrieval  conf=0.95  Hello, how are you?
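
Substring checks such as `s in q` also match inside longer words: "fix" matches "prefix" and "bug" matches "debug". A minimal sketch of a word-boundary variant using the standard `re` module follows. `has_signal` is a hypothetical helper for illustration, not part of the classifier above.

```python
import re

def has_signal(query: str, signals: list[str]) -> bool:
    """Return True if any signal occurs as a whole word or phrase in the query."""
    q = query.lower()
    return any(
        # \b anchors the signal at word boundaries; re.escape protects
        # characters such as the apostrophe in "doesn't work".
        re.search(rf"\b{re.escape(s.strip())}\b", q) is not None
        for s in signals
    )

signals = ["fix", "bug", "slow", "hi"]
print(has_signal("What does the prefix option do?", signals))  # False
print(has_signal("How do I fix slow queries?", signals))       # True
```

Swapping `any(s in q for s in ...)` for a helper like this keeps the keyword approach but avoids misrouting queries that merely contain a signal as part of another word.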

Step 5: Strategy 1 — Precise retrieval for factual queries

async def precise_retrieval(query: str, doc_types: list, top_k: int = 3) -> list:
    vec = embed_text(query)
    fb  = FilterBuilder()
    if doc_types:
        fb = fb.must(Field("doc_type").any_of(doc_types))

    async with AsyncVectorAIClient(url=SERVER) as client:
        return await client.points.search(
            COLLECTION,
            vector=vec,
            limit=top_k,
            filter=fb.build(),
            score_threshold=0.4,           # calibrated to all-MiniLM-L6-v2 on this corpus
            params=SearchParams(hnsw_ef=256),
            with_payload=True,
        ) or []

query   = "What parameters does the search method accept?"
results = asyncio.run(precise_retrieval(query, ["api_reference", "concept"]))

print(f"Query: {query}")
print(f"Strategy: PRECISE (hnsw_ef=256, threshold=0.4)\n")
for r in results:
    p = r.payload
    print(f"  score={r.score:.4f}  [{p['doc_type']}]  {p['text'][:70]}...")

Expected output

Query: What parameters does the search method accept?
Strategy: PRECISE (hnsw_ef=256, threshold=0.4)

  score=0.6194  [api_reference]  points.search performs approximate nearest-neighbour search. It accept...
  score=0.5958  [api_reference]  SearchParams allows setting hnsw_ef for accuracy tuning, exact for bru...
  score=0.5481  [concept]  Payload indexes accelerate filtered searches. Keyword indexes support ...
Note: score_threshold=0.4 is calibrated to all-MiniLM-L6-v2 on this corpus where top scores reach ~0.62. Calibrate thresholds by observing your actual score distribution — never use fixed values across different models or datasets.

Why these settings for factual queries

| Parameter | Setting | Rationale |
| --- | --- | --- |
| hnsw_ef=256 | High | Factual queries need the right answer, not just a plausible one. |
| score_threshold=0.4 | Strict | Drops results below cosine 0.4; better to return nothing than noise. |
| doc_types filter | Focused | Searches only API reference and concepts for factual questions. |
| top_k=3 | Small | Factual answers are usually found in one or two documents. |
With these settings the search either returns a small number of highly confident matches or nothing at all — both are useful signals. An empty result set tells the router to invoke the fallback strategy rather than hallucinate an answer.
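
Calibrating the threshold from an observed score distribution can be sketched as follows. This is a minimal illustration, not part of the SDK: `suggest_threshold` is a hypothetical helper, the midpoint rule is an assumption, the "relevant" scores are the cosine values from the expected output above, and the "irrelevant" scores are invented placeholders for hits you would label as noise on your own corpus.

```python
def suggest_threshold(relevant: list[float], irrelevant: list[float]) -> float:
    """Midpoint between the weakest relevant score and the strongest irrelevant one."""
    if not relevant or not irrelevant:
        raise ValueError("need at least one score in each group")
    strongest_noise = max(irrelevant)
    weakest_hit = min(relevant)
    if strongest_noise >= weakest_hit:
        raise ValueError("score groups overlap; no clean cut-off exists")
    return round((strongest_noise + weakest_hit) / 2, 2)

relevant_scores = [0.6194, 0.5958, 0.5481]   # cosine scores from the run above
irrelevant_scores = [0.21, 0.18, 0.12]       # hypothetical scores of off-topic hits

threshold = suggest_threshold(relevant_scores, irrelevant_scores)
print(threshold)  # 0.38
```

With these sample numbers the midpoint lands close to the 0.4 used in this step. A different model or corpus would shift both groups, which is why the value should be re-derived rather than copied.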

Step 6: Strategy 2 — Broad multistage retrieval for exploratory queries

async def broad_retrieval(query: str, doc_types: list, top_k: int = 5) -> list:
    vec             = embed_text(query)
    prefetch_stages = []

    # One prefetch stream per document type
    for dtype in doc_types:
        f = FilterBuilder().must(Field("doc_type").eq(dtype)).build()
        prefetch_stages.append(
            PrefetchQuery(query=vec, filter=f, limit=10,
                          params=SearchParams(hnsw_ef=128))
        )

    # Unfiltered catch-all stream
    prefetch_stages.append(PrefetchQuery(query=vec, limit=15))

    async with AsyncVectorAIClient(url=SERVER) as client:
        results = await client.points.query(
            COLLECTION,
            query={"fusion": Fusion.RRF},
            prefetch=prefetch_stages,
            limit=top_k,
            with_payload=True,
        ) or []
    return list(results)

query   = "How does the prefetch pipeline work with hybrid search and fusion?"
results = asyncio.run(broad_retrieval(query, ["tutorial", "concept", "api_reference"]))

print(f"Query: {query}")
print(f"Strategy: BROAD (4 streams, RRF fusion)\n")
for r in results:
    p = r.payload
    print(f"  score={r.score:.4f}  [{p['doc_type']:>15}]  {p['text'][:65]}...")

Expected output

Query: How does the prefetch pipeline work with hybrid search and fusion?
Strategy: BROAD (4 streams, RRF fusion)

  score=0.0328  [       tutorial]  Prefetch queries retrieve candidates from multiple vector spaces ...
  score=0.0323  [  api_reference]  points.query is the universal endpoint supporting vector search, ...
  score=0.0318  [       tutorial]  Hybrid search combines dense vector similarity with sparse keywor...
  score=0.0315  [        concept]  Payload indexes accelerate filtered searches. Keyword indexes sup...
  score=0.0315  [  api_reference]  SearchParams allows setting hnsw_ef for accuracy tuning, exact fo...
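Each prefetch stream contributes 1/(60 + rank) to a document's RRF score. A document ranked first in a single stream scores 1/61 ≈ 0.016, and one ranked first in two streams (here, its doc_type stream and the unfiltered stream) scores 2/61 ≈ 0.033. These small values are expected and correct. Do not compare RRF scores to cosine scores from points.search.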
Prefetch 1: tutorial docs       → 10 candidates
Prefetch 2: concept docs        → 10 candidates
Prefetch 3: api_reference docs  → 10 candidates
Prefetch 4: unfiltered          → 15 candidates

RRF fusion: merge all by rank   → top 5
The lower hnsw_ef=128 per stream is a deliberate trade-off: the four parallel streams compensate for any individual miss, so per-stream precision matters less than overall breadth.
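
To see where RRF scores of this size come from, here is a minimal pure-Python sketch of reciprocal rank fusion with the constant 60. It illustrates the formula only and is not the server's implementation. `rrf_scores` and the document IDs are hypothetical.

```python
from collections import defaultdict

def rrf_scores(ranked_lists: list[list[str]], k: int = 60) -> dict[str, float]:
    """Sum 1 / (k + rank) over every list a document appears in (rank starts at 1)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)

# Hypothetical per-stream rankings: two filtered streams plus the unfiltered one.
tutorial_stream   = ["prefetch_doc", "hybrid_doc"]
api_stream        = ["query_doc"]
unfiltered_stream = ["prefetch_doc", "query_doc", "hybrid_doc"]

fused = rrf_scores([tutorial_stream, api_stream, unfiltered_stream])
for doc_id, score in sorted(fused.items(), key=lambda kv: -kv[1]):
    print(f"{score:.4f}  {doc_id}")
# 0.0328  prefetch_doc
# 0.0325  query_doc
# 0.0320  hybrid_doc
```

A document that tops both its filtered stream and the unfiltered stream reaches 2/61, which matches the 0.0328 at the head of the output above.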

Step 7: Strategy 3 — Troubleshooting retrieval

async def troubleshooting_retrieval(query: str, top_k: int = 5) -> list:
    vec = embed_text(query)

    trouble_filter   = FilterBuilder().must(
        Field("doc_type").any_of(["troubleshooting", "api_reference"])
    ).build()
    changelog_filter = FilterBuilder().must(
        Field("doc_type").eq("changelog")
    ).build()

    async with AsyncVectorAIClient(url=SERVER) as client:
        results = await client.points.query(
            COLLECTION,
            query={"fusion": Fusion.RRF},
            prefetch=[
                PrefetchQuery(query=vec, filter=trouble_filter,   limit=10),
                PrefetchQuery(query=vec, filter=changelog_filter, limit=5),
            ],
            limit=top_k,
            with_payload=True,
        ) or []
    return list(results)

query   = "My search returns empty results, what's wrong?"
results = asyncio.run(troubleshooting_retrieval(query))

print(f"Query: {query}")
print(f"Strategy: TROUBLESHOOTING (RRF: FAQ + changelog)\n")
for r in results:
    p = r.payload
    print(f"  score={r.score:.4f}  [{p['doc_type']:>17}]  {p['text'][:65]}...")

Expected output

Query: My search returns empty results, what's wrong?
Strategy: TROUBLESHOOTING (RRF: FAQ + changelog)

  score=0.0164  [  troubleshooting]  If search returns empty results, check that the collection has ve...
  score=0.0161  [  troubleshooting]  If recall is low, increase hnsw_ef at search time or rebuild the ...
  score=0.0159  [    api_reference]  SearchParams allows setting hnsw_ef for accuracy tuning, exact fo...
  score=0.0156  [    api_reference]  points.search performs approximate nearest-neighbour search. It a...
  score=0.0154  [    api_reference]  points.query is the universal endpoint supporting vector search, ...
The troubleshooting strategy runs two prefetch streams and merges them with RRF:
Prefetch 1: troubleshooting + api_reference  → 10 candidates
Prefetch 2: changelog                        →  5 candidates

RRF fusion: merge both by rank               → top 5
The two streams are disjoint, so no document can appear in both and every score comes from a single rank. That is why the top score here is 1/61 ≈ 0.0164 rather than the ~0.033 seen in the broad strategy. The changelog stream gives release notes a chance to surface alongside troubleshooting tips.

Step 8: Build the confidence evaluator

RRF and cosine scores are on different scales. The evaluator detects which scale applies automatically:
  • points.search cosine scores: 0.0–1.0 → thresholds high=0.45, low=0.25
  • points.query RRF scores: 0.01–0.035 → thresholds high=0.025, low=0.015
@dataclass
class RetrievalResult:
    results:    list
    strategy:   str
    confidence: str    # "high", "medium", "low", or "no_results"
    top_score:  float
    avg_score:  float
    coverage:   int

def evaluate_confidence(
    results: list, strategy: str,
    high_threshold: float = None,
    low_threshold:  float = None,
) -> RetrievalResult:
    if not results:
        return RetrievalResult(results, strategy, "no_results", 0.0, 0.0, 0)

    scores    = [r.score for r in results]
    top_score = max(scores)
    avg_score = sum(scores) / len(scores)

    # Detect score scale: RRF scores are always < 0.1
    if top_score < 0.1:
        # RRF scale: 1/(60 + rank), max ~0.033 for rank 1
        ht = high_threshold if high_threshold is not None else 0.025
        lt = low_threshold  if low_threshold  is not None else 0.015
    else:
        # Cosine scale: all-MiniLM-L6-v2 peaks ~0.62 on this corpus
        ht = high_threshold if high_threshold is not None else 0.45
        lt = low_threshold  if low_threshold  is not None else 0.25

    if top_score >= ht and avg_score >= lt:
        confidence = "high"
    elif top_score >= lt:
        confidence = "medium"
    else:
        confidence = "low"

    return RetrievalResult(results, strategy, confidence,
                           top_score, avg_score, len(results))

query    = "What parameters does the search method accept?"
results  = asyncio.run(precise_retrieval(query, ["api_reference"]))
ev       = evaluate_confidence(results, "precise")

print(f"Confidence: {ev.confidence}")
print(f"Top score:  {ev.top_score:.4f}")
print(f"Avg score:  {ev.avg_score:.4f}")
print(f"Coverage:   {ev.coverage} documents")

Expected output

Confidence: high
Top score:  0.6194
Avg score:  0.5559
Coverage:   3 documents
| Confidence | Condition | Action |
| --- | --- | --- |
| high | Top score ≥ high threshold and avg ≥ low threshold | Proceed to LLM with full confidence |
| medium | Top score ≥ low threshold | Proceed with caveat: “Based on available information…” |
| low | Top score < low threshold | Try fallback strategy |
| no_results | Empty result set | Skip retrieval or say “No relevant docs found” |

Step 9: Fallback retrieval

Note: query={"sample": Sample.Random} raises UnimplementedError 501 in VectorAI DB 1.0.0. Use an unfiltered points.search as the fallback widening pass.
async def fallback_retrieval(query: str, original_results: list,
                              top_k: int = 5) -> list:
    """Widen the search when initial retrieval has low confidence."""
    vec = embed_text(query)

    async with AsyncVectorAIClient(url=SERVER) as client:
        # Remove all filters and cast the widest possible net
        unfiltered = await client.points.search(
            COLLECTION,
            vector=vec,
            limit=top_k * 3,
            with_payload=True,
            params=SearchParams(hnsw_ef=256),
        ) or []

    # Merge original results with the unfiltered widening pass
    if original_results and unfiltered:
        try:
            return reciprocal_rank_fusion(
                [original_results, unfiltered], limit=top_k,
            )
        except Exception:
            pass

    return unfiltered[:top_k]

query   = "How does the quantum flux capacitor module work?"
results = asyncio.run(precise_retrieval(query, ["api_reference"]))
ev      = evaluate_confidence(results, "precise")

print(f"Initial: confidence={ev.confidence}, top_score={ev.top_score:.4f}")

if ev.confidence in ("low", "no_results"):
    fb = asyncio.run(fallback_retrieval(query, results))
    ev = evaluate_confidence(fb, "fallback")
    print(f"Fallback: confidence={ev.confidence}, top_score={ev.top_score:.4f}")
    for r in fb:
        p = r.payload
        print(f"  score={r.score:.4f}  [{p.get('doc_type','')}]  "
              f"{p.get('text','')[:60]}...")

Expected output

Initial: confidence=no_results, top_score=0.0000
Fallback: confidence=high, top_score=0.0690
  score=0.0690  [concept]  HNSW is a graph-based index where each node connects to M ne...
  score=0.0671  [changelog]  Version 2.3 added scalar quantization with rescore and overs...
  score=0.0619  [api_reference]  The create_collection method accepts vectors_config, hnsw_co...
  score=0.0518  [changelog]  Version 2.4 introduced SmartBatcher for streaming ingestion ...
  score=0.0407  [api_reference]  FilterBuilder supports must, should, must_not, and min_shoul...
The fallback scores (0.069) are cosine similarities from points.search, not RRF scores. Because 0.069 is below the evaluator's 0.1 scale cutoff, they are judged against the RRF thresholds and reported as confidence=high, even though a cosine similarity of 0.069 indicates almost no semantic match. When evaluating plain points.search results, pass the cosine thresholds explicitly (high_threshold=0.45, low_threshold=0.25).
The fallback function runs a single unfiltered points.search with limit=top_k * 3. If the original strategy returned anything, the two lists are merged with reciprocal_rank_fusion; otherwise, as in this example, the function returns the top_k unfiltered hits so the user can see what the knowledge base contains and reformulate the query. Sample.Random is not used because of the 501 error noted above.
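
Guessing the scale from the score magnitude is fragile: any cosine score under 0.1, such as the 0.069 above, is compared with the RRF thresholds. One way to avoid this is to let the caller state the scale. The following is a minimal sketch under that assumption. `label_confidence`, `THRESHOLDS`, and the `scale` argument are hypothetical names, and the threshold values are the ones used in Step 8.

```python
# (high, low) thresholds per score scale, copied from Step 8.
THRESHOLDS = {
    "cosine": (0.45, 0.25),
    "rrf":    (0.025, 0.015),
}

def label_confidence(scores: list[float], scale: str) -> str:
    """Label a result set using thresholds for an explicitly named score scale."""
    if scale not in THRESHOLDS:
        raise ValueError(f"unknown scale: {scale!r}")
    if not scores:
        return "no_results"
    high, low = THRESHOLDS[scale]
    top = max(scores)
    avg = sum(scores) / len(scores)
    if top >= high and avg >= low:
        return "high"
    if top >= low:
        return "medium"
    return "low"

fallback_scores = [0.0690, 0.0671, 0.0619, 0.0518, 0.0407]  # cosine scores from above
print(label_confidence(fallback_scores, "cosine"))  # low
print(label_confidence(fallback_scores, "rrf"))     # high
```

The same five scores are "low" on the cosine scale and "high" on the RRF scale, so the caller, who knows whether the results came from points.search or a fused points.query, is in the best position to pick the scale.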

Step 10: Build the adaptive router

class AdaptiveRAGRouter:

    async def retrieve(self, query: str) -> RetrievalResult:
        classified = classify_query(query)

        if classified.query_type == QueryType.NO_RETRIEVAL:
            return RetrievalResult([], "no_retrieval", "high", 0.0, 0.0, 0)

        if classified.query_type == QueryType.FACTUAL:
            results    = await precise_retrieval(query, classified.target_doc_types)
            evaluation = evaluate_confidence(results, "precise")

        elif classified.query_type == QueryType.TROUBLESHOOTING:
            results    = await troubleshooting_retrieval(query)
            evaluation = evaluate_confidence(results, "troubleshooting")

        else:
            results    = await broad_retrieval(query, classified.target_doc_types)
            evaluation = evaluate_confidence(results, "broad")

        if evaluation.confidence in ("low", "no_results"):
            fallback   = await fallback_retrieval(query, results)
            evaluation = evaluate_confidence(
                fallback, f"{evaluation.strategy}+fallback"
            )

        await self._track_retrieval(evaluation.results)
        return evaluation

    async def _track_retrieval(self, results: list):
        if not results:
            return
        async with AsyncVectorAIClient(url=SERVER) as client:
            for r in results:
                count = (r.payload or {}).get("retrieval_count", 0) + 1
                await client.points.set_payload(
                    COLLECTION,
                    payload={"retrieval_count": count},
                    ids=[r.id],
                )

async def demo_router():
    router  = AdaptiveRAGRouter()
    queries = [
        "What parameters does the search method accept?",
        "How does hybrid search work with fusion and prefetch?",
        "My search returns empty results, what's wrong?",
        "Hi there!",
        "How does the quantum flux capacitor module work?",
    ]
    for q in queries:
        result     = await router.retrieve(q)
        classified = classify_query(q)
        print(
            f"  [{classified.query_type.value:>17}]  "
            f"strategy={result.strategy:<25}  "
            f"confidence={result.confidence:<10}  "
            f"top={result.top_score:.4f}  docs={result.coverage}  "
            f"| {q[:50]}"
        )

asyncio.run(demo_router())

Expected output

  [          factual]  strategy=precise                    confidence=high        top=0.6194  docs=3  | What parameters does the search method accept?
  [      exploratory]  strategy=broad                      confidence=high        top=0.0328  docs=5  | How does hybrid search work with fusion and prefet
  [  troubleshooting]  strategy=troubleshooting            confidence=medium      top=0.0164  docs=5  | My search returns empty results, what's wrong?
  [     no_retrieval]  strategy=no_retrieval               confidence=high        top=0.0000  docs=0  | Hi there!
  [      exploratory]  strategy=broad                      confidence=high        top=0.0328  docs=5  | How does the quantum flux capacitor module work?
The “quantum flux capacitor” query returns strategy=broad, confidence=high because its RRF score of 0.0328 clears the RRF high threshold of 0.025. RRF scores depend only on rank, so the top fused result scores about the same for any query, relevant or not. The fallback therefore triggers only when scores fall below the threshold, not when a query is semantically out-of-domain. For domain detection, add a post-retrieval check on the raw cosine similarity of the top result or use a separate classifier.

Step 11: User feedback loop

async def record_feedback(result: RetrievalResult, helpful: bool):
    if not result.results:
        return

    async with AsyncVectorAIClient(url=SERVER) as client:
        for r in result.results:
            payload        = r.payload or {}
            current_score  = payload.get("usefulness_score", 0.5)
            feedback_count = payload.get("feedback_count", 0) + 1

            if helpful:
                new_score = min(current_score + (1.0 - current_score) * 0.1, 1.0)
            else:
                new_score = max(current_score - current_score * 0.15, 0.0)

            await client.points.set_payload(
                COLLECTION,
                payload={
                    "usefulness_score":   round(new_score, 4),
                    "feedback_count":     feedback_count,
                    "last_feedback":      now_iso(),
                    "last_feedback_type": "helpful" if helpful else "unhelpful",
                },
                ids=[r.id],
            )

    label = "helpful" if helpful else "unhelpful"
    print(f"Recorded '{label}' feedback for {len(result.results)} documents.")

router = AdaptiveRAGRouter()
result = asyncio.run(router.retrieve("What parameters does the search method accept?"))
asyncio.run(record_feedback(result, helpful=True))

Expected output

Recorded 'helpful' feedback for 3 documents.
Each feedback event nudges a document’s score toward 1.0 (helpful) or toward 0.0 (unhelpful) using an exponential moving-average formula so that no single event dominates the history:
| Scenario | Formula | Effect |
| --- | --- | --- |
| Helpful feedback | score += (1.0 - score) * 0.1 | Score rises asymptotically toward 1.0. |
| Unhelpful feedback | score -= score * 0.15 | Score drops faster, penalizing poor results. |
| No feedback | Score unchanged | Stays at the default of 0.5. |
After many feedback cycles, frequently helpful documents accumulate high scores while unhelpful ones sink. The feedback-aware retrieval function in the next step uses these scores to boost useful documents.
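
A short worked example makes the asymmetry concrete. The sketch below applies the two update rules from record_feedback to a document that starts at the default 0.5. `update_score` is a hypothetical standalone helper, and it omits the per-event rounding to four decimals.

```python
def update_score(score: float, helpful: bool) -> float:
    """Apply one feedback event using the update rules from Step 11."""
    if helpful:
        return min(score + (1.0 - score) * 0.1, 1.0)
    return max(score - score * 0.15, 0.0)

# Three helpful votes in a row: 0.5 -> 0.55 -> 0.595 -> 0.6355
up = 0.5
for _ in range(3):
    up = update_score(up, helpful=True)
print(round(up, 4))    # 0.6355

# Three unhelpful votes in a row: 0.5 -> 0.425 -> 0.36125 -> 0.3070625
down = 0.5
for _ in range(3):
    down = update_score(down, helpful=False)
print(round(down, 4))  # 0.3071
```

Two unhelpful votes are enough to push a default document to about 0.36, below the 0.4 usefulness cut-off used by the feedback-aware retrieval in Step 12.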

Step 12: Feedback-aware retrieval

async def feedback_aware_retrieval(query: str, top_k: int = 5) -> list:
    vec           = embed_text(query)
    useful_filter = FilterBuilder().must(
        Field("usefulness_score").gte(0.4)
    ).build()

    async with AsyncVectorAIClient(url=SERVER) as client:
        results = await client.points.query(
            COLLECTION,
            query={"fusion": Fusion.RRF},
            prefetch=[
                PrefetchQuery(query=vec, limit=15),                        # semantic stream
                PrefetchQuery(query=vec, filter=useful_filter, limit=15),  # proven-helpful stream
            ],
            limit=top_k,
            with_payload=True,
        ) or []
    return list(results)

query   = "How to perform filtered search?"
results = asyncio.run(feedback_aware_retrieval(query))

print(f"Query: {query}")
print(f"Strategy: feedback-aware (RRF: unfiltered + usefulness>=0.4)\n")
for r in results:
    p = r.payload
    print(f"  score={r.score:.4f}  useful={p.get('usefulness_score',0.5):.2f}  "
          f"retrievals={p.get('retrieval_count',0)}  [{p['doc_type']}]  "
          f"{p['text'][:55]}...")

Expected output

Query: How to perform filtered search?
Strategy: feedback-aware (RRF: unfiltered + usefulness>=0.4)

  score=0.0328  useful=0.50  retrievals=1  [troubleshooting]  If search returns empty results, check that the collect...
  score=0.0323  useful=0.55  retrievals=3  [concept]  Payload indexes accelerate filtered searches. Keyword i...
  score=0.0317  useful=0.50  retrievals=1  [api_reference]  FilterBuilder supports must, should, must_not, and min_...
  score=0.0312  useful=0.55  retrievals=3  [api_reference]  points.search performs approximate nearest-neighbour se...
  score=0.0308  useful=0.55  retrievals=4  [api_reference]  SearchParams allows setting hnsw_ef for accuracy tuning...
The function runs two prefetch streams in parallel and merges them with RRF, so documents that satisfy both criteria rank above those that satisfy only one:
Prefetch 1: Unfiltered search   → 15 candidates (semantic relevance)
Prefetch 2: Usefulness-filtered → 15 candidates (proven helpful)

RRF fusion: documents in both lists rank higher
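A document that is both semantically relevant and historically useful appears in both streams and gets a double boost. A document that is semantically relevant but whose usefulness score has dropped below 0.4 (two unhelpful ratings from the default are enough: 0.5 → 0.425 → 0.361) appears only in the unfiltered stream and ranks lower. In the sample output above, every document still scores at least 0.4, so each one appears in both streams.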
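The fusion arithmetic can be sketched in plain Python. This is an illustration under an assumption: it uses the common RRF formula, 1 / (k + rank) summed over streams, with k = 60 and 1-based ranks. The tutorial does not state the server's constant, but the scores in the expected output are consistent with it (the top result's 0.0328 is about 2/61). The function name `rrf_scores` and the document IDs are hypothetical.

```python
RRF_K = 60  # assumed constant; the server's actual value is not stated here


def rrf_scores(*ranked_lists, k: int = RRF_K) -> dict:
    """Sum 1 / (k + rank) for each document over every ranked list it appears in."""
    scores = {}
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked, start=1):  # 1-based ranks
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return dict(sorted(scores.items(), key=lambda kv: kv[1], reverse=True))


# Stream 1: unfiltered semantic ranking
semantic = ["doc_a", "doc_b", "doc_c"]
# Stream 2: same query, but doc_b is excluded because its usefulness is below 0.4
useful = ["doc_a", "doc_c"]

fused = rrf_scores(semantic, useful)

if __name__ == "__main__":
    for doc_id, score in fused.items():
        print(f"{doc_id}: {score:.4f}")
```

Here `doc_b` outranks `doc_c` semantically, yet it ends up last after fusion because it contributes from only one stream. That is the demotion effect described above.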

Step 13: Analytics

async def retrieval_analytics():
    """Print collection health, per-type document counts, and feedback rankings."""
    STATUS_MAP    = {1: "green", 2: "yellow", 3: "red"}
    VDE_STATE_MAP = {0: "active", 1: "inactive"}

    async with AsyncVectorAIClient(url=SERVER) as client:
        total = await client.vde.get_vector_count(COLLECTION)
        info  = await client.collections.get_info(COLLECTION)
        state = await client.vde.get_state(COLLECTION)

    print(f"Total vectors: {total}")
    print(f"Status: {STATUS_MAP.get(info.status, info.status)}  "
          f"State: {VDE_STATE_MAP.get(state, state)}")

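    # points.count raises UnimplementedError (501), so count client-side after a scroll.
    # OrderBy raises ValidationError, so sort client-side as well.
    # One page of up to 1000 points covers this 20-document collection;
    # a larger collection would need to be scrolled page by page.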
    async with AsyncVectorAIClient(url=SERVER) as client:
        all_pts, _ = await client.points.scroll(
            COLLECTION, limit=1000, with_payload=True, with_vectors=False,
        )

    print("\nDocument counts by type:")
    for dtype in ["api_reference", "tutorial", "concept", "troubleshooting", "changelog"]:
        count = sum(1 for p in all_pts if p.payload.get("doc_type") == dtype)
        print(f"  {dtype:>17}: {count}")

    sorted_by_retrieval = sorted(
        all_pts, key=lambda p: p.payload.get("retrieval_count", 0), reverse=True
    )
    print("\nMost retrieved:")
    for p in sorted_by_retrieval[:5]:
        pl = p.payload
        print(f"  retrievals={pl.get('retrieval_count', 0):>3}  "
              f"useful={pl.get('usefulness_score', 0.5):.2f}  "
              f"{pl.get('text','')[:55]}...")

    sorted_by_useful = sorted(
        all_pts, key=lambda p: p.payload.get("usefulness_score", 0.5), reverse=True
    )
    print("\nMost useful:")
    for p in sorted_by_useful[:5]:
        pl = p.payload
        print(f"  useful={pl.get('usefulness_score', 0.5):.2f}  "
              f"feedback={pl.get('feedback_count', 0)}  "
              f"{pl.get('text','')[:55]}...")

asyncio.run(retrieval_analytics())

Expected output

Total vectors: 20
Status: green  State: active

Document counts by type:
      api_reference: 5
           tutorial: 5
            concept: 4
    troubleshooting: 3
          changelog: 3

Most retrieved:
  retrievals=  4  useful=0.55  SearchParams allows setting hnsw_ef for accuracy tuning...
  retrievals=  3  useful=0.55  points.search performs approximate nearest-neighbour se...
  retrievals=  3  useful=0.55  Payload indexes accelerate filtered searches. Keyword i...
  retrievals=  2  useful=0.50  points.query is the universal endpoint supporting vecto...
  retrievals=  2  useful=0.50  Prefetch queries retrieve candidates from multiple vect...

Most useful:
  useful=0.55  feedback=1  points.search performs approximate nearest-neighbour se...
  useful=0.55  feedback=1  SearchParams allows setting hnsw_ef for accuracy tuning...
  useful=0.55  feedback=1  Payload indexes accelerate filtered searches. Keyword i...
  useful=0.50  feedback=0  The create_collection method accepts vectors_config, hn...
  useful=0.50  feedback=0  points.query is the universal endpoint supporting vecto...
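The client-side aggregation can also be written without a hard-coded list of document types. The sketch below is one way to do it, using `collections.Counter` and stand-in point objects built with `SimpleNamespace` so that it runs without a server. The `summarize` helper and the sample data are illustrative; they are not part of the tutorial's code.

```python
from collections import Counter
from types import SimpleNamespace


def summarize(points, top_n: int = 3):
    """Aggregate scrolled points client-side: type counts plus two rankings."""
    counts = Counter(p.payload.get("doc_type", "unknown") for p in points)
    by_retrieval = sorted(
        points, key=lambda p: p.payload.get("retrieval_count", 0), reverse=True
    )
    by_useful = sorted(
        points, key=lambda p: p.payload.get("usefulness_score", 0.5), reverse=True
    )
    return counts, by_retrieval[:top_n], by_useful[:top_n]


# Stand-in points; real ones would come from client.points.scroll(...)
sample = [
    SimpleNamespace(id=1, payload={"doc_type": "concept",
                                   "retrieval_count": 3, "usefulness_score": 0.55}),
    SimpleNamespace(id=2, payload={"doc_type": "api_reference",
                                   "retrieval_count": 4, "usefulness_score": 0.55}),
    SimpleNamespace(id=3, payload={"doc_type": "api_reference", "retrieval_count": 1}),
    SimpleNamespace(id=4, payload={"doc_type": "tutorial"}),
]

counts, top_retrieved, top_useful = summarize(sample)

if __name__ == "__main__":
    for doc_type, n in counts.most_common():
        print(f"{doc_type:>17}: {n}")
    print("most retrieved:", [p.id for p in top_retrieved])
    print("most useful:   ", [p.id for p in top_useful])
```

Because Python's sort is stable, documents with equal scores keep their scroll order. This explains why ties at 0.55 or 0.50 in the "Most useful" list can appear in a different order from the "Most retrieved" list.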

Step 14: Prepare the prompt for LLM integration

async def adaptive_rag_answer(query: str) -> dict:
    """Route the query, collect context and sources, and build an LLM-ready prompt."""
    router     = AdaptiveRAGRouter()
    classified = classify_query(query)

    if classified.query_type == QueryType.NO_RETRIEVAL:
        return {
            "query":      query,
            "strategy":   "no_retrieval",
            "confidence": "high",
            "answer":     f"[Direct response — no retrieval needed for: '{query}']",
            "sources":    [],
        }

    result = await router.retrieve(query)

    context_chunks, sources = [], []
    for r in result.results:
        p = r.payload or {}
        context_chunks.append(p.get("text", ""))
        sources.append({"id": r.id, "score": r.score,
                        "doc_type": p.get("doc_type"), "section": p.get("section")})

    context = "\n\n".join(context_chunks)

    if result.confidence == "high":
        instruction = "Answer based on the provided context."
    elif result.confidence == "medium":
        instruction = "Answer based on available information. Note that context may be incomplete."
    else:
        instruction = ("Limited context was found. Provide a best-effort answer "
                       "and suggest the user consult the full documentation.")

    prompt = (f"Context:\n{context}\n\n"
              f"Instruction: {instruction}\n\n"
              f"Question: {query}\n\nAnswer:")

    # answer = await llm.generate(prompt)   # plug in your LLM here

    return {"query": query, "strategy": result.strategy,
            "confidence": result.confidence, "sources": sources,
            "prompt_preview": prompt[:200] + "..."}

for q in [
    "What parameters does the search method accept?",
    "How do I build a hybrid search pipeline?",
    "My search is returning empty results",
    "Thanks!",
]:
    response = asyncio.run(adaptive_rag_answer(q))
    print(f"\nQuery: {q}")
    print(f"  Strategy:   {response['strategy']}")
    print(f"  Confidence: {response['confidence']}")
    print(f"  Sources:    {len(response.get('sources', []))} documents")

Expected output

Query: What parameters does the search method accept?
  Strategy:   precise
  Confidence: high
  Sources:    3 documents

Query: How do I build a hybrid search pipeline?
  Strategy:   broad
  Confidence: high
  Sources:    5 documents

Query: My search is returning empty results
  Strategy:   troubleshooting
  Confidence: medium
  Sources:    5 documents

Query: Thanks!
  Strategy:   no_retrieval
  Confidence: high
  Sources:    0 documents
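The commented-out `llm.generate(prompt)` line is where a model call would go. As a sketch of one way to wire that in, the example below accepts any async callable that maps a prompt string to an answer string and wraps it in a timeout. Everything here is hypothetical: `echo_llm` is a stub standing in for a real LLM client, and `complete`, `GenerateFn`, and the timeout value are illustrative names and defaults, not part of the tutorial's code.

```python
import asyncio
from typing import Awaitable, Callable

# Any async function that turns a prompt into an answer (assumed interface)
GenerateFn = Callable[[str], Awaitable[str]]


async def echo_llm(prompt: str) -> str:
    """Hypothetical stub for a real LLM client; echoes the question back."""
    question = prompt.rsplit("Question: ", 1)[-1].split("\n", 1)[0]
    return f"(stub answer to: {question})"


async def complete(prompt: str, generate: GenerateFn, timeout_s: float = 30.0) -> str:
    """Call the model with a timeout so a stalled request cannot hang the pipeline."""
    try:
        return await asyncio.wait_for(generate(prompt), timeout=timeout_s)
    except asyncio.TimeoutError:
        return "The model did not respond in time."


# Same prompt layout as adaptive_rag_answer builds
demo_prompt = (
    "Context:\nPayload indexes accelerate filtered searches.\n\n"
    "Instruction: Answer based on the provided context.\n\n"
    "Question: How do I filter?\n\nAnswer:"
)

if __name__ == "__main__":
    print(asyncio.run(complete(demo_prompt, echo_llm)))
```

Passing the generator in as a parameter keeps the retrieval code independent of any particular LLM provider and makes the prompt-assembly step easy to test with a stub.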

Step 15: Collection cleanup

async def cleanup():
    async with AsyncVectorAIClient(url=SERVER) as client:
        count = await client.vde.get_vector_count(COLLECTION)
        await client.vde.flush(COLLECTION)
        print(f"Collection '{COLLECTION}' contains {count} documents.")
        print("Flushed to disk.")
        # Uncomment to delete:
        # await client.collections.delete(COLLECTION)

asyncio.run(cleanup())

Adaptive strategies summary

Query type | Strategy | Search config | Prefetch | Fusion | Threshold
Factual | Precise | hnsw_ef=256 | None | None | 0.5
Exploratory | Broad multistage | hnsw_ef=128 | 4 streams (per doc_type + unfiltered) | Fusion.RRF | None
Troubleshooting | Nested prefetch | Default | 2 inner (FAQ + changelog) → DBSF → rerank | Fusion.DBSF inner | None
Low confidence | Fallback | hnsw_ef=256 | None (unfiltered) + Sample.Random | Client-side RRF | None
Feedback-aware | Boosted fusion | Default | 2 streams (all + useful) | Fusion.RRF | usefulness >= 0.4

APIs and features used in this tutorial

Feature | API | Purpose
Collection creation | collections.get_or_create(hnsw_config=...) | Knowledge base setup
Semantic search | points.search(params=SearchParams(hnsw_ef=256)) | Precise factual retrieval
Score threshold | points.search(score_threshold=0.5) | Cut low-confidence results
Multi-stage prefetch | PrefetchQuery(query=..., filter=..., limit=...) | Per-doc-type retrieval streams
Nested prefetch | PrefetchQuery(prefetch=[...]) | Three-stage troubleshooting pipeline
Server-side RRF | query={"fusion": Fusion.RRF} | Broad exploratory fusion
Server-side DBSF | query={"fusion": Fusion.DBSF} | Troubleshooting score-normalized fusion
Random sampling | query={"sample": Sample.Random} | Fallback discovery
Client-side RRF | reciprocal_rank_fusion(results, limit=...) | Fallback merge
Payload updates | points.set_payload(payload=...) | Feedback tracking, retrieval counters
Payload ordering | query(query={"order_by": OrderBy(...)}) | Most-retrieved, most-useful analytics (Step 13 sorts client-side after points.scroll because OrderBy raised a ValidationError)
Selective payload | WithPayloadSelector(include=[...]) | Return only needed fields
any_of filter | Field("doc_type").any_of([...]) | Multi-value doc type matching
gte / lt filters | Field("usefulness_score").gte(0.4) | Feedback-based boosting
Vector count | vde.get_vector_count() | Collection statistics
Flush | vde.flush() | Persist pending writes

Next steps