%pip install pypdf langchain langchain-community openai lancedb python-dotenv tqdm datasets pdfplumber ipywidgets langchain_openai googlesearch-python tantivy pylance
Automated Q&A Generation from PDF files
Technical Approach Paper for Knowledge Base Creation
Why Extract Q&A from PDFs?
PDF documents contain valuable knowledge—research papers, manuals, reports, and more—but their static format makes information retrieval inefficient.
Extracting information from PDFs with traditional search or summarization methods often misses deeper insights.
By generating Question-Answer (Q&A) pairs, we explicitly transform passive content into active knowledge.
By transforming PDF content into a question-answering (Q&A) knowledge base, you enable:
Instant, Precise Information Retrieval: Instead of manually searching through pages, users can ask natural language questions and get direct answers with source references. This is ideal for technical documentation, legal contracts, or academic papers where quick lookup is critical.
Scalable Knowledge Management: Automatically process hundreds of PDFs into a structured, queryable format. This helps maintain versioned knowledge bases (e.g., updated policy documents or research findings).
AI-Augmented Understanding: LLMs (like Azure OpenAI) summarize, connect concepts, and clarify dense text. It helps users grasp key points without reading entire documents.
Integration with Chatbots & Assistants: Deploy the Q&A system in help desks, internal wikis, or customer support via APIs. Example: quick lookup of recommendations from past evaluation reports.
Future-Proofing Knowledge: Export structured Q&A pairs to platforms like Hugging Face for community use or fine-tuning smaller models.
Introduction
This notebook presents an approach based on the following (sketched in code right after this list):
Automatic Question Generation: Uses LLM to create relevant questions from document content
Context-Aware Q&A: Maintains document context for each generated question
Two-Stage LLM Processing: a creative mode (higher temperature) for question generation and an accurate mode (lower temperature) for answer generation
Comprehensive Metadata: Tracks sources, pages, and context for all Q&A pairs
CrewAI and HuggingFace-Ready Export: Structured JSON output with dataset statistics
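In code terms, the whole pipeline boils down to a handful of calls. The sketch below is only a roadmap: it reuses the function names and parameters defined in the sections that follow and is not meant to be run on its own.
# Bird's-eye view of the pipeline (roadmap only; every function is defined later in this notebook)
urls = find_official_pdfs("official migration pathways PDF")             # 1. search for candidate PDF URLs (saved to JSON below)
download_pdfs_from_json("pdf_urls.json", "pdf_documents")                # 2. download the PDFs listed in a URLs JSON file
docs, questions = process_and_generate_questions("pdf_documents/", question_chain)     # 3. generate questions page by page with the LLM
vector_store = create_knowledge_base(docs, 300, 200, "./lancedb_data_qa", embeddings)  # 4. chunk, embed and index in LanceDB
qa_system = initialize_qa_system(vector_store, llm_accurate)             # 5. wrap the store in a RetrievalQA chain
qa_pairs = generate_answers(qa_system, questions)                        # 6. answer every generated question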
Environment Setup
The body of this document targets a technical audience. Below is all the code needed so that the whole process can be reproduced and audited. It assumes the code is run within Visual Studio Code.
First, we need a virtual environment for Python development. Virtual environments are essential for managing dependencies, avoiding conflicts, and ensuring reproducibility. They isolate project-specific libraries and versions, preventing interference with other projects or the global Python installation. This isolation keeps the development environment clean, simplifies project setup for collaborators, and enhances security by reducing the risk of introducing vulnerabilities. Overall, virtual environments provide a consistent and organized way to manage Python projects.
Make sure to install the latest stable version of Python and create a dedicated environment so that all package dependencies can be managed from a fresh install. To use a particular version of Python when creating a virtual environment, point to the full path of the desired Python executable. Here is how to do it:
Open your terminal (Command Prompt, PowerShell, or any terminal emulator).
Navigate to your project directory where you want to create the virtual environment.
Run the following command to create a virtual environment, here called .venv:
#| eval: false
python -m venv .venv
Then, activate the virtual environment:
#| eval: false
.\.venv\Scripts\activate
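On macOS or Linux, the equivalent activation command is:
#| eval: false
source .venv/bin/activate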
Then, configure Visual Studio Code to use the virtual environment: open the Command Palette with Ctrl+Shift+P, type Jupyter: Select Interpreter, and select the interpreter that corresponds to your newly created virtual environment ('.venv': venv).
Once this environment is selected as the kernel for this notebook, we can install the Python modules required for the rest of the process (see the %pip install cell at the top of this notebook):
Then restart the Jupyter kernel for this notebook.
%reset -f
Search for PDF URL
import json
from datetime import datetime
from googlesearch import search
import requests
from bs4 import BeautifulSoup
def find_official_pdfs(query, num_results=20):
"""Search for official PDF documents related to migration regulations"""
pdf_urls = []
search_query = f"{query} filetype:pdf site:.gov OR site:.org OR site:.int"
try:
print(f"Searching for: {search_query}")
for url in search(search_query, num_results=num_results, advanced=True):
if url.url.lower().endswith('.pdf'):
# Verify it's an official source
if any(domain in url.url for domain in ['.gov', '.org', '.int', 'unhcr', 'iom']):
print(f"Found PDF: {url.url}")
pdf_urls.append(url.url)
except Exception as e:
print(f"Search error: {e}")
return pdf_urls
def generate_json_output(urls):
"""Generate the JSON structure"""
return {
"name": "Official Migration Regulations",
"description": "Collection of official government PDF documents detailing migration pathways and regulations",
"urls": urls,
"last_updated": datetime.now().isoformat(),
"sources": [
"Government websites",
"International organizations",
"UN agencies"
]
    }
Let's test this!
queries = [
"official migration pathways PDF",
"government immigration regulations filetype:pdf",
"legal migration routes document",
"national visa policy PDF",
"resettlement programs official document"
]
all_urls = []
for query in queries:
all_urls.extend(find_official_pdfs(query))
# Remove duplicates
unique_urls = list(set(all_urls))
# Generate JSON
output_data = generate_json_output(unique_urls)
with open("migration_regulations.json", "w") as f:
json.dump(output_data, f, indent=2)
print(f"Generated JSON file with {len(unique_urls)} PDF URLs")Load PDF from URL
import os
import json
import requests
from urllib.parse import urlparse
def download_pdfs_from_json(json_path, save_folder):
# Create folder if it doesn't exist
os.makedirs(save_folder, exist_ok=True)
# Load JSON data
with open(json_path, 'r') as file:
data = json.load(file)
# Get the list of URLs
urls = data.get("urls", [])
if not urls:
print("No URLs found in the JSON.")
return
# Download each PDF
for url in urls:
try:
response = requests.get(url, stream=True)
response.raise_for_status()
# Extract filename from URL
parsed_url = urlparse(url)
filename = os.path.basename(parsed_url.path)
# Full path to save the file
save_path = os.path.join(save_folder, filename)
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Downloaded: {save_path}")
except requests.exceptions.RequestException as e:
print(f"Failed to download from {url}: {e}")## test
pdf_urls_file = "pdf_urls.json"  # JSON file containing PDF URLs (same structure as migration_regulations.json generated above)
pdf_folder = "pdf_documents"
# Load PDF URLs
# Example usage:
download_pdfs_from_json(pdf_urls_file, pdf_folder)
Initialize LLM Components
Here we use Azure OpenAI, but one could easily switch to Ollama to run this offline on sensitive documents.
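For reference, a minimal offline variant could look like the sketch below, using the langchain-community integrations already listed in the install cell. The model names are only illustrative assumptions and presume a local Ollama server with those models pulled.
# Sketch of an offline alternative using Ollama (model names are illustrative assumptions)
from langchain_community.chat_models import ChatOllama
from langchain_community.embeddings import OllamaEmbeddings
llm_creative = ChatOllama(model="llama3", temperature=0.7)
llm_accurate = ChatOllama(model="llama3", temperature=0.1)
embeddings = OllamaEmbeddings(model="nomic-embed-text")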
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Azure OpenAI settings
azure_openai_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
deployment_name = os.getenv("AZURE_DEPLOYMENT_NAME")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
embedding_deployment = os.getenv("EMBEDDING_MODEL")
api_version_embed = os.getenv("AZURE_OPENAI_API_VERSION_EMBED")
from langchain_openai import AzureChatOpenAI
# Initialize LLM with higher temperature for creative question generation
llm_creative = AzureChatOpenAI(
deployment_name=deployment_name,
api_key=azure_openai_key,
azure_endpoint=azure_openai_endpoint,
api_version=api_version,
temperature=0.7,
max_tokens=500
)
llm_accurate = AzureChatOpenAI(
deployment_name=deployment_name,
api_key=azure_openai_key,
azure_endpoint=azure_openai_endpoint,
api_version=api_version,
temperature=0.1,
max_tokens=1000
)
# Initialize embeddings
from langchain_openai import AzureOpenAIEmbeddings
embeddings = AzureOpenAIEmbeddings(
deployment=embedding_deployment,
api_key=azure_openai_key,
azure_endpoint=azure_openai_endpoint,
api_version=api_version_embed,
chunk_size=1
)
Testing API…
# First, verify your Azure OpenAI resources
print("Verifying Azure OpenAI resources...")
print(f"Endpoint: {azure_openai_endpoint}")
print(f"Deployment Name: {deployment_name}")
from langchain_openai import AzureChatOpenAI # Changed from AzureOpenAI
from langchain_core.messages import HumanMessage
# Initialize with enhanced error handling
try:
# Initialize LLMs with validation
llm_creative = AzureChatOpenAI(
deployment_name=deployment_name,
api_key=azure_openai_key,
azure_endpoint=azure_openai_endpoint,
api_version=api_version,
temperature=0.7
)
# Test the LLM connection
test_response = llm_creative.invoke([HumanMessage(content="Hello")])
print("LLM test successful! Response type:", type(test_response))
print("Response content:", test_response.content)
except Exception as e:
print(f"Failed to initialize Azure OpenAI LLM: {str(e)}")
print("Please verify:")
print("1. Your deployment exists in Azure OpenAI Studio")
print("2. The deployment name matches exactly")
print("3. The model is assigned to the deployment")
print("4. Your API key has permissions")
    raise
…and the embeddings:
print(f"Embedding Deployment: {embedding_deployment}")
try:
# Initialize embeddings with validation
embeddings = AzureOpenAIEmbeddings(
deployment=embedding_deployment,
api_key=azure_openai_key,
azure_endpoint=azure_openai_endpoint,
        api_version=api_version_embed,  # use the embeddings-specific API version, as in the earlier initialization
chunk_size=1
)
# Test embeddings
test_embedding = embeddings.embed_query("Test embedding")
print("Embeddings connection test successful!")
print(f"Embedding vector length: {len(test_embedding)}")
except Exception as e:
print(f"Failed to initialize Azure OpenAI Embeddings: {str(e)}")
print("Please verify your embedding deployment exists")
    raise
Automatic Question Generation
import os
import json
import time
import logging
import pdfplumber
import warnings
from tqdm import tqdm
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Constants
MIN_SECTION_LENGTH = 200
MAX_ERRORS = 5
RATE_LIMIT_DELAY = 0.5
MAX_WORKERS = 4 # Adjust based on your system capabilities
def process_pdf_page(page, filename: str, page_num: int, question_chain, min_section_length: int) -> Tuple[List[Dict], List[Dict]]:
"""Process a single PDF page and generate questions."""
documents = []
questions = []
try:
text = page.extract_text()
if not text or len(text.strip()) < min_section_length:
return documents, questions
time.sleep(RATE_LIMIT_DELAY)
# Invoke LLM
response = question_chain.invoke({"text": text})
# Debug: Print raw response
logger.debug(f"Raw LLM response for {filename} page {page_num}: {response}")
        # Debug: also echo the raw response to stdout
        print(f"Raw LLM response: {response}")
questions_list = []
# Case 1: If response is a dictionary and contains a 'text' key with embedded JSON
if isinstance(response, dict) and "text" in response:
try:
embedded_json = response["text"]
response_dict = json.loads(embedded_json)
questions_list = response_dict.get("questions", [])
except Exception as e:
logger.warning(f"Failed to parse embedded JSON in response for {filename} page {page_num}: {e}")
logger.debug(f"Raw embedded JSON: {response['text']}")
questions_list = []
# Case 2: If response has a `.text` attribute (LangChain format)
elif hasattr(response, 'text'):
response_text = response.text.strip()
try:
response_dict = json.loads(response_text)
questions_list = response_dict.get("questions", [])
except json.JSONDecodeError:
try:
if '```json' in response_text:
json_str = response_text.split('```json')[1].split('```')[0]
else:
json_str = response_text
response_dict = json.loads(json_str)
questions_list = response_dict.get("questions", [])
except Exception as inner_e:
logger.warning(f"Failed to parse fallback JSON from LLM response on {filename} page {page_num}: {inner_e}")
logger.debug(f"Raw fallback: {response_text}")
questions_list = []
# Sanity check
if not isinstance(questions_list, list):
questions_list = []
# Build question entries
for q in questions_list:
if isinstance(q, str) and q.strip():
questions.append({
"question": q.strip(),
"source": filename,
"page": page_num,
"context": text[:500] + "..."
})
if questions:
documents.append({
"text": text,
"source": filename,
"page": page_num
})
else:
logger.warning(f"No valid questions extracted for {filename} page {page_num}")
except Exception as e:
logger.error(f"Error processing {filename} page {page_num}: {str(e)}")
return documents, questions
def process_pdf_file(pdf_path: str, question_chain, min_section_length: int) -> Tuple[List[Dict], List[Dict]]:
"""Process a single PDF file with parallel page processing."""
all_documents = []
all_questions = []
try:
with pdfplumber.open(pdf_path) as pdf:
filename = os.path.basename(pdf_path)
total_pages = len(pdf.pages)
with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, total_pages)) as executor:
futures = []
for page_num, page in enumerate(pdf.pages, start=1):
futures.append(
executor.submit(
process_pdf_page,
page=page,
filename=filename,
page_num=page_num,
question_chain=question_chain,
min_section_length=min_section_length
)
)
for future in as_completed(futures):
try:
docs, qs = future.result()
all_documents.extend(docs)
all_questions.extend(qs)
except Exception as e:
logger.error(f"Error processing future: {str(e)}")
continue
except Exception as e:
logger.error(f"Failed to process PDF {pdf_path}: {str(e)}")
return all_documents, all_questions
def process_and_generate_questions(
pdf_folder: str,
question_chain,
min_section_length: int = MIN_SECTION_LENGTH,
suppress_warnings: bool = True,
max_workers: int = MAX_WORKERS
) -> Tuple[List[Dict], List[Dict]]:
"""
Process all PDFs in a folder and generate questions using an LLM chain.
"""
if suppress_warnings:
warnings.filterwarnings("ignore", category=UserWarning, module="pdfplumber")
pdf_files = [
os.path.join(pdf_folder, f)
for f in os.listdir(pdf_folder)
if f.lower().endswith('.pdf')
]
all_documents = []
all_questions = []
error_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
process_pdf_file,
pdf_path=pdf_path,
question_chain=question_chain,
min_section_length=min_section_length
): pdf_path for pdf_path in pdf_files
}
with tqdm(total=len(pdf_files), desc="Processing PDFs") as pbar:
for future in as_completed(futures):
pdf_path = futures[future]
try:
docs, qs = future.result()
all_documents.extend(docs)
all_questions.extend(qs)
if not qs:
logger.warning(f"No questions generated for {os.path.basename(pdf_path)}")
else:
error_count = 0 # Reset on success
except Exception as e:
logger.error(f"Error processing {pdf_path}: {str(e)}")
error_count += 1
if error_count >= MAX_ERRORS:
logger.error(f"Stopping after {MAX_ERRORS} consecutive errors")
executor.shutdown(wait=False)
break
pbar.update(1)
    return all_documents, all_questions
Now the prompt to generate the questions!
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
# Define prompt for question generation
question_prompt = PromptTemplate(
input_variables=["text"],
template="""
Analyze the following text and generate 3-5 relevant questions that this text could answer.
**Important**:
- The questions should cover different aspects of the content and be of varying complexity.
- The questions should reflect the perspective and interests of the document's end users, and avoid jargon.
- Focus on key concepts, findings, methodologies, and important details.
- The questions should remain generic in their formulation and should not refer to specific elements of the content (such as a particular chapter or reference number).
Return ONLY this JSON format:
{{
"questions": [
"question1",
"question2",
"question3"
]
}}
Text: {text}
"""
)
# Initialize question generation chain
question_chain = LLMChain(llm=llm_creative, prompt=question_prompt)
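Before running the full pipeline, it can be worth a quick smoke test of the chain on a short snippet; the sample text below is made up.
# Quick smoke test of the question-generation prompt (sample text is invented)
sample_text = (
    "Staff members are recruited through a competitive process that includes "
    "a written assessment and a panel interview. Selection decisions are "
    "reviewed by a central review body before an offer is issued."
)
response = question_chain.invoke({"text": sample_text})
print(response["text"])  # should contain the JSON block with 3-5 generated questions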
pdf_folder = "pdf_documents/"
min_section_length = 500
# Process PDFs and generate questions
documents, generated_questions = process_and_generate_questions(pdf_folder, question_chain, min_section_length)
print(f" {len(generated_questions)} questions were generated !!")import json
output_path = "questions_UN_staff.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(generated_questions, f, indent=2, ensure_ascii=False)
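Each saved entry follows the structure built in process_pdf_page; an illustrative example (all values are hypothetical):
# Illustrative shape of one entry in questions_UN_staff.json (hypothetical values)
example_entry = {
    "question": "What are the main steps of the recruitment process?",
    "source": "example_document.pdf",   # hypothetical file name
    "page": 12,
    "context": "First ~500 characters of the page text...",
}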
Create Knowledge Base
import os
import shutil
import time
import atexit
from concurrent.futures import ThreadPoolExecutor
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import LanceDB
import lancedb
def process_document(doc, text_splitter):
"""Helper function to process a single document in parallel"""
chunks = text_splitter.split_text(doc["text"])
metadatas = [{"source": doc["source"], "page": doc["page"]} for _ in chunks]
return chunks, metadatas
def force_delete_directory(path, max_retries=3, delay=1):
"""Robust directory deletion with retries and delay"""
for attempt in range(max_retries):
try:
if os.path.exists(path):
shutil.rmtree(path)
return True
except Exception as e:
if attempt == max_retries - 1:
print(f"Failed to delete {path} after {max_retries} attempts: {e}")
return False
time.sleep(delay)
return False
def cleanup_lancedb_directory(db_path, table_name):
"""Safely remove existing LanceDB table directory"""
table_path = os.path.join(db_path, f"{table_name}.lance")
# First try normal deletion
if force_delete_directory(table_path):
return
# If normal deletion fails, try renaming
temp_path = f"{table_path}.old_{int(time.time())}"
try:
if os.path.exists(table_path):
os.rename(table_path, temp_path)
print(f"Renamed {table_path} to {temp_path} for deferred cleanup")
except Exception as e:
print(f"Could not rename {table_path}: {e}")
def create_knowledge_base(documents, chunk_size, chunk_overlap, lancedb_path, embeddings, max_workers=4):
"""Create vector store with comprehensive error handling"""
# Initialize text splitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
length_function=len,
keep_separator=True
)
# Parallel processing of documents
texts = []
metadatas = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for doc in documents:
futures.append(executor.submit(process_document, doc, text_splitter))
for future in futures:
chunks, doc_metadatas = future.result()
texts.extend(chunks)
metadatas.extend(doc_metadatas)
# Handle LanceDB connection with robust cleanup
table_name = "pdf_qa_vectors"
cleanup_lancedb_directory(lancedb_path, table_name)
# Add delay to ensure filesystem operations complete
time.sleep(1)
try:
db = lancedb.connect(lancedb_path)
except Exception as e:
raise RuntimeError(f"Failed to connect to LanceDB at {lancedb_path}: {e}")
# Create vector store with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
vector_store = LanceDB.from_texts(
texts=texts,
embedding=embeddings,
metadatas=metadatas,
connection=db,
table_name=table_name,
vector_key="vector",
text_key="text",
id_key="id"
)
break
except Exception as e:
if attempt == max_retries - 1:
raise RuntimeError(f"Failed to create LanceDB table after {max_retries} attempts: {e}")
time.sleep(2)
cleanup_lancedb_directory(lancedb_path, table_name)
# Get the underlying table
try:
table = db.open_table(table_name)
except Exception as e:
raise RuntimeError(f"Failed to open table {table_name}: {e}")
# Create vector index with retry logic
for attempt in range(max_retries):
try:
table.create_index(
metric="cosine",
num_partitions=256,
num_sub_vectors=96,
replace=True
)
break
except Exception as e:
if attempt == max_retries - 1:
print(f"Warning: Could not create vector index: {e}")
time.sleep(1)
# Skip FTS index creation to avoid Windows file locking issues
print("Skipping FTS index creation due to known Windows file locking issues")
return vector_store
# Register cleanup function for program exit
@atexit.register
def cleanup_temp_dirs():
"""Clean up any leftover .old directories"""
now = time.time()
if os.path.exists(lancedb_path):
for dirname in os.listdir(lancedb_path):
if dirname.startswith('pdf_qa_vectors.lance.old_'):
dirpath = os.path.join(lancedb_path, dirname)
try:
# Delete directories older than 1 hour
if os.path.getmtime(dirpath) < now - 3600:
shutil.rmtree(dirpath, ignore_errors=True)
except:
                    pass
lancedb_path = "./lancedb_data_qa"
embeddings = embeddings  # reuse the embeddings client initialized earlier
os.makedirs(lancedb_path, exist_ok=True)
# Create knowledge base
chunk_size = 300
chunk_overlap = 200
vector_store = create_knowledge_base(documents, chunk_size, chunk_overlap, lancedb_path, embeddings, max_workers=4)
#print(vector_store.indexes())
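If you are not on Windows, the full-text index that was skipped above can still be added afterwards. A minimal sketch using the LanceDB API and the tantivy package from the install cell (table name as used in create_knowledge_base):
# Optional: add a full-text search index on the "text" column (skipped on Windows above)
import lancedb
db = lancedb.connect(lancedb_path)
table = db.open_table("pdf_qa_vectors")
table.create_fts_index("text", replace=True)  # requires the tantivy package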
print(dir(vector_store))  # Check if 'vector' or other similar methods exist
print(dir(search))  # This will list methods available on `search`
Search for a query
query = "How are staff recruited?"
# Perform search
result = vector_store.similarity_search(query, k=5) # k is the number of results to return
# Print the results
print(f"Top matching documents for query '{query}':")
for i, res in enumerate(result):
    # Check for text content under possible attribute names
    text = getattr(res, 'page_content', None) or getattr(res, 'text', None) or 'No Text'
    print(f"{i+1}. Source: {res.metadata.get('source', 'Unknown')}, Page: {res.metadata.get('page', 'Unknown')}\nText: {text}\n")
# Verify if the index is built correctly
try:
index_status = vector_store.check_index_status()
print(f"Index status: {index_status}")
except Exception as e:
print(f"Error checking index status: {e}")result = vector_store.similarity_search(query, k=5)
# Ensure the result is not empty
if result:
print(f"Top matching documents for query '{query}':")
for i, res in enumerate(result):
# Access metadata directly
source = res.metadata.get('source', 'Unknown') # Metadata access
page = res.metadata.get('page', 'Unknown') # Metadata access
# Check for text content under possible attribute names
text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')
print(f"{i+1}. Source: {source}, Page: {page}, Text: {text}")
else:
print(f"No results found for query: {query}")def test_vector_store(vector_store, query, k=5):
"""Test the vector store by performing a similarity search and printing results."""
try:
result = vector_store.similarity_search(query, k=k)
if result:
print(f"Top matching documents for query '{query}':")
for i, res in enumerate(result):
# Access metadata
source = res.metadata.get('source', 'Unknown')
page = res.metadata.get('page', 'Unknown')
# Try to access the text from the most likely fields
text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')
print(f"{i+1}. Source: {source}, Page: {page}\nText: {text}\n")
else:
print(f"No results found for query: {query}")
except Exception as e:
print(f"Error during vector store query: {e}")test_vector_store(vector_store, query)Set up Information Retriver
from langchain.chains import RetrievalQA
from langchain_core.vectorstores import VectorStoreRetriever
def initialize_qa_system(vector_store, llm, k=5):
"""Wrap the LanceDB vector store into a retriever and initialize RetrievalQA."""
retriever = VectorStoreRetriever(
vectorstore=vector_store,
search_kwargs={"k": k}
)
qa = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # or "map_reduce" / "refine" depending on needs
retriever=retriever,
return_source_documents=True
)
    return qa
qa_system = initialize_qa_system(vector_store, llm_accurate)
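A quick single-question check of the assembled QA system (reusing the query from the vector store test above):
# Smoke test: ask one question and inspect the answer and its sources
response = qa_system({"query": "How are staff recruited?"})
print(response["result"])
for doc in response["source_documents"]:
    print(doc.metadata.get("source"), doc.metadata.get("page"))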
def test_hybrid_retriever(retriever, query, top_k=5):
    try:
print(f"\nRunning hybrid search for query: '{query}'\n{'-'*60}")
docs = retriever.get_relevant_documents(query)
if not docs:
print("⚠️ No relevant documents found.")
return
for i, doc in enumerate(docs):
source = doc.metadata.get("source", "Unknown")
page = doc.metadata.get("page", "Unknown")
print(f"{i+1}. Source: {source}, Page: {page}\nText: {doc.page_content[:300]}...\n")
except Exception as e:
print(f"❌ Error during hybrid retrieval test: {e}")test_hybrid_retriever(initialize_qa_system, query)Run all questions
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
def process_single_question(qa_system, item):
"""Process a single question and return the QA pair"""
try:
result = qa_system({"query": item["question"]})
return {
"question": item["question"],
"answer": result["result"],
"context": item["context"],
"source": item["source"],
"page": item["page"],
"source_documents": [{
"source": doc.metadata["source"],
"page": doc.metadata["page"],
"content": doc.page_content
} for doc in result["source_documents"]]
}
except Exception as e:
print(f"Error answering question: {item['question']} - {str(e)}")
return None
def generate_answers(qa_system, generated_questions, max_workers=4):
"""Generate answers in parallel for all auto-generated questions"""
qa_pairs = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all questions to the executor
futures = {
executor.submit(process_single_question, qa_system, item): item
for item in generated_questions
}
# Process results as they complete with progress bar
for future in tqdm(as_completed(futures), total=len(generated_questions), desc="Generating Answers"):
result = future.result()
if result:
qa_pairs.append(result)
    return qa_pairs
# Generate answers
qa_pairs = generate_answers(qa_system, generated_questions)
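Before exporting, a quick look at basic statistics of the generated dataset helps spot empty or skewed sources; a minimal sketch:
# Simple dataset statistics over the generated Q&A pairs
from collections import Counter
print(f"Total Q&A pairs: {len(qa_pairs)}")
per_source = Counter(qa["source"] for qa in qa_pairs)
for source, count in per_source.most_common():
    print(f"  {source}: {count} pairs")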
Export
from datasets import Dataset, DatasetDict
# Prepare dataset
hf_dataset = Dataset.from_list([
{"question": qa["question"], "answer": qa["answer"], "source": qa["source"]} for qa in qa_pairs
])
As JSON, to be used as a CrewAI knowledge base
import json
output_path = "crewai_qa_knowledge_base.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(qa_pairs, f, indent=2, ensure_ascii=False)
Hugging Face for data labeling
# Push to Hub
from huggingface_hub import login
# Login first
login(token=os.getenv("HF_TOKEN"))
hf_dataset.push_to_hub("edouardlgp/qa-un-staff-rules")
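Once pushed, the dataset can be reloaded anywhere with the datasets library, using the same repository id:
# Reload the published dataset for labeling or fine-tuning workflows
from datasets import load_dataset
qa_dataset = load_dataset("edouardlgp/qa-un-staff-rules")
print(qa_dataset)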