From 26c6dc297a15e2445e5d2bead593e6c1f5751e2b Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Fri, 24 Feb 2023 18:30:14 -0800 Subject: [PATCH 01/10] add reduce --- composer/callbacks/speed_monitor.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 0b23521e97..4b6cedfd52 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -270,13 +270,19 @@ def batch_end(self, state: State, logger: Logger): if not isinstance(model_flops_per_batch, Callable): raise TypeError('flops_per_batch must a callable accepting a batch and ' f'returning an int or float. Instead, got {type(model_flops_per_batch)}.') - flops_per_batch = model_flops_per_batch(state.batch) + device_flops_per_batch = model_flops_per_batch(state.batch) + + # Sum flops across all ranks since each rank computes the flops for its own batch + flops_per_batch_tensor = state.device.tensor_to_device( + torch.tensor(device_flops_per_batch, dtype=torch.int)) + dist.all_reduce(flops_per_batch_tensor, reduce_operation='SUM') + flops_per_batch = flops_per_batch_tensor.item() + flops_per_sec = flops_per_batch * batches_per_sec logger.log_metrics({'throughput/flops_per_sec': flops_per_sec}) - dev_flops_per_sec = flops_per_sec / world_size - logger.log_metrics({'throughput/device/flops_per_sec': dev_flops_per_sec}) + logger.log_metrics({'throughput/device/flops_per_sec': device_flops_per_batch}) if self.gpu_flops_available: - mfu = dev_flops_per_sec / self.gpu_flops_available + mfu = device_flops_per_batch / self.gpu_flops_available logger.log_metrics({'throughput/device/mfu': mfu}) # Log the time From ec46bf8aa25470af9f35bc794dd9924a38ca6b2a Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 27 Feb 2023 13:28:46 -0800 Subject: [PATCH 02/10] patch speed monitor --- composer/callbacks/speed_monitor.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 4b6cedfd52..90280b6986 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -214,6 +214,7 @@ def __init__(self, window_size: int = 100, gpu_flops_available: Optional[Union[f # Track the batch num samples and wct to compute throughput over a window of batches self.history_samples: Deque[int] = deque(maxlen=window_size + 1) self.history_wct: Deque[float] = deque(maxlen=window_size + 1) + self.history_flops_per_batch: Deque[float] = deque(maxlen=window_size + 1) self.gpu_flops_available = gpu_flops_available @@ -278,12 +279,15 @@ def batch_end(self, state: State, logger: Logger): dist.all_reduce(flops_per_batch_tensor, reduce_operation='SUM') flops_per_batch = flops_per_batch_tensor.item() - flops_per_sec = flops_per_batch * batches_per_sec - logger.log_metrics({'throughput/flops_per_sec': flops_per_sec}) - logger.log_metrics({'throughput/device/flops_per_sec': device_flops_per_batch}) - if self.gpu_flops_available: - mfu = device_flops_per_batch / self.gpu_flops_available - logger.log_metrics({'throughput/device/mfu': mfu}) + self.history_flops_per_batch.append(flops_per_batch) + if len(self.history_flops_per_batch) == self.history_flops_per_batch.maxlen: + flops_per_sec = sum(self.history_flops_per_batch) / elapsed_wct + device_flops_per_sec = flops_per_sec / world_size + logger.log_metrics({'throughput/flops_per_sec': flops_per_sec}) + logger.log_metrics({'throughput/device/flops_per_sec': device_flops_per_sec}) + if self.gpu_flops_available: + mfu = device_flops_per_sec / self.gpu_flops_available + logger.log_metrics({'throughput/device/mfu': mfu}) # Log the time # `state.timestamp` excludes any time spent in evaluation From b59097a3a0a36948d77d651e0dde9eb5bdf1fe31 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 27 Feb 2023 14:22:25 -0800 Subject: [PATCH 03/10] change to float --- composer/callbacks/speed_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 90280b6986..87b2870e37 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -275,7 +275,7 @@ def batch_end(self, state: State, logger: Logger): # Sum flops across all ranks since each rank computes the flops for its own batch flops_per_batch_tensor = state.device.tensor_to_device( - torch.tensor(device_flops_per_batch, dtype=torch.int)) + torch.tensor(device_flops_per_batch, dtype=torch.float)) dist.all_reduce(flops_per_batch_tensor, reduce_operation='SUM') flops_per_batch = flops_per_batch_tensor.item() From 87f52f4b2875009d3da66b11353665a12dccb0bf Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 27 Feb 2023 14:56:10 -0800 Subject: [PATCH 04/10] add logs --- composer/callbacks/speed_monitor.py | 56 ++++++++++++++++------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 87b2870e37..88c280b960 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -263,31 +263,37 @@ def batch_end(self, state: State, logger: Logger): except AttributeError: pass - composer_model = state.model - if not isinstance(composer_model, ComposerModel): - composer_model = composer_model.module # Pass through DDP wrapping - if hasattr(composer_model, 'flops_per_batch'): - model_flops_per_batch = composer_model.flops_per_batch # type: ignore - if not isinstance(model_flops_per_batch, Callable): - raise TypeError('flops_per_batch must a callable accepting a batch and ' - f'returning an int or float. Instead, got {type(model_flops_per_batch)}.') - device_flops_per_batch = model_flops_per_batch(state.batch) - - # Sum flops across all ranks since each rank computes the flops for its own batch - flops_per_batch_tensor = state.device.tensor_to_device( - torch.tensor(device_flops_per_batch, dtype=torch.float)) - dist.all_reduce(flops_per_batch_tensor, reduce_operation='SUM') - flops_per_batch = flops_per_batch_tensor.item() - - self.history_flops_per_batch.append(flops_per_batch) - if len(self.history_flops_per_batch) == self.history_flops_per_batch.maxlen: - flops_per_sec = sum(self.history_flops_per_batch) / elapsed_wct - device_flops_per_sec = flops_per_sec / world_size - logger.log_metrics({'throughput/flops_per_sec': flops_per_sec}) - logger.log_metrics({'throughput/device/flops_per_sec': device_flops_per_sec}) - if self.gpu_flops_available: - mfu = device_flops_per_sec / self.gpu_flops_available - logger.log_metrics({'throughput/device/mfu': mfu}) + # Compute flops stats if model has flops_per_batch + composer_model = state.model + if not isinstance(composer_model, ComposerModel): + composer_model = composer_model.module # Pass through DDP wrapping + if hasattr(composer_model, 'flops_per_batch'): + model_flops_per_batch = composer_model.flops_per_batch # type: ignore + if not isinstance(model_flops_per_batch, Callable): + raise TypeError('flops_per_batch must a callable accepting a batch and ' + f'returning an int or float. Instead, got {type(model_flops_per_batch)}.') + device_flops_per_batch = model_flops_per_batch(state.batch) + + # Sum flops across all ranks since each rank computes the flops for its own batch + flops_per_batch_tensor = state.device.tensor_to_device( + torch.tensor(device_flops_per_batch, dtype=torch.float)) + dist.all_reduce(flops_per_batch_tensor, reduce_operation='SUM') + flops_per_batch = flops_per_batch_tensor.item() + + print(f'flops_per_batch: {flops_per_batch}, device_flops_per_batch: {device_flops_per_batch}') + self.history_flops_per_batch.append(flops_per_batch) + + # Log the flops throughput + if len(self.history_flops_per_batch) == self.history_flops_per_batch.maxlen: + world_size = dist.get_world_size() + elapsed_wct = self.history_wct[-1] - self.history_wct[0] + flops_per_sec = sum(self.history_flops_per_batch) / elapsed_wct + device_flops_per_sec = flops_per_sec / world_size + logger.log_metrics({'throughput/flops_per_sec': flops_per_sec}) + logger.log_metrics({'throughput/device/flops_per_sec': device_flops_per_sec}) + if self.gpu_flops_available: + mfu = device_flops_per_sec / self.gpu_flops_available + logger.log_metrics({'throughput/device/mfu': mfu}) # Log the time # `state.timestamp` excludes any time spent in evaluation From 55bec31b3d7e05a7cfc6a8a1582073d4d643a3cb Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 27 Feb 2023 18:11:21 -0800 Subject: [PATCH 05/10] add logs --- composer/callbacks/speed_monitor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 88c280b960..805c6ddef4 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -272,6 +272,7 @@ def batch_end(self, state: State, logger: Logger): if not isinstance(model_flops_per_batch, Callable): raise TypeError('flops_per_batch must a callable accepting a batch and ' f'returning an int or float. Instead, got {type(model_flops_per_batch)}.') + print(state.batch['input_ids'].shape) device_flops_per_batch = model_flops_per_batch(state.batch) # Sum flops across all ranks since each rank computes the flops for its own batch From 0edb1808b708500ef4b9b43e5a6a6e2f9b203469 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 28 Feb 2023 13:37:25 -0800 Subject: [PATCH 06/10] fix bug --- composer/callbacks/speed_monitor.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 071ac1c06d..d7b6681ec2 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -223,7 +223,7 @@ def __init__( # Track the batch num samples and wct to compute throughput over a window of batches self.history_samples: Deque[int] = deque(maxlen=window_size + 1) self.history_wct: Deque[float] = deque(maxlen=window_size + 1) - self.history_flops_per_batch: Deque[float] = deque(maxlen=window_size + 1) + self.history_flops: Deque[float] = deque(maxlen=window_size + 1) self.gpu_flops_available = gpu_flops_available @@ -304,13 +304,14 @@ def batch_end(self, state: State, logger: Logger): flops_per_batch = flops_per_batch_tensor.item() print(f'flops_per_batch: {flops_per_batch}, device_flops_per_batch: {device_flops_per_batch}') - self.history_flops_per_batch.append(flops_per_batch) + self.history_flops.append(flops_per_batch) # Log the flops throughput - if len(self.history_flops_per_batch) == self.history_flops_per_batch.maxlen: + if len(self.history_flops) == self.history_flops.maxlen: world_size = dist.get_world_size() + elapsed_flops = self.history_flops[-1] - self.history_flops[0] elapsed_wct = self.history_wct[-1] - self.history_wct[0] - flops_per_sec = sum(self.history_flops_per_batch) / elapsed_wct + flops_per_sec = elapsed_flops / elapsed_wct device_flops_per_sec = flops_per_sec / world_size logger.log_metrics({'throughput/flops_per_sec': flops_per_sec}) logger.log_metrics({'throughput/device/flops_per_sec': device_flops_per_sec}) From de25ff7962d24084f30b14a187438bf23fc6d94e Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 28 Feb 2023 13:45:11 -0800 Subject: [PATCH 07/10] make cumsum --- composer/callbacks/speed_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index d7b6681ec2..00ba6f8b54 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -304,7 +304,7 @@ def batch_end(self, state: State, logger: Logger): flops_per_batch = flops_per_batch_tensor.item() print(f'flops_per_batch: {flops_per_batch}, device_flops_per_batch: {device_flops_per_batch}') - self.history_flops.append(flops_per_batch) + self.history_flops.append(flops_per_batch + self.history_flops[-1]) # Log the flops throughput if len(self.history_flops) == self.history_flops.maxlen: From 9ca80272c7065ce569187b9ecdb87d2eab0e3003 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 28 Feb 2023 13:51:05 -0800 Subject: [PATCH 08/10] make cumsum --- composer/callbacks/speed_monitor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index 00ba6f8b54..ecd4f22e29 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -304,7 +304,8 @@ def batch_end(self, state: State, logger: Logger): flops_per_batch = flops_per_batch_tensor.item() print(f'flops_per_batch: {flops_per_batch}, device_flops_per_batch: {device_flops_per_batch}') - self.history_flops.append(flops_per_batch + self.history_flops[-1]) + cur_flop_sum = self.history_flops[-1] if len(self.history_flops) > 0 else 0 + self.history_flops.append(flops_per_batch + cur_flop_sum) # Log the flops throughput if len(self.history_flops) == self.history_flops.maxlen: From fc2c14151a96d59ac8c47c5430921a3215570525 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 28 Feb 2023 14:00:50 -0800 Subject: [PATCH 09/10] subtract 0 instead of cumsum --- composer/callbacks/speed_monitor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index ecd4f22e29..c546a98ebe 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -304,13 +304,12 @@ def batch_end(self, state: State, logger: Logger): flops_per_batch = flops_per_batch_tensor.item() print(f'flops_per_batch: {flops_per_batch}, device_flops_per_batch: {device_flops_per_batch}') - cur_flop_sum = self.history_flops[-1] if len(self.history_flops) > 0 else 0 - self.history_flops.append(flops_per_batch + cur_flop_sum) + self.history_flops.append(flops_per_batch) # Log the flops throughput if len(self.history_flops) == self.history_flops.maxlen: world_size = dist.get_world_size() - elapsed_flops = self.history_flops[-1] - self.history_flops[0] + elapsed_flops = sum(self.history_flops) - self.history_flops[0] elapsed_wct = self.history_wct[-1] - self.history_wct[0] flops_per_sec = elapsed_flops / elapsed_wct device_flops_per_sec = flops_per_sec / world_size From cbab878af85b755dbd4af181f43eb19fe0cb08e8 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 28 Feb 2023 15:29:27 -0800 Subject: [PATCH 10/10] remove prints --- composer/callbacks/speed_monitor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/composer/callbacks/speed_monitor.py b/composer/callbacks/speed_monitor.py index c546a98ebe..dc0512bdfa 100644 --- a/composer/callbacks/speed_monitor.py +++ b/composer/callbacks/speed_monitor.py @@ -294,7 +294,6 @@ def batch_end(self, state: State, logger: Logger): if not isinstance(model_flops_per_batch, Callable): raise TypeError('flops_per_batch must a callable accepting a batch and ' f'returning an int or float. Instead, got {type(model_flops_per_batch)}.') - print(state.batch['input_ids'].shape) device_flops_per_batch = model_flops_per_batch(state.batch) # Sum flops across all ranks since each rank computes the flops for its own batch @@ -303,7 +302,6 @@ def batch_end(self, state: State, logger: Logger): dist.all_reduce(flops_per_batch_tensor, reduce_operation='SUM') flops_per_batch = flops_per_batch_tensor.item() - print(f'flops_per_batch: {flops_per_batch}, device_flops_per_batch: {device_flops_per_batch}') self.history_flops.append(flops_per_batch) # Log the flops throughput