"""Download and preprocess the OPUS GlobalVoices en-fr corpus for T5-style
English-to-French translation fine-tuning."""

import json
import logging
import os
import zipfile
from io import BytesIO

import requests
import torch
from datasets import Dataset, DatasetDict
from tqdm import tqdm
from transformers import AutoTokenizer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataProcessor:
    """Download, tokenize, and save the GlobalVoices en-fr parallel corpus."""

    def __init__(self, model_name="t5-small", max_length=128):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_length = max_length
        self.pad_token_id = self.tokenizer.pad_token_id

    def download_dataset(self, dataset_url="https://object.pouta.csc.fi/OPUS-GlobalVoices/v2018q4/moses/en-fr.txt.zip", sample_size=None):
        """Download the GlobalVoices en-fr Moses zip and return a Hugging Face Dataset."""
        logger.info(f"Downloading GlobalVoices dataset from {dataset_url}...")

        try:
            response = requests.get(dataset_url)
            response.raise_for_status()

            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                file_list = zip_file.namelist()
                logger.info(f"Files in zip: {file_list}")

                # The Moses release ships one plain-text file per language side.
                en_file = None
                fr_file = None
                for file_name in file_list:
                    if file_name.endswith('.en'):
                        en_file = file_name
                    elif file_name.endswith('.fr'):
                        fr_file = file_name

                if not en_file or not fr_file:
                    raise ValueError("Could not find English and French files in the zip")

                logger.info(f"Found English file: {en_file}")
                logger.info(f"Found French file: {fr_file}")

                # The two files are line-aligned.
                with zip_file.open(en_file) as f:
                    english_lines = [line.decode('utf-8').strip() for line in f]

                with zip_file.open(fr_file) as f:
                    french_lines = [line.decode('utf-8').strip() for line in f]

                dataset = Dataset.from_dict({
                    'en': english_lines,
                    'fr': french_lines
                })

                if sample_size and sample_size < len(dataset):
                    dataset = dataset.select(range(sample_size))

                logger.info(f"Dataset downloaded successfully. Size: {len(dataset)}")
                logger.info(f"Dataset structure sample: {dataset[0]}")

                return dataset

        except Exception as e:
            logger.error(f"Error downloading dataset: {e}")
            raise

    def preprocess_function(self, examples):
        """Tokenize a batch of examples with 'en'/'fr' columns (GlobalVoices format)."""
        try:
            # T5 expects a task prefix on the source side.
            inputs = [f"translate English to French: {text}" for text in examples['en']]
            targets = examples['fr']

            model_inputs = self.tokenizer(
                inputs,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            labels = self.tokenizer(
                targets,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            # Pad positions are left in the labels here; many training setups
            # replace them with -100 before computing the loss (see the sketch
            # further down in this file).
            model_inputs["labels"] = labels["input_ids"]
            return model_inputs

        except Exception as e:
            logger.error(f"Error in preprocessing: {e}")
            logger.error(f"Examples keys: {examples.keys()}")
            raise

    def preprocess_single_example(self, example):
        """Preprocess a single example for saving - GlobalVoices format"""
        try:
            input_text = f"translate English to French: {example['en']}"
            target_text = example['fr']

            inputs = self.tokenizer(
                input_text,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            targets = self.tokenizer(
                target_text,
                max_length=self.max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            )

            return {
                'input_ids': inputs['input_ids'].squeeze(),
                'attention_mask': inputs['attention_mask'].squeeze(),
                'labels': targets['input_ids'].squeeze()
            }

        except Exception as e:
            logger.error(f"Error preprocessing example: {e}")
            logger.error(f"Example: {example}")
            raise

    def create_data_splits(self, dataset, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
        """Split dataset into train, validation, and test sets (sequential slices)."""
        # Compare with a tolerance rather than exact float equality.
        assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, "Ratios must sum to 1.0"

        total_size = len(dataset)
        train_size = int(train_ratio * total_size)
        val_size = int(val_ratio * total_size)

        # The corpus is sliced in order, without shuffling; a shuffled
        # alternative is sketched after this class.
        train_dataset = dataset.select(range(train_size))
        val_dataset = dataset.select(range(train_size, train_size + val_size))
        test_dataset = dataset.select(range(train_size + val_size, total_size))

        logger.info(f"Data splits - Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}")

        return DatasetDict({
            'train': train_dataset,
            'validation': val_dataset,
            'test': test_dataset
        })

    def save_processed_data(self, dataset_dict, save_dir="./data/processed"):
        """Tokenize every split and save the tensors, tokenizer, and metadata."""
        os.makedirs(save_dir, exist_ok=True)

        for split_name, dataset in dataset_dict.items():
            file_path = os.path.join(save_dir, f"{split_name}_dataset.pt")

            processed_data = []
            for i in tqdm(range(len(dataset)), desc=f"Processing {split_name}"):
                try:
                    item = dataset[i]
                    processed = self.preprocess_single_example(item)
                    processed_data.append(processed)
                except Exception as e:
                    logger.warning(f"Skipping example {i} due to error: {e}")
                    continue

            torch.save(processed_data, file_path)
            logger.info(f"Saved {split_name} data to {file_path} - {len(processed_data)} examples")

        # Save the tokenizer alongside the tensors so the splits can be decoded later.
        self.tokenizer.save_pretrained(save_dir)

        dataset_info = {
            'max_length': self.max_length,
            'vocab_size': self.tokenizer.vocab_size,
            'dataset_sizes': {k: len(v) for k, v in dataset_dict.items()}
        }

        with open(os.path.join(save_dir, 'dataset_info.json'), 'w') as f:
            json.dump(dataset_info, f, indent=2)

        logger.info(f"All data saved to {save_dir}")

    def load_processed_data(self, data_dir="./data/processed"):
        """Load processed datasets and the tokenizer saved next to them."""
        splits = {}

        for split_name in ['train', 'validation', 'test']:
            file_path = os.path.join(data_dir, f"{split_name}_dataset.pt")
            if os.path.exists(file_path):
                splits[split_name] = torch.load(file_path)
                logger.info(f"Loaded {split_name} data: {len(splits[split_name])} examples")
            else:
                logger.warning(f"File not found: {file_path}")

        # Prefer the tokenizer saved with the data; fall back to the in-memory one.
        tokenizer_path = os.path.join(data_dir, "tokenizer.json")
        if os.path.exists(tokenizer_path):
            tokenizer = AutoTokenizer.from_pretrained(data_dir)
        else:
            tokenizer = self.tokenizer

        return splits, tokenizer

    def analyze_dataset(self, dataset_dict):
        """Log basic statistics for each processed split."""
        logger.info("=== Dataset Analysis ===")

        for split_name, dataset in dataset_dict.items():
            if len(dataset) > 0:
                sample = dataset[0]

                input_text = self.tokenizer.decode(sample['input_ids'], skip_special_tokens=True)
                target_text = self.tokenizer.decode(sample['labels'], skip_special_tokens=True)

                logger.info(f"\n{split_name.upper()} Split:")
                logger.info(f"Size: {len(dataset)} examples")
                logger.info(f"Input shape: {sample['input_ids'].shape}")
                logger.info(f"Sample input: {input_text[:100]}...")
                logger.info(f"Sample target: {target_text[:100]}...")

                # Sequences are padded to max_length, so count non-pad tokens to
                # get a meaningful average input length.
                input_lengths = [int((item['input_ids'] != self.pad_token_id).sum()) for item in dataset]
                avg_input_len = sum(input_lengths) / len(input_lengths)

                logger.info(f"Average input length: {avg_input_len:.2f}")
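

# create_data_splits above slices the corpus sequentially. A minimal sketch of a
# shuffled alternative using the `datasets` library's built-in train_test_split;
# the helper name and the fixed 80/10/10 ratios are illustrative, not part of
# the pipeline above.
def create_shuffled_splits(dataset, seed=42):
    # Carve off 20% for validation + test, then split that portion in half.
    first = dataset.train_test_split(test_size=0.2, seed=seed)
    held_out = first["test"].train_test_split(test_size=0.5, seed=seed)
    return DatasetDict({
        "train": first["train"],
        "validation": held_out["train"],
        "test": held_out["test"],
    })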
def debug_dataset_structure():
    """Debug function to check dataset structure"""
    logger.info("=== Debugging Dataset Structure ===")

    try:
        processor = DataProcessor()
        dataset = processor.download_dataset(sample_size=5)
        logger.info(f"Dataset type: {type(dataset)}")
        logger.info(f"First item: {dataset[0]}")
        logger.info(f"First item keys: {dataset[0].keys()}")
    except Exception as e:
        logger.error(f"Debug error: {e}")
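

# A minimal sketch of how a training script might consume the splits written by
# save_processed_data. The wrapper class, helper function, and batch_size are
# illustrative (not part of the pipeline above); the only assumption is that
# each saved item is a dict of fixed-length 'input_ids', 'attention_mask', and
# 'labels' tensors. Many seq2seq setups also replace pad token ids in the labels
# with -100 so the loss ignores padding, which is done here.
from torch.utils.data import DataLoader, Dataset as TorchDataset


class TranslationTensorDataset(TorchDataset):
    def __init__(self, examples, pad_token_id):
        self.examples = examples
        self.pad_token_id = pad_token_id

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        item = dict(self.examples[idx])
        labels = item['labels'].clone()
        labels[labels == self.pad_token_id] = -100  # mask padding in the loss
        item['labels'] = labels
        return item


def build_train_loader(processor, data_dir="./data/processed", batch_size=16):
    splits, _ = processor.load_processed_data(data_dir)
    train_dataset = TranslationTensorDataset(splits['train'], processor.pad_token_id)
    # Fixed-length tensors stack cleanly with the default collate function.
    return DataLoader(train_dataset, batch_size=batch_size, shuffle=True)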
def main():
    """Main function to download and preprocess data"""
    # Quick sanity check of the raw dataset structure before full processing.
    debug_dataset_structure()

    processor = DataProcessor(max_length=128)

    logger.info("Starting data download...")
    dataset = processor.download_dataset(
        dataset_url="https://object.pouta.csc.fi/OPUS-GlobalVoices/v2018q4/moses/en-fr.txt.zip",
    )

    dataset_dict = processor.create_data_splits(
        dataset,
        train_ratio=0.8,
        val_ratio=0.1,
        test_ratio=0.1
    )

    processor.save_processed_data(dataset_dict)

    # Reload and summarize what was written to disk.
    processed_data, tokenizer = processor.load_processed_data()
    processor.analyze_dataset(processed_data)

    logger.info("Data preprocessing completed successfully!")
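

# preprocess_function is written for batched use but is not called in main();
# a minimal sketch of how it could be applied with datasets' map API (the helper
# name and the remove_columns choice are illustrative):
def tokenize_with_map(processor, dataset_dict):
    return dataset_dict.map(
        processor.preprocess_function,
        batched=True,
        remove_columns=["en", "fr"],
    )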
if __name__ == "__main__":
    main()
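# Running this module directly downloads the corpus and writes the
# train/validation/test tensors, dataset_info.json, and the tokenizer files
# under ./data/processed/.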