# Copyright (c) 2021, EleutherAI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import torch
import numpy as np
from typing import List, Tuple
from itertools import zip_longest
from functools import partial

from megatron import mpu, print_rank_0
from megatron.data.indexed_dataset import make_dataset as make_indexed_dataset
from megatron.data.blendable_dataset import BlendableDataset
from megatron.data.gpt2_dataset import GPT2Dataset
from megatron.data.samplers import DistributedBatchSampler


def make_data_loader(dataset, neox_args):
    """Build a torch ``DataLoader`` for ``dataset``.

    Args:
        dataset: Dataset to iterate over, or ``None``.
        neox_args: Run configuration; ``batch_size`` and ``num_workers`` are
            read from it.

    Returns:
        A ``torch.utils.data.DataLoader`` driven by a
        ``DistributedBatchSampler``, or ``None`` when ``dataset`` is ``None``.
    """
    if dataset is None:
        return None

    # Data parallel arguments.
    world_size = mpu.get_data_parallel_world_size()
    rank = mpu.get_data_parallel_rank()
    global_batch_size = neox_args.batch_size * world_size
    num_workers = neox_args.num_workers

    # Use a simple sampler with distributed batch sampler.
    sampler = torch.utils.data.SequentialSampler(dataset)
    batch_sampler = DistributedBatchSampler(
        sampler=sampler,
        batch_size=global_batch_size,
        drop_last=True,
        rank=rank,
        world_size=world_size,
    )

    # Torch dataloader.
    return torch.utils.data.DataLoader(
        dataset, batch_sampler=batch_sampler, num_workers=num_workers, pin_memory=True
    )


def build_the_dataset(
    data_prefix,
    name,
    data_impl,
    num_samples,
    seq_length,
    seed,
    skip_warmup,
    build_index_mappings=True,
):
    """Build a single ``GPT2Dataset`` covering every document under ``data_prefix``.

    Args:
        data_prefix: Prefix handed to ``make_indexed_dataset`` and ``GPT2Dataset``.
        name: Dataset name; also used in the log output.
        data_impl: Indexed-dataset implementation selector.
        num_samples: Number of samples requested from the dataset.
        seq_length: Sequence length forwarded to ``GPT2Dataset``.
        seed: Seed forwarded to ``GPT2Dataset``.
        skip_warmup: Forwarded to ``make_indexed_dataset``.
        build_index_mappings: Forwarded to ``GPT2Dataset``.

    Returns:
        The constructed ``GPT2Dataset``.
    """
    indexed_dataset = make_indexed_dataset(data_prefix, data_impl, skip_warmup)

    total_num_of_documents = indexed_dataset.sizes.shape[0]
    print_rank_0(" {}:".format(name))
    print_rank_0(" no. of documents:{}".format(total_num_of_documents))

    # Use every document of the indexed dataset (no train/valid/test split).
    documents = np.arange(start=0, stop=total_num_of_documents, step=1, dtype=np.int32)
    return GPT2Dataset(
        name,
        data_prefix,
        documents,
        indexed_dataset,
        num_samples,
        seq_length,
        seed,
        build_index_mappings=build_index_mappings,
    )


def build_train_valid_test_datasets(
    data_prefix,
    use_shared_fs,
    data_impl,
    splits_string,
    train_valid_test_num_samples,
    seq_length,
    seed,
    skip_warmup,
):
    """Build train, valid, and test datasets by splitting one indexed dataset.

    Args:
        data_prefix: Prefix handed to ``make_indexed_dataset`` and ``GPT2Dataset``.
        use_shared_fs: Forwarded to ``GPT2Dataset``.
        data_impl: Indexed-dataset implementation selector.
        splits_string: Split proportions, parsed by
            ``get_train_valid_test_split_``.
        train_valid_test_num_samples: Requested sample counts, indexed
            0 (train), 1 (valid), 2 (test).
        seq_length: Sequence length forwarded to ``GPT2Dataset``.
        seed: Seed forwarded to ``GPT2Dataset``.
        skip_warmup: Forwarded to ``make_indexed_dataset``.

    Returns:
        ``(train_dataset, valid_dataset, test_dataset)``; an entry is ``None``
        when its split contains no documents.
    """
    # Indexed dataset.
    indexed_dataset = make_indexed_dataset(data_prefix, data_impl, skip_warmup)

    total_num_of_documents = indexed_dataset.sizes.shape[0]
    splits = get_train_valid_test_split_(splits_string, total_num_of_documents)

    # Print stats about the splits.
    print_rank_0(" > dataset split:")

    def print_split_stats(name, index):
        print_rank_0(" {}:".format(name))
        print_rank_0(
            " document indices in [{}, {}) total of {} "
            "documents".format(
                splits[index], splits[index + 1], splits[index + 1] - splits[index]
            )
        )

    print_split_stats("train", 0)
    print_split_stats("validation", 1)
    print_split_stats("test", 2)

    def build_dataset(index, name):
        # An empty document range yields no dataset for that split.
        if splits[index + 1] <= splits[index]:
            return None
        documents = np.arange(
            start=splits[index], stop=splits[index + 1], step=1, dtype=np.int32
        )
        return GPT2Dataset(
            name,
            data_prefix,
            documents,
            indexed_dataset,
            train_valid_test_num_samples[index],
            seq_length,
            seed,
            use_shared_fs=use_shared_fs,
        )

    train_dataset = build_dataset(0, "train")
    valid_dataset = build_dataset(1, "valid")
    test_dataset = build_dataset(2, "test")

    return train_dataset, valid_dataset, test_dataset


def get_train_valid_test_split_(splits_string, size):
    """Get dataset splits from comma or '/' separated string list.

    The string holds up to three proportions (train, valid, test); missing
    entries count as 0 and extra entries are ignored. Proportions are
    normalized to sum to 1 before being scaled by ``size``.

    Args:
        splits_string: e.g. ``"969,30,1"``, ``"0.9/0.1"`` or ``"1"``.
        size: Total number of documents to split.

    Returns:
        A list of four cumulative boundaries ``[0, a, b, size]``; split ``k``
        covers document indices ``[result[k], result[k + 1])``.
    """
    if splits_string.find(",") != -1:
        splits = [float(s) for s in splits_string.split(",")]
    elif splits_string.find("/") != -1:
        splits = [float(s) for s in splits_string.split("/")]
    else:
        splits = [float(splits_string)]

    # Pad to exactly three proportions, dropping anything beyond the third.
    while len(splits) < 3:
        splits.append(0.0)
    splits = splits[:3]

    splits_sum = sum(splits)
    assert splits_sum > 0.0
    splits = [split / splits_sum for split in splits]

    splits_index = [0]
    for index, split in enumerate(splits):
        splits_index.append(splits_index[index] + int(round(split * float(size))))

    # Rounding can make the last boundary miss ``size``; shift every boundary
    # after the first by the same amount so the last one lands on ``size``.
    diff = splits_index[-1] - size
    for index in range(1, len(splits_index)):
        splits_index[index] -= diff

    assert len(splits_index) == 4
    assert splits_index[-1] == size
    return splits_index


def get_normalized_weights_and_num_samples(
    weights: List[float], num_samples: int
) -> Tuple[List[float], List[int]]:
    """Normalize ``weights`` and derive a per-dataset sample count.

    Args:
        weights: Non-normalized sampling weights, one per dataset; their sum
            must be positive.
        num_samples: Total number of samples to distribute.

    Returns:
        ``(normalized_weights, weighted_num_samples)`` where each sample count
        is ``ceil(num_samples * weight * 1.005)``.
    """
    # Normalize weights
    weight_sum = sum(weights)
    assert weight_sum > 0.0
    weights = [weight / weight_sum for weight in weights]

    # Add 0.5% (the 1.005 factor) so in case the blending dataset does
    # not uniformly distribute the number of samples, we still have
    # samples left to feed to the network.
    weighted_num_samples = [
        int(math.ceil(num_samples * weight * 1.005)) for weight in weights
    ]
    return weights, weighted_num_samples


def build_weighted_datasets(
    neox_args,
    train_num_samples,
    valid_num_samples,
    test_num_samples,
    train_weights,
    valid_weights,
    test_weights,
    build_index_mappings=True,
):
    """Build one dataset per configured train / valid / test data path.

    Args:
        neox_args: Run configuration; ``train_data_paths``,
            ``valid_data_paths``, ``test_data_paths``, ``data_impl``,
            ``seq_length``, ``seed`` and ``mmap_warmup`` are read.
        train_num_samples: Per-path sample counts for the train paths.
        valid_num_samples: Per-path sample counts for the valid paths.
        test_num_samples: Per-path sample counts for the test paths.
        train_weights: Unused here; kept for interface compatibility.
        valid_weights: Unused here; kept for interface compatibility.
        test_weights: Unused here; kept for interface compatibility.
        build_index_mappings: Forwarded to ``build_the_dataset``.

    Returns:
        ``(train_datasets, valid_datasets, test_datasets)`` as three lists;
        falsy path entries are skipped.
    """
    # build individual datasets
    train_datasets, valid_datasets, test_datasets = [], [], []
    split_specs = (
        ("train", train_num_samples, train_datasets),
        ("valid", valid_num_samples, valid_datasets),
        ("test", test_num_samples, test_datasets),
    )

    # The path lists may differ in length; zip_longest pads the shorter ones
    # with None, which the ``if path`` check below skips.
    for i, paths in enumerate(
        zip_longest(
            neox_args.train_data_paths,
            neox_args.valid_data_paths,
            neox_args.test_data_paths,
        )
    ):
        for path, (split_name, split_num_samples, built) in zip(paths, split_specs):
            if path:
                built.append(
                    build_the_dataset(
                        data_prefix=path,
                        name=f"{split_name}_{i}",
                        data_impl=neox_args.data_impl,
                        num_samples=split_num_samples[i],
                        seq_length=neox_args.seq_length,
                        seed=neox_args.seed,
                        skip_warmup=(not neox_args.mmap_warmup),
                        build_index_mappings=build_index_mappings,
                    )
                )
    return train_datasets, valid_datasets, test_datasets


def weights_by_num_docs(l: list, alpha=0.3):
    """
    Builds weights from a multinomial distribution over groups of data according to the number of
    samples in each group.

    We sample from a group according to the probability p(L) ∝ |L| ** α,
    where p(L) is the probability of sampling from a given group,
          |L| is the number of examples in that datapoint,
          and α is a coefficient that acts to upsample data from underrepresented groups

    Hence α (`alpha`) allows us to control how much to 'boost' the probability of training on low-resource groups.

    See https://arxiv.org/abs/1911.02116 for more details

    Args:
        l: Number of documents in each group.
        alpha: Exponent applied to each group's share of the documents.

    Returns:
        A list of normalized weights, one per group (``[1.0]`` for a single group).
    """
    if len(l) == 1:
        return [1.0]

    total_n_docs = sum(l)
    unbiased_sample_probs = [i / total_n_docs for i in l]

    probs = [i**alpha for i in unbiased_sample_probs]

    # normalize
    total = sum(probs)
    probs = [i / total for i in probs]

    # weights should be the inverse of the number of samples
    unbiased_sample_probs_inverse = [1 - p for p in unbiased_sample_probs]
    weights = [p * p2 for p, p2 in zip(probs, unbiased_sample_probs_inverse)]

    # normalize
    total = sum(weights)
    weights = [i / total for i in weights]

    return weights


def build_train_valid_test_data_iterators(neox_args):
    """Build the train, validation and test data iterators for this rank.

    Datasets and data loaders are only built on model-parallel rank 0 and,
    under pipeline parallelism, only on the first and last pipeline stages.
    Every other rank gets ``None`` iterators. The ``do_train`` / ``do_valid`` /
    ``do_test`` flags are broadcast and stored on ``neox_args``.

    Args:
        neox_args: Run configuration. Read here: ``is_pipe_parallel``,
            ``train_iters``, ``eval_interval``, ``eval_iters``,
            ``train_batch_size``, ``iteration``,
            ``gradient_accumulation_steps``, the per-split data paths and
            weights, ``weight_by_num_documents``, ``weighted_sampler_alpha``,
            ``data_path``, ``split``, ``use_shared_fs``, ``data_impl``,
            ``seq_length``, ``seed`` and ``mmap_warmup``. Written here:
            ``do_train``, ``do_valid``, ``do_test``.

    Returns:
        ``(train_data_iterator, valid_data_iterator, test_data_iterator)``;
        an entry is ``None`` when no data loader exists for that split.
    """
    (train_dataloader, valid_dataloader, test_dataloader) = (None, None, None)

    print_rank_0("> building train, validation, and test datasets ...")

    # Ensure only the first/last pipeline stages have data loaders
    if neox_args.is_pipe_parallel:
        is_first_stage = mpu.get_pipe_parallel_rank() == 0
        is_last_stage = (
            mpu.get_pipe_parallel_rank() == mpu.get_pipe_parallel_world_size() - 1
        )
        pipe_load = is_first_stage or is_last_stage
    else:
        pipe_load = True

    # Data loader only on rank 0 of each model parallel group.
    if mpu.get_model_parallel_rank() == 0 and pipe_load:
        # Number of train/valid/test samples.
        train_iters = neox_args.train_iters
        eval_iters = (train_iters // neox_args.eval_interval + 1) * neox_args.eval_iters
        test_iters = neox_args.eval_iters
        train_val_test_num_samples = [
            train_iters * neox_args.train_batch_size,
            eval_iters * neox_args.train_batch_size,
            test_iters * neox_args.train_batch_size,
        ]

        if neox_args.train_data_paths:
            # when individual train / valid / test data paths are provided
            # normalize weight values and get num samples for each dataset
            train_weights, train_num_samples = get_normalized_weights_and_num_samples(
                neox_args.train_data_weights, train_val_test_num_samples[0]
            )
            valid_weights, valid_num_samples = get_normalized_weights_and_num_samples(
                neox_args.valid_data_weights, train_val_test_num_samples[1]
            )
            test_weights, test_num_samples = get_normalized_weights_and_num_samples(
                neox_args.test_data_weights, train_val_test_num_samples[2]
            )

            # build individual datasets; when weighting by document count the
            # index mappings are skipped here because the datasets are rebuilt
            # below with the recomputed sample counts.
            train_datasets, valid_datasets, test_datasets = build_weighted_datasets(
                neox_args,
                train_num_samples,
                valid_num_samples,
                test_num_samples,
                train_weights,
                valid_weights,
                test_weights,
                build_index_mappings=not neox_args.weight_by_num_documents,
            )

            if neox_args.weight_by_num_documents:

                def get_num_docs_list(datasets):
                    # gets the number of documents in each datapath
                    return [
                        dataset.indexed_dataset.sizes.shape[0] for dataset in datasets
                    ]

                train_num_docs, valid_num_docs, test_num_docs = (
                    get_num_docs_list(train_datasets),
                    get_num_docs_list(valid_datasets),
                    get_num_docs_list(test_datasets),
                )

                # builds weights according to alpha + the number of docs
                fn = partial(
                    weights_by_num_docs, alpha=neox_args.weighted_sampler_alpha
                )
                train_weights, valid_weights, test_weights = (
                    fn(train_num_docs),
                    fn(valid_num_docs),
                    fn(test_num_docs),
                )
                (
                    train_weights,
                    train_num_samples,
                ) = get_normalized_weights_and_num_samples(
                    train_weights, train_val_test_num_samples[0]
                )
                (
                    valid_weights,
                    valid_num_samples,
                ) = get_normalized_weights_and_num_samples(
                    valid_weights, train_val_test_num_samples[1]
                )
                test_weights, test_num_samples = get_normalized_weights_and_num_samples(
                    test_weights, train_val_test_num_samples[2]
                )

                # rebuild datasets weighted according to new weights
                train_datasets, valid_datasets, test_datasets = build_weighted_datasets(
                    neox_args,
                    train_num_samples,
                    valid_num_samples,
                    test_num_samples,
                    train_weights,
                    valid_weights,
                    test_weights,
                )

            # Default every split to None so that a split with no datasets
            # yields no data loader instead of an UnboundLocalError below.
            train_ds, valid_ds, test_ds = None, None, None
            if train_datasets:
                train_ds = BlendableDataset(train_datasets, train_weights)
            if valid_datasets:
                valid_ds = BlendableDataset(valid_datasets, valid_weights)
            if test_datasets:
                test_ds = BlendableDataset(test_datasets, test_weights)
        else:
            # when just data_path is provided
            # split dataset into train, valid and test from data_path
            train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
                data_prefix=neox_args.data_path,
                use_shared_fs=neox_args.use_shared_fs,
                data_impl=neox_args.data_impl,
                splits_string=neox_args.split,
                train_valid_test_num_samples=train_val_test_num_samples,
                seq_length=neox_args.seq_length,
                seed=neox_args.seed,
                skip_warmup=(not neox_args.mmap_warmup),
            )

        # Build dataloaders.
        train_dataloader = make_data_loader(train_ds, neox_args=neox_args)
        valid_dataloader = make_data_loader(valid_ds, neox_args=neox_args)
        test_dataloader = make_data_loader(test_ds, neox_args=neox_args)

        # Flags to know if we need to do training/validation/testing.
        do_train = train_dataloader is not None and neox_args.train_iters > 0
        do_valid = valid_dataloader is not None and neox_args.eval_iters > 0
        do_test = test_dataloader is not None and neox_args.eval_iters > 0
        # Pack the flags into a tensor so they can be broadcast.
        flags = torch.cuda.LongTensor([int(do_train), int(do_valid), int(do_test)])
    else:
        flags = torch.cuda.LongTensor([0, 0, 0])

    # Broadcast the do_train / do_valid / do_test flags.
    if neox_args.is_pipe_parallel:
        # Only first/last pipeline stages have data loaders, so pipeline parallelism should
        # broadcast globally instead of just the model parallel group.
        torch.distributed.broadcast(flags, src=0)
    else:
        torch.distributed.broadcast(
            flags,
            mpu.get_model_parallel_src_rank(),
            group=mpu.get_model_parallel_group(),
        )
    neox_args.do_train = flags[0].item()
    neox_args.do_valid = flags[1].item()
    neox_args.do_test = flags[2].item()

    # Shift the start iterations.
    if train_dataloader is not None:
        train_dataloader.batch_sampler.start_iter = (
            neox_args.iteration * neox_args.gradient_accumulation_steps
        ) % len(train_dataloader)
        print_rank_0(
            "setting training data start iteration to {}".format(
                train_dataloader.batch_sampler.start_iter
            )
        )
    if valid_dataloader is not None:
        start_iter_val = (
            (neox_args.iteration * neox_args.gradient_accumulation_steps)
            // neox_args.eval_interval
        ) * neox_args.eval_iters
        valid_dataloader.batch_sampler.start_iter = start_iter_val % len(
            valid_dataloader
        )
        print_rank_0(
            "setting validation data start iteration to {}".format(
                valid_dataloader.batch_sampler.start_iter
            )
        )

    # Build iterators.
    train_data_iterator = (
        iter(train_dataloader) if train_dataloader is not None else None
    )
    valid_data_iterator = (
        iter(valid_dataloader) if valid_dataloader is not None else None
    )
    test_data_iterator = iter(test_dataloader) if test_dataloader is not None else None

    return train_data_iterator, valid_data_iterator, test_data_iterator


def compile_helper():
    """Compile helper function at runtime. Make sure this
    is invoked on a single process.

    Runs ``make -C <directory of this file>`` and exits the process with
    status 1 if the build fails.
    """
    import os
    import subprocess

    path = os.path.abspath(os.path.dirname(__file__))
    ret = subprocess.run(["make", "-C", path])
    if ret.returncode != 0:
        print("Making C++ dataset helpers module failed, exiting.")
        import sys

        sys.exit(1)