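"""RAG MCP server: retrieval-augmented question answering over three sources.

- Excel: the IIT opening/closing ranks spreadsheet, indexed row by row.
- PDFs: course/curriculum/NIRF documents, routed by keyword heuristics with
  an embedding fallback, then chunked per page paragraph.
- Web links: alumni pages fetched with WebBaseLoader and chunked on the fly.

All pipelines share one pattern: embed with EMBED_MODEL, retrieve top-k by
cosine similarity, and answer with CHAT_MODEL strictly from retrieved context.
"""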
import os
import logging
import socket
from typing import List, Tuple, Dict

import numpy as np
import pandas as pd
import fitz  # PyMuPDF
from fastmcp import FastMCP
from openai import OpenAI
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# ---------- Config ----------
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)-7s %(message)s",
    datefmt="%m/%d/%y %H:%M:%S",
)
logger = logging.getLogger("rag-mcp-server")

mcp = FastMCP(name="rag-mcp-server", version="1.1.0")
# Paths
EXCEL_PATH = "Data/IIT_Opening_Closing_Ranks.xlsx"

PDF_FILES: Dict[str, str] = {
    "eng_design": "Data/Engineering_design_Course_Details.pdf",
    "aero_curriculum": "Data/Aerospace_curriculum.pdf",
    "nirf_2024": "Data/IR2024_Report.pdf",
    "iitm_curriculum_2024": "Data/Curriculum_-_2024_Batch_B.Tech_Version_1 (1).pdf",
    "iitb_cse_curriculum": "Data/IITB_CSE_Btech_Curriculum.pdf",
    "iitb_civil_curriculum": "Data/IITB_Civil_Btech_Curriculum.pdf",
    "iitb_mech_curriculum": "Data/IITB_Mechanical_Engg_Curriculum.pdf",
    "iitb_elec_curriculum": "Data/IITD_Electrical_Btech_Curriculum.pdf",
    "iitd_allprogrammes_curriculum": "Data/IITD_Programmes_Curriculum.pdf",
}
LINK_FILES: Dict[str, str] = {
    "linkedin_profile_iit_d": "https://alumni.iitd.ac.in/distinguished-alum-awards",
    "linkedin_profile_iit_m": "https://www.vaave.com/blog/iit-madras-notable-alumni/",
    "linkedin_profile_iit_b": "https://acr.iitbombay.org/distinguished-alumnus/",
    "linkedin_profile_iit_kgp": "http://alumni.iitkgp.ac.in/",
}
# Models
EMBED_MODEL = "text-embedding-3-small"
CHAT_MODEL = "gpt-4o-mini"
TOP_K = 5
# Read the API key from the environment rather than hardcoding a secret in source.
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# ---------- Utility ----------
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Cosine similarity between a query vector `a` (d,) and a matrix `b` (n, d)."""
    a_norm = a / (np.linalg.norm(a) + 1e-12)
    b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-12)
    return b_norm @ a_norm
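# Quick sanity check (hypothetical values): rows identical to the query score ~1.0.
#   _cosine_similarity(np.ones(3, dtype=np.float32), np.ones((2, 3), dtype=np.float32))
#   -> array([1., 1.], dtype=float32)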
# EXCEL PIPELINE
EXCEL_INDEX = {
    "texts": None,
    "embeddings": None,
    "row_ids": None,
    "columns": None,
}
def _excel_to_texts(excel_path: str, sheet: int | str = 0) -> Tuple[List[str], List[int], List[str]]:
    df = pd.read_excel(excel_path, sheet_name=sheet)  # requires openpyxl
    df = df.fillna("")
    cols = list(df.columns)
    texts, row_ids = [], []
    for i, row in df.iterrows():
        parts = [f"Row {i}"]
        for c in cols:
            parts.append(f"{c}: {row[c]}")
        texts.append(" | ".join(parts))
        row_ids.append(i)
    return texts, row_ids, cols
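# Each row is serialized into one retrievable text, e.g. (hypothetical columns):
#   "Row 0 | Institute: IIT Madras | Program: Aerospace | Opening Rank: 1200"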
def _build_excel_index(force: bool = False, sheet: int | str = 0):
    if not force and EXCEL_INDEX["texts"] is not None and EXCEL_INDEX["embeddings"] is not None:
        return
    if not os.path.exists(EXCEL_PATH):
        raise FileNotFoundError(f"Excel not found at {EXCEL_PATH}")
    logger.info("Loading Excel and building embeddings index...")
    texts, row_ids, cols = _excel_to_texts(EXCEL_PATH, sheet)
    emb = client.embeddings.create(model=EMBED_MODEL, input=texts)
    vectors = np.array([e.embedding for e in emb.data], dtype=np.float32)
    EXCEL_INDEX.update({
        "texts": texts,
        "embeddings": vectors,
        "row_ids": row_ids,
        "columns": cols,
    })
    logger.info(f"[EXCEL INDEX] rows={len(texts)} emb.shape={vectors.shape} cols={len(cols)}")
def _retrieve_excel(question: str, top_k: int = TOP_K) -> List[Tuple[int, str]]:
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q = np.array(q_emb, dtype=np.float32)
    sims = _cosine_similarity(q, EXCEL_INDEX["embeddings"])
    idxs = np.argsort(sims)[::-1][:top_k]
    out = [(int(EXCEL_INDEX["row_ids"][i]), EXCEL_INDEX["texts"][i]) for i in idxs]
    top_info = [(int(EXCEL_INDEX["row_ids"][i]), float(sims[i])) for i in idxs]
    logger.info(f"[EXCEL RETRIEVE] q='{question[:80]}...' top_k={top_k} -> {top_info}")
    return out
def _make_excel_prompt(question: str, retrieved_rows: List[Tuple[int, str]], subquery_context: str | None = None) -> List[dict]:
    context_lines = [f"[Row {rid}] {rtext}" for rid, rtext in retrieved_rows]
    context = "\n".join(context_lines) or "(no relevant rows found)"
    logger.info(f"[EXCEL PROMPT] context_len={len(context)}; preview:\n{context[:500]}")
    system = (
        "You are a helpful assistant. Answer the user's question STRICTLY using the provided Excel context. "
        "If the answer is not present, say you don't have enough information."
    )
    user = (
        f"Context (from Excel):\n{context}\n\n"
        f"User question: {question}\n\n"
    )
    # Append subquery_context if provided
    if subquery_context:
        user += f"Additional context:\n{subquery_context}\n\n"
    user += (
        "Instructions:\n"
        "- Use only the context above.\n"
        "- Keep answers concise and accurate.\n"
        "- Do not include any bracketed tags or citations."
    )
    return [{"role": "system", "content": system}, {"role": "user", "content": user}]
def ask_excel(question: str, top_k: int = TOP_K, sheet: int | str = 0, temperature: float = 0.1, subquery_context: str | None = None) -> str:
    try:
        _build_excel_index(False, sheet)
        retrieved = _retrieve_excel(question, top_k)
        messages = _make_excel_prompt(question, retrieved, subquery_context)
        for m in messages:
            logger.info(f"[EXCEL MESSAGES] role={m['role']} len={len(m['content'])}")
        completion = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=temperature)
        answer = completion.choices[0].message.content or "I couldn't generate an answer."
        logger.info(f"[EXCEL ANSWER] len={len(answer)}; preview: {answer[:200]}")
        return answer.strip()
    except Exception as e:
        logger.exception("ask_excel failed: %s", e)
        return f"❌ Error: {e}"
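# Example call (hypothetical question; the Excel index is built lazily on first use):
#   ask_excel("What was the closing rank for Computer Science at IIT Bombay?")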
# PDF PIPELINE (Multi-file)
# Per-PDF indices
PDF_INDEXES: Dict[str, Dict[str, object]] = {
    key: {"chunks": None, "embeddings": None, "chunk_ids": None} for key in PDF_FILES
}

# Router: keyword heuristics to quickly select a PDF
PDF_ROUTER_KEYWORDS: Dict[str, List[str]] = {
    "eng_design": [
        "finite element", "non-linear", "lagrangian", "continuum mechanics", "contact mechanics",
        "ed5015", "ed5012", "ergonomics", "human factors", "design", "galerkin", "variational",
    ],
    "aero_curriculum": [
        "aerospace", "b.tech", "semester", "credits", "as1010", "fluid mechanics", "gas dynamics",
        "strength of materials", "lab", "workshop", "curriculum",
    ],
    "nirf_2024": [
        "nirf", "ranking", "perception", "outreach", "inclusivity", "graduation outcome",
        "research", "teaching", "learning", "resources", "department of higher education",
    ],
    "iitm_curriculum_2024": [
        "curriculum", "credit requirements", "branch-wise", "data science", "computer science",
        "electrical", "mechanical", "metallurgical", "naval architecture", "engineering physics",
        "2024 batch", "2023 batch", "programme",
    ],
}
# Descriptors used for embedding-based fallback routing (short, representative strings)
PDF_DESCRIPTORS: Dict[str, str] = {
    "eng_design": "Engineering Design course details including ED5015 finite element methods and ED5012 human factors.",
    "aero_curriculum": "IIT Madras Aerospace Engineering B.Tech curriculum semester-wise credits and course list.",
    "nirf_2024": "India Rankings 2024 NIRF categories: teaching, research, graduation outcomes, outreach, inclusivity, perception.",
    "iitm_curriculum_2024": "IIT Madras B.Tech curriculum 2024 batch branch-wise credit requirements across departments.",
    "iitb_cse_curriculum": "IIT Bombay Computer Science Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitb_civil_curriculum": "IIT Bombay Civil Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitb_mech_curriculum": "IIT Bombay Mechanical Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitb_elec_curriculum": "IIT Bombay Electrical Engineering B.Tech curriculum semester-wise credits and course list.",
    "iitd_allprogrammes_curriculum": "IIT Delhi all B.Tech programmes curriculum semester-wise credits and course list.",
}

PDF_DESC_EMB: Dict[str, np.ndarray] = {}  # cached descriptor embeddings
def _build_pdf_router_embeddings():
    if PDF_DESC_EMB:
        return
    inputs = [PDF_DESCRIPTORS[k] for k in PDF_FILES.keys()]
    emb = client.embeddings.create(model=EMBED_MODEL, input=inputs)
    vecs = [np.array(e.embedding, dtype=np.float32) for e in emb.data]
    for k, v in zip(PDF_FILES.keys(), vecs):
        PDF_DESC_EMB[k] = v
    logger.info(f"[PDF ROUTER] cached descriptor embeddings for {len(PDF_DESC_EMB)} PDFs")
def _pdf_to_chunks(pdf_path: str) -> List[str]:
    doc = fitz.open(pdf_path)
    chunks: List[str] = []
    for pno, page in enumerate(doc, start=1):
        text = page.get_text("text")
        if not text:
            continue
        # Split into paragraphs to improve retrieval granularity
        paras = [p.strip() for p in text.split("\n\n") if p.strip()]
        for para in paras:
            para = " ".join(para.split())  # collapse whitespace
            chunks.append(f"Page {pno}: {para}")
    return chunks
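# Each chunk carries its page number, e.g. "Page 3: ED5015 Finite Element
# Methods ..." (hypothetical content), so retrieval hits stay page-traceable.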
def _build_pdf_index(pdf_key: str, force: bool = False):
    idx = PDF_INDEXES[pdf_key]
    if not force and idx["chunks"] is not None and idx["embeddings"] is not None:
        return
    pdf_path = PDF_FILES[pdf_key]
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF not found: {pdf_path}")
    logger.info(f"[PDF INDEX] building for '{pdf_key}' -> {pdf_path}")
    chunks = _pdf_to_chunks(pdf_path)
    if not chunks:
        logger.warning(f"[PDF INDEX] No text extracted for '{pdf_key}'.")
        idx["chunks"], idx["embeddings"], idx["chunk_ids"] = [], np.zeros((0, 1), dtype=np.float32), []
        return
    emb = client.embeddings.create(model=EMBED_MODEL, input=chunks)
    vectors = np.array([e.embedding for e in emb.data], dtype=np.float32)
    idx["chunks"] = chunks
    idx["embeddings"] = vectors
    idx["chunk_ids"] = list(range(len(chunks)))
    logger.info(f"[PDF INDEX] '{pdf_key}' chunks={len(chunks)} emb.shape={vectors.shape}")
def _retrieve_pdf(pdf_key: str, question: str, top_k: int = TOP_K) -> List[Tuple[int, str]]:
    idx = PDF_INDEXES[pdf_key]
    embeddings = idx["embeddings"]
    if embeddings is None or len(embeddings) == 0:
        logger.warning(f"[PDF RETRIEVE] Empty embeddings for '{pdf_key}'.")
        return []
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q = np.array(q_emb, dtype=np.float32)
    sims = _cosine_similarity(q, embeddings)
    idxs = np.argsort(sims)[::-1][:top_k]
    out = [(int(i), idx["chunks"][i]) for i in idxs]
    top_info = [(int(i), float(sims[i])) for i in idxs]
    logger.info(f"[PDF RETRIEVE] '{pdf_key}' q='{question[:80]}...' top_k={top_k} -> {top_info}")
    return out
def _route_pdf(question: str) -> str:
    q_lower = question.lower()
    # 1) Keyword heuristic
    for key, kws in PDF_ROUTER_KEYWORDS.items():
        if any(k in q_lower for k in kws):
            logger.info(f"[PDF ROUTER] keyword matched '{key}'")
            return key
    # 2) Embedding fallback (compare question to PDF descriptors)
    _build_pdf_router_embeddings()
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q_vec = np.array(q_emb, dtype=np.float32)
    q_vec = q_vec / (np.linalg.norm(q_vec) + 1e-12)
    keys = list(PDF_DESC_EMB.keys())
    desc_mat = np.stack([PDF_DESC_EMB[k] / (np.linalg.norm(PDF_DESC_EMB[k]) + 1e-12) for k in keys], axis=0)
    sims = desc_mat @ q_vec
    best_idx = int(np.argmax(sims))
    chosen = keys[best_idx]
    logger.info(f"[PDF ROUTER] embed sims={[(k, float(s)) for k, s in zip(keys, sims.tolist())]} -> '{chosen}'")
    return chosen
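# Routing examples (hypothetical questions): "What does the NIRF perception
# score measure?" hits the "nirf" keyword and routes to nirf_2024; a question
# about IIT Bombay's civil curriculum has no keyword entry, so it falls through
# to the descriptor embeddings and should land on iitb_civil_curriculum.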
def _make_pdf_prompt(question: str, retrieved_chunks: List[Tuple[int, str]], pdf_key: str, subquery_context: str | None = None) -> List[dict]:
    tagged_preview = [f"[{pdf_key} | Chunk {cid}] {text}" for cid, text in retrieved_chunks]
    # Join outside the f-string: backslashes inside f-string expressions are a
    # syntax error before Python 3.12.
    preview = "\n".join(tagged_preview)[:500]
    logger.info(f"[PDF PROMPT] '{pdf_key}' preview:\n{preview}")
    context_lines = [text for _, text in retrieved_chunks]
    context = "\n\n".join(context_lines) or "(no relevant chunks found)"
    system = (
        "You are a helpful assistant. Answer the user's question STRICTLY using the provided PDF context. "
        "If the answer is not present, say you don't have enough information. "
        "Do not include file names, chunk ids, or any bracketed metadata in your answer."
    )
    user = (
        f"Context:\n{context}\n\n"
        f"User question: {question}\n\n"
    )
    if subquery_context:
        user += f"Additional context:\n{subquery_context}\n\n"
    user += (
        "Instructions:\n"
        "- Use only the context above.\n"
        "- Keep answers concise.\n"
        "- Do not include any bracketed tags or source identifiers."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
def ask_pdf(question: str, top_k: int = TOP_K, temperature: float = 0.1, pdf_key: str | None = None, subquery_context: str | None = None) -> str:
    try:
        # Route once; an explicit pdf_key skips routing entirely.
        chosen = pdf_key or _route_pdf(question)
        _build_pdf_index(chosen, force=False)
        retrieved = _retrieve_pdf(chosen, question, top_k)
        messages = _make_pdf_prompt(question, retrieved, chosen, subquery_context)
        for m in messages:
            logger.info(f"[PDF MESSAGES] role={m['role']} len={len(m['content'])}")
        completion = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=temperature)
        answer = completion.choices[0].message.content or "I couldn't generate an answer."
        logger.info(f"[PDF ANSWER] '{chosen}' len={len(answer)}; preview: {answer[:200]}")
        return answer.strip()
    except Exception as e:
        logger.exception("ask_pdf failed: %s", e)
        return f"❌ Error: {e}"
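# Example call (hypothetical question; keyword routing selects aero_curriculum):
#   ask_pdf("How many credits does the aerospace B.Tech curriculum require?")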
# LINK PIPELINE (webpages)
LINK_DESCRIPTORS: Dict[str, str] = {
    "linkedin_profile_iit_m": "IIT Madras Alumni.",
    "linkedin_profile_iit_d": "IIT Delhi Alumni.",
    "linkedin_profile_iit_b": "IIT Bombay Alumni.",
    "linkedin_profile_iit_kgp": "IIT Kharagpur Alumni.",
}

LINK_DESC_EMB: Dict[str, np.ndarray] = {}  # cached descriptor embeddings
def _build_link_router_embeddings():
    if LINK_DESC_EMB:
        return
    inputs = [LINK_DESCRIPTORS[k] for k in LINK_FILES.keys()]
    emb = client.embeddings.create(model=EMBED_MODEL, input=inputs)
    vecs = [np.array(e.embedding, dtype=np.float32) for e in emb.data]
    for k, v in zip(LINK_FILES.keys(), vecs):
        LINK_DESC_EMB[k] = v
def _route_link(question: str) -> str:
    _build_link_router_embeddings()
    q_emb = client.embeddings.create(model=EMBED_MODEL, input=[question]).data[0].embedding
    q_vec = np.array(q_emb, dtype=np.float32)
    q_vec = q_vec / (np.linalg.norm(q_vec) + 1e-12)
    keys = list(LINK_DESC_EMB.keys())
    desc_mat = np.stack([LINK_DESC_EMB[k] / (np.linalg.norm(LINK_DESC_EMB[k]) + 1e-12) for k in keys], axis=0)
    sims = desc_mat @ q_vec
    best_idx = int(np.argmax(sims))
    chosen = keys[best_idx]
    return chosen
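# e.g. a question mentioning IIT Kharagpur alumni should embed closest to the
# "linkedin_profile_iit_kgp" descriptor and route to the alumni.iitkgp.ac.in page.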
@mcp.tool("ask_link", description="RAG over a webpage (LinkedIn or any site); answer questions grounded in the page content.")
def ask_link(
    query: str,
    link_key: str = "linkedin_profile",
    url: str | None = None,
    temperature: float = 0.1,
    subquery_context: str | None = None,
    top_k: int = TOP_K,
) -> str:
    """
    Implements RAG for a webpage:
    - Loads content using LangChain WebBaseLoader.
    - Splits into chunks.
    - Embeds chunks and retrieves top_k relevant ones.
    - Builds prompt with retrieved chunks + optional subquery_context.
    """
    try:
        # Resolve URL: an explicit url wins; a known link_key is used next;
        # otherwise route the query against the link descriptors.
        if url:
            target_url = url
        else:
            key = link_key if link_key in LINK_FILES else _route_link(query)
            target_url = LINK_FILES.get(key)
        if not target_url:
            return f"❌ Error: No URL resolved for link_key='{link_key}'."
        logger.info(f"[LINK TOOL] Fetching and processing content from: {target_url}")
        # Load webpage content
        loader = WebBaseLoader(target_url, verify_ssl=False)
        documents = loader.load()
        if not documents or not documents[0].page_content.strip():
            return "❌ Error: Could not extract readable content from the URL."
        page_text = documents[0].page_content.strip()
        # Split into chunks using langchain-text-splitters
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        chunks = splitter.split_text(page_text)
        if not chunks:
            return "❌ Error: No chunks generated from page content."
        # Embed chunks
        emb = client.embeddings.create(model=EMBED_MODEL, input=chunks)
        chunk_vectors = np.array([e.embedding for e in emb.data], dtype=np.float32)
        # Embed question
        q_emb = client.embeddings.create(model=EMBED_MODEL, input=[query]).data[0].embedding
        q_vec = np.array(q_emb, dtype=np.float32)
        # Compute cosine similarity
        chunk_norms = np.linalg.norm(chunk_vectors, axis=1)
        q_norm = np.linalg.norm(q_vec)
        sims = (chunk_vectors @ q_vec) / (chunk_norms * q_norm + 1e-12)
        # Sort and select top_k safely
        idxs = np.argsort(sims)[::-1][:min(top_k, len(chunks))]
        retrieved_chunks = [(i, chunks[i]) for i in idxs]
        logger.info(f"[LINK RETRIEVE] top_k={top_k} -> {[(int(i), float(sims[i])) for i in idxs]}")
        # Build prompt
        context_lines = [text for _, text in retrieved_chunks]
        context = "\n\n".join(context_lines) or "(no relevant chunks found)"
        system = (
            "You are a helpful assistant. Answer the user's question STRICTLY using the provided webpage context. "
            "If the answer is not present, say you don't have enough information. "
            "Do not include URLs or any bracketed metadata in your answer."
        )
        user = (
            f"Context:\n{context}\n\n"
            f"User question: {query}\n\n"
        )
        if subquery_context:
            user += f"Additional context:\n{subquery_context}\n\n"
        user += (
            "Instructions:\n"
            "- Use only the context above.\n"
            "- Keep answers concise.\n"
            "- Do not include any bracketed tags or source identifiers."
        )
        messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
        # LLM call
        completion = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=temperature)
        answer = completion.choices[0].message.content or "I couldn't generate an answer."
        logger.info(f"[LINK ANSWER] len={len(answer)}; preview: {answer[:200]}")
        return answer.strip()
    except Exception as e:
        logger.exception("ask_link failed: %s", e)
        return f"❌ Error: {e}"
def find_available_port(start_port: int = 8001) -> int:
    """Return the first port at or above start_port that refuses a local connection."""
    port = start_port
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            if s.connect_ex(("127.0.0.1", port)) != 0:
                return port
        port += 1
if __name__ == "__main__":
    try:
        port = find_available_port(8001)
        logger.info(f"Starting RAG MCP server (Excel + multi-PDF) on port {port}")
        mcp.run(transport="sse", host="127.0.0.1", port=port)
    except Exception as e:
        logger.error(f"Failed to start server: {e}")
        print(f"Error starting server: {e}")