| | import sqlite3 |
| | import os |
| | from huggingface_hub import HfApi |
| | import requests |
| | import huggingface_hub |
| | from huggingface_hub.hf_api import SpaceInfo |
| | from typing import List, Dict, Any, Union |
| | import json |
| |
|
| | |
# Log the installed client version at import time to ease debugging of
# API incompatibilities between huggingface_hub releases.
print("HuggingFace Hub version:", huggingface_hub.__version__)
| | |
| |
|
| | |
# Store the DB on the mounted /data volume when one exists (persistent
# storage on HF Spaces); otherwise fall back to the working directory.
DB_PATH = '/data/huggingface_spaces.db' if os.path.exists('/data') else 'huggingface_spaces.db'
# Paths to the SQL files containing the DDL (create_*) and upsert
# (update_*) statements for each table.
SQL_CREATE_SPACES = 'sql/create_spaces.sql'
SQL_UPDATE_SPACES = 'sql/update_spaces.sql'
SQL_CREATE_ENDPOINTS = 'sql/create_endpoints.sql'
SQL_UPDATE_ENDPOINTS = 'sql/update_endpoints.sql'
SQL_CREATE_TOOLS = 'sql/create_tools.sql'
SQL_UPDATE_TOOLS = 'sql/update_tools.sql'
| |
|
| | from sql.sql_utils import load_sql_query, is_database_outdated, update_db_timestamp, create_metadata_table |
| |
|
| |
|
def create_database():
    """Create the metadata, spaces, endpoints and tools tables if absent.

    Idempotent: the DDL scripts are expected to use IF NOT EXISTS, so
    this is safe to call on every startup.
    """
    create_metadata_table(DB_PATH)

    # Load all DDL scripts up front, then apply them in one connection.
    ddl_scripts = (
        load_sql_query(SQL_CREATE_SPACES),
        load_sql_query(SQL_CREATE_ENDPOINTS),
        load_sql_query(SQL_CREATE_TOOLS),
    )

    with sqlite3.connect(DB_PATH) as conn:
        for script in ddl_scripts:
            conn.executescript(script)
        conn.commit()
| |
|
def generate_endpoint_urls(space_id: str) -> Dict[str, str]:
    """Build the candidate MCP endpoint URLs for a space.

    HF serves each space at https://<subdomain>.hf.space, where the
    subdomain is the space id lowercased with '/' and '_' replaced by '-'.
    Returns a dict with the 'sse' and 'schema' endpoint URLs.
    """
    host = space_id.replace('/', '-').replace('_', '-').lower()
    base = f"https://{host}.hf.space/gradio_api/mcp"
    return {"sse": f"{base}/sse", "schema": f"{base}/schema"}
| |
|
def check_endpoint_availability(url: str) -> bool:
    """Return True if *url* answers a HEAD request with HTTP 200.

    Redirects are followed so that e.g. trailing-slash redirects still
    count as available. Any network failure yields False.
    """
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        # Timeout is a subclass of RequestException, so one clause covers
        # timeouts, connection errors and invalid URLs alike (the original
        # second tuple entry was redundant).
        return False
| |
|
def normalize_tool_format(tool_data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> List[tuple[str, str, Dict]]:
    """
    Normalize different tool formats into a consistent format.
    Returns list of tuples: (tool_name, description, properties)
    """
    if isinstance(tool_data, list):
        # MCP-style list of tool objects; entries without a (truthy)
        # name are skipped.
        return [
            (tool['name'], tool.get('description', ''), tool.get('inputSchema', {}))
            for tool in tool_data
            if tool.get('name')
        ]

    # Mapping form: {tool_name: tool_definition}; the whole definition
    # is kept as the properties payload.
    return [
        (name, definition.get('description', ''), definition)
        for name, definition in tool_data.items()
    ]
| |
|
def fetch_and_parse_schema(url: str) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
    """Download the tool schema at *url* and decode it as JSON.

    Returns an empty list when the request fails, the server responds
    with an error status, or the body is not valid JSON.
    """
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        parsed = resp.json()
    except (requests.exceptions.RequestException, ValueError):
        return []
    return parsed
| |
|
def save_endpoints_and_tools(conn: sqlite3.Connection, space_id: str):
    """Discover and store the MCP endpoints and tools for a space.

    Probes the candidate sse/schema endpoint URLs, upserts the reachable
    ones, then fetches the schema (when reachable) and upserts each
    advertised tool. Does not commit; the caller owns the transaction.
    """
    cursor = conn.cursor()
    endpoint_urls = generate_endpoint_urls(space_id)

    # The SQL text is loop-invariant: load each query once instead of
    # re-reading the file on every iteration (the tools query was
    # previously reloaded once per tool).
    query_update_endpoints = load_sql_query(SQL_UPDATE_ENDPOINTS)
    query_update_tools = load_sql_query(SQL_UPDATE_TOOLS)

    # Probe each endpoint exactly once and remember the result, so the
    # schema endpoint is not HEAD-requested a second time below.
    available = {}
    for endpoint_type, url in endpoint_urls.items():
        available[endpoint_type] = check_endpoint_availability(url)
        if available[endpoint_type]:
            cursor.execute(query_update_endpoints, (space_id, endpoint_type, url))

    if not available.get('schema'):
        return

    tools_data = fetch_and_parse_schema(endpoint_urls['schema'])
    if not tools_data:
        return

    for tool_name, description, properties in normalize_tool_format(tools_data):
        try:
            cursor.execute(query_update_tools, (
                space_id,
                tool_name,
                description,
                json.dumps(properties),
            ))
        except Exception as e:
            # Best-effort: one malformed tool must not abort the space.
            print(f"Error saving tool {tool_name} for space {space_id}: {e}")
            continue
| | continue |
| |
|
def fetch_spaces() -> List[Dict[str, Any]]:
    """
    Fetch Gradio MCP-server spaces from the Hugging Face Hub.

    Returns a list of dicts shaped for the spaces upsert query. Spaces
    whose detail fetch fails are skipped with a logged message; a failure
    of the listing itself returns an empty list.
    """
    api = HfApi()
    spaces = []

    try:
        for space in api.list_spaces(
            filter="gradio",
            search="mcp-server",
            limit=100,
            full=True,
        ):
            try:
                space_info: SpaceInfo = api.space_info(repo_id=space.id)

                # cardData is None for spaces without a model card; the
                # original code would raise here and drop the space.
                card = space_info.cardData or {}
                tags = card.get("tags", [])
                description = card.get("description", "")

                author = space.id.split("/")[0]
                title = space.id.split("/")[-1]

                spaces.append({
                    'id': space.id,
                    'title': title,
                    'author': author,
                    'description': description,
                    'likes': space_info.likes,
                    'url': f"https://huggingface.co/spaces/{space.id}",
                    'tags': ' '.join(tags) if tags else None,
                    'last_modified': space_info.lastModified,
                    'private': space_info.private,
                })
            except Exception as e:
                # Skip individual spaces that fail (rate limits, gated
                # repos, transient errors) without aborting the crawl.
                print(f"Error fetching space info for {space.id}: {e}")
                continue

        return spaces

    except Exception as e:
        print(f"Error fetching spaces: {e}")
        return []
| |
|
def save_to_database(spaces):
    """Upsert each space row, then discover its endpoints and tools.

    Commits once at the end; per-space failures are logged and skipped
    so a single bad record does not abort the whole batch.
    """
    query_update_spaces = load_sql_query(SQL_UPDATE_SPACES)

    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    try:
        for space in spaces:
            row = (
                space['id'],
                space['title'],
                space['author'],
                space['description'],
                space['likes'],
                space['url'],
                space['tags'],
                space['last_modified'],
                space['private'],
            )
            try:
                cursor.execute(query_update_spaces, row)
                # Endpoint/tool discovery shares this connection and
                # transaction with the space upsert.
                save_endpoints_and_tools(conn, space['id'])
            except sqlite3.IntegrityError as e:
                print(f"Error saving space {space['title']}: {e}")
            except Exception as e:
                print(f"Unexpected error processing space {space['title']}: {e}")

        conn.commit()
        print(f"Database saved at: {DB_PATH}")
        print(f"Processed {len(spaces)} spaces")
    finally:
        conn.close()
| |
|
def update_database():
    """Refresh the database when stale.

    Ensures the schema exists, then fetches and stores fresh data only
    if the stored timestamp says the database is outdated.
    Returns True when a refresh ran, False when the data was current.
    """
    create_database()

    if not is_database_outdated(DB_PATH):
        print("Database is up to date")
        return False

    print("Starting fetching process...")
    save_to_database(fetch_spaces())

    # Record the refresh time so the next run can skip fetching.
    update_db_timestamp(DB_PATH)

    print("Process complete! Data saved to database")
    return True
| |
|
# Script entry point: refresh the spaces database when run directly.
if __name__ == "__main__":
    update_database()
| |
|