import ast
import asyncio
import datetime as dt
import random
import time
import traceback
from typing import List

import bittensor as bt
import distributed_training
import numpy as np
from bittensor.core.chain_data import decode_account_id
from hivemind.p2p import PeerID
from hivemind.utils.timed_storage import ValueWithExpiration
from huggingface_hub import list_repo_commits

from distributed_training.utils.state_loader import check_model_exists


async def check_uid(dendrite, axon, uid, epoch=None):
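    """Ping a single axon with an IsAlive synapse (2.3s timeout).

    Returns True if the miner responds successfully and, when `epoch` is
    provided, reports matching that epoch; False otherwise.
    """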
    try:
        response = await dendrite(
            axon,
            distributed_training.protocol.IsAlive(),
            deserialize=False,
            timeout=2.3,
        )
        if response.is_success:
            if (epoch is not None) and (response.epoch == epoch):
                bt.logging.trace(f"UID {uid} is active and on epoch {epoch}")
                return True
            elif (epoch is not None) and (response.epoch != epoch):
                bt.logging.trace(f"UID {uid} is active but not on epoch {epoch}")
                return False
            else:
                bt.logging.trace(f"UID {uid} is active.")
                return True
        else:
            bt.logging.trace(f"UID {uid} is not active.")
            return False
    except Exception as e:
        bt.logging.error(f"Error checking UID {uid}: {e}\n{traceback.format_exc()}")
        return False


async def check_uid_availability(
    dendrite,
    metagraph: "bt.metagraph.Metagraph",
    uid: int,
    vpermit_tao_limit: int,
    epoch: int = None,
) -> bool:
    """Check if a UID is available. A UID is available if its axon is serving
    and, when it holds a validator permit, its stake is at most vpermit_tao_limit.

    Args:
        dendrite: Dendrite client used to ping the axon.
        metagraph (:obj:`bt.metagraph.Metagraph`): Metagraph object.
        uid (int): UID to be checked.
        vpermit_tao_limit (int): Validator permit tao limit.
        epoch (int, optional): If provided, the UID must also report this epoch.

    Returns:
        bool: True if the UID is available, False otherwise.
    """
    # Filter out non-serving axons.
    if not metagraph.axons[uid].is_serving:
        return False

    # Filter out validator-permit holders above the stake limit.
    if metagraph.validator_permit[uid]:
        if metagraph.S[uid] > vpermit_tao_limit:
            return False

    # Filter out axons that fail the IsAlive ping.
    if not await check_uid(dendrite, metagraph.axons[uid], uid, epoch):
        return False

    return True


async def get_random_uids(
    self, dendrite, k: int, exclude: List[int] = None, epoch: int = None
) -> np.ndarray:
    """Returns k available random uids from the metagraph.

    Args:
        k (int): Number of uids to return.
        exclude (List[int]): List of uids to exclude from the random sampling.

    Returns:
        uids (np.ndarray): Randomly sampled available uids.

    Notes:
        If `k` is larger than the number of available `uids`, all available
        `uids` are returned. The `epoch` argument is accepted but not
        currently forwarded to the availability checks.
    """
    candidate_uids = []
    avail_uids = []
    uids = list(range(int(self.metagraph.n)))
    random.shuffle(uids)
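
    # Ping UIDs in batches of `limit` until at least k are marked alive or
    # every batch in the metagraph has been checked once.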
    responses = []
    attempt = 0
    limit = self.config.neuron.uid_isalive_limit
    while (sum(responses) < k) and (
        (attempt < (int(self.metagraph.n / limit) - 1)) or (attempt == 0)
    ):
        tasks = []
        if limit > int(self.metagraph.n):
            limit = int(self.metagraph.n)

        # Clamp the batch to the end of the uid list to avoid an IndexError
        # when metagraph.n is not a multiple of limit.
        for i in range(attempt * limit, min((attempt + 1) * limit, len(uids))):
            tasks.append(
                check_uid_availability(
                    dendrite,
                    self.metagraph,
                    uids[i],
                    self.config.neuron.vpermit_tao_limit,
                    None,
                )
            )
        responses += await asyncio.gather(*tasks)
        attempt += 1

    # Track consecutive failed IsAlive responses per UID.
    for i, response in enumerate(responses):
        if not response:
            self.failed_is_alive_counter[uids[i]] += 1
        else:
            self.failed_is_alive_counter[uids[i]] = 0

    for uid, uid_is_available in zip(uids, responses):
        uid_is_not_excluded = exclude is None or uid not in exclude
        if uid_is_available:
            avail_uids.append(uid)
            if uid_is_not_excluded:
                candidate_uids.append(uid)

    # If fewer than k candidates are available, return them all; otherwise
    # sample k of them at random.
    if len(candidate_uids) < k:
        uids = np.array(candidate_uids)
    else:
        uids = np.array(random.sample(candidate_uids, k))
    return uids


async def get_hf_validation_uid(self, outer_step: int = None):
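    """Select the next UID for HuggingFace model validation.

    UIDs are ordered so the stalest train-similarity scores are revisited
    first; the first UID whose repo exists and whose latest commit differs
    from the stored one (or has none stored) is returned as a single-element
    list. Returns an empty list if no UID qualifies.
    """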
    uids = []
    try:
        # Order UIDs so the ones whose scores were updated longest ago are
        # checked first.
        self.uid_tracker = dict(
            sorted(
                self.uid_tracker.items(),
                key=lambda item: item[1]["train_similarity_score_last_updated"],
            )
        )
        bt.logging.info(
            {
                k: v["train_similarity_score_last_updated"]
                for k, v in self.uid_tracker.items()
            }
        )
        for uid in self.uid_tracker.keys():
            if (
                (self.uid_tracker[uid]["model_huggingface_id"] is not None)
                and (check_model_exists(self.uid_tracker[uid]["model_huggingface_id"]))
                and (
                    (self.uid_tracker[uid]["last_commit"] is None)
                    or (
                        list_repo_commits(
                            self.uid_tracker[uid]["model_huggingface_id"],
                            repo_type="model",
                        )[0]
                        != self.uid_tracker[uid]["last_commit"]
                    )
                )
            ):
                uids.append(uid)
                return uids
    except Exception as e:
        bt.logging.info(f"Error getting UIDs: {e}")

    return uids


async def map_uid_to_peerid_archive(self, uids):
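    """Archived implementation: map each UID to a hivemind PeerID by matching
    the axon IP against peers visible on the DHT and this run's averager list.

    Returns a dict of uid -> PeerID (or None when no match is found).
    """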
    peer_list_dht = await self._p2p.list_peers()
    peer_list_dht_addrs = [
        str(peer.addrs[0]).split("/ip4/")[1].split("/")[0] for peer in peer_list_dht
    ]
    peer_list_dht_ports = [str(peer.addrs[0]).split("/")[-1] for peer in peer_list_dht]

    # PeerIDs registered as averagers under this run's matchmaking prefix.
    # This is loop-invariant, so compute it once up front.
    prefix = self.grad_averager.matchmaking_kwargs["prefix"]
    metadata, _ = self.dht.get(f"{prefix}.all_averagers", latest=True) or (
        {},
        None,
    )
    run_peer_id_list = [
        str(PeerID(peer_id))
        for peer_id, info in metadata.items()
        if isinstance(info, ValueWithExpiration)
        and isinstance(info.value, (float, int))
    ]

    uids_to_peerids = {}
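
    # Match each UID's axon IP against the DHT peer list; a UID only gets a
    # PeerID if that peer is also registered as an averager for this run.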
    for uid in uids:
        miner_ip = self.metagraph.axons[uid].ip
        miner_port = self.metagraph.axons[uid].port

        if not metadata:
            uids_to_peerids[uid] = None
            continue

        if miner_ip not in peer_list_dht_addrs:
            uids_to_peerids[uid] = None
            continue

        peer_id = peer_list_dht[peer_list_dht_addrs.index(miner_ip)].peer_id

        if str(peer_id) not in run_peer_id_list:
            uids_to_peerids[uid] = None
        else:
            uids_to_peerids[uid] = peer_id

    return uids_to_peerids


def update_run_peerid_list(self):
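    """Refresh self.run_peer_id_list with the PeerIDs currently registered
    under this run's averager prefix on the DHT.
    """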
    prefix = self.grad_averager.matchmaking_kwargs["prefix"]
    metadata, _ = self.dht.get(f"{prefix}.all_averagers", latest=True) or (
        {},
        None,
    )
    self.run_peer_id_list = [
        str(PeerID(peer_id))
        for peer_id, info in metadata.items()
        if isinstance(info, ValueWithExpiration)
        and isinstance(info.value, (float, int))
    ]


def map_uid_to_peerid_background_task(self):
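    """Background loop: poll each UID's on-chain commitment and keep
    self.uids_to_peerids up to date, revisiting each UID at most once every
    5 minutes, until self.stop_event is set.
    """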
    uid_last_checked = {}
    while not self.stop_event.is_set():
        try:
            next_uid = next(self.uid_iterator)

            # Only re-check a UID if it was last checked more than 5 minutes ago.
            time_diff = (
                dt.datetime.now() - uid_last_checked[next_uid]
                if next_uid in uid_last_checked
                else None
            )
            if time_diff and time_diff < dt.timedelta(minutes=5):
                time_to_sleep = (dt.timedelta(minutes=5) - time_diff).total_seconds()
                bt.logging.trace(
                    f"Update loop has already processed all UIDs in the last 5 minutes. Sleeping {time_to_sleep} seconds."
                )
                time.sleep(time_to_sleep)

            uid_last_checked[next_uid] = dt.datetime.now()
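
            # Read the UID's latest on-chain commitment and decode it into
            # (commitment_string, block).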
            metadata = bt.core.extrinsics.serving.get_metadata(
                self.subtensor, self.config.netuid, self.metagraph.hotkeys[next_uid]
            )
            if metadata is not None:
                commitment = metadata["info"]["fields"][0]
                hex_data = commitment[list(commitment.keys())[0]][2:]
                chain_str = bytes.fromhex(hex_data).decode()
                updated = (chain_str, metadata["block"])
            else:
                updated = (None, None)
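
            # A peer ID may only be claimed by one UID. If the commitment is
            # new and unclaimed, adopt it; if another UID already claims it,
            # keep whichever commitment landed in the earlier block.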
            if (self.uids_to_peerids[next_uid][0] != updated[0]) and (
                updated[0]
                not in [peerid_info[0] for peerid_info in self.uids_to_peerids.values()]
            ):
                bt.logging.info(
                    f"Updated peerID for UID={next_uid}. Previous = {self.uids_to_peerids[next_uid][0]}. Current = {updated[0]}"
                )
                self.uids_to_peerids[next_uid] = updated
            elif (self.uids_to_peerids[next_uid][0] != updated[0]) and (
                updated[0]
                in [peerid_info[0] for peerid_info in self.uids_to_peerids.values()]
            ):
                indices = [
                    index
                    for index, peerid_info in enumerate(self.uids_to_peerids.values())
                    if peerid_info[0] == updated[0]
                ]
                for index in indices:
                    if self.uids_to_peerids[index][1] > updated[1]:
                        # The clashing UID committed this peer ID in a later
                        # block, so its claim is dropped in favour of next_uid.
                        self.uids_to_peerids[index] = (None, None)
                        bt.logging.info(
                            f"The same peerID was found for UID={index} with a later commit message. Setting the peerID for that UID={index} to None. Previous = {self.uids_to_peerids[next_uid][0]}. Current = {updated[0]}"
                        )
                        # Log before overwriting so `Previous` reports the old value.
                        bt.logging.info(
                            f"Updated peerID for UID={next_uid}. Previous = {self.uids_to_peerids[next_uid][0]}. Current = {updated[0]}"
                        )
                        self.uids_to_peerids[next_uid] = updated
                        break
                    else:
                        # The clashing UID committed this peer ID in an earlier
                        # block, so next_uid's claim is rejected.
                        updated = (None, None)
                        bt.logging.info(
                            f"The same peerID was found for UID={index} with an earlier commit message. Setting the peerID for UID={next_uid} to None. Previous = {self.uids_to_peerids[next_uid][0]}. Current = {updated[0]}"
                        )
                        self.uids_to_peerids[next_uid] = updated

        except Exception as e:
            bt.logging.error(f"Error in update loop: {e} \n {traceback.format_exc()}")

    bt.logging.info("Exiting update models loop.")


def decode_metadata(encoded_ss58: tuple, metadata: dict) -> tuple[str, str]:
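    """Decode a raw commitment entry into (hotkey_ss58, commitment_string)."""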
    decoded_key = decode_account_id(encoded_ss58[0])
    commitment = metadata["info"]["fields"][0][0]
    bytes_tuple = commitment[next(iter(commitment.keys()))][0]
    return decoded_key, bytes(bytes_tuple).decode()


def map_uid_to_peerid(self):
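    """Sync self.uid_tracker with the on-chain commitments for this subnet.

    Each commitment is expected to be the string form of a dict containing
    `peer_id` and `model_huggingface_id`. A peer_id or repo may only be
    claimed by one UID; on a clash, the UID whose commitment landed in the
    later block is reset.
    """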
    try:
        subtensor = bt.subtensor(config=self.config)
        result = subtensor.substrate.query_map(
            module="Commitments",
            storage_function="CommitmentOf",
            params=[self.config.netuid],
            block_hash=None,
        )

        hotkey_to_uid = dict(zip(self.metagraph.hotkeys, self.metagraph.uids.tolist()))
    except Exception as e:
        bt.logging.info(f"Error {e} when querying UID commitments")
        return
    for key, value in result:
        try:
            hotkey, metadata = decode_metadata(key, value.value)
            if hotkey not in hotkey_to_uid:
                continue

            uid = hotkey_to_uid[hotkey]
            last_updated_block = value.value.get("block", None)
            # The commitment is the string form of a dict; ast.literal_eval
            # parses it without executing arbitrary code (unlike eval).
            concatenated = ast.literal_eval(metadata)

            if "peer_id" not in concatenated:
                bt.logging.debug(
                    f"Invalid commitment for UID {uid}: peer_id not in commitment metadata"
                )
                continue
            if "model_huggingface_id" not in concatenated:
                bt.logging.debug(
                    f"Invalid commitment for UID {uid}: model_huggingface_id not in commitment metadata"
                )
                continue

            if concatenated["peer_id"] != self.uid_tracker[uid]["peer_id"]:
                # Peer IDs claimed by every other UID.
                other_peer_ids = [
                    info["peer_id"]
                    for tracked_uid, info in self.uid_tracker.items()
                    if tracked_uid != uid
                ]
                if concatenated["peer_id"] not in other_peer_ids:
                    self.uid_tracker[uid]["peer_id"] = concatenated["peer_id"]
                    self.uid_tracker[uid]["last_updated_block"] = last_updated_block
                else:
                    # Another UID already claims this peer_id; reset whichever
                    # side has the later commitment block.
                    uid_list = [
                        tracked_uid
                        for tracked_uid, info in self.uid_tracker.items()
                        if info["peer_id"] == concatenated["peer_id"]
                    ]
                    for uid_i in uid_list:
                        if (
                            self.uid_tracker[uid_i]["last_updated_block"] is not None
                        ) and (
                            self.uid_tracker[uid_i]["last_updated_block"]
                            > last_updated_block
                        ):
                            self.uid_tracker[uid_i]["last_updated_block"] = None
                            self.uid_tracker[uid_i]["model_huggingface_id"] = None
                            self.uid_tracker[uid_i]["peer_id"] = None
                        else:
                            self.uid_tracker[uid]["last_updated_block"] = None
                            self.uid_tracker[uid]["model_huggingface_id"] = None
                            self.uid_tracker[uid]["peer_id"] = None

            if (
                concatenated["model_huggingface_id"]
                != self.uid_tracker[uid]["model_huggingface_id"]
            ):
                # Repos claimed by every other UID. Mirror the peer_id logic:
                # only adopt the repo after the duplicate check.
                other_repo_ids = [
                    info["model_huggingface_id"]
                    for tracked_uid, info in self.uid_tracker.items()
                    if tracked_uid != uid
                ]
                if concatenated["model_huggingface_id"] not in other_repo_ids:
                    self.uid_tracker[uid]["model_huggingface_id"] = concatenated[
                        "model_huggingface_id"
                    ]
                    self.uid_tracker[uid]["last_updated_block"] = last_updated_block
                else:
                    # Another UID already claims this repo; reset whichever
                    # side has the later commitment block.
                    uid_list = [
                        tracked_uid
                        for tracked_uid, info in self.uid_tracker.items()
                        if info["model_huggingface_id"]
                        == concatenated["model_huggingface_id"]
                    ]
                    for uid_i in uid_list:
                        if (
                            self.uid_tracker[uid_i]["last_updated_block"] is not None
                        ) and (
                            self.uid_tracker[uid_i]["last_updated_block"]
                            > last_updated_block
                        ):
                            self.uid_tracker[uid_i]["last_updated_block"] = None
                            self.uid_tracker[uid_i]["model_huggingface_id"] = None
                            self.uid_tracker[uid_i]["peer_id"] = None
                        else:
                            self.uid_tracker[uid]["last_updated_block"] = None
                            self.uid_tracker[uid]["model_huggingface_id"] = None
                            self.uid_tracker[uid]["peer_id"] = None

            bt.logging.debug(f"Retrieved commitment for UID {uid}: {metadata}")

        except Exception as e:
            bt.logging.debug(f"Failed to decode commitment for UID {uid}: {e}")
            continue

    bt.logging.debug("Finished extracting commitments for all uids")