# Soundaryasos's picture — Update app.py — e2278a6 (verified)
import streamlit as st
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from wordcloud import WordCloud
import base64
from io import BytesIO
import nltk
from textblob import TextBlob
import praw
from googleapiclient.discovery import build
import os
import time
from functools import lru_cache
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from sklearn.model_selection import train_test_split
# --------------------------
# Initial Setup
# --------------------------
# Page-level settings for the dashboard. The title and icon use real emoji
# characters; the previous values were mis-decoded UTF-8 byte sequences.
st.set_page_config(
    page_title="🚀 SentimentSync Pro",
    page_icon="📈",
    layout="wide",
)
# --------------------------
# Performance Optimizations
# --------------------------
@st.cache_resource
def load_models():
    """Create the BERT sentiment pipeline and the VADER analyzer.

    A progress bar and spinners report the two loading steps in the UI.

    Returns:
        A ``(bert_sentiment, vader_analyzer)`` tuple, or ``(None, None)`` when
        either step raises (the error is reported through ``st.error``).
    """
    bar = st.progress(0, text="Loading sentiment models...")
    try:
        with st.spinner("Loading BERT model..."):
            bert = pipeline(
                "sentiment-analysis",
                model="nlptown/bert-base-multilingual-uncased-sentiment",
            )
        bar.progress(50)

        with st.spinner("Loading VADER analyzer..."):
            vader = SentimentIntensityAnalyzer()
        bar.progress(100)
    except Exception as exc:
        st.error(f"Model loading failed: {str(exc)}")
        return None, None
    return bert, vader
@st.cache_resource
def setup_api_clients():
    """Create the Reddit and YouTube API clients from environment credentials.

    Credentials are read from the ``REDDIT_CLIENT_ID``, ``REDDIT_CLIENT_SECRET``
    and ``YOUTUBE_API_KEY`` environment variables so that no secret is stored
    in the source file.

    Returns:
        A ``(reddit, youtube)`` tuple, or ``(None, None)`` when a credential is
        missing or client construction raises (reported through ``st.error``).
    """
    required = ("REDDIT_CLIENT_ID", "REDDIT_CLIENT_SECRET", "YOUTUBE_API_KEY")
    credentials = {name: os.environ.get(name) for name in required}
    missing = [name for name, value in credentials.items() if not value]
    if missing:
        st.error(
            "API initialization failed: missing environment variable(s) "
            + ", ".join(missing)
        )
        return None, None

    try:
        with st.spinner("Initializing Reddit API..."):
            reddit = praw.Reddit(
                client_id=credentials["REDDIT_CLIENT_ID"],
                client_secret=credentials["REDDIT_CLIENT_SECRET"],
                user_agent="SentimentSync/1.0",
            )
        with st.spinner("Initializing YouTube API..."):
            youtube = build(
                'youtube', 'v3', developerKey=credentials["YOUTUBE_API_KEY"]
            )
        return reddit, youtube
    except Exception as e:
        st.error(f"API initialization failed: {str(e)}")
        return None, None
# --------------------------
# Core Functions
# --------------------------
def analyze_text(text, models):
    """Score one text with VADER, TextBlob and the BERT star-rating model.

    Args:
        text: Text to score. ``None``, empty or whitespace-only text produces
            an all-zero result labelled ``'Neutral'``.
        models: ``(bert_sentiment, vader_analyzer)`` pair.

    Returns:
        A dict with the keys ``'vader'`` (compound score), ``'bert'`` (star
        label mapped to -1, -0.5, 0, 0.5 or 1), ``'textblob'`` (polarity),
        ``'bert_label'`` and ``'bert_confidence'``. When scoring raises, the
        error is shown via ``st.error`` and an all-zero result labelled
        ``'Error'`` is returned.
    """
    bert_sentiment, vader_analyzer = models
    # Truncate very long texts to improve performance
    truncated_text = text[:2000] if text else ""
    try:
        if not truncated_text.strip():
            return {
                'vader': 0,
                'bert': 0,
                'textblob': 0,
                'bert_label': 'Neutral',
                'bert_confidence': 0
            }

        vader_score = vader_analyzer.polarity_scores(truncated_text)['compound']
        textblob_score = TextBlob(truncated_text).sentiment.polarity

        # A 512-character slice does not guarantee <= 512 tokens once special
        # tokens are added (e.g. dense non-Latin text), so the tokenizer is
        # also asked to truncate to the model limit.
        bert_result = bert_sentiment(
            truncated_text[:512],
            truncation=True,
            max_length=512,
        )[0]

        label_map = {
            '1 star': -1,
            '2 stars': -0.5,
            '3 stars': 0,
            '4 stars': 0.5,
            '5 stars': 1
        }
        # Unknown labels fall back to a neutral 0.
        bert_num = label_map.get(bert_result['label'], 0)
        return {
            'vader': vader_score,
            'bert': bert_num,
            'textblob': textblob_score,
            'bert_label': bert_result['label'],
            'bert_confidence': bert_result['score']
        }
    except Exception as e:
        st.error(f"Analysis error: {str(e)}")
        return {
            'vader': 0,
            'bert': 0,
            'textblob': 0,
            'bert_label': 'Error',
            'bert_confidence': 0
        }
@st.cache_data(ttl=3600, show_spinner="Fetching data...")
def fetch_reddit_data(keyword, limit=30):
    """Search r/all for *keyword* and return the matching posts as a DataFrame.

    Args:
        keyword: Search query.
        limit: Maximum number of posts to request.

    Returns:
        A DataFrame with the columns ``date``, ``text`` (title and selftext
        joined by a newline), ``source`` (always ``'Reddit'``) and ``url``.
        An empty DataFrame is returned when no Reddit client is available or
        the request raises (reported through ``st.error``).
    """
    try:
        reddit, _ = setup_api_clients()
        if not reddit:
            return pd.DataFrame()

        posts = reddit.subreddit("all").search(keyword, limit=limit)
        rows = [{
            # created_utc is a UTC epoch; convert it to a naive UTC timestamp
            # instead of server-local time so it lines up with the naive UTC
            # dates produced for YouTube items.
            'date': pd.to_datetime(post.created_utc, unit='s'),
            'text': f"{post.title}\n{post.selftext}",
            'source': 'Reddit',
            'url': f"https://reddit.com{post.permalink}"
        } for post in posts]
        return pd.DataFrame(rows)
    except Exception as e:
        st.error(f"Reddit fetch error: {str(e)}")
        return pd.DataFrame()
@st.cache_data(ttl=3600, show_spinner="Fetching data...")
def fetch_youtube_data(keyword, limit=30):
    """Search YouTube videos for *keyword* and return them as a DataFrame.

    Args:
        keyword: Search query.
        limit: Maximum number of videos to request; values above 50 are
            clamped to 50, the largest page size the search endpoint accepts.

    Returns:
        A DataFrame with the columns ``date`` (naive UTC), ``text`` (title and
        description joined by a newline), ``source`` (always ``'YouTube'``)
        and ``url``. An empty DataFrame is returned when no YouTube client is
        available or the request raises (reported through ``st.error``).
    """
    try:
        _, youtube = setup_api_clients()
        if not youtube:
            return pd.DataFrame()

        response = youtube.search().list(
            q=keyword,
            part="snippet",
            maxResults=min(int(limit), 50),
            type="video",
            order="relevance"
        ).execute()

        rows = []
        for item in response['items']:
            snippet = item['snippet']
            rows.append({
                # Parse ISO-8601 leniently (with or without fractional
                # seconds) and drop the timezone to keep naive UTC values.
                'date': pd.to_datetime(snippet['publishedAt'], utc=True).tz_localize(None),
                'text': f"{snippet['title']}\n{snippet['description']}",
                'source': 'YouTube',
                'url': f"https://youtube.com/watch?v={item['id']['videoId']}"
            })
        return pd.DataFrame(rows)
    except Exception as e:
        st.error(f"YouTube fetch error: {str(e)}")
        return pd.DataFrame()
# --------------------------
# Visualization Functions
# --------------------------
def generate_wordcloud(text):
    """Render *text* as a word cloud and return it as a base64-encoded PNG.

    Returns an empty string when the text is blank or when rendering raises
    (the error is reported through ``st.error``).
    """
    try:
        if not text.strip():
            return ""

        english_stopwords = nltk.corpus.stopwords.words('english')
        cloud = WordCloud(
            width=800,
            height=400,
            background_color='white',
            collocations=False,  # skip bigram detection for speed
            stopwords=english_stopwords,
        )
        cloud = cloud.generate(text)

        png_buffer = BytesIO()
        cloud.to_image().save(png_buffer, format='PNG')
        encoded = base64.b64encode(png_buffer.getvalue())
        return encoded.decode()
    except Exception as exc:
        st.error(f"Word cloud generation error: {str(exc)}")
        return ""
# --------------------------
# Prediction Functions
# --------------------------
def prepare_data_for_prediction(data):
"""Prepare time series data for prediction, handling NaN values"""
try:
if data.empty:
st.warning("No data available for prediction")
return None
# Ensure data is sorted by date
data = data.sort_values('date')
# Filter out rows with invalid sentiment scores
data = data.dropna(subset=['average'])
# Create daily aggregates
daily_data = data.groupby(pd.Grouper(key='date', freq='D'))['average'].mean().reset_index()
# Remove any remaining NaN values from aggregation
daily_data = daily_data.dropna(subset=['average'])
# Check if enough data points remain
if len(daily_data) < 5:
st.warning("Insufficient valid data points for prediction (minimum 5 required)")
return None
# Create numerical features (days since first date)
daily_data['days'] = (daily_data['date'] - daily_data['date'].min()).dt.days
return daily_data
except Exception as e:
st.error(f"Data preparation error: {str(e)}")
return None
def train_sentiment_model(data):
    """Fit a degree-2 polynomial Ridge regression of ``average`` on ``days``.

    Returns:
        ``(model, data)`` on success, or ``(None, None)`` when *data* is
        ``None``, has fewer than 5 rows, contains NaN values, or fitting
        raises (each case is reported in the UI).
    """
    try:
        if data is None:
            st.warning("No valid data for model training")
            return None, None

        if len(data) < 5:
            st.warning("Not enough data points for reliable prediction (minimum 5 required)")
            return None, None

        # Single feature column (days) and the target scores.
        features = data['days'].values.reshape(-1, 1)
        target = data['average'].values

        if np.isnan(features).any() or np.isnan(target).any():
            st.warning("Invalid values detected in data. Skipping prediction.")
            return None, None

        regressor = make_pipeline(
            PolynomialFeatures(degree=2),
            Ridge(alpha=1.0),
        )
        regressor.fit(features, target)
        return regressor, data
    except Exception as exc:
        st.error(f"Model training error: {str(exc)}")
        return None, None
def predict_future_sentiment(model, training_data, days_to_predict=15):
    """Extend the daily sentiment series with model predictions.

    Args:
        model: Fitted estimator exposing ``predict`` on a single ``days`` column.
        training_data: Daily frame with ``date`` and ``average`` columns.
        days_to_predict: Number of days after the last training date to predict.

    Returns:
        The training rows (``type == 'actual'``) followed by the predicted
        rows (``type == 'prediction'``), or ``None`` when the model or data is
        missing or prediction raises (reported in the UI). Predictions are
        clipped to [-1, 1], the range of the averaged sentiment scores, because
        polynomial extrapolation can otherwise leave that range.
    """
    try:
        if model is None or training_data is None:
            st.warning("No valid model or data for prediction")
            return None

        first_date = training_data['date'].min()
        last_date = training_data['date'].max()

        # Future dates and their "days since first date" feature values.
        future_dates = [
            last_date + timedelta(days=offset)
            for offset in range(1, days_to_predict + 1)
        ]
        future_days = [(day - first_date).days for day in future_dates]
        X_future = np.array(future_days).reshape(-1, 1)

        predictions = np.clip(model.predict(X_future), -1.0, 1.0)

        pred_df = pd.DataFrame({
            'date': future_dates,
            'average': predictions,
            'type': 'prediction'
        })

        # Keep the training rows alongside the predictions for plotting.
        training_df = training_data.copy()
        training_df['type'] = 'actual'
        return pd.concat([training_df, pred_df], ignore_index=True)
    except Exception as e:
        st.error(f"Prediction error: {str(e)}")
        return None
def plot_sentiment(data, keyword):
    """Build a Plotly figure of actual and predicted sentiment over time.

    Rows are split on the ``type`` column (``'actual'`` / ``'prediction'``).
    Predicted rows are drawn as a dotted line with a shaded band of +/-0.1
    around them.

    Returns:
        The figure, or ``None`` when *data* is ``None``/empty or plotting raises
        (reported in the UI).
    """
    try:
        if data is None or data.empty:
            st.warning("No data available for plotting sentiment trends")
            return None

        observed = data[data['type'] == 'actual']
        forecast = data[data['type'] == 'prediction']

        figure = go.Figure()

        if not observed.empty:
            observed_trace = go.Scatter(
                x=observed['date'],
                y=observed['average'],
                name='Actual Sentiment',
                mode='lines+markers',
                line=dict(color='#636EFA'),
            )
            figure.add_trace(observed_trace)

        if not forecast.empty:
            forecast_trace = go.Scatter(
                x=forecast['date'],
                y=forecast['average'],
                name='Predicted Sentiment',
                mode='lines+markers',
                line=dict(color='#EF553B', dash='dot'),
            )
            figure.add_trace(forecast_trace)

            # Shaded band: an invisible upper edge, then a lower edge that
            # fills up to the previous trace.
            upper_edge = go.Scatter(
                x=forecast['date'],
                y=forecast['average'] + 0.1,
                mode='lines',
                line=dict(width=0),
                showlegend=False,
                hoverinfo='skip',
            )
            lower_edge = go.Scatter(
                x=forecast['date'],
                y=forecast['average'] - 0.1,
                mode='lines',
                fill='tonexty',
                line=dict(width=0),
                fillcolor='rgba(239, 85, 59, 0.2)',
                name='Prediction Range',
            )
            figure.add_trace(upper_edge)
            figure.add_trace(lower_edge)

        figure.update_layout(
            title=f'Sentiment Analysis and Prediction for "{keyword}"',
            xaxis_title="Date",
            yaxis_title="Sentiment Score",
            hovermode="x unified",
            legend_title="Data Type",
        )
        return figure
    except Exception as exc:
        st.error(f"Plotting error: {str(exc)}")
        return None
# --------------------------
# Evaluation & Dataset Stats
# --------------------------
def compute_dataset_stats(df):
    """Summarise a collected dataset.

    Args:
        df: DataFrame that may contain ``date`` and ``source`` columns.

    Returns:
        A dict with ``total`` (row count), ``start_date`` / ``end_date`` (min
        and max of ``date``, or ``None`` when the column is missing or cannot
        be compared) and ``source_counts`` (rows per source, ``{}`` when there
        is no ``source`` column).
    """
    stats = {'total': len(df)}
    try:
        stats['start_date'] = df['date'].min()
        stats['end_date'] = df['date'].max()
    except (KeyError, TypeError, ValueError):
        # Missing column or values that cannot be ordered: report no range.
        stats['start_date'] = None
        stats['end_date'] = None
    stats['source_counts'] = (
        df['source'].value_counts().to_dict() if 'source' in df.columns else {}
    )
    return stats
def prepare_eval_labels(df, pos_thresh=0.1, neg_thresh=-0.1):
    """Return a copy of *df* with categorical sentiment labels added.

    ``label`` is ``'Positive'`` when ``average`` exceeds *pos_thresh*,
    ``'Negative'`` when it is below *neg_thresh*, otherwise ``'Neutral'``.
    ``label_num`` encodes the label as 0 (Negative), 1 (Neutral), 2 (Positive).
    The input frame is not modified.
    """
    def _to_label(score):
        if score > pos_thresh:
            return 'Positive'
        if score < neg_thresh:
            return 'Negative'
        return 'Neutral'

    codes = {'Negative': 0, 'Neutral': 1, 'Positive': 2}

    labelled = df.copy()
    labelled['label'] = labelled['average'].apply(_to_label)
    labelled['label_num'] = labelled['label'].map(codes)
    return labelled
def evaluate_sentiment_model(df, pos_thresh=0.1, neg_thresh=-0.1):
    """Evaluate a Ridge baseline that predicts the sentiment class.

    Labels are derived from ``average`` via ``prepare_eval_labels``; a Ridge
    regressor is trained on ``[vader, bert, textblob]`` with an 80/20 split
    and its predictions are rounded and clipped to the class codes 0..2.

    Args:
        df: Frame with ``vader``, ``bert``, ``textblob`` and ``average`` columns.
        pos_thresh: Threshold above which ``average`` is labelled Positive.
        neg_thresh: Threshold below which ``average`` is labelled Negative.

    Returns:
        ``None`` when fewer than 5 complete rows are available; otherwise a
        dict with ``accuracy``, weighted ``precision`` / ``recall`` / ``f1``,
        a 3x3 ``confusion_matrix`` (rows/columns ordered Negative, Neutral,
        Positive) and ``class_map``.
    """
    df = df.dropna(subset=['vader', 'bert', 'textblob', 'average'])
    if len(df) < 5:
        return None  # insufficient data

    df_eval = prepare_eval_labels(df, pos_thresh=pos_thresh, neg_thresh=neg_thresh)
    X = df_eval[['vader', 'bert', 'textblob']].values
    y = df_eval['label_num'].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=True, random_state=42
    )

    # Ridge as a simple baseline regressor; predictions are rounded to the
    # nearest class code and clipped into the valid range.
    clf = Ridge(alpha=1.0)
    clf.fit(X_train, y_train)
    preds = np.clip(np.round(clf.predict(X_test)).astype(int), 0, 2)

    acc = accuracy_score(y_test, preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test, preds, average='weighted', zero_division=0
    )
    # Fix the label order so the matrix is always 3x3, even when a class is
    # absent from the test split.
    cm = confusion_matrix(y_test, preds, labels=[0, 1, 2])

    return {
        'accuracy': acc,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'confusion_matrix': cm,
        'class_map': {'0': 'Negative', '1': 'Neutral', '2': 'Positive'}
    }
# --------------------------
# Main Application
# --------------------------
def _render_wordcloud(text):
    """Show a word cloud for *text*, or an info message when none was generated."""
    encoded = generate_wordcloud(text)
    # Test the encoded payload itself: the data-URI prefix alone is always
    # truthy and would otherwise hide the "no word cloud" case.
    if encoded:
        st.image(f'data:image/png;base64,{encoded}', use_column_width=True)
    else:
        st.info("No word cloud generated due to insufficient text")


def main():
    """Render the dashboard.

    Builds the sidebar controls, runs either the single-text analysis or the
    live Reddit/YouTube analysis when the action button is pressed, and shows
    the methodology panel at the end. Early returns after a warning/error
    stop rendering for the current run.
    """
    st.title("🚀 SentimentSync Pro - Real-time Analysis Dashboard")

    # Sidebar controls
    with st.sidebar:
        st.header("🔧 Analysis Controls")
        analysis_mode = st.radio(
            "Mode",
            ["Text Analysis", "Live Data Analysis"],
            index=0
        )
        if analysis_mode == "Text Analysis":
            user_input = st.text_area(
                "Enter text to analyze",
                height=200,
                placeholder="Paste your content here..."
            )
            analyze_btn = st.button("Analyze Now")
        else:
            keyword = st.text_input(
                "Search keyword",
                placeholder="e.g., Apple, Tesla, etc."
            )
            reddit_limit = st.number_input("Reddit results", min_value=5, max_value=200, value=30, step=5)
            youtube_limit = st.number_input("YouTube results", min_value=5, max_value=50, value=30, step=5)
            analyze_btn = st.button("Fetch & Analyze")

        st.markdown("---")
        st.markdown("### Options")
        show_details = st.checkbox("Show detailed results", value=False)
        enable_prediction = st.checkbox("Enable sentiment prediction", value=True)
        show_evaluation = st.checkbox("Run model evaluation (adds compute)", value=True)
        st.markdown("---")
        st.markdown("### Experimental Settings")
        pos_thresh = st.number_input("Positive threshold", value=0.1, step=0.05, format="%.2f")
        neg_thresh = st.number_input("Negative threshold", value=-0.1, step=0.05, format="%.2f")
        st.markdown("---")

    # Main content
    if analyze_btn:
        models = load_models()
        if not all(models):
            st.error("Required models failed to load")
            return

        if analysis_mode == "Text Analysis":
            if not user_input.strip():
                st.warning("Please enter some text to analyze")
                return

            with st.spinner("Analyzing content..."):
                start_time = time.time()
                result = analyze_text(user_input, models)
                processing_time = time.time() - start_time
            st.success(f"Analysis completed in {processing_time:.2f} seconds")

            cols = st.columns(3)
            cols[0].metric("VADER Score", f"{result['vader']:.2f}",
                           "Positive" if result['vader'] > 0 else "Negative" if result['vader'] < 0 else "Neutral")
            cols[1].metric("BERT Sentiment", result['bert_label'], f"Confidence: {result['bert_confidence']:.2f}")
            cols[2].metric("TextBlob Score", f"{result['textblob']:.2f}",
                           "Positive" if result['textblob'] > 0 else "Negative" if result['textblob'] < 0 else "Neutral")

            st.subheader("📊 Text Visualization")
            _render_wordcloud(user_input)

        else:  # Live Data Analysis
            if not keyword.strip():
                st.warning("Please enter a search keyword")
                return

            with st.spinner(f"Gathering data for '{keyword}'..."):
                start_time = time.time()
                reddit_data = fetch_reddit_data(keyword, limit=reddit_limit)
                youtube_data = fetch_youtube_data(keyword, limit=youtube_limit)
                if reddit_data.empty and youtube_data.empty:
                    st.error("No data found. Try a different keyword.")
                    return

                combined_data = pd.concat([reddit_data, youtube_data], ignore_index=True)

                # Basic dataset stats BEFORE cleaning
                raw_stats = compute_dataset_stats(combined_data)

                # Normalise text and dates, then keep only rows that have
                # both. The copy keeps later column assignments off a slice.
                combined_data['text'] = combined_data['text'].fillna('').astype(str)
                combined_data['date'] = pd.to_datetime(combined_data['date'], errors='coerce')
                has_text = combined_data['text'].str.strip() != ''
                combined_data = combined_data[has_text].dropna(subset=['date']).copy()

                if combined_data.empty:
                    st.warning("No usable text remained after cleaning. Try a different keyword.")
                    return

                # Score every text
                analysis_results = [analyze_text(text, models) for text in combined_data['text']]

                # Add results to dataframe
                for column in ('vader', 'bert', 'textblob', 'bert_label', 'bert_confidence'):
                    combined_data[column] = [r[column] for r in analysis_results]

                # Ensure no NaN values in sentiment scores
                combined_data = combined_data.dropna(subset=['vader', 'bert', 'textblob']).copy()
                combined_data['average'] = combined_data[['vader', 'bert', 'textblob']].mean(axis=1)
                processing_time = time.time() - start_time

            st.success(f"Analyzed {len(combined_data)} sources in {processing_time:.2f} seconds")

            # Dataset statistics AFTER cleaning
            cleaned_stats = compute_dataset_stats(combined_data)

            st.subheader(f"📈 Overall Sentiment for '{keyword}'")
            cols = st.columns(3)
            avg_sentiment = combined_data['average'].mean()
            pos_pct = (combined_data['average'] > pos_thresh).mean() * 100
            neg_pct = (combined_data['average'] < neg_thresh).mean() * 100
            cols[0].metric("Avg Sentiment", f"{avg_sentiment:.2f}",
                           "Positive" if avg_sentiment > 0 else "Negative" if avg_sentiment < 0 else "Neutral")
            cols[1].metric("Positive Content", f"{pos_pct:.1f}%")
            cols[2].metric("Negative Content", f"{neg_pct:.1f}%")

            st.subheader("📊 Content Visualization")
            _render_wordcloud(" ".join(combined_data['text'].tolist()))

            # Show dataset stats panel
            with st.expander("📚 Dataset Statistics (raw vs cleaned)"):
                st.write("**Before cleaning**")
                st.write(f"- Total collected: {raw_stats['total']}")
                st.write(f"- Date range: {raw_stats['start_date']} -> {raw_stats['end_date']}")
                st.write(f"- Source counts: {raw_stats['source_counts']}")
                st.write("**After cleaning**")
                st.write(f"- Total after cleaning: {cleaned_stats['total']}")
                st.write(f"- Date range: {cleaned_stats['start_date']} -> {cleaned_stats['end_date']}")
                st.write(f"- Source counts: {cleaned_stats['source_counts']}")
                st.write("**Sentiment score summary (average)**")
                st.write(combined_data['average'].describe().to_frame().T)
                st.write("**Sentiment distribution histogram**")
                fig_hist = px.histogram(combined_data, x='average', nbins=30, title="Average Sentiment Distribution")
                st.plotly_chart(fig_hist, use_container_width=True)

            # Filter recent data (last 60 days)
            cutoff = datetime.now() - timedelta(days=60)
            recent_data = combined_data[combined_data['date'] >= cutoff]

            if not recent_data.empty:
                st.subheader("📅 Sentiment Over Time")

                # full_data stays None unless a prediction was produced.
                full_data = None
                if enable_prediction:
                    with st.spinner("Training prediction model..."):
                        daily_data = prepare_data_for_prediction(recent_data)
                        model, training_data = train_sentiment_model(daily_data)
                        if model is not None and training_data is not None:
                            full_data = predict_future_sentiment(model, training_data)
                else:
                    daily_data = prepare_data_for_prediction(recent_data)

                if full_data is not None:
                    fig = plot_sentiment(full_data, keyword)
                else:
                    # No prediction: plot the daily series, or the raw recent
                    # points when no daily series could be built.
                    actual_only = daily_data if daily_data is not None else recent_data[['date', 'average']]
                    fig = plot_sentiment(actual_only.assign(type='actual'), keyword)

                if fig is not None:
                    st.plotly_chart(fig, use_container_width=True)

                if full_data is not None:
                    last_actual = full_data[full_data['type'] == 'actual']['average'].iloc[-1]
                    last_pred = full_data[full_data['type'] == 'prediction']['average'].iloc[-1]
                    if last_pred > last_actual + 0.1:
                        st.success("📈 Prediction: Sentiment is expected to improve in the next 15 days")
                    elif last_pred < last_actual - 0.1:
                        st.warning("📉 Prediction: Sentiment is expected to decline in the next 15 days")
                    else:
                        st.info("📊 Prediction: Sentiment is expected to remain stable in the next 15 days")

                if show_evaluation:
                    with st.spinner("Running evaluation..."):
                        eval_results = evaluate_sentiment_model(recent_data)
                    if eval_results is None:
                        st.info("Not enough data points for evaluation.")
                    else:
                        st.subheader("📝 Model Evaluation Results (Baseline Ridge)")
                        st.write(f"**Accuracy:** {eval_results['accuracy']:.3f}")
                        st.write(f"**Precision:** {eval_results['precision']:.3f}")
                        st.write(f"**Recall:** {eval_results['recall']:.3f}")
                        st.write(f"**F1-score:** {eval_results['f1']:.3f}")
                        st.write("**Confusion Matrix:**")
                        st.write(eval_results['confusion_matrix'])

                if show_details:
                    st.subheader("🔍 Detailed Results (Recent Data)")
                    st.dataframe(recent_data[['date', 'source', 'text', 'vader', 'bert', 'textblob', 'average']], use_container_width=True)
            else:
                st.info("No recent data found (within last 60 days).")

    # Experimental details panel outside main flow so reviewers see it even without running analyses
    with st.expander("🧪 Experimental Details & Settings (Methodology)"):
        st.markdown("""
**Preprocessing Steps**
- Remove empty posts and NAs.
- Truncate text to 2000 characters to keep BERT inference performant (BERT uses <=512 tokens).
- Convert model outputs to numeric ranges: BERT labels mapped to [-1, -0.5, 0, 0.5, 1].
- Aggregate VADER, BERT, and TextBlob by mean to create a fused 'average' sentiment score.
**Model Choices & Hyperparameters**
- VADER: default lexicon (lexicon-based sentiment for short social text).
- BERT: `nlptown/bert-base-multilingual-uncased-sentiment` used for multilingual rating-style classification.
- Regression baseline for temporal prediction: PolynomialFeatures(degree=2) + Ridge(alpha=1.0).
- Evaluation baseline: Ridge(alpha=1.0) regressor trained on [vader, bert, textblob] then rounded to class labels.
**Train/Test Split**
- Standard 80% train / 20% test split (random_state=42) used for evaluation experiments.
**Labeling thresholds**
- Positive threshold: configurable (default 0.1)
- Negative threshold: configurable (default -0.1)
""")
        st.markdown("**Notes for reviewers:** Add more advanced time-series models (Prophet, ARIMA, LSTM) if temporal forecasting accuracy is critical. Current Ridge polynomial baseline is intentionally simple and explained in methodology.")
if __name__ == "__main__":
    # Best-effort NLTK setup: register the per-user data directory and fetch
    # the tokenizer and stop-word resources. A failure is reported in the UI
    # instead of being silently discarded, and the app still starts.
    try:
        nltk.data.path.append(os.path.join(os.path.expanduser("~"), "nltk_data"))
        nltk.download('punkt', quiet=True)
        nltk.download('stopwords', quiet=True)
    except Exception as exc:
        st.warning(f"NLTK resource setup failed: {exc}")
    main()