""" Full Conversational RAG Pipeline for Agri-Critique Includes: Session management, context-aware retrieval, memory management Loads everything from HuggingFace Hub """ import os import json import sqlite3 import uuid from datetime import datetime from typing import List, Dict, Any import torch from transformers import AutoTokenizer, AutoModelForCausalLM from peft import PeftModel from sentence_transformers import SentenceTransformer import faiss import numpy as np from huggingface_hub import hf_hub_download import warnings # Suppress PEFT warnings about unexpected keys in LoraConfig warnings.filterwarnings("ignore", category=UserWarning, module="peft") class ConversationManager: """Manages conversation sessions with persistent storage""" def __init__(self, db_path="conversations.db"): self.db_path = db_path self.conn = sqlite3.connect(db_path, check_same_thread=False) self.cursor = self.conn.cursor() self._init_db() def _init_db(self): """Initialize session database""" self.cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT PRIMARY KEY, created_at TEXT, last_updated TEXT, metadata TEXT ) """) self.cursor.execute(""" CREATE TABLE IF NOT EXISTS messages ( message_id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, role TEXT, content TEXT, timestamp TEXT, evidence TEXT, FOREIGN KEY (session_id) REFERENCES sessions(session_id) ) """) self.conn.commit() def create_session(self, metadata=None): """Create a new conversation session""" session_id = str(uuid.uuid4()) now = datetime.utcnow().isoformat() self.cursor.execute(""" INSERT INTO sessions (session_id, created_at, last_updated, metadata) VALUES (?, ?, ?, ?) """, (session_id, now, now, json.dumps(metadata or {}))) self.conn.commit() return session_id def add_message(self, session_id, role, content, evidence=None): """Add a message to a session""" now = datetime.utcnow().isoformat() self.cursor.execute(""" INSERT INTO messages (session_id, role, content, timestamp, evidence) VALUES (?, ?, ?, ?, ?) """, (session_id, role, content, now, json.dumps(evidence) if evidence else None)) # Update session timestamp self.cursor.execute(""" UPDATE sessions SET last_updated = ? WHERE session_id = ? """, (now, session_id)) self.conn.commit() def get_session_history(self, session_id, limit=None): """Get conversation history for a session""" query = """ SELECT role, content, timestamp, evidence FROM messages WHERE session_id = ? ORDER BY timestamp ASC """ if limit: query += f" LIMIT {limit}" self.cursor.execute(query, (session_id,)) messages = [] for row in self.cursor.fetchall(): messages.append({ 'role': row[0], 'content': row[1], 'timestamp': row[2], 'evidence': json.loads(row[3]) if row[3] else None }) return messages def summarize_old_messages(self, session_id, keep_recent=4): """Summarize old messages to save context window""" messages = self.get_session_history(session_id) if len(messages) <= keep_recent: return messages # Keep recent messages recent = messages[-keep_recent:] old = messages[:-keep_recent] # Create summary of old messages summary = "Previous conversation summary:\n" for msg in old[::2]: # Sample every other message summary += f"- {msg['role']}: {msg['content'][:100]}...\n" # Return summary + recent messages return [{'role': 'system', 'content': summary}] + recent class AgriCritiqueRAG: """Full RAG system with conversational capabilities""" def __init__(self): print("🔄 Initializing Agri-Critique Conversational RAG System...") # Model paths self.model_id = "sayande/AgriScholarQA-CoT" self.base_model_id = "Qwen/Qwen3-4B-Thinking-2507" self.index_repo = "sayande/agri-critique-index" # Conversation manager self.conversation_manager = ConversationManager() self.current_session = None # Load retriever print("đŸ“Ĩ Loading retriever...") self.retriever = SentenceTransformer("all-mpnet-base-v2") # ------------------------------------------------------------------ # Load FAISS indices (local first, then HF fallback) # ------------------------------------------------------------------ print("đŸ“Ĩ Loading FAISS indices...") self.chunk_index = None self.paper_index = None self.index = None # alias kept for backward compatibility base_dir = os.path.dirname(__file__) if "__file__" in globals() else os.getcwd() local_chunk_path = os.path.join(base_dir, "faiss.index") local_paper_path = os.path.join(base_dir, "faiss_papers.index") local_meta_path = os.path.join(base_dir, "meta.json") # ---- Try LOCAL chunk index ---- try: if os.path.exists(local_chunk_path): print(f"📁 Found local chunk index: {local_chunk_path}") self.chunk_index = faiss.read_index(local_chunk_path) self.index = self.chunk_index print(f"✅ Loaded local chunk FAISS index with {self.chunk_index.ntotal} vectors") else: print("â„šī¸ Local chunk index 'faiss.index' not found, will try HuggingFace Hub...") except Exception as e: print(f"âš ī¸ Could not load local chunk index: {e}") self.chunk_index = None self.index = None # ---- If no local chunk index, fall back to HF ---- if self.chunk_index is None: print("đŸ“Ĩ Loading FAISS index from HuggingFace dataset...") try: index_path = hf_hub_download( repo_id=self.index_repo, filename="faiss.index", repo_type="dataset" ) self.chunk_index = faiss.read_index(index_path) self.index = self.chunk_index print(f"✅ Loaded HF FAISS index with {self.chunk_index.ntotal} vectors") except Exception as e: print(f"âš ī¸ Could not load FAISS index from HF: {e}") self.chunk_index = None self.index = None # ---- Optional: paper-level index (not strictly required) ---- try: if os.path.exists(local_paper_path): print(f"📁 Found local paper index: {local_paper_path}") self.paper_index = faiss.read_index(local_paper_path) print(f"✅ Loaded local paper FAISS index with {self.paper_index.ntotal} vectors") else: print("â„šī¸ Local paper index 'faiss_papers.index' not found (this is optional).") except Exception as e: print(f"âš ī¸ Could not load local paper index: {e}") self.paper_index = None # ------------------------------------------------------------------ # Load metadata (local first, then HF) # ------------------------------------------------------------------ print("đŸ“Ĩ Loading metadata...") self.metadata = [] # Try local meta.json try: if os.path.exists(local_meta_path): print(f"📁 Found local metadata: {local_meta_path}") with open(local_meta_path, "r", encoding="utf-8") as f: self.metadata = json.load(f) print(f"✅ Loaded local metadata for {len(self.metadata)} chunks") else: print("â„šī¸ Local 'meta.json' not found, will try HuggingFace Hub...") except Exception as e: print(f"âš ī¸ Could not load local metadata: {e}") self.metadata = [] # If still empty, try HF if not self.metadata: print("đŸ“Ĩ Loading metadata from HuggingFace dataset...") try: meta_path = hf_hub_download( repo_id=self.index_repo, filename="meta.json", repo_type="dataset" ) with open(meta_path, "r", encoding="utf-8") as f: self.metadata = json.load(f) print(f"✅ Loaded HF metadata for {len(self.metadata)} chunks") except Exception as e: print(f"âš ī¸ Could not load metadata from HF: {e}") self.metadata = [] # Model will be loaded lazily on first use self.model = None self.tokenizer = None self.model_loaded = False print("✅ Agri-Critique Conversational RAG System initialized!") print("â„šī¸ Model will load on first query (Qwen3-4B with INT8 quantization)") def _ensure_model_loaded(self): """Lazy load model on first use""" if self.model_loaded: return print("đŸ“Ĩ Loading Agri-Critique model (this may take 1-2 minutes)...") # Get HF token from environment hf_token = os.getenv("HF_TOKEN") if not hf_token: raise ValueError("HF_TOKEN not found. Please add it to Space secrets.") self.tokenizer = AutoTokenizer.from_pretrained( self.base_model_id, token=hf_token ) from transformers import AutoConfig config = AutoConfig.from_pretrained(self.base_model_id, token=hf_token) # Qwen models work well with default config # No special rope_scaling adjustments needed print("đŸ–Ĩī¸ Loading Qwen3-4B model with INT4 quantization for speed") # Try to use INT4 quantization for faster inference (better for 4B models on CPU) try: from transformers import BitsAndBytesConfig quantization_config = BitsAndBytesConfig( load_in_4bit=True, # INT4 is better for larger models on CPU bnb_4bit_compute_dtype=torch.float16, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4" ) print("✅ Using INT4 (NF4) quantization - optimized for Qwen 4B on CPU") except ImportError: print("âš ī¸ bitsandbytes not available, using float32") quantization_config = None base_model = AutoModelForCausalLM.from_pretrained( self.base_model_id, config=config, quantization_config=quantization_config, torch_dtype=torch.float32 if quantization_config is None else None, device_map="auto" if quantization_config else "cpu", low_cpu_mem_usage=True, token=hf_token, ) self.model = PeftModel.from_pretrained( base_model, self.model_id, token=hf_token, ) self.model.eval() self.model_loaded = True print("✅ Model loaded successfully!") def start_session(self, metadata=None): """Start a new conversation session""" self.current_session = self.conversation_manager.create_session(metadata) print(f"📝 Started new session: {self.current_session[:8]}...") return self.current_session def retrieve_with_context(self, query, conversation_history, top_k=3): """Context-aware retrieval: considers conversation history""" # Use chunk_index (local or HF). If missing, no retrieval. if self.chunk_index is None or not self.metadata: return [] # Combine current query with recent context context_queries = [query] # Add recent user questions for context for msg in conversation_history[-4:]: if msg["role"] == "user": context_queries.append(msg["content"]) # Encode all queries embeddings = self.retriever.encode(context_queries, convert_to_numpy=True) # Weight: current query gets 70%, history gets 30% if len(context_queries) > 1: weights = [0.7] + [0.3 / (len(context_queries) - 1)] * (len(context_queries) - 1) else: weights = [1.0] weighted_embedding = np.average(embeddings, axis=0, weights=weights).reshape(1, -1).astype("float32") faiss.normalize_L2(weighted_embedding) # Extract year from query (e.g., 2024, 2025) import re year_match = re.search(r"\b(20\d{2})\b", query) target_year = year_match.group(1) if year_match else None # Search over chunk index # Fetch more candidates to allow for temporal re-ranking # If year detected, fetch deep (e.g. 100) to find the year-match chunks if target_year: initial_k = 100 else: initial_k = top_k * 3 distances, indices = self.chunk_index.search(weighted_embedding.astype("float32"), initial_k) candidates = [] for idx, dist in zip(indices[0], distances[0]): if 0 <= idx < len(self.metadata): chunk_info = self.metadata[idx] # Check for year match in paper_id is_year_match = False if target_year and target_year in chunk_info.get("paper_id", ""): is_year_match = True candidates.append({ "data": chunk_info, "dist": float(dist), "is_year_match": is_year_match }) # Soft Boost Logic: # Instead of force-sorting year matches to the top (which brings in irrelevant junk), # we improve their distance score by a fixed amount (e.g., 0.5). # Assuming L2 distance (smaller is better): new_dist = old_dist - 0.5 # This lets a "Relevant Year-Match" beat "Relevant Non-Match", # but a "Totally Irrelevant Year-Match" will still lose to "Relevant Content". for cand in candidates: if cand["is_year_match"]: cand["effective_dist"] = cand["dist"] - 0.5 else: cand["effective_dist"] = cand["dist"] # Sort by effective distance (ascending) candidates.sort(key=lambda x: x["effective_dist"]) # Select top_k final_candidates = candidates[:top_k] evidence = [] for cand in final_candidates: ev = dict(cand["data"]) ev["score"] = cand["dist"] # FALLBACK: If 'text' is missing in metadata (common issue with this dataset version), # construct a proxy text from the section and paper ID so the RAG doesn't see empty strings. if "text" not in ev or not ev["text"]: paper = ev.get("paper_id", "Unknown Paper") sect = ev.get("section", "General") ev["text"] = f"[Note: Full text missing in metadata] Section '{sect}' from paper '{paper}'." evidence.append(ev) return evidence def _clean_paper_id(self, paper_id): """Clean paper ID for display""" if not isinstance(paper_id, str): return str(paper_id) clean = paper_id.strip("-_") clean = clean.replace("_", " ").replace("-", " ") return clean.title() def validate_and_answer(self, question, evidence, conversation_history): """Generate validated answer with reasoning - OPTIMIZED single-call version""" self._ensure_model_loaded() # Format evidence text for the model # Include title/paper_id so the model knows the source date/context evidence_text = "\n\n".join( [ f"[{i+1}] {ev.get('paper_title') or ev.get('paper_id')}\n{ev.get('text', '')}" for i, ev in enumerate(evidence) ] ) # OPTIMIZED: Single model call for both validation and answer # This reduces inference time by ~50% combined_messages = [ { "role": "system", "content": ( "You are an agricultural research assistant. Your task is to:\n" "1. Validate the question against the evidence\n" "2. Provide a clear, comprehensive answer based ONLY on the evidence\n" "3. Cite sources as [1], [2], etc.\n\n" "Check: Is the question relevant? Are there conflicting facts? " "Is there enough information?" ), }, { "role": "user", "content": f"""EVIDENCE: {evidence_text} QUESTION: {question} TASK: Provide a validated answer to the question. First briefly explain your reasoning, then give the final answer. Be detailed and thorough.""", }, ] input_text = self.tokenizer.apply_chat_template( combined_messages, tokenize=False, add_generation_prompt=True ) inputs = self.tokenizer(input_text, return_tensors="pt").to(self.model.device) # UPDATED: Increased token limit for more detailed answers with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=768, # Increased from 256 for detail temperature=0.5, # Balanced temperature do_sample=True, pad_token_id=self.tokenizer.pad_token_id, ) full_response = self.tokenizer.decode( outputs[0][inputs["input_ids"].shape[1] :], skip_special_tokens=True, ).strip() # Split response into reasoning and answer # The model should naturally provide reasoning first, then answer if "\n\n" in full_response: parts = full_response.split("\n\n", 1) reasoning = parts[0] answer = parts[1] if len(parts) > 1 else full_response else: # Fallback: use entire response as answer reasoning = "Validated against evidence." answer = full_response return reasoning, answer def ask(self, question): """Full RAG pipeline with validation""" if not self.current_session: self.start_session() # Get conversation history history = self.conversation_manager.get_session_history(self.current_session) # Context-aware retrieval evidence = self.retrieve_with_context(question, history, top_k=3) # Generate validated answer reasoning, answer = self.validate_and_answer(question, evidence, history) # Save to session self.conversation_manager.add_message( self.current_session, "user", question, evidence ) self.conversation_manager.add_message( self.current_session, "assistant", answer ) # Format evidence for return (include all useful fields) formatted_evidence = [] for ev in evidence: paper_id = ev.get("paper_id", "unknown") display_paper = ev.get("paper_title") or self._clean_paper_id(paper_id) formatted_evidence.append( { "paper_id": display_paper, "raw_paper_id": paper_id, "text": ev.get("text", ""), "score": ev.get("score", 0.0), } ) return { "answer": answer, "reasoning": reasoning, "evidence": formatted_evidence, "session_id": self.current_session, } def validate_answer(self, question: str, proposed_answer: str, evidence: List[Dict[str, Any]]) -> str: """ Validate a proposed answer against evidence and return critique. This method uses the fine-tuned Llama model to critique the answer by checking: - Are all claims supported by the evidence? - Are there any hallucinations or fake findings? - Are citations accurate? - Are there temporal or causal errors? Args: question: The original question proposed_answer: The answer to validate evidence: List of evidence chunks with 'text' field Returns: Critique string identifying issues or confirming validity """ self._ensure_model_loaded() # Format evidence text for validation evidence_text = "\n\n".join([ f"[{i+1}] {ev.get('text', '')}" for i, ev in enumerate(evidence[:5]) # Limit to top 5 for context window ]) if not evidence_text.strip(): evidence_text = "(no evidence provided)" # Create validation prompt validation_messages = [ { "role": "system", "content": ( "You are a strict agricultural research validator. " "Your job is to critique the proposed answer by checking:\n" "1. Are all claims supported by the evidence?\n" "2. Are there any hallucinations or fake findings?\n" "3. Are citations accurate and properly used?\n" "4. Are there temporal or causal errors?\n" "5. Are there any unsupported extrapolations?\n\n" "Provide a concise critique. If the answer is well-supported, say so. " "If there are issues, clearly identify them." ), }, { "role": "user", "content": f"""QUESTION: {question} EVIDENCE: {evidence_text} PROPOSED ANSWER: {proposed_answer} TASK: Critique this answer. Identify any unsupported claims, hallucinations, citation errors, or other issues.""", }, ] # Generate critique using the model input_text = self.tokenizer.apply_chat_template( validation_messages, tokenize=False, add_generation_prompt=True ) inputs = self.tokenizer(input_text, return_tensors="pt").to(self.model.device) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=256, temperature=0.3, do_sample=True, pad_token_id=self.tokenizer.pad_token_id, ) critique = self.tokenizer.decode( outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True, ).strip() return critique