# test.py — Agentic logic using OpenAI + MCP tools (langchain_core for parsing) import os import json from typing import Any, Dict, Optional, List, Literal, Type from pydantic import BaseModel, ValidationError from openai import OpenAI from langchain_core.output_parsers import PydanticOutputParser # ← requested parser # -------------------- OpenAI setup -------------------- OAI_MODEL = os.getenv("OAI_MODEL", "gpt-4o-mini") client_oai = OpenAI(api_key="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") def _format_history_for_context( conversation: List[Dict[str, str]], max_turns: int = 8 ) -> str: """ Convert the last N messages from the session into a compact context string. Expected item format: {"role": "user"|"assistant", "content": "..."}. """ if not conversation: return "" window = conversation[-max_turns:] lines = [] for m in window: role = m.get("role", "user") content = m.get("content", "").strip() if not content: continue if role == "user": lines.append(f"User: {content}") else: lines.append(f"Assistant: {content}") return "\n".join(lines) def llm_invoke( prompt: str, system: str = "You are a helpful assistant. Return JSON when requested.", temperature: float = 0.0, ) -> str: """ Invoke OpenAI Chat Completions for planning/intent classification (low temperature). """ resp = client_oai.chat.completions.create( model=OAI_MODEL, messages=[ {"role": "system", "content": system}, {"role": "user", "content": prompt}, ], temperature=temperature, ) return resp.choices[0].message.content # -------------------- Pydantic models -------------------- class IntentSpec(BaseModel): in_scope: bool intent: Literal["in_scope", "out_of_scope", "chit_chat"] reason: Optional[str] = None class SubQuery(BaseModel): id: str query: str tool_name: Literal["ask_excel", "ask_pdf", "ask_link"] required_params: Dict[str, Any] depends_on: List[str] = [] class PlanResponse(BaseModel): subqueries: List[SubQuery] class ContextEnhancer(BaseModel): answer_found: bool needs_enhancement: bool enhanced_query: Optional[str] = None cached_answer: Optional[str] = None reason: Optional[str] = None # -------------------- JSON parsing via langchain_core -------------------- def _safe_json(text: str) -> str: """ Heuristic sanitizer: strip code fences and extract the main JSON block to help PydanticOutputParser if the model adds extra text. """ t = text.strip() if t.startswith("```"): # Remove triple backtick fences; allow optional 'json' hint t = t.strip("`").strip() if t.lower().startswith("json"): t = t[4:].strip() # Try direct JSON try: json.loads(t) return t except Exception: pass # Fallback: find first '{' and last '}' start = t.find("{") end = t.rfind("}") if start != -1 and end != -1 and end > start: return t[start : end + 1] return text def parse_response(text: str, model_spec: Type[BaseModel]) -> BaseModel: """ Parse into a Pydantic model using langchain_core's PydanticOutputParser, with a robust fallback to standard json+pydantic if needed. """ parser = PydanticOutputParser(pydantic_object=model_spec) # First try parser.parse() directly try: return parser.parse(text) except Exception: pass # Fallback: sanitize and try again try: return parser.parse(_safe_json(text)) except Exception: # Last fallback: manual pydantic construction data = json.loads(_safe_json(text)) return model_spec(**data) # -------------------- Prompts (intent + planning) -------------------- ''' def intent_prompt(query: str, available_iits: List = [], available_branches: List = [], years: List = []) -> str: parser = PydanticOutputParser(pydantic_object=IntentSpec) fmt = parser.get_format_instructions() # <- tells the LLM the exact JSON keys/types return f"""You are an intent classifier for a JOSAA Counseling Assistant. Supported IITs: {', '.join(available_iits)} Supported Branches: {', '.join(available_branches)} Available Data: opening/closing ranks ({', '.join(years)}), curriculum, NIRF, placements/faculty/research/facilities. Classify the user's message into EXACTLY ONE of: - "chit_chat" - "in_scope" - "out_of_scope" Rules: - "chit_chat" for greetings/small talk (hi/hello/how are you/what can you do). - "in_scope" for queries about SUPPORTED IITs/branches, counseling, ranks/cutoffs, courses, curriculum, NIRF, placements, faculty, research, alumni/distinguished alumni and campus facilities. - "out_of_scope" otherwise. Return ONLY a JSON object following these instructions: {fmt} User query: "{query}" """.strip() ''' def intent_prompt( query: str, available_iits: List = [], available_branches: List = [], years: List = [], conversation_context: str = "" # NEW ) -> str: parser = PydanticOutputParser(pydantic_object=IntentSpec) fmt = parser.get_format_instructions() convo = f"\n\nRecent conversation:\n{conversation_context}\n\n" if conversation_context else "\n\n" return f"""You are an intent classifier for a JOSAA Counseling Assistant. Supported IITs: {', '.join(available_iits)} Supported Branches: {', '.join(available_branches)} Available Data: opening/closing ranks ({', '.join(years)}), curriculum, NIRF, placements/faculty/research/facilities.{convo} Classify the user's message into EXACTLY ONE of: - "chit_chat" - "in_scope" - "out_of_scope" Rules: - "chit_chat" for greetings/small talk (hi/hello/how are you/what can you do). - "in_scope" for queries about SUPPORTED IITs/branches, counseling, ranks/cutoffs, courses, curriculum, NIRF, placements, faculty, research, alumni/distinguished alumni and campus facilities. - "out_of_scope" otherwise. Return ONLY a JSON object following these instructions: {fmt} User query: "{query}" """.strip() ''' def planning_prompt(query: str, available_iits: List = [], available_branches: List = [], years: List = []) -> str: parser = PydanticOutputParser(pydantic_object=PlanResponse) fmt = parser.get_format_instructions() return f"""You are a query planner for a JEE counseling assistant. AVAILABLE TOOLS: - ask_excel — ranks/cutoffs; params may include iit_name, branch, year - ask_pdf — curriculum/NIRF; params may include iit_name, branch - ask_link — placements/faculty/research/facilities; params may include iit_name, branch, or a URL Break the user query into specific subqueries targeting ONE tool each. Use ONLY supported IIT names and branch names when present. Return ONLY a JSON object following these instructions: {fmt} User Query: "{query}" """.strip() ''' def planning_prompt( query: str, available_iits: List = [], available_branches: List = [], years: List = [], conversation_context: str = "" # NEW ) -> str: parser = PydanticOutputParser(pydantic_object=PlanResponse) fmt = parser.get_format_instructions() convo = f"\n\nRecent conversation:\n{conversation_context}\n\n" if conversation_context else "\n\n" return f"""You are a query planner for a JEE counseling assistant. AVAILABLE TOOLS: - ask_excel — ranks/cutoffs - ask_pdf — curriculum/NIRF - ask_link — placements/faculty/research/facilities{convo} Break the user query into specific subqueries targeting ONE tool each. Return ONLY a JSON object following these instructions: {fmt} User Query: "{query}" """.strip() # -------------------- Intent detection & planning -------------------- ''' def intent_detect(user_q: str, available_iits: List, available_branches: List, years: List) -> IntentSpec: response = llm_invoke(intent_prompt(user_q, available_iits, available_branches, years), temperature=0.0) print("intent is", f"{response}") try: return parse_response(response, IntentSpec) except Exception as e: # default to out_of_scope if parsing fails return IntentSpec(in_scope=False, intent="out_of_scope", reason=f"Parse error: {e}") ''' def intent_detect( user_q: str, available_iits: List, available_branches: List, years: List, conversation_context: str # NEW ) -> IntentSpec: response = llm_invoke( intent_prompt(user_q, available_iits, available_branches, years, conversation_context), temperature=0.0 ) return parse_response(response, IntentSpec) ''' def make_query_plan(user_q: str, available_iits: List, available_branches: List, years: List) -> PlanResponse: response = llm_invoke(planning_prompt(user_q, available_iits, available_branches, years), temperature=0.0) return parse_response(response, PlanResponse) ''' def make_query_plan( user_q: str, available_iits: List, available_branches: List, years: List, conversation_context: str # NEW ) -> PlanResponse: response = llm_invoke( planning_prompt(user_q, available_iits, available_branches, years, conversation_context), temperature=0.0 ) return parse_response(response, PlanResponse) # -------------------- MCP tool registry (real calls) -------------------- def _build_query_text(query: str, params: Dict[str, Any]) -> str: """Compose a single question string using the planner's params and description.""" if not params: return query param_str = "; ".join(f"{k}: {v}" for k, v in params.items()) return f"{query}\nParameters: {param_str}" ''' def make_tool_registry(mcp_client) -> Dict[str, Any]: """ Return callables that invoke actual MCP tools via your client. """ def call_ask_excel(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str: q_text = _build_query_text(query, required_params) return mcp_client.ask_excel( question=q_text, top_k=top_k, sheet=required_params.get("sheet", 0), temperature=temperature, ) def call_ask_pdf(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str: q_text = _build_query_text(query, required_params) return mcp_client.ask_pdf( question=q_text, top_k=top_k, temperature=temperature, ) def call_ask_link(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str: q_text = _build_query_text(query, required_params) return mcp_client.ask_link( question=q_text, temperature=temperature, subquery_context=required_params.get("subquery_context"), top_k=top_k, ) return { "ask_excel": call_ask_excel, "ask_pdf": call_ask_pdf, "ask_link": call_ask_link, } ''' # AFTER (CHANGE): def make_tool_registry(mcp_client, conversation_context: str) -> Dict[str, Any]: def _build_query_text(query: str, params: Dict[str, Any], conversation_context: str) -> str: parts = [query.strip()] if params: parts.append("Parameters: " + "; ".join(f"{k}: {v}" for k, v in params.items())) if conversation_context: parts.append("Conversation context:\n" + conversation_context) return "\n".join(parts) def call_ask_excel(query, required_params, temperature=0.1, top_k=5): q_text = _build_query_text(query, required_params, conversation_context) return mcp_client.ask_excel(question=q_text, top_k=top_k, sheet=required_params.get("sheet", 0), temperature=temperature) def call_ask_pdf(query, required_params, temperature=0.1, top_k=5): q_text = _build_query_text(query, required_params, conversation_context) return mcp_client.ask_pdf(question=q_text, top_k=top_k, temperature=temperature) def call_ask_link(query, required_params, temperature=0.1, top_k=5): q_text = _build_query_text(query, required_params, "") # put convo in subquery_context instead subctx = conversation_context if conversation_context else required_params.get("subquery_context") # IMPORTANT: align param name with your server (query vs question) return mcp_client.ask_link( query=q_text, # if server expects 'query'; use question=q_text otherwise temperature=temperature, subquery_context=subctx, top_k=top_k, ) return {"ask_excel": call_ask_excel, "ask_pdf": call_ask_pdf, "ask_link": call_ask_link} # -------------------- Execute subqueries & synthesize final -------------------- def build_execution_order(subqueries: List[SubQuery]) -> List[List[str]]: """ Create batches of IDs whose dependencies are satisfied (simple topological batching). """ if not subqueries: return [] remaining = {sq.id: sq for sq in subqueries} completed = set() order: List[List[str]] = [] while remaining: ready = [sq_id for sq_id, sq in remaining.items() if all(dep in completed for dep in sq.depends_on)] if not ready: raise ValueError(f"Circular or unsatisfiable dependencies: {list(remaining.keys())}") order.append(ready) for sq_id in ready: completed.add(sq_id) del remaining[sq_id] return order #def execute_plan( # user_q: str, # plan: PlanResponse, # mcp_client, # temperature: float = 0.1, # top_k: int = 5 #) -> Dict[str, Any]: # """ # Execute subqueries in batches; returns a dict of {sq_id: {tool, answer}}. # """ # registry = make_tool_registry(mcp_client) def execute_plan(user_q, plan, mcp_client, conversation_context: str, temperature=0.1, top_k=5): registry = make_tool_registry(mcp_client, conversation_context) subqs = plan.subqueries exec_order = build_execution_order(subqs) results: Dict[str, Any] = {} for batch in exec_order: for sq_id in batch: sq = next(s for s in subqs if s.id == sq_id) tool_fn = registry.get(sq.tool_name) if not tool_fn: results[sq_id] = {"tool": sq.tool_name, "answer": f"❌ Unknown tool '{sq.tool_name}'"} continue try: ans = tool_fn(sq.query, sq.required_params, temperature=temperature, top_k=top_k) results[sq_id] = {"tool": sq.tool_name, "answer": ans} except Exception as e: results[sq_id] = {"tool": sq.tool_name, "answer": f"❌ Error calling tool: {e}"} return {"execution_order": exec_order, "results": results} ''' def synthesize_answer(user_q: str, exec_result: Dict[str, Any]) -> str: """ Use OpenAI to write a concise final answer using all tool outputs. """ tool_outputs = [] for batch in exec_result.get("execution_order", []): for sq_id in batch: entry = exec_result["results"].get(sq_id, {}) tool_outputs.append(f"[{sq_id} • {entry.get('tool')}] {entry.get('answer', '')}") context = "\n".join(tool_outputs) if tool_outputs else "(no tool outputs)" prompt = f"""You are a helpful assistant for JEE/JOSAA counseling. User Question: {user_q} Tool Results: {context} Write a concise, accurate final answer grounded in the tool results. If the tool results are insufficient, state that clearly. Avoid bracketed tags and avoid repeating metadata like [sq1]. """ return llm_invoke(prompt, system="You are a helpful assistant. Use only provided context.", temperature=0.2) ''' # AFTER (CHANGE): def synthesize_answer(user_q, exec_result, conversation_context: str): tool_outputs = [] # ... prompt = f"""You are a helpful assistant for JEE/JOSAA counseling. Recent conversation: {conversation_context or "(none)"} User Question: {user_q} Tool Results: {exec_result} Write a concise, accurate final answer grounded in the tool results and the recent conversation. If the available context is insufficient, state that clearly. Avoid bracketed tags and metadata like [sq1]. """ return llm_invoke(prompt, system="You are a helpful assistant. Use only provided context.", temperature=0.2) # -------------------- Public entry point used by chat_app -------------------- # AFTER (CHANGE): def run_agent( user_q: str, mcp_client, available_iits: List[str], available_branches: List[str], years: List[str], conversation: List[Dict[str, str]], # NEW top_k: int = 5, temperature: float = 0.1, ) -> str: conversation_context = _format_history_for_context(conversation, max_turns=8) intent = intent_detect(user_q, available_iits, available_branches, years, conversation_context) print(intent) print("The intent response is", f"{intent}") if intent.intent == "chit_chat": return ( f"Hi! I’m your JOSAA Counseling Assistant.\n" f"Ask about branches, opening/closing ranks, or options for your rank.\n" f"Supported IITs: {', '.join(available_iits)}; branches: {', '.join(available_branches)}." ) if not intent.in_scope or intent.intent == "out_of_scope": return ( "This assistant only supports JEE/JOSAA counseling.\n" f"Supported IITs: {', '.join(available_iits)}; branches: {', '.join(available_branches)}.\n" "Please refine your query accordingly." ) # In-scope → plan → execute → synthesize plan = make_query_plan(user_q, available_iits, available_branches, years, conversation_context) print(plan) exec_result = execute_plan(user_q, plan, mcp_client, conversation_context, temperature=temperature, top_k=top_k) final = synthesize_answer(user_q, exec_result, conversation_context) return final.strip()