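"""Data preprocessing pipeline for English-to-French translation with T5.

Downloads the OPUS GlobalVoices en-fr parallel corpus (Moses format),
tokenizes it with a T5 tokenizer (t5-small by default) using the
"translate English to French: ..." prefix on the source side, splits it
into train/validation/test sets, and saves the tokenized examples as
PyTorch tensors under ./data/processed.
"""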
import os
import torch
from datasets import DatasetDict
from transformers import AutoTokenizer
import logging
from tqdm import tqdm
import json
import requests
import zipfile
from io import BytesIO

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataProcessor:
    def __init__(self, model_name="t5-small", max_length=128):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_length = max_length
        self.pad_token_id = self.tokenizer.pad_token_id

    def download_dataset(self, dataset_url="https://object.pouta.csc.fi/OPUS-GlobalVoices/v2018q4/moses/en-fr.txt.zip", sample_size=None):
        """Download and prepare the GlobalVoices dataset"""
        logger.info(f"Downloading GlobalVoices dataset from {dataset_url}...")
        try:
            # Download the zip file
            response = requests.get(dataset_url)
            response.raise_for_status()

            # Extract the zip file in memory
            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                # Find the text files in the zip
                file_list = zip_file.namelist()
                logger.info(f"Files in zip: {file_list}")

                # Look for English-French parallel files
                en_file = None
                fr_file = None
                for file_name in file_list:
                    if 'en' in file_name and file_name.endswith('.en'):
                        en_file = file_name
                    elif 'fr' in file_name and file_name.endswith('.fr'):
                        fr_file = file_name

                if not en_file or not fr_file:
                    raise ValueError("Could not find English and French files in the zip")

                logger.info(f"Found English file: {en_file}")
                logger.info(f"Found French file: {fr_file}")

                # Read the text files
                with zip_file.open(en_file) as f:
                    english_lines = [line.decode('utf-8').strip() for line in f]
                with zip_file.open(fr_file) as f:
                    french_lines = [line.decode('utf-8').strip() for line in f]

                # Create dataset dictionary
                dataset_dict = {
                    'en': english_lines,
                    'fr': french_lines
                }

                # Convert to datasets format
                from datasets import Dataset
                dataset = Dataset.from_dict(dataset_dict)

                # Apply sampling if requested
                if sample_size and sample_size < len(dataset):
                    dataset = dataset.select(range(sample_size))

                logger.info(f"Dataset downloaded successfully. Size: {len(dataset)}")
                logger.info(f"Dataset structure sample: {dataset[0]}")
                return dataset

        except Exception as e:
            logger.error(f"Error downloading dataset: {e}")
            raise

    def preprocess_function(self, examples):
        """Preprocess batch of examples for GlobalVoices format"""
        try:
            # GlobalVoices format: {'en': ['english text1', 'english text2'], 'fr': ['french text1', 'french text2']}
            inputs = [f"translate English to French: {text}" for text in examples['en']]
            targets = examples['fr']

            # Tokenize inputs
            model_inputs = self.tokenizer(
                inputs,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            # Tokenize targets
            labels = self.tokenizer(
                targets,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            # Note: the labels still contain pad token ids. Hugging Face seq2seq
            # models only ignore label positions set to -100, so padded label
            # positions contribute to the loss unless they are masked downstream.
            model_inputs["labels"] = labels["input_ids"]
            return model_inputs

        except Exception as e:
            logger.error(f"Error in preprocessing: {e}")
            logger.error(f"Examples keys: {examples.keys()}")
            raise

    def preprocess_single_example(self, example):
        """Preprocess a single example for saving - GlobalVoices format"""
        try:
            # GlobalVoices format: {'en': 'english text', 'fr': 'french text'}
            input_text = f"translate English to French: {example['en']}"
            target_text = example['fr']

            # Tokenize
            inputs = self.tokenizer(
                input_text,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )
            targets = self.tokenizer(
                target_text,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            return {
                'input_ids': inputs['input_ids'].squeeze(),
                'attention_mask': inputs['attention_mask'].squeeze(),
                'labels': targets['input_ids'].squeeze()
            }

        except Exception as e:
            logger.error(f"Error preprocessing example: {e}")
            logger.error(f"Example: {example}")
            raise

    def create_data_splits(self, dataset, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
        """Split dataset into train, validation, and test sets"""
        # Use a small tolerance instead of exact float equality, which is fragile for ratio sums
        assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, "Ratios must sum to 1.0"

        total_size = len(dataset)
        train_size = int(train_ratio * total_size)
        val_size = int(val_ratio * total_size)
        test_size = total_size - train_size - val_size

        # Split dataset (sequential slices, no shuffling)
        train_dataset = dataset.select(range(train_size))
        val_dataset = dataset.select(range(train_size, train_size + val_size))
        test_dataset = dataset.select(range(train_size + val_size, total_size))

        logger.info(f"Data splits - Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}")

        return DatasetDict({
            'train': train_dataset,
            'validation': val_dataset,
            'test': test_dataset
        })

    def save_processed_data(self, dataset_dict, save_dir="./data/processed"):
        """Save processed datasets"""
        os.makedirs(save_dir, exist_ok=True)

        # Save each split
        for split_name, dataset in dataset_dict.items():
            file_path = os.path.join(save_dir, f"{split_name}_dataset.pt")

            # Convert to PyTorch format and save
            processed_data = []
            for i in tqdm(range(len(dataset)), desc=f"Processing {split_name}"):
                try:
                    item = dataset[i]
                    processed = self.preprocess_single_example(item)
                    processed_data.append(processed)
                except Exception as e:
                    logger.warning(f"Skipping example {i} due to error: {e}")
                    continue

            torch.save(processed_data, file_path)
            logger.info(f"Saved {split_name} data to {file_path} - {len(processed_data)} examples")

        # Save tokenizer and config
        self.tokenizer.save_pretrained(save_dir)

        # Save dataset info
        dataset_info = {
            'max_length': self.max_length,
            'vocab_size': self.tokenizer.vocab_size,
            'dataset_sizes': {k: len(v) for k, v in dataset_dict.items()}
        }
        with open(os.path.join(save_dir, 'dataset_info.json'), 'w') as f:
            json.dump(dataset_info, f, indent=2)

        logger.info(f"All data saved to {save_dir}")

    def load_processed_data(self, data_dir="./data/processed"):
        """Load processed datasets"""
        splits = {}
        for split_name in ['train', 'validation', 'test']:
            file_path = os.path.join(data_dir, f"{split_name}_dataset.pt")
            if os.path.exists(file_path):
                splits[split_name] = torch.load(file_path)
                logger.info(f"Loaded {split_name} data: {len(splits[split_name])} examples")
            else:
                logger.warning(f"File not found: {file_path}")

        # Load tokenizer
        tokenizer_path = os.path.join(data_dir, "tokenizer.json")
        if os.path.exists(tokenizer_path):
            tokenizer = AutoTokenizer.from_pretrained(data_dir)
        else:
            tokenizer = self.tokenizer

        return splits, tokenizer

    def analyze_dataset(self, dataset_dict):
        """Analyze dataset statistics"""
        logger.info("=== Dataset Analysis ===")
        for split_name, dataset in dataset_dict.items():
            if len(dataset) > 0:
                # Sample some examples
                sample = dataset[0]

                # Decode sample
                input_text = self.tokenizer.decode(sample['input_ids'], skip_special_tokens=True)
                target_text = self.tokenizer.decode(sample['labels'], skip_special_tokens=True)

                logger.info(f"\n{split_name.upper()} Split:")
                logger.info(f"Size: {len(dataset)} examples")
                logger.info(f"Input shape: {sample['input_ids'].shape}")
                logger.info(f"Sample input: {input_text[:100]}...")
                logger.info(f"Sample target: {target_text[:100]}...")

                # Calculate sequence lengths (count non-padding tokens; the padded length is always max_length)
                input_lengths = [int(item['attention_mask'].sum()) for item in dataset]
                avg_input_len = sum(input_lengths) / len(input_lengths)
                logger.info(f"Average input length: {avg_input_len:.2f}")


def debug_dataset_structure():
    """Debug function to check dataset structure"""
    logger.info("=== Debugging Dataset Structure ===")
    try:
        # Test GlobalVoices download with small sample
        processor = DataProcessor()
        dataset = processor.download_dataset(sample_size=5)

        logger.info(f"Dataset type: {type(dataset)}")
        logger.info(f"First item: {dataset[0]}")
        logger.info(f"First item keys: {dataset[0].keys()}")
    except Exception as e:
        logger.error(f"Debug error: {e}")


def main():
    """Main function to download and preprocess data"""
    # First debug the dataset structure
    debug_dataset_structure()

    # Initialize processor
    processor = DataProcessor(max_length=128)

    # Download dataset (uncomment sample_size below to run on a smaller subset for testing)
    logger.info("Starting data download...")
    dataset = processor.download_dataset(
        dataset_url="https://object.pouta.csc.fi/OPUS-GlobalVoices/v2018q4/moses/en-fr.txt.zip",
        # sample_size=1000  # Use 1K samples for testing first
    )

    # Create splits
    dataset_dict = processor.create_data_splits(
        dataset,
        train_ratio=0.8,
        val_ratio=0.1,
        test_ratio=0.1
    )

    # Preprocess and save data
    processor.save_processed_data(dataset_dict)

    # Analyze dataset
    processed_data, tokenizer = processor.load_processed_data()
    processor.analyze_dataset(processed_data)

    logger.info("Data preprocessing completed successfully!")


if __name__ == "__main__":
    main()
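

# --- Usage sketch (not executed) ---------------------------------------------
# A minimal, hypothetical example of consuming the saved splits in a training
# loop. Each "{split}_dataset.pt" file holds a list of dicts with 'input_ids',
# 'attention_mask', and 'labels' tensors of shape (max_length,), so the default
# collate_fn of torch.utils.data.DataLoader can batch them directly. The batch
# size below is illustrative, not taken from this script.
#
#   from torch.utils.data import DataLoader
#
#   train_examples = torch.load("./data/processed/train_dataset.pt")
#   train_loader = DataLoader(train_examples, batch_size=16, shuffle=True)
#   for batch in train_loader:
#       # batch['input_ids'], batch['attention_mask'], batch['labels'] are
#       # tensors of shape (batch_size, max_length), ready for a seq2seq model.
#       break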