Automated Q&A Generation from PDF files

Technical Approach Paper for Knowledge Base Creation

Author

Edouard Legoupil, Chief Data Officer, IOM

Published

9 May 2025

Why Extract Q&A from PDFs?

PDF documents contain valuable knowledge—research papers, manuals, reports, and more—but their static format makes information retrieval inefficient.

Traditional search and summarization methods often miss the deeper insights buried in a PDF.

Generating question-answer (Q&A) pairs explicitly transforms this passive content into active knowledge.

Turning PDF content into a Q&A knowledge base enables:

  1. Instant, Precise Information Retrieval: Instead of manually searching through pages, users can ask natural language questions and get direct answers with source references. This is ideal for technical documentation, legal contracts, or academic papers where quick lookup is critical.

  2. Scalable Knowledge Management: Automatically process hundreds of PDFs into a structured, queryable format. It helps maintain versioned knowledge bases (e.g., updated policy documents or research findings).

  3. AI-Augmented Understanding: LLMs (such as those served through Azure OpenAI) summarize, connect concepts, and clarify dense text. This helps users grasp key points without reading entire documents.

  4. Integration with Chatbots & Assistants: Deploy the Q&A system in help desks, internal wikis, or customer support via APIs. Example: quick lookup of recommendations from past evaluation reports.

  5. Future-Proofing Knowledge: Export structured Q&A pairs to platforms like Hugging Face for community use or fine-tuning smaller models.

Introduction

This notebook presents an approach based on:

  1. Automatic Question Generation: Uses an LLM to create relevant questions from document content

  2. Context-Aware Q&A: Maintains document context for each generated question

  3. Two-Stage LLM Processing: A creative mode (higher temperature) for question generation and an accurate mode (lower temperature) for answer generation

  4. Comprehensive Metadata: Tracks sources, pages, and context for all Q&A pairs

  5. CrewAI and HuggingFace-Ready Export: Structured JSON output with dataset statistics
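
To make the metadata item above concrete, here is a minimal sketch of what one exported Q&A record could look like. The field names mirror those used later in this notebook (question, answer, context, source, page); the QARecord class and the sample values are hypothetical and only serve as an illustration.

```python
from dataclasses import dataclass, asdict
import json


@dataclass
class QARecord:
    """Hypothetical container for one Q&A pair and its provenance."""
    question: str
    answer: str
    context: str  # excerpt of the page the question was generated from
    source: str   # PDF file name
    page: int     # 1-based page number


# Sample values are invented for illustration only
record = QARecord(
    question="Who is eligible for the programme?",
    answer="Eligibility is described in the source document.",
    context="First characters of the page text...",
    source="example.pdf",
    page=3,
)

# Serialize the record the same way the notebook exports its results
record_json = json.dumps(asdict(record), indent=2, ensure_ascii=False)
print(record_json)
```

Keeping source and page next to every answer is what later allows a user to trace an answer back to the original PDF.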


Environment Setup

The body of this document targets a technical audience. All the code is provided below so that the whole process can be reproduced and audited. It assumes the code is run within Visual Studio Code.

First, create a Python virtual environment. It isolates project-specific libraries and versions from other projects and from the global Python installation, which avoids dependency conflicts, keeps the setup reproducible, and simplifies project setup for collaborators.

Install the latest stable version of Python, then create a dedicated environment for this project so that all package dependencies are managed from a fresh install. To use a particular version of Python for the environment, call the venv module with the full path to the desired Python executable instead of the bare python command. The steps are:

Open your terminal (Command Prompt, PowerShell, or any terminal emulator).

Navigate to your project directory where you want to create the virtual environment.

Run the following command to create a virtual environment, here called .venv:

python -m venv .venv

Then, activate the virtual environment (Windows command shown; on macOS or Linux, run source .venv/bin/activate instead):

.\.venv\Scripts\activate

Then, configure Visual Studio Code to use the virtual environment: open the Command Palette with the shortcut Ctrl+Shift+P, type Jupyter: Select Interpreter, and select the interpreter that corresponds to your newly created virtual environment: ('.venv': venv).
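
Before installing anything, it can help to confirm that the notebook kernel really runs inside the virtual environment. The check below is a small sketch based on the standard sys module; the in_virtualenv helper name is hypothetical.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv-style environment."""
    # Inside a venv, sys.prefix points to the environment, while
    # sys.base_prefix still points to the base Python installation.
    return sys.prefix != sys.base_prefix


print("Interpreter:", sys.executable)
print("Inside a virtual environment:", in_virtualenv())
```

If the second line prints False, the kernel is still bound to the global interpreter and the packages installed below would land outside .venv.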

Once this environment is selected as the kernel to run the notebook, install the Python modules required for the rest of the process:

%pip install pypdf langchain langchain-community openai lancedb python-dotenv tqdm datasets pdfplumber ipywidgets langchain_openai googlesearch-python tantivy pylance

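Then restart the Jupyter kernel for this notebook so that the newly installed packages are picked up. The %reset -f magic below does not restart the kernel; it only clears any variables left in the namespace: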

%reset -f

Search for PDF URL

import json
from datetime import datetime
from googlesearch import search  # provided by the googlesearch-python package

def find_official_pdfs(query, num_results=20):
    """Search for official PDF documents related to migration regulations"""
    pdf_urls = []
    search_query = f"{query} filetype:pdf site:.gov OR site:.org OR site:.int"
    
    try:
        print(f"Searching for: {search_query}")
        for url in search(search_query, num_results=num_results, advanced=True):
            if url.url.lower().endswith('.pdf'):
                # Verify it's an official source
                if any(domain in url.url for domain in ['.gov', '.org', '.int', 'unhcr', 'iom']):
                    print(f"Found PDF: {url.url}")
                    pdf_urls.append(url.url)
                    
    except Exception as e:
        print(f"Search error: {e}")
    
    return pdf_urls

def generate_json_output(urls):
    """Generate the JSON structure"""
    return {
        "name": "Official Migration Regulations",
        "description": "Collection of official government PDF documents detailing migration pathways and regulations",
        "urls": urls,
        "last_updated": datetime.now().isoformat(),
        "sources": [
            "Government websites",
            "International organizations",
            "UN agencies"
        ]
    }

Let’s test this!

queries = [
    "official migration pathways PDF",
    "government immigration regulations filetype:pdf",
    "legal migration routes document",
    "national visa policy PDF",
    "resettlement programs official document"
]

all_urls = []
for query in queries:
    all_urls.extend(find_official_pdfs(query))

# Remove duplicates
unique_urls = list(set(all_urls))

# Generate JSON
output_data = generate_json_output(unique_urls)

with open("migration_regulations.json", "w") as f:
    json.dump(output_data, f, indent=2)

print(f"Generated JSON file with {len(unique_urls)} PDF URLs")
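
The domain test in find_official_pdfs is a substring match on the whole URL, so a string such as ".org" or "iom" appearing anywhere in the path is enough to pass. A stricter variant, sketched below, inspects only the host name. The is_official_pdf helper and the OFFICIAL_LABELS allow-list are hypothetical names, and looking at the last two host labels is an assumption that accommodates hosts such as www.gov.uk.

```python
from urllib.parse import urlparse

# Assumed allow-list of domain labels; adjust to the sources you trust
OFFICIAL_LABELS = {"gov", "org", "int"}


def is_official_pdf(url: str, labels=OFFICIAL_LABELS) -> bool:
    """Return True when the URL path ends in .pdf and the host looks official."""
    parsed = urlparse(url)
    host_labels = (parsed.hostname or "").lower().split(".")
    is_pdf = parsed.path.lower().endswith(".pdf")
    # Only the last two labels of the host are inspected (e.g. "gov.uk", "iom.int")
    is_official = any(label in labels for label in host_labels[-2:])
    return is_pdf and is_official


candidates = [
    "https://www.iom.int/sites/default/report.pdf",
    "https://example.com/mirror/.org/file.pdf",
]
print([u for u in candidates if is_official_pdf(u)])
```

This remains a heuristic: a .org domain is not necessarily an official source, so a manual review of the resulting list is still advisable.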

Load PDF from URL

import os
import json
import requests
from urllib.parse import urlparse

def download_pdfs_from_json(json_path, save_folder):
    # Create folder if it doesn't exist
    os.makedirs(save_folder, exist_ok=True)

    # Load JSON data
    with open(json_path, 'r') as file:
        data = json.load(file)

    # Get the list of URLs
    urls = data.get("urls", [])
    
    if not urls:
        print("No URLs found in the JSON.")
        return

    # Download each PDF
    for url in urls:
        try:
            # A timeout prevents the loop from hanging on an unresponsive server
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()

            # Extract filename from URL
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)

            # Full path to save the file
            save_path = os.path.join(save_folder, filename)

            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            print(f"Downloaded: {save_path}")
        except requests.exceptions.RequestException as e:
            print(f"Failed to download from {url}: {e}")

# Test: download every PDF listed in the JSON file generated in the previous step
pdf_urls_file = "migration_regulations.json"  # JSON file containing PDF URLs
pdf_folder = "pdf_documents"
download_pdfs_from_json(pdf_urls_file, pdf_folder)
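
One limitation of deriving the file name with os.path.basename is that it returns an empty string when the URL path ends with a slash, and two different URLs can share the same base name, in which case the second download overwrites the first. The sketch below shows one possible way to derive a unique name; safe_pdf_filename is a hypothetical helper and the hash-suffix scheme is an assumption, not part of the original workflow.

```python
import hashlib
import os
from urllib.parse import unquote, urlparse


def safe_pdf_filename(url: str) -> str:
    """Build a non-empty, URL-specific .pdf file name."""
    # Fall back to a generic name when the path has no final component
    name = os.path.basename(unquote(urlparse(url).path)) or "document.pdf"
    stem, ext = os.path.splitext(name)
    if ext.lower() != ".pdf":
        # No (or another) extension: keep the whole name and force .pdf
        stem, ext = name, ".pdf"
    # A short hash of the full URL keeps names from different sites distinct
    digest = hashlib.sha1(url.encode("utf-8")).hexdigest()[:8]
    return f"{stem}_{digest}{ext}"


print(safe_pdf_filename("https://example.org/docs/report.pdf"))
print(safe_pdf_filename("https://example.int/other/report.pdf"))
```

The returned value could replace the filename variable inside download_pdfs_from_json.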

Initialize LLM Components

Here we use Azure OpenAI, but one could easily switch to Ollama to run this offline on sensitive documents.

import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()

# Azure OpenAI settings
azure_openai_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
deployment_name = os.getenv("AZURE_DEPLOYMENT_NAME")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
embedding_deployment = os.getenv("EMBEDDING_MODEL")
api_version_embed = os.getenv("AZURE_OPENAI_API_VERSION_EMBED")
from langchain_openai import AzureChatOpenAI 
# Initialize LLM with higher temperature for creative question generation
llm_creative = AzureChatOpenAI(
    deployment_name=deployment_name,
    api_key=azure_openai_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=api_version,
    temperature=0.7,
    max_tokens=500
)

# Initialize LLM with lower temperature for accurate answer generation
llm_accurate = AzureChatOpenAI(
    deployment_name=deployment_name,
    api_key=azure_openai_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=api_version,
    temperature=0.1,
    max_tokens=1000
)

# Initialize embeddings
from langchain_openai import AzureOpenAIEmbeddings
embeddings = AzureOpenAIEmbeddings(
    deployment=embedding_deployment,
    api_key=azure_openai_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=api_version_embed,
    chunk_size=1
)
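
Because os.getenv silently returns None for an unset variable, a missing entry in the .env file only surfaces later as an opaque API error. The sketch below checks the six variable names read above before any client is created; the missing_env_vars helper is a hypothetical name.

```python
import os

# Variable names read by the configuration cell above
REQUIRED_VARS = [
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_DEPLOYMENT_NAME",
    "AZURE_OPENAI_API_VERSION",
    "EMBEDDING_MODEL",
    "AZURE_OPENAI_API_VERSION_EMBED",
]


def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All required environment variables are set.")
```

Run it right after load_dotenv() so that values defined in the .env file are taken into account.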

Test the API connection:

# First, verify your Azure OpenAI resources
print("Verifying Azure OpenAI resources...")
print(f"Endpoint: {azure_openai_endpoint}")
print(f"Deployment Name: {deployment_name}")
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import HumanMessage
# Initialize with enhanced error handling
try:
    # Initialize LLMs with validation
    llm_creative = AzureChatOpenAI(
        deployment_name=deployment_name,
        api_key=azure_openai_key,
        azure_endpoint=azure_openai_endpoint,
        api_version=api_version,
        temperature=0.7
    )
    
    # Test the LLM connection
    test_response = llm_creative.invoke([HumanMessage(content="Hello")])
    print("LLM test successful! Response type:", type(test_response))
    print("Response content:", test_response.content)

except Exception as e:
    print(f"Failed to initialize Azure OpenAI LLM: {str(e)}")
    print("Please verify:")
    print("1. Your deployment exists in Azure OpenAI Studio")
    print("2. The deployment name matches exactly")
    print("3. The model is assigned to the deployment")
    print("4. Your API key has permissions")
    raise

Then test the embedding deployment:

print(f"Embedding Deployment: {embedding_deployment}")
try:
    # Initialize embeddings with validation
    embeddings = AzureOpenAIEmbeddings(
        deployment=embedding_deployment,
        api_key=azure_openai_key,
        azure_endpoint=azure_openai_endpoint,
        api_version=api_version_embed,  # same API version as the embeddings client defined earlier
        chunk_size=1
    )
    
    # Test embeddings
    test_embedding = embeddings.embed_query("Test embedding")
    print("Embeddings connection test successful!")
    print(f"Embedding vector length: {len(test_embedding)}")

except Exception as e:
    print(f"Failed to initialize Azure OpenAI Embeddings: {str(e)}")
    print("Please verify your embedding deployment exists")
    raise

Automatic Question Generation

import os
import json
import time
import logging
import pdfplumber
import warnings
from tqdm import tqdm
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants
MIN_SECTION_LENGTH = 200
MAX_ERRORS = 5
RATE_LIMIT_DELAY = 0.5
MAX_WORKERS = 4  # Adjust based on your system capabilities

def process_pdf_page(page, filename: str, page_num: int, question_chain, min_section_length: int) -> Tuple[List[Dict], List[Dict]]:
    """Process a single PDF page and generate questions."""
    documents = []
    questions = []

    try:
        text = page.extract_text()
        if not text or len(text.strip()) < min_section_length:
            return documents, questions

        time.sleep(RATE_LIMIT_DELAY)

        # Invoke LLM
        response = question_chain.invoke({"text": text})

        # Debug: log the raw response (shown only when the log level is DEBUG)
        logger.debug(f"Raw LLM response for {filename} page {page_num}: {response}")
      
        questions_list = []

        # Case 1: If response is a dictionary and contains a 'text' key with embedded JSON
        if isinstance(response, dict) and "text" in response:
            try:
                embedded_json = response["text"]
                response_dict = json.loads(embedded_json)
                questions_list = response_dict.get("questions", [])
            except Exception as e:
                logger.warning(f"Failed to parse embedded JSON in response for {filename} page {page_num}: {e}")
                logger.debug(f"Raw embedded JSON: {response['text']}")
                questions_list = []

        # Case 2: If response has a `.text` attribute (LangChain format)
        elif hasattr(response, 'text'):
            response_text = response.text.strip()
            try:
                response_dict = json.loads(response_text)
                questions_list = response_dict.get("questions", [])
            except json.JSONDecodeError:
                try:
                    if '```json' in response_text:
                        json_str = response_text.split('```json')[1].split('```')[0]
                    else:
                        json_str = response_text
                    response_dict = json.loads(json_str)
                    questions_list = response_dict.get("questions", [])
                except Exception as inner_e:
                    logger.warning(f"Failed to parse fallback JSON from LLM response on {filename} page {page_num}: {inner_e}")
                    logger.debug(f"Raw fallback: {response_text}")
                    questions_list = []

        # Sanity check
        if not isinstance(questions_list, list):
            questions_list = []

        # Build question entries
        for q in questions_list:
            if isinstance(q, str) and q.strip():
                questions.append({
                    "question": q.strip(),
                    "source": filename,
                    "page": page_num,
                    "context": text[:500] + "..."
                })

        if questions:
            documents.append({
                "text": text,
                "source": filename,
                "page": page_num
            })
        else:
            logger.warning(f"No valid questions extracted for {filename} page {page_num}")

    except Exception as e:
        logger.error(f"Error processing {filename} page {page_num}: {str(e)}")

    return documents, questions

def process_pdf_file(pdf_path: str, question_chain, min_section_length: int) -> Tuple[List[Dict], List[Dict]]:
    """Process a single PDF file with parallel page processing."""
    all_documents = []
    all_questions = []

    try:
        with pdfplumber.open(pdf_path) as pdf:
            filename = os.path.basename(pdf_path)
            total_pages = len(pdf.pages)

            with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, total_pages)) as executor:
                futures = []
                for page_num, page in enumerate(pdf.pages, start=1):
                    futures.append(
                        executor.submit(
                            process_pdf_page,
                            page=page,
                            filename=filename,
                            page_num=page_num,
                            question_chain=question_chain,
                            min_section_length=min_section_length
                        )
                    )

                for future in as_completed(futures):
                    try:
                        docs, qs = future.result()
                        all_documents.extend(docs)
                        all_questions.extend(qs)
                    except Exception as e:
                        logger.error(f"Error processing future: {str(e)}")
                        continue

    except Exception as e:
        logger.error(f"Failed to process PDF {pdf_path}: {str(e)}")

    return all_documents, all_questions

def process_and_generate_questions(
    pdf_folder: str,
    question_chain,
    min_section_length: int = MIN_SECTION_LENGTH,
    suppress_warnings: bool = True,
    max_workers: int = MAX_WORKERS
) -> Tuple[List[Dict], List[Dict]]:
    """
    Process all PDFs in a folder and generate questions using an LLM chain.
    """
    if suppress_warnings:
        warnings.filterwarnings("ignore", category=UserWarning, module="pdfplumber")

    pdf_files = [
        os.path.join(pdf_folder, f)
        for f in os.listdir(pdf_folder)
        if f.lower().endswith('.pdf')
    ]

    all_documents = []
    all_questions = []
    error_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                process_pdf_file,
                pdf_path=pdf_path,
                question_chain=question_chain,
                min_section_length=min_section_length
            ): pdf_path for pdf_path in pdf_files
        }

        with tqdm(total=len(pdf_files), desc="Processing PDFs") as pbar:
            for future in as_completed(futures):
                pdf_path = futures[future]
                try:
                    docs, qs = future.result()
                    all_documents.extend(docs)
                    all_questions.extend(qs)

                    if not qs:
                        logger.warning(f"No questions generated for {os.path.basename(pdf_path)}")
                    else:
                        error_count = 0  # Reset on success

                except Exception as e:
                    logger.error(f"Error processing {pdf_path}: {str(e)}")
                    error_count += 1
                    if error_count >= MAX_ERRORS:
                        logger.error(f"Stopping after {MAX_ERRORS} consecutive errors")
                        executor.shutdown(wait=False)
                        break

                pbar.update(1)

    return all_documents, all_questions
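
The response parsing in process_pdf_page handles fenced JSON only in its second case, so a dictionary response whose text is wrapped in a Markdown code fence yields no questions. The sketch below shows one way to centralize the parsing in a single function that accepts raw text. The extract_questions name is hypothetical, and the function is an alternative illustration, not the code used above.

```python
import json
import re

FENCE = "`" * 3  # built this way to avoid writing a literal code fence here
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extract_questions(raw_text: str) -> list:
    """Return the list of question strings found in an LLM response."""
    text = raw_text.strip()
    # Keep only the content of the first fenced block, if there is one
    fenced = FENCED_BLOCK.search(text)
    if fenced:
        text = fenced.group(1).strip()
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return []
    questions = payload.get("questions", []) if isinstance(payload, dict) else []
    if not isinstance(questions, list):
        return []
    # Drop non-string and blank entries
    return [q.strip() for q in questions if isinstance(q, str) and q.strip()]


print(extract_questions('{"questions": ["What is covered?", " Who applies? "]}'))
```

With such a helper, both response shapes reduce to obtaining the text (response["text"] or response.text) and passing it to the same function.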

Now the prompt to generate the questions!

from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
# Define prompt for question generation
question_prompt = PromptTemplate(
    input_variables=["text"],
    template="""
    Analyze the following text and generate 3-5 relevant questions that this text could answer.

    **Important**: 
        - The questions should cover different aspects of the content and be of varying complexity.
        - The questions should reflect the perspective and interests of the document's end users, and avoid jargon.
        - Focus on key concepts, findings, methodologies, and important details.
        - The questions should remain generic in their formulation and must not refer to specific elements within the content (such as a specific chapter or reference number).


    Return ONLY this JSON format:
    {{
        "questions": [
            "question1",
            "question2", 
            "question3"
        ]
    }}

    Text: {text}
    
    """
)

# Initialize question generation chain
question_chain = LLMChain(llm=llm_creative, prompt=question_prompt)
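
A note on the doubled braces in the template: the prompt is filled in with Python format-style placeholders, so literal JSON braces have to be written as {{ and }} while {text} stays a placeholder. The snippet below illustrates the escaping rule with plain str.format on a shortened, hypothetical template; it is assumed here that PromptTemplate, in its default format, follows the same rule.

```python
# Shortened stand-in for the prompt above; doubled braces are literal braces
template = 'Return ONLY this JSON format: {{"questions": ["question1"]}}\nText: {text}'

# Only the {text} placeholder is substituted
rendered = template.format(text="Sample page text.")
print(rendered)
```

Forgetting to double the braces would make the formatter treat "questions" as a missing input variable.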

Now generating!

pdf_folder = "pdf_documents/"
min_section_length = 500

# Process PDFs and generate questions
documents, generated_questions = process_and_generate_questions(pdf_folder, question_chain, min_section_length) 
print(f"{len(generated_questions)} questions were generated!")

# Save the generated questions so they can be reviewed or reused later

output_path = "questions_UN_staff.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(generated_questions, f, indent=2, ensure_ascii=False)

Create Knowledge Base

import os
import shutil
import time
import atexit
from concurrent.futures import ThreadPoolExecutor
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import LanceDB  # community package installed above
import lancedb

def process_document(doc, text_splitter):
    """Helper function to process a single document in parallel"""
    chunks = text_splitter.split_text(doc["text"])
    metadatas = [{"source": doc["source"], "page": doc["page"]} for _ in chunks]
    return chunks, metadatas

def force_delete_directory(path, max_retries=3, delay=1):
    """Robust directory deletion with retries and delay"""
    for attempt in range(max_retries):
        try:
            if os.path.exists(path):
                shutil.rmtree(path)
                return True
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed to delete {path} after {max_retries} attempts: {e}")
                return False
            time.sleep(delay)
    return False

def cleanup_lancedb_directory(db_path, table_name):
    """Safely remove existing LanceDB table directory"""
    table_path = os.path.join(db_path, f"{table_name}.lance")
    
    # First try normal deletion
    if force_delete_directory(table_path):
        return
    
    # If normal deletion fails, try renaming
    temp_path = f"{table_path}.old_{int(time.time())}"
    try:
        if os.path.exists(table_path):
            os.rename(table_path, temp_path)
            print(f"Renamed {table_path} to {temp_path} for deferred cleanup")
    except Exception as e:
        print(f"Could not rename {table_path}: {e}")

def create_knowledge_base(documents, chunk_size, chunk_overlap, lancedb_path, embeddings, max_workers=4):
    """Create vector store with comprehensive error handling"""
    # Initialize text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
        length_function=len,
        keep_separator=True
    )
    
    # Parallel processing of documents
    texts = []
    metadatas = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for doc in documents:
            futures.append(executor.submit(process_document, doc, text_splitter))
        
        for future in futures:
            chunks, doc_metadatas = future.result()
            texts.extend(chunks)
            metadatas.extend(doc_metadatas)
    
    # Handle LanceDB connection with robust cleanup
    table_name = "pdf_qa_vectors"
    cleanup_lancedb_directory(lancedb_path, table_name)
    
    # Add delay to ensure filesystem operations complete
    time.sleep(1)
    
    try:
        db = lancedb.connect(lancedb_path)
    except Exception as e:
        raise RuntimeError(f"Failed to connect to LanceDB at {lancedb_path}: {e}")
    
    # Create vector store with retry logic
    max_retries = 3
    for attempt in range(max_retries):
        try:
            vector_store = LanceDB.from_texts(
                texts=texts,
                embedding=embeddings,
                metadatas=metadatas,
                connection=db,
                table_name=table_name,
                vector_key="vector",
                text_key="text",
                id_key="id"
            )
            break
        except Exception as e:
            if attempt == max_retries - 1:
                raise RuntimeError(f"Failed to create LanceDB table after {max_retries} attempts: {e}")
            time.sleep(2)
            cleanup_lancedb_directory(lancedb_path, table_name)
    
    # Get the underlying table
    try:
        table = db.open_table(table_name)
    except Exception as e:
        raise RuntimeError(f"Failed to open table {table_name}: {e}")
    
    # Create vector index with retry logic
    for attempt in range(max_retries):
        try:
            table.create_index(
                metric="cosine",
                num_partitions=256,
                num_sub_vectors=96,
                replace=True
            )
            break
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Warning: Could not create vector index: {e}")
            time.sleep(1)
    
    # Skip FTS index creation to avoid Windows file locking issues
    print("Skipping FTS index creation due to known Windows file locking issues")
    
    return vector_store

# Register cleanup function for program exit
@atexit.register
def cleanup_temp_dirs():
    """Clean up any leftover .old directories"""
    now = time.time()
    if os.path.exists(lancedb_path):
        for dirname in os.listdir(lancedb_path):
            if dirname.startswith('pdf_qa_vectors.lance.old_'):
                dirpath = os.path.join(lancedb_path, dirname)
                try:
                    # Delete directories older than 1 hour
                    if os.path.getmtime(dirpath) < now - 3600:
                        shutil.rmtree(dirpath, ignore_errors=True)
                except OSError:
                    # Best-effort cleanup: ignore directories that cannot be inspected
                    pass

lancedb_path = "./lancedb_data_qa"
os.makedirs(lancedb_path, exist_ok=True)
# Create knowledge base
chunk_size = 300
chunk_overlap = 200
vector_store = create_knowledge_base(documents, chunk_size, chunk_overlap, lancedb_path, embeddings, max_workers=4)
# Query used to sanity-check the vector store
query = "How are staff recruited?"

# Run a similarity search; k is the number of results to return
result = vector_store.similarity_search(query, k=5)

# Print the results, if any
if result:
    print(f"Top matching documents for query '{query}':")
    for i, res in enumerate(result):
        # Results are Document objects: provenance is stored in .metadata
        source = res.metadata.get('source', 'Unknown')
        page = res.metadata.get('page', 'Unknown')

        # Check for text content under possible attribute names
        text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')

        print(f"{i+1}. Source: {source}, Page: {page}, Text: {text}")
else:
    print(f"No results found for query: {query}")

def test_vector_store(vector_store, query, k=5):
    """Test the vector store by performing a similarity search and printing results."""
    try:
        result = vector_store.similarity_search(query, k=k)

        if result:
            print(f"Top matching documents for query '{query}':")
            for i, res in enumerate(result):
                # Access metadata
                source = res.metadata.get('source', 'Unknown')
                page = res.metadata.get('page', 'Unknown')

                # Try to access the text from the most likely fields
                text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')

                print(f"{i+1}. Source: {source}, Page: {page}\nText: {text}\n")
        else:
            print(f"No results found for query: {query}")

    except Exception as e:
        print(f"Error during vector store query: {e}")
test_vector_store(vector_store, query)
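
With chunk_size = 300 and chunk_overlap = 200, consecutive chunks can share up to 200 characters, so each chunk contributes only about 100 new characters. The sketch below illustrates the effect of these two parameters with a fixed-width sliding window. It is a deliberate simplification and not the algorithm of RecursiveCharacterTextSplitter, which first splits on the configured separators; sliding_window_chunks is a hypothetical helper.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Cut text into fixed-width windows that overlap by chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # number of new characters per chunk
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


sample = "x" * 1000
chunks = sliding_window_chunks(sample, chunk_size=300, chunk_overlap=200)
print(len(chunks), "chunks for", len(sample), "characters")
```

A large overlap improves the chance that a relevant passage is fully contained in at least one chunk, at the cost of more embeddings to compute and store.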

Set up Information Retriever

from langchain.chains import RetrievalQA
from langchain_core.vectorstores import VectorStoreRetriever

def initialize_qa_system(vector_store, llm, k=5):
    """Wrap the LanceDB vector store into a retriever and initialize RetrievalQA."""
    retriever = VectorStoreRetriever(
        vectorstore=vector_store,
        search_kwargs={"k": k}
    )

    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",  # or "map_reduce" / "refine" depending on needs
        retriever=retriever,
        return_source_documents=True
    )
    
    return qa
qa_system = initialize_qa_system(vector_store, llm_accurate)

def test_hybrid_retriever(retriever, query):
    """Print the documents a retriever returns for a query.

    The FTS index is skipped in create_knowledge_base, so this exercises vector search only.
    """
    try:
        print(f"\nRunning retrieval for query: '{query}'\n{'-'*60}")
        docs = retriever.invoke(query)

        if not docs:
            print("⚠️ No relevant documents found.")
            return

        for i, doc in enumerate(docs):
            source = doc.metadata.get("source", "Unknown")
            page = doc.metadata.get("page", "Unknown")
            print(f"{i+1}. Source: {source}, Page: {page}\nText: {doc.page_content[:300]}...\n")

    except Exception as e:
        print(f"❌ Error during retrieval test: {e}")

# Pass the retriever held by the QA chain, not the factory function
test_hybrid_retriever(qa_system.retriever, query)
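
The vector index above is created with metric="cosine", and the retriever returns the k chunks whose embeddings are closest to the query embedding. The toy example below illustrates that ranking with made-up three-dimensional vectors and an exhaustive scan. Real embeddings have many more dimensions and LanceDB relies on an approximate index; the cosine_similarity and top_k helpers are hypothetical.

```python
import math


def cosine_similarity(a, b) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec, items, k=2) -> list:
    """Return the labels of the k items most similar to the query vector."""
    scored = [(cosine_similarity(query_vec, vec), label) for label, vec in items]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [label for _, label in scored[:k]]


# Invented vectors standing in for chunk embeddings
toy_chunks = [
    ("recruitment", [0.9, 0.1, 0.0]),
    ("leave policy", [0.1, 0.9, 0.0]),
    ("selection panel", [0.8, 0.2, 0.1]),
]
toy_query = [1.0, 0.0, 0.0]
print(top_k(toy_query, toy_chunks, k=2))
```

The k parameter of initialize_qa_system plays the same role as k here: it bounds how many chunks are handed to the LLM as context.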

Run all questions

from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def process_single_question(qa_system, item):
    """Process a single question and return the QA pair"""
    try:
        # invoke() is the current entry point; calling the chain object directly is deprecated
        result = qa_system.invoke({"query": item["question"]})
        return {
            "question": item["question"],
            "answer": result["result"],
            "context": item["context"],
            "source": item["source"],
            "page": item["page"],
            "source_documents": [{
                "source": doc.metadata["source"],
                "page": doc.metadata["page"],
                "content": doc.page_content
            } for doc in result["source_documents"]]
        }
    except Exception as e:
        print(f"Error answering question: {item['question']} - {str(e)}")
        return None

def generate_answers(qa_system, generated_questions, max_workers=4):
    """Generate answers in parallel for all auto-generated questions"""
    qa_pairs = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all questions to the executor
        futures = {
            executor.submit(process_single_question, qa_system, item): item
            for item in generated_questions
        }
        
        # Process results as they complete with progress bar
        for future in tqdm(as_completed(futures), total=len(generated_questions), desc="Generating Answers"):
            result = future.result()
            if result:
                qa_pairs.append(result)
    
    return qa_pairs
# Generate answers
qa_pairs = generate_answers(qa_system, generated_questions)

Export

from datasets import Dataset, DatasetDict

# Prepare dataset
hf_dataset = Dataset.from_list([
    {"question": qa["question"], "answer": qa["answer"], "source": qa["source"]} for qa in qa_pairs
])

Export as JSON to be used as a CrewAI knowledge base

import json

output_path = "crewai_qa_knowledge_base.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(qa_pairs, f, indent=2, ensure_ascii=False)
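
The introduction mentions dataset statistics alongside the export. A minimal sketch of such a summary is shown below, computed on invented sample records that use the same question, answer and source keys as qa_pairs. The summarize_qa_pairs helper and the chosen statistics are assumptions.

```python
from collections import Counter


def summarize_qa_pairs(pairs) -> dict:
    """Compute simple descriptive statistics for a list of Q&A dictionaries."""
    per_source = Counter(p["source"] for p in pairs)
    answer_lengths = [len(p["answer"]) for p in pairs]
    avg_len = sum(answer_lengths) / len(answer_lengths) if answer_lengths else 0.0
    return {
        "total_pairs": len(pairs),
        "pairs_per_source": dict(per_source),
        "avg_answer_chars": avg_len,
    }


# Invented records for illustration only
sample_pairs = [
    {"question": "Q1?", "answer": "abcd", "source": "a.pdf"},
    {"question": "Q2?", "answer": "ab", "source": "a.pdf"},
    {"question": "Q3?", "answer": "abc", "source": "b.pdf"},
]
stats = summarize_qa_pairs(sample_pairs)
print(stats)
```

Calling summarize_qa_pairs(qa_pairs) on the real output gives a quick view of how evenly the source documents are covered.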

Hugging Face for data labeling

# Push to the Hugging Face Hub
import os
from huggingface_hub import login

# Log in first, using the token stored in the HF_TOKEN environment variable
login(token=os.getenv("HF_TOKEN"))
hf_dataset.push_to_hub("edouardlgp/qa-un-staff-rules")