# app.py - Single-tab Gradio App with GPT-Orchestrated RAG Pipeline
"""
AgriScholarQA: Agricultural Research Assistant

Pipeline per user query:
1. A GPT classifier checks whether the question is an agricultural scholarly question.
2. If NOT agricultural scholarly:
   - No RAG retrieval.
   - The system just explains what AgriScholarQA is and what it can do.
3. If agricultural scholarly:
   - The RAG pipeline (AgriCritiqueRAG) retrieves evidence + generates a raw answer.
   - The same RAG model self-validates (validate_answer) using the evidence.
   - GPT refines the answer:
     - thinks about the question + raw answer + evidence + critique,
     - removes repetition and noise,
     - produces a clean, well-structured Markdown answer with:
       - the main answer,
       - evidence citations [1], [2], ...,
       - a short, high-level reasoning section (no step-by-step chain-of-thought).
"""
import os
import json
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional

import gradio as gr

from rag_pipeline import AgriCritiqueRAG

# ---- OpenAI client (GPT) ----
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None
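# NOTE: OpenAI is an optional dependency here. If the package or the API key
# is missing, the orchestrator degrades gracefully to RAG-only behaviour
# (see the `gpt_available` flag in GPTAgriRAGOrchestrator.__init__).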
# ----------------------------------------------------------------------
# Data structures for orchestration
# ----------------------------------------------------------------------
@dataclass
class GPTClassification:
    """Structured view of GPT's classification result."""
    is_agri_scholarly: bool
    intent_type: str  # "agri_scholarly" | "chit_chat" | "generic_qa" | "other"
    confidence: float
    brief_reason: str
@dataclass
class OrchestratorResult:
    """
    Unified result returned by the orchestrator to the UI.

    mode:
        - "rag"         : RAG pipeline was used
        - "system_chat" : only system explanation / light chat
        - "error"       : some error (GPT / RAG / OpenAI issue)
    answer:
        - final answer string to show to the user in chat
    evidence:
        - list of evidence chunks (from RAG) if mode == "rag"
        - empty otherwise
    meta:
        - extra diagnostic info (classification, raw RAG output, critique, etc.)
    """
    mode: str
    answer: str
    evidence: List[Dict[str, Any]]
    meta: Dict[str, Any]
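# Both dataclasses above are plain data holders: `asdict()` converts a
# GPTClassification for attachment to `meta`, and OrchestratorResult.meta is
# kept for diagnostics only (it is never rendered directly in the chat UI).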
# ----------------------------------------------------------------------
# GPT + RAG Orchestrator
# ----------------------------------------------------------------------
class GPTAgriRAGOrchestrator:
    """
    Orchestrator that:
    1) Uses GPT to decide if a query is an agricultural scholarly question.
    2) For agri-scholarly queries:
       - runs the RAG pipeline (AgriCritiqueRAG) for evidence + answer,
       - validates that answer using the RAG model,
       - sends everything to GPT for polishing and formatting.
    3) For non-agri queries:
       - no RAG, just a friendly system explanation.
    """

    def __init__(
        self,
        rag_system: AgriCritiqueRAG,
        gpt_model_classify: str = "gpt-4.1-mini",
        gpt_model_refine: Optional[str] = None,
        openai_api_key_env: str = "OPENAI_API_KEY",
    ):
        """
        Args:
            rag_system: instance of AgriCritiqueRAG.
            gpt_model_classify: OpenAI model used for classification.
            gpt_model_refine: OpenAI model used for answer refinement
                (defaults to the classification model).
            openai_api_key_env: env var holding the OpenAI API key.
        """
        self.rag = rag_system
        self.gpt_model_classify = gpt_model_classify
        self.gpt_model_refine = gpt_model_refine or gpt_model_classify

        api_key = os.getenv(openai_api_key_env)
        if OpenAI is None:
            self.client = None
            self.gpt_available = False
        elif not api_key:
            self.client = None
            self.gpt_available = False
        else:
            self.client = OpenAI(api_key=api_key)
            self.gpt_available = True
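        # All GPT-dependent methods below check `self.gpt_available` first,
        # so the app still runs (RAG-only) without an OpenAI key.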
    # ------------------------------------------------------------------
    # 1. GPT classification
    # ------------------------------------------------------------------
    def _classify_with_gpt(self, question: str) -> GPTClassification:
        """
        Ask GPT: is this an agricultural scholarly question?

        GPT should return JSON:
        {
            "is_agri_scholarly": true/false,
            "intent_type": "agri_scholarly" | "chit_chat" | "generic_qa" | "other",
            "confidence": 0-1,
            "brief_reason": "..."
        }
        """
        # If GPT is not available, simple fallback: treat everything as agri_scholarly.
        if not self.gpt_available:
            return GPTClassification(
                is_agri_scholarly=True,
                intent_type="agri_scholarly",
                confidence=0.5,
                brief_reason="GPT not available; falling back to always using RAG.",
            )

        system_prompt = (
            "You are a classifier for an agricultural research assistant called AgriScholarQA.\n\n"
            "Your job: given a single user query, decide whether it is an "
            "**agricultural scholarly question** that should trigger a retrieval-augmented "
            "pipeline over agricultural research papers.\n\n"
            "Definitions:\n"
            "- Agricultural scholarly question: asks about crops, soils, climate impacts, "
            "  agronomy, plant physiology, agricultural experiments, yields, pests, diseases, "
            "  fertilizers, irrigation, crop models, etc., in a technically informed way.\n"
            "- Chit-chat / meta: greetings, what is this system, who are you, etc.\n"
            "- Generic QA: everyday knowledge or non-agricultural topics.\n"
            "- Other: anything else not clearly fitting above.\n\n"
            "Return a strict JSON object with fields:\n"
            "- is_agri_scholarly: boolean\n"
            "- intent_type: one of \"agri_scholarly\", \"chit_chat\", \"generic_qa\", \"other\"\n"
            "- confidence: float between 0 and 1\n"
            "- brief_reason: short natural-language reason (1-2 sentences)\n\n"
            "Do not add extra keys. Do not write explanations outside the JSON."
        )
        user_prompt = f"User query:\n\"\"\"{question}\"\"\""

        resp = self.client.chat.completions.create(
            model=self.gpt_model_classify,
            temperature=0,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
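        # `response_format={"type": "json_object"}` asks the API for syntactically
        # valid JSON, but the parse below is still guarded in case the model
        # returns an unexpected payload.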
        raw = resp.choices[0].message.content.strip()
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as e:
            # Fallback: if parsing fails, treat as agri_scholarly with low confidence.
            return GPTClassification(
                is_agri_scholarly=True,
                intent_type="agri_scholarly",
                confidence=0.5,
                brief_reason=f"Failed to parse GPT JSON: {e} | raw={raw[:200]}",
            )
        return GPTClassification(
            is_agri_scholarly=bool(data.get("is_agri_scholarly", False)),
            intent_type=str(data.get("intent_type", "other")),
            confidence=float(data.get("confidence", 0.0)),
            brief_reason=str(data.get("brief_reason", "")),
        )
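        # Example parsed result (illustrative values only):
        #   GPTClassification(is_agri_scholarly=True, intent_type="agri_scholarly",
        #                     confidence=0.92, brief_reason="Asks about crop yield response.")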
    # ------------------------------------------------------------------
    # 2. GPT refinement of answer (UPDATED TO BE "EDITOR" ONLY)
    # ------------------------------------------------------------------
    def _refine_answer_with_gpt(
        self,
        question: str,
        raw_answer: str,
        evidence: List[Dict[str, Any]],
        critique: str,
    ) -> str:
        """
        Use GPT to clean up and structure the RAG answer.

        IMPORTANT (per design):
        - Treat the RAG draft answer as the primary source of content.
        - Do NOT delete or drop important points from the draft, except duplicates.
        - The main job is to:
          * remove repetition,
          * merge overlapping points,
          * improve clarity and structure,
          * add light formatting (Markdown),
          * optionally reference evidence.
        - Do NOT invent new facts or numbers that are not in the draft answer.
        - If evidence is weak or not directly relevant, just ignore it
          instead of commenting on "lack of evidence".
        - Do NOT write sentences like "no evidence was available" or
          "the snippets do not contain direct results".
        - List the extracted evidence as paper citations.
        """
        if not self.gpt_available:
            # If GPT is not available, just return the raw answer plus a short note.
            return (
                "*(GPT refinement disabled – showing raw RAG answer.)*\n\n"
                + raw_answer
            )

        # Build compact evidence text (used only as soft support / for citations).
        ev_blocks = []
        for i, ev in enumerate(evidence[:5], 1):
            title = ev.get("paper_title") or ev.get("paper_id") or f"Doc {ev.get('idx', i)}"
            snippet = ev.get("text") or ev.get("text_preview") or ""
            snippet = " ".join(snippet.split())
            snippet = snippet[:800]  # cap per evidence block
            ev_blocks.append(f"[{i}] {title}\n{snippet}\n")
        evidence_text = "\n\n".join(ev_blocks) if ev_blocks else "(no evidence text provided)"
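        # The caps above (at most 5 snippets, 800 chars each) are a heuristic to
        # keep the refinement prompt within a modest token budget; adjust them to
        # match your model's context window.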
        system_prompt = (
            "You are an expert agricultural research assistant.\n\n"
            "You are given:\n"
            "1) The user's question.\n"
            "2) A draft answer produced by an internal RAG model (this is the MAIN content).\n"
            "3) Evidence snippets from research papers, each labeled [1], [2], etc.\n"
            "4) A critique from another checker model.\n\n"
            "Your role here is primarily an **editor and organizer**, not a critic:\n"
            "- Keep all important substantive points from the draft answer.\n"
            "- Do NOT delete major claims or sections unless they are clearly duplicate.\n"
            "- Do NOT introduce new claims, numbers, or experimental results that are not in the draft.\n"
            "- Do NOT write sentences like \"no direct evidence is available\", "
            "  \"the snippets do not contain data\", or similar.\n"
            "- If evidence does not clearly support a point, simply avoid citing it; do not comment on that.\n\n"
            "Your main tasks:\n"
            "- Remove repetition and merge overlapping points.\n"
            "- Improve clarity, flow, and structure.\n"
            "- Format the answer nicely in Markdown (sections, bullets, etc.).\n"
            "- Where appropriate, you may attach citations like [1], [2] after statements that are clearly "
            "  supported by a snippet.\n"
            "- Use the critique only to polish wording and structure, not to argue that evidence is missing.\n\n"
            "Output ONLY the final, organized answer in Markdown."
        )
        user_prompt = (
            f"QUESTION:\n{question}\n\n"
            f"DRAFT ANSWER (from RAG model):\n{raw_answer}\n\n"
            f"EVIDENCE SNIPPETS (optional, use only when clearly helpful):\n{evidence_text}\n\n"
            f"CRITIQUE (for polishing, not for rejection):\n{critique}\n\n"
            "Now rewrite the answer according to the instructions above."
        )

        resp = self.client.chat.completions.create(
            model=self.gpt_model_refine,
            temperature=0.3,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        refined = resp.choices[0].message.content.strip()
        return refined
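        # NOTE: this method is currently not called from handle_query (step 5
        # there skips GPT refinement), but it is kept so refinement can be
        # re-enabled without rewriting the prompt.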
    # ------------------------------------------------------------------
    # 3. System chat (non-agri)
    # ------------------------------------------------------------------
    def _system_chat_answer(self, question: str, cls: GPTClassification) -> str:
        """
        For non-agri queries: explain the system and its capabilities.
        """
        intro = (
            "Hi! 👋 I'm **AgriScholarQA**, an agricultural scholarly assistant.\n\n"
            "I'm designed specifically to answer **research-oriented questions about agriculture** "
            "using a retrieval-augmented pipeline over scientific papers."
        )
        capabilities = (
            "\n\n**Here's what I can do:**\n"
            "- 🌾 Answer questions about **crop production, soil, climate impacts, pests, diseases**, etc.\n"
            "- 📄 Retrieve and show **evidence from agricultural research papers**.\n"
            "- 🧪 Help you reason about **field experiments, treatments, and agronomic practices**.\n"
            "- 🚨 Detect potential **hallucinations or weakly supported claims**.\n"
        )
        meta = (
            f"\nYour current query looks like **{cls.intent_type.replace('_', ' ')}** "
            "rather than a detailed agricultural scholarly question, so I did not trigger "
            "the heavy retrieval pipeline for this turn.\n"
        )
        nudge = (
            "\nIf you'd like to use my full capabilities, you can ask questions like:\n"
            "- *\"How does nitrogen fertilizer rate affect rice yield under water stress?\"*\n"
            "- *\"What are sustainable pest management strategies for maize in the tropics?\"*\n"
            "- *\"How does climate change influence wheat phenology and grain quality?\"*\n"
        )
        return intro + capabilities + meta + nudge
    # ------------------------------------------------------------------
    # 4. Main entry point: handle single query
    # ------------------------------------------------------------------
    def handle_query(self, question: str) -> OrchestratorResult:
        """
        Handle a single user question through the full pipeline.

        Returns:
            OrchestratorResult with:
            - mode: "rag" | "system_chat" | "error"
            - answer: final answer string
            - evidence: list[dict] (if rag)
            - meta: classification info, raw_rag_result, critique, etc.
        """
        q = (question or "").strip()
        if not q:
            return OrchestratorResult(
                mode="system_chat",
                answer="Please enter a question. I specialize in **agricultural research** questions.",
                evidence=[],
                meta={"classification": None},
            )

        # 1. Classify (GPT or fallback).
        try:
            cls = self._classify_with_gpt(q)
        except Exception as e:
            return OrchestratorResult(
                mode="error",
                answer=f"⚠️ Error while classifying your question: `{e}`",
                evidence=[],
                meta={"classification": None},
            )

        # 2. If NOT agricultural scholarly (or confidence is low): system chat.
        if (not cls.is_agri_scholarly) or cls.confidence < 0.5:
            answer = self._system_chat_answer(q, cls)
            return OrchestratorResult(
                mode="system_chat",
                answer=answer,
                evidence=[],
                meta={"classification": asdict(cls)},
            )
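        # NOTE: the 0.5 confidence cutoff routes ambiguous queries to the
        # lightweight system chat rather than the heavier RAG pipeline.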
        # 3. Agricultural scholarly → run RAG.
        try:
            rag_result = self.rag.ask(q)
        except Exception as e:
            return OrchestratorResult(
                mode="error",
                answer=(
                    "Your question looks like an **agricultural scholarly query**, "
                    "but I hit an error while running the retrieval pipeline:\n\n"
                    f"`{e}`"
                ),
                evidence=[],
                meta={"classification": asdict(cls)},
            )

        raw_answer = rag_result.get("answer", "") if isinstance(rag_result, dict) else str(rag_result)
        evidence = rag_result.get("evidence", []) if isinstance(rag_result, dict) else []

        # 4. Self-validation using RAG's own validate_answer method.
        try:
            critique = self.rag.validate_answer(q, raw_answer, evidence)
        except Exception as e:
            critique = f"(Validation step failed: {e})"
        # 5. GPT refinement SKIPPED (per user request).
        # We directly use the raw_answer from the RAG model. The critique is
        # still computed (step 4) and available in `meta`, but we do not use
        # GPT to merge it in.
        refined_answer = raw_answer
        # Previous GPT refinement logic, disabled but kept for reference:
        # try:
        #     refined_answer = self._refine_answer_with_gpt(q, raw_answer, evidence, critique)
        # except Exception as e:
        #     refined_answer = (
        #         f"⚠️ I had trouble refining the answer with GPT (`{e}`). "
        #         "Showing the original RAG answer plus critique:\n\n"
        #         f"{raw_answer}\n\n---\n\n**Internal critique:**\n{critique}"
        #     )

        return OrchestratorResult(
            mode="rag",
            answer=refined_answer,
            evidence=evidence,
            meta={
                "classification": asdict(cls),
                "raw_rag_result": rag_result,
                "critique": critique,
            },
        )
# ----------------------------------------------------------------------
# Global instances (lazy init)
# ----------------------------------------------------------------------
rag_system: Optional[AgriCritiqueRAG] = None
orchestrator: Optional[GPTAgriRAGOrchestrator] = None


def initialize_orchestrator() -> GPTAgriRAGOrchestrator:
    global rag_system, orchestrator
    if rag_system is None:
        rag_system = AgriCritiqueRAG()
    if orchestrator is None:
        orchestrator = GPTAgriRAGOrchestrator(rag_system=rag_system)
    return orchestrator
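# Minimal usage sketch (assumes AgriCritiqueRAG() can construct itself, e.g.
# loads or builds its own index):
#
#   orch = initialize_orchestrator()
#   res = orch.handle_query("How does drought stress during flowering affect rice yield?")
#   print(res.mode)           # "rag", "system_chat", or "error"
#   print(res.answer[:200])   # final answer text
#   print(len(res.evidence))  # retrieved chunks when mode == "rag"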
# ----------------------------------------------------------------------
# Helper: Format evidence for display
# ----------------------------------------------------------------------
def format_evidence_for_display(evidence: List[Dict[str, Any]]) -> str:
    """
    Format evidence chunks into a readable Markdown reference section.
    """
    if not evidence:
        return ""
    out = ["\n\n---\n### 📚 Evidence Sources"]
    for i, ev in enumerate(evidence, 1):
        title = ev.get("paper_id", "Unknown Paper")
        # Limit snippet length for display; add an ellipsis only when truncated.
        text = ev.get("text", "").replace("\n", " ")
        snippet = text[:300] + ("..." if len(text) > 300 else "")
        score = f"{ev.get('score', 0.0):.4f}"
        out.append(f"**[{i}] {title}** (Score: {score})\n> {snippet}")
    return "\n".join(out)
# ----------------------------------------------------------------------
# Gradio Chat function
# ----------------------------------------------------------------------
def chat_response(message: str, history: List[List[str]]) -> str:
    """
    Main chat function used by Gradio.

    Args:
        message: current user input.
        history: list of [user, bot] pairs (not used directly, since the RAG
            system keeps its own session state).

    Returns:
        Final answer string to display in the chat.
    """
    if not message:
        return "Please enter a question. I specialize in **agricultural research** questions."
    try:
        orch = initialize_orchestrator()
        result = orch.handle_query(message)
        # Append evidence if available (RAG mode only).
        final_output = result.answer
        if result.mode == "rag" and result.evidence:
            evidence_section = format_evidence_for_display(result.evidence)
            final_output += evidence_section
        return final_output
    except Exception as e:
        return f"❌ Unexpected error in chat pipeline: `{e}`"
# ----------------------------------------------------------------------
# Build Gradio UI (single-tab ChatInterface with larger chat area)
# ----------------------------------------------------------------------
with gr.Blocks(title="🌾 AgriScholarQA", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🌾 AgriScholarQA Research Assistant
        **Evidence-Based Agricultural QA with Self-Correction & GPT-Orchestrated Answering**

        - 🧠 Uses GPT to detect if your question is an *agricultural scholarly* query.
        - 📚 For scholarly queries: runs a RAG pipeline over research papers.
        - 🔍 The internal model self-checks its answer before responding.
        - ✨ GPT then refines the answer for clarity, formatting, and evidence grounding.
        """
    )
    gr.ChatInterface(
        fn=chat_response,
        title=None,
        description=None,
        examples=[
            "How does drought stress during flowering affect rice yield?",
            "What are sustainable pest management strategies for maize?",
            "How does increased temperature impact wheat phenology and grain quality?",
        ],
        retry_btn=None,
        undo_btn=None,
        clear_btn="🗑️ Clear",
        chatbot=gr.Chatbot(
            height=600,  # increased height for better visibility
            show_label=False,
            container=True,
            scale=1,
            elem_id="chatbot",
        ),
        textbox=gr.Textbox(
            placeholder="Ask your agricultural research question here...",
            container=False,
            scale=7,
            lines=2,
        ),
    )
if __name__ == "__main__":
    print("🚀 Starting AgriScholarQA (single-tab GPT-orchestrated RAG)...")
    demo.launch(share=True)