From 91e7c8ad293c53dd0c840e9e6190f8911edbe457 Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 17:06:28 +0800 Subject: [PATCH 01/21] disable compute_stream when data parallel --- libai/models/utils/graph_base.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/libai/models/utils/graph_base.py b/libai/models/utils/graph_base.py index 2211d0e26..4654a0862 100644 --- a/libai/models/utils/graph_base.py +++ b/libai/models/utils/graph_base.py @@ -78,13 +78,11 @@ def __init__( self.config.allow_fuse_model_update_ops(True) self.config.allow_fuse_cast_scale(True) - # dist_util = dist.get_dist_util() - # Enable compute_stream for computation and communication with the same cuda stream. + dist_util = dist.get_dist_util() + # Enable cuda stream for computation and communication as the same stream. # This will reduce memory when using model parallelism. - # if dist_util.is_tensor_model_parallel() or dist_util.is_pipeline_model_parallel(): - - # Enable compute_stream by default. - flow.boxing.nccl.enable_use_compute_stream(True) + if dist_util.is_tensor_model_parallel() or dist_util.is_pipeline_model_parallel(): + flow.boxing.nccl.enable_use_compute_stream(True) def build(self, **kwargs): if self.is_train: From fa82eace551a991913818bfbb74228b6dfee90ef Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 17:17:00 +0800 Subject: [PATCH 02/21] change input and label placmenet at the beginning --- libai/data/structures.py | 10 +++++----- libai/models/bert_model.py | 19 ++++++++++++++++++- libai/models/gpt_model.py | 11 ++++++++++- libai/models/t5_model.py | 5 +++++ 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/libai/data/structures.py b/libai/data/structures.py index b7eda612b..1f341b22c 100644 --- a/libai/data/structures.py +++ b/libai/data/structures.py @@ -61,12 +61,12 @@ def to_global(self, sbp=None, placement=None): # We do that to make sure that all the tensors used by the model are all generated # by the fist device group, in case that each device group containg # some random augmentations to the tensors without setting the same global seed. - main_placement = dist.get_layer_placement(0) + main_placement = dist.get_layer_placement(0, device_type="cpu") # put it on cpu first self.tensor = self.tensor.to_global(sbp=self.sbp, placement=main_placement) - if self.placement_idx != 0: - self.tensor = self.tensor.to_global( - placement=dist.get_layer_placement(self.placement_idx) - ) + # if self.placement_idx != 0: + # self.tensor = self.tensor.to_global( + # placement=dist.get_layer_placement(self.placement_idx) + # ) @staticmethod def stack(distTensor_lists: List["DistTensorData"]) -> "DistTensorData": diff --git a/libai/models/bert_model.py b/libai/models/bert_model.py index 0d0df45de..5cdd5a205 100644 --- a/libai/models/bert_model.py +++ b/libai/models/bert_model.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import os + import oneflow as flow from oneflow import nn @@ -190,6 +192,11 @@ def __init__(self, add_binary_head): self.lm_loss = ParallelCrossEntropyLoss() def forward(self, lm_output, lm_labels, loss_mask, binary_logits, ns_labels): + lm_labels = lm_labels.to_global(placement=lm_output.placement) + loss_mask = loss_mask.to_global(placement=lm_output.placement) + binary_logits = binary_logits.to_global(placement=lm_output.placement) + ns_labels = ns_labels.to_global(placement=lm_output.placement) + lm_loss = self.lm_loss(lm_output, lm_labels) loss_mask = loss_mask.float() # Change loss_mask.sum() sbp sign from [P, B] -> [B, B] @@ -304,6 +311,7 @@ def __init__( # Mask generation self.extended_attn_mask = BertExtendedAttnMask() + self.multihead_attn_fusion = os.getenv("MULTIHEAD_ATTN_FUSION") is not None # Encoders self.encoders = nn.ModuleList( [ @@ -371,9 +379,16 @@ def forward(self, input_ids, attention_mask, tokentype_ids=None): embedding_output = self.embeddings(input_ids, tokentype_ids) hidden_states = embedding_output + if self.multihead_attn_fusion: + hidden_states = hidden_states.transpose(0, 1) # [seq, bs, dim] + for layer in self.encoders: hidden_states = layer(hidden_states, extended_attention_mask) encoder_output = self.final_layernorm(hidden_states) + + if self.multihead_attn_fusion: + encoder_output = encoder_output.transpose(0, 1) + pooled_output = self.pooler(encoder_output) if self.pooler is not None else None return encoder_output, pooled_output @@ -469,7 +484,9 @@ def forward( on ignored tokens. Tokens with indices set to `-1` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]` """ - + input_ids = input_ids.to_global(placement=dist.get_layer_placement(0)) + attention_mask = attention_mask.to_global(placement=dist.get_layer_placement(0)) + tokentype_ids = tokentype_ids.to_global(placement=dist.get_layer_placement(0)) outputs = self.bert(input_ids, attention_mask, tokentype_ids) sequence_output, pooled_output = outputs[:2] diff --git a/libai/models/gpt_model.py b/libai/models/gpt_model.py index 7a23a3a34..89ddaa9dc 100644 --- a/libai/models/gpt_model.py +++ b/libai/models/gpt_model.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import os + import oneflow as flow from oneflow import nn from oneflow.nn import init @@ -204,7 +206,7 @@ def forward(self, input_ids): Returns: flow.Tensor: logits """ - + input_ids = input_ids.to_global(placement=dist.get_layer_placement(0)) input_embeds = self.embeddings(input_ids, 0) attention_mask = self.casual_mask(input_ids, past_length=0) @@ -275,6 +277,8 @@ def __init__( super().__init__() self.num_layers = num_layers + self.multihead_attn_fusion = os.getenv("MULTIHEAD_ATTN_FUSION") is not None + def build_layer(layer_number): return TransformerLayer( hidden_size, @@ -299,11 +303,16 @@ def build_layer(layer_number): def forward(self, hidden_states, attention_mask): # hidden_states shape: (batch_size, seq_length, hidden_size) # sbp: [S(0), B] + if self.multihead_attn_fusion: + hidden_states = hidden_states.transpose(0, 1) # [seq, bs, dim] for i, layer in enumerate(self.layers): hidden_states = layer(hidden_states, attention_mask) output = self.layernorm_f(hidden_states) + if self.multihead_attn_fusion: + output = output.transpose(0, 1) + return output diff --git a/libai/models/t5_model.py b/libai/models/t5_model.py index f383ee6ef..019cef8e3 100644 --- a/libai/models/t5_model.py +++ b/libai/models/t5_model.py @@ -284,6 +284,11 @@ def forward( Returns: flow.Tensor: logits """ + encoder_input_ids = encoder_input_ids.to_global(placement=dist.get_layer_placement(0)) + decoder_input_ids = decoder_input_ids.to_global(placement=dist.get_layer_placement(0)) + encoder_attn_mask = encoder_attn_mask.to_global(placement=dist.get_layer_placement(0)) + decoder_attn_mask = decoder_attn_mask.to_global(placement=dist.get_layer_placement(0)) + encoder_decoder_attn_mask = encoder_decoder_attn_mask.to_global(placement=dist.get_layer_placement(0)) if use_cache and self.encoder_states is not None: encoder_states = self.encoder_states else: From 64e3307d5fc74f60b83a04fc445f46493cc5e29e Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 17:17:36 +0800 Subject: [PATCH 03/21] add env variable --- tools/train.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/train.sh b/tools/train.sh index f0fec346c..a7957f475 100755 --- a/tools/train.sh +++ b/tools/train.sh @@ -8,6 +8,8 @@ NODE_RANK=${NODE_RANK:-0} ADDR=${ADDR:-127.0.0.1} PORT=${PORT:-12345} +export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=true + python3 -m oneflow.distributed.launch \ --nproc_per_node $GPUS --nnodes $NODE --node_rank $NODE_RANK --master_addr $ADDR --master_port $PORT \ $FILE --config-file $CONFIG ${@:4} From 82e39f7ffac0314c1d89607b009b532459e9bf70 Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 17:23:22 +0800 Subject: [PATCH 04/21] globa tensor to local in graph build --- libai/models/utils/graph_base.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/libai/models/utils/graph_base.py b/libai/models/utils/graph_base.py index 4654a0862..ad37adedb 100644 --- a/libai/models/utils/graph_base.py +++ b/libai/models/utils/graph_base.py @@ -93,6 +93,15 @@ def build(self, **kwargs): loss_dict = self.model(**kwargs) losses = sum(loss_dict.values()) losses.backward() + loss_dict = { + k: v.to_global( + placement=flow.placement( + "cpu", ranks=[0] if v.placement.ranks.ndim == 1 else [[0]] + ), + sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]), + ) + for k, v in loss_dict.items() + } return loss_dict else: logger.info( From 9ddc9a2616ec8f60955337f55e6e7df33ed77b65 Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 17:32:31 +0800 Subject: [PATCH 05/21] 
finish softmax fusion --- configs/common/models/bert.py | 2 +- configs/common/models/gpt.py | 2 +- libai/engine/default.py | 8 +-- libai/engine/trainer.py | 19 ++++--- libai/layers/attention.py | 102 ++++++++++++++++++++++------------ libai/layers/cross_entropy.py | 2 + 6 files changed, 86 insertions(+), 49 deletions(-) diff --git a/configs/common/models/bert.py b/configs/common/models/bert.py index 281243668..86fa1a04c 100644 --- a/configs/common/models/bert.py +++ b/configs/common/models/bert.py @@ -17,7 +17,7 @@ layernorm_eps=1e-5, bias_gelu_fusion=True, bias_dropout_fusion=True, - scale_mask_softmax_fusion=False, + scale_mask_softmax_fusion=True, apply_query_key_layer_scaling=True, apply_residual_post_layernorm=False, add_binary_head=True, diff --git a/configs/common/models/gpt.py b/configs/common/models/gpt.py index a539e4766..e858d4819 100644 --- a/configs/common/models/gpt.py +++ b/configs/common/models/gpt.py @@ -17,7 +17,7 @@ use_scaled_init_for_output_weights=True, bias_gelu_fusion=True, bias_dropout_fusion=True, - scale_mask_softmax_fusion=False, + scale_mask_softmax_fusion=True, apply_query_key_layer_scaling=True, apply_residual_post_layernorm=False, amp_enabled=False, diff --git a/libai/engine/default.py b/libai/engine/default.py index fff0c41cc..1eb246629 100644 --- a/libai/engine/default.py +++ b/libai/engine/default.py @@ -399,7 +399,7 @@ def build_hooks(self): ret = [ hooks.IterationTimer(), hooks.LRScheduler(), # for beauty lr scheduler printer in `nn.Graph` mode - hooks.PeriodicCheckpointer(self.checkpointer, self.cfg.train.checkpointer.period), + # hooks.PeriodicCheckpointer(self.checkpointer, self.cfg.train.checkpointer.period), ] if self.cfg.train.evaluation.enabled: @@ -452,7 +452,7 @@ def build_writers(self): return [ # It may not always print what you want to see, since it prints "common" metrics only. 
CommonMetricPrinter(self.global_batch_size, self.max_iter), - JSONWriter(os.path.join(self.cfg.train.output_dir, "metrics.json")), + # JSONWriter(os.path.join(self.cfg.train.output_dir, "metrics.json")), TensorboardXWriter(self.cfg.train.output_dir), ] @@ -605,9 +605,9 @@ def build_train_loader(cls, cfg, tokenizer=None): (cfg.train.train_iter // cfg.train.evaluation.eval_period + 1) * cfg.train.evaluation.eval_iter if cfg.train.evaluation.enabled - else 0 + else 1 ) - test_iter = cfg.train.evaluation.eval_iter if cfg.train.evaluation.enabled else 0 + test_iter = cfg.train.evaluation.eval_iter if cfg.train.evaluation.enabled else 1 cfg.dataloader.train.train_val_test_num_samples = [ int(cfg.train.samples), diff --git a/libai/engine/trainer.py b/libai/engine/trainer.py index 51980b0f1..576123cc9 100644 --- a/libai/engine/trainer.py +++ b/libai/engine/trainer.py @@ -191,10 +191,11 @@ def write_metrics( """ # Only get metric value on rank0 # Consider if it's 2d mesh, ranks should be [[0]] instead of [0] - metrics_dict = { - k: dist.tton(v, local_only=False, ranks=[0] if v.placement.ranks.ndim == 1 else [[0]]) - for k, v in loss_dict.items() - } + # metrics_dict = { + # k: dist.tton(v, local_only=False, ranks=[0] if v.placement.ranks.ndim == 1 else [[0]]) + # for k, v in loss_dict.items() + # } + metrics_dict = {k: dist.ttol(v, pure_local=True) for k, v in loss_dict.items()} metrics_dict["data_time"] = data_time # TODO: Gather metrics among all workers for logging @@ -216,11 +217,11 @@ def write_metrics( # } metrics_dict = all_metrics_dict total_losses_reduced = sum(metrics_dict.values()) - if not np.isfinite(total_losses_reduced): - raise FloatingPointError( - f"Loss became infinite or NaN at iteration={storage.iter}!\n" - f"loss_dict = {metrics_dict}" - ) + # if not np.isfinite(total_losses_reduced): + # raise FloatingPointError( + # f"Loss became infinite or NaN at iteration={storage.iter}!\n" + # f"loss_dict = {metrics_dict}" + # ) storage.put_scalar("{}total_loss".format(prefix), total_losses_reduced) if len(metrics_dict) > 1: diff --git a/libai/layers/attention.py b/libai/layers/attention.py index b27eea2ed..cc04020c3 100644 --- a/libai/layers/attention.py +++ b/libai/layers/attention.py @@ -14,6 +14,7 @@ # limitations under the License. 
import math +import os from typing import Tuple import oneflow as flow @@ -63,6 +64,7 @@ def __init__( layer_idx=0 ): super().__init__() + self.multihead_attn_fusion = os.getenv("MULTIHEAD_ATTN_FUSION") self.hidden_size = hidden_size if output_layer_init_method is None: output_layer_init_method = init_method @@ -74,6 +76,7 @@ def __init__( self.num_heads = num_attention_heads self.head_size = hidden_size // num_attention_heads + self.attention_dropout_prob = attention_dropout_prob self.dropout = nn.Dropout(p=attention_dropout_prob) self.norm_factor = 1.0 / math.sqrt(float(self.head_size)) self.coeff = None @@ -123,6 +126,22 @@ def __init__( layer_idx=layer_idx, ) + def fused_multihead_attn(self, h, attention_mask): + qmk, v = flow._C.fused_self_attention( + h, head_size=self.head_size, alpha=(1.0 / self.norm_factor) + ) + if self.scale_mask_softmax_fusion: + attention_weights = flow._C.fused_scale_tril_softmax_mask_scale(qmk, p=self.attention_dropout_prob, diagonal=0, tril_scale_value=self.coeff)[0] + else: + if self.coeff is not None: + qmk *= self.coeff + attention_scores = flow.mul(qmk, attention_mask) + attention_scores = attention_scores - 10000.0 * (1 - attention_mask) + attention_weights = flow.softmax(attention_scores, dim=-1) + # [bsz, num_heads, tgt_len, src_len] + attention_weights = self.dropout(attention_weights) + return flow._C.matmul(attention_weights, v) + def forward( self, hidden_states: flow.Tensor, @@ -184,11 +203,15 @@ def forward( # hidden_states is the last-added state, # the full key and value could be obtained by concatenating with past_key_value. query_key_value = self.query_key_value(hidden_states) - query_key_value = query_key_value.view(bsz, -1, self.num_heads, 3 * self.head_size) - query_key_value = query_key_value.permute( - 0, 2, 1, 3 - ) # [bsz, num_heads, src_len, 3 * head_size] - query, key, value = flow.chunk(query_key_value, chunks=3, dim=-1) + if self.multihead_attn_fusion: + hidden_states = self.fused_multihead_attn(query_key_value, attention_mask) + else: + query_key_value = query_key_value.view(bsz, -1, self.num_heads, 3 * self.head_size) + query_key_value = query_key_value.permute( + 0, 2, 1, 3 + ) # [bsz, num_heads, src_len, 3 * head_size] + query, key, value = flow.chunk(query_key_value, chunks=3, dim=-1) + if past_key_value is not None: past_key, past_value = past_key_value key = flow.cat((past_key.type_as(key), key), dim=2) @@ -198,38 +221,49 @@ def forward( if use_cache: past_key_value = (key, value) - # [bsz, num_heads, tgt_len, src_len] with [S(0), S(1)] - attention_scores = flow.matmul(query, key, transpose_b=True, alpha=self.norm_factor) - - # [S(0), S(1)] x [S(0), B] = [S(0), S(1)] - if attention_mask is not None: - if self.scale_mask_softmax_fusion: - attention_weights = flow._C.fused_scale_mask_softmax( - attention_scores, attention_mask, fill_value=-10000.0 - ) + if not self.multihead_attn_fusion: + # [bsz, num_heads, tgt_len, src_len] with [S(0), S(1)] + attention_scores = flow.matmul(query, key, transpose_b=True, alpha=self.norm_factor) + + # [S(0), S(1)] x [S(0), B] = [S(0), S(1)] + if attention_mask is not None: + if self.scale_mask_softmax_fusion: + # attention_mask = attention_mask.to(dtype=flow.bool) + # attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) + # attention_weights = flow._C.fused_scale_mask_softmax_dropout( + # attention_scores, attention_mask, fill_value=-10000.0, scale=self.coeff, p=self.attention_dropout_prob + # )[0] + attention_weights = 
flow._C.fused_scale_tril_softmax_mask_scale(attention_scores, p=self.attention_dropout_prob, diagonal=0, tril_scale_value=self.coeff)[0] + else: + if self.coeff is not None: + attention_scores *= self.coeff + attention_scores = flow.mul(attention_scores, attention_mask) + attention_scores = attention_scores - 10000.0 * (1 - attention_mask) + # TODO(l1aoxingyu): graph will occur `where_scalar` errors when using `masked_fill` + # attention_scores = attention_scores.masked_fill(1 - attention_mask, -10000.0) + attention_weights = flow.softmax(attention_scores, dim=-1) + # [bsz, num_heads, tgt_len, src_len] + attention_weights = self.dropout(attention_weights) else: - if self.coeff is not None: - attention_scores *= self.coeff - attention_scores = flow.mul(attention_scores, attention_mask) - attention_scores = attention_scores - 10000.0 * (1 - attention_mask) - # TODO(l1aoxingyu): graph will occur `where_scalar` errors when using `masked_fill` - # attention_scores = attention_scores.masked_fill(1 - attention_mask, -10000.0) attention_weights = flow.softmax(attention_scores, dim=-1) + # [bsz, num_heads, tgt_len, src_len] + attention_weights = self.dropout(attention_weights) + + # Context shape: [bsz, num_heads, tgt_len, head_size] with [S(0), S(1)] + context = flow.matmul(attention_weights, value) + # Change shape: [bsz, num_heads, tgt_len, head_size] -> + # [bsz, tgt_len, num_heads, head_size] + context = context.transpose(1, 2) + + # Concat multi-head results from + # [bsz, tgt_len, num_heads, head_size] -> [bsz, tgt_len, num_heads * head_size] + # SBP sign: [S(0), S(2)] + context = context.view(bsz, tgt_len, self.hidden_size) else: - attention_weights = flow.softmax(attention_scores, dim=-1) - - # [bsz, num_heads, tgt_len, src_len] - attention_weights = self.dropout(attention_weights) - - # Context shape: [bsz, num_heads, tgt_len, head_size] with [S(0), S(1)] - context = flow.matmul(attention_weights, value) - # Change shape: [bsz, num_heads, tgt_len, head_size] -> [bsz, tgt_len, num_heads, head_size] - context = context.transpose(1, 2) - - # Concat multi-head results from - # [bsz, tgt_len, num_heads, head_size] -> [bsz, tgt_len, num_heads * head_size] - # SBP sign: [S(0), S(2)] - context = context.view(bsz, tgt_len, self.hidden_size) + # (batch_size, num_heads, seq_len, head_size) -> + # (seq_len, batch_size, num_heads, head_size) + hidden_states = flow._C.transpose(hidden_states, perm=(2, 0, 1, 3)) + context = hidden_states.flatten(2) # [S(0), S(2)] x [B, S(0)] = [S(0), P] -> [S(0), B] output = self.dense(context) diff --git a/libai/layers/cross_entropy.py b/libai/layers/cross_entropy.py index e496a51bb..cde6b1632 100644 --- a/libai/layers/cross_entropy.py +++ b/libai/layers/cross_entropy.py @@ -36,6 +36,8 @@ def forward(self, logits: flow.Tensor, target: flow.Tensor): assert target.ndim == 2 assert logits.shape[0:2] == target.shape + target = target.to_global(placement=logits.placement) + # Change -1 in target to 0 because sparse_softmax_cross_entropy don't accept -1 target = target * (target >= 0) From 7cff1a0a81e87eb4f7a3338d89bab15e85a27891 Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 21:27:09 +0800 Subject: [PATCH 06/21] turn off evaluation --- configs/common/train.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/common/train.py b/configs/common/train.py index f07be2eb2..26de0f255 100644 --- a/configs/common/train.py +++ b/configs/common/train.py @@ -80,7 +80,7 @@ # You can set the maximum evaluation iterations to run for 
validation/test. # You can also set a customized evaluator for use. evaluation=dict( - enabled=True, + enabled=False, # evaluator for calculating top-k acc evaluator=LazyCall(ClsEvaluator)(topk=(1, 5)), eval_period=5000, From f6df295fa95d1724ff4466238e2efe66a83289e1 Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Wed, 25 May 2022 21:49:41 +0800 Subject: [PATCH 07/21] * add fused_scale_mask_softmax_dropout in bert and t5 * fix bug for t5 training with placement errors * code format --- libai/data/structures.py | 2 +- libai/engine/trainer.py | 1 - libai/layers/attention.py | 41 +++++++++++++++++++++++-------- libai/layers/transformer_layer.py | 5 +++- libai/models/bert_model.py | 2 ++ libai/models/gpt_model.py | 2 ++ libai/models/t5_model.py | 8 +++++- 7 files changed, 47 insertions(+), 14 deletions(-) diff --git a/libai/data/structures.py b/libai/data/structures.py index 1f341b22c..65df5fd5f 100644 --- a/libai/data/structures.py +++ b/libai/data/structures.py @@ -61,7 +61,7 @@ def to_global(self, sbp=None, placement=None): # We do that to make sure that all the tensors used by the model are all generated # by the fist device group, in case that each device group containg # some random augmentations to the tensors without setting the same global seed. - main_placement = dist.get_layer_placement(0, device_type="cpu") # put it on cpu first + main_placement = dist.get_layer_placement(0, device_type="cpu") # put it on cpu first self.tensor = self.tensor.to_global(sbp=self.sbp, placement=main_placement) # if self.placement_idx != 0: # self.tensor = self.tensor.to_global( diff --git a/libai/engine/trainer.py b/libai/engine/trainer.py index 576123cc9..0c3ff0b50 100644 --- a/libai/engine/trainer.py +++ b/libai/engine/trainer.py @@ -18,7 +18,6 @@ import weakref from typing import Callable, List, Mapping -import numpy as np import oneflow as flow from libai.utils import distributed as dist diff --git a/libai/layers/attention.py b/libai/layers/attention.py index cc04020c3..5131a31a3 100644 --- a/libai/layers/attention.py +++ b/libai/layers/attention.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import enum import math import os from typing import Tuple @@ -23,6 +24,11 @@ from .linear import Linear +class AttnMaskType(enum.Enum): + padding = 1 + causal = 2 + + class MultiheadAttention(nn.Module): """Multi-head attention layer, support self attention and cross attention. 
@@ -60,6 +66,7 @@ def __init__( bias_dropout_fusion=False, scale_mask_softmax_fusion=False, apply_query_key_layer_scaling=False, + attn_mask_type=AttnMaskType.padding, *, layer_idx=0 ): @@ -75,6 +82,7 @@ def __init__( self.num_heads = num_attention_heads self.head_size = hidden_size // num_attention_heads + self.attn_mask_type = attn_mask_type self.attention_dropout_prob = attention_dropout_prob self.dropout = nn.Dropout(p=attention_dropout_prob) @@ -131,7 +139,9 @@ def fused_multihead_attn(self, h, attention_mask): h, head_size=self.head_size, alpha=(1.0 / self.norm_factor) ) if self.scale_mask_softmax_fusion: - attention_weights = flow._C.fused_scale_tril_softmax_mask_scale(qmk, p=self.attention_dropout_prob, diagonal=0, tril_scale_value=self.coeff)[0] + attention_weights = flow._C.fused_scale_tril_softmax_mask_scale( + qmk, p=self.attention_dropout_prob, diagonal=0, tril_scale_value=self.coeff + )[0] else: if self.coeff is not None: qmk *= self.coeff @@ -228,18 +238,29 @@ def forward( # [S(0), S(1)] x [S(0), B] = [S(0), S(1)] if attention_mask is not None: if self.scale_mask_softmax_fusion: - # attention_mask = attention_mask.to(dtype=flow.bool) - # attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) - # attention_weights = flow._C.fused_scale_mask_softmax_dropout( - # attention_scores, attention_mask, fill_value=-10000.0, scale=self.coeff, p=self.attention_dropout_prob - # )[0] - attention_weights = flow._C.fused_scale_tril_softmax_mask_scale(attention_scores, p=self.attention_dropout_prob, diagonal=0, tril_scale_value=self.coeff)[0] + if self.attn_mask_type == AttnMaskType.causal: + attention_weights = flow._C.fused_scale_tril_softmax_mask_scale( + attention_scores, + p=self.attention_dropout_prob, + diagonal=0, + tril_scale_value=self.coeff, + )[0] + else: + attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) + attention_weights = flow._C.fused_scale_mask_softmax_dropout( + attention_scores, + attention_mask, + fill_value=-10000.0, + scale=self.coeff, + p=self.attention_dropout_prob, + )[0] else: if self.coeff is not None: attention_scores *= self.coeff attention_scores = flow.mul(attention_scores, attention_mask) attention_scores = attention_scores - 10000.0 * (1 - attention_mask) - # TODO(l1aoxingyu): graph will occur `where_scalar` errors when using `masked_fill` + # TODO(xingyu.liao): graph will occur `where_scalar` errors + # when using `masked_fill` # attention_scores = attention_scores.masked_fill(1 - attention_mask, -10000.0) attention_weights = flow.softmax(attention_scores, dim=-1) # [bsz, num_heads, tgt_len, src_len] @@ -251,7 +272,7 @@ def forward( # Context shape: [bsz, num_heads, tgt_len, head_size] with [S(0), S(1)] context = flow.matmul(attention_weights, value) - # Change shape: [bsz, num_heads, tgt_len, head_size] -> + # Change shape: [bsz, num_heads, tgt_len, head_size] -> # [bsz, tgt_len, num_heads, head_size] context = context.transpose(1, 2) @@ -260,7 +281,7 @@ def forward( # SBP sign: [S(0), S(2)] context = context.view(bsz, tgt_len, self.hidden_size) else: - # (batch_size, num_heads, seq_len, head_size) -> + # (batch_size, num_heads, seq_len, head_size) -> # (seq_len, batch_size, num_heads, head_size) hidden_states = flow._C.transpose(hidden_states, perm=(2, 0, 1, 3)) context = hidden_states.flatten(2) diff --git a/libai/layers/transformer_layer.py b/libai/layers/transformer_layer.py index 740587c93..4372daf33 100644 --- a/libai/layers/transformer_layer.py +++ b/libai/layers/transformer_layer.py @@ -17,7 +17,7 @@ 
from libai.utils import distributed as dist -from .attention import MultiheadAttention +from .attention import AttnMaskType, MultiheadAttention from .droppath import DropPath from .layer_norm import LayerNorm from .mlp import MLP @@ -72,6 +72,7 @@ def __init__( scale_mask_softmax_fusion=False, apply_query_key_layer_scaling=False, apply_residual_post_layernorm=False, + attn_mask_type=AttnMaskType.padding, *, layer_idx=0 ): @@ -82,6 +83,7 @@ def __init__( self.attention_dropout_prob = attention_dropout_prob self.output_dropout_prob = output_dropout_prob self.layernorm_epsilon = layernorm_epsilon + self.attn_mask_type = attn_mask_type self.layer_idx = layer_idx self.is_decoder = is_decoder @@ -241,5 +243,6 @@ def build_attention(self, is_cross_attention=False): bias_dropout_fusion=self.bias_dropout_fusion, scale_mask_softmax_fusion=self.scale_mask_softmax_fusion, apply_query_key_layer_scaling=self.apply_query_key_layer_scaling, + attn_mask_type=self.attn_mask_type, layer_idx=self.layer_idx, ) diff --git a/libai/models/bert_model.py b/libai/models/bert_model.py index 5cdd5a205..f9f392cf6 100644 --- a/libai/models/bert_model.py +++ b/libai/models/bert_model.py @@ -29,6 +29,7 @@ VocabEmbedding, build_activation, ) +from libai.layers.attention import AttnMaskType from libai.utils import distributed as dist from .utils import init_method_normal, scaled_init_method_normal @@ -329,6 +330,7 @@ def __init__( init_method=init_method, output_layer_init_method=scaled_init_method, apply_residual_post_layernorm=apply_residual_post_layernorm, + attn_mask_type=AttnMaskType.padding, # bert mask type layer_idx=i, ) for i in range(hidden_layers) diff --git a/libai/models/gpt_model.py b/libai/models/gpt_model.py index 89ddaa9dc..a32dbbcac 100644 --- a/libai/models/gpt_model.py +++ b/libai/models/gpt_model.py @@ -28,6 +28,7 @@ TransformerLayer, VocabEmbedding, ) +from libai.layers.attention import AttnMaskType from libai.utils import distributed as dist from .utils import init_method_normal, scaled_init_method_normal @@ -294,6 +295,7 @@ def build_layer(layer_number): scale_mask_softmax_fusion=scale_mask_softmax_fusion, apply_query_key_layer_scaling=apply_query_key_layer_scaling, apply_residual_post_layernorm=apply_residual_post_layernorm, + attn_mask_type=AttnMaskType.causal, layer_idx=layer_number, ) diff --git a/libai/models/t5_model.py b/libai/models/t5_model.py index 019cef8e3..6d3289c03 100644 --- a/libai/models/t5_model.py +++ b/libai/models/t5_model.py @@ -24,6 +24,7 @@ TransformerLayer, VocabEmbedding, ) +from libai.layers.attention import AttnMaskType from libai.models.utils import init_method_normal, scaled_init_method_normal from libai.utils import distributed as dist @@ -173,6 +174,7 @@ def __init__( scale_mask_softmax_fusion=scale_mask_softmax_fusion, apply_query_key_layer_scaling=apply_query_key_layer_scaling, apply_residual_post_layernorm=apply_residual_post_layernorm, + attn_mask_type=AttnMaskType.padding, layer_idx=i, ) for i in range(hidden_layers) @@ -205,6 +207,7 @@ def __init__( bias_dropout_fusion=bias_dropout_fusion, scale_mask_softmax_fusion=scale_mask_softmax_fusion, apply_query_key_layer_scaling=apply_query_key_layer_scaling, + attn_mask_type=AttnMaskType.padding, layer_idx=i, ) for i in range(hidden_layers, 2 * hidden_layers) @@ -288,7 +291,9 @@ def forward( decoder_input_ids = decoder_input_ids.to_global(placement=dist.get_layer_placement(0)) encoder_attn_mask = encoder_attn_mask.to_global(placement=dist.get_layer_placement(0)) decoder_attn_mask = 
decoder_attn_mask.to_global(placement=dist.get_layer_placement(0)) - encoder_decoder_attn_mask = encoder_decoder_attn_mask.to_global(placement=dist.get_layer_placement(0)) + encoder_decoder_attn_mask = encoder_decoder_attn_mask.to_global( + placement=dist.get_layer_placement(0) + ) if use_cache and self.encoder_states is not None: encoder_states = self.encoder_states else: @@ -345,6 +350,7 @@ def __init__(self) -> None: def forward(self, logits, lm_labels, loss_mask): lm_loss = self.lm_loss(logits, lm_labels) + loss_mask = loss_mask.to_global(placement=lm_loss.placement) loss_mask = loss_mask.float() denominator = loss_mask.sum().to_global( sbp=dist.get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]) From 14c22313b8a691f34ec2e4fadbaf4bb71d8ef88f Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Thu, 26 May 2022 21:24:07 +0800 Subject: [PATCH 08/21] using graph block set stage by placement --- libai/models/bert_model.py | 25 +++++++++++++++++++------ libai/models/gpt_model.py | 16 ++++++++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/libai/models/bert_model.py b/libai/models/bert_model.py index f9f392cf6..210b1e42e 100644 --- a/libai/models/bert_model.py +++ b/libai/models/bert_model.py @@ -509,15 +509,28 @@ def set_pipeline_stage_id(model): for module_block in model.modules(): # module.origin can get the original module if isinstance(module_block.origin, BertEmbeddings): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) elif isinstance(module_block.origin, BertExtendedAttnMask): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) elif isinstance(module_block.origin, TransformerLayer): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage(dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) elif isinstance(module_block.origin, BertPooler): - module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + module_block.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) elif isinstance(module_block.origin, BertPreTrainingHeads): - module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + module_block.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) # Set the last layernorm stage id - model.bert.final_layernorm.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.bert.final_layernorm.config.stage_id = dist_utils.get_layer_stage_id(-1) + model.bert.final_layernorm.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + diff --git a/libai/models/gpt_model.py b/libai/models/gpt_model.py index a32dbbcac..4bd2a364a 100644 --- a/libai/models/gpt_model.py +++ b/libai/models/gpt_model.py @@ -371,10 +371,18 @@ def set_pipeline_stage_id(model: nn.Module): for module_block in model.modules(): if isinstance(module_block.origin, (GPTEmbedding, 
CasualMask)): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) elif isinstance(module_block.origin, TransformerLayer): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage(dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) elif isinstance(module_block.origin, (LMLogits, GPTLoss)): - module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + module_block.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) - model.GPT_model.transformer.layernorm_f.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.GPT_model.transformer.layernorm_f.config.stage_id = dist_utils.get_layer_stage_id(-1) + model.GPT_model.transformer.layernorm_f.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) From 718e9baa327dbb8432d785e4156eee07f88cb4eb Mon Sep 17 00:00:00 2001 From: "xingyu.liao" Date: Fri, 27 May 2022 16:44:17 +0800 Subject: [PATCH 09/21] fix multihead_fusion loss non-decreasing --- libai/layers/attention.py | 90 ++++++++++++++++++++------------------- tools/train.sh | 2 +- 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/libai/layers/attention.py b/libai/layers/attention.py index 5131a31a3..7a498ed30 100644 --- a/libai/layers/attention.py +++ b/libai/layers/attention.py @@ -71,7 +71,7 @@ def __init__( layer_idx=0 ): super().__init__() - self.multihead_attn_fusion = os.getenv("MULTIHEAD_ATTN_FUSION") + self.multihead_attn_fusion = os.getenv("MULTIHEAD_ATTN_FUSION") is not None self.hidden_size = hidden_size if output_layer_init_method is None: output_layer_init_method = init_method @@ -214,7 +214,9 @@ def forward( # the full key and value could be obtained by concatenating with past_key_value. 
query_key_value = self.query_key_value(hidden_states) if self.multihead_attn_fusion: - hidden_states = self.fused_multihead_attn(query_key_value, attention_mask) + attention_scores, value = flow._C.fused_self_attention( + query_key_value, head_size=self.head_size, alpha=self.norm_factor + ) else: query_key_value = query_key_value.view(bsz, -1, self.num_heads, 3 * self.head_size) query_key_value = query_key_value.permute( @@ -235,59 +237,59 @@ def forward( # [bsz, num_heads, tgt_len, src_len] with [S(0), S(1)] attention_scores = flow.matmul(query, key, transpose_b=True, alpha=self.norm_factor) - # [S(0), S(1)] x [S(0), B] = [S(0), S(1)] - if attention_mask is not None: - if self.scale_mask_softmax_fusion: - if self.attn_mask_type == AttnMaskType.causal: - attention_weights = flow._C.fused_scale_tril_softmax_mask_scale( - attention_scores, - p=self.attention_dropout_prob, - diagonal=0, - tril_scale_value=self.coeff, - )[0] - else: - attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) - attention_weights = flow._C.fused_scale_mask_softmax_dropout( - attention_scores, - attention_mask, - fill_value=-10000.0, - scale=self.coeff, - p=self.attention_dropout_prob, - )[0] + # [S(0), S(1)] x [S(0), B] = [S(0), S(1)] + if attention_mask is not None: + if self.scale_mask_softmax_fusion: + if self.attn_mask_type == AttnMaskType.causal: + attention_weights = flow._C.fused_scale_tril_softmax_mask_scale( + attention_scores, + p=self.attention_dropout_prob, + diagonal=0, + tril_scale_value=self.coeff, + )[0] else: - if self.coeff is not None: - attention_scores *= self.coeff - attention_scores = flow.mul(attention_scores, attention_mask) - attention_scores = attention_scores - 10000.0 * (1 - attention_mask) - # TODO(xingyu.liao): graph will occur `where_scalar` errors - # when using `masked_fill` - # attention_scores = attention_scores.masked_fill(1 - attention_mask, -10000.0) - attention_weights = flow.softmax(attention_scores, dim=-1) - # [bsz, num_heads, tgt_len, src_len] - attention_weights = self.dropout(attention_weights) + attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) + attention_weights = flow._C.fused_scale_mask_softmax_dropout( + attention_scores, + attention_mask, + fill_value=-10000.0, + scale=self.coeff, + p=self.attention_dropout_prob, + )[0] else: + if self.coeff is not None: + attention_scores *= self.coeff + attention_scores = flow.mul(attention_scores, attention_mask) + attention_scores = attention_scores - 10000.0 * (1 - attention_mask) + # TODO(xingyu.liao): graph will occur `where_scalar` errors + # when using `masked_fill` + # attention_scores = attention_scores.masked_fill(1 - attention_mask, -10000.0) attention_weights = flow.softmax(attention_scores, dim=-1) # [bsz, num_heads, tgt_len, src_len] attention_weights = self.dropout(attention_weights) + else: + attention_weights = flow.softmax(attention_scores, dim=-1) + # [bsz, num_heads, tgt_len, src_len] + attention_weights = self.dropout(attention_weights) + + # Context shape: [bsz, num_heads, tgt_len, head_size] with [S(0), S(1)] + context = flow.matmul(attention_weights, value) - # Context shape: [bsz, num_heads, tgt_len, head_size] with [S(0), S(1)] - context = flow.matmul(attention_weights, value) + if self.multihead_attn_fusion: + context = flow._C.transpose(context, perm=(2, 0, 1, 3)) + else: # Change shape: [bsz, num_heads, tgt_len, head_size] -> # [bsz, tgt_len, num_heads, head_size] - context = context.transpose(1, 2) + # context = context.transpose(1, 2) + context = 
flow._C.transpose(context, perm=(0, 2, 1, 3)) - # Concat multi-head results from - # [bsz, tgt_len, num_heads, head_size] -> [bsz, tgt_len, num_heads * head_size] - # SBP sign: [S(0), S(2)] - context = context.view(bsz, tgt_len, self.hidden_size) - else: - # (batch_size, num_heads, seq_len, head_size) -> - # (seq_len, batch_size, num_heads, head_size) - hidden_states = flow._C.transpose(hidden_states, perm=(2, 0, 1, 3)) - context = hidden_states.flatten(2) + # Concat multi-head results from + # [bsz, tgt_len, num_heads, head_size] -> [bsz, tgt_len, num_heads * head_size] + # SBP sign: [S(0), S(2)] + # context = context.view(bsz, tgt_len, self.hidden_size) # [S(0), S(2)] x [B, S(0)] = [S(0), P] -> [S(0), B] - output = self.dense(context) + output = self.dense(context.flatten(2)) if self.bias_dropout_fusion: output, bias = output diff --git a/tools/train.sh b/tools/train.sh index a7957f475..a31be6dac 100755 --- a/tools/train.sh +++ b/tools/train.sh @@ -8,9 +8,9 @@ NODE_RANK=${NODE_RANK:-0} ADDR=${ADDR:-127.0.0.1} PORT=${PORT:-12345} +export MULTIHEAD_ATTN_FUSION=true export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=true python3 -m oneflow.distributed.launch \ --nproc_per_node $GPUS --nnodes $NODE --node_rank $NODE_RANK --master_addr $ADDR --master_port $PORT \ $FILE --config-file $CONFIG ${@:4} - From f0d21998d5b20f67a9926bb19b874a20bb99df87 Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Fri, 27 May 2022 18:02:24 +0800 Subject: [PATCH 10/21] Add all set stage for libai models: resmlp, swin-t, t5, vit --- libai/models/resmlp.py | 20 ++++++++++++----- libai/models/swin_transformer.py | 36 ++++++++++++++++++++++-------- libai/models/t5_model.py | 36 ++++++++++++++++++++++-------- libai/models/vision_transformer.py | 34 +++++++++++++++++++++------- 4 files changed, 95 insertions(+), 31 deletions(-) diff --git a/libai/models/resmlp.py b/libai/models/resmlp.py index 21bf85200..221b30f48 100644 --- a/libai/models/resmlp.py +++ b/libai/models/resmlp.py @@ -222,14 +222,24 @@ def set_pipeline_stage_id(model): for module_block in model.modules(): # module.origin can get the original module if isinstance(module_block.origin, PatchEmbedding): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) elif isinstance(module_block.origin, layers_scale_mlp_blocks): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage(dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) # Set norm and head stage id - model.norm.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.head.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.loss_func.config.stage_id = dist_utils.get_layer_stage_id(-1) + model.norm.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.head.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.loss_func.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + # model.norm.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.head.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.loss_func.config.stage_id = dist_utils.get_layer_stage_id(-1) @staticmethod def 
set_activation_checkpoint(model): diff --git a/libai/models/swin_transformer.py b/libai/models/swin_transformer.py index f0bb3dccd..7a9fc1bf0 100644 --- a/libai/models/swin_transformer.py +++ b/libai/models/swin_transformer.py @@ -700,21 +700,39 @@ def forward(self, images, labels=None): def set_pipeline_stage_id(model): dist_utils = dist.get_dist_util() - model.patch_embed.config.stage_id = dist_utils.get_layer_stage_id(0) - model.pos_drop.config.stage_id = dist_utils.get_layer_stage_id(0) + # model.patch_embed.config.stage_id = dist_utils.get_layer_stage_id(0) + # model.pos_drop.config.stage_id = dist_utils.get_layer_stage_id(0) + model.patch_embed.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) + model.pos_drop.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) # Set pipeline parallelism stage_id for module_block in model.modules(): # module.origin can get the original module if isinstance(module_block.origin, SwinTransformerBlock): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage( + dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) elif isinstance(module_block.origin, PatchMerging): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) - - model.norm.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.head.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.avgpool.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.loss_func.config.stage_id = dist_utils.get_layer_stage_id(-1) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage( + dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) + + model.norm.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.head.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.avgpool.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.loss_func.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + # model.norm.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.head.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.avgpool.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.loss_func.config.stage_id = dist_utils.get_layer_stage_id(-1) @staticmethod def set_activation_checkpoint(model): diff --git a/libai/models/t5_model.py b/libai/models/t5_model.py index 6d3289c03..1d30ebc46 100644 --- a/libai/models/t5_model.py +++ b/libai/models/t5_model.py @@ -449,19 +449,37 @@ def set_pipeline_stage_id(model): # Set pipeline parallelism stage_id for module_block in model.modules(): if isinstance(module_block.origin, T5Embedding): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) elif isinstance(module_block.origin, ExtendedMask): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) 
elif isinstance(module_block.origin, TransformerLayer): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage(dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) elif isinstance(module_block.origin, LMLogits): - module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + module_block.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) elif isinstance(module_block.origin, T5Loss): - module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(-1) + module_block.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) - model.t5_model.encoder.final_layernorm.config.stage_id = dist_utils.get_layer_stage_id( - model.t5_model.encoder.final_layernorm.layer_idx + model.t5_model.encoder.final_layernorm.config.set_stage( + dist_utils.get_layer_stage_id(model.t5_model.encoder.final_layernorm.layer_idx), + dist.get_layer_placement(model.t5_model.encoder.final_layernorm.layer_idx) ) - model.t5_model.decoder.final_layernorm.config.stage_id = dist_utils.get_layer_stage_id( - model.t5_model.decoder.final_layernorm.layer_idx + model.t5_model.decoder.final_layernorm.config.set_stage( + dist_utils.get_layer_stage_id(model.t5_model.encoder.final_layernorm.layer_idx), + dist.get_layer_placement(model.t5_model.encoder.final_layernorm.layer_idx) ) + # model.t5_model.encoder.final_layernorm.config.stage_id = dist_utils.get_layer_stage_id( + # model.t5_model.encoder.final_layernorm.layer_idx + # ) + # model.t5_model.decoder.final_layernorm.config.stage_id = dist_utils.get_layer_stage_id( + # model.t5_model.decoder.final_layernorm.layer_idx + # ) diff --git a/libai/models/vision_transformer.py b/libai/models/vision_transformer.py index d6e9e9923..94ba651a3 100644 --- a/libai/models/vision_transformer.py +++ b/libai/models/vision_transformer.py @@ -203,14 +203,32 @@ def set_pipeline_stage_id(model): for module_block in model.modules(): # module.origin can get the original module if isinstance(module_block.origin, PatchEmbedding): - module_block.config.stage_id = dist_utils.get_layer_stage_id(0) + module_block.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(0) elif isinstance(module_block.origin, TransformerLayer): - module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + # module_block.config.stage_id = dist_utils.get_layer_stage_id(module_block.layer_idx) + module_block.config.set_stage(dist_utils.get_layer_stage_id(module_block.layer_idx), + dist.get_layer_placement(module_block.layer_idx)) + # Set pos_embed and cls_token stage id - model.pos_embed.config.stage_id = dist_utils.get_layer_stage_id(0) - model.cls_token.config.stage_id = dist_utils.get_layer_stage_id(0) - model.pos_drop.config.stage_id = dist_utils.get_layer_stage_id(0) - model.norm.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.head.config.stage_id = dist_utils.get_layer_stage_id(-1) - model.loss_func.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.pos_embed.config.stage_id = dist_utils.get_layer_stage_id(0) + # model.cls_token.config.stage_id = dist_utils.get_layer_stage_id(0) + # 
model.pos_drop.config.stage_id = dist_utils.get_layer_stage_id(0) + # model.norm.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.head.config.stage_id = dist_utils.get_layer_stage_id(-1) + # model.loss_func.config.stage_id = dist_utils.get_layer_stage_id(-1) + model.pos_embed.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) + model.cls_token.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) + model.pos_drop.config.set_stage(dist_utils.get_layer_stage_id(0), + dist.get_layer_placement(0)) + model.norm.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.head.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + model.loss_func.config.set_stage(dist_utils.get_layer_stage_id(-1), + dist.get_layer_placement(-1)) + From d53fde5b034a1e67a3d088ad74c3e3b2fe504b3e Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Fri, 27 May 2022 18:41:11 +0800 Subject: [PATCH 11/21] remove expend by broadcast softmax dropout --- libai/layers/attention.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libai/layers/attention.py b/libai/layers/attention.py index 7a498ed30..c016a569a 100644 --- a/libai/layers/attention.py +++ b/libai/layers/attention.py @@ -248,7 +248,7 @@ def forward( tril_scale_value=self.coeff, )[0] else: - attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) + # attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) attention_weights = flow._C.fused_scale_mask_softmax_dropout( attention_scores, attention_mask, From 90706bd98e582f554758bf9a31ff8e4fa4faafff Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Thu, 2 Jun 2022 14:54:53 +0800 Subject: [PATCH 12/21] fix sbp for 2-D in loss cls_head attention all gather and all2all --- libai/layers/linear.py | 3 ++- libai/models/bert_model.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/libai/layers/linear.py b/libai/layers/linear.py index 2cb0453c7..6ab975ec3 100644 --- a/libai/layers/linear.py +++ b/libai/layers/linear.py @@ -148,7 +148,8 @@ def forward(self, x): # x.grad sbp must be x.sbp, otherwise backward pass cannot be performed correctly. x = x.to_global(grad_sbp=x.sbp) # Change x.sbp to [S(0), S(0)] if weight is [B, B] - x = x.to_global(sbp=dist.get_nd_sbp([flow.sbp.split(0), flow.sbp.split(0)])) + # NOTE(chengcheng): when input x is [S(0), B], there is no need to change sbp for x. + # x = x.to_global(sbp=dist.get_nd_sbp([flow.sbp.split(0), flow.sbp.split(0)])) x = flow.matmul(x, self.weight, transpose_b=True) else: # Not supported weight_sbp, deduce sbp and communicate with nccl automatically. 
diff --git a/libai/models/bert_model.py b/libai/models/bert_model.py index 210b1e42e..a92a0d554 100644 --- a/libai/models/bert_model.py +++ b/libai/models/bert_model.py @@ -131,7 +131,7 @@ def __init__(self, hidden_size, init_method): hidden_size, hidden_size, bias=True, - parallel="col", + parallel="data", init_method=init_method, layer_idx=-1, ) From 258dba7bd550c256c5fbacfeab928b90caea1e18 Mon Sep 17 00:00:00 2001 From: cheng peng <410070869@qq.com> Date: Tue, 7 Jun 2022 11:50:04 +0800 Subject: [PATCH 13/21] change pipeline_num_layers in dist (#296) --- libai/utils/distributed.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/libai/utils/distributed.py b/libai/utils/distributed.py index b111b5d73..52fbc3285 100644 --- a/libai/utils/distributed.py +++ b/libai/utils/distributed.py @@ -125,6 +125,17 @@ def _init_placement_group(self, cfg): for i in range(0, self.world_size, num_devices_per_stage) ] + # change pipeline_num_layers to make the middle stages contain more layers + if ( + self._pipeline_parallel_size >= 4 + and cfg.pipeline_num_layers >= 8 + and cfg.pipeline_num_layers % self._pipeline_parallel_size == 0 + ): + temp_num_layers_per_stage = cfg.pipeline_num_layers // self._pipeline_parallel_size + cfg.pipeline_num_layers += min( + self._pipeline_parallel_size - 1, temp_num_layers_per_stage + ) + num_layers_per_stage = cfg.pipeline_num_layers // self._pipeline_parallel_size stage_offset = cfg.pipeline_num_layers % self._pipeline_parallel_size From 87e10cfbd0f53d3d28487078e07d88e3754bf400 Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Wed, 8 Jun 2022 13:54:44 +0800 Subject: [PATCH 14/21] fuse optimizer and fp16 cast --- tools/train.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/train.sh b/tools/train.sh index a31be6dac..02988df44 100755 --- a/tools/train.sh +++ b/tools/train.sh @@ -9,8 +9,10 @@ ADDR=${ADDR:-127.0.0.1} PORT=${PORT:-12345} export MULTIHEAD_ATTN_FUSION=true +export ONEFLOW_FUSE_OPTIMIZER_UPDATE_CAST=true export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=true + python3 -m oneflow.distributed.launch \ --nproc_per_node $GPUS --nnodes $NODE --node_rank $NODE_RANK --master_addr $ADDR --master_port $PORT \ $FILE --config-file $CONFIG ${@:4} From 2ebab25746aa3c1ca897507e4d99a5ac9fa6196e Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Mon, 13 Jun 2022 16:31:57 +0800 Subject: [PATCH 15/21] disable fuse optim cast for zzk correctness bug fix --- tools/train.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/train.sh b/tools/train.sh index 02988df44..f6258ca09 100755 --- a/tools/train.sh +++ b/tools/train.sh @@ -9,7 +9,9 @@ ADDR=${ADDR:-127.0.0.1} PORT=${PORT:-12345} export MULTIHEAD_ATTN_FUSION=true -export ONEFLOW_FUSE_OPTIMIZER_UPDATE_CAST=true +# NOTE(chengcheng): temp disable fuse wait for zzk fix correctness bug. 
+#export ONEFLOW_FUSE_OPTIMIZER_UPDATE_CAST=true +unset ONEFLOW_FUSE_OPTIMIZER_UPDATE_CAST export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=true From 1c30c500d43aa6fa9f198a35439302f78dbd0606 Mon Sep 17 00:00:00 2001 From: CPFLAME <410070869@qq.com> Date: Mon, 13 Jun 2022 08:36:45 +0000 Subject: [PATCH 16/21] del casual mask in gpt init && use fused tri in attention --- libai/layers/attention.py | 26 ++++++++++++++------------ libai/models/gpt_model.py | 6 +++--- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/libai/layers/attention.py b/libai/layers/attention.py index c016a569a..48caf3edf 100644 --- a/libai/layers/attention.py +++ b/libai/layers/attention.py @@ -239,15 +239,8 @@ def forward( # [S(0), S(1)] x [S(0), B] = [S(0), S(1)] if attention_mask is not None: - if self.scale_mask_softmax_fusion: - if self.attn_mask_type == AttnMaskType.causal: - attention_weights = flow._C.fused_scale_tril_softmax_mask_scale( - attention_scores, - p=self.attention_dropout_prob, - diagonal=0, - tril_scale_value=self.coeff, - )[0] - else: + if self.scale_mask_softmax_fusion: + if self.attn_mask_type == AttnMaskType.padding: # attention_mask = attention_mask.repeat(1, attention_scores.shape[1], 1, 1) attention_weights = flow._C.fused_scale_mask_softmax_dropout( attention_scores, @@ -268,9 +261,18 @@ def forward( # [bsz, num_heads, tgt_len, src_len] attention_weights = self.dropout(attention_weights) else: - attention_weights = flow.softmax(attention_scores, dim=-1) - # [bsz, num_heads, tgt_len, src_len] - attention_weights = self.dropout(attention_weights) + if self.scale_mask_softmax_fusion: + if self.attn_mask_type == AttnMaskType.causal: + attention_weights = flow._C.fused_scale_tril_softmax_mask_scale( + attention_scores, + p=self.attention_dropout_prob, + diagonal=0, + tril_scale_value=self.coeff, + )[0] + else: + attention_weights = flow.softmax(attention_scores, dim=-1) + # [bsz, num_heads, tgt_len, src_len] + attention_weights = self.dropout(attention_weights) # Context shape: [bsz, num_heads, tgt_len, head_size] with [S(0), S(1)] context = flow.matmul(attention_weights, value) diff --git a/libai/models/gpt_model.py b/libai/models/gpt_model.py index 4bd2a364a..6bf0f3c36 100644 --- a/libai/models/gpt_model.py +++ b/libai/models/gpt_model.py @@ -154,7 +154,7 @@ def __init__( amp_enabled=amp_enabled, ) - self.casual_mask = CasualMask() + # self.casual_mask = CasualMask() self.transformer = Transformer( num_layers, @@ -210,8 +210,8 @@ def forward(self, input_ids): input_ids = input_ids.to_global(placement=dist.get_layer_placement(0)) input_embeds = self.embeddings(input_ids, 0) - attention_mask = self.casual_mask(input_ids, past_length=0) - transformer_output = self.transformer(input_embeds, attention_mask) + # attention_mask = self.casual_mask(input_ids, past_length=0) + transformer_output = self.transformer(input_embeds, attention_mask=None) output = self.lm_head(transformer_output, self.embeddings.token_embeddings.weight) From 8fafd1144eb677b9442f733b76bb9f26edbf6575 Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Mon, 13 Jun 2022 20:44:53 +0800 Subject: [PATCH 17/21] init rdma after dataloader --- libai/engine/trainer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libai/engine/trainer.py b/libai/engine/trainer.py index 0c3ff0b50..522ba8c96 100644 --- a/libai/engine/trainer.py +++ b/libai/engine/trainer.py @@ -304,6 +304,7 @@ def __init__(self, graph, data_loader): self.data_loader = data_loader self._data_loader_iter = iter(data_loader) self.graph = 
graph + flow.env.init_rdma() def run_step(self, get_batch: Callable): """ From c8515913f011ad409be30b2660732eb7a9d11ebc Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Mon, 13 Jun 2022 20:51:20 +0800 Subject: [PATCH 18/21] refine zero config in graph base --- libai/models/utils/graph_base.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/libai/models/utils/graph_base.py b/libai/models/utils/graph_base.py index ad37adedb..331771843 100644 --- a/libai/models/utils/graph_base.py +++ b/libai/models/utils/graph_base.py @@ -61,16 +61,18 @@ def __init__( self.set_activation_checkpoint() if zero_optim: - dist_util = dist.get_dist_util() - assert ( - not dist_util.is_tensor_model_parallel() - ), "ZeRO don't support tensor_model_parallel!" - self.config.set_zero_redundancy_optimizer_mode("distributed_split") - if zero_stage > 1: - flow.boxing.nccl.enable_use_compute_stream(True) - if zero_stage > 2: - # stage 3 - flow.boxing.nccl.disable_group_boxing_by_dst_parallel(True) + self.config.enable_zero(True, stage = zero_stage) + # self.config.enable_zero(True, stage = zero_stage, shard_restore_level = 0) + # dist_util = dist.get_dist_util() + # assert ( + # not dist_util.is_tensor_model_parallel() + # ), "ZeRO don't support tensor_model_parallel!" + # self.config.set_zero_redundancy_optimizer_mode("distributed_split") + # if zero_stage > 1: + # flow.boxing.nccl.enable_use_compute_stream(True) + # if zero_stage > 2: + # # stage 3 + # flow.boxing.nccl.disable_group_boxing_by_dst_parallel(True) self.set_pipeline_stage_id() From 7001a2ccda9a18e5cdb6e535e94cb3951110c884 Mon Sep 17 00:00:00 2001 From: CPFLAME <410070869@qq.com> Date: Tue, 14 Jun 2022 02:28:24 +0000 Subject: [PATCH 19/21] refine rdma && add persistent_workers in dataloader --- libai/data/build.py | 9 ++++++++- libai/engine/default.py | 2 ++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/libai/data/build.py b/libai/data/build.py index d49561f22..a22e5e507 100644 --- a/libai/data/build.py +++ b/libai/data/build.py @@ -211,6 +211,7 @@ def build_nlp_train_loader( dataset, batch_sampler=sampler, num_workers=num_workers, + persistent_workers=True if num_workers > 0 else False, collate_fn=trivial_batch_collator if collate_fn is None else collate_fn, **kwargs, ) @@ -259,7 +260,11 @@ def build_nlp_test_loader( sampler = instantiate(sampler) test_loader = DataLoader( - dataset, batch_sampler=sampler, num_workers=num_workers, collate_fn=collate_fn + dataset, + batch_sampler=sampler, + num_workers=num_workers, + persistent_workers=True if num_workers > 0 else False, + collate_fn=collate_fn, ) return test_loader @@ -330,6 +335,7 @@ def build_image_train_loader( dataset, batch_sampler=sampler, num_workers=num_workers, + persistent_workers=True if num_workers > 0 else False, collate_fn=trivial_batch_collator if collate_fn is None else collate_fn, **kwargs, ) @@ -383,6 +389,7 @@ def build_image_test_loader( dataset, batch_sampler=sampler, num_workers=num_workers, + persistent_workers=True if num_workers > 0 else False, collate_fn=trivial_batch_collator if collate_fn is None else collate_fn, **kwargs, ) diff --git a/libai/engine/default.py b/libai/engine/default.py index 096665035..67e7eeb8f 100644 --- a/libai/engine/default.py +++ b/libai/engine/default.py @@ -294,6 +294,8 @@ def __init__(self, cfg): self.test_loader.extend(self.build_test_loader(cfg, self.tokenizer)) + flow.env.init_rdma() + # Automatically scale the hyperparams self.auto_scale_hyperparams(cfg, self.train_loader) 
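
Aside: the ZeRO configuration rewrite in patch 18 above (graph_base.py) collapses three separate knobs into a single call. Below is a minimal sketch of how the new form is used from a custom nn.Graph, assuming the `enable_zero(True, stage=...)` graph-config API shown in that diff; the `ToyGraph` class, its constructor arguments and the optimizer wiring are illustrative only, not part of LiBai:

    import oneflow as flow
    from oneflow import nn

    class ToyGraph(nn.Graph):
        # Illustrative wrapper; LiBai's real GraphBase additionally configures amp,
        # kernel fusion flags, activation checkpointing and pipeline stage ids.
        def __init__(self, model, optimizer, zero_stage=2):
            super().__init__()
            self.model = model
            self.add_optimizer(optimizer)
            # One call replacing set_zero_redundancy_optimizer_mode("distributed_split")
            # plus the extra nccl flags that stages 2 and 3 used to require.
            self.config.enable_zero(True, stage=zero_stage)

        def build(self, x, y):
            # Training graph: forward returns the loss, backward is traced inside build().
            loss = self.model(x, y)
            loss.backward()
            return loss
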
From 623069702b8608773310bb82368d4479043f2217 Mon Sep 17 00:00:00 2001 From: CPFLAME <410070869@qq.com> Date: Tue, 14 Jun 2022 02:31:34 +0000 Subject: [PATCH 20/21] delete rdma in graph trainer --- libai/engine/trainer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/libai/engine/trainer.py b/libai/engine/trainer.py index 522ba8c96..0c3ff0b50 100644 --- a/libai/engine/trainer.py +++ b/libai/engine/trainer.py @@ -304,7 +304,6 @@ def __init__(self, graph, data_loader): self.data_loader = data_loader self._data_loader_iter = iter(data_loader) self.graph = graph - flow.env.init_rdma() def run_step(self, get_batch: Callable): """ From e0efe7d8249eb9073393b92d396b94c2e5aedd91 Mon Sep 17 00:00:00 2001 From: chengtbf <472491134@qq.com> Date: Sat, 18 Jun 2022 15:45:24 +0800 Subject: [PATCH 21/21] fix import dist --- libai/models/utils/graph_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libai/models/utils/graph_base.py b/libai/models/utils/graph_base.py index 47ccb0033..f71da1ab1 100644 --- a/libai/models/utils/graph_base.py +++ b/libai/models/utils/graph_base.py @@ -19,6 +19,7 @@ from oneflow import nn from libai.layers import TransformerLayer +from libai.utils import distributed as dist logger = logging.getLogger(__name__)
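
Taken together, the MULTIHEAD_ATTN_FUSION changes spread over bert_model.py, gpt_model.py and attention.py in this series boil down to one alternative code path. The following is a condensed sketch of that path for the causal (GPT-style) case, assuming the packed query_key_value tensor has already been transposed to [seq_len, batch, 3 * hidden] at the model entry, as the diffs do; the standalone function, its argument names and the default coeff value are illustrative, not LiBai's actual class layout:

    import math
    import oneflow as flow

    def fused_causal_attention(query_key_value, head_size, dropout_p, coeff=1.0):
        # In LiBai, coeff comes from apply_query_key_layer_scaling (self.coeff);
        # 1.0 here is only a placeholder so the sketch reads stand-alone.
        norm_factor = 1.0 / math.sqrt(float(head_size))
        # Slices q/k/v out of the packed tensor and returns q @ k^T (scaled by alpha)
        # together with v, laid out per head.
        attention_scores, value = flow._C.fused_self_attention(
            query_key_value, head_size=head_size, alpha=norm_factor
        )
        # Fused causal mask + scale + softmax + dropout; this is what lets patch 16
        # drop the explicit CasualMask module from GPTModel.forward.
        attention_weights = flow._C.fused_scale_tril_softmax_mask_scale(
            attention_scores, p=dropout_p, diagonal=0, tril_scale_value=coeff
        )[0]
        # [batch, num_heads, seq_len, head_size]
        context = flow.matmul(attention_weights, value)
        # Back to [seq_len, batch, num_heads, head_size]; the caller flattens the last
        # two dims before the output projection, and the model transposes the final
        # hidden states back to [batch, seq_len, hidden] after the last layernorm.
        return flow._C.transpose(context, perm=(2, 0, 1, 3))

The non-fused branch keeps the original view/permute/chunk/matmul sequence, which is why both paths stay behind the os.getenv("MULTIHEAD_ATTN_FUSION") check and the MULTIHEAD_ATTN_FUSION=true export added to tools/train.sh.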