import os
import json
import zipfile
import logging
from io import BytesIO

import torch
import requests
from tqdm import tqdm
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataProcessor:
    def __init__(self, model_name="t5-small", max_length=128):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_length = max_length
        self.pad_token_id = self.tokenizer.pad_token_id

    def download_dataset(self,
                         dataset_url="https://object.pouta.csc.fi/OPUS-GlobalVoices/v2018q4/moses/en-fr.txt.zip",
                         sample_size=None):
        """Download and prepare the GlobalVoices en-fr dataset."""
        logger.info(f"Downloading GlobalVoices dataset from {dataset_url}...")

        try:
            # Download the zip file
            response = requests.get(dataset_url)
            response.raise_for_status()

            # Extract the zip file in memory
            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                # Find the text files in the zip
                file_list = zip_file.namelist()
                logger.info(f"Files in zip: {file_list}")

                # Look for the English-French parallel files
                en_file = None
                fr_file = None
                for file_name in file_list:
                    if file_name.endswith('.en'):
                        en_file = file_name
                    elif file_name.endswith('.fr'):
                        fr_file = file_name

                if not en_file or not fr_file:
                    raise ValueError("Could not find English and French files in the zip")

                logger.info(f"Found English file: {en_file}")
                logger.info(f"Found French file: {fr_file}")

                # Read the text files
                with zip_file.open(en_file) as f:
                    english_lines = [line.decode('utf-8').strip() for line in f]
                with zip_file.open(fr_file) as f:
                    french_lines = [line.decode('utf-8').strip() for line in f]

            # Create dataset dictionary
            dataset_dict = {
                'en': english_lines,
                'fr': french_lines
            }

            # Convert to datasets format
            dataset = Dataset.from_dict(dataset_dict)

            # Apply sampling if requested
            if sample_size and sample_size < len(dataset):
                dataset = dataset.select(range(sample_size))

            logger.info(f"Dataset downloaded successfully. Size: {len(dataset)}")
Size: {len(dataset)}") logger.info(f"Dataset structure sample: {dataset[0]}") return dataset except Exception as e: logger.error(f"Error downloading dataset: {e}") raise def preprocess_function(self, examples): """Preprocess batch of examples for GlobalVoices format""" try: # GlobalVoices format: {'en': ['english text1', 'english text2'], 'fr': ['french text1', 'french text2']} inputs = [f"translate English to French: {text}" for text in examples['en']] targets = examples['fr'] # Tokenize inputs model_inputs = self.tokenizer( inputs, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt" ) # Tokenize targets labels = self.tokenizer( targets, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt" ) model_inputs["labels"] = labels["input_ids"] return model_inputs except Exception as e: logger.error(f"Error in preprocessing: {e}") logger.error(f"Examples keys: {examples.keys()}") raise def preprocess_single_example(self, example): """Preprocess a single example for saving - GlobalVoices format""" try: # GlobalVoices format: {'en': 'english text', 'fr': 'french text'} input_text = f"translate English to French: {example['en']}" target_text = example['fr'] # Tokenize inputs = self.tokenizer( input_text, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt" ) targets = self.tokenizer( target_text, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt" ) return { 'input_ids': inputs['input_ids'].squeeze(), 'attention_mask': inputs['attention_mask'].squeeze(), 'labels': targets['input_ids'].squeeze() } except Exception as e: logger.error(f"Error preprocessing example: {e}") logger.error(f"Example: {example}") raise def create_data_splits(self, dataset, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1): """Split dataset into train, validation, and test sets""" assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1.0" total_size = len(dataset) train_size = int(train_ratio * total_size) val_size = int(val_ratio * total_size) test_size = total_size - train_size - val_size # Split dataset train_dataset = dataset.select(range(train_size)) val_dataset = dataset.select(range(train_size, train_size + val_size)) test_dataset = dataset.select(range(train_size + val_size, total_size)) logger.info(f"Data splits - Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}") return DatasetDict({ 'train': train_dataset, 'validation': val_dataset, 'test': test_dataset }) def save_processed_data(self, dataset_dict, save_dir="./data/processed"): """Save processed datasets""" os.makedirs(save_dir, exist_ok=True) # Save each split for split_name, dataset in dataset_dict.items(): file_path = os.path.join(save_dir, f"{split_name}_dataset.pt") # Convert to PyTorch format and save processed_data = [] for i in tqdm(range(len(dataset)), desc=f"Processing {split_name}"): try: item = dataset[i] processed = self.preprocess_single_example(item) processed_data.append(processed) except Exception as e: logger.warning(f"Skipping example {i} due to error: {e}") continue torch.save(processed_data, file_path) logger.info(f"Saved {split_name} data to {file_path} - {len(processed_data)} examples") # Save tokenizer and config self.tokenizer.save_pretrained(save_dir) # Save dataset info dataset_info = { 'max_length': self.max_length, 'vocab_size': self.tokenizer.vocab_size, 'dataset_sizes': {k: len(v) for k, v in dataset_dict.items()} } with open(os.path.join(save_dir, 
            json.dump(dataset_info, f, indent=2)

        logger.info(f"All data saved to {save_dir}")

    def load_processed_data(self, data_dir="./data/processed"):
        """Load processed datasets."""
        splits = {}
        for split_name in ['train', 'validation', 'test']:
            file_path = os.path.join(data_dir, f"{split_name}_dataset.pt")
            if os.path.exists(file_path):
                splits[split_name] = torch.load(file_path)
                logger.info(f"Loaded {split_name} data: {len(splits[split_name])} examples")
            else:
                logger.warning(f"File not found: {file_path}")

        # Load the saved tokenizer if present, otherwise fall back to the current one
        if os.path.exists(os.path.join(data_dir, "tokenizer.json")):
            tokenizer = AutoTokenizer.from_pretrained(data_dir)
        else:
            tokenizer = self.tokenizer

        return splits, tokenizer

    def analyze_dataset(self, dataset_dict):
        """Analyze dataset statistics."""
        logger.info("=== Dataset Analysis ===")

        for split_name, dataset in dataset_dict.items():
            if len(dataset) > 0:
                # Sample some examples
                sample = dataset[0]

                # Decode sample
                input_text = self.tokenizer.decode(sample['input_ids'], skip_special_tokens=True)
                target_text = self.tokenizer.decode(sample['labels'], skip_special_tokens=True)

                logger.info(f"\n{split_name.upper()} Split:")
                logger.info(f"Size: {len(dataset)} examples")
                logger.info(f"Input shape: {sample['input_ids'].shape}")
                logger.info(f"Sample input: {input_text[:100]}...")
                logger.info(f"Sample target: {target_text[:100]}...")

                # Calculate sequence lengths
                input_lengths = [len(item['input_ids']) for item in dataset]
                avg_input_len = sum(input_lengths) / len(input_lengths)
                logger.info(f"Average input length: {avg_input_len:.2f}")


def debug_dataset_structure():
    """Debug function to check dataset structure."""
    logger.info("=== Debugging Dataset Structure ===")

    try:
        # Test the GlobalVoices download with a small sample
        processor = DataProcessor()
        dataset = processor.download_dataset(sample_size=5)

        logger.info(f"Dataset type: {type(dataset)}")
        logger.info(f"First item: {dataset[0]}")
        logger.info(f"First item keys: {dataset[0].keys()}")

    except Exception as e:
        logger.error(f"Debug error: {e}")


def main():
    """Main function to download and preprocess data."""
    # First debug the dataset structure
    debug_dataset_structure()

    # Initialize processor
    processor = DataProcessor(max_length=128)

    # Download dataset (uncomment sample_size for a quick test run)
    logger.info("Starting data download...")
    dataset = processor.download_dataset(
        dataset_url="https://object.pouta.csc.fi/OPUS-GlobalVoices/v2018q4/moses/en-fr.txt.zip",
        # sample_size=1000  # Use 1K samples for testing first
    )

    # Create splits
    dataset_dict = processor.create_data_splits(
        dataset,
        train_ratio=0.8,
        val_ratio=0.1,
        test_ratio=0.1
    )

    # Preprocess and save data
    processor.save_processed_data(dataset_dict)

    # Analyze dataset
    processed_data, tokenizer = processor.load_processed_data()
    processor.analyze_dataset(processed_data)

    logger.info("Data preprocessing completed successfully!")


if __name__ == "__main__":
    main()
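
# --- Example usage (sketch) ---
# A minimal sketch of how the saved splits could be consumed downstream, assuming
# the .pt files written by save_processed_data() above. The DataLoader settings
# (batch_size=16, shuffle=True) are illustrative assumptions, not part of this
# script. Each saved example is a dict of fixed-length tensors, so the default
# collate_fn can batch them directly:
#
#     from torch.utils.data import DataLoader
#
#     processor = DataProcessor(max_length=128)
#     splits, tokenizer = processor.load_processed_data("./data/processed")
#     train_loader = DataLoader(splits['train'], batch_size=16, shuffle=True)
#
#     batch = next(iter(train_loader))
#     print(batch['input_ids'].shape)       # torch.Size([16, 128])
#     print(batch['attention_mask'].shape)  # torch.Size([16, 128])
#     print(batch['labels'].shape)          # torch.Size([16, 128])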