# The MIT License (MIT)
# Copyright © 2023 Yuma Rao
# Copyright © 2023 KMFODA
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the “Software”), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software.
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
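"""Validator neuron for the distributed training subnet.

Sets up logging, the DHT, progress trackers, the tokenizer, and the model,
optimizer, and gradient-averager state, then delegates each validation step
to ``distributed_training.validator.forward``.
"""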
import os
import time
os.environ["NEST_ASYNCIO"] = "0"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
import math
import threading
import bittensor as bt
from transformers import AutoTokenizer
from distributed_training.base.validator import BaseValidatorNeuron
from distributed_training.utils.chain import log_peerid_to_chain
from distributed_training.utils.misc import (
AsyncDendritePool,
init_dht,
load_wandb,
setup_logging,
)
from distributed_training.utils.progress_tracker import (
GlobalTrainingProgress,
LocalTrainingProgress,
get_global_epoch,
)
from distributed_training.utils.state_loader import (
FastModelLoader,
cleanup_old_cache,
load_model_optimizer_gradient_averager,
load_state_from_peer,
)
from distributed_training.utils.uids import map_uid_to_peerid, update_run_peerid_list
from distributed_training.validator import forward
class Validator(BaseValidatorNeuron):
def __init__(self, config=None):
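        """Initialize the validator: wandb project naming, then basic, model, network, and UID components."""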
super(Validator, self).__init__(config=config)
self._update_wandb_project()
self._init_basic_components()
self._init_model_components()
self._init_network_components()
self._init_uid_components()
def _update_wandb_project(self):
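        """Append a role-specific suffix ("_validators" or "_miners") to the configured wandb project name."""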
suffix = "_validators" if self.neuron_type == "ValidatorNeuron" else "_miners"
self.config.neuron.wandb_project += suffix
def _init_basic_components(self):
"""Initialize basic validator components"""
setup_logging(config=self.config)
# Core setup
self.device = self.config.neuron.device
self.uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address)
self.dendrite_pool = AsyncDendritePool(
wallet=self.wallet, metagraph=self.metagraph
)
init_dht(self)
# Progress tracking
self._init_progress_tracking()
# Wandb setup
if not self.config.neuron.dont_wandb_log:
self.wandb = load_wandb(
self, self.config, self.wallet, "validator", str(self.dht.peer_id)
)
def _init_progress_tracking(self):
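        """Create the local and global progress trackers and sync the local epoch to the current global epoch."""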
self.local_progress = LocalTrainingProgress(
peer_id=self.dht.peer_id.to_bytes(),
epoch=0,
samples_accumulated=0,
samples_per_second=0.0,
time=0.0,
client_mode=False,
inner_step=0,
loss=0.0,
)
self.global_progress = GlobalTrainingProgress(epoch=0, samples_accumulated=0)
self.global_progress.epoch = get_global_epoch(self)
self.local_progress.epoch = self.global_progress.epoch
if self.global_progress.epoch is None:
bt.logging.error(
"Model Tag Is None. Make Sure You Are Using The Correct Model Name"
)
def _init_model_components(self):
"""Initialize model-related components including tokenizer and optimizer settings."""
self._setup_model_params()
self._init_tokenizer()
self._setup_model_state()
self._setup_training_params()
def _setup_model_params(self):
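        """Set timeouts, optimizer hyperparameters, upload retry policy, and step limits used by the validator."""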
# Timeouts
self.load_state_timeout = 180
# Core parameters
self.learning_rate_maximum = 4e-4
self.weight_decay = 0.1
self.num_inner_steps = 500
self.offload_optimizer = True
self.model_upload_retry_limit = 3
self.model_upload_retry_delay = 10
# Validator-specific training parameters
self.maximum_steps = 306 * 4 # 10_000_000_000/(32000*1024)
self.warmup_steps = 62 # 306 / 5
self.failed_is_alive_counter_threshold = 10
def _init_tokenizer(self):
self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2", use_fast=False)
self.tokenizer.pad_token = self.tokenizer.eos_token
def _setup_model_state(self):
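        """Compute the current learning rate, load model/optimizer/gradient-averager state, and catch up to the global epoch if behind."""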
self.learning_rate = self.get_learning_rate()
self.average_loss = None
self.loader = FastModelLoader(self.config.neuron.hf_repo_id)
load_model_optimizer_gradient_averager(
self, self.config.neuron.global_model_name, self.global_progress.epoch
)
cleanup_old_cache(self)
if self.local_progress.epoch < self.global_progress.epoch:
load_state_from_peer(self, epoch=self.global_progress.epoch)
def _setup_training_params(self):
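        """Derive batch sizes and the number of local steps per effective batch from the neuron config."""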
self.local_batch_size_train = self.config.neuron.local_batch_size_train
self.local_batch_size_train_effective = (
self.config.neuron.local_batch_size_train_effective
)
self.logging_interval = 5
self.number_of_local_steps = (
self.config.neuron.local_batch_size_train_effective
// self.config.neuron.local_batch_size_train
)
self.running_loss = 0.0
self.batch_count = 0
def _init_network_components(self):
"""Initialize network and P2P components"""
bt.logging.info("Logging PeerID to chain")
log_peerid_to_chain(self)
def _init_uid_components(self):
self._setup_uids()
self._init_peer_mapping()
self._setup_allreduce_block()
def _setup_uids(self):
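        """Locate the master validator UID and reset the per-UID is_alive failure counters."""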
self.master_uid = self.metagraph.hotkeys.index(
self.config.neuron.master_ss58_address,
)
self.failed_is_alive_counter = {uid: 0 for uid in self.metagraph.uids.tolist()}
def _init_peer_mapping(self):
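        """Map metagraph UIDs to DHT peer IDs and refresh the list of peers participating in the run."""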
self.stop_event = threading.Event()
map_uid_to_peerid(self)
update_run_peerid_list(self)
def _setup_allreduce_block(self):
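        """Track the block of the last all-reduce: the master UID (or a model config with no record) uses the current block; otherwise reuse the value stored in the model config."""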
if (self.uid == self.master_uid) or (
"last_allreduce_block" not in self.model.config.__dict__
):
self.last_allreduce_block = self.block
else:
self.last_allreduce_block = self.model.config.last_allreduce_block
def update_local_tracker_state(self, rewards, responses):
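        """Add the dataset indices of each rewarded response to the local samples_accumulated counter."""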
        for reward, response in zip(rewards, responses[0]):
            if (reward != 0) and (response.dataset_indices is not None):
                self.local_progress.samples_accumulated += len(
                    response.dataset_indices
                )
def get_learning_rate(self):
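        """Return the learning rate for the current global epoch: linear warmup, then cosine decay to 10% of the maximum."""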
learning_rate_minimum = self.learning_rate_maximum * 0.1
# 1) linear warmup for warmup_steps
if self.global_progress.epoch < self.warmup_steps:
return (
self.learning_rate_maximum
* (self.global_progress.epoch + 1)
/ self.warmup_steps
)
        # 2) if epoch > maximum_steps, return learning_rate_minimum
if self.global_progress.epoch > self.maximum_steps:
return learning_rate_minimum
# 3) if in between, use cosine decay down to min learning rate
decay_ratio = (self.global_progress.epoch - self.warmup_steps) / (
self.maximum_steps - self.warmup_steps
)
assert 0 <= decay_ratio <= 1
# coeff starts at 1 and goes to 0
coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
        # interpolate between the minimum and maximum learning rate
        return learning_rate_minimum + coeff * (
            self.learning_rate_maximum - learning_rate_minimum
        )
def get_validator_info(self):
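        """Return this validator's block, stake, rank, trust, dividends, and emissions from the metagraph."""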
return {
"block": self.metagraph.block.item(),
"stake": self.metagraph.stake[self.uid],
"rank": self.metagraph.ranks[self.uid],
"vtrust": self.metagraph.validator_trust[self.uid],
"dividends": self.metagraph.dividends[self.uid],
"emissions": self.metagraph.emission[self.uid],
}
async def forward(self):
return await forward(self)
# Entry point: instantiate the validator (which parses its configuration) and run it.
if __name__ == "__main__":
Validator().run()