%pip install pypdf langchain langchain-community openai lancedb python-dotenv tqdm datasets pdfplumber ipywidgets langchain_openai googlesearch-python tantivy pylance
Automated Q&A Generation from PDF files
Technical Approach Paper for Knowledge Base Creation
Why Extract Q&A from PDFs?
PDF documents contain valuable knowledge—research papers, manuals, reports, and more—but their static format makes information retrieval inefficient.
Extracting information from PDFs using traditional search or summarization methods can often miss deeper insights.
By generating Question-Answer (Q&A) pairs, we explicitly transform passive content into active knowledge.
By transforming PDF content into a question-answering (Q&A) knowledge base, you enable:
Instant, Precise Information Retrieval: Instead of manually searching through pages, users can ask natural language questions and get direct answers with source references. This is ideal for technical documentation, legal contracts, or academic papers where quick lookup is critical.
Scalable Knowledge Management: Automatically process hundreds of PDFs into a structured, queryable format. It helps maintain versioned knowledge bases (e.g., updated policy documents or research findings).
AI-Augmented Understanding: LLMs (like Azure OpenAI) summarize, connect concepts, and clarify dense text. It helps users grasp key points without reading entire documents.
Integration with Chatbots & Assistants: Deploy the Q&A system in help desks, internal wikis, or customer support via APIs. Example: quick lookup of recommendations from past evaluation reports.
Future-Proofing Knowledge: Export structured Q&A pairs to platforms like Hugging Face for community use or fine-tuning smaller models.
Introduction
The notebook presents an approach based on:
Automatic Question Generation: Uses LLM to create relevant questions from document content
Context-Aware Q&A: Maintains document context for each generated question
Two-Stage LLM Processing: A Creative mode (higher temp) for question generation and an Accurate mode (lower temp) for answer generation
Comprehensive Metadata: Tracks sources, pages, and context for all Q&A pairs
CrewAI and HuggingFace-Ready Export: Structured JSON output with dataset statistics (an illustrative record is sketched below)
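To make the target output concrete, here is an illustrative record of the kind the pipeline produces at the end. The field names follow the code later in this document; the values themselves are invented for illustration only:

{
  "question": "What are the main eligibility criteria described in the document?",
  "answer": "Answer generated by the low-temperature LLM from the retrieved context...",
  "context": "First 500 characters of the page the question was generated from...",
  "source": "example_report.pdf",
  "page": 12,
  "source_documents": [
    {"source": "example_report.pdf", "page": 12, "content": "Retrieved chunk text..."}
  ]
}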
Environment Setup
The body of this document targets a technical audience. Below is all the code, so that the whole process can be reproduced and audited. The code is assumed to run within Visual Studio Code.
First, we need a virtual environment for Python development. This is essential for managing dependencies, avoiding conflicts, and ensuring reproducibility. It allows you to isolate project-specific libraries and versions, preventing interference with other projects or the global Python installation. This isolation helps maintain a clean development environment, simplifies project setup for collaborators, and enhances security by reducing the risk of introducing vulnerabilities. Overall, virtual environments provide a consistent and organized way to manage your Python projects effectively.
Make sure to install the latest stable version of Python and create a dedicated Python environment, so you start from a fresh install where all package dependencies can be managed correctly. To specify a particular version of Python when creating a virtual environment, you can use the full path to the desired Python executable (an example is shown right after the basic command below). Here is how to do it:
Open your terminal (Command Prompt, PowerShell, or any terminal emulator).
Navigate to your project directory where you want to create the virtual environment.
Run the following command to create a virtual environment, here called .venv:
#| eval: false
python -m venv .venv
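If you need a specific Python version, point at that interpreter's executable instead of the python found on your PATH. The path below is only an example and should be adapted to your installation:

#| eval: false
C:\Path\To\Python312\python.exe -m venv .venv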
Then, activate the virtual environment:
#| eval: false
.\.venv\Scripts\activate
Then, configure Visual Studio Code to use the virtual environment: open the Command Palette with the shortcut Ctrl+Shift+P, type Jupyter: Select Interpreter, and select the interpreter that corresponds to your newly created virtual environment ('.venv': venv).
Once this environment is selected as the kernel for this notebook, run the %pip install command shown at the top of this document to install the Python modules required for the rest of the process, then restart the Jupyter kernel:
%reset -f
Search for PDF URLs
import json
from datetime import datetime
from googlesearch import search
import requests
from bs4 import BeautifulSoup
def find_official_pdfs(query, num_results=20):
    """Search for official PDF documents related to migration regulations"""
    pdf_urls = []
    search_query = f"{query} filetype:pdf site:.gov OR site:.org OR site:.int"

    try:
        print(f"Searching for: {search_query}")
        for url in search(search_query, num_results=num_results, advanced=True):
            if url.url.lower().endswith('.pdf'):
                # Verify it's an official source
                if any(domain in url.url for domain in ['.gov', '.org', '.int', 'unhcr', 'iom']):
                    print(f"Found PDF: {url.url}")
                    pdf_urls.append(url.url)
    except Exception as e:
        print(f"Search error: {e}")

    return pdf_urls

def generate_json_output(urls):
    """Generate the JSON structure"""
    return {
        "name": "Official Migration Regulations",
        "description": "Collection of official government PDF documents detailing migration pathways and regulations",
        "urls": urls,
        "last_updated": datetime.now().isoformat(),
        "sources": [
            "Government websites",
            "International organizations",
            "UN agencies"
        ]
    }
Let’s test this!
queries = [
    "official migration pathways PDF",
    "government immigration regulations filetype:pdf",
    "legal migration routes document",
    "national visa policy PDF",
    "resettlement programs official document"
]

all_urls = []
for query in queries:
    all_urls.extend(find_official_pdfs(query))

# Remove duplicates
unique_urls = list(set(all_urls))

# Generate JSON
output_data = generate_json_output(unique_urls)

with open("migration_regulations.json", "w") as f:
    json.dump(output_data, f, indent=2)

print(f"Generated JSON file with {len(unique_urls)} PDF URLs")
Load PDFs from URLs
import os
import json
import requests
from urllib.parse import urlparse
def download_pdfs_from_json(json_path, save_folder):
    # Create folder if it doesn't exist
    os.makedirs(save_folder, exist_ok=True)

    # Load JSON data
    with open(json_path, 'r') as file:
        data = json.load(file)

    # Get the list of URLs
    urls = data.get("urls", [])

    if not urls:
        print("No URLs found in the JSON.")
        return

    # Download each PDF
    for url in urls:
        try:
            response = requests.get(url, stream=True)
            response.raise_for_status()

            # Extract filename from URL
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)

            # Full path to save the file
            save_path = os.path.join(save_folder, filename)

            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            print(f"Downloaded: {save_path}")

        except requests.exceptions.RequestException as e:
            print(f"Failed to download from {url}: {e}")
## test
pdf_urls_file = "pdf_urls.json"  # JSON file containing PDF URLs
pdf_folder = "pdf_documents"

# Load PDF URLs
# Example usage:
download_pdfs_from_json(pdf_urls_file, pdf_folder)
Initialize LLM Components
Here we use Azure OpenAI, but one could easily switch to Ollama to run this offline on sensitive documents (a sketch of the Ollama alternative follows the Azure setup below).
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Azure OpenAI settings
azure_openai_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
deployment_name = os.getenv("AZURE_DEPLOYMENT_NAME")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
embedding_deployment = os.getenv("EMBEDDING_MODEL")
api_version_embed = os.getenv("AZURE_OPENAI_API_VERSION_EMBED")
from langchain_openai import AzureChatOpenAI
# Initialize LLM with higher temperature for creative question generation
llm_creative = AzureChatOpenAI(
    deployment_name=deployment_name,
    api_key=azure_openai_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=api_version,
    temperature=0.7,
    max_tokens=500
)

llm_accurate = AzureChatOpenAI(
    deployment_name=deployment_name,
    api_key=azure_openai_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=api_version,
    temperature=0.1,
    max_tokens=1000
)

# Initialize embeddings
from langchain_openai import AzureOpenAIEmbeddings

embeddings = AzureOpenAIEmbeddings(
    deployment=embedding_deployment,
    api_key=azure_openai_key,
    azure_endpoint=azure_openai_endpoint,
    api_version=api_version_embed,
    chunk_size=1
)
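As noted above, the same two-stage setup can be reproduced fully offline with Ollama. A minimal sketch, assuming a local Ollama server with a chat model and an embedding model already pulled; the model names llama3 and nomic-embed-text are illustrative placeholders:

from langchain_community.chat_models import ChatOllama
from langchain_community.embeddings import OllamaEmbeddings

# Creative / accurate modes, mirroring the Azure configuration above
llm_creative = ChatOllama(model="llama3", temperature=0.7)
llm_accurate = ChatOllama(model="llama3", temperature=0.1)

# Local embeddings for the LanceDB knowledge base
embeddings = OllamaEmbeddings(model="nomic-embed-text")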
Testing API…
# First, verify your Azure OpenAI resources
print("Verifying Azure OpenAI resources...")
print(f"Endpoint: {azure_openai_endpoint}")
print(f"Deployment Name: {deployment_name}")
from langchain_openai import AzureChatOpenAI # Changed from AzureOpenAI
from langchain_core.messages import HumanMessage
# Initialize with enhanced error handling
try:
    # Initialize LLMs with validation
    llm_creative = AzureChatOpenAI(
        deployment_name=deployment_name,
        api_key=azure_openai_key,
        azure_endpoint=azure_openai_endpoint,
        api_version=api_version,
        temperature=0.7
    )

    # Test the LLM connection
    test_response = llm_creative.invoke([HumanMessage(content="Hello")])
    print("LLM test successful! Response type:", type(test_response))
    print("Response content:", test_response.content)

except Exception as e:
    print(f"Failed to initialize Azure OpenAI LLM: {str(e)}")
    print("Please verify:")
    print("1. Your deployment exists in Azure OpenAI Studio")
    print("2. The deployment name matches exactly")
    print("3. The model is assigned to the deployment")
    print("4. Your API key has permissions")
    raise
and embedding…
print(f"Embedding Deployment: {embedding_deployment}")
try:
# Initialize embeddings with validation
= AzureOpenAIEmbeddings(
embeddings =embedding_deployment,
deployment=azure_openai_key,
api_key=azure_openai_endpoint,
azure_endpoint=api_version,
api_version=1
chunk_size
)
# Test embeddings
= embeddings.embed_query("Test embedding")
test_embedding print("Embeddings connection test successful!")
print(f"Embedding vector length: {len(test_embedding)}")
except Exception as e:
print(f"Failed to initialize Azure OpenAI Embeddings: {str(e)}")
print("Please verify your embedding deployment exists")
raise
Automatic Question Generation
import os
import json
import time
import logging
import pdfplumber
import warnings
from tqdm import tqdm
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants
MIN_SECTION_LENGTH = 200
MAX_ERRORS = 5
RATE_LIMIT_DELAY = 0.5
MAX_WORKERS = 4  # Adjust based on your system capabilities
def process_pdf_page(page, filename: str, page_num: int, question_chain, min_section_length: int) -> Tuple[List[Dict], List[Dict]]:
    """Process a single PDF page and generate questions."""
    documents = []
    questions = []

    try:
        text = page.extract_text()
        if not text or len(text.strip()) < min_section_length:
            return documents, questions

        time.sleep(RATE_LIMIT_DELAY)

        # Invoke LLM
        response = question_chain.invoke({"text": text})

        # Debug: Print raw response
        logger.debug(f"Raw LLM response for {filename} page {page_num}: {response}")
        print(f"Raw LLM response: {response}")  # Add this line to see what's being returned

        questions_list = []

        # Case 1: If response is a dictionary and contains a 'text' key with embedded JSON
        if isinstance(response, dict) and "text" in response:
            try:
                embedded_json = response["text"]
                response_dict = json.loads(embedded_json)
                questions_list = response_dict.get("questions", [])
            except Exception as e:
                logger.warning(f"Failed to parse embedded JSON in response for {filename} page {page_num}: {e}")
                logger.debug(f"Raw embedded JSON: {response['text']}")
                questions_list = []

        # Case 2: If response has a `.text` attribute (LangChain format)
        elif hasattr(response, 'text'):
            response_text = response.text.strip()
            try:
                response_dict = json.loads(response_text)
                questions_list = response_dict.get("questions", [])
            except json.JSONDecodeError:
                try:
                    if '```json' in response_text:
                        json_str = response_text.split('```json')[1].split('```')[0]
                    else:
                        json_str = response_text
                    response_dict = json.loads(json_str)
                    questions_list = response_dict.get("questions", [])
                except Exception as inner_e:
                    logger.warning(f"Failed to parse fallback JSON from LLM response on {filename} page {page_num}: {inner_e}")
                    logger.debug(f"Raw fallback: {response_text}")
                    questions_list = []

        # Sanity check
        if not isinstance(questions_list, list):
            questions_list = []

        # Build question entries
        for q in questions_list:
            if isinstance(q, str) and q.strip():
                questions.append({
                    "question": q.strip(),
                    "source": filename,
                    "page": page_num,
                    "context": text[:500] + "..."
                })

        if questions:
            documents.append({
                "text": text,
                "source": filename,
                "page": page_num
            })
        else:
            logger.warning(f"No valid questions extracted for {filename} page {page_num}")

    except Exception as e:
        logger.error(f"Error processing {filename} page {page_num}: {str(e)}")

    return documents, questions
def process_pdf_file(pdf_path: str, question_chain, min_section_length: int) -> Tuple[List[Dict], List[Dict]]:
    """Process a single PDF file with parallel page processing."""
    all_documents = []
    all_questions = []

    try:
        with pdfplumber.open(pdf_path) as pdf:
            filename = os.path.basename(pdf_path)
            total_pages = len(pdf.pages)

            with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, total_pages)) as executor:
                futures = []
                for page_num, page in enumerate(pdf.pages, start=1):
                    futures.append(
                        executor.submit(
                            process_pdf_page,
                            page=page,
                            filename=filename,
                            page_num=page_num,
                            question_chain=question_chain,
                            min_section_length=min_section_length
                        )
                    )

                for future in as_completed(futures):
                    try:
                        docs, qs = future.result()
                        all_documents.extend(docs)
                        all_questions.extend(qs)
                    except Exception as e:
                        logger.error(f"Error processing future: {str(e)}")
                        continue

    except Exception as e:
        logger.error(f"Failed to process PDF {pdf_path}: {str(e)}")

    return all_documents, all_questions
def process_and_generate_questions(
    pdf_folder: str,
    question_chain,
    min_section_length: int = MIN_SECTION_LENGTH,
    suppress_warnings: bool = True,
    max_workers: int = MAX_WORKERS
) -> Tuple[List[Dict], List[Dict]]:
    """
    Process all PDFs in a folder and generate questions using an LLM chain.
    """
    if suppress_warnings:
        warnings.filterwarnings("ignore", category=UserWarning, module="pdfplumber")

    pdf_files = [
        os.path.join(pdf_folder, f)
        for f in os.listdir(pdf_folder)
        if f.lower().endswith('.pdf')
    ]

    all_documents = []
    all_questions = []
    error_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                process_pdf_file,
                pdf_path=pdf_path,
                question_chain=question_chain,
                min_section_length=min_section_length
            ): pdf_path
            for pdf_path in pdf_files
        }

        with tqdm(total=len(pdf_files), desc="Processing PDFs") as pbar:
            for future in as_completed(futures):
                pdf_path = futures[future]
                try:
                    docs, qs = future.result()
                    all_documents.extend(docs)
                    all_questions.extend(qs)

                    if not qs:
                        logger.warning(f"No questions generated for {os.path.basename(pdf_path)}")
                    else:
                        error_count = 0  # Reset on success

                except Exception as e:
                    logger.error(f"Error processing {pdf_path}: {str(e)}")
                    error_count += 1
                    if error_count >= MAX_ERRORS:
                        logger.error(f"Stopping after {MAX_ERRORS} consecutive errors")
                        executor.shutdown(wait=False)
                        break

                pbar.update(1)

    return all_documents, all_questions
Now the prompt to generate the questions!
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
# Define prompt for question generation
question_prompt = PromptTemplate(
    input_variables=["text"],
    template="""
    Analyze the following text and generate 3-5 relevant questions that this text could answer.

    **Important**:
    - The questions should cover different aspects of the content and be of varying complexity.
    - The questions should reflect the document end-user's perspective or interest, and avoid jargon.
    - Focus on key concepts, findings, methodologies, and important details.
    - The questions should remain generic in their formulation and should not refer to specific elements within the content (like a specific chapter or reference number).

    Return ONLY this JSON format:
    {{
        "questions": [
            "question1",
            "question2",
            "question3"
        ]
    }}

    Text: {text}
    """
)

# Initialize question generation chain
question_chain = LLMChain(llm=llm_creative, prompt=question_prompt)
Now generating!
= "pdf_documents/"
pdf_folder = 500
min_section_length
# Process PDFs and generate questions
= process_and_generate_questions(pdf_folder, question_chain, min_section_length)
documents, generated_questions print(f" {len(generated_questions)} questions were generated !!")
import json
= "questions_UN_staff.json"
output_path with open(output_path, "w", encoding="utf-8") as f:
=2, ensure_ascii=False) json.dump(generated_questions, f, indent
Create Knowledge Base
import os
import shutil
import time
import atexit
from concurrent.futures import ThreadPoolExecutor
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import LanceDB
import lancedb
def process_document(doc, text_splitter):
    """Helper function to process a single document in parallel"""
    chunks = text_splitter.split_text(doc["text"])
    metadatas = [{"source": doc["source"], "page": doc["page"]} for _ in chunks]
    return chunks, metadatas
def force_delete_directory(path, max_retries=3, delay=1):
    """Robust directory deletion with retries and delay"""
    for attempt in range(max_retries):
        try:
            if os.path.exists(path):
                shutil.rmtree(path)
            return True
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed to delete {path} after {max_retries} attempts: {e}")
                return False
            time.sleep(delay)
    return False
def cleanup_lancedb_directory(db_path, table_name):
    """Safely remove existing LanceDB table directory"""
    table_path = os.path.join(db_path, f"{table_name}.lance")

    # First try normal deletion
    if force_delete_directory(table_path):
        return

    # If normal deletion fails, try renaming
    temp_path = f"{table_path}.old_{int(time.time())}"
    try:
        if os.path.exists(table_path):
            os.rename(table_path, temp_path)
            print(f"Renamed {table_path} to {temp_path} for deferred cleanup")
    except Exception as e:
        print(f"Could not rename {table_path}: {e}")
def create_knowledge_base(documents, chunk_size, chunk_overlap, lancedb_path, embeddings, max_workers=4):
    """Create vector store with comprehensive error handling"""
    # Initialize text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
        length_function=len,
        keep_separator=True
    )

    # Parallel processing of documents
    texts = []
    metadatas = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for doc in documents:
            futures.append(executor.submit(process_document, doc, text_splitter))

        for future in futures:
            chunks, doc_metadatas = future.result()
            texts.extend(chunks)
            metadatas.extend(doc_metadatas)

    # Handle LanceDB connection with robust cleanup
    table_name = "pdf_qa_vectors"
    cleanup_lancedb_directory(lancedb_path, table_name)

    # Add delay to ensure filesystem operations complete
    time.sleep(1)

    try:
        db = lancedb.connect(lancedb_path)
    except Exception as e:
        raise RuntimeError(f"Failed to connect to LanceDB at {lancedb_path}: {e}")

    # Create vector store with retry logic
    max_retries = 3
    for attempt in range(max_retries):
        try:
            vector_store = LanceDB.from_texts(
                texts=texts,
                embedding=embeddings,
                metadatas=metadatas,
                connection=db,
                table_name=table_name,
                vector_key="vector",
                text_key="text",
                id_key="id"
            )
            break
        except Exception as e:
            if attempt == max_retries - 1:
                raise RuntimeError(f"Failed to create LanceDB table after {max_retries} attempts: {e}")
            time.sleep(2)
            cleanup_lancedb_directory(lancedb_path, table_name)

    # Get the underlying table
    try:
        table = db.open_table(table_name)
    except Exception as e:
        raise RuntimeError(f"Failed to open table {table_name}: {e}")

    # Create vector index with retry logic
    for attempt in range(max_retries):
        try:
            table.create_index(
                metric="cosine",
                num_partitions=256,
                num_sub_vectors=96,
                replace=True
            )
            break
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Warning: Could not create vector index: {e}")
            time.sleep(1)

    # Skip FTS index creation to avoid Windows file locking issues
    print("Skipping FTS index creation due to known Windows file locking issues")

    return vector_store
# Register cleanup function for program exit
@atexit.register
def cleanup_temp_dirs():
    """Clean up any leftover .old directories"""
    now = time.time()
    if os.path.exists(lancedb_path):
        for dirname in os.listdir(lancedb_path):
            if dirname.startswith('pdf_qa_vectors.lance.old_'):
                dirpath = os.path.join(lancedb_path, dirname)
                try:
                    # Delete directories older than 1 hour
                    if os.path.getmtime(dirpath) < now - 3600:
                        shutil.rmtree(dirpath, ignore_errors=True)
                except:
                    pass
= "./lancedb_data_qa"
lancedb_path = embeddings
embeddings=True)
os.makedirs(lancedb_path, exist_ok# Create knowledge base
= 300
chunk_size = 200
chunk_overlap = create_knowledge_base(documents, chunk_size, chunk_overlap, lancedb_path, embeddings, max_workers=4) vector_store
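create_knowledge_base deliberately skips full-text-search index creation because of Windows file locking issues. On Linux or macOS you could optionally add a keyword index afterwards; a minimal sketch, assuming the table name used above ("pdf_qa_vectors") and that the tantivy package from the pip install line is available:

# Optional: build a full-text-search index on the "text" column (skipped above on Windows)
import lancedb

db = lancedb.connect(lancedb_path)
table = db.open_table("pdf_qa_vectors")
table.create_fts_index("text", replace=True)  # tantivy-based FTS index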
#print(vector_store.indexes())
print(dir(vector_store))
# Check if 'vector' or other similar methods exist
print(dir(search)) # This will list methods available on `search`
# Search for a query
query = "How are staff recruited?"

# Perform search
result = vector_store.similarity_search(query, k=5)  # k is the number of results to return

# Print the results
print(f"Top matching documents for query '{query}':")
for i, res in enumerate(result):
    # Check for text content under possible attribute names
    text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')
    print(f"{i+1}. Source: {res.metadata['source']}, Page: {res.metadata['page']}, Text: {text}")
# Verify if the index is built correctly
try:
    index_status = vector_store.check_index_status()
    print(f"Index status: {index_status}")
except Exception as e:
    print(f"Error checking index status: {e}")

result = vector_store.similarity_search(query, k=5)
# Ensure the result is not empty
if result:
    print(f"Top matching documents for query '{query}':")
    for i, res in enumerate(result):
        # Access metadata directly
        source = res.metadata.get('source', 'Unknown')
        page = res.metadata.get('page', 'Unknown')

        # Check for text content under possible attribute names
        text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')

        print(f"{i+1}. Source: {source}, Page: {page}, Text: {text}")
else:
    print(f"No results found for query: {query}")
def test_vector_store(vector_store, query, k=5):
    """Test the vector store by performing a similarity search and printing results."""
    try:
        result = vector_store.similarity_search(query, k=k)

        if result:
            print(f"Top matching documents for query '{query}':")
            for i, res in enumerate(result):
                # Access metadata
                source = res.metadata.get('source', 'Unknown')
                page = res.metadata.get('page', 'Unknown')

                # Try to access the text from the most likely fields
                text = getattr(res, 'text', None) or getattr(res, 'page_content', None) or getattr(res, 'content', 'No Text')

                print(f"{i+1}. Source: {source}, Page: {page}\nText: {text}\n")
        else:
            print(f"No results found for query: {query}")
    except Exception as e:
        print(f"Error during vector store query: {e}")

test_vector_store(vector_store, query)
Set up Information Retriever
from langchain.chains import RetrievalQA
from langchain_core.vectorstores import VectorStoreRetriever
def initialize_qa_system(vector_store, llm, k=5):
    """Wrap the LanceDB vector store into a retriever and initialize RetrievalQA."""
    retriever = VectorStoreRetriever(
        vectorstore=vector_store,
        search_kwargs={"k": k}
    )

    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",  # or "map_reduce" / "refine" depending on needs
        retriever=retriever,
        return_source_documents=True
    )
    return qa

qa_system = initialize_qa_system(vector_store, llm_accurate)
def test_hybrid_retriever(retriever, query, top_k=5):
    try:
        print(f"\nRunning hybrid search for query: '{query}'\n{'-'*60}")
        docs = retriever.get_relevant_documents(query)

        if not docs:
            print("⚠️ No relevant documents found.")
            return

        for i, doc in enumerate(docs):
            source = doc.metadata.get("source", "Unknown")
            page = doc.metadata.get("page", "Unknown")
            print(f"{i+1}. Source: {source}, Page: {page}\nText: {doc.page_content[:300]}...\n")
    except Exception as e:
        print(f"❌ Error during hybrid retrieval test: {e}")

# Pass the retriever wrapped inside the QA chain, not the initializer function
test_hybrid_retriever(qa_system.retriever, query)
Run all questions
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
def process_single_question(qa_system, item):
    """Process a single question and return the QA pair"""
    try:
        result = qa_system({"query": item["question"]})
        return {
            "question": item["question"],
            "answer": result["result"],
            "context": item["context"],
            "source": item["source"],
            "page": item["page"],
            "source_documents": [{
                "source": doc.metadata["source"],
                "page": doc.metadata["page"],
                "content": doc.page_content
            } for doc in result["source_documents"]]
        }
    except Exception as e:
        print(f"Error answering question: {item['question']} - {str(e)}")
        return None

def generate_answers(qa_system, generated_questions, max_workers=4):
    """Generate answers in parallel for all auto-generated questions"""
    qa_pairs = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all questions to the executor
        futures = {
            executor.submit(process_single_question, qa_system, item): item
            for item in generated_questions
        }

        # Process results as they complete with progress bar
        for future in tqdm(as_completed(futures), total=len(generated_questions), desc="Generating Answers"):
            result = future.result()
            if result:
                qa_pairs.append(result)

    return qa_pairs

# Generate answers
qa_pairs = generate_answers(qa_system, generated_questions)
Export
from datasets import Dataset, DatasetDict
# Prepare dataset
hf_dataset = Dataset.from_list([
    {"question": qa["question"], "answer": qa["answer"], "source": qa["source"]} for qa in qa_pairs
])
As JSON, to be used as a CrewAI knowledge base
import json
= "crewai_qa_knowledge_base.json"
output_path with open(output_path, "w", encoding="utf-8") as f:
=2, ensure_ascii=False) json.dump(qa_pairs, f, indent
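For reference, a minimal sketch of how such a file could be attached to a CrewAI agent as a knowledge source. The import path and constructor below follow the crewai knowledge API as I recall it and should be checked against your installed version; CrewAI typically expects the file under a knowledge/ directory, and the agent definition is illustrative:

from crewai import Agent
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource

# Assumes crewai_qa_knowledge_base.json has been copied into the knowledge/ folder
qa_source = JSONKnowledgeSource(file_paths=["crewai_qa_knowledge_base.json"])

agent = Agent(
    role="HR policy assistant",
    goal="Answer staff questions using the Q&A knowledge base",
    backstory="Relies on Q&A pairs extracted from official PDF documents.",
    knowledge_sources=[qa_source],
)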
Hugging Face for data labelling
# Push to Hub
from huggingface_hub import login
# Login first
login(token=os.getenv("HF_TOKEN"))

hf_dataset.push_to_hub("edouardlgp/qa-un-staff-rules")