import os
import logging
import socket
from typing import List, Tuple, Dict

import numpy as np
import pandas as pd
import fitz  # PyMuPDF
from fastmcp import FastMCP
from openai import OpenAI
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# ---------- Config ----------
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)-7s %(message)s",
    datefmt="%m/%d/%y %H:%M:%S",
)
logger = logging.getLogger("rag-mcp-server")

mcp = FastMCP(name="rag-mcp-server", version="1.1.0")

# Paths
EXCEL_PATH = "Data/IIT_Opening_Closing_Ranks.xlsx"
PDF_FILES: Dict[str, str] = {
    "eng_design": "Data/Engineering_design_Course_Details.pdf",
    "aero_curriculum": "Data/Aerospace_curriculum.pdf",
    "nirf_2024": "Data/IR2024_Report.pdf",
    "iitm_curriculum_2024": "Data/Curriculum_-_2024_Batch_B.Tech_Version_1 (1).pdf",
    "iitb_cse_curriculum": "Data/IITB_CSE_Btech_Curriculum.pdf",
    "iitb_civil_curriculum": "Data/IITB_Civil_Btech_Curriculum.pdf",
    "iitb_mech_curriculum": "Data/IITB_Mechanical_Engg_Curriculum.pdf",
    "iitb_elec_curriculum": "Data/IITD_Electrical_Btech_Curriculum.pdf",
    "iitd_allprogrammes_curriculum": "Data/IITD_Programmes_Curriculum.pdf",
}
LINK_FILES: Dict[str, str] = {
    "linkedin_profile_iit_d": "https://alumni.iitd.ac.in/distinguished-alum-awards",
    "linkedin_profile_iit_m": "https://www.vaave.com/blog/iit-madras-notable-alumni/",
    "linkedin_profile_iit_b": "https://acr.iitbombay.org/distinguished-alumnus/",
    "linkedin_profile_iit_kgp": "http://alumni.iitkgp.ac.in/",
}

# Models
EMBED_MODEL = "text-embedding-3-small"
CHAT_MODEL = "gpt-4o-mini"
TOP_K = 5

# API key comes from the environment; never hard-code secrets in source.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# ---------- Utility ----------
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Cosine similarity between query vector `a` and each row of matrix `b`."""
    a_norm = a / (np.linalg.norm(a) + 1e-12)
    b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-12)
    return b_norm @ a_norm


# EXCEL PIPELINE
EXCEL_INDEX = {
    "texts": None,
    "embeddings": None,
    "row_ids": None,
    "columns": None,
}


def _excel_to_texts(excel_path: str, sheet: int | str = 0) -> Tuple[List[str], List[int], List[str]]:
    df = pd.read_excel(excel_path, sheet_name=sheet)  # requires openpyxl
    df = df.fillna("")
    cols = list(df.columns)
    texts, row_ids = [], []
    for i, row in df.iterrows():
        parts = [f"Row {i}"]
        for c in cols:
            parts.append(f"{c}: {row[c]}")
        texts.append(" | ".join(parts))
        row_ids.append(i)
    return texts, row_ids, cols


def _build_excel_index(force: bool = False, sheet: int | str = 0):
    if not force and EXCEL_INDEX["texts"] is not None and EXCEL_INDEX["embeddings"] is not None:
        return
    if not os.path.exists(EXCEL_PATH):
        raise FileNotFoundError(f"Excel not found at {EXCEL_PATH}")
    logger.info("Loading Excel and building embeddings index...")
    texts, row_ids, cols = _excel_to_texts(EXCEL_PATH, sheet)
    emb = client.embeddings.create(model=EMBED_MODEL, input=texts)
    vectors = np.array([e.embedding for e in emb.data], dtype=np.float32)
    EXCEL_INDEX.update({
        "texts": texts,
        "embeddings": vectors,
        "row_ids": row_ids,
        "columns": cols,
    })
    logger.info(f"[EXCEL INDEX] rows={len(texts)} emb.shape={vectors.shape} cols={len(cols)}")


def _retrieve_excel(question: str, top_k: int = TOP_K) -> List[Tuple[int, str]]:
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q = np.array(q_emb, dtype=np.float32)
    sims = _cosine_similarity(q, EXCEL_INDEX["embeddings"])
    idxs = np.argsort(sims)[::-1][:top_k]
    out = [(int(EXCEL_INDEX["row_ids"][i]), EXCEL_INDEX["texts"][i]) for i in idxs]
    top_info = [(int(EXCEL_INDEX["row_ids"][i]), float(sims[i])) for i in idxs]
    logger.info(f"[EXCEL RETRIEVE] q='{question[:80]}...' top_k={top_k} -> {top_info}")
    return out
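# Quick local sanity check for the Excel index (illustrative only: the question text is made up,
# and running these lines needs OPENAI_API_KEY set plus the Excel file on disk):
#
#   _build_excel_index()
#   for row_id, row_text in _retrieve_excel("Opening rank for CSE at IIT Bombay", top_k=3):
#       print(row_id, row_text[:120])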
def _make_excel_prompt(question: str, retrieved_rows: List[Tuple[int, str]], subquery_context: str | None = None) -> List[dict]:
    context_lines = [f"[Row {rid}] {rtext}" for rid, rtext in retrieved_rows]
    context = "\n".join(context_lines) or "(no relevant rows found)"
    logger.info(f"[EXCEL PROMPT] context_len={len(context)}; preview:\n{context[:500]}")
    system = (
        "You are a helpful assistant. Answer the user's question STRICTLY using the provided Excel context. "
        "If the answer is not present, say you don't have enough information."
    )
    # ✅ Append subquery_context if provided
    user = (
        f"Context (from Excel):\n{context}\n\n"
        f"User question: {question}\n\n"
    )
    if subquery_context:
        user += f"Additional context:\n{subquery_context}\n\n"
    user += (
        "Instructions:\n"
        "- Use only the context above.\n"
        "- Keep answers concise and accurate.\n"
        "- Do not include any bracketed tags or citations."
    )
    return [{"role": "system", "content": system}, {"role": "user", "content": user}]


@mcp.tool("ask_excel", description="RAG over an Excel file; answer questions grounded in the sheet.")
def ask_excel(question: str, top_k: int = TOP_K, sheet: int | str = 0, temperature: float = 0.1, subquery_context: str | None = None) -> str:
    try:
        _build_excel_index(False, sheet)
        retrieved = _retrieve_excel(question, top_k)
        messages = _make_excel_prompt(question, retrieved, subquery_context)
        for m in messages:
            logger.info(f"[EXCEL MESSAGES] role={m['role']} len={len(m['content'])}")
        completion = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=temperature)
        answer = completion.choices[0].message.content or "I couldn't generate an answer."
        logger.info(f"[EXCEL ANSWER] len={len(answer)}; preview: {answer[:200]}")
        return answer.strip()
    except Exception as e:
        logger.exception("ask_excel failed: %s", e)
        return f"❌ Error: {e}"
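# For reference, each row retrieved by the Excel pipeline above reaches the model as one context
# line of the form "[Row <row_id>] Row <row_id> | <Column>: <value> | ...". The column names and
# values below are placeholders, not actual spreadsheet contents:
#
#   [Row 12] Row 12 | Institute: IIT Bombay | Branch: Computer Science | Closing Rank: 66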
# PDF PIPELINE (Multi-file)
# Per-PDF indices
PDF_INDEXES: Dict[str, Dict[str, object]] = {
    key: {"chunks": None, "embeddings": None, "chunk_ids": None} for key in PDF_FILES
}

# Router: keyword heuristics to quickly select a PDF
PDF_ROUTER_KEYWORDS: Dict[str, List[str]] = {
    "eng_design": [
        "finite element", "non-linear", "lagrangian", "continuum mechanics", "contact mechanics",
        "ed5015", "ed5012", "ergonomics", "human factors", "design", "galerkin", "variational",
    ],
    "aero_curriculum": [
        "aerospace", "b.tech", "semester", "credits", "as1010", "fluid mechanics", "gas dynamics",
        "strength of materials", "lab", "workshop", "curriculum",
    ],
    "nirf_2024": [
        "nirf", "ranking", "perception", "outreach", "inclusivity", "graduation outcome",
        "research", "teaching", "learning", "resources", "department of higher education",
    ],
    "iitm_curriculum_2024": [
        "curriculum", "credit requirements", "branch-wise", "data science", "computer science",
        "electrical", "mechanical", "metallurgical", "naval architecture", "engineering physics",
        "2024 batch", "2023 batch", "programme",
    ],
}

# Descriptors used for embedding-based fallback routing (short, representative strings)
PDF_DESCRIPTORS: Dict[str, str] = {
    "eng_design": "Engineering Design course details including ED5015 finite element methods and ED5012 human factors.",
    "aero_curriculum": "IIT Madras Aerospace Engineering B.Tech curriculum semester-wise credits and course list.",
    "nirf_2024": "India Rankings 2024 NIRF categories: teaching, research, graduation outcomes, outreach, inclusivity, perception.",
    "iitm_curriculum_2024": "IIT Madras B.Tech curriculum 2024 batch branch-wise credit requirements across departments.",
    "iitb_cse_curriculum": "IIT Bombay Computer Science Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitb_civil_curriculum": "IIT Bombay Civil Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitb_mech_curriculum": "IIT Bombay Mechanical Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitb_elec_curriculum": "IIT Bombay Electrical Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitd_allprogrammes_curriculum": "IIT Delhi All B.Tech programmes curriculum semester-wise credits and course list.",
}
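# Registering another document is a three-step edit; the key, path, and strings below are
# hypothetical placeholders, not files that ship with this project:
#
#   PDF_FILES["iitk_cse_curriculum"] = "Data/IITK_CSE_Btech_Curriculum.pdf"
#   PDF_DESCRIPTORS["iitk_cse_curriculum"] = "IIT Kanpur Computer Science B.Tech curriculum ..."
#   PDF_ROUTER_KEYWORDS["iitk_cse_curriculum"] = ["iit kanpur", "iitk"]  # optional keyword route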
PDF_DESC_EMB: Dict[str, np.ndarray] = {}  # cached descriptor embeddings


def _build_pdf_router_embeddings():
    if PDF_DESC_EMB:
        return
    inputs = [PDF_DESCRIPTORS[k] for k in PDF_FILES.keys()]
    emb = client.embeddings.create(model=EMBED_MODEL, input=inputs)
    vecs = [np.array(e.embedding, dtype=np.float32) for e in emb.data]
    for k, v in zip(PDF_FILES.keys(), vecs):
        PDF_DESC_EMB[k] = v
    logger.info(f"[PDF ROUTER] cached descriptor embeddings for {len(PDF_DESC_EMB)} PDFs")


def _pdf_to_chunks(pdf_path: str) -> List[str]:
    doc = fitz.open(pdf_path)
    chunks: List[str] = []
    for pno, page in enumerate(doc, start=1):
        text = page.get_text("text")
        if not text:
            continue
        # Split into paragraphs to improve retrieval granularity
        paras = [p.strip() for p in text.split("\n\n") if p.strip()]
        for para in paras:
            para = " ".join(para.split())  # collapse whitespace
            chunks.append(f"Page {pno}: {para}")
    return chunks


def _build_pdf_index(pdf_key: str, force: bool = False):
    idx = PDF_INDEXES[pdf_key]
    if not force and idx["chunks"] is not None and idx["embeddings"] is not None:
        return
    pdf_path = PDF_FILES[pdf_key]
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF not found: {pdf_path}")
    logger.info(f"[PDF INDEX] building for '{pdf_key}' -> {pdf_path}")
    chunks = _pdf_to_chunks(pdf_path)
    if not chunks:
        logger.warning(f"[PDF INDEX] No text extracted for '{pdf_key}'.")
        idx["chunks"], idx["embeddings"], idx["chunk_ids"] = [], np.zeros((0, 1), dtype=np.float32), []
        return
    emb = client.embeddings.create(model=EMBED_MODEL, input=chunks)
    vectors = np.array([e.embedding for e in emb.data], dtype=np.float32)
    idx["chunks"] = chunks
    idx["embeddings"] = vectors
    idx["chunk_ids"] = list(range(len(chunks)))
    logger.info(f"[PDF INDEX] '{pdf_key}' chunks={len(chunks)} emb.shape={vectors.shape}")


def _retrieve_pdf(pdf_key: str, question: str, top_k: int = TOP_K) -> List[Tuple[int, str]]:
    idx = PDF_INDEXES[pdf_key]
    embeddings = idx["embeddings"]
    if embeddings is None or len(embeddings) == 0:
        logger.warning(f"[PDF RETRIEVE] Empty embeddings for '{pdf_key}'.")
        return []
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q = np.array(q_emb, dtype=np.float32)
    sims = _cosine_similarity(q, embeddings)
    idxs = np.argsort(sims)[::-1][:top_k]
    out = [(int(i), idx["chunks"][i]) for i in idxs]
    top_info = [(int(i), float(sims[i])) for i in idxs]
    logger.info(f"[PDF RETRIEVE] '{pdf_key}' q='{question[:80]}...' top_k={top_k} -> {top_info}")
    return out
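# Note: every chunk above carries its page number in-band ("Page <n>: ..."), and _retrieve_pdf
# logs (chunk_id, similarity) pairs, so the server log alone is enough to trace which pages fed
# a given answer.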
def _route_pdf(question: str) -> str:
    q_lower = question.lower()
    # 1) Keyword heuristic
    for key, kws in PDF_ROUTER_KEYWORDS.items():
        if any(k in q_lower for k in kws):
            logger.info(f"[PDF ROUTER] keyword matched '{key}'")
            return key
    # 2) Embedding fallback (compare question to PDF descriptors)
    _build_pdf_router_embeddings()
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q_vec = np.array(q_emb, dtype=np.float32)
    q_vec = q_vec / (np.linalg.norm(q_vec) + 1e-12)
    keys = list(PDF_DESC_EMB.keys())
    desc_mat = np.stack([PDF_DESC_EMB[k] / (np.linalg.norm(PDF_DESC_EMB[k]) + 1e-12) for k in keys], axis=0)
    sims = desc_mat @ q_vec
    best_idx = int(np.argmax(sims))
    chosen = keys[best_idx]
    logger.info(f"[PDF ROUTER] embed sims={[(k, float(s)) for k, s in zip(keys, sims.tolist())]} -> '{chosen}'")
    return chosen


def _make_pdf_prompt(question: str, retrieved_chunks: List[Tuple[int, str]], pdf_key: str, subquery_context: str | None = None) -> List[dict]:
    tagged_preview = [f"[{pdf_key} | Chunk {cid}] {text}" for cid, text in retrieved_chunks]
    preview = "\n".join(tagged_preview)[:500]  # join outside the f-string: backslashes in f-string expressions need Python 3.12+
    logger.info(f"[PDF PROMPT] '{pdf_key}' preview:\n{preview}")
    context_lines = [text for _, text in retrieved_chunks]
    context = "\n\n".join(context_lines) or "(no relevant chunks found)"
    system = (
        "You are a helpful assistant. Answer the user's question STRICTLY using the provided PDF context. "
        "If the answer is not present, say you don't have enough information. "
        "Do not include file names, chunk ids, or any bracketed metadata in your answer."
    )
    user = (
        f"Context:\n{context}\n\n"
        f"User question: {question}\n\n"
    )
    if subquery_context:
        user += f"Additional context:\n{subquery_context}\n\n"
    user += (
        "Instructions:\n"
        "- Use only the context above.\n"
        "- Keep answers concise.\n"
        "- Do not include any bracketed tags or source identifiers."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]


@mcp.tool("ask_pdf", description="RAG over multiple PDFs; auto-select the best-matching document and answer.")
def ask_pdf(question: str, top_k: int = TOP_K, temperature: float = 0.1, pdf_key: str | None = None, subquery_context: str | None = None) -> str:
    try:
        chosen = pdf_key or _route_pdf(question)
        _build_pdf_index(chosen, force=False)
        retrieved = _retrieve_pdf(chosen, question, top_k)
        messages = _make_pdf_prompt(question, retrieved, chosen, subquery_context)
        for m in messages:
            logger.info(f"[PDF MESSAGES] role={m['role']} len={len(m['content'])}")
        completion = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=temperature)
        answer = completion.choices[0].message.content or "I couldn't generate an answer."
        logger.info(f"[PDF ANSWER] '{chosen}' len={len(answer)}; preview: {answer[:200]}")
        return answer.strip()
    except Exception as e:
        logger.exception("ask_pdf failed: %s", e)
        return f"❌ Error: {e}"
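# Note: ask_pdf only falls back to the embedding comparison against PDF_DESCRIPTORS when no entry
# in PDF_ROUTER_KEYWORDS matches the lowercased question; passing pdf_key explicitly bypasses
# routing (and its extra embedding call) altogether.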
# LINK PIPELINE (webpages)
LINK_DESCRIPTORS: Dict[str, str] = {
    "linkedin_profile_iit_m": "IIT Madras Alumni.",
    "linkedin_profile_iit_d": "IIT Delhi Alumni.",
    "linkedin_profile_iit_b": "IIT Bombay Alumni.",
    "linkedin_profile_iit_kgp": "IIT Kharagpur Alumni.",
}
LINK_DESC_EMB: Dict[str, np.ndarray] = {}


def _build_link_router_embeddings():
    if LINK_DESC_EMB:
        return
    inputs = [LINK_DESCRIPTORS[k] for k in LINK_FILES.keys()]
    emb = client.embeddings.create(model=EMBED_MODEL, input=inputs)
    vecs = [np.array(e.embedding, dtype=np.float32) for e in emb.data]
    for k, v in zip(LINK_FILES.keys(), vecs):
        LINK_DESC_EMB[k] = v


def _route_link(question: str) -> str:
    _build_link_router_embeddings()
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q_vec = np.array(q_emb, dtype=np.float32)
    q_vec = q_vec / (np.linalg.norm(q_vec) + 1e-12)
    keys = list(LINK_DESC_EMB.keys())
    desc_mat = np.stack([LINK_DESC_EMB[k] / (np.linalg.norm(LINK_DESC_EMB[k]) + 1e-12) for k in keys], axis=0)
    sims = desc_mat @ q_vec
    best_idx = int(np.argmax(sims))
    chosen = keys[best_idx]
    return chosen


@mcp.tool("ask_link", description="RAG over a webpage (LinkedIn or any site); answer questions grounded in the page content.")
def ask_link(
    query: str,
    link_key: str = "linkedin_profile",
    url: str | None = None,
    temperature: float = 0.1,
    subquery_context: str | None = None,
    top_k: int = TOP_K,
) -> str:
    """
    Implements RAG for a webpage:
    - Loads content using LangChain WebBaseLoader.
    - Splits into chunks.
    - Embeds chunks and retrieves top_k relevant ones.
    - Builds prompt with retrieved chunks + optional subquery_context.
    """
    try:
        # ✅ Resolve URL: an explicit url wins, then a known link_key, otherwise embedding-based routing
        if url:
            target_url = url
        else:
            route_key = link_key if link_key in LINK_FILES else _route_link(query)
            target_url = LINK_FILES.get(route_key)
        if not target_url:
            return f"❌ Error: No URL resolved for link_key='{link_key}'."
        logger.info(f"[LINK TOOL] Fetching and processing content from: {target_url}")

        # ✅ Load webpage content
        loader = WebBaseLoader(target_url, verify_ssl=False)
        documents = loader.load()
        if not documents or not documents[0].page_content.strip():
            return "❌ Error: Could not extract readable content from the URL."
        page_text = documents[0].page_content.strip()

        # ✅ Split into chunks using langchain-text-splitters
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        chunks = splitter.split_text(page_text)
        if not chunks:
            return "❌ Error: No chunks generated from page content."

        # ✅ Embed chunks
        emb = client.embeddings.create(model=EMBED_MODEL, input=chunks)
        chunk_vectors = np.array([e.embedding for e in emb.data], dtype=np.float32)

        # ✅ Embed question
        q_emb = client.embeddings.create(model=EMBED_MODEL, input=[query]).data[0].embedding
        q_vec = np.array(q_emb, dtype=np.float32)

        # ✅ Compute cosine similarity
        chunk_norms = np.linalg.norm(chunk_vectors, axis=1)
        q_norm = np.linalg.norm(q_vec)
        sims = (chunk_vectors @ q_vec) / (chunk_norms * q_norm + 1e-12)

        # ✅ Sort and select top_k safely
        idxs = np.argsort(sims)[::-1][:min(top_k, len(chunks))]
        retrieved_chunks = [(i, chunks[i]) for i in idxs]
        logger.info(f"[LINK RETRIEVE] top_k={top_k} -> {[(i, float(sims[i])) for i in idxs]}")

        # ✅ Build prompt
        context_lines = [text for _, text in retrieved_chunks]
        context = "\n\n".join(context_lines) or "(no relevant chunks found)"
        system = (
            "You are a helpful assistant. Answer the user's question STRICTLY using the provided webpage context. "
            "If the answer is not present, say you don't have enough information. "
            "Do not include URLs or any bracketed metadata in your answer."
        )
        user = (
            f"Context:\n{context}\n\n"
            f"User question: {query}\n\n"
        )
        if subquery_context:
            user += f"Additional context:\n{subquery_context}\n\n"
        user += (
            "Instructions:\n"
            "- Use only the context above.\n"
            "- Keep answers concise.\n"
            "- Do not include any bracketed tags or source identifiers."
        )
        messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]

        # ✅ LLM call
        completion = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=temperature)
        answer = completion.choices[0].message.content or "I couldn't generate an answer."
        logger.info(f"[LINK ANSWER] len={len(answer)}; preview: {answer[:200]}")
        return answer.strip()
    except Exception as e:
        logger.exception("ask_link failed: %s", e)
        return f"❌ Error: {e}"


def find_available_port(start_port=8001) -> int:
    port = start_port
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            if s.connect_ex(("127.0.0.1", port)) != 0:
                return port
        port += 1


if __name__ == "__main__":
    try:
        port = find_available_port(8001)
        logger.info(f"Starting RAG MCP server (Excel + multi-PDF) on port {port}")
        mcp.run(transport="sse", host="127.0.0.1", port=port)
    except Exception as e:
        logger.error(f"Failed to start server: {e}")
        print(f"Error starting server: {e}")
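# Example client-side call (sketch only; the exact endpoint path and client API depend on the
# installed fastmcp version):
#
#   from fastmcp import Client
#   import asyncio
#
#   async def main():
#       async with Client("http://127.0.0.1:8001/sse") as client:
#           print(await client.call_tool("ask_excel", {"question": "..."}))
#
#   asyncio.run(main())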