#!/usr/bin/env python3 """Monty-backed API orchestration executor (v2). v2 goals: - HfApi-first helper implementations for endpoints covered by huggingface_hub. - Thin raw API fallback for uncovered endpoints (/api/recent-activity, /api/trending, /api/users//likes event stream, collections q-search). - Stable machine-first helper envelopes (`items` + optional `item`, no polymorphic payloads). """ from __future__ import annotations import argparse import asyncio import ast import inspect import json import os import re import time from itertools import islice from typing import Any, Callable, cast, get_args from urllib.error import HTTPError, URLError from urllib.parse import urlencode from urllib.request import Request, urlopen from huggingface_hub import HfApi from huggingface_hub.hf_api import DatasetSort_T, ModelSort_T, SpaceSort_T # Runtime-level execution limits. # - max_calls: hard cap on the total number of external helper/API calls a single # generated program may make in one run. # - timeout_sec: wall-clock timeout for the full Monty execution. DEFAULT_TIMEOUT_SEC = 90 # Default end-to-end timeout for one Monty run. DEFAULT_MAX_CALLS = 400 # Default external-call budget exposed to callers. MAX_CALLS_LIMIT = 400 # Absolute max external-call budget accepted by the runtime. INTERNAL_STRICT_MODE = False # Result-size vocabulary used throughout helper metadata: # - return_limit: how many rows the caller wants back from a helper. # - scan_limit / max_pages: how much source data a helper is willing to inspect # internally to answer the query. # - hard cap: an absolute runtime-imposed maximum on rows returned in one helper call. OUTPUT_ITEMS_TRUNCATION_LIMIT = 500 # Final output truncation for oversized `items` payloads. EXHAUSTIVE_HELPER_RETURN_HARD_CAP = 2_000 # Runtime hard cap for exhaustive-helper output rows. SELECTIVE_ENDPOINT_RETURN_HARD_CAP = 200 # Default cap for one-shot selective endpoint helpers. 
TRENDING_ENDPOINT_MAX_LIMIT = 20 # Upstream `/api/trending` endpoint maximum. # Exhaustive helper scan/page ceilings. These bound how much upstream data we # inspect, which is different from how many rows we return to the caller. GRAPH_SCAN_LIMIT_CAP = 10_000 # Max follower/member rows scanned in one helper call. LIKES_SCAN_LIMIT_CAP = 10_000 # Max like-event rows scanned in one helper call. LIKES_RANKING_WINDOW_DEFAULT = 40 # Default shortlist size when ranking likes by repo popularity. LIKES_ENRICHMENT_MAX_REPOS = 50 # Max liked repos enriched with extra repo-detail calls. RECENT_ACTIVITY_PAGE_SIZE = 100 # Rows requested per `/api/recent-activity` page. RECENT_ACTIVITY_SCAN_MAX_PAGES = 10 # Max recent-activity pages fetched in one helper call. # Compact summary helpers intentionally inspect less data than the full # exhaustive helpers so they remain fast and predictable. USER_SUMMARY_GRAPH_SCAN_LIMIT = 1_000 # Follower/following rows sampled for user summary. USER_SUMMARY_LIKES_SCAN_LIMIT = 1_000 # Like rows sampled for user summary. USER_SUMMARY_ACTIVITY_MAX_PAGES = 3 # Activity pages sampled for user summary. # Monty sandbox resource limits. These constrain the Python execution # environment itself rather than Hub/API pagination behavior. DEFAULT_MONTY_MAX_MEMORY = 64 * 1024 * 1024 # 64 MiB DEFAULT_MONTY_MAX_ALLOCATIONS = 250_000 # Approximate object-allocation ceiling in the sandbox. DEFAULT_MONTY_MAX_RECURSION_DEPTH = 100 # Python recursion limit inside the sandbox. 
# Valid sort keys per repo type. Derived from huggingface_hub's Literal sort
# types when available; the `or {...}` fallback covers hub versions where
# get_args() yields an empty tuple.
_MODEL_SORT_KEYS = set(get_args(ModelSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
_DATASET_SORT_KEYS = set(get_args(DatasetSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
_SPACE_SORT_KEYS = set(get_args(SpaceSort_T)) or {
    "created_at",
    "last_modified",
    "likes",
    "trending_score",
}
# Lookup of allowed sort keys keyed by canonical repo type.
_REPO_SORT_KEYS: dict[str, set[str]] = {
    "model": _MODEL_SORT_KEYS,
    "dataset": _DATASET_SORT_KEYS,
    "space": _SPACE_SORT_KEYS,
}

# Caller-facing sort spellings (camelCase / kebab-case / shorthand) mapped to
# the canonical snake_case keys; consumed by `_normalize_repo_sort_key`.
_SORT_KEY_ALIASES: dict[str, str] = {
    "createdat": "created_at",
    "created_at": "created_at",
    "created-at": "created_at",
    "downloads": "downloads",
    "likes": "likes",
    "lastmodified": "last_modified",
    "last_modified": "last_modified",
    "last-modified": "last_modified",
    "trendingscore": "trending_score",
    "trending_score": "trending_score",
    "trending-score": "trending_score",
    "trending": "trending_score",
}

# Field-name aliases tolerated when callers project user rows.
_USER_FIELD_ALIASES: dict[str, str] = {
    "login": "username",
    "user": "username",
    "handle": "username",
    "name": "fullname",
    "full_name": "fullname",
    "full-name": "fullname",
    "is_pro": "isPro",
    "ispro": "isPro",
    "pro": "isPro",
}
# Actor rows extend the user aliases with type-discriminator spellings.
_ACTOR_FIELD_ALIASES: dict[str, str] = {
    **_USER_FIELD_ALIASES,
    "entity_type": "type",
    "user_type": "type",
    "actor_type": "type",
}

# Repo helpers prefer canonical snake_case field names in generated code, but
# tolerate common camelCase/raw endpoint aliases when callers project with
# `fields=[...]`.
_REPO_FIELD_ALIASES: dict[str, str] = {
    "repoid": "repo_id",
    "repotype": "repo_type",
    "repourl": "repo_url",
    "createdat": "created_at",
    "lastmodified": "last_modified",
    "pipelinetag": "pipeline_tag",
    "trendingscore": "trending_score",
    "libraryname": "library_name",
    "paperswithcodeid": "paperswithcode_id",
}
_COLLECTION_FIELD_ALIASES: dict[str, str] = {
    "collectionid": "collection_id",
    "lastupdated": "last_updated",
    "ownertype": "owner_type",
    "itemcount": "item_count",
    "author": "owner",
}

# Stable (canonical) field surfaces for each row shape emitted by the helpers.
REPO_CANONICAL_FIELDS: tuple[str, ...] = (
    "repo_id",
    "repo_type",
    "title",
    "author",
    "likes",
    "downloads",
    "created_at",
    "last_modified",
    "pipeline_tag",
    "repo_url",
    "tags",
    "library_name",
    "description",
    "paperswithcode_id",
    "sdk",
    "models",
    "datasets",
    "subdomain",
)
USER_CANONICAL_FIELDS: tuple[str, ...] = (
    "username",
    "fullname",
    "bio",
    "websiteUrl",
    "twitter",
    "github",
    "linkedin",
    "bluesky",
    "followers",
    "following",
    "likes",
    "isPro",
)
PROFILE_CANONICAL_FIELDS: tuple[str, ...] = (
    "handle",
    "entity_type",
    "display_name",
    "bio",
    "description",
    "avatar_url",
    "website_url",
    "twitter_url",
    "github_url",
    "linkedin_url",
    "bluesky_url",
    "followers_count",
    "following_count",
    "likes_count",
    "members_count",
    "models_count",
    "datasets_count",
    "spaces_count",
    "discussions_count",
    "papers_count",
    "upvotes_count",
    "organizations",
    "is_pro",
    "likes_sample",
    "activity_sample",
)
ACTOR_CANONICAL_FIELDS: tuple[str, ...] = (
    "username",
    "fullname",
    "isPro",
    "role",
    "type",
)
ACTIVITY_CANONICAL_FIELDS: tuple[str, ...] = (
    "event_type",
    "repo_id",
    "repo_type",
    "timestamp",
)
COLLECTION_CANONICAL_FIELDS: tuple[str, ...] = (
    "collection_id",
    "slug",
    "title",
    "owner",
    "owner_type",
    "description",
    "last_updated",
    "item_count",
)

# Extra hf_repo_search kwargs intentionally supported as pass-through to
# huggingface_hub.HfApi.list_models/list_datasets/list_spaces.
# (Generic args like `query/search/sort/author/limit` are handled directly in
# hf_repo_search signature and are not listed here.)
_REPO_SEARCH_EXTRA_ARGS: dict[str, set[str]] = {
    "model": {
        "filter",
        "apps",
        "gated",
        "inference",
        "inference_provider",
        "model_name",
        "trained_dataset",
        "pipeline_tag",
        "emissions_thresholds",
        "expand",
        "full",
        "cardData",
        "card_data",  # alias
        "fetch_config",
    },
    "dataset": {
        "filter",
        "benchmark",
        "dataset_name",
        "gated",
        "language_creators",
        "language",
        "multilinguality",
        "size_categories",
        "task_categories",
        "task_ids",
        "expand",
        "full",
    },
    "space": {
        "filter",
        "datasets",
        "models",
        "linked",
        "expand",
        "full",
    },
}

# Rich default metadata for repo search. These raw endpoint expand keys are
# normalized into the stable repo-row field surface below; keep them aligned
# with `_build_repo_row(...)`, `_REPO_FIELD_ALIASES`, and the shared agent docs.
_REPO_SEARCH_DEFAULT_EXPAND: dict[str, list[str]] = {
    "model": [
        "author",
        "createdAt",
        "downloads",
        "gated",
        "lastModified",
        "library_name",
        "likes",
        "pipeline_tag",
        "private",
        "sha",
        "tags",
        "trendingScore",
    ],
    "dataset": [
        "author",
        "createdAt",
        "description",
        "downloads",
        "gated",
        "lastModified",
        "likes",
        "paperswithcode_id",
        "private",
        "sha",
        "tags",
        "trendingScore",
    ],
    "space": [
        "author",
        "createdAt",
        "datasets",
        "lastModified",
        "likes",
        "models",
        "private",
        "sdk",
        "sha",
        "subdomain",
        "tags",
        "trendingScore",
    ],
}

# Per-helper pagination defaults and ceilings.
# These values answer questions like:
# - "If the caller omits return_limit, how many rows should this helper return?"
# - "How much upstream data may this helper scan/page through internally?"
# - "What is the helper-specific max_return override, if any?"
PAGINATION_POLICY: dict[str, dict[str, Any]] = {
    "hf_user_graph": {
        "scan_max": GRAPH_SCAN_LIMIT_CAP,
        "default_return": 1_000,
        "max_return": GRAPH_SCAN_LIMIT_CAP,
    },
    "hf_org_members": {"scan_max": GRAPH_SCAN_LIMIT_CAP, "default_return": 1_000},
    "hf_repo_likers": {"default_return": 1_000},
    "hf_user_likes": {
        "scan_max": LIKES_SCAN_LIMIT_CAP,
        "default_return": 100,
        "ranking_default": LIKES_RANKING_WINDOW_DEFAULT,
        "enrich_max": LIKES_ENRICHMENT_MAX_REPOS,
    },
    "hf_recent_activity": {
        "page_limit": RECENT_ACTIVITY_PAGE_SIZE,
        "max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
        "default_return": 100,
    },
    "hf_repo_search": {"max_return": 5_000, "default_return": 20},
    "hf_trending": {"max_return": TRENDING_ENDPOINT_MAX_LIMIT, "default_return": 20},
    "hf_collections_search": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 20},
    "hf_collection_items": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 100},
}

# Single source of truth for the public helper surface exposed to generated
# Monty code. Keep runtime helper resolution derived from this tuple.
HELPER_EXTERNALS = (
    "hf_runtime_capabilities",
    "hf_whoami",
    "hf_profile_summary",
    "hf_org_members",
    "hf_repo_search",
    "hf_user_graph",
    "hf_repo_likers",
    "hf_user_likes",
    "hf_recent_activity",
    "hf_repo_discussions",
    "hf_repo_discussion_details",
    "hf_repo_details",
    "hf_trending",
    "hf_collections_search",
    "hf_collection_items",
)

# Raw endpoint regexes already covered by a dedicated hf_* helper; used by
# `_validate_generated_code` to reject call_api(...) usage for these families.
HELPER_COVERED_ENDPOINT_PATTERNS: list[tuple[str, str]] = [
    (r"^/api/whoami-v2$", "hf_whoami"),
    (r"^/api/trending$", "hf_trending"),
    (r"^/api/recent-activity$", "hf_recent_activity"),
    (r"^/api/models$", "hf_repo_search"),
    (r"^/api/datasets$", "hf_repo_search"),
    (r"^/api/spaces$", "hf_repo_search"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$", "hf_repo_details"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$", "hf_repo_discussions"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$", "hf_repo_discussion_details"),
    (r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$", "hf_repo_likers"),
    (r"^/api/users/[^/]+/overview$", "hf_profile_summary"),
    (r"^/api/organizations/[^/]+/overview$", "hf_profile_summary"),
    (r"^/api/users/[^/]+/likes$", "hf_user_likes"),
    (r"^/api/users/[^/]+/(followers|following)$", "hf_user_graph"),
    (r"^/api/organizations/[^/]+/members$", "hf_org_members"),
    (r"^/api/organizations/[^/]+/followers$", "hf_user_graph"),
    (r"^/api/collections$", "hf_collections_search"),
    (r"^/api/collections/[^/]+$", "hf_collection_items"),
    (r"^/api/collections/[^/]+/[^/]+$", "hf_collection_items"),
]


def _resolve_helper_functions(namespace: dict[str, Any]) -> dict[str, Callable[..., Any]]:
    """Collect every HELPER_EXTERNALS callable from `namespace`.

    Raises RuntimeError if any helper name is missing or not callable, so the
    runtime fails fast instead of exposing a partial helper surface.
    """
    resolved: dict[str, Callable[..., Any]] = {}
    for helper_name in HELPER_EXTERNALS:
        candidate = namespace.get(helper_name)
        if not callable(candidate):
            raise RuntimeError(f"Helper '{helper_name}' is not defined or not callable")
        resolved[helper_name] = cast(Callable[..., Any], candidate)
    return resolved


# Raw-endpoint allowlist for call_api in normal mode.
ALLOWLIST_PATTERNS = [
    r"^/api/whoami-v2$",
    r"^/api/trending$",
    r"^/api/daily_papers$",
    r"^/api/models$",
    r"^/api/datasets$",
    r"^/api/spaces$",
    r"^/api/models-tags-by-type$",
    r"^/api/datasets-tags-by-type$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$",
    r"^/api/users/[^/]+/overview$",
    r"^/api/users/[^/]+/socials$",
    r"^/api/users/[^/]+/followers$",
    r"^/api/users/[^/]+/following$",
    r"^/api/users/[^/]+/likes$",
    r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$",
    r"^/api/organizations/[^/]+/overview$",
    r"^/api/organizations/[^/]+/members$",
    r"^/api/organizations/[^/]+/followers$",
    r"^/api/collections$",
    r"^/api/collections/[^/]+$",
    r"^/api/collections/[^/]+/[^/]+$",
    r"^/api/recent-activity$",
]

# Reduced allowlist applied when strict mode is enabled.
STRICT_ALLOWLIST_PATTERNS = [
    r"^/api/users/[^/]+/overview$",
    r"^/api/users/[^/]+/socials$",
    r"^/api/whoami-v2$",
    r"^/api/trending$",
    r"^/api/daily_papers$",
    r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$",
    r"^/api/collections$",
    r"^/api/collections/[^/]+$",
    r"^/api/collections/[^/]+/[^/]+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$",
]


class MontyExecutionError(RuntimeError):
    """Raised when a Monty run fails; carries the call count and call trace."""

    def __init__(self, message: str, api_calls: int, trace: list[dict[str, Any]]):
        super().__init__(message)
        self.api_calls = api_calls  # Number of external calls made before the failure.
        self.trace = trace  # Per-call trace records accumulated so far.


def _load_request_token() -> str | None:
    """Return the request-scoped bearer token, or None if unavailable.

    Best-effort: the fast_agent import or context lookup may fail outside a
    request context, in which case we silently fall back to None.
    """
    try:
        from fast_agent.mcp.auth.context import request_bearer_token  # type: ignore

        token = request_bearer_token.get()
        if token:
            return token
    except Exception:
        pass
    return None


def _load_token() -> str | None:
    """Resolve an auth token: request-scoped first, then the HF_TOKEN env var."""
    token = _load_request_token()
    if token:
        return token
    return os.getenv("HF_TOKEN") or None


def _normalize_endpoint(endpoint: str) -> str:
    """Normalize a caller-supplied endpoint into a canonical `/api/...` path.

    Rejects query strings, absolute URLs, and path traversal; prefixes missing
    `/` and `/api/` segments; canonicalizes the collections search path.
    """
    ep = (endpoint or "").strip()
    if not ep:
        raise ValueError("endpoint is required")
    if "?" in ep:
        raise ValueError("endpoint must not include query string; use params")
    if ep.startswith("http://") or ep.startswith("https://"):
        raise ValueError("endpoint must be path-only")
    if not ep.startswith("/"):
        ep = "/" + ep
    if not ep.startswith("/api/"):
        ep = "/api" + ep
    # Legacy alias: collections search lives at /api/collections upstream.
    if ep in {"/api/collections/search", "/api/collections/search/"}:
        ep = "/api/collections"
    if ".." in ep:
        raise ValueError("path traversal not allowed")
    return ep


def _endpoint_allowed(endpoint: str, strict_mode: bool) -> bool:
    """Return True if the endpoint path matches the active allowlist."""
    path = endpoint.split("?", 1)[0]
    patterns = STRICT_ALLOWLIST_PATTERNS if strict_mode else ALLOWLIST_PATTERNS
    return any(re.match(p, path) for p in patterns)


def _json_best_effort(raw: bytes) -> Any:
    """Decode `raw` as JSON, falling back to a replacement-decoded string."""
    try:
        return json.loads(raw)
    except Exception:
        return raw.decode("utf-8", errors="replace")


def _sanitize_params(endpoint: str, params: dict[str, Any] | None) -> dict[str, Any]:
    """Return a copy of `params` with endpoint-specific fixes and limit clamps.

    - /api/collections: maps `search` -> `q`.
    - /api/trending: canonicalizes `type` plurals and clamps `limit` to the
      upstream maximum.
    - Other endpoints: clamps `limit` to a per-family ceiling.
    Non-integer limits are left unchanged (except trending, which defaults).
    """
    clean = dict(params or {})
    path = endpoint.split("?", 1)[0]
    if path == "/api/collections":
        if "q" not in clean and "search" in clean:
            clean["q"] = clean.get("search")
        clean.pop("search", None)
    if path == "/api/trending":
        t = str(clean.get("type") or "").strip().lower()
        aliases = {"models": "model", "datasets": "dataset", "spaces": "space"}
        if t in aliases:
            clean["type"] = aliases[t]
        lim = clean.get("limit")
        if lim is not None:
            try:
                n = int(lim)
            except Exception:
                n = TRENDING_ENDPOINT_MAX_LIMIT
            clean["limit"] = max(1, min(n, TRENDING_ENDPOINT_MAX_LIMIT))
        return clean
    lim = clean.get("limit")
    if lim is None:
        return clean
    try:
        n = int(lim)
    except Exception:
        return clean
    endpoint_limit_max = SELECTIVE_ENDPOINT_RETURN_HARD_CAP
    if re.match(r"^/api/users/[^/]+/(followers|following)$", path):
        endpoint_limit_max = GRAPH_SCAN_LIMIT_CAP
    elif re.match(r"^/api/users/[^/]+/likes$", path):
        endpoint_limit_max = LIKES_SCAN_LIMIT_CAP
    clean["limit"] = max(1, min(n, endpoint_limit_max))
    return clean


def _truncate_result_payload(output: Any) -> Any:
    """Trim an oversized `items` list and record a truncation note in `steps`.

    Non-dict outputs and payloads within the limit are returned unchanged; the
    original dict is never mutated.
    """
    if not isinstance(output, dict):
        return output
    items = output.get("items")
    if not isinstance(items, list) or len(items) <= OUTPUT_ITEMS_TRUNCATION_LIMIT:
        return output
    trimmed = dict(output)
    trimmed_items = items[:OUTPUT_ITEMS_TRUNCATION_LIMIT]
    trimmed["items"] = trimmed_items
    # Keep the `item` convenience field consistent with the trimmed list.
    trimmed["item"] = trimmed_items[0] if len(trimmed_items) == 1 else None
    note = f"truncated items to first {OUTPUT_ITEMS_TRUNCATION_LIMIT} rows for token efficiency"
    steps = trimmed.get("steps")
    if isinstance(steps, list):
        trimmed["steps"] = [*steps, note]
    else:
        trimmed["steps"] = [note]
    return trimmed


def _is_helper_envelope(output: Any) -> bool:
    """Return True if `output` looks like a standard helper result envelope."""
    return (
        isinstance(output, dict)
        and isinstance(output.get("ok"), bool)
        and "items" in output
        and "meta" in output
        and "error" in output
    )


def _summarize_limit_hit(helper_name: str, result: Any) -> dict[str, Any] | None:
    """Build a compact summary when a helper result hit a truncation limit.

    Returns None for non-envelope results or when no limit indicator
    (`truncated`, `hard_cap_applied`, or a recognized `truncated_by`) is set.
    """
    if not _is_helper_envelope(result):
        return None
    meta = result.get("meta") if isinstance(result.get("meta"), dict) else {}
    if not isinstance(meta, dict):
        return None
    truncated_by = str(meta.get("truncated_by") or "")
    limit_hit = any(
        [
            meta.get("truncated") is True,
            meta.get("hard_cap_applied") is True,
            truncated_by in {"scan_limit", "page_limit", "multiple"},
        ]
    )
    if not limit_hit:
        return None
    summary: dict[str, Any] = {
        "helper": helper_name,
        "source": meta.get("source"),
        "returned": meta.get("returned"),
        "total": meta.get("total"),
        "truncated": meta.get("truncated"),
        "truncated_by": meta.get("truncated_by"),
        "more_available": meta.get("more_available"),
        "requested_return_limit": meta.get("requested_return_limit"),
        "applied_return_limit": meta.get("applied_return_limit"),
        "next_request_hint": meta.get("next_request_hint"),
    }
    if meta.get("scan_limit") is not None:
        summary["scan_limit"] = meta.get("scan_limit")
    if meta.get("applied_max_pages") is not None:
        summary["applied_max_pages"] = meta.get("applied_max_pages")
    return summary


def _wrap_raw_result(
    result: Any,
    *,
    ok: bool,
    api_calls: int,
    elapsed_ms: int,
    limit_summaries: list[dict[str, Any]] | None = None,
    error: str | None = None,
) -> dict[str, Any]:
    """Wrap a raw execution result with run-level meta (calls, timing, limits).

    At most the first 10 limit summaries are kept; `error` is included in meta
    only when not None.
    """
    hits = [dict(summary) for summary in (limit_summaries or [])[:10]]
    meta: dict[str, Any] = {
        "ok": ok,
        "api_calls": api_calls,
        "elapsed_ms": elapsed_ms,
        "limits_reached": bool(hits),
        "limit_summary": hits,
    }
    if error is not None:
        meta["error"] = error
    return {
        "result": result,
        "meta": meta,
    }


def _clamp_int(value: Any, *, default: int, minimum: int, maximum: int) -> int:
    """Coerce `value` to int (falling back to `default`) and clamp to range."""
    try:
        out = int(value)
    except Exception:
        out = default
    return max(minimum, min(out, maximum))


def _as_int(value: Any) -> int | None:
    """Coerce `value` to int, or None if it cannot be converted."""
    try:
        return int(value)
    except Exception:
        return None


def _canonical_repo_type(value: Any, *, default: str = "model") -> str:
    """Normalize a repo-type spelling (singular/plural) to model|dataset|space."""
    raw = str(value or "").strip().lower()
    aliases = {
        "model": "model",
        "models": "model",
        "dataset": "dataset",
        "datasets": "dataset",
        "space": "space",
        "spaces": "space",
    }
    return aliases.get(raw, default)


def _normalize_repo_sort_key(repo_type: str, sort_value: Any) -> tuple[str | None, str | None]:
    """Resolve a caller sort spelling to a canonical key for `repo_type`.

    Returns (key, None) on success, (None, None) for empty input, and
    (None, error_message) for unknown or type-invalid keys.
    """
    raw = str(sort_value or "").strip()
    if not raw:
        return None, None
    # Try a squashed spelling first (spaces removed, double underscores folded).
    key = _SORT_KEY_ALIASES.get(raw.lower().replace(" ", "").replace("__", "_"))
    if key is None:
        key = _SORT_KEY_ALIASES.get(raw.lower())
    if key is None:
        return None, f"Invalid sort key '{raw}'"
    rt = _canonical_repo_type(repo_type)
    allowed = _REPO_SORT_KEYS.get(rt, set())
    if key not in allowed:
        return None, f"Invalid sort key '{raw}' for repo_type='{rt}'. Allowed: {', '.join(sorted(allowed))}"
    return key, None


def _repo_detail_endpoint(repo_type: str, repo_id: str) -> str:
    """Build the `/api/{type}s/{owner}/{name}` detail path for a repo.

    Raises ValueError unless `repo_id` is a non-empty `owner/name` pair.
    """
    rt = _canonical_repo_type(repo_type)
    rid = str(repo_id or "").strip()
    if "/" not in rid:
        raise ValueError("repo_id must be owner/name")
    owner, name = rid.split("/", 1)
    if not owner or not name:
        raise ValueError("repo_id must be owner/name")
    return f"/api/{rt}s/{owner}/{name}"


def _coerce_str_list(value: Any) -> list[str]:
    """Coerce a string or iterable into a list of stripped, non-empty strings."""
    if value is None:
        return []
    if isinstance(value, str):
        raw = [value]
    elif isinstance(value, (list, tuple, set)):
        raw = list(value)
    else:
        raise ValueError("Expected a string or list of strings")
    return [str(v).strip() for v in raw if str(v).strip()]


def _optional_str_list(value: Any) -> list[str] | None:
    """Like _coerce_str_list, but lenient: unknown types and empty results map to None."""
    if value is None:
        return None
    if isinstance(value, str):
        out = [value.strip()] if value.strip() else []
        return out or None
    if isinstance(value, (list, tuple, set)):
        out = [str(v).strip() for v in value if str(v).strip()]
        return out or None
    return None


def _dt_to_str(value: Any) -> str | None:
    """Render a datetime-like value via isoformat(), else str(); None passes through."""
    if value is None:
        return None
    iso = getattr(value, "isoformat", None)
    if callable(iso):
        try:
            return str(iso())
        except Exception:
            pass
    return str(value)


def _repo_web_url(repo_type: str, repo_id: str | None) -> str | None:
    """Build the public web URL for a repo (datasets/spaces get a path prefix)."""
    if not isinstance(repo_id, str) or not repo_id:
        return None
    base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
    rt = _canonical_repo_type(repo_type, default="")
    if rt == "dataset":
        return f"{base}/datasets/{repo_id}"
    if rt == "space":
        return f"{base}/spaces/{repo_id}"
    return f"{base}/{repo_id}"


def _build_repo_row(
    *,
    repo_id: Any,
    repo_type: str,
    author: Any = None,
    title: Any = None,
    likes: Any = None,
    downloads: Any = None,
    created_at: Any = None,
    last_modified: Any = None,
    pipeline_tag: Any = None,
    private: Any = None,
    trending_score: Any = None,
    tags: Any = None,
    sha: Any = None,
    gated: Any = None,
    library_name: Any = None,
    description: Any = None,
    paperswithcode_id: Any = None,
    sdk: Any = None,
    models: Any = None,
    datasets: Any = None,
    subdomain: Any = None,
) -> dict[str, Any]:
    """Assemble the canonical repo row dict from loosely-typed inputs.

    Derives `author` from the repo_id owner segment when absent, falls back
    `title` to the repo_id for spaces only, and normalizes ints, datetimes,
    and string lists. Keys match REPO_CANONICAL_FIELDS plus id/slug extras.
    """
    rt = _canonical_repo_type(repo_type)
    author_value = author
    if not isinstance(author_value, str) and isinstance(repo_id, str) and "/" in repo_id:
        author_value = repo_id.split("/", 1)[0]
    title_value = title
    if (not isinstance(title_value, str) or not title_value.strip()) and isinstance(repo_id, str) and repo_id:
        # Only spaces get the repo_id as a title fallback.
        title_value = repo_id if rt == "space" else None
    return {
        "id": repo_id,
        "slug": repo_id,
        "repo_id": repo_id,
        "title": title_value,
        "repo_type": rt,
        "author": author_value,
        "likes": _as_int(likes),
        "downloads": _as_int(downloads),
        "created_at": _dt_to_str(created_at),
        "last_modified": _dt_to_str(last_modified),
        "pipeline_tag": pipeline_tag,
        "private": private,
        "trending_score": _as_int(trending_score) if trending_score is not None else None,
        "repo_url": _repo_web_url(rt, repo_id if isinstance(repo_id, str) else None),
        "tags": _optional_str_list(tags),
        "sha": sha,
        "gated": gated,
        "library_name": library_name,
        "description": description,
        "paperswithcode_id": paperswithcode_id,
        "sdk": sdk,
        "models": _optional_str_list(models),
        "datasets": _optional_str_list(datasets),
        "subdomain": subdomain,
    }


def _normalize_repo_search_row(row: Any, repo_type: str) -> dict[str, Any]:
    """Map an HfApi list_* result object into the canonical repo row."""
    return _build_repo_row(
        repo_id=getattr(row, "id", None),
        repo_type=repo_type,
        author=getattr(row, "author", None),
        title=getattr(row, "title", None),
        likes=getattr(row, "likes", None),
        downloads=getattr(row, "downloads", None),
        created_at=getattr(row, "created_at", None),
        last_modified=getattr(row, "last_modified", None),
        pipeline_tag=getattr(row, "pipeline_tag", None),
        private=getattr(row, "private", None),
        trending_score=getattr(row, "trending_score", None),
        tags=getattr(row, "tags", None),
        sha=getattr(row, "sha", None),
        gated=getattr(row, "gated", None),
        library_name=getattr(row, "library_name", None),
        description=getattr(row, "description", None),
        paperswithcode_id=getattr(row, "paperswithcode_id", None),
        sdk=getattr(row, "sdk", None),
        models=getattr(row, "models", None),
        datasets=getattr(row, "datasets", None),
        subdomain=getattr(row, "subdomain", None),
    )


def _normalize_repo_detail_row(detail: Any, repo_type: str, repo_id: str) -> dict[str, Any]:
    """Normalize a repo-detail object, backfilling id/slug/url from `repo_id`."""
    row = _normalize_repo_search_row(detail, repo_type)
    resolved_repo_id = row.get("repo_id") or repo_id
    row["id"] = row.get("id") or resolved_repo_id
    row["slug"] = row.get("slug") or resolved_repo_id
    row["repo_id"] = resolved_repo_id
    row["repo_url"] = _repo_web_url(repo_type, resolved_repo_id)
    return row


def _normalize_trending_row(repo: dict[str, Any], default_repo_type: str, rank: int | None = None) -> dict[str, Any]:
    """Map a raw `/api/trending` repo dict (camelCase keys) to a canonical row.

    Adds `trending_rank` when `rank` is provided.
    """
    row = _build_repo_row(
        repo_id=repo.get("id"),
        repo_type=repo.get("type") or default_repo_type,
        author=repo.get("author"),
        title=repo.get("title"),
        likes=repo.get("likes"),
        downloads=repo.get("downloads"),
        created_at=repo.get("createdAt"),
        last_modified=repo.get("lastModified"),
        pipeline_tag=repo.get("pipeline_tag"),
        private=repo.get("private"),
        trending_score=repo.get("trendingScore"),
        tags=repo.get("tags"),
        sha=repo.get("sha"),
        gated=repo.get("gated"),
        library_name=repo.get("library_name"),
        description=repo.get("description"),
        paperswithcode_id=repo.get("paperswithcode_id"),
        sdk=repo.get("sdk"),
        models=repo.get("models"),
        datasets=repo.get("datasets"),
        subdomain=repo.get("subdomain"),
    )
    if rank is not None:
        row["trending_rank"] = rank
    return row


def _normalize_collection_repo_item(row: dict[str, Any]) -> dict[str, Any] | None:
    """Map a collection item dict to a canonical repo row, or None.

    Returns None when the item has no repo id or is not a model/dataset/space
    (e.g. paper items). Tolerates both camelCase and snake_case keys.
    """
    repo_id = row.get("id") or row.get("repoId") or row.get("repo_id")
    if not isinstance(repo_id, str) or not repo_id:
        return None
    repo_type = _canonical_repo_type(row.get("repoType") or row.get("repo_type") or row.get("type"), default="")
    if repo_type not in {"model", "dataset", "space"}:
        return None
    return _build_repo_row(
        repo_id=repo_id,
        repo_type=repo_type,
        # `_author_from_any` is defined elsewhere in this module — presumably it
        # extracts an author handle from an authorData object; confirm.
        author=row.get("author") or _author_from_any(row.get("authorData")),
        title=row.get("title"),
        likes=row.get("likes"),
        downloads=row.get("downloads"),
        created_at=row.get("createdAt") or row.get("created_at"),
        last_modified=row.get("lastModified") or row.get("last_modified"),
        pipeline_tag=row.get("pipeline_tag") or row.get("pipelineTag"),
        private=row.get("private"),
        tags=row.get("tags"),
        gated=row.get("gated"),
        library_name=row.get("library_name") or row.get("libraryName"),
        description=row.get("description"),
        paperswithcode_id=row.get("paperswithcode_id") or row.get("paperswithcodeId"),
        sdk=row.get("sdk"),
        models=row.get("models"),
        datasets=row.get("datasets"),
        subdomain=row.get("subdomain"),
    )


def _sort_repo_rows(rows: list[dict[str, Any]], sort_key: str | None) -> list[dict[str, Any]]:
    """Sort canonical repo rows descending by a numeric or timestamp key.

    Unknown/empty sort keys leave the order untouched. Missing numeric values
    sort as -1 (i.e. last); timestamps compare as ISO strings.
    """
    if not sort_key:
        return rows
    if sort_key in {"likes", "downloads", "trending_score"}:
        return sorted(rows, key=lambda row: _as_int(row.get(sort_key)) or -1, reverse=True)
    if sort_key in {"created_at", "last_modified"}:
        return sorted(rows, key=lambda row: str(row.get(sort_key) or ""), reverse=True)
    return rows


def call_api_host(
    endpoint: str,
    *,
    method: str = "GET",
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
    strict_mode: bool = False,
) -> dict[str, Any]:
    """Perform one allowlisted raw Hub API request on behalf of generated code.

    Normalizes/validates the endpoint, sanitizes params, attaches the bearer
    token when available, and returns a uniform dict:
    {"ok", "status", "url", "data", "error"}. HTTP and network failures are
    reported in-band rather than raised; validation failures raise ValueError.
    """
    method_u = method.upper().strip()
    if method_u not in {"GET", "POST"}:
        raise ValueError("Only GET and POST are supported")
    ep = _normalize_endpoint(endpoint)
    if not _endpoint_allowed(ep, strict_mode):
        raise ValueError(f"Endpoint not allowed: {ep}")
    params = _sanitize_params(ep, params)
    # recent-activity is the one endpoint with mandatory query parameters.
    if ep == "/api/recent-activity":
        feed_type = str((params or {}).get("feedType", "")).strip().lower()
        if feed_type not in {"user", "org"}:
            raise ValueError("/api/recent-activity requires feedType=user|org")
        if not str((params or {}).get("entity", "")).strip():
            raise ValueError("/api/recent-activity requires entity")
    base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
    q = urlencode(params or {}, doseq=True)
    url = f"{base}{ep}" + (f"?{q}" if q else "")
    headers = {"Accept": "application/json"}
    token = _load_token()
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None
    if method_u == "POST":
        headers["Content-Type"] = "application/json"
        data = json.dumps(json_body or {}).encode("utf-8")
    req = Request(url, method=method_u, headers=headers, data=data)
    try:
        with urlopen(req, timeout=timeout_sec) as res:
            payload = _json_best_effort(res.read())
            return {"ok": True, "status": int(res.status), "url": url, "data": payload, "error": None}
    except HTTPError as e:
        payload = _json_best_effort(e.read())
        err = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False)[:1000]
        return {"ok": False, "status": int(e.code), "url": url, "data": payload, "error": err}
    except URLError as e:
        return {"ok": False, "status": 0, "url": url, "data": None, "error": f"Network error: {e}"}


def _validate_generated_code(code: str) -> None:
    """Statically validate generated Monty code before execution.

    Enforces: no imports/exec/eval/open/__import__/while-true; parses as a
    module; defines `async def solve(query, max_calls)` with exactly that
    signature; ends with `await solve(query, max_calls)`; and uses the hf_*
    helpers instead of call_api for helper-covered endpoint families.
    Raises ValueError with an actionable message on any violation.
    """
    if not code.strip():
        raise ValueError("Generated code is empty")
    # Cheap regex screens before parsing; the AST pass below is authoritative
    # for structure but these catch disallowed constructs wherever they appear.
    blocked_patterns: list[tuple[str, str]] = [
        (r"(?m)^\s*import\s+\S", "import statement"),
        (r"(?m)^\s*from\s+\S+\s+import\s+\S", "from-import statement"),
        (r"\bexec\s*\(", "exec("),
        (r"\beval\s*\(", "eval("),
        (r"\bopen\s*\(", "open("),
        (r"\b__import__\b", "__import__"),
        (r"(?i)\bwhile\s+true\b", "while true"),
    ]
    for pattern, label in blocked_patterns:
        if re.search(pattern, code):
            raise ValueError(f"Generated code contains blocked pattern: {label}")
    try:
        parsed = compile(  # noqa: S102 - compile is used for AST validation only.
            code,
            "",
            "exec",
            flags=ast.PyCF_ONLY_AST | ast.PyCF_ALLOW_TOP_LEVEL_AWAIT,
            dont_inherit=True,
        )
    except SyntaxError as e:
        message = e.msg or "invalid syntax"
        raise ValueError(f"Generated code is not valid Python: {message}") from e
    if not isinstance(parsed, ast.Module):
        raise ValueError("Generated code must be a Python module")
    solve_defs = [
        node
        for node in parsed.body
        if isinstance(node, ast.AsyncFunctionDef) and node.name == "solve"
    ]
    if not solve_defs:
        raise ValueError("Generated code must define `async def solve(query, max_calls): ...`.")

    def _valid_solve_signature(node: ast.AsyncFunctionDef) -> bool:
        # Exactly two positional args named query/max_calls; nothing else.
        args = node.args
        return (
            not args.posonlyargs
            and len(args.args) == 2
            and [arg.arg for arg in args.args] == ["query", "max_calls"]
            and args.vararg is None
            and not args.kwonlyargs
            and args.kwarg is None
            and not args.defaults
            and not args.kw_defaults
        )

    if not any(_valid_solve_signature(node) for node in solve_defs):
        raise ValueError("`solve` must have signature `async def solve(query, max_calls): ...`.")
    if not parsed.body:
        raise ValueError("Generated code is empty")
    final_stmt = parsed.body[-1]
    # The module must end with a bare `await solve(query, max_calls)` expression.
    valid_final_await = (
        isinstance(final_stmt, ast.Expr)
        and isinstance(final_stmt.value, ast.Await)
        and isinstance(final_stmt.value.value, ast.Call)
        and isinstance(final_stmt.value.value.func, ast.Name)
        and final_stmt.value.value.func.id == "solve"
        and len(final_stmt.value.value.args) == 2
        and not final_stmt.value.value.keywords
        and all(isinstance(arg, ast.Name) for arg in final_stmt.value.value.args)
        and [cast(ast.Name, arg).id for arg in final_stmt.value.value.args] == ["query", "max_calls"]
    )
    if not valid_final_await:
        raise ValueError("Generated code must end with `await solve(query, max_calls)`.")

    def _preferred_helper_for_endpoint(endpoint: str) -> str | None:
        # First matching helper pattern wins.
        for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS:
            if re.match(pattern, endpoint):
                return helper_name
        return None

    def _call_api_endpoint_hint(expr: ast.AST | None) -> str | None:
        # Best-effort static endpoint string: constants directly, f-strings via
        # their literal fragments only (interpolations are ignored).
        if isinstance(expr, ast.Constant) and isinstance(expr.value, str):
            return expr.value
        if isinstance(expr, ast.JoinedStr):
            literal_parts = [
                value.value
                for value in expr.values
                if isinstance(value, ast.Constant) and isinstance(value.value, str)
            ]
            if literal_parts:
                return "".join(literal_parts)
        return None

    # AST pass: inspect every call_api(...) call site for helper-covered paths.
    for node in ast.walk(parsed):
        if not isinstance(node, ast.Call):
            continue
        if not isinstance(node.func, ast.Name) or node.func.id != "call_api":
            continue
        endpoint_expr: ast.AST | None = node.args[0] if node.args else None
        for keyword in node.keywords:
            if keyword.arg == "endpoint":
                endpoint_expr = keyword.value
                break
        endpoint_hint = _call_api_endpoint_hint(endpoint_expr)
        if endpoint_hint and "/api/collections/" in endpoint_hint and "/items" in endpoint_hint:
            raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.")
        if endpoint_hint:
            preferred_helper = _preferred_helper_for_endpoint(endpoint_hint)
            if preferred_helper is not None:
                raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_hint!r}, ...)` for this endpoint family.")
    # The program must perform at least one external call of some kind.
    allowed_external_calls = ["call_api(", *[f"{name}(" for name in HELPER_EXTERNALS]]
    if not any(token in code for token in allowed_external_calls):
        raise ValueError("Generated code must call at least one external API function (call_api or hf_* helper)")
    # Regex pass: re-check string-literal call_api endpoints (catches sites the
    # AST hint extraction may have under-resolved).
    helper_name_set = set(HELPER_EXTERNALS)
    for m in re.finditer(r"call_api\(\s*([\"'])\s*([^\"']+)\s*\1", code):
        endpoint_literal = str(m.group(2) or "").strip()
        if not endpoint_literal:
            continue
        if (
            endpoint_literal in helper_name_set
            or endpoint_literal.startswith("hf_")
            or endpoint_literal.startswith("/hf_")
            or endpoint_literal.startswith("/api/hf_")
        ):
            raise ValueError("Do not call helper names through call_api; call hf_* helpers directly.")
        if re.match(r"^/api/collections/(?:[^/]+/)?[^/]+/items$", endpoint_literal):
            raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.")
        preferred_helper = _preferred_helper_for_endpoint(endpoint_literal)
        if preferred_helper is not None:
            raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_literal!r}, ...)` for this endpoint family.")
        if not endpoint_literal.startswith("/api/"):
            raise ValueError("call_api endpoint must be a raw path starting with '/api/...'.")
`/api/collections/.../items`.") preferred_helper = _preferred_helper_for_endpoint(endpoint_literal) if preferred_helper is not None: raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_literal!r}, ...)` for this endpoint family.") if not endpoint_literal.startswith("/api/"): raise ValueError("call_api endpoint must be a raw path starting with '/api/...'.") async def _run_with_monty( *, code: str, query: str, max_calls: int, strict_mode: bool, timeout_sec: int, ) -> dict[str, Any]: try: import pydantic_monty except Exception as e: raise RuntimeError("pydantic_monty is not installed. Install with `uv pip install pydantic-monty`.") from e max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT)) call_count = {"n": 0} trace: list[dict[str, Any]] = [] limit_summaries: list[dict[str, Any]] = [] latest_helper_error: dict[str, Any] | None = None internal_helper_used = {"used": False} def _budget_remaining() -> int: return max(0, max_calls - call_count["n"]) def _policy_int(helper_name: str, key: str, default: int) -> int: cfg = PAGINATION_POLICY.get(helper_name) or {} try: return int(cfg.get(key, default)) except Exception: return int(default) def _consume_call(endpoint: str, method: str = "GET") -> int: if call_count["n"] >= max_calls: raise RuntimeError(f"Max API calls exceeded ({max_calls})") call_count["n"] += 1 return call_count["n"] def _trace_ok(idx: int, endpoint: str, method: str = "GET", status: int = 200) -> None: trace.append( { "call_index": idx, "depth": idx, "method": method, "endpoint": endpoint, "ok": True, "status": status, } ) def _trace_err(idx: int, endpoint: str, err: Any, method: str = "GET", status: int = 0) -> None: trace.append( { "call_index": idx, "depth": idx, "method": method, "endpoint": endpoint, "ok": False, "status": status, "error": str(err), } ) def _host_raw_call( endpoint: str, *, params: dict[str, Any] | None = None, method: str = "GET", json_body: dict[str, Any] | None = None, ) -> dict[str, Any]: idx = 
_consume_call(endpoint, method)
        # Proxy one raw REST call through the host-side caller (`call_api_host`,
        # defined elsewhere in this file) and mirror the outcome into the trace.
        try:
            resp = call_api_host(
                endpoint,
                method=method,
                params=params,
                json_body=json_body,
                timeout_sec=timeout_sec,
                strict_mode=strict_mode,
            )
            if resp.get("ok"):
                _trace_ok(idx, endpoint, method=method, status=int(resp.get("status") or 200))
            else:
                _trace_err(idx, endpoint, resp.get("error"), method=method, status=int(resp.get("status") or 0))
            return resp
        except Exception as e:
            # Transport-level failure: record it in the trace, then re-raise.
            _trace_err(idx, endpoint, e, method=method, status=0)
            raise

    # Lazily-constructed HfApi client shared by all hf_* helpers in this run.
    hf_api_client: HfApi | None = None

    def _get_hf_api_client() -> HfApi:
        """Return the shared HfApi client, constructing it on first use."""
        nonlocal hf_api_client
        if hf_api_client is None:
            # HF_ENDPOINT lets deployments point at a mirror/staging Hub.
            endpoint = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
            hf_api_client = HfApi(endpoint=endpoint, token=_load_token())
        return hf_api_client

    def _host_hf_call(endpoint: str, fn: Callable[[], Any]) -> Any:
        """Run one HfApi-backed call under the call budget, tracing the result."""
        idx = _consume_call(endpoint, "GET")
        try:
            out = fn()
            _trace_ok(idx, endpoint, method="GET", status=200)
            return out
        except Exception as e:
            _trace_err(idx, endpoint, e, method="GET", status=0)
            raise

    def _helper_meta(start_calls: int, *, source: str, **extra: Any) -> dict[str, Any]:
        """Base meta block for helper envelopes: source plus budget accounting."""
        out = {
            "source": source,
            "normalized": True,
            # Calls consumed by this helper invocation (never negative).
            "budget_used": max(0, call_count["n"] - start_calls),
            "budget_remaining": _budget_remaining(),
        }
        out.update(extra)
        return out

    def _derive_limit_metadata(
        *,
        requested_return_limit: int | None,
        applied_return_limit: int,
        default_limit_used: bool,
        requested_scan_limit: int | None = None,
        applied_scan_limit: int | None = None,
        requested_max_pages: int | None = None,
        applied_max_pages: int | None = None,
    ) -> dict[str, Any]:
        """Describe requested vs. applied return/scan/page limits for meta output."""
        meta: dict[str, Any] = {
            "requested_return_limit": requested_return_limit,
            "applied_return_limit": applied_return_limit,
            "default_limit_used": default_limit_used,
        }
        # Scan/page keys are only emitted when the helper actually used them.
        if requested_scan_limit is not None or applied_scan_limit is not None:
            meta["requested_scan_limit"] = requested_scan_limit
            meta["scan_limit"] = applied_scan_limit
            meta["scan_limit_applied"] = requested_scan_limit != applied_scan_limit
        if requested_max_pages is not None or applied_max_pages is not None:
            meta["requested_max_pages"] = requested_max_pages
            meta["applied_max_pages"] = applied_max_pages
            meta["page_limit_applied"] = requested_max_pages != applied_max_pages
        if requested_return_limit is not None:
            meta["hard_cap_applied"] = applied_return_limit < requested_return_limit
        return meta

    def _derive_more_available(*, sample_complete: bool, exact_count: bool, returned: int, total: int | None) -> bool | str:
        """True/False when knowable; "unknown" when the scan was not exhaustive."""
        if sample_complete:
            return False
        if exact_count and total is not None and returned < total:
            return True
        return "unknown"

    def _derive_truncated_by(
        *,
        hard_cap: bool = False,
        scan_limit_hit: bool = False,
        page_limit_hit: bool = False,
        return_limit_hit: bool = False,
    ) -> str:
        """Name the single truncation cause, or "multiple"/"none"."""
        causes = [hard_cap, scan_limit_hit, page_limit_hit, return_limit_hit]
        if sum(1 for cause in causes if cause) > 1:
            return "multiple"
        if hard_cap:
            return "hard_cap"
        if scan_limit_hit:
            return "scan_limit"
        if page_limit_hit:
            return "page_limit"
        if return_limit_hit:
            return "return_limit"
        return "none"

    def _derive_can_request_more(*, sample_complete: bool, truncated_by: str) -> bool:
        """Whether re-calling with larger bounds could surface more rows.

        Note: "hard_cap" is deliberately excluded — a hard cap cannot be raised
        by the caller, so retrying would not help.
        """
        if sample_complete:
            return False
        return truncated_by in {"return_limit", "scan_limit", "page_limit", "multiple"}

    def _derive_next_request_hint(*, truncated_by: str, more_available: bool | str, applied_return_limit: int, applied_scan_limit: int | None = None, applied_max_pages: int | None = None) -> str:
        """Human-readable suggestion for how to widen a truncated result set."""
        if truncated_by == "return_limit":
            return f"Ask for return_limit>{applied_return_limit} to see more rows"
        if truncated_by == "scan_limit" and applied_scan_limit is not None:
            return f"Increase scan_limit above {applied_scan_limit} for broader coverage"
        if truncated_by == "page_limit" and applied_max_pages is not None:
            return f"Increase max_pages above {applied_max_pages} to continue paging"
        if truncated_by == "hard_cap":
            return "No more rows can be returned in a single call because a hard cap was applied"
        if truncated_by == "multiple":
            return "Increase the relevant return/page/scan bounds to improve coverage"
        if more_available is False:
            return "No more results available"
        if more_available == "unknown":
            return "More results may exist; narrow filters or raise scan/page bounds for better coverage"
        return "Ask for a larger limit to see more rows"

    def _resolve_exhaustive_limits(
        *,
        return_limit: int | None,
        count_only: bool,
        default_return: int,
        max_return: int,
        scan_limit: int | None = None,
        scan_cap: int | None = None,
    ) -> dict[str, Any]:
        """Compute the applied return/scan limits for an exhaustive helper.

        count_only forces the effective return limit to 0 rows while keeping
        the caller's requested_return_limit as None for the meta output.
        """
        requested_return_limit = None if count_only else return_limit
        effective_requested_return_limit = 0 if count_only else requested_return_limit
        out: dict[str, Any] = {
            "requested_return_limit": requested_return_limit,
            "applied_return_limit": _clamp_int(
                effective_requested_return_limit,
                default=default_return,
                minimum=0,
                maximum=max_return,
            ),
            "default_limit_used": requested_return_limit is None and not count_only,
        }
        out["hard_cap_applied"] = (
            requested_return_limit is not None
            and out["applied_return_limit"] < requested_return_limit
        )
        # Scan limits only apply to helpers that page through source data.
        if scan_cap is not None:
            out["requested_scan_limit"] = scan_limit
            out["applied_scan_limit"] = _clamp_int(
                scan_limit,
                default=scan_cap,
                minimum=1,
                maximum=scan_cap,
            )
        return out

    def _build_exhaustive_meta(
        *,
        base_meta: dict[str, Any],
        limit_plan: dict[str, Any],
        sample_complete: bool,
        exact_count: bool,
        truncated_by: str,
        more_available: bool | str,
        requested_max_pages: int | None = None,
        applied_max_pages: int | None = None,
    ) -> dict[str, Any]:
        """Merge base meta with derived completeness/truncation/limit fields."""
        meta = dict(base_meta)
        applied_return_limit = int(limit_plan["applied_return_limit"])
        applied_scan_limit = limit_plan.get("applied_scan_limit")
        meta.update(
            {
                "complete": sample_complete,
                "exact_count": exact_count,
                "sample_complete": sample_complete,
                "more_available": more_available,
                "can_request_more": _derive_can_request_more(
                    sample_complete=sample_complete,
                    truncated_by=truncated_by,
                ),
                "truncated_by": truncated_by,
                "next_request_hint": _derive_next_request_hint(
                    truncated_by=truncated_by,
                    more_available=more_available,
                    applied_return_limit=applied_return_limit,
                    applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
                    applied_max_pages=applied_max_pages,
                ),
            }
        )
        meta.update(
            _derive_limit_metadata(
                requested_return_limit=limit_plan["requested_return_limit"],
                applied_return_limit=applied_return_limit,
                default_limit_used=bool(limit_plan["default_limit_used"]),
                requested_scan_limit=limit_plan.get("requested_scan_limit"),
                applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
                requested_max_pages=requested_max_pages,
                applied_max_pages=applied_max_pages,
            )
        )
        return meta

    def _overview_count_only_success(
        *,
        start_calls: int,
        source: str,
        total: int,
        limit_plan: dict[str, Any],
        base_meta: dict[str, Any],
    ) -> dict[str, Any]:
        """Success envelope for count_only queries satisfied by an overview total."""
        sample_complete = True
        more_available = False
        truncated_by = "none"
        meta = _build_exhaustive_meta(
            base_meta={
                **base_meta,
                "matched": total,
                "returned": 0,
                "total": total,
                "total_available": total,
                "total_matched": total,
                "truncated": False,
            },
            limit_plan=limit_plan,
            sample_complete=sample_complete,
            exact_count=True,
            truncated_by=truncated_by,
            more_available=more_available,
        )
        return _helper_success(
            start_calls=start_calls,
            source=source,
            items=[],
            meta=meta,
        )

    def _build_exhaustive_result_meta(
        *,
        base_meta: dict[str, Any],
        limit_plan: dict[str, Any],
        matched_count: int,
        returned_count: int,
        exact_count: bool,
        count_only: bool = False,
        sample_complete: bool | None = None,
        more_available: bool | str | None = None,
        scan_limit_hit: bool = False,
        page_limit_hit: bool = False,
        truncated_extra: bool = False,
        requested_max_pages: int | None = None,
        applied_max_pages: int | None = None,
    ) -> dict[str, Any]:
        """Derive the full result meta for an exhaustive helper call."""
        applied_return_limit = int(limit_plan["applied_return_limit"])
        if count_only:
            effective_sample_complete = exact_count
        else:
            # Caller may force sample_complete; otherwise derive it from counts.
            effective_sample_complete = (
                sample_complete
                if isinstance(sample_complete, bool)
                else exact_count and matched_count <= applied_return_limit
            )
        return_limit_hit = False if count_only else (applied_return_limit > 0 and matched_count > applied_return_limit)
        truncated_by = _derive_truncated_by(
            hard_cap=bool(limit_plan.get("hard_cap_applied")),
            scan_limit_hit=scan_limit_hit,
            page_limit_hit=page_limit_hit,
            return_limit_hit=return_limit_hit,
        )
        truncated = truncated_by != "none" or truncated_extra
        total_value = _as_int(base_meta.get("total"))
        effective_more_available = more_available
        if count_only and exact_count:
            effective_more_available = False
        if effective_more_available is None:
            effective_more_available = _derive_more_available(
                sample_complete=effective_sample_complete,
                exact_count=exact_count,
                returned=returned_count,
                total=total_value,
            )
        return _build_exhaustive_meta(
            base_meta={
                **base_meta,
                "matched": matched_count,
                "returned": returned_count,
                "truncated": truncated,
            },
            limit_plan=limit_plan,
            sample_complete=effective_sample_complete,
            exact_count=exact_count,
            truncated_by=truncated_by,
            more_available=effective_more_available,
            requested_max_pages=requested_max_pages,
            applied_max_pages=applied_max_pages,
        )

    def _helper_success(
        *,
        start_calls: int,
        source: str,
        items: list[dict[str, Any]],
        cursor: str | None = None,
        meta: dict[str, Any] | None = None,
        **extra_meta: Any,
    ) -> dict[str, Any]:
        """Stable success envelope: `items` always, `item` only for single rows."""
        merged_meta = dict(meta or {})
        merged_meta.update(extra_meta)
        if cursor is not None:
            merged_meta["cursor"] = cursor
        return {
            "ok": True,
            # `item` is a convenience mirror, set only when exactly one row exists.
            "item": items[0] if len(items) == 1 else None,
            "items": items,
            "meta": _helper_meta(start_calls, source=source, **merged_meta),
            "error": None,
        }

    def _helper_error(*, start_calls: int, source: str, error: Any, **meta: Any) -> dict[str, Any]:
        """Stable error envelope; also remembered as the latest helper error."""
        nonlocal latest_helper_error
        envelope = {
            "ok": False,
            "item": None,
            "items": [],
            "meta": _helper_meta(start_calls, source=source, **meta),
            "error": str(error),
        }
        latest_helper_error = envelope
        return envelope

    def _project_items(
        items: list[dict[str, Any]],
        fields: list[str] | None,
        aliases: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Project rows down to the requested fields, resolving aliases."""
        if not isinstance(fields, list) or not 
fields:
            return items
        # Keep only non-empty field names, preserving caller order.
        wanted = [str(f).strip() for f in fields if str(f).strip()]
        if not wanted:
            return items
        alias_map = {str(k).strip().lower(): str(v).strip() for k, v in (aliases or {}).items() if str(k).strip() and str(v).strip()}
        projected: list[dict[str, Any]] = []
        for row in items:
            out: dict[str, Any] = {}
            for key in wanted:
                # Aliases map canonical caller-facing names to source row keys.
                source_key = alias_map.get(key.lower(), key)
                value = row.get(source_key)
                if value is None:
                    # Omit missing keys instead of emitting nulls.
                    continue
                out[key] = value
            projected.append(out)
        return projected

    def _project_repo_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection for repo rows (repo-specific aliases)."""
        return _project_items(items, fields, aliases=_REPO_FIELD_ALIASES)

    def _project_collection_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection for collection rows."""
        return _project_items(items, fields, aliases=_COLLECTION_FIELD_ALIASES)

    def _project_user_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection for user rows."""
        return _project_items(items, fields, aliases=_USER_FIELD_ALIASES)

    def _project_actor_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection for actor (user-or-org) rows."""
        return _project_items(items, fields, aliases=_ACTOR_FIELD_ALIASES)

    def _item_matches_where(item: dict[str, Any], where: dict[str, Any] | None) -> bool:
        """Evaluate a simple declarative `where` filter against one row.

        Each condition may be a dict of operators (eq/in/contains/icontains/
        gte/lte), a collection (membership test), or a scalar (equality).
        All conditions must hold for the row to match.
        """
        if not isinstance(where, dict) or not where:
            return True
        for key, cond in where.items():
            val = item.get(str(key))
            if isinstance(cond, dict):
                if "eq" in cond and val != cond.get("eq"):
                    return False
                if "in" in cond:
                    allowed = cond.get("in")
                    if isinstance(allowed, (list, tuple, set)) and val not in allowed:
                        return False
                if "contains" in cond:
                    needle = cond.get("contains")
                    if not isinstance(val, str) or not isinstance(needle, str) or needle not in val:
                        return False
                if "icontains" in cond:
                    needle = cond.get("icontains")
                    if not isinstance(val, str) or not isinstance(needle, str) or needle.lower() not in val.lower():
                        return False
                if "gte" in cond:
                    # Numeric comparisons fail closed when either side is non-numeric.
                    v = _as_int(val)
                    c = _as_int(cond.get("gte"))
                    if v is None or c is None or v < c:
                        return False
                if "lte" in cond:
                    v = _as_int(val)
                    c = _as_int(cond.get("lte"))
                    if v is None or c is None or v > c:
                        return False
                continue
            if isinstance(cond, (list, tuple, set)):
                if val not in cond:
                    return False
                continue
            if val != cond:
                return False
        return True

    def _apply_where(items: list[dict[str, Any]], where: dict[str, Any] | None) -> list[dict[str, Any]]:
        """Filter rows with `_item_matches_where`; no-op for empty filters."""
        if not isinstance(where, dict) or not where:
            return items
        return [row for row in items if _item_matches_where(row, where)]

    def _helper_item(resp: dict[str, Any]) -> dict[str, Any] | None:
        """Extract the single result row from a helper envelope, if any."""
        item = resp.get("item")
        if isinstance(item, dict):
            return item
        items = resp.get("items")
        if isinstance(items, list) and items and isinstance(items[0], dict):
            return items[0]
        return None

    def _overview_count(item: dict[str, Any] | None, key: str) -> int | None:
        """Read an integer counter from an overview item, if present."""
        if not isinstance(item, dict):
            return None
        return _as_int(item.get(key))

    def _summary_section(
        resp: dict[str, Any],
        *,
        count: int | None = None,
        default_sample: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """Condense a helper response into a {count, sample, meta} section."""
        meta = resp.get("meta")
        section_meta = dict(meta) if isinstance(meta, dict) else {}
        sample = resp.get("items")
        section_sample = sample if isinstance(sample, list) else list(default_sample or [])
        section_count = count
        if section_count is None:
            # Only trust counts the helper declared exact (overview/endpoint source).
            count_exact = section_meta.get("exact_count") is True or section_meta.get("count_source") in {"overview", "endpoint"}
            if count_exact:
                for key in ("total", "total_matched", "matched"):
                    section_count = _as_int(section_meta.get(key))
                    if section_count is not None:
                        break
        if resp.get("ok") is not True:
            # Failed section: surface the error but keep the section shape stable.
            section_meta["error"] = str(resp.get("error") or "section fetch failed")
            section_sample = list(default_sample or [])
        return {"count": section_count, "sample": section_sample, "meta": section_meta}

    async def _resolve_username_or_current(username: str | None) -> tuple[str | None, str | None]:
        """Return (username, None) or (None, error), falling back to whoami."""
        u = str(username or "").strip()
        if u:
            return u, None
        whoami = await hf_whoami()
        if whoami.get("ok") is not True:
            return None, str(whoami.get("error") or "Could not resolve current authenticated user")
        item = _helper_item(whoami)
        resolved = item.get("username") if isinstance(item, dict) else None
        if not isinstance(resolved, str) or not resolved.strip():
            return None, "username was not provided and current authenticated user could not be resolved"
        return resolved.strip(), None

    def _normalize_user_likes_sort(sort: str | None) -> tuple[str | None, str | None]:
        """Map common sort aliases to the canonical likes sort keys."""
        raw = str(sort or "likedAt").strip()
        alias_map = {
            "": "likedAt",
            "likedat": "likedAt",
            "liked_at": "likedAt",
            "liked-at": "likedAt",
            "recency": "likedAt",
            "repolikes": "repoLikes",
            "repo_likes": "repoLikes",
            "repo-likes": "repoLikes",
            "repodownloads": "repoDownloads",
            "repo_downloads": "repoDownloads",
            "repo-downloads": "repoDownloads",
        }
        normalized = alias_map.get(raw.lower(), raw)
        if normalized not in {"likedAt", "repoLikes", "repoDownloads"}:
            return None, "sort must be one of likedAt, repoLikes, repoDownloads"
        return normalized, None

    def _author_from_any(value: Any) -> str | None:
        """Pull an author handle out of a string or author-like dict."""
        if isinstance(value, str):
            return value
        if isinstance(value, dict):
            for k in ("name", "username", "user", "login"):
                v = value.get(k)
                if isinstance(v, str) and v:
                    return v
        return None

    def _clean_social_handle(value: Any) -> str | None:
        """Normalize a social handle: keep full URLs, strip a leading '@'."""
        if not isinstance(value, str):
            return None
        cleaned = value.strip()
        if not cleaned:
            return None
        if re.match(r"^https?://", cleaned, flags=re.IGNORECASE):
            return cleaned
        return cleaned.lstrip("@")

    def _social_url(kind: str, value: Any) -> str | None:
        """Build a canonical profile URL for a social handle, if possible."""
        cleaned = _clean_social_handle(value)
        if cleaned is None:
            return None
        if re.match(r"^https?://", cleaned, flags=re.IGNORECASE):
            # Already a full URL; pass through untouched.
            return cleaned
        if kind == "twitter":
            return f"https://twitter.com/{cleaned}"
        if kind == "github":
            return f"https://github.com/{cleaned}"
        if kind == "linkedin":
            if cleaned.startswith(("in/", "company/")):
                return f"https://www.linkedin.com/{cleaned}"
            return f"https://www.linkedin.com/in/{cleaned}"
        if kind == "bluesky":
            return f"https://bsky.app/profile/{cleaned}"
        # Unknown kind: return the cleaned handle as-is.
        return cleaned

    async def call_api(
        endpoint: str,
        params: dict[str, Any] | None = None,
        method: str = "GET",
        json_body: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Sandbox-facing raw API entry point; delegates to the host caller."""
        return _host_raw_call(endpoint, params=params, method=method, json_body=json_body)

    async def hf_whoami() -> dict[str, Any]:
        """Resolve the current authenticated user via /api/whoami-v2."""
        start_calls = call_count["n"]
        endpoint = "/api/whoami-v2"
        token = _load_token()
        if token is None:
            # Without a token there is no "current user" to report.
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=(
                    "Current authenticated user is unavailable for this request. "
                    "No request-scoped or fallback HF token was found."
                ),
            )
        try:
            payload = _host_hf_call(
                endpoint,
                lambda: _get_hf_api_client().whoami(token=token, cache=True),
            )
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e)
        # whoami payload key varies; try the known variants in order.
        username = payload.get("name") or payload.get("user") or payload.get("username")
        item = {"username": username, "fullname": payload.get("fullname"), "isPro": payload.get("isPro")}
        items = [item] if isinstance(username, str) and username else []
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=items,
            scanned=1,
            matched=len(items),
            returned=len(items),
            truncated=False,
        )

    async def hf_user_overview(username: str) -> dict[str, Any]:
        """Fetch a user's profile overview, backfilling socials when missing."""
        start_calls = call_count["n"]
        u = str(username or "").strip()
        if not u:
            return _helper_error(start_calls=start_calls, source="/api/users//overview", error="username is required")
        endpoint = f"/api/users/{u}/overview"
        try:
            obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_user_overview(u))
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e)
        # Social handles may live under either the short or *Username attribute.
        twitter = getattr(obj, "twitter", None) or getattr(obj, "twitterUsername", None)
        github = getattr(obj, "github", None) or getattr(obj, "githubUsername", None)
        linkedin = getattr(obj, "linkedin", None) or getattr(obj, "linkedinUsername", None)
        bluesky = getattr(obj, "bluesky", None) or getattr(obj, "blueskyUsername", None)
        # Best-effort backfill from the raw socials endpoint when budget allows.
        if _budget_remaining() > 0 and any(v in {None, ""} for v in [twitter, github, linkedin, bluesky]):
            socials_ep = f"/api/users/{u}/socials"
            socials_resp = _host_raw_call(socials_ep)
            if socials_resp.get("ok"):
                socials_payload = socials_resp.get("data") if isinstance(socials_resp.get("data"), dict) else {}
                handles = socials_payload.get("socialHandles") if isinstance(socials_payload.get("socialHandles"), dict) else {}
                twitter = twitter or handles.get("twitter")
                github = github or handles.get("github")
                linkedin = linkedin or handles.get("linkedin")
                bluesky = bluesky or handles.get("bluesky")
        orgs_raw = getattr(obj, "orgs", None)
        org_names: list[str] | None = None
        if isinstance(orgs_raw, (list, tuple, set)):
            # Org entries may be plain strings or objects with a .name attribute.
            names = []
            for org in orgs_raw:
                if isinstance(org, str) and org.strip():
                    names.append(org.strip())
                    continue
                name = getattr(org, "name", None)
                if isinstance(name, str) and name.strip():
                    names.append(name.strip())
            org_names = names or None
        twitter_handle = _clean_social_handle(twitter)
        github_handle = _clean_social_handle(github)
        linkedin_handle = _clean_social_handle(linkedin)
        bluesky_handle = _clean_social_handle(bluesky)
        item = {
            "username": obj.username or u,
            "fullname": obj.fullname,
            "bio": getattr(obj, "details", None),
            "avatarUrl": obj.avatar_url,
            "websiteUrl": getattr(obj, "websiteUrl", None),
            "twitter": _social_url("twitter", twitter_handle),
            "github": _social_url("github", github_handle),
            "linkedin": _social_url("linkedin", linkedin_handle),
            "bluesky": _social_url("bluesky", bluesky_handle),
            "twitterHandle": twitter_handle,
            "githubHandle": github_handle,
            "linkedinHandle": linkedin_handle,
            "blueskyHandle": bluesky_handle,
            "followers": _as_int(obj.num_followers),
            "following": _as_int(obj.num_following),
            "likes": _as_int(obj.num_likes),
            "models": _as_int(getattr(obj, "num_models", None)),
            "datasets": _as_int(getattr(obj, "num_datasets", None)),
            "spaces": _as_int(getattr(obj, "num_spaces", None)),
            "discussions": _as_int(getattr(obj, "num_discussions", None)),
"papers": _as_int(getattr(obj, "num_papers", None)),
            "upvotes": _as_int(getattr(obj, "num_upvotes", None)),
            "orgs": org_names,
            "isPro": obj.is_pro,
        }
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
        )

    async def hf_org_overview(organization: str) -> dict[str, Any]:
        """Fetch an organization's profile overview."""
        start_calls = call_count["n"]
        org = str(organization or "").strip()
        if not org:
            return _helper_error(
                start_calls=start_calls,
                source="/api/organizations//overview",
                error="organization is required",
            )
        endpoint = f"/api/organizations/{org}/overview"
        try:
            obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_organization_overview(org))
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e)
        item = {
            "organization": obj.name or org,
            "displayName": obj.fullname,
            "avatarUrl": obj.avatar_url,
            "description": obj.details,
            "websiteUrl": getattr(obj, "websiteUrl", None),
            "followers": _as_int(obj.num_followers),
            "members": _as_int(obj.num_users),
            "models": _as_int(getattr(obj, "num_models", None)),
            "datasets": _as_int(getattr(obj, "num_datasets", None)),
            "spaces": _as_int(getattr(obj, "num_spaces", None)),
        }
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
        )

    async def hf_org_members(
        organization: str,
        return_limit: int | None = None,
        scan_limit: int | None = None,
        count_only: bool = False,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        """List (or count) an organization's members, exhaustively up to scan_limit.

        The overview total is used as the exact count when no `where` filter
        is applied; otherwise the member list itself is scanned and filtered.
        """
        start_calls = call_count["n"]
        org = str(organization or "").strip()
        if not org:
            return _helper_error(start_calls=start_calls, source="/api/organizations//members", error="organization is required")
        default_return = _policy_int("hf_org_members", "default_return", 100)
        scan_cap = _policy_int("hf_org_members", "scan_max", GRAPH_SCAN_LIMIT_CAP)
        limit_plan = _resolve_exhaustive_limits(
            return_limit=return_limit,
            count_only=count_only,
            default_return=default_return,
            max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
            scan_limit=scan_limit,
            scan_cap=scan_cap,
        )
        ret_lim = int(limit_plan["applied_return_limit"])
        scan_lim = int(limit_plan["applied_scan_limit"])
        has_where = isinstance(where, dict) and bool(where)
        # Try to get an authoritative member count from the org overview first.
        overview_total: int | None = None
        overview_source = f"/api/organizations/{org}/overview"
        if _budget_remaining() > 0:
            try:
                org_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_organization_overview(org))
                overview_total = _as_int(getattr(org_obj, "num_users", None))
            except Exception:
                overview_total = None
        if count_only and not has_where and overview_total is not None:
            # count_only with no filter can be answered from the overview alone.
            return _overview_count_only_success(
                start_calls=start_calls,
                source=overview_source,
                total=overview_total,
                limit_plan=limit_plan,
                base_meta={
                    "scanned": 1,
                    "count_source": "overview",
                    "organization": org,
                },
            )
        endpoint = f"/api/organizations/{org}/members"
        try:
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_members(org), scan_lim)))
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e, organization=org)
        normalized: list[dict[str, Any]] = []
        for row in rows:
            handle = getattr(row, "username", None)
            if not isinstance(handle, str) or not handle:
                # Skip malformed rows without a usable username.
                continue
            item = {
                "username": handle,
                "fullname": getattr(row, "fullname", None),
                "isPro": getattr(row, "is_pro", None),
                "role": getattr(row, "role", None),
            }
            normalized.append(item)
        normalized = _apply_where(normalized, where)
        observed_total = len(rows)
        # Fewer rows than the scan limit means the listing was fully consumed.
        scan_exhaustive = observed_total < scan_lim
        overview_list_mismatch = (
            overview_total is not None
            and scan_exhaustive
            and observed_total != overview_total
        )
        if has_where:
            # Filtered counts can only be exact when the full list was scanned.
            exact_count = scan_exhaustive
            total = len(normalized)
            total_matched = len(normalized)
        else:
            if overview_total is not None:
                exact_count = True
                total = overview_total
                total_matched = overview_total
            else:
                exact_count = scan_exhaustive
                total = observed_total
                total_matched = observed_total
        total_available = overview_total if overview_total is not None else observed_total
        items = normalized[:ret_lim]
        scan_limit_hit = not exact_count and observed_total >= scan_lim
        count_source = "overview" if overview_total is not None and not has_where else "scan"
        sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
        more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
        if not exact_count and scan_limit_hit:
            more_available = "unknown" if has_where else True
        items = _project_user_items(items, fields)
        meta = _build_exhaustive_result_meta(
            base_meta={
                "scanned": observed_total,
                "total": total,
                "total_available": total_available,
                "total_matched": total_matched,
                "count_source": count_source,
                "lower_bound": bool(has_where and not exact_count),
                "overview_total": overview_total,
                "listed_total": observed_total,
                "overview_list_mismatch": overview_list_mismatch,
                "organization": org,
            },
            limit_plan=limit_plan,
            matched_count=len(normalized),
            returned_count=len(items),
            exact_count=exact_count,
            count_only=count_only,
            sample_complete=sample_complete,
            more_available=more_available,
            scan_limit_hit=scan_limit_hit,
        )
        return _helper_success(start_calls=start_calls, source=endpoint, items=items, meta=meta)

    async def hf_repo_search(
        query: str | None = None,
        repo_type: str | None = None,
        repo_types: list[str] | None = None,
        author: str | None = None,
        filters: list[str] | None = None,
        sort: str | None = None,
        limit: int = 20,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
        advanced: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """One-shot repo search across model/dataset/space listings via HfApi."""
        start_calls = call_count["n"]
        default_return = _policy_int("hf_repo_search", "default_return", 20)
        max_return = _policy_int("hf_repo_search", "max_return", SELECTIVE_ENDPOINT_RETURN_HARD_CAP)
        if repo_type is not None and repo_types is not None:
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error="Pass either repo_type or repo_types, not both",
            )
        # Resolve the requested repo types, defaulting to models only.
        if repo_types is None:
            if repo_type is None or not str(repo_type).strip():
                requested_repo_types = ["model"]
            else:
                rt = _canonical_repo_type(repo_type, default="")
                if rt not in {"model", "dataset", "space"}:
                    return _helper_error(
                        start_calls=start_calls,
                        source="/api/repos",
                        error=f"Unsupported repo_type '{repo_type}'",
                    )
                requested_repo_types = [rt]
        else:
            raw_types = _coerce_str_list(repo_types)
            if not raw_types:
                return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_types must not be empty")
            requested_repo_types: list[str] = []
            for raw in raw_types:
                rt = _canonical_repo_type(raw, default="")
                if rt not in {"model", "dataset", "space"}:
                    return _helper_error(
                        start_calls=start_calls,
                        source="/api/repos",
                        error=f"Unsupported repo_type '{raw}'",
                    )
                requested_repo_types.append(rt)
        filter_list = _coerce_str_list(filters)
        term = str(query or "").strip()
        author_clean = str(author or "").strip() or None
        requested_limit = limit
        lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)
        limit_meta = _derive_limit_metadata(
            requested_return_limit=requested_limit,
            applied_return_limit=lim,
            default_limit_used=limit == default_return,
        )
        hard_cap_applied = bool(limit_meta.get("hard_cap_applied"))
        if advanced is not None and not isinstance(advanced, dict):
            return _helper_error(start_calls=start_calls, source="/api/repos", error="advanced must be a dict when provided")
        if advanced is not None and len(requested_repo_types) != 1:
            # Advanced args are list_* specific, so they require one repo type.
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error="advanced may only be used with a single repo_type",
            )
        sort_keys: dict[str, str | None] = {}
        for rt in requested_repo_types:
            sort_key, sort_error = _normalize_repo_sort_key(rt, sort)
            if sort_error:
                return _helper_error(start_calls=start_calls, source=f"/api/{rt}s", error=sort_error)
            sort_keys[rt] = sort_key
        all_items: list[dict[str, Any]] = []
        scanned = 0
        source_endpoints: list[str] = []
        limit_boundary_hit = False
        api = _get_hf_api_client()
        for rt in requested_repo_types:
            endpoint = f"/api/{rt}s"
            source_endpoints.append(endpoint)
            extra_args = dict(advanced or {}) if len(requested_repo_types) == 1 else {}
            allowed_extra = _REPO_SEARCH_EXTRA_ARGS.get(rt, set())
            unsupported = sorted(str(k) for k in extra_args.keys() if str(k) not in allowed_extra)
            if unsupported:
                return _helper_error(
                    start_calls=start_calls,
                    source=endpoint,
                    error=(
                        f"Unsupported advanced args for repo_type='{rt}': {unsupported}. "
                        f"Allowed advanced args: {sorted(allowed_extra)}"
                    ),
                )
            # Accept snake_case `card_data` as an alias for HfApi's `cardData`.
            if "card_data" in extra_args and "cardData" not in extra_args:
                extra_args["cardData"] = extra_args.pop("card_data")
            else:
                extra_args.pop("card_data", None)
            if not any(key in extra_args for key in ("expand", "full", "cardData", "fetch_config")):
                # Default expansion keeps response fields predictable per type.
                extra_args["expand"] = list(_REPO_SEARCH_DEFAULT_EXPAND[rt])
            try:
                if rt == "model":
                    payload = _host_hf_call(
                        endpoint,
                        lambda: list(
                            api.list_models(
                                search=term or None,
                                author=author_clean,
                                filter=filter_list or None,
                                sort=sort_keys[rt],  # type: ignore[arg-type]
                                limit=lim,
                                **extra_args,
                            )
                        ),
                    )
                elif rt == "dataset":
                    payload = _host_hf_call(
                        endpoint,
                        lambda: list(
                            api.list_datasets(
                                search=term or None,
                                author=author_clean,
                                filter=filter_list or None,
                                sort=sort_keys[rt],  # type: ignore[arg-type]
                                limit=lim,
                                **extra_args,
                            )
                        ),
                    )
                else:
                    payload = _host_hf_call(
                        endpoint,
                        lambda: list(
                            api.list_spaces(
                                search=term or None,
                                author=author_clean,
                                filter=filter_list or None,
                                sort=sort_keys[rt],  # type: ignore[arg-type]
                                limit=lim,
                                **extra_args,
                            )
                        ),
                    )
            except Exception as e:
                return _helper_error(start_calls=start_calls, source=endpoint, error=e)
            scanned += len(payload)
            if len(payload) >= lim:
                # A full page means upstream may have had more rows.
                limit_boundary_hit = True
            all_items.extend(_normalize_repo_search_row(row, rt) for row in payload[:lim])
        all_items = _apply_where(all_items, where)
        combined_sort_key = next(iter(sort_keys.values()), None)
        all_items = _sort_repo_rows(all_items, 
async def _user_graph_helper(
    kind: str,
    username: str,
    pro_only: bool | None,
    return_limit: int | None,
    scan_limit: int | None,
    count_only: bool,
    where: dict[str, Any] | None,
    fields: list[str] | None,
    *,
    helper_name: str,
) -> dict[str, Any]:
    """Shared implementation for follower/following graph queries.

    Resolves the entity (user first, organization as fallback), optionally
    answers count-only queries straight from the overview endpoint, otherwise
    scans the graph list up to the policy scan limit and builds the standard
    exhaustive-helper result envelope.

    Args:
        kind: Relation to list, "followers" or "following" (callers validate).
        username: Target user/org handle; must be non-empty after stripping.
        pro_only: Tri-state filter on the `isPro` flag (None = no filter).
        return_limit / scan_limit: Caller-requested row and scan budgets;
            clamped by `_resolve_exhaustive_limits` against helper policy.
        count_only: When True, only counts are wanted; no rows are returned.
        where: Optional row-filter spec applied via `_apply_where`.
        fields: Optional projection applied via `_project_user_items`.
        helper_name: Policy-lookup key for per-helper limit configuration.

    Returns:
        A `_helper_success` envelope with graph rows and exhaustive metadata,
        or a `_helper_error` envelope on validation/API failure.
    """
    start_calls = call_count["n"]
    # Per-helper policy knobs; fall back to module-level defaults.
    default_return = _policy_int(helper_name, "default_return", 100)
    scan_cap = _policy_int(helper_name, "scan_max", GRAPH_SCAN_LIMIT_CAP)
    max_return = _policy_int(helper_name, "max_return", EXHAUSTIVE_HELPER_RETURN_HARD_CAP)
    u = str(username or "").strip()
    if not u:
        return _helper_error(start_calls=start_calls, source=f"/api/users//{kind}", error="username is required")
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=max_return,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    has_where = isinstance(where, dict) and bool(where)
    # "filtered" means the overview count cannot be trusted as the answer:
    # any pro_only/where filter forces a scan-based count.
    filtered = (pro_only is not None) or has_where
    entity_type = "user"
    overview_total: int | None = None
    overview_source = f"/api/users/{u}/overview"
    if _budget_remaining() > 0:
        # Try cheap overview counts first; user lookup, then organization
        # fallback when the user overview call fails.
        try:
            user_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_user_overview(u))
            overview_total = _as_int(user_obj.num_followers if kind == "followers" else user_obj.num_following)
        except Exception:
            org_overview_source = f"/api/organizations/{u}/overview"
            try:
                org_obj = _host_hf_call(org_overview_source, lambda: _get_hf_api_client().get_organization_overview(u))
            except Exception:
                # Neither lookup worked; proceed without an overview count.
                overview_total = None
            else:
                entity_type = "organization"
                overview_source = org_overview_source
                # Organizations only expose followers, never a following list.
                if kind != "followers":
                    return _helper_error(
                        start_calls=start_calls,
                        source=f"/api/organizations/{u}/{kind}",
                        error="organization graph only supports relation='followers'; organizations do not expose a following list",
                        relation=kind,
                        organization=u,
                        entity=u,
                        entity_type=entity_type,
                    )
                overview_total = _as_int(getattr(org_obj, "num_followers", None))
    # Fast path: an unfiltered count-only query is answered from the overview
    # number without listing any graph rows.
    if count_only and not filtered and overview_total is not None:
        return _overview_count_only_success(
            start_calls=start_calls,
            source=overview_source,
            total=overview_total,
            limit_plan=limit_plan,
            base_meta={
                "scanned": 1,
                "count_source": "overview",
                "relation": kind,
                "pro_only": pro_only,
                "where_applied": has_where,
                "entity": u,
                "entity_type": entity_type,
                "username": u,
                "organization": u if entity_type == "organization" else None,
            },
        )
    endpoint = f"/api/users/{u}/{kind}"
    try:
        # islice bounds the lazy HfApi iterators to the scan budget.
        if entity_type == "organization":
            endpoint = f"/api/organizations/{u}/followers"
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_followers(u), scan_lim)))
        elif kind == "followers":
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_followers(u), scan_lim)))
        else:
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_following(u), scan_lim)))
    except Exception as e:
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=e,
            relation=kind,
            username=u,
            entity=u,
            entity_type=entity_type,
            organization=u if entity_type == "organization" else None,
        )
    normalized: list[dict[str, Any]] = []
    for row in rows:
        handle = getattr(row, "username", None)
        if not isinstance(handle, str) or not handle:
            continue
        item = {
            "username": handle,
            "fullname": getattr(row, "fullname", None),
            "isPro": getattr(row, "is_pro", None),
        }
        # Tri-state pro filter: True keeps only pro accounts, False keeps
        # only non-pro (treating unknown/None isPro as non-pro).
        if pro_only is True and item.get("isPro") is not True:
            continue
        if pro_only is False and item.get("isPro") is True:
            continue
        normalized.append(item)
    normalized = _apply_where(normalized, where)
    observed_total = len(rows)
    # Fewer rows than the scan budget means the list was read to its end.
    scan_exhaustive = observed_total < scan_lim
    # Surface disagreement between overview counter and the fully listed rows.
    overview_list_mismatch = (
        overview_total is not None and scan_exhaustive and observed_total != overview_total
    )
    if filtered:
        # Filtered counts can only be exact if the full list was scanned.
        exact_count = scan_exhaustive
        total = len(normalized)
        total_matched = len(normalized)
    else:
        if overview_total is not None:
            # Unfiltered: the overview counter is authoritative.
            exact_count = True
            total = overview_total
            total_matched = overview_total
        else:
            exact_count = scan_exhaustive
            total = observed_total
            total_matched = observed_total
    total_available = overview_total if overview_total is not None else observed_total
    items = normalized[:ret_lim]
    scan_limit_hit = not exact_count and observed_total >= scan_lim
    count_source = "overview" if overview_total is not None and not filtered else "scan"
    sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if not exact_count and scan_limit_hit:
        # Under a filter we cannot tell whether unscanned rows would match.
        more_available = "unknown" if filtered else True
    items = _project_user_items(items, fields)
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": observed_total,
            "total": total,
            "total_available": total_available,
            "total_matched": total_matched,
            "count_source": count_source,
            "lower_bound": bool(filtered and not exact_count),
            "overview_total": overview_total,
            "listed_total": observed_total,
            "overview_list_mismatch": overview_list_mismatch,
            "relation": kind,
            "pro_only": pro_only,
            "where_applied": has_where,
            "entity": u,
            "entity_type": entity_type,
            "username": u,
            "organization": u if entity_type == "organization" else None,
        },
        limit_plan=limit_plan,
        matched_count=len(normalized),
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
    )
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
async def hf_profile_summary(
    handle: str | None = None,
    include: list[str] | None = None,
    likes_limit: int = 10,
    activity_limit: int = 10,
) -> dict[str, Any]:
    """Compact profile summary for a user or organization handle.

    Tries the user overview first; if that fails, falls back to the
    organization overview. For users, the optional `include` sections
    ("likes", "activity") attach small samples fetched via `hf_user_likes`
    and `hf_recent_activity`; per-section failures are reported in
    `section_errors` without failing the whole summary. Organization
    summaries ignore `include` and report it via `ignored_includes`.

    Args:
        handle: User/org handle; None resolves to the authenticated user.
        include: Optional subset of {"likes", "activity"} extra sections.
        likes_limit / activity_limit: Sample sizes per section; clamped to
            [0, OUTPUT_ITEMS_TRUNCATION_LIMIT], 0 means count-only fetch.

    Returns:
        A `_helper_success` envelope with a single summary item, or a
        `_helper_error` envelope when both overview lookups fail.
    """
    start_calls = call_count["n"]
    resolved_handle, resolve_error = await _resolve_username_or_current(handle)
    if resolve_error:
        return _helper_error(start_calls=start_calls, source="/api/users//overview", error=resolve_error)
    if not isinstance(resolved_handle, str):
        return _helper_error(
            start_calls=start_calls,
            source="/api/users//overview",
            error="handle was not provided and current authenticated user could not be resolved",
        )
    # Normalize and validate the requested extra sections up front.
    try:
        requested_sections = (
            {part.lower() for part in _coerce_str_list(include) if part.strip()}
            if include is not None
            else set()
        )
    except ValueError as e:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            error=e,
        )
    invalid_sections = sorted(requested_sections - {"likes", "activity"})
    if invalid_sections:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            error=f"Unsupported include values: {invalid_sections}",
        )
    likes_lim = _clamp_int(likes_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
    activity_lim = _clamp_int(activity_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
    section_errors: dict[str, str] = {}
    user_overview = await hf_user_overview(resolved_handle)
    if user_overview.get("ok") is True:
        overview_item = _helper_item(user_overview) or {"username": resolved_handle}
        # Flatten the overview payload into stable snake_case summary keys.
        item: dict[str, Any] = {
            "handle": str(overview_item.get("username") or resolved_handle),
            "entity_type": "user",
            "display_name": overview_item.get("fullname") or str(overview_item.get("username") or resolved_handle),
            "bio": overview_item.get("bio"),
            "avatar_url": overview_item.get("avatarUrl"),
            "website_url": overview_item.get("websiteUrl"),
            "twitter_url": overview_item.get("twitter"),
            "github_url": overview_item.get("github"),
            "linkedin_url": overview_item.get("linkedin"),
            "bluesky_url": overview_item.get("bluesky"),
            "followers_count": _overview_count(overview_item, "followers"),
            "following_count": _overview_count(overview_item, "following"),
            "likes_count": _overview_count(overview_item, "likes"),
            "models_count": _overview_count(overview_item, "models"),
            "datasets_count": _overview_count(overview_item, "datasets"),
            "spaces_count": _overview_count(overview_item, "spaces"),
            "discussions_count": _overview_count(overview_item, "discussions"),
            "papers_count": _overview_count(overview_item, "papers"),
            "upvotes_count": _overview_count(overview_item, "upvotes"),
            "organizations": overview_item.get("orgs"),
            "is_pro": overview_item.get("isPro"),
        }
        if "likes" in requested_sections:
            # likes_lim == 0 degrades to a count-only fetch (empty sample).
            likes = await hf_user_likes(
                username=resolved_handle,
                return_limit=likes_lim,
                scan_limit=USER_SUMMARY_LIKES_SCAN_LIMIT,
                count_only=likes_lim == 0,
                sort="likedAt",
                fields=["liked_at", "repo_id", "repo_type", "repo_author", "repo_url"],
            )
            item["likes_sample"] = likes.get("items") if likes.get("ok") is True else []
            if likes.get("ok") is not True:
                section_errors["likes"] = str(likes.get("error") or "likes fetch failed")
        if "activity" in requested_sections:
            activity = await hf_recent_activity(
                feed_type="user",
                entity=resolved_handle,
                return_limit=activity_lim,
                max_pages=USER_SUMMARY_ACTIVITY_MAX_PAGES,
                count_only=activity_lim == 0,
                fields=["timestamp", "event_type", "repo_type", "repo_id"],
            )
            item["activity_sample"] = activity.get("items") if activity.get("ok") is True else []
            if activity.get("ok") is not True:
                section_errors["activity"] = str(activity.get("error") or "activity fetch failed")
        return _helper_success(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
            handle=resolved_handle,
            entity_type="user",
            include=sorted(requested_sections),
            likes_limit=likes_lim,
            activity_limit=activity_lim,
            section_errors=section_errors or None,
        )
    # User lookup failed: retry the handle as an organization.
    org_overview = await hf_org_overview(resolved_handle)
    if org_overview.get("ok") is True:
        overview_item = _helper_item(org_overview) or {"organization": resolved_handle}
        item = {
            "handle": str(overview_item.get("organization") or resolved_handle),
            "entity_type": "organization",
            "display_name": overview_item.get("displayName") or str(overview_item.get("organization") or resolved_handle),
            "description": overview_item.get("description"),
            "avatar_url": overview_item.get("avatarUrl"),
            "website_url": overview_item.get("websiteUrl"),
            "followers_count": _overview_count(overview_item, "followers"),
            "members_count": _overview_count(overview_item, "members"),
            "models_count": _overview_count(overview_item, "models"),
            "datasets_count": _overview_count(overview_item, "datasets"),
            "spaces_count": _overview_count(overview_item, "spaces"),
        }
        return _helper_success(
            start_calls=start_calls,
            source=f"/api/organizations/{resolved_handle}/overview",
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
            handle=resolved_handle,
            entity_type="organization",
            include=[],
            ignored_includes=sorted(requested_sections) or None,
        )
    # Both lookups failed; prefer the user error message when present.
    error = user_overview.get("error") or org_overview.get("error") or "profile fetch failed"
    return _helper_error(
        start_calls=start_calls,
        source=f"/api/profiles/{resolved_handle}",
        error=error,
        handle=resolved_handle,
    )
"followers", return_limit: int | None = None, scan_limit: int | None = None, count_only: bool = False, pro_only: bool | None = None, where: dict[str, Any] | None = None, fields: list[str] | None = None, ) -> dict[str, Any]: start_calls = call_count["n"] rel = str(relation or "").strip().lower() or "followers" if rel not in {"followers", "following"}: return _helper_error( start_calls=start_calls, source="/api/users//followers", error="relation must be 'followers' or 'following'", ) resolved_username, resolve_error = await _resolve_username_or_current(username) if resolve_error: return _helper_error(start_calls=start_calls, source=f"/api/users//{rel}", error=resolve_error, relation=rel) if not isinstance(resolved_username, str): return _helper_error(start_calls=start_calls, source=f"/api/users//{rel}", error="username is required", relation=rel) return await _user_graph_helper( rel, resolved_username, pro_only, return_limit, scan_limit, count_only, where, fields, helper_name="hf_user_graph", ) async def hf_user_likes( username: str | None = None, repo_types: list[str] | None = None, return_limit: int | None = None, scan_limit: int | None = None, count_only: bool = False, where: dict[str, Any] | None = None, fields: list[str] | None = None, sort: str | None = None, ranking_window: int | None = None, ) -> dict[str, Any]: start_calls = call_count["n"] default_return = _policy_int("hf_user_likes", "default_return", 100) scan_cap = _policy_int("hf_user_likes", "scan_max", LIKES_SCAN_LIMIT_CAP) ranking_default = _policy_int("hf_user_likes", "ranking_default", LIKES_RANKING_WINDOW_DEFAULT) enrich_cap = _policy_int("hf_user_likes", "enrich_max", LIKES_ENRICHMENT_MAX_REPOS) resolved_username, resolve_error = await _resolve_username_or_current(username) if resolve_error: return _helper_error(start_calls=start_calls, source="/api/users//likes", error=resolve_error) if not isinstance(resolved_username, str): return _helper_error(start_calls=start_calls, 
async def hf_user_likes(
    username: str | None = None,
    repo_types: list[str] | None = None,
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
    sort: str | None = None,
    ranking_window: int | None = None,
) -> dict[str, Any]:
    """List a user's liked repos via the raw `/api/users/{u}/likes` endpoint.

    Supports three sort modes: "likedAt" (native event order), "repoLikes"
    and "repoDownloads" (popularity ranking over a bounded shortlist). For
    popularity sorts, shortlist rows missing the metric are enriched with
    extra repo-detail calls, capped by the enrichment policy and the
    remaining call budget.

    Args:
        username: Target handle; None resolves to the authenticated user.
        repo_types: Optional allow-list of repo types (model/dataset/space).
        return_limit / scan_limit: Output and scan budgets, resolved through
            `_resolve_exhaustive_limits` against hf_user_likes policy.
        count_only: When True, return counts without items.
        where: Optional per-item filter (`_item_matches_where`).
        fields: Optional projection applied via `_project_items`.
        sort: One of likedAt, repoLikes, repoDownloads (None errors out).
        ranking_window: Shortlist size for popularity sorts; clamped to
            [1, enrich_cap].

    Returns:
        `_helper_success` envelope with like rows and exhaustive metadata,
        or `_helper_error` on validation/API failure.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_user_likes", "default_return", 100)
    scan_cap = _policy_int("hf_user_likes", "scan_max", LIKES_SCAN_LIMIT_CAP)
    ranking_default = _policy_int("hf_user_likes", "ranking_default", LIKES_RANKING_WINDOW_DEFAULT)
    enrich_cap = _policy_int("hf_user_likes", "enrich_max", LIKES_ENRICHMENT_MAX_REPOS)
    resolved_username, resolve_error = await _resolve_username_or_current(username)
    if resolve_error:
        return _helper_error(start_calls=start_calls, source="/api/users//likes", error=resolve_error)
    if not isinstance(resolved_username, str):
        return _helper_error(start_calls=start_calls, source="/api/users//likes", error="username is required")
    sort_key, sort_error = _normalize_user_likes_sort(sort)
    if sort_error:
        return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=sort_error)
    if sort_key is None:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_username}/likes",
            error="sort must be one of likedAt, repoLikes, repoDownloads",
        )
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    # Validate the repo_types allow-list; any non-canonical value is an error.
    allowed_repo_types: set[str] | None = None
    try:
        raw_repo_types: list[str] = _coerce_str_list(repo_types) if repo_types is not None else []
    except ValueError as e:
        return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=e)
    if raw_repo_types:
        allowed_repo_types = set()
        for raw in raw_repo_types:
            canonical = _canonical_repo_type(raw, default="")
            if canonical not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source=f"/api/users/{resolved_username}/likes",
                    error=f"Unsupported repo_type '{raw}'",
                )
            allowed_repo_types.add(canonical)
    endpoint = f"/api/users/{resolved_username}/likes"
    resp = _host_raw_call(endpoint, params={"limit": scan_lim})
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=resp.get("error") or "likes fetch failed",
        )
    payload = resp.get("data") if isinstance(resp.get("data"), list) else []
    scanned_rows = payload[:scan_lim]
    # Pairs of (stable index, normalized item); the index preserves the
    # original like order as a deterministic ranking tie-breaker.
    matched_rows: list[tuple[int, dict[str, Any]]] = []
    for row in scanned_rows:
        if not isinstance(row, dict):
            continue
        # Upstream rows carry both a "repo" stub and a richer "repoData"
        # object; presumably repoData is preferred when present — the code
        # falls back field by field. TODO confirm against the live payload.
        repo = row.get("repo") if isinstance(row.get("repo"), dict) else {}
        repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}
        repo_id = repo_data.get("id") or repo_data.get("name") or repo.get("name")
        if not isinstance(repo_id, str) or not repo_id:
            continue
        repo_type = _canonical_repo_type(repo_data.get("type") or repo.get("type"), default="")
        if not repo_type:
            repo_type = _canonical_repo_type(repo.get("type"), default="model")
        if allowed_repo_types is not None and repo_type not in allowed_repo_types:
            continue
        repo_author = repo_data.get("author")
        if not isinstance(repo_author, str) and "/" in repo_id:
            # Derive the author from the "owner/name" repo id when absent.
            repo_author = repo_id.split("/", 1)[0]
        # Each field is emitted under both camelCase and snake_case aliases
        # so callers can use either naming convention.
        item = {
            "likedAt": row.get("likedAt") or row.get("createdAt"),
            "liked_at": row.get("likedAt") or row.get("createdAt"),
            "repoId": repo_id,
            "repo_id": repo_id,
            "repoType": repo_type,
            "repo_type": repo_type,
            "repoAuthor": repo_author,
            "repo_author": repo_author,
            "repoLikes": _as_int(repo_data.get("likes")),
            "repo_likes": _as_int(repo_data.get("likes")),
            "repoDownloads": _as_int(repo_data.get("downloads")),
            "repo_downloads": _as_int(repo_data.get("downloads")),
            "likes": _as_int(repo_data.get("likes")),
            "downloads": _as_int(repo_data.get("downloads")),
            "repo_url": _repo_web_url(repo_type, repo_id),
        }
        if not _item_matches_where(item, where):
            continue
        matched_rows.append((len(matched_rows), item))
    matched = len(matched_rows)
    # Fewer rows than the scan budget means the like list was fully read.
    scan_exhaustive = len(payload) < scan_lim
    exact_count = scan_exhaustive
    total_matched = matched
    total = total_matched
    effective_ranking_window: int | None = None
    ranking_complete = sort_key == "likedAt" and exact_count
    enriched = 0
    selected_pairs: list[tuple[int, dict[str, Any]]]
    if count_only:
        selected_pairs = []
        ranking_complete = False if matched > 0 else exact_count
    elif sort_key == "likedAt":
        # Native order: rows already arrive sorted by like time.
        selected_pairs = matched_rows[:ret_lim]
    else:
        # Popularity sort: rank only a bounded shortlist of the newest likes,
        # enriching rows missing the metric via repo-detail calls.
        metric = str(sort_key)
        requested_window = ranking_window if ranking_window is not None else ranking_default
        effective_ranking_window = _clamp_int(
            requested_window,
            default=ranking_default,
            minimum=1,
            maximum=enrich_cap,
        )
        shortlist_size = min(effective_ranking_window, matched, scan_lim)
        shortlist = matched_rows[:shortlist_size]
        candidates = [
            pair
            for pair in shortlist
            if pair[1].get(metric) is None
            and isinstance(pair[1].get("repoId"), str)
            and pair[1].get("repoType") in {"model", "dataset", "space"}
        ]
        # Spend at most the remaining call budget on enrichment.
        enrich_budget = min(len(candidates), _budget_remaining(), shortlist_size)
        for _, item in candidates[:enrich_budget]:
            repo_type = str(item.get("repoType"))
            repo_id = str(item.get("repoId"))
            detail_endpoint = f"/api/{_canonical_repo_type(repo_type)}s/{repo_id}"
            try:
                detail = _host_hf_call(
                    detail_endpoint,
                    lambda rt=repo_type, rid=repo_id: (
                        _get_hf_api_client().model_info(rid)
                        if _canonical_repo_type(rt) == "model"
                        else _get_hf_api_client().dataset_info(rid)
                        if _canonical_repo_type(rt) == "dataset"
                        else _get_hf_api_client().space_info(rid)
                    ),
                )
            except Exception:
                # Best-effort enrichment: a failed detail call leaves the
                # row's metric as None (ranked last) rather than failing.
                continue
            likes = _as_int(getattr(detail, "likes", None))
            downloads = _as_int(getattr(detail, "downloads", None))
            if likes is not None:
                item["repoLikes"] = likes
                item["repo_likes"] = likes
                item["likes"] = likes
            if downloads is not None:
                item["repoDownloads"] = downloads
                item["repo_downloads"] = downloads
                item["downloads"] = downloads
            enriched += 1

        def _ranking_key(pair: tuple[int, dict[str, Any]]) -> tuple[int, int, int]:
            # Sort known metrics descending; unknown metrics last; ties keep
            # the original like order via the stable index.
            idx, row = pair
            metric_value = _as_int(row.get(metric))
            if metric_value is None:
                return (1, 0, idx)
            return (0, -metric_value, idx)

        ranked_shortlist = sorted(shortlist, key=_ranking_key)
        selected_pairs = ranked_shortlist[:ret_lim]
        ranking_complete = exact_count and shortlist_size >= matched and len(candidates) <= enrich_budget
    items = _project_items([row for _, row in selected_pairs], fields)
    popularity_present = sum(1 for _, row in selected_pairs if row.get("repoLikes") is not None)
    sample_complete = (
        exact_count
        and ret_lim >= matched
        and (sort_key == "likedAt" or ranking_complete)
        and (not count_only or matched == 0)
    )
    scan_limit_hit = not scan_exhaustive and len(payload) >= scan_lim
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if scan_limit_hit:
        # With a type/where filter we cannot tell whether unscanned likes
        # would match; otherwise more rows definitely exist upstream.
        more_available = "unknown" if (allowed_repo_types is not None or where) else True
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": len(scanned_rows),
            "total": total,
            "total_available": len(payload),
            "total_matched": total_matched,
            "count_source": "scan",
            "lower_bound": not exact_count,
            "enriched": enriched,
            "popularity_present": popularity_present,
            "sort_applied": sort_key,
            "ranking_window": effective_ranking_window,
            "ranking_complete": ranking_complete,
            "username": resolved_username,
        },
        limit_plan=limit_plan,
        matched_count=matched,
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
        truncated_extra=sort_key != "likedAt" and not ranking_complete,
    )
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
async def hf_repo_likers(
    repo_id: str,
    repo_type: str,
    return_limit: int | None = None,
    count_only: bool = False,
    pro_only: bool | None = None,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """List the accounts that liked one repo via `/api/{type}s/{id}/likers`.

    The upstream endpoint returns the complete liker list in one response,
    so counts here are always exact and no scan-limit bookkeeping applies.

    Args:
        repo_id: Target repo id (required, non-empty).
        repo_type: One of model/dataset/space (canonicalized; else error).
        return_limit: Max rows returned; None uses the policy default.
        count_only: When True, return only counts (no items).
        pro_only: Tri-state filter on the `isPro` flag (None = no filter).
        where: Optional per-item filter (`_item_matches_where`).
        fields: Optional projection applied via `_project_actor_items`.

    Returns:
        `_helper_success` envelope with liker rows and exact-count metadata,
        or `_helper_error` on validation/API failure.
    """
    start_calls = call_count["n"]
    rid = str(repo_id or "").strip()
    if not rid:
        return _helper_error(start_calls=start_calls, source="/api/repos//likers", error="repo_id is required")
    rt = _canonical_repo_type(repo_type, default="")
    if rt not in {"model", "dataset", "space"}:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/repos/{rid}/likers",
            error=f"Unsupported repo_type '{repo_type}'",
            repo_id=rid,
        )
    default_return = _policy_int("hf_repo_likers", "default_return", 1_000)
    requested_return_limit = return_limit
    default_limit_used = requested_return_limit is None and not count_only
    has_where = isinstance(where, dict) and bool(where)
    endpoint = f"/api/{rt}s/{rid}/likers"
    resp = _host_raw_call(endpoint)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=resp.get("error") or "repo likers fetch failed",
            repo_id=rid,
            repo_type=rt,
        )
    payload = resp.get("data") if isinstance(resp.get("data"), list) else []
    normalized: list[dict[str, Any]] = []
    for row in payload:
        if not isinstance(row, dict):
            continue
        # The endpoint has used both "user" and "username" keys; accept either.
        username = row.get("user") or row.get("username")
        if not isinstance(username, str) or not username:
            continue
        item = {
            "username": username,
            "fullname": row.get("fullname"),
            "type": row.get("type") if isinstance(row.get("type"), str) and row.get("type") else "user",
            "isPro": row.get("isPro"),
        }
        # Tri-state pro filter; unknown/None isPro counts as non-pro.
        if pro_only is True and item.get("isPro") is not True:
            continue
        if pro_only is False and item.get("isPro") is True:
            continue
        if not _item_matches_where(item, where):
            continue
        normalized.append(item)
    # /likers is a one-shot full-list endpoint: the Hub returns the liker rows in a
    # single response with no cursor/scan continuation. Keep the default output compact,
    # but do not apply the generic exhaustive hard cap here because it does not improve
    # upstream coverage or cost; the full liker set has already been fetched.
    if count_only:
        ret_lim = 0
    elif requested_return_limit is None:
        ret_lim = default_return
    else:
        try:
            ret_lim = max(0, int(requested_return_limit))
        except Exception:
            # Non-numeric limit falls back to the policy default.
            ret_lim = default_return
    limit_plan = {
        "requested_return_limit": requested_return_limit,
        "applied_return_limit": ret_lim,
        "default_limit_used": default_limit_used,
        "hard_cap_applied": False,
    }
    matched = len(normalized)
    items = [] if count_only else normalized[:ret_lim]
    return_limit_hit = ret_lim > 0 and matched > ret_lim
    truncated_by = _derive_truncated_by(
        hard_cap=False,
        return_limit_hit=return_limit_hit,
    )
    sample_complete = matched <= ret_lim and (not count_only or matched == 0)
    truncated = truncated_by != "none"
    # exact_count is always True: the full list was fetched in one call.
    more_available = _derive_more_available(
        sample_complete=sample_complete,
        exact_count=True,
        returned=len(items),
        total=matched,
    )
    items = _project_actor_items(items, fields)
    meta = _build_exhaustive_meta(
        base_meta={
            "scanned": len(payload),
            "matched": matched,
            "returned": len(items),
            "total": matched,
            "total_available": len(payload),
            "total_matched": matched,
            "truncated": truncated,
            "count_source": "likers_list",
            "lower_bound": False,
            "repo_id": rid,
            "repo_type": rt,
            "pro_only": pro_only,
            "where_applied": has_where,
            "upstream_pagination": "none",
        },
        limit_plan=limit_plan,
        sample_complete=sample_complete,
        exact_count=True,
        truncated_by=truncated_by,
        more_available=more_available,
    )
    # Explicitly mark that no runtime hard cap was applied to this helper.
    meta["hard_cap_applied"] = False
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
async def hf_recent_activity(
    feed_type: str | None = None,
    entity: str | None = None,
    activity_types: list[str] | None = None,
    repo_types: list[str] | None = None,
    return_limit: int | None = None,
    max_pages: int | None = None,
    start_cursor: str | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Page through the raw `/api/recent-activity` feed for a user or org.

    Fetches up to `max_pages` cursor-linked pages (policy-capped), filters
    rows by activity type, repo type and `where`, and returns a standard
    exhaustive-helper envelope plus the continuation `cursor`.

    Args:
        feed_type: "user" or "org". As a convenience, a bare handle passed as
            feed_type (with no entity) is treated as entity with feed_type
            defaulting to "user".
        entity: Target handle whose feed is read (required).
        activity_types: Optional allow-list of event type names (free-form,
            matched case-insensitively against the row's `type`).
        repo_types: Optional allow-list of repo types; each entry must
            canonicalize to model/dataset/space, otherwise an error envelope
            is returned (consistent with hf_user_likes).
        return_limit: Max rows returned (0 under count_only: scan-only).
        max_pages: Page budget, clamped to the policy ceiling.
        start_cursor: Optional upstream cursor to resume from.
        count_only: When True, count matches without collecting rows.
        where: Optional per-item filter (`_item_matches_where`).
        fields: Optional projection applied via `_project_items`.

    Returns:
        `_helper_success` envelope with activity rows, metadata, and the
        next `cursor` (None when the feed was exhausted), or `_helper_error`.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_recent_activity", "default_return", 100)
    page_cap = _policy_int("hf_recent_activity", "page_limit", RECENT_ACTIVITY_PAGE_SIZE)
    pages_cap = _policy_int("hf_recent_activity", "max_pages", RECENT_ACTIVITY_SCAN_MAX_PAGES)
    requested_max_pages = max_pages
    ft = str(feed_type or "").strip().lower()
    ent = str(entity or "").strip()
    # Convenience remapping: a bare handle in feed_type becomes the entity.
    if ft not in {"user", "org"}:
        if ft and not ent:
            ent = ft
            ft = "user"
        elif not ft and ent:
            ft = "user"
    if ft not in {"user", "org"}:
        return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="feed_type must be 'user' or 'org'")
    if not ent:
        return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="entity is required")
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    page_lim = page_cap
    pages_lim = _clamp_int(requested_max_pages, default=pages_cap, minimum=1, maximum=pages_cap)
    type_filter = {str(t).strip().lower() for t in (activity_types or []) if str(t).strip()}
    # Fix: validate repo_types instead of silently mapping invalid entries to
    # "", which previously made an invalid filter match rows that have no
    # recognizable repo type at all. Mirrors hf_user_likes' validation.
    repo_filter: set[str] = set()
    for raw in repo_types or []:
        if not str(raw).strip():
            continue
        canonical = _canonical_repo_type(raw, default="")
        if canonical not in {"model", "dataset", "space"}:
            return _helper_error(
                start_calls=start_calls,
                source="/api/recent-activity",
                error=f"Unsupported repo_type '{raw}'",
            )
        repo_filter.add(canonical)
    next_cursor = str(start_cursor).strip() if isinstance(start_cursor, str) and start_cursor.strip() else None
    items: list[dict[str, Any]] = []
    scanned = 0
    matched = 0
    pages = 0
    exhausted_feed = False
    stopped_for_budget = False
    # ret_lim == 0 (count_only) keeps scanning pages purely to count matches.
    while pages < pages_lim and (ret_lim == 0 or len(items) < ret_lim):
        if _budget_remaining() <= 0:
            stopped_for_budget = True
            break
        params: dict[str, Any] = {"feedType": ft, "entity": ent, "limit": page_lim}
        if next_cursor:
            params["cursor"] = next_cursor
        resp = _host_raw_call("/api/recent-activity", params=params)
        if not resp.get("ok"):
            # First-page failure is a hard error; a mid-scan failure keeps
            # the partial results already collected.
            if pages == 0:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/recent-activity",
                    error=resp.get("error") or "recent-activity fetch failed",
                )
            break
        payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
        rows = payload.get("recentActivity") if isinstance(payload.get("recentActivity"), list) else []
        cursor_raw = payload.get("cursor")
        next_cursor = cursor_raw if isinstance(cursor_raw, str) and cursor_raw else None
        pages += 1
        if not rows:
            exhausted_feed = True
            break
        for row in rows:
            if not isinstance(row, dict):
                continue
            scanned += 1
            typ = str(row.get("type") or "").strip().lower()
            # Repo identity may live on the row itself, in "repoData", or in
            # a "repo" stub; probe in that order.
            repo_id = row.get("repoId")
            repo_type = row.get("repoType")
            repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else None
            repo_obj = row.get("repo") if isinstance(row.get("repo"), dict) else None
            if repo_id is None and repo_data is not None:
                repo_id = repo_data.get("id") or repo_data.get("name")
            if repo_id is None and repo_obj is not None:
                repo_id = repo_obj.get("id") or repo_obj.get("name")
            if repo_type is None and repo_data is not None:
                repo_type = repo_data.get("type")
            if repo_type is None and repo_obj is not None:
                repo_type = repo_obj.get("type")
            rt = _canonical_repo_type(repo_type, default="") if repo_type else ""
            if type_filter and typ not in type_filter:
                continue
            if repo_filter and rt not in repo_filter:
                continue
            # camelCase and snake_case aliases for every exposed field.
            item = {
                "time": row.get("time"),
                "timestamp": row.get("time"),
                "type": row.get("type"),
                "event_type": row.get("type"),
                "repoType": rt or repo_type,
                "repo_type": rt or repo_type,
                "repoId": repo_id,
                "repo_id": repo_id,
            }
            if not _item_matches_where(item, where):
                continue
            matched += 1
            if len(items) < ret_lim:
                items.append(item)
        if not next_cursor:
            exhausted_feed = True
            break
    items = _project_items(items, fields)
    # Counts are exact only when the feed ended naturally within budget.
    exact_count = exhausted_feed and not stopped_for_budget
    sample_complete = exact_count and ret_lim >= matched and (not count_only or matched == 0)
    page_limit_hit = next_cursor is not None and pages >= pages_lim and not exhausted_feed
    more_available: bool | str = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=matched if exact_count else None)
    if next_cursor is not None:
        more_available = True
    elif stopped_for_budget and not exact_count:
        more_available = "unknown"
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": scanned,
            "total": matched,
            "total_matched": matched,
            "pages": pages,
            "count_source": "scan" if exact_count else "none",
            "lower_bound": not exact_count,
            "page_limit": page_lim,
            "stopped_for_budget": stopped_for_budget,
            "feed_type": ft,
            "entity": ent,
        },
        limit_plan=limit_plan,
        matched_count=matched,
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        page_limit_hit=page_limit_hit,
        truncated_extra=stopped_for_budget,
        requested_max_pages=requested_max_pages,
        applied_max_pages=pages_lim,
    )
    return _helper_success(
        start_calls=start_calls,
        source="/api/recent-activity",
        items=items,
        meta=meta,
        cursor=next_cursor,
    )
exact_count: more_available = "unknown" meta = _build_exhaustive_result_meta( base_meta={ "scanned": scanned, "total": matched, "total_matched": matched, "pages": pages, "count_source": "scan" if exact_count else "none", "lower_bound": not exact_count, "page_limit": page_lim, "stopped_for_budget": stopped_for_budget, "feed_type": ft, "entity": ent, }, limit_plan=limit_plan, matched_count=matched, returned_count=len(items), exact_count=exact_count, count_only=count_only, sample_complete=sample_complete, more_available=more_available, page_limit_hit=page_limit_hit, truncated_extra=stopped_for_budget, requested_max_pages=requested_max_pages, applied_max_pages=pages_lim, ) return _helper_success( start_calls=start_calls, source="/api/recent-activity", items=items, meta=meta, cursor=next_cursor, ) async def hf_repo_discussions(repo_type: str, repo_id: str, limit: int = 20) -> dict[str, Any]: start_calls = call_count["n"] rt = _canonical_repo_type(repo_type) rid = str(repo_id or "").strip() if "/" not in rid: return _helper_error(start_calls=start_calls, source="/api/.../discussions", error="repo_id must be owner/name") lim = _clamp_int(limit, default=20, minimum=1, maximum=SELECTIVE_ENDPOINT_RETURN_HARD_CAP) endpoint = f"/api/{rt}s/{rid}/discussions" try: discussions = _host_hf_call( endpoint, lambda: list(islice(_get_hf_api_client().get_repo_discussions(repo_id=rid, repo_type=rt), lim)), ) except Exception as e: return _helper_error(start_calls=start_calls, source=endpoint, error=e) items: list[dict[str, Any]] = [] for d in discussions: num = _as_int(getattr(d, "num", None)) items.append( { "num": num, "number": num, "discussionNum": num, "id": num, "title": getattr(d, "title", None), "author": getattr(d, "author", None), "createdAt": str(getattr(d, "created_at", None)) if getattr(d, "created_at", None) is not None else None, "status": getattr(d, "status", None), } ) return _helper_success( start_calls=start_calls, source=endpoint, items=items, scanned=len(items), 
matched=len(items),
        # NOTE(review): lines above/below are the tail of the preceding helper's
        # success envelope; that helper's definition starts before this chunk.
        returned=len(items),
        truncated=False,
        total_count=None,
    )

    # Fetch one discussion/PR (including its comment events) for a repo via
    # HfApi.get_discussion_details and flatten it into the stable helper envelope.
    # The returned item duplicates keys in camelCase and snake_case on purpose,
    # so generated Monty code can use either convention.
    async def hf_repo_discussion_details(repo_type: str, repo_id: str, discussion_num: int) -> dict[str, Any]:
        start_calls = call_count["n"]
        rt = _canonical_repo_type(repo_type)
        rid = str(repo_id or "").strip()
        if "/" not in rid:
            return _helper_error(start_calls=start_calls, source="/api/.../discussions/", error="repo_id must be owner/name")
        num = _as_int(discussion_num)
        if num is None:
            return _helper_error(
                start_calls=start_calls,
                source=f"/api/{rt}s/{rid}/discussions/",
                error="discussion_num must be an integer",
            )
        endpoint = f"/api/{rt}s/{rid}/discussions/{num}"
        try:
            detail = _host_hf_call(
                endpoint,
                lambda: _get_hf_api_client().get_discussion_details(
                    repo_id=rid,
                    discussion_num=int(num),
                    repo_type=rt,
                ),
            )
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e)
        # Keep only "comment" events; other event types (status change, etc.) are dropped.
        comment_events: list[dict[str, Any]] = []
        raw_events = getattr(detail, "events", None)
        if isinstance(raw_events, list):
            for event in raw_events:
                if str(getattr(event, "type", "")).strip().lower() != "comment":
                    continue
                comment_events.append(
                    {
                        "author": getattr(event, "author", None),
                        "createdAt": _dt_to_str(getattr(event, "created_at", None)),
                        "text": getattr(event, "content", None),
                        "rendered": getattr(event, "rendered", None),
                    }
                )
        # "Latest" = lexicographically max createdAt string (ISO timestamps sort correctly).
        latest_comment: dict[str, Any] | None = None
        if comment_events:
            latest_comment = max(comment_events, key=lambda row: str(row.get("createdAt") or ""))
        item: dict[str, Any] = {
            # Discussion number under several alias keys for caller convenience.
            "num": num,
            "number": num,
            "discussionNum": num,
            "id": num,
            "repo_id": rid,
            "repo_type": rt,
            "title": getattr(detail, "title", None),
            "author": getattr(detail, "author", None),
            "createdAt": _dt_to_str(getattr(detail, "created_at", None)),
            "status": getattr(detail, "status", None),
            "url": getattr(detail, "url", None),
            "commentCount": len(comment_events),
            "latestCommentAuthor": latest_comment.get("author") if latest_comment else None,
            "latestCommentCreatedAt": latest_comment.get("createdAt") if latest_comment else None,
            "latestCommentText": latest_comment.get("text") if latest_comment else None,
            "latestCommentHtml": latest_comment.get("rendered") if latest_comment else None,
            "latest_comment_author": latest_comment.get("author") if latest_comment else None,
            "latest_comment_created_at": latest_comment.get("createdAt") if latest_comment else None,
            "latest_comment_text": latest_comment.get("text") if latest_comment else None,
            "latest_comment_html": latest_comment.get("rendered") if latest_comment else None,
        }
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=[item],
            scanned=len(comment_events),
            matched=1,
            returned=1,
            truncated=False,
            total_comments=len(comment_events),
        )

    # Resolve one repo id against a list of candidate repo types, trying
    # model_info/dataset_info/space_info in order until one succeeds.
    # Returns (normalized_row, None) on success or (None, failure_dict) on failure.
    def _resolve_repo_detail_row(
        api: HfApi,
        repo_id: str,
        attempt_types: list[str],
    ) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
        rid = str(repo_id or "").strip()
        if "/" not in rid:
            return None, {"repo_id": rid, "error": "repo_id must be owner/name"}
        resolved_type: str | None = None
        detail: Any = None
        last_endpoint = "/api/repos"
        errors: list[str] = []
        for rt in attempt_types:
            endpoint = f"/api/{rt}s/{rid}"
            last_endpoint = endpoint
            try:
                # Default-argument binding (rt=rt, rid=rid) pins loop variables in
                # the lambda, avoiding the late-binding closure pitfall.
                detail = _host_hf_call(
                    endpoint,
                    lambda rt=rt, rid=rid: api.model_info(rid)
                    if rt == "model"
                    else api.dataset_info(rid)
                    if rt == "dataset"
                    else api.space_info(rid),
                )
                resolved_type = rt
                break
            except Exception as e:
                errors.append(f"{rt}: {str(e)}")
        if resolved_type is None or detail is None:
            return None, {
                "repo_id": rid,
                # Only the first three per-type errors are surfaced to keep the payload small.
                "error": "; ".join(errors[:3]) if errors else "repo lookup failed",
                "attempted_repo_types": list(attempt_types),
                "source": last_endpoint,
            }
        return _normalize_repo_detail_row(detail, resolved_type, rid), None

    # Batch repo lookup. Accepts a single repo_id or a list of repo_ids
    # (mutually exclusive) and an optional repo_type; "auto" tries all three types.
    async def hf_repo_details(
        repo_id: str | None = None,
        repo_ids: list[str] | None = None,
        repo_type: str = "auto",
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        start_calls = call_count["n"]
        if repo_id is not None and repo_ids is not None:
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error="Pass either repo_id or repo_ids, not both",
            )
        requested_ids = [str(repo_id).strip()] if isinstance(repo_id, str) and str(repo_id).strip() else []
        if repo_ids is not None:
            requested_ids = _coerce_str_list(repo_ids)
        if not requested_ids:
            return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_id or repo_ids is required")
        raw_type = str(repo_type or "auto").strip().lower()
        if raw_type in {"", "auto"}:
            base_attempt_types = ["model", "dataset", "space"]
        else:
            canonical_type = _canonical_repo_type(raw_type, default="")
            if canonical_type not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/repos",
                    error=f"Unsupported repo_type '{repo_type}'",
                )
            base_attempt_types = [canonical_type]
        api = _get_hf_api_client()
        items: list[dict[str, Any]] = []
        failures: list[dict[str, Any]] = []
        for rid in requested_ids:
            row, failure = _resolve_repo_detail_row(api, rid, base_attempt_types)
            if row is None:
                if failure is not None:
                    failures.append(failure)
                continue
            items.append(row)
        if not items:
            # All lookups failed: surface the first failure's message as the summary.
            summary = failures[0]["error"] if failures else "repo lookup failed"
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error=summary,
                failures=failures,
                repo_type=repo_type,
            )
        items = _project_repo_items(items, fields)
        return _helper_success(
            start_calls=start_calls,
            source="/api/repos",
            items=items,
            repo_type=repo_type,
            requested_repo_ids=requested_ids,
            failures=failures or None,
            matched=len(items),
            returned=len(items),
        )

    # Trending repos via the raw `/api/trending` endpoint (no HfApi coverage),
    # then per-repo enrichment with full repo details. repo_type "all" is passed
    # through to the endpoint as-is.
    async def hf_trending(
        repo_type: str = "model",
        limit: int = 20,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        start_calls = call_count["n"]
        default_return = _policy_int("hf_trending", "default_return", 20)
        max_return = _policy_int("hf_trending", "max_return", TRENDING_ENDPOINT_MAX_LIMIT)
        raw_type = str(repo_type or "model").strip().lower()
        if raw_type == "all":
            requested_type = "all"
        else:
            requested_type = _canonical_repo_type(raw_type, default="")
            if requested_type not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/trending",
                    error=f"Unsupported repo_type '{repo_type}'",
                )
        lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)
        resp = _host_raw_call("/api/trending", params={"type": requested_type, "limit": lim})
        if not resp.get("ok"):
            return _helper_error(start_calls=start_calls, source="/api/trending", error=resp.get("error") or "trending fetch failed")
        payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
        rows = payload.get("recentlyTrending") if isinstance(payload.get("recentlyTrending"), list) else []
        items: list[dict[str, Any]] = []
        # For type "all" the per-row normalizer still needs a fallback type; "model" is used.
        default_row_type = requested_type if requested_type != "all" else "model"
        for idx, row in enumerate(rows[:lim], start=1):
            if not isinstance(row, dict):
                continue
            repo = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}
            items.append(_normalize_trending_row(repo, default_row_type, rank=idx))
        # Enrich each trending row with full repo details; on enrichment failure
        # the original (un-enriched) trending row is kept.
        api = _get_hf_api_client()
        enriched_items: list[dict[str, Any]] = []
        enrichment_failures: list[dict[str, Any]] = []
        for item in items:
            repo_id = item.get("repo_id")
            if not isinstance(repo_id, str) or not repo_id:
                enriched_items.append(item)
                continue
            item_repo_type = item.get("repo_type")
            if isinstance(item_repo_type, str) and item_repo_type in {"model", "dataset", "space"}:
                attempt_types = [item_repo_type]
            else:
                attempt_types = ["model", "dataset", "space"]
            detail_row, failure = _resolve_repo_detail_row(api, repo_id, attempt_types)
            if detail_row is None:
                enriched_items.append(item)
                if failure is not None:
                    enrichment_failures.append(failure)
                continue
            # Carry trending-specific fields over onto the detail row.
            merged = dict(detail_row)
            trending_score = item.get("trending_score")
            if trending_score is not None:
                merged["trending_score"] = trending_score
            if item.get("trending_rank") is not None:
                merged["trending_rank"] = item.get("trending_rank")
            enriched_items.append(merged)
        items = enriched_items
        items = _apply_where(items, where)
        matched = len(items)
        items = _project_repo_items(items[:lim], fields)
        return _helper_success(
            start_calls=start_calls,
            source="/api/trending",
            items=items,
            repo_type=requested_type,
            limit=lim,
            scanned=len(rows),
            matched=matched,
            returned=len(items),
            trending_score_available=any(item.get("trending_score") is not None for item in items),
            ordered_ranking=True,
            failures=enrichment_failures or None,
        )

    # Collection search via the raw `/api/collections` q-search. When only an
    # owner is given, the owner name doubles as the q term and results are
    # post-filtered to that exact owner.
    async def hf_collections_search(
        query: str | None = None,
        owner: str | None = None,
        return_limit: int = 20,
        count_only: bool = False,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        start_calls = call_count["n"]
        default_return = _policy_int("hf_collections_search", "default_return", 20)
        max_return = _policy_int("hf_collections_search", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT)
        if count_only:
            return_limit = 0
        lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return)
        owner_clean = str(owner or "").strip() or None
        # Fetch wider than the return limit when counting or owner-filtering,
        # since owner filtering happens client-side below.
        fetch_lim = max_return if lim == 0 or owner_clean else lim
        if owner_clean:
            fetch_lim = min(fetch_lim, 100)
        term = str(query or "").strip()
        if not term and owner_clean:
            term = owner_clean
        if not term:
            return _helper_error(start_calls=start_calls, source="/api/collections", error="query or owner is required")
        params: dict[str, Any] = {"limit": fetch_lim}
        if term:
            params["q"] = term
        if owner_clean:
            params["owner"] = owner_clean
        resp = _host_raw_call("/api/collections", params=params)
        if not resp.get("ok"):
            return _helper_error(
                start_calls=start_calls,
                source="/api/collections",
                error=resp.get("error") or "collections fetch failed",
            )
        payload = resp.get("data") if isinstance(resp.get("data"), list) else []
        items: list[dict[str, Any]] = []
        for row in payload[:fetch_lim]:
            if not isinstance(row, dict):
                continue
            # NOTE(review): `owner` here shadows the function parameter; the
            # parameter's value was already captured as `owner_clean` above.
            owner = _author_from_any(row.get("owner")) or _author_from_any(row.get("ownerData"))
            if not owner and isinstance(row.get("slug"), str) and "/" in str(row.get("slug")):
                # Fall back to the slug prefix ("owner/collection-slug") as the owner.
                owner = str(row.get("slug")).split("/", 1)[0]
            if owner_clean is not None and owner != owner_clean:
                continue
            owner_payload = row.get("owner") if isinstance(row.get("owner"), dict) else {}
            collection_items = row.get("items") if isinstance(row.get("items"), list) else []
            slug = row.get("slug")
            items.append(
                {
                    "collection_id": slug,
                    "slug": slug,
                    "title": row.get("title"),
                    "owner": owner,
                    "owner_type": owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None,
                    "description": row.get("description"),
                    "gating": row.get("gating"),
                    "last_updated": row.get("lastUpdated"),
                    "item_count": len(collection_items),
                }
            )
        items = _apply_where(items, where)
        total_matched = len(items)
        items = items[:lim]
        items = _project_collection_items(items, fields)
        # Count-only mode reports truncation when the fetch window may have been full.
        truncated = (lim > 0 and total_matched > lim) or (lim == 0 and len(payload) >= fetch_lim)
        return _helper_success(
            start_calls=start_calls,
            source="/api/collections",
            items=items,
            scanned=len(payload),
            matched=total_matched,
            returned=len(items),
            total=len(payload),
            total_matched=total_matched,
            total_population=len(payload),
            truncated=truncated,
            complete=not truncated,
            query=term,
            owner=owner_clean,
        )

    # List the repos inside one collection (raw `/api/collections/{id}` call),
    # optionally filtered by repo type and a `where` predicate.
    async def hf_collection_items(
        collection_id: str,
        repo_types: list[str] | None = None,
        return_limit: int = 100,
        count_only: bool = False,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        start_calls = call_count["n"]
        default_return = _policy_int("hf_collection_items", "default_return", 100)
        max_return = _policy_int("hf_collection_items", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT)
        cid = str(collection_id or "").strip()
        if not cid:
            return _helper_error(
                start_calls=start_calls,
                source="/api/collections/",
                error="collection_id is required",
            )
        if count_only:
            return_limit = 0
        lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return)
        allowed_repo_types: set[str] | None = None
        try:
            raw_repo_types = _coerce_str_list(repo_types) if repo_types is not None else []
        except ValueError as e:
            return _helper_error(start_calls=start_calls, source=f"/api/collections/{cid}", error=e, collection_id=cid)
        if raw_repo_types:
            allowed_repo_types = set()
            for raw in raw_repo_types:
                canonical = _canonical_repo_type(raw, default="")
                if canonical not in {"model", "dataset", "space"}:
                    return _helper_error(
                        start_calls=start_calls,
                        source=f"/api/collections/{cid}",
                        error=f"Unsupported repo_type '{raw}'",
                        collection_id=cid,
                    )
                allowed_repo_types.add(canonical)
        endpoint = f"/api/collections/{cid}"
        resp = _host_raw_call(endpoint)
        if not resp.get("ok"):
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=resp.get("error") or "collection fetch failed",
                collection_id=cid,
            )
        payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
        raw_items = payload.get("items") if isinstance(payload.get("items"), list) else []
        owner = _author_from_any(payload.get("owner"))
        owner_payload = payload.get("owner") if isinstance(payload.get("owner"), dict) else {}
        if owner is None and "/" in cid:
            # Fall back to the collection-id prefix as the owner name.
            owner = cid.split("/", 1)[0]
        normalized: list[dict[str, Any]] = []
        for row in raw_items:
            if not isinstance(row, dict):
                continue
            item = _normalize_collection_repo_item(row)
            if item is None:
                continue
            repo_type = item.get("repo_type")
            if allowed_repo_types is not None and repo_type not in allowed_repo_types:
                continue
            if not _item_matches_where(item, where):
                continue
            normalized.append(item)
        total_matched = len(normalized)
        items = [] if count_only else normalized[:lim]
        items = _project_repo_items(items, fields)
        truncated = lim > 0 and total_matched > lim
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=items,
            scanned=len(raw_items),
            matched=total_matched,
            returned=len(items),
            total=len(raw_items),
            total_matched=total_matched,
            total_population=len(raw_items),
            truncated=truncated,
            complete=not truncated,
            collection_id=cid,
            title=payload.get("title"),
            owner=owner,
            owner_type=owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None,
            repo_types=sorted(allowed_repo_types) if allowed_repo_types is not None else None,
        )

    # Introspection helper: builds a machine-readable manifest of every helper
    # signature plus runtime limits/field vocabularies, without making any
    # external API call (hence the internal:// source and internal_helper_used flag).
    async def hf_runtime_capabilities(section: str | None = None) -> dict[str, Any]:
        start_calls = call_count["n"]
        # Mark that an internal helper ran, so the zero-call guard below does not
        # reject this run for never touching the live API.
        internal_helper_used["used"] = True

        # Render an inspect annotation, mapping "no annotation" to "Any".
        def _render_annotation(annotation: Any) -> str:
            if annotation is inspect.Signature.empty:
                return "Any"
            return str(annotation)

        # Render a parameter default, or None when the parameter has no default.
        def _render_default(default: Any) -> str | None:
            if default is inspect.Signature.empty:
                return None
            return repr(default)

        # Describe one callable's parameters and return annotation as plain dicts.
        def _signature_payload(fn: Callable[..., Any]) -> dict[str, Any]:
            signature = inspect.signature(fn)
            parameters: list[dict[str, Any]] = []
            for parameter in signature.parameters.values():
                item: dict[str, Any] = {
                    "name": parameter.name,
                    "kind": str(parameter.kind).replace("Parameter.", "").lower(),
                    "annotation": _render_annotation(parameter.annotation),
                    "required": parameter.default is inspect.Signature.empty,
                }
                default = _render_default(parameter.default)
                if default is not None:
                    item["default"] = default
                parameters.append(item)
            return {
                "parameters": parameters,
                "returns": _render_annotation(signature.return_annotation),
            }

        # `helper_functions` is a closure over the enclosing runner's registry,
        # assigned after these defs via _resolve_helper_functions(locals()).
        helper_payload = {
            name: _signature_payload(fn)
            for name, fn in sorted(helper_functions.items())
        }
        manifest: dict[str, Any] = {
            "overview": {
                "helper_count": len(helper_functions),
                "supports_current_user": True,
                "supports_raw_api_fallback": True,
                "helper_result_envelope": {
                    "ok": "bool",
                    "item": "dict | None",
                    "items": "list[dict]",
                    "meta": "dict",
                    "error": "str | None",
                },
                "raw_result_envelope": {
                    "result": "Any",
                    "meta": {
                        "ok": "bool",
                        "api_calls": "int",
                        "elapsed_ms": "int",
                        "limits_reached": "bool",
                        "limit_summary": "list[dict]",
                    },
                },
            },
            "helpers": helper_payload,
            "fields": {
                "profile": list(PROFILE_CANONICAL_FIELDS),
                "repo": list(REPO_CANONICAL_FIELDS),
                "user": list(USER_CANONICAL_FIELDS),
                "actor": list(ACTOR_CANONICAL_FIELDS),
                "activity": list(ACTIVITY_CANONICAL_FIELDS),
                "collection": list(COLLECTION_CANONICAL_FIELDS),
            },
            "aliases": {
                "repo": dict(sorted(_REPO_FIELD_ALIASES.items())),
                "user": dict(sorted(_USER_FIELD_ALIASES.items())),
                "actor": dict(sorted(_ACTOR_FIELD_ALIASES.items())),
                "collection": dict(sorted(_COLLECTION_FIELD_ALIASES.items())),
                "sort_keys": dict(sorted(_SORT_KEY_ALIASES.items())),
            },
            "limits": {
                "default_timeout_sec": DEFAULT_TIMEOUT_SEC,
                "default_max_calls": DEFAULT_MAX_CALLS,
                "max_calls_limit": MAX_CALLS_LIMIT,
                "output_items_truncation_limit": OUTPUT_ITEMS_TRUNCATION_LIMIT,
                "graph_scan_limit_cap": GRAPH_SCAN_LIMIT_CAP,
                "likes_scan_limit_cap": LIKES_SCAN_LIMIT_CAP,
                "recent_activity_scan_max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
                "trending_endpoint_max_limit": TRENDING_ENDPOINT_MAX_LIMIT,
                "pagination_policy": {
                    helper_name: dict(sorted(policy.items()))
                    for helper_name, policy in sorted(PAGINATION_POLICY.items())
                },
            },
            "raw_api": {
                "call_api": _signature_payload(call_api),
                "allowed_methods": ["GET", "POST"],
                "allowed_endpoint_patterns": list(ALLOWLIST_PATTERNS),
                "helper_covered_endpoint_patterns": [
                    {"pattern": pattern, "helper": helper_name}
                    for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS
                ],
            },
            "repo_search": {
                "sort_keys": {
                    repo_type: sorted(keys)
                    for repo_type, keys in sorted(_REPO_SORT_KEYS.items())
                },
                "extra_args": {
                    repo_type: sorted(args)
                    for repo_type, args in sorted(_REPO_SEARCH_EXTRA_ARGS.items())
                },
            },
        }
        allowed_sections = sorted(manifest)
        requested = str(section or "").strip().lower()
        if requested:
            if requested not in manifest:
                return _helper_error(
                    start_calls=start_calls,
                    source="internal://runtime-capabilities",
                    error=f"Unsupported section {section!r}. Allowed sections: {allowed_sections}",
                    section=section,
                    allowed_sections=allowed_sections,
                )
            payload = {
                "section": requested,
                "content": manifest[requested],
                "allowed_sections": allowed_sections,
            }
        else:
            payload = {
                "allowed_sections": allowed_sections,
                **manifest,
            }
        return _helper_success(
            start_calls=start_calls,
            source="internal://runtime-capabilities",
            items=[payload],
            section=requested or None,
        )

    # --- sandbox construction and execution (tail of the enclosing runner) ---
    m = pydantic_monty.Monty(
        code,
        inputs=["query", "max_calls"],
        script_name="monty_agent.py",
        type_check=False,
    )

    # Wrap each helper so limit-hit summaries are collected as a side channel;
    # the summary list is capped at 20 entries.
    def _collecting_wrapper(helper_name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
        async def wrapped(*args: Any, **kwargs: Any) -> Any:
            result = await fn(*args, **kwargs)
            summary = _summarize_limit_hit(helper_name, result)
            if summary is not None and len(limit_summaries) < 20:
                limit_summaries.append(summary)
            return result

        return wrapped

    limits: pydantic_monty.ResourceLimits = {
        "max_duration_secs": float(timeout_sec),
        "max_memory": DEFAULT_MONTY_MAX_MEMORY,
        "max_allocations": DEFAULT_MONTY_MAX_ALLOCATIONS,
        "max_recursion_depth": DEFAULT_MONTY_MAX_RECURSION_DEPTH,
    }
    # Collect all helper defs above (they are locals of this function) into the registry.
    helper_functions = _resolve_helper_functions(locals())
    try:
        result = await pydantic_monty.run_monty_async(
            m,
            inputs={"query": query, "max_calls": max_calls},
            external_functions={
                "call_api": call_api,
                **{name: _collecting_wrapper(name, fn) for name, fn in helper_functions.items()},
            },
            limits=limits,
        )
    except Exception as e:
        raise MontyExecutionError(str(e), call_count["n"], trace) from e
    if call_count["n"] == 0:
        # Some current-user helpers can fail before any live API call is made
        # (for example when request-scoped auth is unavailable). If generated
        # code either returns that explicit helper error envelope or flattens it
        # into an empty fallback shape, preserve the helper-owned error instead
        # of replacing it with a generic zero-call runtime failure.
        if internal_helper_used["used"]:
            return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
        if isinstance(result, dict) and result.get("ok") is True:
            # Successful envelopes are only allowed at zero calls when they came
            # from an internal:// source (e.g. runtime capabilities).
            meta = result.get("meta") if isinstance(result.get("meta"), dict) else {}
            source = meta.get("source")
            if isinstance(source, str) and source.startswith("internal://"):
                return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
        if latest_helper_error is not None:
            return {"output": _truncate_result_payload(latest_helper_error), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
        if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str):
            return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
        raise MontyExecutionError("Code completed without calling any external API function", call_count["n"], trace)
    if not any(step.get("ok") is True for step in trace):
        # Allow explicit helper/live failure envelopes to be returned as-is.
        # This preserves concrete API error context (e.g. repo not found) while
        # still blocking fabricated successful fallback outputs.
        if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str):
            return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
        raise MontyExecutionError(
            "Code completed without a successful API call; refusing non-live fallback result",
            call_count["n"],
            trace,
        )
    return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}


# Public entry point: validate the generated code, run it in the Monty sandbox,
# and fold the run into a simple ok/data/error/api_calls envelope.
async def hf_hub_query(
    query: str,
    code: str,
    max_calls: int = DEFAULT_MAX_CALLS,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
) -> dict[str, Any]:
    """Use natural-language queries to explore the Hugging Face Hub.

    Best for read-only Hub discovery, lookup, ranking, and relationship questions
    across users, organizations, repositories, activity, followers, likes,
    discussions, and collections.
    """
    if not query or not query.strip():
        raise ValueError("query is required")
    if not code or not code.strip():
        raise ValueError("code is required")
    # Clamp the caller-supplied budget into [1, MAX_CALLS_LIMIT].
    max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT))
    code = code.strip()
    try:
        _validate_generated_code(code)
        run = await _run_with_monty(
            code=code,
            query=query,
            max_calls=max_calls,
            strict_mode=INTERNAL_STRICT_MODE,
            timeout_sec=timeout_sec,
        )
        return {
            "ok": True,
            "data": run["output"],
            "error": None,
            "api_calls": run["api_calls"],
        }
    except MontyExecutionError as e:
        # MontyExecutionError carries the call count at failure time.
        return {
            "ok": False,
            "data": None,
            "error": str(e),
            "api_calls": e.api_calls,
        }
    except Exception as e:
        return {
            "ok": False,
            "data": None,
            "error": str(e),
            "api_calls": 0,
        }


# Raw-mode entry point: same execution path as hf_hub_query, but results are
# wrapped by _wrap_raw_result with timing and limit-summary metadata.
async def hf_hub_query_raw(
    query: str,
    code: str,
    max_calls: int = DEFAULT_MAX_CALLS,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
) -> Any:
    """Use natural-language queries to explore the Hugging Face Hub in raw mode.

    Best for read-only Hub discovery, lookup, ranking, and relationship questions
    when the caller wants a runtime-owned raw envelope: ``result`` contains the
    direct ``solve(...)`` output and ``meta`` contains execution details such as
    timing, call counts, and limit summaries.
    """
    if not query or not query.strip():
        raise ValueError("query is required")
    if not code or not code.strip():
        raise ValueError("code is required")
    max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT))
    code = code.strip()
    started = time.perf_counter()
    try:
        _validate_generated_code(code)
        run = await _run_with_monty(
            code=code,
            query=query,
            max_calls=max_calls,
            strict_mode=INTERNAL_STRICT_MODE,
            timeout_sec=timeout_sec,
        )
        elapsed_ms = int((time.perf_counter() - started) * 1000)
        return _wrap_raw_result(
            run["output"],
            ok=True,
            api_calls=run["api_calls"],
            elapsed_ms=elapsed_ms,
            limit_summaries=run.get("limit_summaries"),
        )
    except MontyExecutionError as e:
        elapsed_ms = int((time.perf_counter() - started) * 1000)
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=e.api_calls,
            elapsed_ms=elapsed_ms,
            error=str(e),
        )
    except Exception as e:
        elapsed_ms = int((time.perf_counter() - started) * 1000)
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=0,
            elapsed_ms=elapsed_ms,
            error=str(e),
        )


# CLI argument parser for running one query/code pair from the command line.
def _arg_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="Monty-backed API chaining tool (v2)")
    p.add_argument("--query", required=True, help="Natural language query")
    p.add_argument("--code", default=None, help="Inline Monty code to execute")
    p.add_argument("--code-file", default=None, help="Path to .py file with Monty code to execute")
    p.add_argument("--max-calls", type=int, default=DEFAULT_MAX_CALLS, help="Max external API/helper calls")
    p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SEC)
    return p


# CLI entry point: prints one JSON envelope to stdout and returns a shell exit code
# (0 on ok, 1 on any failure).
def main() -> int:
    args = _arg_parser().parse_args()
    code = args.code
    if args.code_file:
        # --code-file takes precedence over inline --code.
        with open(args.code_file, "r", encoding="utf-8") as f:
            code = f.read()
    if not code:
        print(json.dumps({"ok": False, "error": "Either --code or --code-file is required"}, ensure_ascii=False))
        return 1
    try:
        out = asyncio.run(
            hf_hub_query(
                query=args.query,
                code=code,
                max_calls=args.max_calls,
                timeout_sec=args.timeout,
            )
        )
        print(json.dumps(out, ensure_ascii=False))
        return 0 if out.get("ok") else 1
    except Exception as e:
        print(json.dumps({"ok": False, "error": str(e)}, ensure_ascii=False))
        return 1


if __name__ == "__main__":
    raise SystemExit(main())