fleetmind-dispatch-ai / archive /conversation.py
mashrur950's picture
feat: Initialize FleetMind MCP Server with core functionalities
0a743f9
raw
history blame
2.37 kB
"""
Conversation manager for FleetMind chat
Handles conversation state and history
"""
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class ConversationManager:
"""Manage conversation state and history"""
def __init__(self):
self.history = []
self.tool_calls = [] # Track all tool calls for transparency
self.order_context = {} # Accumulated order details
def add_message(self, role: str, content: str):
"""
Add message to conversation history
Args:
role: "user" or "assistant"
content: Message content
"""
self.history.append({
"role": role,
"content": content
})
def add_tool_result(self, tool_name: str, tool_input: dict, tool_result: dict):
"""
Track tool usage for transparency
Args:
tool_name: Name of the tool called
tool_input: Input parameters
tool_result: Result from tool execution
"""
self.tool_calls.append({
"tool": tool_name,
"input": tool_input,
"result": tool_result
})
def get_history(self) -> List[Dict]:
"""Get full conversation history"""
return self.history
def get_tool_calls(self) -> List[Dict]:
"""Get all tool calls made in this conversation"""
return self.tool_calls
def get_last_tool_call(self) -> Dict:
"""Get the most recent tool call"""
if self.tool_calls:
return self.tool_calls[-1]
return {}
def clear_tool_calls(self):
"""Clear tool call history"""
self.tool_calls = []
def reset(self):
"""Start a new conversation"""
self.history = []
self.tool_calls = []
self.order_context = {}
logger.info("Conversation reset")
def get_message_count(self) -> int:
"""Get number of messages in conversation"""
return len(self.history)
def get_formatted_history(self) -> List[Dict]:
"""
Get history formatted for Gradio chatbot (messages format)
Returns:
List of message dictionaries with 'role' and 'content' keys
"""
# For Gradio type="messages", return list of dicts with role/content
return self.history