| |
| """Monty-backed API orchestration executor (v2). |
| |
| v2 goals: |
| - HfApi-first helper implementations for endpoints covered by huggingface_hub. |
| - Thin raw API fallback for uncovered endpoints (/api/recent-activity, /api/trending, |
| /api/users/<u>/likes event stream, collections q-search). |
| - Stable machine-first helper envelopes (`items` + optional `item`, no polymorphic payloads). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import asyncio |
| import ast |
| import inspect |
| import json |
| import os |
| import re |
| import time |
| from itertools import islice |
| from typing import Any, Callable, cast, get_args |
| from urllib.error import HTTPError, URLError |
| from urllib.parse import urlencode |
| from urllib.request import Request, urlopen |
|
|
| from huggingface_hub import HfApi |
| from huggingface_hub.hf_api import DatasetSort_T, ModelSort_T, SpaceSort_T |
|
|
| |
| |
| |
| |
# --- Execution guardrails ---------------------------------------------------

# Wall-clock timeout (seconds) used for raw API requests (see call_api_host).
DEFAULT_TIMEOUT_SEC = 90
# Default and absolute ceiling for API calls in one generated-program run.
DEFAULT_MAX_CALLS = 400
MAX_CALLS_LIMIT = 400
# NOTE(review): presumably toggles the strict endpoint allowlist
# (STRICT_ALLOWLIST_PATTERNS) — confirm at the call sites outside this chunk.
INTERNAL_STRICT_MODE = False

# --- Output-size caps -------------------------------------------------------

# `items` lists larger than this are trimmed before returning
# (see _truncate_result_payload).
OUTPUT_ITEMS_TRUNCATION_LIMIT = 500
# Hard ceiling on rows an exhaustive hf_* helper may return.
EXHAUSTIVE_HELPER_RETURN_HARD_CAP = 2_000
# Hard ceiling applied to `limit` on most raw endpoints (see _sanitize_params).
SELECTIVE_ENDPOINT_RETURN_HARD_CAP = 200
# Upper bound enforced on the /api/trending `limit` param (see _sanitize_params).
TRENDING_ENDPOINT_MAX_LIMIT = 20

# --- Scan budgets for graph / likes / activity walks ------------------------

# scan_max for hf_user_graph / hf_org_members (see PAGINATION_POLICY).
GRAPH_SCAN_LIMIT_CAP = 10_000
# scan_max for hf_user_likes (see PAGINATION_POLICY).
LIKES_SCAN_LIMIT_CAP = 10_000
# ranking_default for hf_user_likes (see PAGINATION_POLICY).
LIKES_RANKING_WINDOW_DEFAULT = 40
# enrich_max for hf_user_likes (see PAGINATION_POLICY).
LIKES_ENRICHMENT_MAX_REPOS = 50
# Page size and page budget for /api/recent-activity scans (see PAGINATION_POLICY).
RECENT_ACTIVITY_PAGE_SIZE = 100
RECENT_ACTIVITY_SCAN_MAX_PAGES = 10

# --- Cheaper budgets for the profile-summary helper -------------------------
# NOTE(review): consumers (hf_profile_summary implementation) are outside this
# chunk — confirm exact usage there.

USER_SUMMARY_GRAPH_SCAN_LIMIT = 1_000
USER_SUMMARY_LIKES_SCAN_LIMIT = 1_000
USER_SUMMARY_ACTIVITY_MAX_PAGES = 3

# --- Monty sandbox resource limits ------------------------------------------

DEFAULT_MONTY_MAX_MEMORY = 64 * 1024 * 1024  # 64 MiB
DEFAULT_MONTY_MAX_ALLOCATIONS = 250_000
DEFAULT_MONTY_MAX_RECURSION_DEPTH = 100
|
|
# Valid `sort` keys per repo type. huggingface_hub's Literal types are the
# source of truth; the hand-written fallback sets apply only when
# get_args(...) returns an empty tuple on the installed hub version.
_MODEL_SORT_KEYS = set(get_args(ModelSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
_DATASET_SORT_KEYS = set(get_args(DatasetSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
# The space fallback omits "downloads".
_SPACE_SORT_KEYS = set(get_args(SpaceSort_T)) or {
    "created_at",
    "last_modified",
    "likes",
    "trending_score",
}

# Canonical repo type -> allowed sort keys (used by _normalize_repo_sort_key).
_REPO_SORT_KEYS: dict[str, set[str]] = {
    "model": _MODEL_SORT_KEYS,
    "dataset": _DATASET_SORT_KEYS,
    "space": _SPACE_SORT_KEYS,
}

# Accepted sort-key spellings -> canonical snake_case key. Lookups happen on
# a lowercased, space-stripped form first (see _normalize_repo_sort_key).
_SORT_KEY_ALIASES: dict[str, str] = {
    "createdat": "created_at",
    "created_at": "created_at",
    "created-at": "created_at",
    "downloads": "downloads",
    "likes": "likes",
    "lastmodified": "last_modified",
    "last_modified": "last_modified",
    "last-modified": "last_modified",
    "trendingscore": "trending_score",
    "trending_score": "trending_score",
    "trending-score": "trending_score",
    "trending": "trending_score",
}

# Accepted user-field spellings -> canonical field names.
# NOTE(review): consumers are outside this chunk — confirm usage there.
_USER_FIELD_ALIASES: dict[str, str] = {
    "login": "username",
    "user": "username",
    "handle": "username",
    "name": "fullname",
    "full_name": "fullname",
    "full-name": "fullname",
    "is_pro": "isPro",
    "ispro": "isPro",
    "pro": "isPro",
}

# Actor rows reuse the user aliases plus entity-type spellings.
_ACTOR_FIELD_ALIASES: dict[str, str] = {
    **_USER_FIELD_ALIASES,
    "entity_type": "type",
    "user_type": "type",
    "actor_type": "type",
}

# Compact (separator-free) repo-field spellings -> canonical snake_case names.
# NOTE(review): consumers are outside this chunk — confirm usage there.
_REPO_FIELD_ALIASES: dict[str, str] = {
    "repoid": "repo_id",
    "repotype": "repo_type",
    "repourl": "repo_url",
    "createdat": "created_at",
    "lastmodified": "last_modified",
    "pipelinetag": "pipeline_tag",
    "trendingscore": "trending_score",
    "libraryname": "library_name",
    "paperswithcodeid": "paperswithcode_id",
}

# Collection-field spellings -> canonical names; "author" maps onto "owner".
_COLLECTION_FIELD_ALIASES: dict[str, str] = {
    "collectionid": "collection_id",
    "lastupdated": "last_updated",
    "ownertype": "owner_type",
    "itemcount": "item_count",
    "author": "owner",
}
|
|
# Canonical repo-row fields; these align with the keys emitted by
# _build_repo_row (the row also carries id/slug/private/sha/gated extras).
REPO_CANONICAL_FIELDS: tuple[str, ...] = (
    "repo_id",
    "repo_type",
    "title",
    "author",
    "likes",
    "downloads",
    "created_at",
    "last_modified",
    "pipeline_tag",
    "repo_url",
    "tags",
    "library_name",
    "description",
    "paperswithcode_id",
    "sdk",
    "models",
    "datasets",
    "subdomain",
)

# Canonical user-row fields.
# NOTE(review): row producers are outside this chunk — confirm exact shape there.
USER_CANONICAL_FIELDS: tuple[str, ...] = (
    "username",
    "fullname",
    "bio",
    "websiteUrl",
    "twitter",
    "github",
    "linkedin",
    "bluesky",
    "followers",
    "following",
    "likes",
    "isPro",
)

# Canonical profile-summary fields (covers both users and organizations).
# NOTE(review): producer (hf_profile_summary) is outside this chunk.
PROFILE_CANONICAL_FIELDS: tuple[str, ...] = (
    "handle",
    "entity_type",
    "display_name",
    "bio",
    "description",
    "avatar_url",
    "website_url",
    "twitter_url",
    "github_url",
    "linkedin_url",
    "bluesky_url",
    "followers_count",
    "following_count",
    "likes_count",
    "members_count",
    "models_count",
    "datasets_count",
    "spaces_count",
    "discussions_count",
    "papers_count",
    "upvotes_count",
    "organizations",
    "is_pro",
    "likes_sample",
    "activity_sample",
)

# Canonical actor-row fields (user aliases plus role/type; see
# _ACTOR_FIELD_ALIASES for the accepted input spellings).
ACTOR_CANONICAL_FIELDS: tuple[str, ...] = (
    "username",
    "fullname",
    "isPro",
    "role",
    "type",
)

# Canonical activity-event fields.
ACTIVITY_CANONICAL_FIELDS: tuple[str, ...] = (
    "event_type",
    "repo_id",
    "repo_type",
    "timestamp",
)

# Canonical collection-row fields (see _COLLECTION_FIELD_ALIASES).
COLLECTION_CANONICAL_FIELDS: tuple[str, ...] = (
    "collection_id",
    "slug",
    "title",
    "owner",
    "owner_type",
    "description",
    "last_updated",
    "item_count",
)
|
|
| |
| |
| |
| |
# Extra keyword arguments accepted per repo type beyond the common search args.
# NOTE(review): the consumer (hf_repo_search implementation) is outside this
# chunk — confirm how these are forwarded to HfApi list_* calls.
_REPO_SEARCH_EXTRA_ARGS: dict[str, set[str]] = {
    "model": {
        "filter",
        "apps",
        "gated",
        "inference",
        "inference_provider",
        "model_name",
        "trained_dataset",
        "pipeline_tag",
        "emissions_thresholds",
        "expand",
        "full",
        "cardData",
        "card_data",
        "fetch_config",
    },
    "dataset": {
        "filter",
        "benchmark",
        "dataset_name",
        "gated",
        "language_creators",
        "language",
        "multilinguality",
        "size_categories",
        "task_categories",
        "task_ids",
        "expand",
        "full",
    },
    "space": {
        "filter",
        "datasets",
        "models",
        "linked",
        "expand",
        "full",
    },
}

# Default `expand` field lists requested per repo type. The camelCase names
# (createdAt, lastModified, trendingScore) match the Hub API's property names;
# _normalize_repo_search_row maps the hydrated attributes onto snake_case rows.
_REPO_SEARCH_DEFAULT_EXPAND: dict[str, list[str]] = {
    "model": [
        "author",
        "createdAt",
        "downloads",
        "gated",
        "lastModified",
        "library_name",
        "likes",
        "pipeline_tag",
        "private",
        "sha",
        "tags",
        "trendingScore",
    ],
    "dataset": [
        "author",
        "createdAt",
        "description",
        "downloads",
        "gated",
        "lastModified",
        "likes",
        "paperswithcode_id",
        "private",
        "sha",
        "tags",
        "trendingScore",
    ],
    "space": [
        "author",
        "createdAt",
        "datasets",
        "lastModified",
        "likes",
        "models",
        "private",
        "sdk",
        "sha",
        "subdomain",
        "tags",
        "trendingScore",
    ],
}
|
|
| |
| |
| |
| |
| |
# Per-helper pagination/scan budgets. Keys: scan_max (upstream items walked),
# default_return / max_return (rows returned), page_limit / max_pages (paged
# endpoints), ranking_default / enrich_max (likes-specific).
# NOTE(review): helper implementations are outside this chunk — confirm exact
# key semantics there.
PAGINATION_POLICY: dict[str, dict[str, Any]] = {
    "hf_user_graph": {
        "scan_max": GRAPH_SCAN_LIMIT_CAP,
        "default_return": 1_000,
        "max_return": GRAPH_SCAN_LIMIT_CAP,
    },
    "hf_org_members": {"scan_max": GRAPH_SCAN_LIMIT_CAP, "default_return": 1_000},
    "hf_repo_likers": {"default_return": 1_000},
    "hf_user_likes": {
        "scan_max": LIKES_SCAN_LIMIT_CAP,
        "default_return": 100,
        "ranking_default": LIKES_RANKING_WINDOW_DEFAULT,
        "enrich_max": LIKES_ENRICHMENT_MAX_REPOS,
    },
    "hf_recent_activity": {
        "page_limit": RECENT_ACTIVITY_PAGE_SIZE,
        "max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
        "default_return": 100,
    },
    "hf_repo_search": {"max_return": 5_000, "default_return": 20},
    "hf_trending": {"max_return": TRENDING_ENDPOINT_MAX_LIMIT, "default_return": 20},
    "hf_collections_search": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 20},
    "hf_collection_items": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 100},
}

# Names the generated program's namespace must provide as callables;
# enforced by _resolve_helper_functions.
HELPER_EXTERNALS = (
    "hf_runtime_capabilities",
    "hf_whoami",
    "hf_profile_summary",
    "hf_org_members",
    "hf_repo_search",
    "hf_user_graph",
    "hf_repo_likers",
    "hf_user_likes",
    "hf_recent_activity",
    "hf_repo_discussions",
    "hf_repo_discussion_details",
    "hf_repo_details",
    "hf_trending",
    "hf_collections_search",
    "hf_collection_items",
)

# Endpoint-path regex -> helper that must be used instead of raw call_api for
# that endpoint family (consulted via _preferred_helper_for_endpoint during
# generated-code validation).
HELPER_COVERED_ENDPOINT_PATTERNS: list[tuple[str, str]] = [
    (r"^/api/whoami-v2$", "hf_whoami"),
    (r"^/api/trending$", "hf_trending"),
    (r"^/api/recent-activity$", "hf_recent_activity"),
    (r"^/api/models$", "hf_repo_search"),
    (r"^/api/datasets$", "hf_repo_search"),
    (r"^/api/spaces$", "hf_repo_search"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$", "hf_repo_details"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$", "hf_repo_discussions"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$", "hf_repo_discussion_details"),
    (r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$", "hf_repo_likers"),
    (r"^/api/users/[^/]+/overview$", "hf_profile_summary"),
    (r"^/api/organizations/[^/]+/overview$", "hf_profile_summary"),
    (r"^/api/users/[^/]+/likes$", "hf_user_likes"),
    (r"^/api/users/[^/]+/(followers|following)$", "hf_user_graph"),
    (r"^/api/organizations/[^/]+/members$", "hf_org_members"),
    (r"^/api/organizations/[^/]+/followers$", "hf_user_graph"),
    (r"^/api/collections$", "hf_collections_search"),
    (r"^/api/collections/[^/]+$", "hf_collection_items"),
    (r"^/api/collections/[^/]+/[^/]+$", "hf_collection_items"),
]
|
|
|
|
def _resolve_helper_functions(namespace: dict[str, Any]) -> dict[str, Callable[..., Any]]:
    """Collect the hf_* helper callables from *namespace*.

    Fails fast with RuntimeError on the first helper that is missing or not
    callable, so executor setup errors surface before any generated code runs.
    """
    for name in HELPER_EXTERNALS:
        if not callable(namespace.get(name)):
            raise RuntimeError(f"Helper '{name}' is not defined or not callable")
    return {name: cast(Callable[..., Any], namespace[name]) for name in HELPER_EXTERNALS}
|
|
# Raw-path regexes a generated program may hit through call_api_host
# (checked by _endpoint_allowed when strict_mode is off).
ALLOWLIST_PATTERNS = [
    r"^/api/whoami-v2$",
    r"^/api/trending$",
    r"^/api/daily_papers$",
    r"^/api/models$",
    r"^/api/datasets$",
    r"^/api/spaces$",
    r"^/api/models-tags-by-type$",
    r"^/api/datasets-tags-by-type$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$",
    r"^/api/users/[^/]+/overview$",
    r"^/api/users/[^/]+/socials$",
    r"^/api/users/[^/]+/followers$",
    r"^/api/users/[^/]+/following$",
    r"^/api/users/[^/]+/likes$",
    r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$",
    r"^/api/organizations/[^/]+/overview$",
    r"^/api/organizations/[^/]+/members$",
    r"^/api/organizations/[^/]+/followers$",
    r"^/api/collections$",
    r"^/api/collections/[^/]+$",
    r"^/api/collections/[^/]+/[^/]+$",
    r"^/api/recent-activity$",
]

# Reduced allowlist used when strict_mode is on (see _endpoint_allowed);
# mostly endpoints without a dedicated hf_* helper equivalent.
STRICT_ALLOWLIST_PATTERNS = [
    r"^/api/users/[^/]+/overview$",
    r"^/api/users/[^/]+/socials$",
    r"^/api/whoami-v2$",
    r"^/api/trending$",
    r"^/api/daily_papers$",
    r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$",
    r"^/api/collections$",
    r"^/api/collections/[^/]+$",
    r"^/api/collections/[^/]+/[^/]+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$",
]
|
|
|
|
class MontyExecutionError(RuntimeError):
    """Raised when executing a generated Monty program fails.

    Carries the number of API calls consumed and the diagnostic trace entries
    collected up to the failure, so callers can report partial progress.
    """

    def __init__(self, message: str, api_calls: int, trace: list[dict[str, Any]]):
        super().__init__(message)
        # API calls spent before the failure occurred.
        self.api_calls = api_calls
        # Trace entries gathered so far (shape defined by the executor).
        self.trace = trace
|
|
|
|
| def _load_request_token() -> str | None: |
| try: |
| from fast_agent.mcp.auth.context import request_bearer_token |
|
|
| token = request_bearer_token.get() |
| if token: |
| return token |
| except Exception: |
| pass |
| return None |
|
|
|
|
def _load_token() -> str | None:
    """Resolve the HF auth token: request-scoped token first, then $HF_TOKEN."""
    return _load_request_token() or os.getenv("HF_TOKEN") or None
|
|
|
|
| def _normalize_endpoint(endpoint: str) -> str: |
| ep = (endpoint or "").strip() |
| if not ep: |
| raise ValueError("endpoint is required") |
| if "?" in ep: |
| raise ValueError("endpoint must not include query string; use params") |
| if ep.startswith("http://") or ep.startswith("https://"): |
| raise ValueError("endpoint must be path-only") |
| if not ep.startswith("/"): |
| ep = "/" + ep |
| if not ep.startswith("/api/"): |
| ep = "/api" + ep |
| if ep in {"/api/collections/search", "/api/collections/search/"}: |
| ep = "/api/collections" |
| if ".." in ep: |
| raise ValueError("path traversal not allowed") |
| return ep |
|
|
|
|
def _endpoint_allowed(endpoint: str, strict_mode: bool) -> bool:
    """True when *endpoint* (query string ignored) matches the active allowlist."""
    path, _, _ = endpoint.partition("?")
    allowlist = STRICT_ALLOWLIST_PATTERNS if strict_mode else ALLOWLIST_PATTERNS
    for pattern in allowlist:
        if re.match(pattern, path):
            return True
    return False
|
|
|
|
| def _json_best_effort(raw: bytes) -> Any: |
| try: |
| return json.loads(raw) |
| except Exception: |
| return raw.decode("utf-8", errors="replace") |
|
|
|
|
def _sanitize_params(endpoint: str, params: dict[str, Any] | None) -> dict[str, Any]:
    """Return a copy of *params* with endpoint-specific fixups applied.

    - /api/collections: accept `search` as an alias for `q` (then drop it).
    - /api/trending: canonicalize plural `type` values and clamp `limit` to
      the trending maximum.
    - everywhere else: clamp a numeric `limit` to the per-endpoint hard cap.
    """
    out = dict(params or {})
    path = endpoint.split("?", 1)[0]

    if path == "/api/collections":
        if "search" in out and "q" not in out:
            out["q"] = out.get("search")
        out.pop("search", None)

    if path == "/api/trending":
        plural_to_singular = {"models": "model", "datasets": "dataset", "spaces": "space"}
        repo_kind = str(out.get("type") or "").strip().lower()
        if repo_kind in plural_to_singular:
            out["type"] = plural_to_singular[repo_kind]
        requested = out.get("limit")
        if requested is not None:
            try:
                limit = int(requested)
            except Exception:
                # Unparseable limits degrade to the endpoint maximum.
                limit = TRENDING_ENDPOINT_MAX_LIMIT
            out["limit"] = max(1, min(limit, TRENDING_ENDPOINT_MAX_LIMIT))
        return out

    requested = out.get("limit")
    if requested is None:
        return out
    try:
        limit = int(requested)
    except Exception:
        # Leave non-numeric limits untouched for the API to reject.
        return out

    cap = SELECTIVE_ENDPOINT_RETURN_HARD_CAP
    if re.match(r"^/api/users/[^/]+/(followers|following)$", path):
        cap = GRAPH_SCAN_LIMIT_CAP
    elif re.match(r"^/api/users/[^/]+/likes$", path):
        cap = LIKES_SCAN_LIMIT_CAP

    out["limit"] = max(1, min(limit, cap))
    return out
|
|
|
|
def _truncate_result_payload(output: Any) -> Any:
    """Trim oversized `items` lists in a result dict and note the cut in `steps`.

    Non-dict outputs and within-limit item lists pass through unchanged.
    """
    if not isinstance(output, dict):
        return output

    items = output.get("items")
    if not isinstance(items, list) or len(items) <= OUTPUT_ITEMS_TRUNCATION_LIMIT:
        return output

    kept = items[:OUTPUT_ITEMS_TRUNCATION_LIMIT]
    result = {**output, "items": kept, "item": kept[0] if len(kept) == 1 else None}
    note = f"truncated items to first {OUTPUT_ITEMS_TRUNCATION_LIMIT} rows for token efficiency"
    prior_steps = result.get("steps")
    result["steps"] = [*prior_steps, note] if isinstance(prior_steps, list) else [note]
    return result
|
|
|
|
| def _is_helper_envelope(output: Any) -> bool: |
| return ( |
| isinstance(output, dict) |
| and isinstance(output.get("ok"), bool) |
| and "items" in output |
| and "meta" in output |
| and "error" in output |
| ) |
|
|
|
|
def _summarize_limit_hit(helper_name: str, result: Any) -> dict[str, Any] | None:
    """Build a compact summary when a helper envelope reports a limit hit.

    Returns None when *result* is not a helper envelope, its `meta` is not a
    dict, or no truncation/scan/page limit flag is set.

    Fix: the original computed `meta = ... if isinstance(..., dict) else {}`
    and then re-checked `isinstance(meta, dict)` — an unreachable branch.
    A non-dict meta now short-circuits to None directly (same observable
    behavior: an empty meta never carries limit flags).
    """
    if not _is_helper_envelope(result):
        return None
    meta = result.get("meta")
    if not isinstance(meta, dict):
        # No dict meta -> no limit flags -> nothing to summarize.
        return None

    truncated_by = str(meta.get("truncated_by") or "")
    limit_hit = (
        meta.get("truncated") is True
        or meta.get("hard_cap_applied") is True
        or truncated_by in {"scan_limit", "page_limit", "multiple"}
    )
    if not limit_hit:
        return None

    summary: dict[str, Any] = {
        "helper": helper_name,
        "source": meta.get("source"),
        "returned": meta.get("returned"),
        "total": meta.get("total"),
        "truncated": meta.get("truncated"),
        "truncated_by": meta.get("truncated_by"),
        "more_available": meta.get("more_available"),
        "requested_return_limit": meta.get("requested_return_limit"),
        "applied_return_limit": meta.get("applied_return_limit"),
        "next_request_hint": meta.get("next_request_hint"),
    }
    # Optional diagnostics only included when present.
    if meta.get("scan_limit") is not None:
        summary["scan_limit"] = meta.get("scan_limit")
    if meta.get("applied_max_pages") is not None:
        summary["applied_max_pages"] = meta.get("applied_max_pages")
    return summary
|
|
|
|
| def _wrap_raw_result( |
| result: Any, |
| *, |
| ok: bool, |
| api_calls: int, |
| elapsed_ms: int, |
| limit_summaries: list[dict[str, Any]] | None = None, |
| error: str | None = None, |
| ) -> dict[str, Any]: |
| hits = [dict(summary) for summary in (limit_summaries or [])[:10]] |
| meta: dict[str, Any] = { |
| "ok": ok, |
| "api_calls": api_calls, |
| "elapsed_ms": elapsed_ms, |
| "limits_reached": bool(hits), |
| "limit_summary": hits, |
| } |
| if error is not None: |
| meta["error"] = error |
| return { |
| "result": result, |
| "meta": meta, |
| } |
|
|
|
|
| def _clamp_int(value: Any, *, default: int, minimum: int, maximum: int) -> int: |
| try: |
| out = int(value) |
| except Exception: |
| out = default |
| return max(minimum, min(out, maximum)) |
|
|
|
|
| def _as_int(value: Any) -> int | None: |
| try: |
| return int(value) |
| except Exception: |
| return None |
|
|
|
|
| def _canonical_repo_type(value: Any, *, default: str = "model") -> str: |
| raw = str(value or "").strip().lower() |
| aliases = { |
| "model": "model", |
| "models": "model", |
| "dataset": "dataset", |
| "datasets": "dataset", |
| "space": "space", |
| "spaces": "space", |
| } |
| return aliases.get(raw, default) |
|
|
|
|
def _normalize_repo_sort_key(repo_type: str, sort_value: Any) -> tuple[str | None, str | None]:
    """Resolve *sort_value* to a canonical sort key valid for *repo_type*.

    Returns (key, None) on success, (None, None) for empty input, and
    (None, error_message) for unknown keys or keys not allowed for the type.
    """
    raw = str(sort_value or "").strip()
    if not raw:
        return None, None

    compact = raw.lower().replace(" ", "").replace("__", "_")
    key = _SORT_KEY_ALIASES.get(compact) or _SORT_KEY_ALIASES.get(raw.lower())
    if key is None:
        return None, f"Invalid sort key '{raw}'"

    rt = _canonical_repo_type(repo_type)
    allowed = _REPO_SORT_KEYS.get(rt, set())
    if key in allowed:
        return key, None
    return None, f"Invalid sort key '{raw}' for repo_type='{rt}'. Allowed: {', '.join(sorted(allowed))}"
|
|
|
|
def _repo_detail_endpoint(repo_type: str, repo_id: str) -> str:
    """Build the '/api/<type>s/<owner>/<name>' detail path for a repo.

    Raises ValueError unless *repo_id* is a non-empty 'owner/name' pair.
    """
    rt = _canonical_repo_type(repo_type)
    owner, sep, name = str(repo_id or "").strip().partition("/")
    if not sep or not owner or not name:
        raise ValueError("repo_id must be owner/name")
    return f"/api/{rt}s/{owner}/{name}"
|
|
|
|
| def _coerce_str_list(value: Any) -> list[str]: |
| if value is None: |
| return [] |
| if isinstance(value, str): |
| raw = [value] |
| elif isinstance(value, (list, tuple, set)): |
| raw = list(value) |
| else: |
| raise ValueError("Expected a string or list of strings") |
| return [str(v).strip() for v in raw if str(v).strip()] |
|
|
|
|
| def _optional_str_list(value: Any) -> list[str] | None: |
| if value is None: |
| return None |
| if isinstance(value, str): |
| out = [value.strip()] if value.strip() else [] |
| return out or None |
| if isinstance(value, (list, tuple, set)): |
| out = [str(v).strip() for v in value if str(v).strip()] |
| return out or None |
| return None |
|
|
|
|
| def _dt_to_str(value: Any) -> str | None: |
| if value is None: |
| return None |
| iso = getattr(value, "isoformat", None) |
| if callable(iso): |
| try: |
| return str(iso()) |
| except Exception: |
| pass |
| return str(value) |
|
|
|
|
def _repo_web_url(repo_type: str, repo_id: str | None) -> str | None:
    """Public web URL for a repo, honoring a custom HF_ENDPOINT base.

    Datasets and spaces get their type prefix; models (and unknown types)
    live at the site root. Missing/non-string ids yield None.
    """
    if not isinstance(repo_id, str) or not repo_id:
        return None
    base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
    kind = _canonical_repo_type(repo_type, default="")
    prefix = {"dataset": "/datasets/", "space": "/spaces/"}.get(kind, "/")
    return f"{base}{prefix}{repo_id}"
|
|
|
|
def _build_repo_row(
    *,
    repo_id: Any,
    repo_type: str,
    author: Any = None,
    title: Any = None,
    likes: Any = None,
    downloads: Any = None,
    created_at: Any = None,
    last_modified: Any = None,
    pipeline_tag: Any = None,
    private: Any = None,
    trending_score: Any = None,
    tags: Any = None,
    sha: Any = None,
    gated: Any = None,
    library_name: Any = None,
    description: Any = None,
    paperswithcode_id: Any = None,
    sdk: Any = None,
    models: Any = None,
    datasets: Any = None,
    subdomain: Any = None,
) -> dict[str, Any]:
    """Assemble the canonical repo row shared by every repo-producing helper.

    All inputs are optional and loosely typed; ints, timestamps, and string
    lists are coerced best-effort (failures become None). The returned keys
    are a superset of REPO_CANONICAL_FIELDS plus id/slug/private/sha/gated.
    """
    rt = _canonical_repo_type(repo_type)
    # Derive the author from the 'owner/name' id when no explicit author given.
    author_value = author
    if not isinstance(author_value, str) and isinstance(repo_id, str) and "/" in repo_id:
        author_value = repo_id.split("/", 1)[0]

    # Spaces fall back to the repo id as a title; other types leave it None.
    title_value = title
    if (not isinstance(title_value, str) or not title_value.strip()) and isinstance(repo_id, str) and repo_id:
        title_value = repo_id if rt == "space" else None

    return {
        "id": repo_id,
        "slug": repo_id,
        "repo_id": repo_id,
        "title": title_value,
        "repo_type": rt,
        "author": author_value,
        "likes": _as_int(likes),
        "downloads": _as_int(downloads),
        "created_at": _dt_to_str(created_at),
        "last_modified": _dt_to_str(last_modified),
        "pipeline_tag": pipeline_tag,
        "private": private,
        "trending_score": _as_int(trending_score) if trending_score is not None else None,
        "repo_url": _repo_web_url(rt, repo_id if isinstance(repo_id, str) else None),
        "tags": _optional_str_list(tags),
        "sha": sha,
        "gated": gated,
        "library_name": library_name,
        "description": description,
        "paperswithcode_id": paperswithcode_id,
        "sdk": sdk,
        # models/datasets/subdomain are space-specific extras; None elsewhere.
        "models": _optional_str_list(models),
        "datasets": _optional_str_list(datasets),
        "subdomain": subdomain,
    }
|
|
|
|
def _normalize_repo_search_row(row: Any, repo_type: str) -> dict[str, Any]:
    """Project a huggingface_hub info object onto the canonical repo row.

    Every attribute is read defensively with getattr(..., None) since hub
    objects hydrate different fields depending on the `expand` request.
    """
    passthrough = (
        "author",
        "title",
        "likes",
        "downloads",
        "created_at",
        "last_modified",
        "pipeline_tag",
        "private",
        "trending_score",
        "tags",
        "sha",
        "gated",
        "library_name",
        "description",
        "paperswithcode_id",
        "sdk",
        "models",
        "datasets",
        "subdomain",
    )
    fields = {name: getattr(row, name, None) for name in passthrough}
    return _build_repo_row(repo_id=getattr(row, "id", None), repo_type=repo_type, **fields)
|
|
|
|
def _normalize_repo_detail_row(detail: Any, repo_type: str, repo_id: str) -> dict[str, Any]:
    """Normalize a repo detail object, backfilling id fields from *repo_id*."""
    row = _normalize_repo_search_row(detail, repo_type)
    resolved = row.get("repo_id") or repo_id
    row.update(
        id=row.get("id") or resolved,
        slug=row.get("slug") or resolved,
        repo_id=resolved,
        repo_url=_repo_web_url(repo_type, resolved),
    )
    return row
|
|
|
|
def _normalize_trending_row(repo: dict[str, Any], default_repo_type: str, rank: int | None = None) -> dict[str, Any]:
    """Normalize one raw /api/trending entry into a canonical repo row.

    Maps the endpoint's camelCase keys onto _build_repo_row kwargs and
    attaches the 1-based trending rank when supplied.
    """
    # canonical kwarg -> raw trending-payload key
    source_keys = {
        "author": "author",
        "title": "title",
        "likes": "likes",
        "downloads": "downloads",
        "created_at": "createdAt",
        "last_modified": "lastModified",
        "pipeline_tag": "pipeline_tag",
        "private": "private",
        "trending_score": "trendingScore",
        "tags": "tags",
        "sha": "sha",
        "gated": "gated",
        "library_name": "library_name",
        "description": "description",
        "paperswithcode_id": "paperswithcode_id",
        "sdk": "sdk",
        "models": "models",
        "datasets": "datasets",
        "subdomain": "subdomain",
    }
    fields = {dest: repo.get(src) for dest, src in source_keys.items()}
    row = _build_repo_row(
        repo_id=repo.get("id"),
        repo_type=repo.get("type") or default_repo_type,
        **fields,
    )
    if rank is not None:
        row["trending_rank"] = rank
    return row
|
|
|
|
def _normalize_collection_repo_item(row: dict[str, Any]) -> dict[str, Any] | None:
    """Normalize a collection item into a canonical repo row.

    Returns None for items without a usable repo id or with a non-repo type
    (e.g. papers), so callers can filter them out.
    """
    repo_id = row.get("id") or row.get("repoId") or row.get("repo_id")
    if not isinstance(repo_id, str) or not repo_id:
        return None

    raw_type = row.get("repoType") or row.get("repo_type") or row.get("type")
    repo_type = _canonical_repo_type(raw_type, default="")
    if repo_type not in {"model", "dataset", "space"}:
        return None

    def first(primary: str, fallback: str) -> Any:
        # Mirrors `row.get(a) or row.get(b)` exactly, including falsy handling.
        return row.get(primary) or row.get(fallback)

    return _build_repo_row(
        repo_id=repo_id,
        repo_type=repo_type,
        author=row.get("author") or _author_from_any(row.get("authorData")),
        title=row.get("title"),
        likes=row.get("likes"),
        downloads=row.get("downloads"),
        created_at=first("createdAt", "created_at"),
        last_modified=first("lastModified", "last_modified"),
        pipeline_tag=first("pipeline_tag", "pipelineTag"),
        private=row.get("private"),
        tags=row.get("tags"),
        gated=row.get("gated"),
        library_name=first("library_name", "libraryName"),
        description=row.get("description"),
        paperswithcode_id=first("paperswithcode_id", "paperswithcodeId"),
        sdk=row.get("sdk"),
        models=row.get("models"),
        datasets=row.get("datasets"),
        subdomain=row.get("subdomain"),
    )
|
|
|
|
def _sort_repo_rows(rows: list[dict[str, Any]], sort_key: str | None) -> list[dict[str, Any]]:
    """Sort canonical repo rows by *sort_key*, descending.

    Numeric keys (likes/downloads/trending_score) rank missing or unparseable
    values as -1; date keys compare ISO strings lexically. Unknown or empty
    sort keys return *rows* unchanged (same list object).

    Fix: the previous `_as_int(row.get(key)) or -1` collapsed a legitimate
    count of 0 into -1, making rows with 0 likes/downloads indistinguishable
    from rows missing the field. Only None now maps to -1.
    """
    if not sort_key:
        return rows

    if sort_key in {"likes", "downloads", "trending_score"}:
        def numeric(row: dict[str, Any]) -> int:
            value = _as_int(row.get(sort_key))
            return -1 if value is None else value

        return sorted(rows, key=numeric, reverse=True)

    if sort_key in {"created_at", "last_modified"}:
        return sorted(rows, key=lambda row: str(row.get(sort_key) or ""), reverse=True)

    return rows
|
|
|
|
def call_api_host(
    endpoint: str,
    *,
    method: str = "GET",
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
    strict_mode: bool = False,
) -> dict[str, Any]:
    """Perform one allowlisted Hub API request and return a uniform envelope.

    Returns {"ok", "status", "url", "data", "error"}. HTTP-level failures are
    captured (not raised) with their decoded body; network failures yield
    status 0 with data=None. Raises ValueError for unsupported methods,
    disallowed endpoints, or missing /api/recent-activity parameters.
    """
    method_u = method.upper().strip()
    if method_u not in {"GET", "POST"}:
        raise ValueError("Only GET and POST are supported")

    ep = _normalize_endpoint(endpoint)
    if not _endpoint_allowed(ep, strict_mode):
        raise ValueError(f"Endpoint not allowed: {ep}")

    params = _sanitize_params(ep, params)
    # recent-activity is the one allowlisted endpoint with required params.
    if ep == "/api/recent-activity":
        feed_type = str((params or {}).get("feedType", "")).strip().lower()
        if feed_type not in {"user", "org"}:
            raise ValueError("/api/recent-activity requires feedType=user|org")
        if not str((params or {}).get("entity", "")).strip():
            raise ValueError("/api/recent-activity requires entity")

    base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
    q = urlencode(params or {}, doseq=True)
    url = f"{base}{ep}" + (f"?{q}" if q else "")

    headers = {"Accept": "application/json"}
    # Auth is optional: request-scoped token first, then $HF_TOKEN.
    token = _load_token()
    if token:
        headers["Authorization"] = f"Bearer {token}"

    data = None
    if method_u == "POST":
        headers["Content-Type"] = "application/json"
        data = json.dumps(json_body or {}).encode("utf-8")

    req = Request(url, method=method_u, headers=headers, data=data)
    try:
        with urlopen(req, timeout=timeout_sec) as res:
            payload = _json_best_effort(res.read())
            return {"ok": True, "status": int(res.status), "url": url, "data": payload, "error": None}
    except HTTPError as e:
        # Error bodies are often useful JSON; keep them, and truncate only the
        # serialized form used for the `error` string (string bodies pass as-is).
        payload = _json_best_effort(e.read())
        err = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False)[:1000]
        return {"ok": False, "status": int(e.code), "url": url, "data": payload, "error": err}
    except URLError as e:
        return {"ok": False, "status": 0, "url": url, "data": None, "error": f"Network error: {e}"}
|
|
|
|
| def _validate_generated_code(code: str) -> None: |
| if not code.strip(): |
| raise ValueError("Generated code is empty") |
|
|
| blocked_patterns: list[tuple[str, str]] = [ |
| (r"(?m)^\s*import\s+\S", "import statement"), |
| (r"(?m)^\s*from\s+\S+\s+import\s+\S", "from-import statement"), |
| (r"\bexec\s*\(", "exec("), |
| (r"\beval\s*\(", "eval("), |
| (r"\bopen\s*\(", "open("), |
| (r"\b__import__\b", "__import__"), |
| (r"(?i)\bwhile\s+true\b", "while true"), |
| ] |
| for pattern, label in blocked_patterns: |
| if re.search(pattern, code): |
| raise ValueError(f"Generated code contains blocked pattern: {label}") |
|
|
| try: |
| parsed = compile( |
| code, |
| "<generated-monty-code>", |
| "exec", |
| flags=ast.PyCF_ONLY_AST | ast.PyCF_ALLOW_TOP_LEVEL_AWAIT, |
| dont_inherit=True, |
| ) |
| except SyntaxError as e: |
| message = e.msg or "invalid syntax" |
| raise ValueError(f"Generated code is not valid Python: {message}") from e |
|
|
| if not isinstance(parsed, ast.Module): |
| raise ValueError("Generated code must be a Python module") |
|
|
| solve_defs = [ |
| node |
| for node in parsed.body |
| if isinstance(node, ast.AsyncFunctionDef) and node.name == "solve" |
| ] |
| if not solve_defs: |
| raise ValueError("Generated code must define `async def solve(query, max_calls): ...`.") |
|
|
| def _valid_solve_signature(node: ast.AsyncFunctionDef) -> bool: |
| args = node.args |
| return ( |
| not args.posonlyargs |
| and len(args.args) == 2 |
| and [arg.arg for arg in args.args] == ["query", "max_calls"] |
| and args.vararg is None |
| and not args.kwonlyargs |
| and args.kwarg is None |
| and not args.defaults |
| and not args.kw_defaults |
| ) |
|
|
| if not any(_valid_solve_signature(node) for node in solve_defs): |
| raise ValueError("`solve` must have signature `async def solve(query, max_calls): ...`.") |
|
|
| if not parsed.body: |
| raise ValueError("Generated code is empty") |
|
|
| final_stmt = parsed.body[-1] |
| valid_final_await = ( |
| isinstance(final_stmt, ast.Expr) |
| and isinstance(final_stmt.value, ast.Await) |
| and isinstance(final_stmt.value.value, ast.Call) |
| and isinstance(final_stmt.value.value.func, ast.Name) |
| and final_stmt.value.value.func.id == "solve" |
| and len(final_stmt.value.value.args) == 2 |
| and not final_stmt.value.value.keywords |
| and all(isinstance(arg, ast.Name) for arg in final_stmt.value.value.args) |
| and [cast(ast.Name, arg).id for arg in final_stmt.value.value.args] == ["query", "max_calls"] |
| ) |
| if not valid_final_await: |
| raise ValueError("Generated code must end with `await solve(query, max_calls)`.") |
|
|
| def _preferred_helper_for_endpoint(endpoint: str) -> str | None: |
| for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS: |
| if re.match(pattern, endpoint): |
| return helper_name |
| return None |
|
|
| def _call_api_endpoint_hint(expr: ast.AST | None) -> str | None: |
| if isinstance(expr, ast.Constant) and isinstance(expr.value, str): |
| return expr.value |
| if isinstance(expr, ast.JoinedStr): |
| literal_parts = [ |
| value.value |
| for value in expr.values |
| if isinstance(value, ast.Constant) and isinstance(value.value, str) |
| ] |
| if literal_parts: |
| return "".join(literal_parts) |
| return None |
|
|
# AST pass over every `call_api(...)` call site: steer the generated program
# toward dedicated hf_* helpers wherever one covers the endpoint family, and
# special-case the collection-items path.
for node in ast.walk(parsed):
    if not isinstance(node, ast.Call):
        continue
    if not isinstance(node.func, ast.Name) or node.func.id != "call_api":
        continue

    # The endpoint may arrive positionally or via the `endpoint=` keyword.
    endpoint_expr: ast.AST | None = node.args[0] if node.args else None
    for keyword in node.keywords:
        if keyword.arg == "endpoint":
            endpoint_expr = keyword.value
            break

    # Static best-effort endpoint string (handles literals and f-strings).
    endpoint_hint = _call_api_endpoint_hint(endpoint_expr)
    if endpoint_hint and "/api/collections/" in endpoint_hint and "/items" in endpoint_hint:
        raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.")
    if endpoint_hint:
        preferred_helper = _preferred_helper_for_endpoint(endpoint_hint)
        if preferred_helper is not None:
            raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_hint!r}, ...)` for this endpoint family.")

# The program must talk to the outside world at least once (raw call_api or
# any hf_* helper, detected textually).
allowed_external_calls = ["call_api(", *[f"{name}(" for name in HELPER_EXTERNALS]]
if not any(token in code for token in allowed_external_calls):
    raise ValueError("Generated code must call at least one external API function (call_api or hf_* helper)")

# Textual pass over string-literal call_api endpoints: forbid routing helper
# names through call_api, re-apply the collection-items and helper-coverage
# guards, and enforce the raw '/api/...' path shape.
helper_name_set = set(HELPER_EXTERNALS)
for m in re.finditer(r"call_api\(\s*([\"'])\s*([^\"']+)\s*\1", code):
    endpoint_literal = str(m.group(2) or "").strip()
    if not endpoint_literal:
        continue
    if (
        endpoint_literal in helper_name_set
        or endpoint_literal.startswith("hf_")
        or endpoint_literal.startswith("/hf_")
        or endpoint_literal.startswith("/api/hf_")
    ):
        raise ValueError("Do not call helper names through call_api; call hf_* helpers directly.")
    if re.match(r"^/api/collections/(?:[^/]+/)?[^/]+/items$", endpoint_literal):
        raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.")
    preferred_helper = _preferred_helper_for_endpoint(endpoint_literal)
    if preferred_helper is not None:
        raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_literal!r}, ...)` for this endpoint family.")
    if not endpoint_literal.startswith("/api/"):
        raise ValueError("call_api endpoint must be a raw path starting with '/api/...'.")
| async def _run_with_monty( |
| *, |
| code: str, |
| query: str, |
| max_calls: int, |
| strict_mode: bool, |
| timeout_sec: int, |
| ) -> dict[str, Any]: |
| try: |
| import pydantic_monty |
| except Exception as e: |
| raise RuntimeError("pydantic_monty is not installed. Install with `uv pip install pydantic-monty`.") from e |
|
|
| max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT)) |
| call_count = {"n": 0} |
| trace: list[dict[str, Any]] = [] |
| limit_summaries: list[dict[str, Any]] = [] |
| latest_helper_error: dict[str, Any] | None = None |
| internal_helper_used = {"used": False} |
def _budget_remaining() -> int:
    """Remaining API-call budget for this run (never negative)."""
    return max(0, max_calls - call_count["n"])
|
|
def _policy_int(helper_name: str, key: str, default: int) -> int:
    """Look up an int knob for *helper_name* in PAGINATION_POLICY.

    Falls back to *default* when the helper has no policy entry or the
    configured value cannot be coerced to int.
    """
    cfg = PAGINATION_POLICY.get(helper_name) or {}
    try:
        return int(cfg.get(key, default))
    except Exception:
        return int(default)
|
|
def _consume_call(endpoint: str, method: str = "GET") -> int:
    """Spend one unit of the shared call budget; return the 1-based call index.

    Raises RuntimeError once the budget is exhausted.  *endpoint* and
    *method* are accepted for call-site symmetry but not used here.
    """
    if call_count["n"] >= max_calls:
        raise RuntimeError(f"Max API calls exceeded ({max_calls})")
    call_count["n"] += 1
    return call_count["n"]
|
|
def _trace_ok(idx: int, endpoint: str, method: str = "GET", status: int = 200) -> None:
    """Append a successful call record to the shared trace."""
    trace.append(
        {
            "call_index": idx,
            "depth": idx,  # mirrors call_index in this flat executor
            "method": method,
            "endpoint": endpoint,
            "ok": True,
            "status": status,
        }
    )

def _trace_err(idx: int, endpoint: str, err: Any, method: str = "GET", status: int = 0) -> None:
    """Append a failed call record (error stringified) to the shared trace."""
    trace.append(
        {
            "call_index": idx,
            "depth": idx,  # mirrors call_index in this flat executor
            "method": method,
            "endpoint": endpoint,
            "ok": False,
            "status": status,
            "error": str(err),
        }
    )
|
|
def _host_raw_call(
    endpoint: str,
    *,
    params: dict[str, Any] | None = None,
    method: str = "GET",
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Budgeted raw HTTP call through the host-side `call_api_host`.

    Consumes one budget unit up front, records success/failure in the trace,
    and returns the host response envelope unchanged.  Transport-level
    exceptions are traced and re-raised.
    """
    idx = _consume_call(endpoint, method)
    try:
        resp = call_api_host(
            endpoint,
            method=method,
            params=params,
            json_body=json_body,
            timeout_sec=timeout_sec,
            strict_mode=strict_mode,
        )
        if resp.get("ok"):
            _trace_ok(idx, endpoint, method=method, status=int(resp.get("status") or 200))
        else:
            _trace_err(idx, endpoint, resp.get("error"), method=method, status=int(resp.get("status") or 0))
        return resp
    except Exception as e:
        _trace_err(idx, endpoint, e, method=method, status=0)
        raise
|
|
# Lazily constructed, run-scoped HfApi client (built by _get_hf_api_client).
hf_api_client: HfApi | None = None

def _get_hf_api_client() -> HfApi:
    """Create-once accessor for the HfApi client.

    Endpoint comes from HF_ENDPOINT (default https://huggingface.co) with a
    trailing slash stripped; authentication via _load_token().
    """
    nonlocal hf_api_client
    if hf_api_client is None:
        endpoint = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
        hf_api_client = HfApi(endpoint=endpoint, token=_load_token())
    return hf_api_client
|
|
def _host_hf_call(endpoint: str, fn: Callable[[], Any]) -> Any:
    """Budgeted wrapper for an HfApi-backed call.

    *endpoint* is used only for budget accounting and trace labeling; the
    actual work happens in *fn*.  Exceptions are traced and re-raised.
    """
    idx = _consume_call(endpoint, "GET")
    try:
        out = fn()
        _trace_ok(idx, endpoint, method="GET", status=200)
        return out
    except Exception as e:
        _trace_err(idx, endpoint, e, method="GET", status=0)
        raise
|
|
def _helper_meta(start_calls: int, *, source: str, **extra: Any) -> dict[str, Any]:
    """Base meta block for helper envelopes: source plus budget accounting.

    budget_used is measured relative to *start_calls* (the call counter value
    captured when the helper started).
    """
    out = {
        "source": source,
        "normalized": True,
        "budget_used": max(0, call_count["n"] - start_calls),
        "budget_remaining": _budget_remaining(),
    }
    out.update(extra)
    return out
|
|
| def _derive_limit_metadata( |
| *, |
| requested_return_limit: int | None, |
| applied_return_limit: int, |
| default_limit_used: bool, |
| requested_scan_limit: int | None = None, |
| applied_scan_limit: int | None = None, |
| requested_max_pages: int | None = None, |
| applied_max_pages: int | None = None, |
| ) -> dict[str, Any]: |
| meta: dict[str, Any] = { |
| "requested_return_limit": requested_return_limit, |
| "applied_return_limit": applied_return_limit, |
| "default_limit_used": default_limit_used, |
| } |
| if requested_scan_limit is not None or applied_scan_limit is not None: |
| meta["requested_scan_limit"] = requested_scan_limit |
| meta["scan_limit"] = applied_scan_limit |
| meta["scan_limit_applied"] = requested_scan_limit != applied_scan_limit |
| if requested_max_pages is not None or applied_max_pages is not None: |
| meta["requested_max_pages"] = requested_max_pages |
| meta["applied_max_pages"] = applied_max_pages |
| meta["page_limit_applied"] = requested_max_pages != applied_max_pages |
| if requested_return_limit is not None: |
| meta["hard_cap_applied"] = applied_return_limit < requested_return_limit |
| return meta |
|
|
| def _derive_more_available(*, sample_complete: bool, exact_count: bool, returned: int, total: int | None) -> bool | str: |
| if sample_complete: |
| return False |
| if exact_count and total is not None and returned < total: |
| return True |
| return "unknown" |
|
|
| def _derive_truncated_by( |
| *, |
| hard_cap: bool = False, |
| scan_limit_hit: bool = False, |
| page_limit_hit: bool = False, |
| return_limit_hit: bool = False, |
| ) -> str: |
| causes = [hard_cap, scan_limit_hit, page_limit_hit, return_limit_hit] |
| if sum(1 for cause in causes if cause) > 1: |
| return "multiple" |
| if hard_cap: |
| return "hard_cap" |
| if scan_limit_hit: |
| return "scan_limit" |
| if page_limit_hit: |
| return "page_limit" |
| if return_limit_hit: |
| return "return_limit" |
| return "none" |
|
|
| def _derive_can_request_more(*, sample_complete: bool, truncated_by: str) -> bool: |
| if sample_complete: |
| return False |
| return truncated_by in {"return_limit", "scan_limit", "page_limit", "multiple"} |
|
|
| def _derive_next_request_hint(*, truncated_by: str, more_available: bool | str, applied_return_limit: int, applied_scan_limit: int | None = None, applied_max_pages: int | None = None) -> str: |
| if truncated_by == "return_limit": |
| return f"Ask for return_limit>{applied_return_limit} to see more rows" |
| if truncated_by == "scan_limit" and applied_scan_limit is not None: |
| return f"Increase scan_limit above {applied_scan_limit} for broader coverage" |
| if truncated_by == "page_limit" and applied_max_pages is not None: |
| return f"Increase max_pages above {applied_max_pages} to continue paging" |
| if truncated_by == "hard_cap": |
| return "No more rows can be returned in a single call because a hard cap was applied" |
| if truncated_by == "multiple": |
| return "Increase the relevant return/page/scan bounds to improve coverage" |
| if more_available is False: |
| return "No more results available" |
| if more_available == "unknown": |
| return "More results may exist; narrow filters or raise scan/page bounds for better coverage" |
| return "Ask for a larger limit to see more rows" |
|
|
def _resolve_exhaustive_limits(
    *,
    return_limit: int | None,
    count_only: bool,
    default_return: int,
    max_return: int,
    scan_limit: int | None = None,
    scan_cap: int | None = None,
) -> dict[str, Any]:
    """Resolve requested vs applied return/scan limits for exhaustive helpers.

    count_only forces an applied return limit of 0 (no rows returned) and
    records the requested limit as None.  Scan keys are emitted only when a
    *scan_cap* was supplied.
    """
    requested_return_limit = None if count_only else return_limit
    # count_only returns no rows regardless of what was asked for.
    effective_requested_return_limit = 0 if count_only else requested_return_limit
    out: dict[str, Any] = {
        "requested_return_limit": requested_return_limit,
        "applied_return_limit": _clamp_int(
            effective_requested_return_limit,
            default=default_return,
            minimum=0,
            maximum=max_return,
        ),
        "default_limit_used": requested_return_limit is None and not count_only,
    }
    # The hard cap only counts when the caller explicitly asked for more rows
    # than the clamp allowed.
    out["hard_cap_applied"] = (
        requested_return_limit is not None and out["applied_return_limit"] < requested_return_limit
    )
    if scan_cap is not None:
        out["requested_scan_limit"] = scan_limit
        out["applied_scan_limit"] = _clamp_int(
            scan_limit,
            default=scan_cap,
            minimum=1,
            maximum=scan_cap,
        )
    return out
|
|
def _build_exhaustive_meta(
    *,
    base_meta: dict[str, Any],
    limit_plan: dict[str, Any],
    sample_complete: bool,
    exact_count: bool,
    truncated_by: str,
    more_available: bool | str,
    requested_max_pages: int | None = None,
    applied_max_pages: int | None = None,
) -> dict[str, Any]:
    """Merge completeness/truncation signals and limit bookkeeping into meta.

    *base_meta* is copied, then overlaid with the derived completeness flags
    (complete / exact_count / sample_complete / more_available /
    can_request_more / truncated_by / next_request_hint), and finally with the
    requested-vs-applied limit metadata taken from *limit_plan*.
    """
    meta = dict(base_meta)
    applied_return_limit = int(limit_plan["applied_return_limit"])
    applied_scan_limit = limit_plan.get("applied_scan_limit")
    meta.update(
        {
            "complete": sample_complete,
            "exact_count": exact_count,
            "sample_complete": sample_complete,
            "more_available": more_available,
            "can_request_more": _derive_can_request_more(
                sample_complete=sample_complete,
                truncated_by=truncated_by,
            ),
            "truncated_by": truncated_by,
            "next_request_hint": _derive_next_request_hint(
                truncated_by=truncated_by,
                more_available=more_available,
                applied_return_limit=applied_return_limit,
                applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
                applied_max_pages=applied_max_pages,
            ),
        }
    )
    meta.update(
        _derive_limit_metadata(
            requested_return_limit=limit_plan["requested_return_limit"],
            applied_return_limit=applied_return_limit,
            default_limit_used=bool(limit_plan["default_limit_used"]),
            requested_scan_limit=limit_plan.get("requested_scan_limit"),
            applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
            requested_max_pages=requested_max_pages,
            applied_max_pages=applied_max_pages,
        )
    )
    return meta
|
|
def _overview_count_only_success(
    *,
    start_calls: int,
    source: str,
    total: int,
    limit_plan: dict[str, Any],
    base_meta: dict[str, Any],
) -> dict[str, Any]:
    """Success envelope for a count_only request answered by an overview total.

    Returns zero rows; the overview count is treated as exact, so the result
    is marked complete and untruncated with every total* field set to
    *total*.
    """
    sample_complete = True
    more_available = False
    truncated_by = "none"
    meta = _build_exhaustive_meta(
        base_meta={
            **base_meta,
            "matched": total,
            "returned": 0,
            "total": total,
            "total_available": total,
            "total_matched": total,
            "truncated": False,
        },
        limit_plan=limit_plan,
        sample_complete=sample_complete,
        exact_count=True,
        truncated_by=truncated_by,
        more_available=more_available,
    )
    return _helper_success(
        start_calls=start_calls,
        source=source,
        items=[],
        meta=meta,
    )
|
|
def _build_exhaustive_result_meta(
    *,
    base_meta: dict[str, Any],
    limit_plan: dict[str, Any],
    matched_count: int,
    returned_count: int,
    exact_count: bool,
    count_only: bool = False,
    sample_complete: bool | None = None,
    more_available: bool | str | None = None,
    scan_limit_hit: bool = False,
    page_limit_hit: bool = False,
    truncated_extra: bool = False,
    requested_max_pages: int | None = None,
    applied_max_pages: int | None = None,
) -> dict[str, Any]:
    """Derive truncation/completeness meta for one exhaustive helper result.

    Callers may pass explicit sample_complete / more_available overrides;
    otherwise both are inferred from exact_count, the applied return limit,
    and the matched/returned counts.
    """
    applied_return_limit = int(limit_plan["applied_return_limit"])
    if count_only:
        # With no rows returned, the sample is "complete" iff the count is exact.
        effective_sample_complete = exact_count
    else:
        effective_sample_complete = (
            sample_complete
            if isinstance(sample_complete, bool)
            else exact_count and matched_count <= applied_return_limit
        )
    return_limit_hit = False if count_only else (applied_return_limit > 0 and matched_count > applied_return_limit)
    truncated_by = _derive_truncated_by(
        hard_cap=bool(limit_plan.get("hard_cap_applied")),
        scan_limit_hit=scan_limit_hit,
        page_limit_hit=page_limit_hit,
        return_limit_hit=return_limit_hit,
    )
    # truncated_extra lets callers flag truncation causes not modeled above.
    truncated = truncated_by != "none" or truncated_extra
    total_value = _as_int(base_meta.get("total"))
    effective_more_available = more_available
    if count_only and exact_count:
        effective_more_available = False
    if effective_more_available is None:
        effective_more_available = _derive_more_available(
            sample_complete=effective_sample_complete,
            exact_count=exact_count,
            returned=returned_count,
            total=total_value,
        )

    return _build_exhaustive_meta(
        base_meta={
            **base_meta,
            "matched": matched_count,
            "returned": returned_count,
            "truncated": truncated,
        },
        limit_plan=limit_plan,
        sample_complete=effective_sample_complete,
        exact_count=exact_count,
        truncated_by=truncated_by,
        more_available=effective_more_available,
        requested_max_pages=requested_max_pages,
        applied_max_pages=applied_max_pages,
    )
|
|
def _helper_success(
    *,
    start_calls: int,
    source: str,
    items: list[dict[str, Any]],
    cursor: str | None = None,
    meta: dict[str, Any] | None = None,
    **extra_meta: Any,
) -> dict[str, Any]:
    """Build the stable success envelope: ok / item / items / meta / error.

    `item` is populated only when there is exactly one row, keeping the
    envelope shape machine-first (no polymorphic payloads).  *extra_meta*
    entries override keys from *meta*; a cursor, when given, is stored in
    meta.
    """
    merged_meta = dict(meta or {})
    merged_meta.update(extra_meta)
    if cursor is not None:
        merged_meta["cursor"] = cursor

    return {
        "ok": True,
        "item": items[0] if len(items) == 1 else None,
        "items": items,
        "meta": _helper_meta(start_calls, source=source, **merged_meta),
        "error": None,
    }
|
|
def _helper_error(*, start_calls: int, source: str, error: Any, **meta: Any) -> dict[str, Any]:
    """Build the failure envelope and remember it as the latest helper error."""
    nonlocal latest_helper_error
    envelope = {
        "ok": False,
        "item": None,
        "items": [],
        "meta": _helper_meta(start_calls, source=source, **meta),
        "error": str(error),
    }
    latest_helper_error = envelope
    return envelope
|
|
| def _project_items( |
| items: list[dict[str, Any]], |
| fields: list[str] | None, |
| aliases: dict[str, str] | None = None, |
| ) -> list[dict[str, Any]]: |
| if not isinstance(fields, list) or not fields: |
| return items |
| wanted = [str(f).strip() for f in fields if str(f).strip()] |
| if not wanted: |
| return items |
| alias_map = {str(k).strip().lower(): str(v).strip() for k, v in (aliases or {}).items() if str(k).strip() and str(v).strip()} |
| projected: list[dict[str, Any]] = [] |
| for row in items: |
| out: dict[str, Any] = {} |
| for key in wanted: |
| source_key = alias_map.get(key.lower(), key) |
| value = row.get(source_key) |
| if value is None: |
| continue |
| out[key] = value |
| projected.append(out) |
| return projected |
|
|
def _project_repo_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Field projection using the repo-specific alias map."""
    return _project_items(items, fields, aliases=_REPO_FIELD_ALIASES)

def _project_collection_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Field projection using the collection-specific alias map."""
    return _project_items(items, fields, aliases=_COLLECTION_FIELD_ALIASES)

def _project_user_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Field projection using the user-specific alias map."""
    return _project_items(items, fields, aliases=_USER_FIELD_ALIASES)

def _project_actor_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Field projection using the actor-specific alias map."""
    return _project_items(items, fields, aliases=_ACTOR_FIELD_ALIASES)
|
|
def _item_matches_where(item: dict[str, Any], where: dict[str, Any] | None) -> bool:
    """Evaluate a structured `where` clause against one row.

    Each entry is either a plain value (equality), a list/tuple/set
    (membership), or an operator dict supporting eq / in / contains /
    icontains / gte / lte.  All entries must hold (AND semantics); an empty
    or non-dict clause matches everything.
    """
    if not isinstance(where, dict) or not where:
        return True
    for key, cond in where.items():
        val = item.get(str(key))
        if isinstance(cond, dict):
            if "eq" in cond and val != cond.get("eq"):
                return False
            if "in" in cond:
                # A non-collection "in" operand is ignored rather than failing.
                allowed = cond.get("in")
                if isinstance(allowed, (list, tuple, set)) and val not in allowed:
                    return False
            if "contains" in cond:
                needle = cond.get("contains")
                if not isinstance(val, str) or not isinstance(needle, str) or needle not in val:
                    return False
            if "icontains" in cond:
                needle = cond.get("icontains")
                if not isinstance(val, str) or not isinstance(needle, str) or needle.lower() not in val.lower():
                    return False
            # Numeric bounds: values that cannot be coerced to int fail the test.
            if "gte" in cond:
                v = _as_int(val)
                c = _as_int(cond.get("gte"))
                if v is None or c is None or v < c:
                    return False
            if "lte" in cond:
                v = _as_int(val)
                c = _as_int(cond.get("lte"))
                if v is None or c is None or v > c:
                    return False
            continue

        # Bare collection condition: membership test.
        if isinstance(cond, (list, tuple, set)):
            if val not in cond:
                return False
            continue

        # Bare scalar condition: strict equality.
        if val != cond:
            return False
    return True
|
|
def _apply_where(items: list[dict[str, Any]], where: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Filter rows through the structured `where` clause; no-op when empty."""
    if not (isinstance(where, dict) and where):
        return items
    return [candidate for candidate in items if _item_matches_where(candidate, where)]
|
|
| def _helper_item(resp: dict[str, Any]) -> dict[str, Any] | None: |
| item = resp.get("item") |
| if isinstance(item, dict): |
| return item |
| items = resp.get("items") |
| if isinstance(items, list) and items and isinstance(items[0], dict): |
| return items[0] |
| return None |
|
|
def _overview_count(item: dict[str, Any] | None, key: str) -> int | None:
    """Read an int counter off an overview item; None when absent or non-dict."""
    if not isinstance(item, dict):
        return None
    return _as_int(item.get(key))
|
|
def _summary_section(
    resp: dict[str, Any],
    *,
    count: int | None = None,
    default_sample: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Condense a helper envelope into a {count, sample, meta} summary block.

    An explicit *count* wins; otherwise a count is read from meta only when
    meta marks it exact (exact_count True or an overview/endpoint source).
    Failed responses keep their meta, record the error, and fall back to
    *default_sample*.
    """
    meta = resp.get("meta")
    section_meta = dict(meta) if isinstance(meta, dict) else {}
    sample = resp.get("items")
    section_sample = sample if isinstance(sample, list) else list(default_sample or [])
    section_count = count
    if section_count is None:
        count_exact = section_meta.get("exact_count") is True or section_meta.get("count_source") in {"overview", "endpoint"}
        if count_exact:
            # First exact-looking counter wins.
            for key in ("total", "total_matched", "matched"):
                section_count = _as_int(section_meta.get(key))
                if section_count is not None:
                    break
    if resp.get("ok") is not True:
        section_meta["error"] = str(resp.get("error") or "section fetch failed")
        section_sample = list(default_sample or [])
    return {"count": section_count, "sample": section_sample, "meta": section_meta}
|
|
async def _resolve_username_or_current(username: str | None) -> tuple[str | None, str | None]:
    """Resolve *username*, falling back to the authenticated user via hf_whoami.

    Returns (username, None) on success or (None, error_message) on failure.
    """
    u = str(username or "").strip()
    if u:
        return u, None

    whoami = await hf_whoami()
    if whoami.get("ok") is not True:
        return None, str(whoami.get("error") or "Could not resolve current authenticated user")

    item = _helper_item(whoami)
    resolved = item.get("username") if isinstance(item, dict) else None
    if not isinstance(resolved, str) or not resolved.strip():
        return None, "username was not provided and current authenticated user could not be resolved"
    return resolved.strip(), None
|
|
| def _normalize_user_likes_sort(sort: str | None) -> tuple[str | None, str | None]: |
| raw = str(sort or "likedAt").strip() |
| alias_map = { |
| "": "likedAt", |
| "likedat": "likedAt", |
| "liked_at": "likedAt", |
| "liked-at": "likedAt", |
| "recency": "likedAt", |
| "repolikes": "repoLikes", |
| "repo_likes": "repoLikes", |
| "repo-likes": "repoLikes", |
| "repodownloads": "repoDownloads", |
| "repo_downloads": "repoDownloads", |
| "repo-downloads": "repoDownloads", |
| } |
| normalized = alias_map.get(raw.lower(), raw) |
| if normalized not in {"likedAt", "repoLikes", "repoDownloads"}: |
| return None, "sort must be one of likedAt, repoLikes, repoDownloads" |
| return normalized, None |
|
|
| def _author_from_any(value: Any) -> str | None: |
| if isinstance(value, str): |
| return value |
| if isinstance(value, dict): |
| for k in ("name", "username", "user", "login"): |
| v = value.get(k) |
| if isinstance(v, str) and v: |
| return v |
| return None |
|
|
| def _clean_social_handle(value: Any) -> str | None: |
| if not isinstance(value, str): |
| return None |
| cleaned = value.strip() |
| if not cleaned: |
| return None |
| if re.match(r"^https?://", cleaned, flags=re.IGNORECASE): |
| return cleaned |
| return cleaned.lstrip("@") |
|
|
def _social_url(kind: str, value: Any) -> str | None:
    """Turn a handle into a canonical profile URL for *kind*.

    Full http(s) URLs pass through untouched; unknown kinds return the
    cleaned handle itself.
    """
    cleaned = _clean_social_handle(value)
    if cleaned is None:
        return None
    if re.match(r"^https?://", cleaned, flags=re.IGNORECASE):
        return cleaned
    if kind == "twitter":
        return f"https://twitter.com/{cleaned}"
    if kind == "github":
        return f"https://github.com/{cleaned}"
    if kind == "linkedin":
        # Preserve explicit in/ or company/ prefixes; default to a personal profile.
        if cleaned.startswith(("in/", "company/")):
            return f"https://www.linkedin.com/{cleaned}"
        return f"https://www.linkedin.com/in/{cleaned}"
    if kind == "bluesky":
        return f"https://bsky.app/profile/{cleaned}"
    return cleaned
|
|
async def call_api(
    endpoint: str,
    params: dict[str, Any] | None = None,
    method: str = "GET",
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Sandbox-facing raw API escape hatch; delegates to the budgeted host call."""
    return _host_raw_call(endpoint, params=params, method=method, json_body=json_body)
|
|
async def hf_whoami() -> dict[str, Any]:
    """Identity of the current token holder via HfApi.whoami.

    Fails fast (without spending budget) when no token is available.  The
    username is read from whichever of the `name`/`user`/`username` payload
    keys is populated.
    """
    start_calls = call_count["n"]
    endpoint = "/api/whoami-v2"
    token = _load_token()
    if token is None:
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=(
                "Current authenticated user is unavailable for this request. "
                "No request-scoped or fallback HF token was found."
            ),
        )
    try:
        payload = _host_hf_call(
            endpoint,
            lambda: _get_hf_api_client().whoami(token=token, cache=True),
        )
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e)

    username = payload.get("name") or payload.get("user") or payload.get("username")
    item = {"username": username, "fullname": payload.get("fullname"), "isPro": payload.get("isPro")}
    # No item is emitted when a usable username could not be determined.
    items = [item] if isinstance(username, str) and username else []
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        scanned=1,
        matched=len(items),
        returned=len(items),
        truncated=False,
    )
|
|
async def hf_user_overview(username: str) -> dict[str, Any]:
    """Single-item profile overview for *username*: socials, counters, orgs."""
    start_calls = call_count["n"]
    u = str(username or "").strip()
    if not u:
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/overview", error="username is required")
    endpoint = f"/api/users/{u}/overview"
    try:
        obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_user_overview(u))
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e)

    # Probe both attribute spellings for each social handle.
    twitter = getattr(obj, "twitter", None) or getattr(obj, "twitterUsername", None)
    github = getattr(obj, "github", None) or getattr(obj, "githubUsername", None)
    linkedin = getattr(obj, "linkedin", None) or getattr(obj, "linkedinUsername", None)
    bluesky = getattr(obj, "bluesky", None) or getattr(obj, "blueskyUsername", None)

    # Backfill missing socials from the raw socials endpoint when budget allows.
    if _budget_remaining() > 0 and any(v in {None, ""} for v in [twitter, github, linkedin, bluesky]):
        socials_ep = f"/api/users/{u}/socials"
        socials_resp = _host_raw_call(socials_ep)
        if socials_resp.get("ok"):
            socials_payload = socials_resp.get("data") if isinstance(socials_resp.get("data"), dict) else {}
            handles = socials_payload.get("socialHandles") if isinstance(socials_payload.get("socialHandles"), dict) else {}
            twitter = twitter or handles.get("twitter")
            github = github or handles.get("github")
            linkedin = linkedin or handles.get("linkedin")
            bluesky = bluesky or handles.get("bluesky")

    # Org entries may be plain strings or objects carrying a .name attribute.
    orgs_raw = getattr(obj, "orgs", None)
    org_names: list[str] | None = None
    if isinstance(orgs_raw, (list, tuple, set)):
        names: list[str] = []
        for org in orgs_raw:
            if isinstance(org, str) and org.strip():
                names.append(org.strip())
                continue
            name = getattr(org, "name", None)
            if isinstance(name, str) and name.strip():
                names.append(name.strip())
        org_names = names or None

    twitter_handle = _clean_social_handle(twitter)
    github_handle = _clean_social_handle(github)
    linkedin_handle = _clean_social_handle(linkedin)
    bluesky_handle = _clean_social_handle(bluesky)

    # Both the canonical URL and the bare handle are exposed per network.
    item = {
        "username": obj.username or u,
        "fullname": obj.fullname,
        "bio": getattr(obj, "details", None),
        "avatarUrl": obj.avatar_url,
        "websiteUrl": getattr(obj, "websiteUrl", None),
        "twitter": _social_url("twitter", twitter_handle),
        "github": _social_url("github", github_handle),
        "linkedin": _social_url("linkedin", linkedin_handle),
        "bluesky": _social_url("bluesky", bluesky_handle),
        "twitterHandle": twitter_handle,
        "githubHandle": github_handle,
        "linkedinHandle": linkedin_handle,
        "blueskyHandle": bluesky_handle,
        "followers": _as_int(obj.num_followers),
        "following": _as_int(obj.num_following),
        "likes": _as_int(obj.num_likes),
        "models": _as_int(getattr(obj, "num_models", None)),
        "datasets": _as_int(getattr(obj, "num_datasets", None)),
        "spaces": _as_int(getattr(obj, "num_spaces", None)),
        "discussions": _as_int(getattr(obj, "num_discussions", None)),
        "papers": _as_int(getattr(obj, "num_papers", None)),
        "upvotes": _as_int(getattr(obj, "num_upvotes", None)),
        "orgs": org_names,
        "isPro": obj.is_pro,
    }
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=[item],
        scanned=1,
        matched=1,
        returned=1,
        truncated=False,
    )
|
|
async def hf_org_overview(organization: str) -> dict[str, Any]:
    """Single-item overview for an organization: profile plus repo counters."""
    start_calls = call_count["n"]
    org = str(organization or "").strip()
    if not org:
        return _helper_error(
            start_calls=start_calls,
            source="/api/organizations/<o>/overview",
            error="organization is required",
        )
    endpoint = f"/api/organizations/{org}/overview"
    try:
        obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_organization_overview(org))
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e)

    item = {
        "organization": obj.name or org,
        "displayName": obj.fullname,
        "avatarUrl": obj.avatar_url,
        "description": obj.details,
        "websiteUrl": getattr(obj, "websiteUrl", None),
        "followers": _as_int(obj.num_followers),
        "members": _as_int(obj.num_users),
        "models": _as_int(getattr(obj, "num_models", None)),
        "datasets": _as_int(getattr(obj, "num_datasets", None)),
        "spaces": _as_int(getattr(obj, "num_spaces", None)),
    }
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=[item],
        scanned=1,
        matched=1,
        returned=1,
        truncated=False,
    )
|
|
async def hf_org_members(
    organization: str,
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Enumerate an organization's members with exhaustive-helper semantics.

    The org overview's num_users serves as an exact total when available
    (and can answer an unfiltered count_only without listing).  Members are
    scanned via HfApi.list_organization_members up to the applied scan
    limit, filtered with *where*, cut to the return limit, and projected
    onto *fields*.
    """
    start_calls = call_count["n"]
    org = str(organization or "").strip()
    if not org:
        return _helper_error(start_calls=start_calls, source="/api/organizations/<o>/members", error="organization is required")

    default_return = _policy_int("hf_org_members", "default_return", 100)
    scan_cap = _policy_int("hf_org_members", "scan_max", GRAPH_SCAN_LIMIT_CAP)
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    has_where = isinstance(where, dict) and bool(where)

    # Cheap exact total from the org overview, budget permitting.
    overview_total: int | None = None
    overview_source = f"/api/organizations/{org}/overview"
    if _budget_remaining() > 0:
        try:
            org_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_organization_overview(org))
            overview_total = _as_int(getattr(org_obj, "num_users", None))
        except Exception:
            # Overview is a best-effort optimization; fall back to scanning.
            overview_total = None

    # Unfiltered count_only can be answered by the overview alone.
    if count_only and not has_where and overview_total is not None:
        return _overview_count_only_success(
            start_calls=start_calls,
            source=overview_source,
            total=overview_total,
            limit_plan=limit_plan,
            base_meta={
                "scanned": 1,
                "count_source": "overview",
                "organization": org,
            },
        )

    endpoint = f"/api/organizations/{org}/members"
    try:
        rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_members(org), scan_lim)))
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e, organization=org)

    # Normalize member objects; rows without a usable username are dropped.
    normalized: list[dict[str, Any]] = []
    for row in rows:
        handle = getattr(row, "username", None)
        if not isinstance(handle, str) or not handle:
            continue
        item = {
            "username": handle,
            "fullname": getattr(row, "fullname", None),
            "isPro": getattr(row, "is_pro", None),
            "role": getattr(row, "role", None),
        }
        normalized.append(item)

    normalized = _apply_where(normalized, where)
    observed_total = len(rows)
    # Seeing fewer rows than the scan limit means the listing was exhausted.
    scan_exhaustive = observed_total < scan_lim

    # An exhaustive listing disagreeing with the overview counter is surfaced
    # in meta rather than silently reconciled.
    overview_list_mismatch = (
        overview_total is not None
        and scan_exhaustive
        and observed_total != overview_total
    )

    if has_where:
        # Filtered counts are exact only when the scan saw every member.
        exact_count = scan_exhaustive
        total = len(normalized)
        total_matched = len(normalized)
    else:
        if overview_total is not None:
            exact_count = True
            total = overview_total
            total_matched = overview_total
        else:
            exact_count = scan_exhaustive
            total = observed_total
            total_matched = observed_total

    total_available = overview_total if overview_total is not None else observed_total
    items = normalized[:ret_lim]
    scan_limit_hit = not exact_count and observed_total >= scan_lim
    count_source = "overview" if overview_total is not None and not has_where else "scan"
    sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if not exact_count and scan_limit_hit:
        # A truncated scan can hide rows: unknown under a filter, True otherwise.
        more_available = "unknown" if has_where else True

    items = _project_user_items(items, fields)
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": observed_total,
            "total": total,
            "total_available": total_available,
            "total_matched": total_matched,
            "count_source": count_source,
            "lower_bound": bool(has_where and not exact_count),
            "overview_total": overview_total,
            "listed_total": observed_total,
            "overview_list_mismatch": overview_list_mismatch,
            "organization": org,
        },
        limit_plan=limit_plan,
        matched_count=len(normalized),
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
    )
    return _helper_success(start_calls=start_calls, source=endpoint, items=items, meta=meta)
|
|
async def hf_repo_search(
    query: str | None = None,
    repo_type: str | None = None,
    repo_types: list[str] | None = None,
    author: str | None = None,
    filters: list[str] | None = None,
    sort: str | None = None,
    limit: int = 20,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
    advanced: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Search Hub repos across one or more repo types via HfApi list_* calls.

    Args:
        query: Free-text search term; blank/None means no term is sent.
        repo_type: Single repo type ("model", "dataset", "space" after
            canonicalization). Mutually exclusive with ``repo_types``;
            defaults to "model" when neither is given.
        repo_types: List of repo types to query; each is validated the same
            way as ``repo_type``.
        author: Restrict results to this author/namespace (stripped; empty
            becomes None).
        filters: Tag filters forwarded as the HfApi ``filter`` argument.
        sort: Sort key, normalized per repo type by _normalize_repo_sort_key.
        limit: Per-type fetch and final return cap, clamped to the policy
            ``max_return``.
        where: Post-fetch row filter applied via _apply_where.
        fields: Optional output projection applied via _project_repo_items.
        advanced: Extra HfApi kwargs; only allowed with a single repo type
            and validated against _REPO_SEARCH_EXTRA_ARGS for that type.

    Returns:
        Standard helper success envelope (items + scan/limit metadata), or a
        helper error envelope on validation/upstream failure.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_repo_search", "default_return", 20)
    max_return = _policy_int("hf_repo_search", "max_return", SELECTIVE_ENDPOINT_RETURN_HARD_CAP)


    # repo_type and repo_types are two spellings of the same input; reject both at once.
    if repo_type is not None and repo_types is not None:
        return _helper_error(
            start_calls=start_calls,
            source="/api/repos",
            error="Pass either repo_type or repo_types, not both",
        )


    # Resolve the list of canonical repo types to query; default is ["model"].
    if repo_types is None:
        if repo_type is None or not str(repo_type).strip():
            requested_repo_types = ["model"]
        else:
            rt = _canonical_repo_type(repo_type, default="")
            if rt not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/repos",
                    error=f"Unsupported repo_type '{repo_type}'",
                )
            requested_repo_types = [rt]
    else:
        raw_types = _coerce_str_list(repo_types)
        if not raw_types:
            return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_types must not be empty")
        requested_repo_types: list[str] = []
        for raw in raw_types:
            rt = _canonical_repo_type(raw, default="")
            if rt not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/repos",
                    error=f"Unsupported repo_type '{raw}'",
                )
            requested_repo_types.append(rt)


    filter_list = _coerce_str_list(filters)
    term = str(query or "").strip()
    author_clean = str(author or "").strip() or None
    requested_limit = limit
    # Clamp the caller's limit into [1, max_return]; limit_meta records whether
    # a hard cap was applied so the envelope can report truncation honestly.
    lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)
    limit_meta = _derive_limit_metadata(
        requested_return_limit=requested_limit,
        applied_return_limit=lim,
        default_limit_used=limit == default_return,
    )
    hard_cap_applied = bool(limit_meta.get("hard_cap_applied"))


    # `advanced` passes raw kwargs through to HfApi, so it is restricted to a
    # single repo type (different types accept different kwargs).
    if advanced is not None and not isinstance(advanced, dict):
        return _helper_error(start_calls=start_calls, source="/api/repos", error="advanced must be a dict when provided")
    if advanced is not None and len(requested_repo_types) != 1:
        return _helper_error(
            start_calls=start_calls,
            source="/api/repos",
            error="advanced may only be used with a single repo_type",
        )


    # Normalize the sort key independently per repo type (valid keys differ).
    sort_keys: dict[str, str | None] = {}
    for rt in requested_repo_types:
        sort_key, sort_error = _normalize_repo_sort_key(rt, sort)
        if sort_error:
            return _helper_error(start_calls=start_calls, source=f"/api/{rt}s", error=sort_error)
        sort_keys[rt] = sort_key


    all_items: list[dict[str, Any]] = []
    scanned = 0
    source_endpoints: list[str] = []
    limit_boundary_hit = False
    api = _get_hf_api_client()


    for rt in requested_repo_types:
        endpoint = f"/api/{rt}s"
        source_endpoints.append(endpoint)
        extra_args = dict(advanced or {}) if len(requested_repo_types) == 1 else {}
        allowed_extra = _REPO_SEARCH_EXTRA_ARGS.get(rt, set())
        unsupported = sorted(str(k) for k in extra_args.keys() if str(k) not in allowed_extra)
        if unsupported:
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=(
                    f"Unsupported advanced args for repo_type='{rt}': {unsupported}. "
                    f"Allowed advanced args: {sorted(allowed_extra)}"
                ),
            )
        # Accept snake_case "card_data" as an alias for HfApi's "cardData";
        # if both were given, the snake_case spelling is dropped.
        if "card_data" in extra_args and "cardData" not in extra_args:
            extra_args["cardData"] = extra_args.pop("card_data")
        else:
            extra_args.pop("card_data", None)


        # Unless the caller asked for specific expansion, request the default
        # expand set so normalized rows carry consistent fields.
        if not any(key in extra_args for key in ("expand", "full", "cardData", "fetch_config")):
            extra_args["expand"] = list(_REPO_SEARCH_DEFAULT_EXPAND[rt])


        # The lambdas are invoked by _host_hf_call within this same iteration,
        # so capturing rt / extra_args by closure is safe here.
        try:
            if rt == "model":
                payload = _host_hf_call(
                    endpoint,
                    lambda: list(
                        api.list_models(
                            search=term or None,
                            author=author_clean,
                            filter=filter_list or None,
                            sort=sort_keys[rt],
                            limit=lim,
                            **extra_args,
                        )
                    ),
                )
            elif rt == "dataset":
                payload = _host_hf_call(
                    endpoint,
                    lambda: list(
                        api.list_datasets(
                            search=term or None,
                            author=author_clean,
                            filter=filter_list or None,
                            sort=sort_keys[rt],
                            limit=lim,
                            **extra_args,
                        )
                    ),
                )
            else:
                payload = _host_hf_call(
                    endpoint,
                    lambda: list(
                        api.list_spaces(
                            search=term or None,
                            author=author_clean,
                            filter=filter_list or None,
                            sort=sort_keys[rt],
                            limit=lim,
                            **extra_args,
                        )
                    ),
                )
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e)


        scanned += len(payload)
        # A full page means the upstream may have had more rows than `lim`.
        if len(payload) >= lim:
            limit_boundary_hit = True
        all_items.extend(_normalize_repo_search_row(row, rt) for row in payload[:lim])


    # Post-fetch: filter, re-sort the merged rows (first type's sort key wins
    # for mixed-type queries), trim to `lim`, then project requested fields.
    all_items = _apply_where(all_items, where)
    combined_sort_key = next(iter(sort_keys.values()), None)
    all_items = _sort_repo_rows(all_items, combined_sort_key)
    matched = len(all_items)
    all_items = _project_repo_items(all_items[:lim], fields)
    more_available: bool | str = False
    truncated = False
    truncated_by = "none"
    next_request_hint: str | None = None
    if hard_cap_applied and scanned >= lim:
        truncated = True
        truncated_by = "hard_cap"
        more_available = "unknown"
        next_request_hint = f"Increase limit above {lim} to improve coverage"
    elif limit_boundary_hit:
        more_available = "unknown"
        next_request_hint = f"Increase limit above {lim} to check whether more rows exist"


    return _helper_success(
        start_calls=start_calls,
        source=",".join(source_endpoints),
        items=all_items,
        query=term or None,
        repo_types=requested_repo_types,
        filters=filter_list or None,
        sort=combined_sort_key,
        author=author_clean,
        limit=lim,
        scanned=scanned,
        matched=matched,
        returned=len(all_items),
        truncated=truncated,
        truncated_by=truncated_by,
        more_available=more_available,
        limit_boundary_hit=limit_boundary_hit,
        next_request_hint=next_request_hint,
        **limit_meta,
    )
|
|
async def _user_graph_helper(
    kind: str,
    username: str,
    pro_only: bool | None,
    return_limit: int | None,
    scan_limit: int | None,
    count_only: bool,
    where: dict[str, Any] | None,
    fields: list[str] | None,
    *,
    helper_name: str,
) -> dict[str, Any]:
    """Shared implementation for follower/following graph queries.

    Strategy: first try the user overview to get an exact relation total; if
    the handle turns out to be an organization, fall back to the organization
    overview (orgs only expose followers). When the query is unfiltered and
    count_only with a known overview total, short-circuit without listing.
    Otherwise list graph rows up to the scan limit, normalize them, apply
    pro_only/where filters, and build the standard exhaustive envelope.

    Args:
        kind: "followers" or "following" (validated by the caller).
        username: Target handle (user or organization).
        pro_only: If True/False, keep only Pro / non-Pro accounts.
        return_limit, scan_limit, count_only: Limit plan inputs, resolved via
            _resolve_exhaustive_limits against `helper_name` policy values.
        where: Post-normalization row filter.
        fields: Output projection for user items.
        helper_name: Policy namespace for per-helper limits.
    """
    start_calls = call_count["n"]
    default_return = _policy_int(helper_name, "default_return", 100)
    scan_cap = _policy_int(helper_name, "scan_max", GRAPH_SCAN_LIMIT_CAP)
    max_return = _policy_int(helper_name, "max_return", EXHAUSTIVE_HELPER_RETURN_HARD_CAP)


    u = str(username or "").strip()
    if not u:
        return _helper_error(start_calls=start_calls, source=f"/api/users/<u>/{kind}", error="username is required")


    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=max_return,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    has_where = isinstance(where, dict) and bool(where)
    # "filtered" means the overview total cannot stand in for the matched count.
    filtered = (pro_only is not None) or has_where


    # Probe the overview endpoints (user first, then organization) for an
    # exact relation total; only spend a call if budget remains.
    entity_type = "user"
    overview_total: int | None = None
    overview_source = f"/api/users/{u}/overview"
    if _budget_remaining() > 0:
        try:
            user_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_user_overview(u))
            overview_total = _as_int(user_obj.num_followers if kind == "followers" else user_obj.num_following)
        except Exception:
            org_overview_source = f"/api/organizations/{u}/overview"
            try:
                org_obj = _host_hf_call(org_overview_source, lambda: _get_hf_api_client().get_organization_overview(u))
            except Exception:
                # Neither overview resolved; proceed with scan-derived counts.
                overview_total = None
            else:
                entity_type = "organization"
                overview_source = org_overview_source
                # Organizations expose followers only; "following" is invalid.
                if kind != "followers":
                    return _helper_error(
                        start_calls=start_calls,
                        source=f"/api/organizations/{u}/{kind}",
                        error="organization graph only supports relation='followers'; organizations do not expose a following list",
                        relation=kind,
                        organization=u,
                        entity=u,
                        entity_type=entity_type,
                    )
                overview_total = _as_int(getattr(org_obj, "num_followers", None))


    # Fast path: unfiltered count-only with a known exact total — no listing.
    if count_only and not filtered and overview_total is not None:
        return _overview_count_only_success(
            start_calls=start_calls,
            source=overview_source,
            total=overview_total,
            limit_plan=limit_plan,
            base_meta={
                "scanned": 1,
                "count_source": "overview",
                "relation": kind,
                "pro_only": pro_only,
                "where_applied": has_where,
                "entity": u,
                "entity_type": entity_type,
                "username": u,
                "organization": u if entity_type == "organization" else None,
            },
        )


    # List graph rows, capped at scan_lim via islice.
    endpoint = f"/api/users/{u}/{kind}"
    try:
        if entity_type == "organization":
            endpoint = f"/api/organizations/{u}/followers"
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_followers(u), scan_lim)))
        elif kind == "followers":
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_followers(u), scan_lim)))
        else:
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_following(u), scan_lim)))
    except Exception as e:
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=e,
            relation=kind,
            username=u,
            entity=u,
            entity_type=entity_type,
            organization=u if entity_type == "organization" else None,
        )


    # Normalize rows to plain dicts; rows without a usable username are dropped.
    normalized: list[dict[str, Any]] = []
    for row in rows:
        handle = getattr(row, "username", None)
        if not isinstance(handle, str) or not handle:
            continue
        item = {
            "username": handle,
            "fullname": getattr(row, "fullname", None),
            "isPro": getattr(row, "is_pro", None),
        }
        if pro_only is True and item.get("isPro") is not True:
            continue
        if pro_only is False and item.get("isPro") is True:
            continue
        normalized.append(item)


    normalized = _apply_where(normalized, where)
    observed_total = len(rows)
    # Fewer rows than the scan cap means we saw the entire list.
    scan_exhaustive = observed_total < scan_lim


    # Flag disagreement between the overview count and a complete scan.
    overview_list_mismatch = (
        overview_total is not None
        and scan_exhaustive
        and observed_total != overview_total
    )


    # Choose totals: filtered queries can only count what was scanned;
    # unfiltered queries prefer the exact overview total when available.
    if filtered:
        exact_count = scan_exhaustive
        total = len(normalized)
        total_matched = len(normalized)
    else:
        if overview_total is not None:
            exact_count = True
            total = overview_total
            total_matched = overview_total
        else:
            exact_count = scan_exhaustive
            total = observed_total
            total_matched = observed_total


    total_available = overview_total if overview_total is not None else observed_total
    items = normalized[:ret_lim]
    scan_limit_hit = not exact_count and observed_total >= scan_lim
    count_source = "overview" if overview_total is not None and not filtered else "scan"
    sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if not exact_count and scan_limit_hit:
        # With filters, rows beyond the scan window may or may not match.
        more_available = "unknown" if filtered else True


    items = _project_user_items(items, fields)
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": observed_total,
            "total": total,
            "total_available": total_available,
            "total_matched": total_matched,
            "count_source": count_source,
            "lower_bound": bool(filtered and not exact_count),
            "overview_total": overview_total,
            "listed_total": observed_total,
            "overview_list_mismatch": overview_list_mismatch,
            "relation": kind,
            "pro_only": pro_only,
            "where_applied": has_where,
            "entity": u,
            "entity_type": entity_type,
            "username": u,
            "organization": u if entity_type == "organization" else None,
        },
        limit_plan=limit_plan,
        matched_count=len(normalized),
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
    )
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
|
|
async def hf_profile_summary(
    handle: str | None = None,
    include: list[str] | None = None,
    likes_limit: int = 10,
    activity_limit: int = 10,
) -> dict[str, Any]:
    """Build a one-item profile summary for a user or organization handle.

    Resolves `handle` (falling back to the current authenticated user), then
    tries the user overview first; on success, optionally attaches "likes"
    and/or "activity" samples per `include`. If the user overview fails, it
    falls back to the organization overview (include sections are ignored and
    reported via `ignored_includes`). If both fail, returns a helper error.

    Args:
        handle: User/org handle; None resolves to the current user.
        include: Optional sections to attach; only "likes" and "activity"
            are accepted.
        likes_limit: Max liked repos to sample (0 means count-only).
        activity_limit: Max activity events to sample (0 means count-only).
    """
    start_calls = call_count["n"]
    resolved_handle, resolve_error = await _resolve_username_or_current(handle)
    if resolve_error:
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/overview", error=resolve_error)
    if not isinstance(resolved_handle, str):
        return _helper_error(
            start_calls=start_calls,
            source="/api/users/<u>/overview",
            error="handle was not provided and current authenticated user could not be resolved",
        )


    # Normalize and validate requested sections (lowercased, blanks dropped).
    try:
        requested_sections = (
            {part.lower() for part in _coerce_str_list(include) if part.strip()} if include is not None else set()
        )
    except ValueError as e:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            error=e,
        )
    invalid_sections = sorted(requested_sections - {"likes", "activity"})
    if invalid_sections:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            error=f"Unsupported include values: {invalid_sections}",
        )


    likes_lim = _clamp_int(likes_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
    activity_lim = _clamp_int(activity_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
    # Section fetch failures are reported here rather than failing the summary.
    section_errors: dict[str, str] = {}


    # Prefer the user overview; an org handle will fail here and fall through.
    user_overview = await hf_user_overview(resolved_handle)
    if user_overview.get("ok") is True:
        overview_item = _helper_item(user_overview) or {"username": resolved_handle}
        item: dict[str, Any] = {
            "handle": str(overview_item.get("username") or resolved_handle),
            "entity_type": "user",
            "display_name": overview_item.get("fullname") or str(overview_item.get("username") or resolved_handle),
            "bio": overview_item.get("bio"),
            "avatar_url": overview_item.get("avatarUrl"),
            "website_url": overview_item.get("websiteUrl"),
            "twitter_url": overview_item.get("twitter"),
            "github_url": overview_item.get("github"),
            "linkedin_url": overview_item.get("linkedin"),
            "bluesky_url": overview_item.get("bluesky"),
            "followers_count": _overview_count(overview_item, "followers"),
            "following_count": _overview_count(overview_item, "following"),
            "likes_count": _overview_count(overview_item, "likes"),
            "models_count": _overview_count(overview_item, "models"),
            "datasets_count": _overview_count(overview_item, "datasets"),
            "spaces_count": _overview_count(overview_item, "spaces"),
            "discussions_count": _overview_count(overview_item, "discussions"),
            "papers_count": _overview_count(overview_item, "papers"),
            "upvotes_count": _overview_count(overview_item, "upvotes"),
            "organizations": overview_item.get("orgs"),
            "is_pro": overview_item.get("isPro"),
        }


        # Optional likes sample; a limit of 0 downgrades to count-only.
        if "likes" in requested_sections:
            likes = await hf_user_likes(
                username=resolved_handle,
                return_limit=likes_lim,
                scan_limit=USER_SUMMARY_LIKES_SCAN_LIMIT,
                count_only=likes_lim == 0,
                sort="likedAt",
                fields=["liked_at", "repo_id", "repo_type", "repo_author", "repo_url"],
            )
            item["likes_sample"] = likes.get("items") if likes.get("ok") is True else []
            if likes.get("ok") is not True:
                section_errors["likes"] = str(likes.get("error") or "likes fetch failed")


        # Optional recent-activity sample, same count-only convention.
        if "activity" in requested_sections:
            activity = await hf_recent_activity(
                feed_type="user",
                entity=resolved_handle,
                return_limit=activity_lim,
                max_pages=USER_SUMMARY_ACTIVITY_MAX_PAGES,
                count_only=activity_lim == 0,
                fields=["timestamp", "event_type", "repo_type", "repo_id"],
            )
            item["activity_sample"] = activity.get("items") if activity.get("ok") is True else []
            if activity.get("ok") is not True:
                section_errors["activity"] = str(activity.get("error") or "activity fetch failed")


        return _helper_success(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
            handle=resolved_handle,
            entity_type="user",
            include=sorted(requested_sections),
            likes_limit=likes_lim,
            activity_limit=activity_lim,
            section_errors=section_errors or None,
        )


    # Organization fallback: no likes/activity sections are available here.
    org_overview = await hf_org_overview(resolved_handle)
    if org_overview.get("ok") is True:
        overview_item = _helper_item(org_overview) or {"organization": resolved_handle}
        item = {
            "handle": str(overview_item.get("organization") or resolved_handle),
            "entity_type": "organization",
            "display_name": overview_item.get("displayName") or str(overview_item.get("organization") or resolved_handle),
            "description": overview_item.get("description"),
            "avatar_url": overview_item.get("avatarUrl"),
            "website_url": overview_item.get("websiteUrl"),
            "followers_count": _overview_count(overview_item, "followers"),
            "members_count": _overview_count(overview_item, "members"),
            "models_count": _overview_count(overview_item, "models"),
            "datasets_count": _overview_count(overview_item, "datasets"),
            "spaces_count": _overview_count(overview_item, "spaces"),
        }
        return _helper_success(
            start_calls=start_calls,
            source=f"/api/organizations/{resolved_handle}/overview",
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
            handle=resolved_handle,
            entity_type="organization",
            include=[],
            ignored_includes=sorted(requested_sections) or None,
        )


    # Neither profile type resolved; surface whichever error we saw first.
    error = user_overview.get("error") or org_overview.get("error") or "profile fetch failed"
    return _helper_error(
        start_calls=start_calls,
        source=f"/api/profiles/{resolved_handle}",
        error=error,
        handle=resolved_handle,
    )
|
|
async def hf_user_graph(
    username: str | None = None,
    relation: str = "followers",
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    pro_only: bool | None = None,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Return a user's follower/following graph via the shared graph helper.

    Normalizes ``relation`` (defaulting blank input to "followers"), resolves
    the target username — falling back to the current authenticated user when
    none is given — and delegates all listing/counting to _user_graph_helper.
    """
    calls_at_entry = call_count["n"]

    normalized_relation = str(relation or "").strip().lower() or "followers"
    if normalized_relation not in ("followers", "following"):
        return _helper_error(
            start_calls=calls_at_entry,
            source="/api/users/<u>/followers",
            error="relation must be 'followers' or 'following'",
        )

    # Resolve the target handle; surface resolution problems as helper errors.
    target, resolve_error = await _resolve_username_or_current(username)
    if resolve_error:
        return _helper_error(
            start_calls=calls_at_entry,
            source=f"/api/users/<u>/{normalized_relation}",
            error=resolve_error,
            relation=normalized_relation,
        )
    if not isinstance(target, str):
        return _helper_error(
            start_calls=calls_at_entry,
            source=f"/api/users/<u>/{normalized_relation}",
            error="username is required",
            relation=normalized_relation,
        )

    return await _user_graph_helper(
        normalized_relation,
        target,
        pro_only,
        return_limit,
        scan_limit,
        count_only,
        where,
        fields,
        helper_name="hf_user_graph",
    )
|
|
async def hf_user_likes(
    username: str | None = None,
    repo_types: list[str] | None = None,
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
    sort: str | None = None,
    ranking_window: int | None = None,
) -> dict[str, Any]:
    """List repos a user has liked via the raw /api/users/<u>/likes endpoint.

    Rows are normalized to carry both camelCase and snake_case keys
    (repoId/repo_id, likedAt/liked_at, ...). The natural order is the API's
    likedAt order; for sort="repoLikes" or sort="repoDownloads", a shortlist
    of up to ``ranking_window`` rows is enriched with per-repo info calls
    (within the remaining call budget, capped by policy ``enrich_max``) and
    re-ranked by the chosen metric.

    Args:
        username: Target handle; None resolves to the current user.
        repo_types: Optional allow-list of canonical repo types.
        return_limit, scan_limit, count_only: Limit-plan inputs.
        where: Row filter applied during normalization.
        fields: Output projection applied via _project_items.
        sort: One of likedAt, repoLikes, repoDownloads.
        ranking_window: Shortlist size for metric-based ranking; defaults to
            the policy ``ranking_default``.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_user_likes", "default_return", 100)
    scan_cap = _policy_int("hf_user_likes", "scan_max", LIKES_SCAN_LIMIT_CAP)
    ranking_default = _policy_int("hf_user_likes", "ranking_default", LIKES_RANKING_WINDOW_DEFAULT)
    enrich_cap = _policy_int("hf_user_likes", "enrich_max", LIKES_ENRICHMENT_MAX_REPOS)


    resolved_username, resolve_error = await _resolve_username_or_current(username)
    if resolve_error:
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/likes", error=resolve_error)
    if not isinstance(resolved_username, str):
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/likes", error="username is required")


    sort_key, sort_error = _normalize_user_likes_sort(sort)
    if sort_error:
        return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=sort_error)
    if sort_key is None:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_username}/likes",
            error="sort must be one of likedAt, repoLikes, repoDownloads",
        )


    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])


    # Validate the optional repo-type allow-list (canonicalized).
    allowed_repo_types: set[str] | None = None
    try:
        raw_repo_types: list[str] = _coerce_str_list(repo_types) if repo_types is not None else []
    except ValueError as e:
        return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=e)
    if raw_repo_types:
        allowed_repo_types = set()
        for raw in raw_repo_types:
            canonical = _canonical_repo_type(raw, default="")
            if canonical not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source=f"/api/users/{resolved_username}/likes",
                    error=f"Unsupported repo_type '{raw}'",
                )
            allowed_repo_types.add(canonical)


    # Raw fallback endpoint (not covered by HfApi); limit bounds the scan.
    endpoint = f"/api/users/{resolved_username}/likes"
    resp = _host_raw_call(endpoint, params={"limit": scan_lim})
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=resp.get("error") or "likes fetch failed",
        )


    payload = resp.get("data") if isinstance(resp.get("data"), list) else []
    scanned_rows = payload[:scan_lim]
    # Pairs of (original index, normalized item); the index preserves the
    # API's likedAt order for stable ranking tiebreaks.
    matched_rows: list[tuple[int, dict[str, Any]]] = []


    for row in scanned_rows:
        if not isinstance(row, dict):
            continue
        repo = row.get("repo") if isinstance(row.get("repo"), dict) else {}
        repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}


        # Prefer repoData's id/name; fall back to the lighter repo stub.
        repo_id = repo_data.get("id") or repo_data.get("name") or repo.get("name")
        if not isinstance(repo_id, str) or not repo_id:
            continue


        repo_type = _canonical_repo_type(repo_data.get("type") or repo.get("type"), default="")
        if not repo_type:
            repo_type = _canonical_repo_type(repo.get("type"), default="model")
        if allowed_repo_types is not None and repo_type not in allowed_repo_types:
            continue


        # Derive the author from the namespace prefix when not given explicitly.
        repo_author = repo_data.get("author")
        if not isinstance(repo_author, str) and "/" in repo_id:
            repo_author = repo_id.split("/", 1)[0]


        # Emit both camelCase and snake_case spellings for every field.
        item = {
            "likedAt": row.get("likedAt") or row.get("createdAt"),
            "liked_at": row.get("likedAt") or row.get("createdAt"),
            "repoId": repo_id,
            "repo_id": repo_id,
            "repoType": repo_type,
            "repo_type": repo_type,
            "repoAuthor": repo_author,
            "repo_author": repo_author,
            "repoLikes": _as_int(repo_data.get("likes")),
            "repo_likes": _as_int(repo_data.get("likes")),
            "repoDownloads": _as_int(repo_data.get("downloads")),
            "repo_downloads": _as_int(repo_data.get("downloads")),
            "likes": _as_int(repo_data.get("likes")),
            "downloads": _as_int(repo_data.get("downloads")),
            "repo_url": _repo_web_url(repo_type, repo_id),
        }
        if not _item_matches_where(item, where):
            continue
        matched_rows.append((len(matched_rows), item))


    matched = len(matched_rows)
    # Fewer rows than the scan cap means the likes list was fully observed.
    scan_exhaustive = len(payload) < scan_lim
    exact_count = scan_exhaustive
    total_matched = matched
    total = total_matched
    effective_ranking_window: int | None = None
    ranking_complete = sort_key == "likedAt" and exact_count
    enriched = 0


    # Select the returned rows; metric sorts go through the ranking branch.
    selected_pairs: list[tuple[int, dict[str, Any]]]
    if count_only:
        selected_pairs = []
        ranking_complete = False if matched > 0 else exact_count
    elif sort_key == "likedAt":
        selected_pairs = matched_rows[:ret_lim]
    else:
        metric = str(sort_key)
        requested_window = ranking_window if ranking_window is not None else ranking_default
        effective_ranking_window = _clamp_int(
            requested_window,
            default=ranking_default,
            minimum=1,
            maximum=enrich_cap,
        )
        shortlist_size = min(effective_ranking_window, matched, scan_lim)
        shortlist = matched_rows[:shortlist_size]
        # Only rows missing the metric and with a resolvable repo need a
        # per-repo info call to fill it in.
        candidates = [
            pair
            for pair in shortlist
            if pair[1].get(metric) is None
            and isinstance(pair[1].get("repoId"), str)
            and pair[1].get("repoType") in {"model", "dataset", "space"}
        ]
        enrich_budget = min(len(candidates), _budget_remaining(), shortlist_size)
        for _, item in candidates[:enrich_budget]:
            repo_type = str(item.get("repoType"))
            repo_id = str(item.get("repoId"))
            detail_endpoint = f"/api/{_canonical_repo_type(repo_type)}s/{repo_id}"
            try:
                # Defaults bind rt/rid per iteration so the lambda is safe
                # regardless of when _host_hf_call invokes it.
                detail = _host_hf_call(
                    detail_endpoint,
                    lambda rt=repo_type, rid=repo_id: (
                        _get_hf_api_client().model_info(rid)
                        if _canonical_repo_type(rt) == "model"
                        else _get_hf_api_client().dataset_info(rid)
                        if _canonical_repo_type(rt) == "dataset"
                        else _get_hf_api_client().space_info(rid)
                    ),
                )
            except Exception:
                # Best-effort enrichment: a failed info call leaves the row
                # unranked rather than failing the whole helper.
                continue


            likes = _as_int(getattr(detail, "likes", None))
            downloads = _as_int(getattr(detail, "downloads", None))
            if likes is not None:
                item["repoLikes"] = likes
                item["repo_likes"] = likes
                item["likes"] = likes
            if downloads is not None:
                item["repoDownloads"] = downloads
                item["repo_downloads"] = downloads
                item["downloads"] = downloads
            enriched += 1


        # Sort: rows with a known metric first (descending), unknowns last;
        # the original index keeps ties in likedAt order.
        def _ranking_key(pair: tuple[int, dict[str, Any]]) -> tuple[int, int, int]:
            idx, row = pair
            metric_value = _as_int(row.get(metric))
            if metric_value is None:
                return (1, 0, idx)
            return (0, -metric_value, idx)


        ranked_shortlist = sorted(shortlist, key=_ranking_key)
        selected_pairs = ranked_shortlist[:ret_lim]
        ranking_complete = exact_count and shortlist_size >= matched and len(candidates) <= enrich_budget


    items = _project_items([row for _, row in selected_pairs], fields)
    popularity_present = sum(1 for _, row in selected_pairs if row.get("repoLikes") is not None)
    sample_complete = (
        exact_count
        and ret_lim >= matched
        and (sort_key == "likedAt" or ranking_complete)
        and (not count_only or matched == 0)
    )
    scan_limit_hit = not scan_exhaustive and len(payload) >= scan_lim
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if scan_limit_hit:
        # With filters in play, unseen rows may or may not match.
        more_available = "unknown" if (allowed_repo_types is not None or where) else True


    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": len(scanned_rows),
            "total": total,
            "total_available": len(payload),
            "total_matched": total_matched,
            "count_source": "scan",
            "lower_bound": not exact_count,
            "enriched": enriched,
            "popularity_present": popularity_present,
            "sort_applied": sort_key,
            "ranking_window": effective_ranking_window,
            "ranking_complete": ranking_complete,
            "username": resolved_username,
        },
        limit_plan=limit_plan,
        matched_count=matched,
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
        truncated_extra=sort_key != "likedAt" and not ranking_complete,
    )
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
|
|
async def hf_repo_likers(
    repo_id: str,
    repo_type: str,
    return_limit: int | None = None,
    count_only: bool = False,
    pro_only: bool | None = None,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """List the accounts that liked a repo.

    Fetches the full (non-paginated) likers list in a single raw call,
    normalizes each row, applies the optional pro_only/where filters, and
    returns the standard exhaustive envelope. Counts are always exact since
    the upstream list is not paginated.
    """
    calls_at_entry = call_count["n"]

    clean_repo_id = str(repo_id or "").strip()
    if not clean_repo_id:
        return _helper_error(start_calls=calls_at_entry, source="/api/repos/<repo>/likers", error="repo_id is required")

    canonical_type = _canonical_repo_type(repo_type, default="")
    if canonical_type not in ("model", "dataset", "space"):
        return _helper_error(
            start_calls=calls_at_entry,
            source=f"/api/repos/{clean_repo_id}/likers",
            error=f"Unsupported repo_type '{repo_type}'",
            repo_id=clean_repo_id,
        )

    fallback_return = _policy_int("hf_repo_likers", "default_return", 1_000)
    default_limit_used = return_limit is None and not count_only
    where_active = isinstance(where, dict) and bool(where)

    endpoint = f"/api/{canonical_type}s/{clean_repo_id}/likers"
    resp = _host_raw_call(endpoint)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=calls_at_entry,
            source=endpoint,
            error=resp.get("error") or "repo likers fetch failed",
            repo_id=clean_repo_id,
            repo_type=canonical_type,
        )

    raw_rows = resp.get("data") if isinstance(resp.get("data"), list) else []

    # Normalize and filter rows; entries without a usable username are dropped.
    likers: list[dict[str, Any]] = []
    for raw in raw_rows:
        if not isinstance(raw, dict):
            continue
        handle = raw.get("user") or raw.get("username")
        if not isinstance(handle, str) or not handle:
            continue
        actor_type = raw.get("type")
        entry = {
            "username": handle,
            "fullname": raw.get("fullname"),
            "type": actor_type if isinstance(actor_type, str) and actor_type else "user",
            "isPro": raw.get("isPro"),
        }
        if pro_only is True and entry.get("isPro") is not True:
            continue
        if pro_only is False and entry.get("isPro") is True:
            continue
        if not _item_matches_where(entry, where):
            continue
        likers.append(entry)

    # Resolve the applied return limit; malformed values fall back to policy.
    if count_only:
        applied_return = 0
    elif return_limit is None:
        applied_return = fallback_return
    else:
        try:
            applied_return = max(0, int(return_limit))
        except Exception:
            applied_return = fallback_return
    limit_plan = {
        "requested_return_limit": return_limit,
        "applied_return_limit": applied_return,
        "default_limit_used": default_limit_used,
        "hard_cap_applied": False,
    }

    matched = len(likers)
    selected = [] if count_only else likers[:applied_return]
    return_limit_hit = applied_return > 0 and matched > applied_return
    truncated_by = _derive_truncated_by(
        hard_cap=False,
        return_limit_hit=return_limit_hit,
    )
    sample_complete = matched <= applied_return and (not count_only or matched == 0)
    more_available = _derive_more_available(
        sample_complete=sample_complete,
        exact_count=True,
        returned=len(selected),
        total=matched,
    )

    projected = _project_actor_items(selected, fields)

    meta = _build_exhaustive_meta(
        base_meta={
            "scanned": len(raw_rows),
            "matched": matched,
            "returned": len(projected),
            "total": matched,
            "total_available": len(raw_rows),
            "total_matched": matched,
            "truncated": truncated_by != "none",
            "count_source": "likers_list",
            "lower_bound": False,
            "repo_id": clean_repo_id,
            "repo_type": canonical_type,
            "pro_only": pro_only,
            "where_applied": where_active,
            "upstream_pagination": "none",
        },
        limit_plan=limit_plan,
        sample_complete=sample_complete,
        exact_count=True,
        truncated_by=truncated_by,
        more_available=more_available,
    )
    meta["hard_cap_applied"] = False
    return _helper_success(
        start_calls=calls_at_entry,
        source=endpoint,
        items=projected,
        meta=meta,
    )
|
|
async def hf_recent_activity(
    feed_type: str | None = None,
    entity: str | None = None,
    activity_types: list[str] | None = None,
    repo_types: list[str] | None = None,
    return_limit: int | None = None,
    max_pages: int | None = None,
    start_cursor: str | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Scan the raw /api/recent-activity feed for a user or org.

    Pages through the cursor-based feed (bounded by the per-helper page
    policy and the shared call budget), normalizes each event row, applies
    the optional activity-type / repo-type / ``where`` filters, and returns
    the standard helper envelope plus the next cursor when more pages exist.
    """
    start_calls = call_count["n"]
    # Per-helper pagination policy with module-level fallbacks.
    default_return = _policy_int("hf_recent_activity", "default_return", 100)
    page_cap = _policy_int("hf_recent_activity", "page_limit", RECENT_ACTIVITY_PAGE_SIZE)
    pages_cap = _policy_int("hf_recent_activity", "max_pages", RECENT_ACTIVITY_SCAN_MAX_PAGES)

    # Keep the caller's raw request so meta can report requested vs applied.
    requested_max_pages = max_pages

    # Normalize feed_type/entity. A bare first argument that is not
    # 'user'/'org' is treated as the entity of a user feed.
    ft = str(feed_type or "").strip().lower()
    ent = str(entity or "").strip()
    if ft not in {"user", "org"}:
        if ft and not ent:
            ent = ft
            ft = "user"
        elif not ft and ent:
            ft = "user"

    if ft not in {"user", "org"}:
        return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="feed_type must be 'user' or 'org'")
    if not ent:
        return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="entity is required")

    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
    )
    # With count_only, applied_return_limit is expected to be 0, which makes
    # the loop below scan pages without collecting items.
    ret_lim = int(limit_plan["applied_return_limit"])
    page_lim = page_cap
    pages_lim = _clamp_int(requested_max_pages, default=pages_cap, minimum=1, maximum=pages_cap)

    # Pre-lowered / canonicalized filter sets; empty set means "no filter".
    type_filter = {str(t).strip().lower() for t in (activity_types or []) if str(t).strip()}
    repo_filter = {_canonical_repo_type(t, default="") for t in (repo_types or []) if str(t).strip()}

    next_cursor = str(start_cursor).strip() if isinstance(start_cursor, str) and start_cursor.strip() else None
    items: list[dict[str, Any]] = []
    scanned = 0
    matched = 0
    pages = 0
    exhausted_feed = False
    stopped_for_budget = False

    # ret_lim == 0 means "count mode": keep paging until feed end / caps.
    while pages < pages_lim and (ret_lim == 0 or len(items) < ret_lim):
        if _budget_remaining() <= 0:
            stopped_for_budget = True
            break

        params: dict[str, Any] = {"feedType": ft, "entity": ent, "limit": page_lim}
        if next_cursor:
            params["cursor"] = next_cursor

        resp = _host_raw_call("/api/recent-activity", params=params)
        if not resp.get("ok"):
            # A failure on the very first page is a hard error; mid-scan
            # failures just end the scan with partial results.
            if pages == 0:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/recent-activity",
                    error=resp.get("error") or "recent-activity fetch failed",
                )
            break

        payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
        rows = payload.get("recentActivity") if isinstance(payload.get("recentActivity"), list) else []
        cursor_raw = payload.get("cursor")
        next_cursor = cursor_raw if isinstance(cursor_raw, str) and cursor_raw else None
        pages += 1

        if not rows:
            exhausted_feed = True
            break

        for row in rows:
            if not isinstance(row, dict):
                continue
            scanned += 1

            typ = str(row.get("type") or "").strip().lower()
            repo_id = row.get("repoId")
            repo_type = row.get("repoType")
            repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else None
            repo_obj = row.get("repo") if isinstance(row.get("repo"), dict) else None
            # The feed is inconsistent about where repo info lives; fall
            # back through repoData, then repo.
            if repo_id is None and repo_data is not None:
                repo_id = repo_data.get("id") or repo_data.get("name")
            if repo_id is None and repo_obj is not None:
                repo_id = repo_obj.get("id") or repo_obj.get("name")
            if repo_type is None and repo_data is not None:
                repo_type = repo_data.get("type")
            if repo_type is None and repo_obj is not None:
                repo_type = repo_obj.get("type")

            rt = _canonical_repo_type(repo_type, default="") if repo_type else ""
            if type_filter and typ not in type_filter:
                continue
            if repo_filter and rt not in repo_filter:
                continue

            # Emit both camelCase and snake_case keys so `where` filters and
            # field projection work with either naming style.
            item = {
                "time": row.get("time"),
                "timestamp": row.get("time"),
                "type": row.get("type"),
                "event_type": row.get("type"),
                "repoType": rt or repo_type,
                "repo_type": rt or repo_type,
                "repoId": repo_id,
                "repo_id": repo_id,
            }
            if not _item_matches_where(item, where):
                continue

            # matched counts everything passing the filters, even rows not
            # collected because the return limit was already reached.
            matched += 1
            if len(items) < ret_lim:
                items.append(item)

        if not next_cursor:
            exhausted_feed = True
            break

    items = _project_items(items, fields)
    # Counts are exact only when the feed ended naturally within budget.
    exact_count = exhausted_feed and not stopped_for_budget
    sample_complete = exact_count and ret_lim >= matched and (not count_only or matched == 0)
    page_limit_hit = next_cursor is not None and pages >= pages_lim and not exhausted_feed
    more_available: bool | str = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=matched if exact_count else None)
    if next_cursor is not None:
        more_available = True
    elif stopped_for_budget and not exact_count:
        more_available = "unknown"

    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": scanned,
            "total": matched,
            "total_matched": matched,
            "pages": pages,
            "count_source": "scan" if exact_count else "none",
            "lower_bound": not exact_count,
            "page_limit": page_lim,
            "stopped_for_budget": stopped_for_budget,
            "feed_type": ft,
            "entity": ent,
        },
        limit_plan=limit_plan,
        matched_count=matched,
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        page_limit_hit=page_limit_hit,
        truncated_extra=stopped_for_budget,
        requested_max_pages=requested_max_pages,
        applied_max_pages=pages_lim,
    )
    return _helper_success(
        start_calls=start_calls,
        source="/api/recent-activity",
        items=items,
        meta=meta,
        cursor=next_cursor,
    )
|
|
async def hf_repo_discussions(repo_type: str, repo_id: str, limit: int = 20) -> dict[str, Any]:
    """List discussions for one repo via ``HfApi.get_repo_discussions``.

    Returns at most ``limit`` discussion summaries (clamped to the selective
    endpoint hard cap) in the standard helper envelope.
    """
    baseline_calls = call_count["n"]
    canonical_type = _canonical_repo_type(repo_type)
    canonical_id = str(repo_id or "").strip()
    if "/" not in canonical_id:
        return _helper_error(start_calls=baseline_calls, source="/api/.../discussions", error="repo_id must be owner/name")

    capped = _clamp_int(limit, default=20, minimum=1, maximum=SELECTIVE_ENDPOINT_RETURN_HARD_CAP)
    endpoint = f"/api/{canonical_type}s/{canonical_id}/discussions"
    try:
        fetched = _host_hf_call(
            endpoint,
            lambda: list(
                islice(
                    _get_hf_api_client().get_repo_discussions(repo_id=canonical_id, repo_type=canonical_type),
                    capped,
                )
            ),
        )
    except Exception as exc:
        return _helper_error(start_calls=baseline_calls, source=endpoint, error=exc)

    def _summarize(discussion: Any) -> dict[str, Any]:
        # Expose the discussion number under every alias downstream code uses.
        number = _as_int(getattr(discussion, "num", None))
        created = getattr(discussion, "created_at", None)
        return {
            "num": number,
            "number": number,
            "discussionNum": number,
            "id": number,
            "title": getattr(discussion, "title", None),
            "author": getattr(discussion, "author", None),
            "createdAt": str(created) if created is not None else None,
            "status": getattr(discussion, "status", None),
        }

    rows = [_summarize(d) for d in fetched]
    return _helper_success(
        start_calls=baseline_calls,
        source=endpoint,
        items=rows,
        scanned=len(rows),
        matched=len(rows),
        returned=len(rows),
        truncated=False,
        total_count=None,
    )
|
|
async def hf_repo_discussion_details(repo_type: str, repo_id: str, discussion_num: int) -> dict[str, Any]:
    """Fetch one discussion with its comment events and flatten it.

    The single returned item carries the discussion header plus a summary of
    the most recent comment under both camelCase and snake_case keys.
    """
    baseline_calls = call_count["n"]
    canonical_type = _canonical_repo_type(repo_type)
    canonical_id = str(repo_id or "").strip()
    if "/" not in canonical_id:
        return _helper_error(start_calls=baseline_calls, source="/api/.../discussions/<num>", error="repo_id must be owner/name")

    number = _as_int(discussion_num)
    if number is None:
        return _helper_error(
            start_calls=baseline_calls,
            source=f"/api/{canonical_type}s/{canonical_id}/discussions/<num>",
            error="discussion_num must be an integer",
        )

    endpoint = f"/api/{canonical_type}s/{canonical_id}/discussions/{number}"
    try:
        detail = _host_hf_call(
            endpoint,
            lambda: _get_hf_api_client().get_discussion_details(
                repo_id=canonical_id,
                discussion_num=int(number),
                repo_type=canonical_type,
            ),
        )
    except Exception as exc:
        return _helper_error(start_calls=baseline_calls, source=endpoint, error=exc)

    # Keep only the comment-type events, normalized to plain dicts.
    raw_events = getattr(detail, "events", None)
    comment_events: list[dict[str, Any]] = (
        [
            {
                "author": getattr(event, "author", None),
                "createdAt": _dt_to_str(getattr(event, "created_at", None)),
                "text": getattr(event, "content", None),
                "rendered": getattr(event, "rendered", None),
            }
            for event in raw_events
            if str(getattr(event, "type", "")).strip().lower() == "comment"
        ]
        if isinstance(raw_events, list)
        else []
    )

    # Newest comment via lexical max over createdAt strings — assumes the
    # stringified timestamps sort chronologically (TODO confirm _dt_to_str
    # always yields sortable ISO-style values).
    latest = max(comment_events, key=lambda row: str(row.get("createdAt") or "")) if comment_events else None

    item: dict[str, Any] = {
        "num": number,
        "number": number,
        "discussionNum": number,
        "id": number,
        "repo_id": canonical_id,
        "repo_type": canonical_type,
        "title": getattr(detail, "title", None),
        "author": getattr(detail, "author", None),
        "createdAt": _dt_to_str(getattr(detail, "created_at", None)),
        "status": getattr(detail, "status", None),
        "url": getattr(detail, "url", None),
        "commentCount": len(comment_events),
    }
    # Mirror the latest-comment summary under both key conventions.
    item.update(
        {
            "latestCommentAuthor": latest.get("author") if latest else None,
            "latestCommentCreatedAt": latest.get("createdAt") if latest else None,
            "latestCommentText": latest.get("text") if latest else None,
            "latestCommentHtml": latest.get("rendered") if latest else None,
            "latest_comment_author": latest.get("author") if latest else None,
            "latest_comment_created_at": latest.get("createdAt") if latest else None,
            "latest_comment_text": latest.get("text") if latest else None,
            "latest_comment_html": latest.get("rendered") if latest else None,
        }
    )

    return _helper_success(
        start_calls=baseline_calls,
        source=endpoint,
        items=[item],
        scanned=len(comment_events),
        matched=1,
        returned=1,
        truncated=False,
        total_comments=len(comment_events),
    )
|
|
| def _resolve_repo_detail_row( |
| api: HfApi, |
| repo_id: str, |
| attempt_types: list[str], |
| ) -> tuple[dict[str, Any] | None, dict[str, Any] | None]: |
| rid = str(repo_id or "").strip() |
| if "/" not in rid: |
| return None, {"repo_id": rid, "error": "repo_id must be owner/name"} |
|
|
| resolved_type: str | None = None |
| detail: Any = None |
| last_endpoint = "/api/repos" |
| errors: list[str] = [] |
|
|
| for rt in attempt_types: |
| endpoint = f"/api/{rt}s/{rid}" |
| last_endpoint = endpoint |
| try: |
| detail = _host_hf_call( |
| endpoint, |
| lambda rt=rt, rid=rid: api.model_info(rid) |
| if rt == "model" |
| else api.dataset_info(rid) |
| if rt == "dataset" |
| else api.space_info(rid), |
| ) |
| resolved_type = rt |
| break |
| except Exception as e: |
| errors.append(f"{rt}: {str(e)}") |
|
|
| if resolved_type is None or detail is None: |
| return None, { |
| "repo_id": rid, |
| "error": "; ".join(errors[:3]) if errors else "repo lookup failed", |
| "attempted_repo_types": list(attempt_types), |
| "source": last_endpoint, |
| } |
|
|
| return _normalize_repo_detail_row(detail, resolved_type, rid), None |
|
|
async def hf_repo_details(
    repo_id: str | None = None,
    repo_ids: list[str] | None = None,
    repo_type: str = "auto",
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Look up detail rows for one repo id or a batch of repo ids.

    With ``repo_type="auto"`` each id is tried as model, then dataset, then
    space; otherwise only the requested canonical type is attempted. Ids that
    fail to resolve are reported under ``failures`` instead of aborting the
    call, unless nothing resolves at all.
    """
    baseline_calls = call_count["n"]

    if repo_id is not None and repo_ids is not None:
        return _helper_error(
            start_calls=baseline_calls,
            source="/api/repos",
            error="Pass either repo_id or repo_ids, not both",
        )

    # Batch form wins when present; otherwise a non-empty single id is used.
    if repo_ids is not None:
        target_ids = _coerce_str_list(repo_ids)
    elif isinstance(repo_id, str) and str(repo_id).strip():
        target_ids = [str(repo_id).strip()]
    else:
        target_ids = []

    if not target_ids:
        return _helper_error(start_calls=baseline_calls, source="/api/repos", error="repo_id or repo_ids is required")

    requested_type = str(repo_type or "auto").strip().lower()
    if requested_type in {"", "auto"}:
        attempt_types = ["model", "dataset", "space"]
    else:
        canonical = _canonical_repo_type(requested_type, default="")
        if canonical not in {"model", "dataset", "space"}:
            return _helper_error(
                start_calls=baseline_calls,
                source="/api/repos",
                error=f"Unsupported repo_type '{repo_type}'",
            )
        attempt_types = [canonical]

    client = _get_hf_api_client()
    resolved_rows: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []
    for target in target_ids:
        row, failure = _resolve_repo_detail_row(client, target, attempt_types)
        if row is not None:
            resolved_rows.append(row)
        elif failure is not None:
            failures.append(failure)

    if not resolved_rows:
        return _helper_error(
            start_calls=baseline_calls,
            source="/api/repos",
            error=failures[0]["error"] if failures else "repo lookup failed",
            failures=failures,
            repo_type=repo_type,
        )

    projected = _project_repo_items(resolved_rows, fields)
    return _helper_success(
        start_calls=baseline_calls,
        source="/api/repos",
        items=projected,
        repo_type=repo_type,
        requested_repo_ids=target_ids,
        failures=failures or None,
        matched=len(projected),
        returned=len(projected),
    )
|
|
async def hf_trending(
    repo_type: str = "model",
    limit: int = 20,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Fetch the /api/trending feed and enrich each row with repo details.

    Rows are ranked in feed order, enriched via per-repo detail lookups
    (falling back to the raw trending row when enrichment fails), filtered by
    ``where``, and finally projected to ``fields``.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_trending", "default_return", 20)
    max_return = _policy_int("hf_trending", "max_return", TRENDING_ENDPOINT_MAX_LIMIT)

    raw_type = str(repo_type or "model").strip().lower()
    if raw_type == "all":
        requested_type = "all"
    else:
        requested_type = _canonical_repo_type(raw_type, default="")
        if requested_type not in {"model", "dataset", "space"}:
            return _helper_error(
                start_calls=start_calls,
                source="/api/trending",
                error=f"Unsupported repo_type '{repo_type}'",
            )
    lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)

    resp = _host_raw_call("/api/trending", params={"type": requested_type, "limit": lim})
    if not resp.get("ok"):
        return _helper_error(start_calls=start_calls, source="/api/trending", error=resp.get("error") or "trending fetch failed")

    payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
    rows = payload.get("recentlyTrending") if isinstance(payload.get("recentlyTrending"), list) else []

    # Normalize raw feed rows, preserving feed order as the trending rank.
    items: list[dict[str, Any]] = []
    default_row_type = requested_type if requested_type != "all" else "model"
    for idx, row in enumerate(rows[:lim], start=1):
        if not isinstance(row, dict):
            continue
        repo = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}
        items.append(_normalize_trending_row(repo, default_row_type, rank=idx))

    # Enrich each trending row with full repo details; keep the raw row on
    # enrichment failure so a feed entry is never silently dropped.
    api = _get_hf_api_client()
    enriched_items: list[dict[str, Any]] = []
    enrichment_failures: list[dict[str, Any]] = []
    for item in items:
        repo_id = item.get("repo_id")
        if not isinstance(repo_id, str) or not repo_id:
            enriched_items.append(item)
            continue

        item_repo_type = item.get("repo_type")
        if isinstance(item_repo_type, str) and item_repo_type in {"model", "dataset", "space"}:
            attempt_types = [item_repo_type]
        else:
            attempt_types = ["model", "dataset", "space"]

        detail_row, failure = _resolve_repo_detail_row(api, repo_id, attempt_types)
        if detail_row is None:
            enriched_items.append(item)
            if failure is not None:
                enrichment_failures.append(failure)
            continue

        # Carry the feed's trending score/rank over to the enriched row.
        merged = dict(detail_row)
        trending_score = item.get("trending_score")
        if trending_score is not None:
            merged["trending_score"] = trending_score
        if item.get("trending_rank") is not None:
            merged["trending_rank"] = item.get("trending_rank")
        enriched_items.append(merged)

    items = enriched_items
    items = _apply_where(items, where)
    matched = len(items)
    items = items[:lim]
    # Bug fix: determine score availability from the returned rows BEFORE the
    # `fields` projection, so requesting a field subset that omits
    # `trending_score` no longer falsely reports the score as unavailable.
    trending_score_available = any(item.get("trending_score") is not None for item in items)
    items = _project_repo_items(items, fields)

    return _helper_success(
        start_calls=start_calls,
        source="/api/trending",
        items=items,
        repo_type=requested_type,
        limit=lim,
        scanned=len(rows),
        matched=matched,
        returned=len(items),
        trending_score_available=trending_score_available,
        ordered_ranking=True,
        failures=enrichment_failures or None,
    )
|
|
async def hf_collections_search(
    query: str | None = None,
    owner: str | None = None,
    return_limit: int = 20,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Search /api/collections by free-text query and/or owner.

    When only ``owner`` is given it doubles as the query term, and rows are
    then filtered client-side to that exact owner. Results are normalized,
    filtered by ``where``, sliced to the return limit, and projected.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_collections_search", "default_return", 20)
    max_return = _policy_int("hf_collections_search", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT)

    if count_only:
        return_limit = 0

    lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return)
    owner_clean = str(owner or "").strip() or None
    # Over-fetch in count mode or when owner-filtering, since rows may be
    # dropped client-side; owner-scoped fetches are capped at 100 upstream.
    fetch_lim = max_return if lim == 0 or owner_clean else lim
    if owner_clean:
        fetch_lim = min(fetch_lim, 100)

    term = str(query or "").strip()
    if not term and owner_clean:
        term = owner_clean
    if not term:
        return _helper_error(start_calls=start_calls, source="/api/collections", error="query or owner is required")

    params: dict[str, Any] = {"limit": fetch_lim}
    if term:
        params["q"] = term
    if owner_clean:
        params["owner"] = owner_clean

    resp = _host_raw_call("/api/collections", params=params)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source="/api/collections",
            error=resp.get("error") or "collections fetch failed",
        )

    payload = resp.get("data") if isinstance(resp.get("data"), list) else []
    items: list[dict[str, Any]] = []
    for row in payload[:fetch_lim]:
        if not isinstance(row, dict):
            continue
        # Fix: use a dedicated local instead of rebinding the `owner`
        # parameter inside the loop (the shadowing was misleading).
        row_owner = _author_from_any(row.get("owner")) or _author_from_any(row.get("ownerData"))
        if not row_owner and isinstance(row.get("slug"), str) and "/" in str(row.get("slug")):
            # Fall back to the owner segment of a slug-style id.
            row_owner = str(row.get("slug")).split("/", 1)[0]
        if owner_clean is not None and row_owner != owner_clean:
            continue

        owner_payload = row.get("owner") if isinstance(row.get("owner"), dict) else {}
        collection_items = row.get("items") if isinstance(row.get("items"), list) else []
        slug = row.get("slug")
        items.append(
            {
                "collection_id": slug,
                "slug": slug,
                "title": row.get("title"),
                "owner": row_owner,
                "owner_type": owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None,
                "description": row.get("description"),
                "gating": row.get("gating"),
                "last_updated": row.get("lastUpdated"),
                "item_count": len(collection_items),
            }
        )

    items = _apply_where(items, where)
    total_matched = len(items)
    items = items[:lim]
    items = _project_collection_items(items, fields)
    # Truncated when the return limit cut matches, or (count/unbounded mode)
    # the upstream page came back full so more rows may exist server-side.
    truncated = (lim > 0 and total_matched > lim) or (lim == 0 and len(payload) >= fetch_lim)

    return _helper_success(
        start_calls=start_calls,
        source="/api/collections",
        items=items,
        scanned=len(payload),
        matched=total_matched,
        returned=len(items),
        total=len(payload),
        total_matched=total_matched,
        total_population=len(payload),
        truncated=truncated,
        complete=not truncated,
        query=term,
        owner=owner_clean,
    )
|
|
async def hf_collection_items(
    collection_id: str,
    repo_types: list[str] | None = None,
    return_limit: int = 100,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """List the repos inside one collection, with optional filtering.

    Fetches ``/api/collections/<collection_id>``, normalizes each member
    repo, filters by canonical repo type and ``where``, and returns the
    standard helper envelope (empty ``items`` when ``count_only`` is set).
    """
    baseline_calls = call_count["n"]
    default_return = _policy_int("hf_collection_items", "default_return", 100)
    max_return = _policy_int("hf_collection_items", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT)

    cid = str(collection_id or "").strip()
    if not cid:
        return _helper_error(
            start_calls=baseline_calls,
            source="/api/collections/<collection_id>",
            error="collection_id is required",
        )

    if count_only:
        return_limit = 0

    lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return)

    # Validate the repo-type filter up front; any unknown entry is an error.
    allowed_repo_types: set[str] | None = None
    try:
        raw_repo_types = _coerce_str_list(repo_types) if repo_types is not None else []
    except ValueError as exc:
        return _helper_error(start_calls=baseline_calls, source=f"/api/collections/{cid}", error=exc, collection_id=cid)
    if raw_repo_types:
        allowed_repo_types = set()
        for raw in raw_repo_types:
            canonical = _canonical_repo_type(raw, default="")
            if canonical not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=baseline_calls,
                    source=f"/api/collections/{cid}",
                    error=f"Unsupported repo_type '{raw}'",
                    collection_id=cid,
                )
            allowed_repo_types.add(canonical)

    endpoint = f"/api/collections/{cid}"
    resp = _host_raw_call(endpoint)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=baseline_calls,
            source=endpoint,
            error=resp.get("error") or "collection fetch failed",
            collection_id=cid,
        )

    payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
    raw_items = payload.get("items") if isinstance(payload.get("items"), list) else []
    owner = _author_from_any(payload.get("owner"))
    owner_payload = payload.get("owner") if isinstance(payload.get("owner"), dict) else {}
    if owner is None and "/" in cid:
        # Fall back to the owner segment of a slug-style collection id.
        owner = cid.split("/", 1)[0]

    def _keep(candidate: dict[str, Any] | None) -> bool:
        # Apply the repo-type filter first, then the `where` predicate,
        # mirroring the filter order of the other helpers.
        if candidate is None:
            return False
        if allowed_repo_types is not None and candidate.get("repo_type") not in allowed_repo_types:
            return False
        return _item_matches_where(candidate, where)

    normalized = [
        entry
        for entry in (_normalize_collection_repo_item(row) for row in raw_items if isinstance(row, dict))
        if _keep(entry)
    ]

    total_matched = len(normalized)
    selected = [] if count_only else normalized[:lim]
    selected = _project_repo_items(selected, fields)
    truncated = lim > 0 and total_matched > lim

    return _helper_success(
        start_calls=baseline_calls,
        source=endpoint,
        items=selected,
        scanned=len(raw_items),
        matched=total_matched,
        returned=len(selected),
        total=len(raw_items),
        total_matched=total_matched,
        total_population=len(raw_items),
        truncated=truncated,
        complete=not truncated,
        collection_id=cid,
        title=payload.get("title"),
        owner=owner,
        owner_type=owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None,
        repo_types=sorted(allowed_repo_types) if allowed_repo_types is not None else None,
    )
|
|
async def hf_runtime_capabilities(section: str | None = None) -> dict[str, Any]:
    """Return a machine-readable manifest of this runtime's capabilities.

    The manifest covers helper signatures, canonical/aliased field names,
    limits and pagination policy, the raw-API surface, and repo-search sort
    keys. Pass ``section`` to get a single top-level section. This helper
    performs no network calls.
    """
    start_calls = call_count["n"]
    # Mark that an internal (non-API) helper ran so the runner accepts a
    # zero-API-call execution as legitimate.
    internal_helper_used["used"] = True

    def _render_annotation(annotation: Any) -> str:
        # A missing annotation is reported as "Any".
        if annotation is inspect.Signature.empty:
            return "Any"
        return str(annotation)

    def _render_default(default: Any) -> str | None:
        # None means "no default", i.e. the parameter is required.
        if default is inspect.Signature.empty:
            return None
        return repr(default)

    def _signature_payload(fn: Callable[..., Any]) -> dict[str, Any]:
        # Describe a callable's parameters and return annotation as
        # JSON-serializable data.
        signature = inspect.signature(fn)
        parameters: list[dict[str, Any]] = []
        for parameter in signature.parameters.values():
            item: dict[str, Any] = {
                "name": parameter.name,
                "kind": str(parameter.kind).replace("Parameter.", "").lower(),
                "annotation": _render_annotation(parameter.annotation),
                "required": parameter.default is inspect.Signature.empty,
            }
            default = _render_default(parameter.default)
            if default is not None:
                item["default"] = default
            parameters.append(item)
        return {
            "parameters": parameters,
            "returns": _render_annotation(signature.return_annotation),
        }

    # `helper_functions` is closure state supplied by the enclosing runner —
    # presumably the registered helper registry; verify against the caller.
    helper_payload = {
        name: _signature_payload(fn)
        for name, fn in sorted(helper_functions.items())
    }

    manifest: dict[str, Any] = {
        "overview": {
            "helper_count": len(helper_functions),
            "supports_current_user": True,
            "supports_raw_api_fallback": True,
            "helper_result_envelope": {
                "ok": "bool",
                "item": "dict | None",
                "items": "list[dict]",
                "meta": "dict",
                "error": "str | None",
            },
            "raw_result_envelope": {
                "result": "Any",
                "meta": {
                    "ok": "bool",
                    "api_calls": "int",
                    "elapsed_ms": "int",
                    "limits_reached": "bool",
                    "limit_summary": "list[dict]",
                },
            },
        },
        "helpers": helper_payload,
        "fields": {
            "profile": list(PROFILE_CANONICAL_FIELDS),
            "repo": list(REPO_CANONICAL_FIELDS),
            "user": list(USER_CANONICAL_FIELDS),
            "actor": list(ACTOR_CANONICAL_FIELDS),
            "activity": list(ACTIVITY_CANONICAL_FIELDS),
            "collection": list(COLLECTION_CANONICAL_FIELDS),
        },
        "aliases": {
            # Sorted for stable, diff-friendly output.
            "repo": dict(sorted(_REPO_FIELD_ALIASES.items())),
            "user": dict(sorted(_USER_FIELD_ALIASES.items())),
            "actor": dict(sorted(_ACTOR_FIELD_ALIASES.items())),
            "collection": dict(sorted(_COLLECTION_FIELD_ALIASES.items())),
            "sort_keys": dict(sorted(_SORT_KEY_ALIASES.items())),
        },
        "limits": {
            "default_timeout_sec": DEFAULT_TIMEOUT_SEC,
            "default_max_calls": DEFAULT_MAX_CALLS,
            "max_calls_limit": MAX_CALLS_LIMIT,
            "output_items_truncation_limit": OUTPUT_ITEMS_TRUNCATION_LIMIT,
            "graph_scan_limit_cap": GRAPH_SCAN_LIMIT_CAP,
            "likes_scan_limit_cap": LIKES_SCAN_LIMIT_CAP,
            "recent_activity_scan_max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
            "trending_endpoint_max_limit": TRENDING_ENDPOINT_MAX_LIMIT,
            "pagination_policy": {
                helper_name: dict(sorted(policy.items()))
                for helper_name, policy in sorted(PAGINATION_POLICY.items())
            },
        },
        "raw_api": {
            "call_api": _signature_payload(call_api),
            "allowed_methods": ["GET", "POST"],
            "allowed_endpoint_patterns": list(ALLOWLIST_PATTERNS),
            "helper_covered_endpoint_patterns": [
                {"pattern": pattern, "helper": helper_name}
                for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS
            ],
        },
        "repo_search": {
            "sort_keys": {
                repo_type: sorted(keys)
                for repo_type, keys in sorted(_REPO_SORT_KEYS.items())
            },
            "extra_args": {
                repo_type: sorted(args)
                for repo_type, args in sorted(_REPO_SEARCH_EXTRA_ARGS.items())
            },
        },
    }

    # Section filtering: return either one named section (with the allowed
    # list attached) or the whole manifest.
    allowed_sections = sorted(manifest)
    requested = str(section or "").strip().lower()
    if requested:
        if requested not in manifest:
            return _helper_error(
                start_calls=start_calls,
                source="internal://runtime-capabilities",
                error=f"Unsupported section {section!r}. Allowed sections: {allowed_sections}",
                section=section,
                allowed_sections=allowed_sections,
            )
        payload = {
            "section": requested,
            "content": manifest[requested],
            "allowed_sections": allowed_sections,
        }
    else:
        payload = {
            "allowed_sections": allowed_sections,
            **manifest,
        }

    return _helper_success(
        start_calls=start_calls,
        source="internal://runtime-capabilities",
        items=[payload],
        section=requested or None,
    )
|
|
| m = pydantic_monty.Monty( |
| code, |
| inputs=["query", "max_calls"], |
| script_name="monty_agent.py", |
| type_check=False, |
| ) |
|
|
def _collecting_wrapper(helper_name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap helper ``fn`` so each call records any limit-hit summary.

    After every awaited call, ``_summarize_limit_hit`` inspects the result;
    non-None summaries are appended to the enclosing ``limit_summaries``
    list (closure state from the surrounding runner), capped at 20 entries
    to keep the result envelope small.
    """
    async def wrapped(*args: Any, **kwargs: Any) -> Any:
        result = await fn(*args, **kwargs)
        summary = _summarize_limit_hit(helper_name, result)
        if summary is not None and len(limit_summaries) < 20:
            limit_summaries.append(summary)
        return result

    return wrapped
|
|
| limits: pydantic_monty.ResourceLimits = { |
| "max_duration_secs": float(timeout_sec), |
| "max_memory": DEFAULT_MONTY_MAX_MEMORY, |
| "max_allocations": DEFAULT_MONTY_MAX_ALLOCATIONS, |
| "max_recursion_depth": DEFAULT_MONTY_MAX_RECURSION_DEPTH, |
| } |
|
|
| helper_functions = _resolve_helper_functions(locals()) |
|
|
| try: |
| result = await pydantic_monty.run_monty_async( |
| m, |
| inputs={"query": query, "max_calls": max_calls}, |
| external_functions={ |
| "call_api": call_api, |
| **{name: _collecting_wrapper(name, fn) for name, fn in helper_functions.items()}, |
| }, |
| limits=limits, |
| ) |
| except Exception as e: |
| raise MontyExecutionError(str(e), call_count["n"], trace) from e |
|
|
| if call_count["n"] == 0: |
| |
| |
| |
| |
| |
| if internal_helper_used["used"]: |
| return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} |
| if isinstance(result, dict) and result.get("ok") is True: |
| meta = result.get("meta") if isinstance(result.get("meta"), dict) else {} |
| source = meta.get("source") |
| if isinstance(source, str) and source.startswith("internal://"): |
| return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} |
| if latest_helper_error is not None: |
| return {"output": _truncate_result_payload(latest_helper_error), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} |
| if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str): |
| return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} |
| raise MontyExecutionError("Code completed without calling any external API function", call_count["n"], trace) |
|
|
| if not any(step.get("ok") is True for step in trace): |
| |
| |
| |
| if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str): |
| return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} |
| raise MontyExecutionError( |
| "Code completed without a successful API call; refusing non-live fallback result", |
| call_count["n"], |
| trace, |
| ) |
|
|
| return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} |
|
|
|
|
async def hf_hub_query(
    query: str,
    code: str,
    max_calls: int = DEFAULT_MAX_CALLS,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
) -> dict[str, Any]:
    """Use natural-language queries to explore the Hugging Face Hub.

    Best for read-only Hub discovery, lookup, ranking, and relationship questions
    across users, organizations, repositories, activity, followers, likes,
    discussions, and collections.
    """
    if not query or not query.strip():
        raise ValueError("query is required")
    if not code or not code.strip():
        raise ValueError("code is required")

    # Clamp the call budget into [1, MAX_CALLS_LIMIT].
    capped = min(int(max_calls), MAX_CALLS_LIMIT)
    max_calls = capped if capped >= 1 else 1
    code = code.strip()
    try:
        _validate_generated_code(code)
        run = await _run_with_monty(
            code=code,
            query=query,
            max_calls=max_calls,
            strict_mode=INTERNAL_STRICT_MODE,
            timeout_sec=timeout_sec,
        )
        # Success envelope mirrors the error envelopes below.
        return {
            "ok": True,
            "data": run["output"],
            "error": None,
            "api_calls": run["api_calls"],
        }
    except MontyExecutionError as exc:
        # Sandbox failures still know how many API calls were spent.
        return {"ok": False, "data": None, "error": str(exc), "api_calls": exc.api_calls}
    except Exception as exc:
        # Validation or setup failures happen before any API call is made.
        return {"ok": False, "data": None, "error": str(exc), "api_calls": 0}
|
|
|
|
async def hf_hub_query_raw(
    query: str,
    code: str,
    max_calls: int = DEFAULT_MAX_CALLS,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
) -> Any:
    """Use natural-language queries to explore the Hugging Face Hub in raw mode.

    Best for read-only Hub discovery, lookup, ranking, and relationship
    questions when the caller wants a runtime-owned raw envelope:
    ``result`` contains the direct ``solve(...)`` output and ``meta`` contains
    execution details such as timing, call counts, and limit summaries.
    """
    # Both inputs are mandatory and must be non-blank; reject up front.
    for value, label in ((query, "query"), (code, "code")):
        if not value or not value.strip():
            raise ValueError(f"{label} is required")

    # Clamp the call budget into [1, MAX_CALLS_LIMIT].
    max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT))
    code = code.strip()
    started = time.perf_counter()

    def _elapsed_ms() -> int:
        # Wall-clock time since this request began, in whole milliseconds.
        return int((time.perf_counter() - started) * 1000)

    try:
        _validate_generated_code(code)

        run = await _run_with_monty(
            code=code,
            query=query,
            max_calls=max_calls,
            strict_mode=INTERNAL_STRICT_MODE,
            timeout_sec=timeout_sec,
        )
    except MontyExecutionError as exc:
        # Execution-layer failure: the error carries an accurate call count.
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=exc.api_calls,
            elapsed_ms=_elapsed_ms(),
            error=str(exc),
        )
    except Exception as exc:
        # Validation or unexpected failure before any call was made.
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=0,
            elapsed_ms=_elapsed_ms(),
            error=str(exc),
        )
    return _wrap_raw_result(
        run["output"],
        ok=True,
        api_calls=run["api_calls"],
        elapsed_ms=_elapsed_ms(),
        limit_summaries=run.get("limit_summaries"),
    )
|
|
def _arg_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the v2 Monty-backed chaining tool."""
    parser = argparse.ArgumentParser(description="Monty-backed API chaining tool (v2)")
    parser.add_argument("--query", required=True, help="Natural language query")
    # Exactly one of --code / --code-file is expected; main() enforces that.
    for flag, help_text in (
        ("--code", "Inline Monty code to execute"),
        ("--code-file", "Path to .py file with Monty code to execute"),
    ):
        parser.add_argument(flag, default=None, help=help_text)
    parser.add_argument("--max-calls", type=int, default=DEFAULT_MAX_CALLS, help="Max external API/helper calls")
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SEC)
    return parser
|
|
|
|
def main() -> int:
    """CLI entry point: execute the given Monty code and print a JSON envelope.

    Returns 0 when the run succeeded (``ok`` is truthy), 1 otherwise.
    """
    args = _arg_parser().parse_args()

    # --code-file wins over inline --code when both are supplied.
    source = args.code
    if args.code_file:
        with open(args.code_file, "r", encoding="utf-8") as fh:
            source = fh.read()

    if not source:
        print(json.dumps({"ok": False, "error": "Either --code or --code-file is required"}, ensure_ascii=False))
        return 1

    try:
        result = asyncio.run(
            hf_hub_query(
                query=args.query,
                code=source,
                max_calls=args.max_calls,
                timeout_sec=args.timeout,
            )
        )
        print(json.dumps(result, ensure_ascii=False))
        return 0 if result.get("ok") else 1
    except Exception as exc:
        # Last-resort guard: report the failure as JSON and signal error status.
        print(json.dumps({"ok": False, "error": str(exc)}, ensure_ascii=False))
        return 1
|
|
|
|
if __name__ == "__main__":
    # Script entry point: propagate main()'s return code as the process exit status.
    raise SystemExit(main())
|
|