NeMo_Canary / tests /core /test_dist_ckpt.py
Respair's picture
Upload folder using huggingface_hub
b386992 verified
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import types
from pathlib import Path
from typing import Any, Dict
import lightning.pytorch as pl
import pytest
import torch
from lightning.fabric.plugins import TorchCheckpointIO
from lightning.pytorch.demos.boring_classes import BoringModel
from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy
from nemo.utils.callbacks.dist_ckpt_io import (
AsyncFinalizableCheckpointIO,
AsyncFinalizerCallback,
DistributedCheckpointIO,
)
try:
from megatron.core.dist_checkpointing import ShardedTensor
HAVE_MEGATRON_CORE = True
except (ImportError, ModuleNotFoundError):
HAVE_MEGATRON_CORE = False
class ExampleModel(BoringModel):
def on_validation_epoch_end(self) -> None:
self.log("val_loss", torch.tensor(1.0))
class ExampleMCoreModel(ExampleModel):
def sharded_state_dict(self):
return {
'a': ShardedTensor.from_rank_offsets('a', self.layer.weight, replica_id=torch.distributed.get_rank()),
'const': 3,
}
def on_save_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
checkpoint['sharded_state_dict'] = self.sharded_state_dict()
class MockDistributedCheckpointIO(DistributedCheckpointIO):
def __init__(self, save_ckpt_format):
super().__init__(save_ckpt_format)
self.save_checkpoint_called_args = None
def save_checkpoint(self, *args, **kwargs) -> None:
self.save_checkpoint_called_args = args, kwargs
class MockTorchCheckpointIO(TorchCheckpointIO):
def __init__(self):
super().__init__()
self.save_checkpoint_called_args = None
def save_checkpoint(self, *args, **kwargs) -> None:
self.save_checkpoint_called_args = args, kwargs
def _get_last_checkpoint_dir(root_dir: Path, model: pl.LightningModule, suffix: str = '') -> Path:
steps = len(model.train_dataloader().dataset) * model.trainer.max_epochs // torch.distributed.get_world_size()
return root_dir / 'checkpoints' / f'epoch={model.trainer.max_epochs - 1}-step={steps}{suffix}'
def _get_nlp_strategy_without_optimizer_state():
strategy = NLPDDPStrategy()
# this ensures optimizer sharded state creation is skipped
strategy.optimizer_sharded_state_dict = types.MethodType(
lambda self, unsharded_optim_state: unsharded_optim_state, strategy
)
return strategy
class TestDistCkptIO:
@pytest.mark.run_only_on('GPU')
def test_dist_ckpt_io_called_for_mcore_models(self, tmp_path):
strategy = _get_nlp_strategy_without_optimizer_state()
checkpoint_io = MockDistributedCheckpointIO('xxx')
test_trainer = pl.Trainer(
enable_checkpointing=True,
logger=False,
max_epochs=2,
strategy=strategy,
plugins=[checkpoint_io],
default_root_dir=tmp_path,
)
model = ExampleMCoreModel()
test_trainer.fit(model)
assert isinstance(test_trainer.strategy.checkpoint_io, MockDistributedCheckpointIO)
assert checkpoint_io.save_checkpoint_called_args is not None
(state_dict, path), _ = checkpoint_io.save_checkpoint_called_args
# Ckpt path doesn't contain the .ckpt suffix
assert path.name == _get_last_checkpoint_dir(tmp_path, model).name
@pytest.mark.run_only_on('GPU')
def test_dist_ckpt_path_not_executed_for_non_core_models(self, tmp_path):
strategy = NLPDDPStrategy()
checkpoint_io = MockTorchCheckpointIO()
test_trainer = pl.Trainer(
enable_checkpointing=True,
logger=False,
max_epochs=2,
strategy=strategy,
plugins=[checkpoint_io],
default_root_dir=tmp_path,
)
model = ExampleModel()
test_trainer.fit(model)
assert isinstance(test_trainer.strategy.checkpoint_io, MockTorchCheckpointIO)
if test_trainer.is_global_zero:
assert checkpoint_io.save_checkpoint_called_args is not None
(state_dict, path), _ = checkpoint_io.save_checkpoint_called_args
# Ckpt path *does* contain the .ckpt suffix
assert os.path.basename(path) == _get_last_checkpoint_dir(tmp_path, model, suffix='.ckpt').name
else:
assert checkpoint_io.save_checkpoint_called_args is None
class TestAsyncSave:
@pytest.mark.run_only_on('GPU')
def test_async_save_produces_same_checkpoints_as_sync(self, tmp_path):
strategy = _get_nlp_strategy_without_optimizer_state()
sync_checkpoint_io = DistributedCheckpointIO('torch_dist')
async_checkpoint_io = AsyncFinalizableCheckpointIO(DistributedCheckpointIO('torch_dist', async_save=True))
model = ExampleMCoreModel()
# dummy_trainer just to initialize NCCL
dummy_trainer = pl.Trainer(
enable_checkpointing=False,
logger=False,
max_epochs=1,
strategy=_get_nlp_strategy_without_optimizer_state(),
plugins=[sync_checkpoint_io],
)
dummy_trainer.fit(model)
tmp_path = strategy.broadcast(tmp_path)
sync_ckpt_dir = tmp_path / 'sync_checkpoints'
async_ckpt_dir = tmp_path / 'async_checkpoints'
sync_test_trainer = pl.Trainer(
enable_checkpointing=True,
logger=False,
max_epochs=1,
strategy=_get_nlp_strategy_without_optimizer_state(),
plugins=[sync_checkpoint_io],
default_root_dir=sync_ckpt_dir,
)
sync_test_trainer.fit(model)
async_test_trainer = pl.Trainer(
enable_checkpointing=True,
logger=False,
max_epochs=1,
strategy=_get_nlp_strategy_without_optimizer_state(),
plugins=[async_checkpoint_io],
callbacks=AsyncFinalizerCallback(),
default_root_dir=async_ckpt_dir,
)
async_test_trainer.fit(model)
# Load and compare checkpoints
checkpoint = {'sharded_state_dict': model.sharded_state_dict()}
sync_state_dict = sync_checkpoint_io.load_checkpoint(
_get_last_checkpoint_dir(sync_ckpt_dir, model), sharded_state_dict=checkpoint
)
async_state_dict = async_checkpoint_io.load_checkpoint(
_get_last_checkpoint_dir(async_ckpt_dir, model), sharded_state_dict=checkpoint
)
assert sync_state_dict['sharded_state_dict']['const'] == async_state_dict['sharded_state_dict']['const']
assert torch.all(sync_state_dict['sharded_state_dict']['a'] == async_state_dict['sharded_state_dict']['a'])