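"""Validator neuron for the distributed training subnet.

Initialises the DHT, local/global progress trackers, model and optimizer state,
and UID-to-peer mappings, then drives the validator forward pass.
"""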

import os
import time

# Set these environment flags before importing libraries that read them.
os.environ["NEST_ASYNCIO"] = "0"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

import math
import threading

import bittensor as bt
from transformers import AutoTokenizer

from distributed_training.base.validator import BaseValidatorNeuron
from distributed_training.utils.chain import log_peerid_to_chain
from distributed_training.utils.misc import (
    AsyncDendritePool,
    init_dht,
    load_wandb,
    setup_logging,
)
from distributed_training.utils.progress_tracker import (
    GlobalTrainingProgress,
    LocalTrainingProgress,
    get_global_epoch,
)
from distributed_training.utils.state_loader import (
    FastModelLoader,
    cleanup_old_cache,
    load_model_optimizer_gradient_averager,
    load_state_from_peer,
)
from distributed_training.utils.uids import map_uid_to_peerid, update_run_peerid_list
from distributed_training.validator import forward


class Validator(BaseValidatorNeuron):
    def __init__(self, config=None):
        super().__init__(config=config)
        self._update_wandb_project()
        self._init_basic_components()
        self._init_model_components()
        self._init_network_components()
        self._init_uid_components()

    def _update_wandb_project(self):
        suffix = "_validators" if self.neuron_type == "ValidatorNeuron" else "_miners"
        self.config.neuron.wandb_project += suffix

    def _init_basic_components(self):
        """Initialize basic validator components"""
        setup_logging(config=self.config)

        self.device = self.config.neuron.device
        self.uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address)
        self.dendrite_pool = AsyncDendritePool(
            wallet=self.wallet, metagraph=self.metagraph
        )
        init_dht(self)

        self._init_progress_tracking()

        if not self.config.neuron.dont_wandb_log:
            self.wandb = load_wandb(
                self, self.config, self.wallet, "validator", str(self.dht.peer_id)
            )

    def _init_progress_tracking(self):
        self.local_progress = LocalTrainingProgress(
            peer_id=self.dht.peer_id.to_bytes(),
            epoch=0,
            samples_accumulated=0,
            samples_per_second=0.0,
            time=0.0,
            client_mode=False,
            inner_step=0,
            loss=0.0,
        )
        self.global_progress = GlobalTrainingProgress(epoch=0, samples_accumulated=0)
        self.global_progress.epoch = get_global_epoch(self)
        self.local_progress.epoch = self.global_progress.epoch

        if self.global_progress.epoch is None:
            bt.logging.error(
                "Model tag is None. Make sure you are using the correct model name."
            )

    def _init_model_components(self):
        """Initialize model-related components including tokenizer and optimizer settings."""
        self._setup_model_params()
        self._init_tokenizer()
        self._setup_model_state()
        self._setup_training_params()

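    # Fixed hyperparameters: learning-rate bounds, weight decay, inner-step count,
    # retry limits for model uploads, and thresholds for peer liveness checks.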
    def _setup_model_params(self):
        self.load_state_timeout = 180

        self.learning_rate_maximum = 4e-4
        self.weight_decay = 0.1
        self.num_inner_steps = 500
        self.offload_optimizer = True
        self.model_upload_retry_limit = 3
        self.model_upload_retry_delay = 10

        self.maximum_steps = 306 * 4
        self.warmup_steps = 62
        self.failed_is_alive_counter_threshold = 10

    def _init_tokenizer(self):
        self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2", use_fast=False)
        self.tokenizer.pad_token = self.tokenizer.eos_token

    def _setup_model_state(self):
        self.learning_rate = self.get_learning_rate()
        self.average_loss = None
        self.loader = FastModelLoader(self.config.neuron.hf_repo_id)

        load_model_optimizer_gradient_averager(
            self, self.config.neuron.global_model_name, self.global_progress.epoch
        )
        cleanup_old_cache(self)

        if self.local_progress.epoch < self.global_progress.epoch:
            load_state_from_peer(self, epoch=self.global_progress.epoch)

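    # The effective batch size is reached through gradient accumulation:
    # number_of_local_steps = local_batch_size_train_effective // local_batch_size_train.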
    def _setup_training_params(self):
        self.local_batch_size_train = self.config.neuron.local_batch_size_train
        self.local_batch_size_train_effective = (
            self.config.neuron.local_batch_size_train_effective
        )
        self.logging_interval = 5
        self.number_of_local_steps = (
            self.config.neuron.local_batch_size_train_effective
            // self.config.neuron.local_batch_size_train
        )

        self.running_loss = 0.0
        self.batch_count = 0

    def _init_network_components(self):
        """Initialize network and P2P components"""
        bt.logging.info("Logging PeerID to chain")
        log_peerid_to_chain(self)

    def _init_uid_components(self):
        self._setup_uids()
        self._init_peer_mapping()
        self._setup_allreduce_block()

    def _setup_uids(self):
        self.master_uid = self.metagraph.hotkeys.index(
            self.config.neuron.master_ss58_address,
        )
        self.failed_is_alive_counter = {uid: 0 for uid in self.metagraph.uids.tolist()}

    def _init_peer_mapping(self):
        self.stop_event = threading.Event()
        map_uid_to_peerid(self)
        update_run_peerid_list(self)

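    # The master validator (or a model config without a recorded all-reduce block)
    # starts timing from the current chain block; otherwise reuse the stored value.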
    def _setup_allreduce_block(self):
        if (self.uid == self.master_uid) or (
            "last_allreduce_block" not in self.model.config.__dict__
        ):
            self.last_allreduce_block = self.block
        else:
            self.last_allreduce_block = self.model.config.last_allreduce_block

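    # Accumulate the dataset samples reported by responses that earned a nonzero reward.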
    def update_local_tracker_state(self, rewards, responses):
        for reward, response in zip(rewards, responses[0]):
            if (reward != 0) and (response.dataset_indices is not None):
                self.local_progress.samples_accumulated += len(response.dataset_indices)

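    # Learning-rate schedule: linear warmup over `warmup_steps` epochs, then cosine
    # decay from `learning_rate_maximum` to 10% of it by `maximum_steps`:
    #   lr = lr_min + 0.5 * (1 + cos(pi * decay_ratio)) * (lr_max - lr_min)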
    def get_learning_rate(self):
        learning_rate_minimum = self.learning_rate_maximum * 0.1

        if self.global_progress.epoch < self.warmup_steps:
            return (
                self.learning_rate_maximum
                * (self.global_progress.epoch + 1)
                / self.warmup_steps
            )

        if self.global_progress.epoch > self.maximum_steps:
            return learning_rate_minimum

        decay_ratio = (self.global_progress.epoch - self.warmup_steps) / (
            self.maximum_steps - self.warmup_steps
        )
        assert 0 <= decay_ratio <= 1

        coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
        # Interpolate between the minimum and maximum learning rate.
        return learning_rate_minimum + coeff * (
            self.learning_rate_maximum - learning_rate_minimum
        )

    def get_validator_info(self):
        return {
            "block": self.metagraph.block.item(),
            "stake": self.metagraph.stake[self.uid],
            "rank": self.metagraph.ranks[self.uid],
            "vtrust": self.metagraph.validator_trust[self.uid],
            "dividends": self.metagraph.dividends[self.uid],
            "emissions": self.metagraph.emission[self.uid],
        }

    async def forward(self):
        return await forward(self)


if __name__ == "__main__":
    Validator().run()