# fastmcp3 / monty_api_tool_v2.py
# Author: evalstate (HF Staff)
# Deploy fast-agent-pr 0.6.0 raw card to fastmcp3
# f54040d verified
#!/usr/bin/env python3
"""Monty-backed API orchestration executor (v2).
v2 goals:
- HfApi-first helper implementations for endpoints covered by huggingface_hub.
- Thin raw API fallback for uncovered endpoints (/api/recent-activity, /api/trending,
/api/users/<u>/likes event stream, collections q-search).
- Stable machine-first helper envelopes (`items` + optional `item`, no polymorphic payloads).
"""
from __future__ import annotations
import argparse
import asyncio
import ast
import inspect
import json
import os
import re
import time
from itertools import islice
from typing import Any, Callable, cast, get_args
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from huggingface_hub import HfApi
from huggingface_hub.hf_api import DatasetSort_T, ModelSort_T, SpaceSort_T
# Runtime-level execution limits.
# - max_calls: hard cap on the total number of external helper/API calls a single
#   generated program may make in one run.
# - timeout_sec: wall-clock timeout for the full Monty execution.
DEFAULT_TIMEOUT_SEC = 90  # Default end-to-end timeout for one Monty run.
DEFAULT_MAX_CALLS = 400  # Default external-call budget exposed to callers.
MAX_CALLS_LIMIT = 400  # Absolute max external-call budget accepted by the runtime.
# NOTE(review): presumably toggles the strict endpoint allowlist (see
# `_endpoint_allowed` / STRICT_ALLOWLIST_PATTERNS) — confirm at call sites.
INTERNAL_STRICT_MODE = False
# Result-size vocabulary used throughout helper metadata:
# - return_limit: how many rows the caller wants back from a helper.
# - scan_limit / max_pages: how much source data a helper is willing to inspect
#   internally to answer the query.
# - hard cap: an absolute runtime-imposed maximum on rows returned in one helper call.
OUTPUT_ITEMS_TRUNCATION_LIMIT = 500  # Final output truncation for oversized `items` payloads.
EXHAUSTIVE_HELPER_RETURN_HARD_CAP = 2_000  # Runtime hard cap for exhaustive-helper output rows.
SELECTIVE_ENDPOINT_RETURN_HARD_CAP = 200  # Default cap for one-shot selective endpoint helpers.
TRENDING_ENDPOINT_MAX_LIMIT = 20  # Upstream `/api/trending` endpoint maximum.
# Exhaustive helper scan/page ceilings. These bound how much upstream data we
# inspect, which is different from how many rows we return to the caller.
GRAPH_SCAN_LIMIT_CAP = 10_000  # Max follower/member rows scanned in one helper call.
LIKES_SCAN_LIMIT_CAP = 10_000  # Max like-event rows scanned in one helper call.
LIKES_RANKING_WINDOW_DEFAULT = 40  # Default shortlist size when ranking likes by repo popularity.
LIKES_ENRICHMENT_MAX_REPOS = 50  # Max liked repos enriched with extra repo-detail calls.
RECENT_ACTIVITY_PAGE_SIZE = 100  # Rows requested per `/api/recent-activity` page.
RECENT_ACTIVITY_SCAN_MAX_PAGES = 10  # Max recent-activity pages fetched in one helper call.
# Compact summary helpers intentionally inspect less data than the full
# exhaustive helpers so they remain fast and predictable.
USER_SUMMARY_GRAPH_SCAN_LIMIT = 1_000  # Follower/following rows sampled for user summary.
USER_SUMMARY_LIKES_SCAN_LIMIT = 1_000  # Like rows sampled for user summary.
USER_SUMMARY_ACTIVITY_MAX_PAGES = 3  # Activity pages sampled for user summary.
# Monty sandbox resource limits. These constrain the Python execution
# environment itself rather than Hub/API pagination behavior.
DEFAULT_MONTY_MAX_MEMORY = 64 * 1024 * 1024  # 64 MiB
DEFAULT_MONTY_MAX_ALLOCATIONS = 250_000  # Approximate object-allocation ceiling in the sandbox.
DEFAULT_MONTY_MAX_RECURSION_DEPTH = 100  # Python recursion limit inside the sandbox.
# Valid sort keys per repo type, sourced from huggingface_hub's Literal types.
# The hard-coded literals after `or` are a fallback for hub versions where
# `get_args(...)` yields an empty tuple.
_MODEL_SORT_KEYS = set(get_args(ModelSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
_DATASET_SORT_KEYS = set(get_args(DatasetSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
_SPACE_SORT_KEYS = set(get_args(SpaceSort_T)) or {
    "created_at",
    "last_modified",
    "likes",
    "trending_score",
}
# Lookup of allowed sort keys keyed by canonical repo type.
_REPO_SORT_KEYS: dict[str, set[str]] = {
    "model": _MODEL_SORT_KEYS,
    "dataset": _DATASET_SORT_KEYS,
    "space": _SPACE_SORT_KEYS,
}
# Loose sort-key spellings (camelCase, kebab-case, shorthand) mapped onto the
# canonical snake_case keys used by `_normalize_repo_sort_key`.
_SORT_KEY_ALIASES: dict[str, str] = {
    "createdat": "created_at",
    "created_at": "created_at",
    "created-at": "created_at",
    "downloads": "downloads",
    "likes": "likes",
    "lastmodified": "last_modified",
    "last_modified": "last_modified",
    "last-modified": "last_modified",
    "trendingscore": "trending_score",
    "trending_score": "trending_score",
    "trending-score": "trending_score",
    "trending": "trending_score",
}
# Field-name aliases tolerated when callers project user rows with `fields=[...]`.
_USER_FIELD_ALIASES: dict[str, str] = {
    "login": "username",
    "user": "username",
    "handle": "username",
    "name": "fullname",
    "full_name": "fullname",
    "full-name": "fullname",
    "is_pro": "isPro",
    "ispro": "isPro",
    "pro": "isPro",
}
# Actor rows extend user aliases with entity-type spellings.
_ACTOR_FIELD_ALIASES: dict[str, str] = {
    **_USER_FIELD_ALIASES,
    "entity_type": "type",
    "user_type": "type",
    "actor_type": "type",
}
# Repo helpers prefer canonical snake_case field names in generated code, but
# tolerate common camelCase/raw endpoint aliases when callers project with
# `fields=[...]`.
_REPO_FIELD_ALIASES: dict[str, str] = {
    "repoid": "repo_id",
    "repotype": "repo_type",
    "repourl": "repo_url",
    "createdat": "created_at",
    "lastmodified": "last_modified",
    "pipelinetag": "pipeline_tag",
    "trendingscore": "trending_score",
    "libraryname": "library_name",
    "paperswithcodeid": "paperswithcode_id",
}
# Collection-row aliases; note `author` is mapped onto the canonical `owner`.
_COLLECTION_FIELD_ALIASES: dict[str, str] = {
    "collectionid": "collection_id",
    "lastupdated": "last_updated",
    "ownertype": "owner_type",
    "itemcount": "item_count",
    "author": "owner",
}
# Canonical field orderings for each row shape the helpers emit. These tuples
# define the stable machine-first surface referenced by the alias maps above.
REPO_CANONICAL_FIELDS: tuple[str, ...] = (
    "repo_id",
    "repo_type",
    "title",
    "author",
    "likes",
    "downloads",
    "created_at",
    "last_modified",
    "pipeline_tag",
    "repo_url",
    "tags",
    "library_name",
    "description",
    "paperswithcode_id",
    "sdk",
    "models",
    "datasets",
    "subdomain",
)
USER_CANONICAL_FIELDS: tuple[str, ...] = (
    "username",
    "fullname",
    "bio",
    "websiteUrl",
    "twitter",
    "github",
    "linkedin",
    "bluesky",
    "followers",
    "following",
    "likes",
    "isPro",
)
# Profile rows cover both users and organizations (hence members/upvotes counts).
PROFILE_CANONICAL_FIELDS: tuple[str, ...] = (
    "handle",
    "entity_type",
    "display_name",
    "bio",
    "description",
    "avatar_url",
    "website_url",
    "twitter_url",
    "github_url",
    "linkedin_url",
    "bluesky_url",
    "followers_count",
    "following_count",
    "likes_count",
    "members_count",
    "models_count",
    "datasets_count",
    "spaces_count",
    "discussions_count",
    "papers_count",
    "upvotes_count",
    "organizations",
    "is_pro",
    "likes_sample",
    "activity_sample",
)
ACTOR_CANONICAL_FIELDS: tuple[str, ...] = (
    "username",
    "fullname",
    "isPro",
    "role",
    "type",
)
ACTIVITY_CANONICAL_FIELDS: tuple[str, ...] = (
    "event_type",
    "repo_id",
    "repo_type",
    "timestamp",
)
COLLECTION_CANONICAL_FIELDS: tuple[str, ...] = (
    "collection_id",
    "slug",
    "title",
    "owner",
    "owner_type",
    "description",
    "last_updated",
    "item_count",
)
# Extra hf_repo_search kwargs intentionally supported as pass-through to
# huggingface_hub.HfApi.list_models/list_datasets/list_spaces.
# (Generic args like `query/search/sort/author/limit` are handled directly in
# hf_repo_search signature and are not listed here.)
_REPO_SEARCH_EXTRA_ARGS: dict[str, set[str]] = {
    "model": {
        "filter",
        "apps",
        "gated",
        "inference",
        "inference_provider",
        "model_name",
        "trained_dataset",
        "pipeline_tag",
        "emissions_thresholds",
        "expand",
        "full",
        "cardData",
        "card_data",  # alias
        "fetch_config",
    },
    "dataset": {
        "filter",
        "benchmark",
        "dataset_name",
        "gated",
        "language_creators",
        "language",
        "multilinguality",
        "size_categories",
        "task_categories",
        "task_ids",
        "expand",
        "full",
    },
    "space": {
        "filter",
        "datasets",
        "models",
        "linked",
        "expand",
        "full",
    },
}
# Rich default metadata for repo search. These raw endpoint expand keys are
# normalized into the stable repo-row field surface below; keep them aligned
# with `_build_repo_row(...)`, `_REPO_FIELD_ALIASES`, and the shared agent docs.
_REPO_SEARCH_DEFAULT_EXPAND: dict[str, list[str]] = {
    "model": [
        "author",
        "createdAt",
        "downloads",
        "gated",
        "lastModified",
        "library_name",
        "likes",
        "pipeline_tag",
        "private",
        "sha",
        "tags",
        "trendingScore",
    ],
    "dataset": [
        "author",
        "createdAt",
        "description",
        "downloads",
        "gated",
        "lastModified",
        "likes",
        "paperswithcode_id",
        "private",
        "sha",
        "tags",
        "trendingScore",
    ],
    "space": [
        "author",
        "createdAt",
        "datasets",
        "lastModified",
        "likes",
        "models",
        "private",
        "sdk",
        "sha",
        "subdomain",
        "tags",
        "trendingScore",
    ],
}
# Per-helper pagination defaults and ceilings.
# These values answer questions like:
# - "If the caller omits return_limit, how many rows should this helper return?"
# - "How much upstream data may this helper scan/page through internally?"
# - "What is the helper-specific max_return override, if any?"
PAGINATION_POLICY: dict[str, dict[str, Any]] = {
    "hf_user_graph": {
        "scan_max": GRAPH_SCAN_LIMIT_CAP,
        "default_return": 1_000,
        "max_return": GRAPH_SCAN_LIMIT_CAP,
    },
    "hf_org_members": {"scan_max": GRAPH_SCAN_LIMIT_CAP, "default_return": 1_000},
    "hf_repo_likers": {"default_return": 1_000},
    "hf_user_likes": {
        "scan_max": LIKES_SCAN_LIMIT_CAP,
        "default_return": 100,
        "ranking_default": LIKES_RANKING_WINDOW_DEFAULT,
        "enrich_max": LIKES_ENRICHMENT_MAX_REPOS,
    },
    "hf_recent_activity": {
        "page_limit": RECENT_ACTIVITY_PAGE_SIZE,
        "max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
        "default_return": 100,
    },
    "hf_repo_search": {"max_return": 5_000, "default_return": 20},
    "hf_trending": {"max_return": TRENDING_ENDPOINT_MAX_LIMIT, "default_return": 20},
    "hf_collections_search": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 20},
    "hf_collection_items": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 100},
}
# Single source of truth for the public helper surface exposed to generated
# Monty code. Keep runtime helper resolution derived from this tuple.
HELPER_EXTERNALS = (
    "hf_runtime_capabilities",
    "hf_whoami",
    "hf_profile_summary",
    "hf_org_members",
    "hf_repo_search",
    "hf_user_graph",
    "hf_repo_likers",
    "hf_user_likes",
    "hf_recent_activity",
    "hf_repo_discussions",
    "hf_repo_discussion_details",
    "hf_repo_details",
    "hf_trending",
    "hf_collections_search",
    "hf_collection_items",
)
# Raw endpoint regexes that a dedicated hf_* helper already covers. Code
# validation uses this to steer generated programs away from raw `call_api`
# for these endpoint families (see `_validate_generated_code`).
HELPER_COVERED_ENDPOINT_PATTERNS: list[tuple[str, str]] = [
    (r"^/api/whoami-v2$", "hf_whoami"),
    (r"^/api/trending$", "hf_trending"),
    (r"^/api/recent-activity$", "hf_recent_activity"),
    (r"^/api/models$", "hf_repo_search"),
    (r"^/api/datasets$", "hf_repo_search"),
    (r"^/api/spaces$", "hf_repo_search"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$", "hf_repo_details"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$", "hf_repo_discussions"),
    (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$", "hf_repo_discussion_details"),
    (r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$", "hf_repo_likers"),
    (r"^/api/users/[^/]+/overview$", "hf_profile_summary"),
    (r"^/api/organizations/[^/]+/overview$", "hf_profile_summary"),
    (r"^/api/users/[^/]+/likes$", "hf_user_likes"),
    (r"^/api/users/[^/]+/(followers|following)$", "hf_user_graph"),
    (r"^/api/organizations/[^/]+/members$", "hf_org_members"),
    (r"^/api/organizations/[^/]+/followers$", "hf_user_graph"),
    (r"^/api/collections$", "hf_collections_search"),
    (r"^/api/collections/[^/]+$", "hf_collection_items"),
    (r"^/api/collections/[^/]+/[^/]+$", "hf_collection_items"),
]
def _resolve_helper_functions(namespace: dict[str, Any]) -> dict[str, Callable[..., Any]]:
    """Collect every HELPER_EXTERNALS callable from *namespace*.

    Raises RuntimeError if any helper name is missing or not callable, so a
    misconfigured helper surface fails loudly at startup rather than mid-run.
    """
    helpers: dict[str, Callable[..., Any]] = {}
    for name in HELPER_EXTERNALS:
        fn = namespace.get(name)
        if not callable(fn):
            raise RuntimeError(f"Helper '{name}' is not defined or not callable")
        helpers[name] = cast(Callable[..., Any], fn)
    return helpers
# Full allowlist of raw `/api/...` paths reachable via `call_api` in normal mode.
ALLOWLIST_PATTERNS = [
    r"^/api/whoami-v2$",
    r"^/api/trending$",
    r"^/api/daily_papers$",
    r"^/api/models$",
    r"^/api/datasets$",
    r"^/api/spaces$",
    r"^/api/models-tags-by-type$",
    r"^/api/datasets-tags-by-type$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$",
    r"^/api/users/[^/]+/overview$",
    r"^/api/users/[^/]+/socials$",
    r"^/api/users/[^/]+/followers$",
    r"^/api/users/[^/]+/following$",
    r"^/api/users/[^/]+/likes$",
    r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$",
    r"^/api/organizations/[^/]+/overview$",
    r"^/api/organizations/[^/]+/members$",
    r"^/api/organizations/[^/]+/followers$",
    r"^/api/collections$",
    r"^/api/collections/[^/]+$",
    r"^/api/collections/[^/]+/[^/]+$",
    r"^/api/recent-activity$",
]
# Reduced allowlist applied when strict mode is enabled (see `_endpoint_allowed`).
STRICT_ALLOWLIST_PATTERNS = [
    r"^/api/users/[^/]+/overview$",
    r"^/api/users/[^/]+/socials$",
    r"^/api/whoami-v2$",
    r"^/api/trending$",
    r"^/api/daily_papers$",
    r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$",
    r"^/api/collections$",
    r"^/api/collections/[^/]+$",
    r"^/api/collections/[^/]+/[^/]+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$",
    r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$",
]
class MontyExecutionError(RuntimeError):
    """Raised when a Monty run fails; carries the call count and API trace."""

    def __init__(self, message: str, api_calls: int, trace: list[dict[str, Any]]):
        super().__init__(message)
        # Number of external API calls made before the failure.
        self.api_calls = api_calls
        # Per-call trace entries collected for diagnostics.
        self.trace = trace
def _load_request_token() -> str | None:
try:
from fast_agent.mcp.auth.context import request_bearer_token # type: ignore
token = request_bearer_token.get()
if token:
return token
except Exception:
pass
return None
def _load_token() -> str | None:
    """Resolve an auth token: request-scoped bearer first, then HF_TOKEN env var."""
    request_token = _load_request_token()
    if request_token:
        return request_token
    env_token = os.getenv("HF_TOKEN")
    # Normalize an empty/unset env var to None.
    return env_token if env_token else None
def _normalize_endpoint(endpoint: str) -> str:
ep = (endpoint or "").strip()
if not ep:
raise ValueError("endpoint is required")
if "?" in ep:
raise ValueError("endpoint must not include query string; use params")
if ep.startswith("http://") or ep.startswith("https://"):
raise ValueError("endpoint must be path-only")
if not ep.startswith("/"):
ep = "/" + ep
if not ep.startswith("/api/"):
ep = "/api" + ep
if ep in {"/api/collections/search", "/api/collections/search/"}:
ep = "/api/collections"
if ".." in ep:
raise ValueError("path traversal not allowed")
return ep
def _endpoint_allowed(endpoint: str, strict_mode: bool) -> bool:
    """Check the endpoint's path portion against the active allowlist."""
    path, _, _ = endpoint.partition("?")
    patterns = STRICT_ALLOWLIST_PATTERNS if strict_mode else ALLOWLIST_PATTERNS
    for pattern in patterns:
        if re.match(pattern, path):
            return True
    return False
def _json_best_effort(raw: bytes) -> Any:
try:
return json.loads(raw)
except Exception:
return raw.decode("utf-8", errors="replace")
def _sanitize_params(endpoint: str, params: dict[str, Any] | None) -> dict[str, Any]:
    """Normalize query params for a normalized endpoint path.

    - /api/collections: accept `search` as an alias for `q`.
    - /api/trending: canonicalize plural `type` values and clamp `limit` to
      the upstream maximum.
    - all other endpoints: clamp `limit` to the per-endpoint hard cap.
    """
    clean = dict(params or {})
    path = endpoint.split("?", 1)[0]
    if path == "/api/collections":
        # The raw endpoint expects `q`; tolerate `search` from callers.
        if "q" not in clean and "search" in clean:
            clean["q"] = clean.get("search")
            clean.pop("search", None)
    if path == "/api/trending":
        kind = str(clean.get("type") or "").strip().lower()
        plural_map = {"models": "model", "datasets": "dataset", "spaces": "space"}
        if kind in plural_map:
            clean["type"] = plural_map[kind]
        requested = clean.get("limit")
        if requested is not None:
            try:
                bounded = int(requested)
            except Exception:
                bounded = TRENDING_ENDPOINT_MAX_LIMIT
            clean["limit"] = max(1, min(bounded, TRENDING_ENDPOINT_MAX_LIMIT))
        return clean
    requested = clean.get("limit")
    if requested is None:
        return clean
    try:
        bounded = int(requested)
    except Exception:
        # A non-numeric limit is passed through unchanged.
        return clean
    cap = SELECTIVE_ENDPOINT_RETURN_HARD_CAP
    if re.match(r"^/api/users/[^/]+/(followers|following)$", path):
        cap = GRAPH_SCAN_LIMIT_CAP
    elif re.match(r"^/api/users/[^/]+/likes$", path):
        cap = LIKES_SCAN_LIMIT_CAP
    clean["limit"] = max(1, min(bounded, cap))
    return clean
def _truncate_result_payload(output: Any) -> Any:
    """Clip oversized `items` payloads to OUTPUT_ITEMS_TRUNCATION_LIMIT rows.

    Non-dict outputs and small payloads pass through untouched. When clipping,
    a note is appended to `steps` and `item` is recomputed for the clipped list.
    """
    if not isinstance(output, dict):
        return output
    items = output.get("items")
    if not isinstance(items, list) or len(items) <= OUTPUT_ITEMS_TRUNCATION_LIMIT:
        return output
    clipped = items[:OUTPUT_ITEMS_TRUNCATION_LIMIT]
    result = dict(output)
    result["items"] = clipped
    result["item"] = clipped[0] if len(clipped) == 1 else None
    note = f"truncated items to first {OUTPUT_ITEMS_TRUNCATION_LIMIT} rows for token efficiency"
    existing_steps = result.get("steps")
    if isinstance(existing_steps, list):
        result["steps"] = existing_steps + [note]
    else:
        result["steps"] = [note]
    return result
def _is_helper_envelope(output: Any) -> bool:
return (
isinstance(output, dict)
and isinstance(output.get("ok"), bool)
and "items" in output
and "meta" in output
and "error" in output
)
def _summarize_limit_hit(helper_name: str, result: Any) -> dict[str, Any] | None:
    """Return a compact limit-hit summary for a helper envelope, or None.

    A summary is produced only when the envelope's meta signals truncation
    (`truncated`, `hard_cap_applied`, or a scan/page `truncated_by` marker).

    Note: the previous version coerced a non-dict `meta` to `{}` and then
    re-checked `isinstance(meta, dict)` on the already-guaranteed dict; both
    paths collapsed to "no summary", so we return early here instead.
    """
    if not _is_helper_envelope(result):
        return None
    meta = result.get("meta")
    if not isinstance(meta, dict):
        return None
    truncated_by = str(meta.get("truncated_by") or "")
    limit_hit = (
        meta.get("truncated") is True
        or meta.get("hard_cap_applied") is True
        or truncated_by in {"scan_limit", "page_limit", "multiple"}
    )
    if not limit_hit:
        return None
    summary: dict[str, Any] = {
        "helper": helper_name,
        "source": meta.get("source"),
        "returned": meta.get("returned"),
        "total": meta.get("total"),
        "truncated": meta.get("truncated"),
        "truncated_by": meta.get("truncated_by"),
        "more_available": meta.get("more_available"),
        "requested_return_limit": meta.get("requested_return_limit"),
        "applied_return_limit": meta.get("applied_return_limit"),
        "next_request_hint": meta.get("next_request_hint"),
    }
    # Optional scan/page diagnostics are only included when present.
    if meta.get("scan_limit") is not None:
        summary["scan_limit"] = meta.get("scan_limit")
    if meta.get("applied_max_pages") is not None:
        summary["applied_max_pages"] = meta.get("applied_max_pages")
    return summary
def _wrap_raw_result(
result: Any,
*,
ok: bool,
api_calls: int,
elapsed_ms: int,
limit_summaries: list[dict[str, Any]] | None = None,
error: str | None = None,
) -> dict[str, Any]:
hits = [dict(summary) for summary in (limit_summaries or [])[:10]]
meta: dict[str, Any] = {
"ok": ok,
"api_calls": api_calls,
"elapsed_ms": elapsed_ms,
"limits_reached": bool(hits),
"limit_summary": hits,
}
if error is not None:
meta["error"] = error
return {
"result": result,
"meta": meta,
}
def _clamp_int(value: Any, *, default: int, minimum: int, maximum: int) -> int:
try:
out = int(value)
except Exception:
out = default
return max(minimum, min(out, maximum))
def _as_int(value: Any) -> int | None:
try:
return int(value)
except Exception:
return None
def _canonical_repo_type(value: Any, *, default: str = "model") -> str:
raw = str(value or "").strip().lower()
aliases = {
"model": "model",
"models": "model",
"dataset": "dataset",
"datasets": "dataset",
"space": "space",
"spaces": "space",
}
return aliases.get(raw, default)
def _normalize_repo_sort_key(repo_type: str, sort_value: Any) -> tuple[str | None, str | None]:
    """Resolve a loose sort spelling to a canonical key valid for *repo_type*.

    Returns (key, None) on success, (None, None) for empty input, and
    (None, error_message) for unknown or disallowed keys.
    """
    raw = str(sort_value or "").strip()
    if not raw:
        return None, None
    # First try a compacted spelling (spaces removed, doubled underscores
    # collapsed), then the plain lowercase form.
    compact = raw.lower().replace(" ", "").replace("__", "_")
    key = _SORT_KEY_ALIASES.get(compact)
    if key is None:
        key = _SORT_KEY_ALIASES.get(raw.lower())
    if key is None:
        return None, f"Invalid sort key '{raw}'"
    rt = _canonical_repo_type(repo_type)
    allowed = _REPO_SORT_KEYS.get(rt, set())
    if key in allowed:
        return key, None
    return None, f"Invalid sort key '{raw}' for repo_type='{rt}'. Allowed: {', '.join(sorted(allowed))}"
def _repo_detail_endpoint(repo_type: str, repo_id: str) -> str:
    """Build the `/api/{models|datasets|spaces}/{owner}/{name}` detail path.

    Raises ValueError unless *repo_id* has the `owner/name` form.
    """
    rt = _canonical_repo_type(repo_type)
    rid = str(repo_id or "").strip()
    owner, sep, name = rid.partition("/")
    if not sep or not owner or not name:
        raise ValueError("repo_id must be owner/name")
    return f"/api/{rt}s/{owner}/{name}"
def _coerce_str_list(value: Any) -> list[str]:
if value is None:
return []
if isinstance(value, str):
raw = [value]
elif isinstance(value, (list, tuple, set)):
raw = list(value)
else:
raise ValueError("Expected a string or list of strings")
return [str(v).strip() for v in raw if str(v).strip()]
def _optional_str_list(value: Any) -> list[str] | None:
if value is None:
return None
if isinstance(value, str):
out = [value.strip()] if value.strip() else []
return out or None
if isinstance(value, (list, tuple, set)):
out = [str(v).strip() for v in value if str(v).strip()]
return out or None
return None
def _dt_to_str(value: Any) -> str | None:
if value is None:
return None
iso = getattr(value, "isoformat", None)
if callable(iso):
try:
return str(iso())
except Exception:
pass
return str(value)
def _repo_web_url(repo_type: str, repo_id: str | None) -> str | None:
    """Build the public Hub web URL for a repo, or None without a repo_id.

    Datasets and spaces get their path prefix; models (and unknown types)
    live at the site root. HF_ENDPOINT overrides the base host.
    """
    if not isinstance(repo_id, str) or not repo_id:
        return None
    base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
    rt = _canonical_repo_type(repo_type, default="")
    prefix = {"dataset": f"{base}/datasets", "space": f"{base}/spaces"}.get(rt, base)
    return f"{prefix}/{repo_id}"
def _build_repo_row(
    *,
    repo_id: Any,
    repo_type: str,
    author: Any = None,
    title: Any = None,
    likes: Any = None,
    downloads: Any = None,
    created_at: Any = None,
    last_modified: Any = None,
    pipeline_tag: Any = None,
    private: Any = None,
    trending_score: Any = None,
    tags: Any = None,
    sha: Any = None,
    gated: Any = None,
    library_name: Any = None,
    description: Any = None,
    paperswithcode_id: Any = None,
    sdk: Any = None,
    models: Any = None,
    datasets: Any = None,
    subdomain: Any = None,
) -> dict[str, Any]:
    """Assemble the stable normalized repo-row dict shared by all repo helpers.

    Ints/datetimes/lists are coerced through the shared `_as_int` /
    `_dt_to_str` / `_optional_str_list` converters; author and title get
    sensible fallbacks derived from the repo id.
    """
    rt = _canonical_repo_type(repo_type)
    # Derive the author from the repo namespace when not given explicitly.
    resolved_author = author
    if not isinstance(resolved_author, str) and isinstance(repo_id, str) and "/" in repo_id:
        resolved_author = repo_id.split("/", 1)[0]
    # Spaces fall back to the repo id as a display title; others stay None.
    resolved_title = title
    if (not isinstance(resolved_title, str) or not resolved_title.strip()) and isinstance(repo_id, str) and repo_id:
        resolved_title = repo_id if rt == "space" else None
    score = _as_int(trending_score) if trending_score is not None else None
    url = _repo_web_url(rt, repo_id if isinstance(repo_id, str) else None)
    row: dict[str, Any] = {
        "id": repo_id,
        "slug": repo_id,
        "repo_id": repo_id,
        "title": resolved_title,
        "repo_type": rt,
        "author": resolved_author,
        "likes": _as_int(likes),
        "downloads": _as_int(downloads),
        "created_at": _dt_to_str(created_at),
        "last_modified": _dt_to_str(last_modified),
        "pipeline_tag": pipeline_tag,
        "private": private,
        "trending_score": score,
        "repo_url": url,
        "tags": _optional_str_list(tags),
        "sha": sha,
        "gated": gated,
        "library_name": library_name,
        "description": description,
        "paperswithcode_id": paperswithcode_id,
        "sdk": sdk,
        "models": _optional_str_list(models),
        "datasets": _optional_str_list(datasets),
        "subdomain": subdomain,
    }
    return row
def _normalize_repo_search_row(row: Any, repo_type: str) -> dict[str, Any]:
    """Project a huggingface_hub listing object onto the stable repo-row shape.

    Every attribute is read defensively with getattr(..., None) since listing
    objects only populate the fields covered by the request's expand keys.
    """
    attr_names = (
        "author", "title", "likes", "downloads", "created_at", "last_modified",
        "pipeline_tag", "private", "trending_score", "tags", "sha", "gated",
        "library_name", "description", "paperswithcode_id", "sdk", "models",
        "datasets", "subdomain",
    )
    attrs = {name: getattr(row, name, None) for name in attr_names}
    return _build_repo_row(repo_id=getattr(row, "id", None), repo_type=repo_type, **attrs)
def _normalize_repo_detail_row(detail: Any, repo_type: str, repo_id: str) -> dict[str, Any]:
    """Normalize a repo-detail object, backfilling ids from the requested repo_id."""
    row = _normalize_repo_search_row(detail, repo_type)
    resolved = row.get("repo_id") or repo_id
    for key in ("id", "slug"):
        row[key] = row.get(key) or resolved
    row["repo_id"] = resolved
    row["repo_url"] = _repo_web_url(repo_type, resolved)
    return row
def _normalize_trending_row(repo: dict[str, Any], default_repo_type: str, rank: int | None = None) -> dict[str, Any]:
    """Normalize one raw `/api/trending` repo dict onto the stable repo-row
    shape, attaching `trending_rank` when a rank is supplied."""
    # Fields whose raw-endpoint names already match the row kwargs.
    passthrough = (
        "author", "title", "likes", "downloads", "pipeline_tag", "private",
        "tags", "sha", "gated", "library_name", "description",
        "paperswithcode_id", "sdk", "models", "datasets", "subdomain",
    )
    kwargs = {name: repo.get(name) for name in passthrough}
    row = _build_repo_row(
        repo_id=repo.get("id"),
        repo_type=repo.get("type") or default_repo_type,
        # camelCase keys on the raw endpoint map to snake_case row fields.
        created_at=repo.get("createdAt"),
        last_modified=repo.get("lastModified"),
        trending_score=repo.get("trendingScore"),
        **kwargs,
    )
    if rank is not None:
        row["trending_rank"] = rank
    return row
def _normalize_collection_repo_item(row: dict[str, Any]) -> dict[str, Any] | None:
    """Normalize one collection item into a repo row.

    Returns None for entries without a usable repo id or whose type is not a
    model/dataset/space (e.g. papers or collection cross-links).
    """
    repo_id = row.get("id") or row.get("repoId") or row.get("repo_id")
    if not isinstance(repo_id, str) or not repo_id:
        return None
    raw_type = row.get("repoType") or row.get("repo_type") or row.get("type")
    repo_type = _canonical_repo_type(raw_type, default="")
    if repo_type not in {"model", "dataset", "space"}:
        return None

    def pick(primary: str, fallback: str) -> Any:
        # Falsy-aware alias lookup, matching the raw endpoint's mixed casing.
        return row.get(primary) or row.get(fallback)

    return _build_repo_row(
        repo_id=repo_id,
        repo_type=repo_type,
        # `_author_from_any` is defined elsewhere in this module.
        author=row.get("author") or _author_from_any(row.get("authorData")),
        title=row.get("title"),
        likes=row.get("likes"),
        downloads=row.get("downloads"),
        created_at=pick("createdAt", "created_at"),
        last_modified=pick("lastModified", "last_modified"),
        pipeline_tag=pick("pipeline_tag", "pipelineTag"),
        private=row.get("private"),
        tags=row.get("tags"),
        gated=row.get("gated"),
        library_name=pick("library_name", "libraryName"),
        description=row.get("description"),
        paperswithcode_id=pick("paperswithcode_id", "paperswithcodeId"),
        sdk=row.get("sdk"),
        models=row.get("models"),
        datasets=row.get("datasets"),
        subdomain=row.get("subdomain"),
    )
def _sort_repo_rows(rows: list[dict[str, Any]], sort_key: str | None) -> list[dict[str, Any]]:
    """Sort repo rows descending by *sort_key*; unknown/empty keys pass rows through.

    Numeric keys treat only *missing* values as -1 so they sink below real
    values. (The previous `_as_int(...) or -1` form also mapped a legitimate
    0 to -1 because 0 is falsy, tying zero-valued rows with value-less rows.)
    """
    if not sort_key:
        return rows
    if sort_key in {"likes", "downloads", "trending_score"}:
        def numeric(row: dict[str, Any]) -> int:
            value = _as_int(row.get(sort_key))
            return -1 if value is None else value
        return sorted(rows, key=numeric, reverse=True)
    if sort_key in {"created_at", "last_modified"}:
        # ISO-8601 timestamps sort correctly as strings; missing values sort last.
        return sorted(rows, key=lambda row: str(row.get(sort_key) or ""), reverse=True)
    return rows
def call_api_host(
    endpoint: str,
    *,
    method: str = "GET",
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
    strict_mode: bool = False,
) -> dict[str, Any]:
    """Perform one allowlisted raw Hub API call on behalf of generated code.

    The endpoint is normalized and checked against the active allowlist, the
    params are sanitized (alias mapping, limit clamping), and the response is
    returned as a uniform dict: ``{"ok", "status", "url", "data", "error"}``.
    HTTP and network failures are returned in that envelope, never raised;
    only validation problems (bad method/endpoint/params) raise ValueError.
    """
    method_u = method.upper().strip()
    if method_u not in {"GET", "POST"}:
        raise ValueError("Only GET and POST are supported")
    ep = _normalize_endpoint(endpoint)
    if not _endpoint_allowed(ep, strict_mode):
        raise ValueError(f"Endpoint not allowed: {ep}")
    params = _sanitize_params(ep, params)
    # recent-activity is the one endpoint with mandatory query params.
    if ep == "/api/recent-activity":
        feed_type = str((params or {}).get("feedType", "")).strip().lower()
        if feed_type not in {"user", "org"}:
            raise ValueError("/api/recent-activity requires feedType=user|org")
        if not str((params or {}).get("entity", "")).strip():
            raise ValueError("/api/recent-activity requires entity")
    base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
    q = urlencode(params or {}, doseq=True)
    url = f"{base}{ep}" + (f"?{q}" if q else "")
    headers = {"Accept": "application/json"}
    token = _load_token()
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None
    if method_u == "POST":
        headers["Content-Type"] = "application/json"
        data = json.dumps(json_body or {}).encode("utf-8")
    req = Request(url, method=method_u, headers=headers, data=data)
    try:
        with urlopen(req, timeout=timeout_sec) as res:
            payload = _json_best_effort(res.read())
            return {"ok": True, "status": int(res.status), "url": url, "data": payload, "error": None}
    except HTTPError as e:
        # HTTP errors still carry a (possibly JSON) body; surface a trimmed copy.
        payload = _json_best_effort(e.read())
        err = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False)[:1000]
        return {"ok": False, "status": int(e.code), "url": url, "data": payload, "error": err}
    except URLError as e:
        return {"ok": False, "status": 0, "url": url, "data": None, "error": f"Network error: {e}"}
def _validate_generated_code(code: str) -> None:
    """Statically validate generated Monty code before execution.

    Checks, in order: non-empty source, no blocked constructs (imports,
    exec/eval/open, `while true`), parses as a Python module, defines
    `async def solve(query, max_calls)` with exactly that signature, ends
    with `await solve(query, max_calls)`, and never uses raw `call_api` for
    endpoints that a dedicated hf_* helper already covers.

    Raises ValueError with an actionable message on the first violation.
    """
    if not code.strip():
        raise ValueError("Generated code is empty")
    # Regex pre-screen for constructs the sandbox must never see.
    blocked_patterns: list[tuple[str, str]] = [
        (r"(?m)^\s*import\s+\S", "import statement"),
        (r"(?m)^\s*from\s+\S+\s+import\s+\S", "from-import statement"),
        (r"\bexec\s*\(", "exec("),
        (r"\beval\s*\(", "eval("),
        (r"\bopen\s*\(", "open("),
        (r"\b__import__\b", "__import__"),
        (r"(?i)\bwhile\s+true\b", "while true"),
    ]
    for pattern, label in blocked_patterns:
        if re.search(pattern, code):
            raise ValueError(f"Generated code contains blocked pattern: {label}")
    try:
        parsed = compile(  # noqa: S102 - compile is used for AST validation only.
            code,
            "<generated-monty-code>",
            "exec",
            flags=ast.PyCF_ONLY_AST | ast.PyCF_ALLOW_TOP_LEVEL_AWAIT,
            dont_inherit=True,
        )
    except SyntaxError as e:
        message = e.msg or "invalid syntax"
        raise ValueError(f"Generated code is not valid Python: {message}") from e
    if not isinstance(parsed, ast.Module):
        raise ValueError("Generated code must be a Python module")
    # The program must expose its entry point as a top-level async `solve`.
    solve_defs = [
        node
        for node in parsed.body
        if isinstance(node, ast.AsyncFunctionDef) and node.name == "solve"
    ]
    if not solve_defs:
        raise ValueError("Generated code must define `async def solve(query, max_calls): ...`.")

    def _valid_solve_signature(node: ast.AsyncFunctionDef) -> bool:
        # Exactly two positional args named query/max_calls; no *args/**kwargs,
        # no keyword-only args, no defaults.
        args = node.args
        return (
            not args.posonlyargs
            and len(args.args) == 2
            and [arg.arg for arg in args.args] == ["query", "max_calls"]
            and args.vararg is None
            and not args.kwonlyargs
            and args.kwarg is None
            and not args.defaults
            and not args.kw_defaults
        )

    if not any(_valid_solve_signature(node) for node in solve_defs):
        raise ValueError("`solve` must have signature `async def solve(query, max_calls): ...`.")
    if not parsed.body:
        raise ValueError("Generated code is empty")
    # The module's final statement must be the awaited entry-point call.
    final_stmt = parsed.body[-1]
    valid_final_await = (
        isinstance(final_stmt, ast.Expr)
        and isinstance(final_stmt.value, ast.Await)
        and isinstance(final_stmt.value.value, ast.Call)
        and isinstance(final_stmt.value.value.func, ast.Name)
        and final_stmt.value.value.func.id == "solve"
        and len(final_stmt.value.value.args) == 2
        and not final_stmt.value.value.keywords
        and all(isinstance(arg, ast.Name) for arg in final_stmt.value.value.args)
        and [cast(ast.Name, arg).id for arg in final_stmt.value.value.args] == ["query", "max_calls"]
    )
    if not valid_final_await:
        raise ValueError("Generated code must end with `await solve(query, max_calls)`.")

    def _preferred_helper_for_endpoint(endpoint: str) -> str | None:
        # Map a raw endpoint onto the helper that already covers it, if any.
        for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS:
            if re.match(pattern, endpoint):
                return helper_name
        return None

    def _call_api_endpoint_hint(expr: ast.AST | None) -> str | None:
        # Extract a best-effort endpoint string from a call_api argument:
        # plain string constants, or the literal parts of an f-string.
        if isinstance(expr, ast.Constant) and isinstance(expr.value, str):
            return expr.value
        if isinstance(expr, ast.JoinedStr):
            literal_parts = [
                value.value
                for value in expr.values
                if isinstance(value, ast.Constant) and isinstance(value.value, str)
            ]
            if literal_parts:
                return "".join(literal_parts)
        return None

    # AST pass: steer call_api usage toward helpers for covered endpoints.
    for node in ast.walk(parsed):
        if not isinstance(node, ast.Call):
            continue
        if not isinstance(node.func, ast.Name) or node.func.id != "call_api":
            continue
        endpoint_expr: ast.AST | None = node.args[0] if node.args else None
        for keyword in node.keywords:
            if keyword.arg == "endpoint":
                endpoint_expr = keyword.value
                break
        endpoint_hint = _call_api_endpoint_hint(endpoint_expr)
        if endpoint_hint and "/api/collections/" in endpoint_hint and "/items" in endpoint_hint:
            raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.")
        if endpoint_hint:
            preferred_helper = _preferred_helper_for_endpoint(endpoint_hint)
            if preferred_helper is not None:
                raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_hint!r}, ...)` for this endpoint family.")
    # The program must make at least one external call of some kind.
    allowed_external_calls = ["call_api(", *[f"{name}(" for name in HELPER_EXTERNALS]]
    if not any(token in code for token in allowed_external_calls):
        raise ValueError("Generated code must call at least one external API function (call_api or hf_* helper)")
    # Text pass: catch string-literal endpoints the AST pass may have missed.
    helper_name_set = set(HELPER_EXTERNALS)
    for m in re.finditer(r"call_api\(\s*([\"'])\s*([^\"']+)\s*\1", code):
        endpoint_literal = str(m.group(2) or "").strip()
        if not endpoint_literal:
            continue
        if (
            endpoint_literal in helper_name_set
            or endpoint_literal.startswith("hf_")
            or endpoint_literal.startswith("/hf_")
            or endpoint_literal.startswith("/api/hf_")
        ):
            raise ValueError("Do not call helper names through call_api; call hf_* helpers directly.")
        if re.match(r"^/api/collections/(?:[^/]+/)?[^/]+/items$", endpoint_literal):
            raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.")
        preferred_helper = _preferred_helper_for_endpoint(endpoint_literal)
        if preferred_helper is not None:
            raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_literal!r}, ...)` for this endpoint family.")
        if not endpoint_literal.startswith("/api/"):
            raise ValueError("call_api endpoint must be a raw path starting with '/api/...'.")
async def _run_with_monty(
*,
code: str,
query: str,
max_calls: int,
strict_mode: bool,
timeout_sec: int,
) -> dict[str, Any]:
try:
import pydantic_monty
except Exception as e:
raise RuntimeError("pydantic_monty is not installed. Install with `uv pip install pydantic-monty`.") from e
max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT))
call_count = {"n": 0}
trace: list[dict[str, Any]] = []
limit_summaries: list[dict[str, Any]] = []
latest_helper_error: dict[str, Any] | None = None
internal_helper_used = {"used": False}
def _budget_remaining() -> int:
    """Return how many external calls are left in this run's budget (never negative)."""
    remaining = max_calls - call_count["n"]
    return remaining if remaining > 0 else 0
def _policy_int(helper_name: str, key: str, default: int) -> int:
    """Look up an integer pagination-policy value for *helper_name*.

    Falls back to *default* when the helper has no policy entry or the
    configured value cannot be converted to an int.
    """
    value = (PAGINATION_POLICY.get(helper_name) or {}).get(key, default)
    try:
        return int(value)
    except Exception:
        return int(default)
def _consume_call(endpoint: str, method: str = "GET") -> int:
    """Reserve one unit of the call budget and return the new 1-based call index.

    Raises RuntimeError once the budget is exhausted. *endpoint* and *method*
    are accepted for call-site symmetry; they are not used here.
    """
    used = call_count["n"]
    if used >= max_calls:
        raise RuntimeError(f"Max API calls exceeded ({max_calls})")
    call_count["n"] = used + 1
    return call_count["n"]
def _trace_ok(idx: int, endpoint: str, method: str = "GET", status: int = 200) -> None:
    """Record a successful external call in the execution trace."""
    # NOTE(review): "depth" mirrors "call_index" today — confirm whether a
    # distinct depth notion was intended.
    entry = {
        "call_index": idx,
        "depth": idx,
        "method": method,
        "endpoint": endpoint,
        "ok": True,
        "status": status,
    }
    trace.append(entry)
def _trace_err(idx: int, endpoint: str, err: Any, method: str = "GET", status: int = 0) -> None:
    """Record a failed external call (with stringified error) in the execution trace."""
    entry = {
        "call_index": idx,
        "depth": idx,
        "method": method,
        "endpoint": endpoint,
        "ok": False,
        "status": status,
        "error": str(err),
    }
    trace.append(entry)
def _host_raw_call(
    endpoint: str,
    *,
    params: dict[str, Any] | None = None,
    method: str = "GET",
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Perform one budget-accounted raw HTTP call against the Hub API.

    Consumes one unit of the call budget up front (raising when exhausted),
    delegates to the host-level `call_api_host`, and records success/failure
    in the trace. Transport-level exceptions are traced and re-raised.
    """
    idx = _consume_call(endpoint, method)  # reserves budget before any I/O
    try:
        resp = call_api_host(
            endpoint,
            method=method,
            params=params,
            json_body=json_body,
            timeout_sec=timeout_sec,
            strict_mode=strict_mode,
        )
        # Trace the envelope either way so the trace stays self-describing;
        # missing/zero statuses fall back to 200 (ok) or 0 (error).
        if resp.get("ok"):
            _trace_ok(idx, endpoint, method=method, status=int(resp.get("status") or 200))
        else:
            _trace_err(idx, endpoint, resp.get("error"), method=method, status=int(resp.get("status") or 0))
        return resp
    except Exception as e:
        _trace_err(idx, endpoint, e, method=method, status=0)
        raise
hf_api_client: HfApi | None = None
def _get_hf_api_client() -> HfApi:
    """Lazily build and cache a single HfApi client for this run.

    Honors the HF_ENDPOINT environment override (trailing slash stripped)
    and attaches whatever token `_load_token()` resolves.
    """
    nonlocal hf_api_client
    if hf_api_client is None:
        endpoint = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
        hf_api_client = HfApi(endpoint=endpoint, token=_load_token())
    return hf_api_client
def _host_hf_call(endpoint: str, fn: Callable[[], Any]) -> Any:
    """Run an HfApi-backed callable under budget accounting and tracing.

    *endpoint* is used only as the trace label; the actual work happens in
    *fn*. Failures are traced with status 0 and re-raised to the caller.
    """
    idx = _consume_call(endpoint, "GET")
    try:
        out = fn()
        _trace_ok(idx, endpoint, method="GET", status=200)
        return out
    except Exception as e:
        _trace_err(idx, endpoint, e, method="GET", status=0)
        raise
def _helper_meta(start_calls: int, *, source: str, **extra: Any) -> dict[str, Any]:
    """Build the common `meta` envelope shared by all helper responses.

    Reports how much budget this helper consumed (relative to the call count
    captured at entry) and how much remains, then folds in *extra* keys.
    """
    consumed = call_count["n"] - start_calls
    meta: dict[str, Any] = {
        "source": source,
        "normalized": True,
        "budget_used": consumed if consumed > 0 else 0,
        "budget_remaining": _budget_remaining(),
    }
    meta.update(extra)
    return meta
def _derive_limit_metadata(
*,
requested_return_limit: int | None,
applied_return_limit: int,
default_limit_used: bool,
requested_scan_limit: int | None = None,
applied_scan_limit: int | None = None,
requested_max_pages: int | None = None,
applied_max_pages: int | None = None,
) -> dict[str, Any]:
meta: dict[str, Any] = {
"requested_return_limit": requested_return_limit,
"applied_return_limit": applied_return_limit,
"default_limit_used": default_limit_used,
}
if requested_scan_limit is not None or applied_scan_limit is not None:
meta["requested_scan_limit"] = requested_scan_limit
meta["scan_limit"] = applied_scan_limit
meta["scan_limit_applied"] = requested_scan_limit != applied_scan_limit
if requested_max_pages is not None or applied_max_pages is not None:
meta["requested_max_pages"] = requested_max_pages
meta["applied_max_pages"] = applied_max_pages
meta["page_limit_applied"] = requested_max_pages != applied_max_pages
if requested_return_limit is not None:
meta["hard_cap_applied"] = applied_return_limit < requested_return_limit
return meta
def _derive_more_available(*, sample_complete: bool, exact_count: bool, returned: int, total: int | None) -> bool | str:
if sample_complete:
return False
if exact_count and total is not None and returned < total:
return True
return "unknown"
def _derive_truncated_by(
*,
hard_cap: bool = False,
scan_limit_hit: bool = False,
page_limit_hit: bool = False,
return_limit_hit: bool = False,
) -> str:
causes = [hard_cap, scan_limit_hit, page_limit_hit, return_limit_hit]
if sum(1 for cause in causes if cause) > 1:
return "multiple"
if hard_cap:
return "hard_cap"
if scan_limit_hit:
return "scan_limit"
if page_limit_hit:
return "page_limit"
if return_limit_hit:
return "return_limit"
return "none"
def _derive_can_request_more(*, sample_complete: bool, truncated_by: str) -> bool:
if sample_complete:
return False
return truncated_by in {"return_limit", "scan_limit", "page_limit", "multiple"}
def _derive_next_request_hint(*, truncated_by: str, more_available: bool | str, applied_return_limit: int, applied_scan_limit: int | None = None, applied_max_pages: int | None = None) -> str:
if truncated_by == "return_limit":
return f"Ask for return_limit>{applied_return_limit} to see more rows"
if truncated_by == "scan_limit" and applied_scan_limit is not None:
return f"Increase scan_limit above {applied_scan_limit} for broader coverage"
if truncated_by == "page_limit" and applied_max_pages is not None:
return f"Increase max_pages above {applied_max_pages} to continue paging"
if truncated_by == "hard_cap":
return "No more rows can be returned in a single call because a hard cap was applied"
if truncated_by == "multiple":
return "Increase the relevant return/page/scan bounds to improve coverage"
if more_available is False:
return "No more results available"
if more_available == "unknown":
return "More results may exist; narrow filters or raise scan/page bounds for better coverage"
return "Ask for a larger limit to see more rows"
def _resolve_exhaustive_limits(
    *,
    return_limit: int | None,
    count_only: bool,
    default_return: int,
    max_return: int,
    scan_limit: int | None = None,
    scan_cap: int | None = None,
) -> dict[str, Any]:
    """Resolve requested return/scan limits into a clamped "limit plan" dict.

    In count_only mode the applied return limit is forced to 0 rows while the
    requested limit is reported as None. Scan-limit keys are present only when
    the caller supplies a *scan_cap*.
    """
    requested_return_limit = None if count_only else return_limit
    # count_only forces zero returned rows regardless of what was asked for.
    effective_requested_return_limit = 0 if count_only else requested_return_limit
    out: dict[str, Any] = {
        "requested_return_limit": requested_return_limit,
        "applied_return_limit": _clamp_int(
            effective_requested_return_limit,
            default=default_return,
            minimum=0,
            maximum=max_return,
        ),
        "default_limit_used": requested_return_limit is None and not count_only,
    }
    # hard_cap_applied only makes sense when the caller asked for a concrete count.
    out["hard_cap_applied"] = (
        requested_return_limit is not None and out["applied_return_limit"] < requested_return_limit
    )
    if scan_cap is not None:
        out["requested_scan_limit"] = scan_limit
        out["applied_scan_limit"] = _clamp_int(
            scan_limit,
            default=scan_cap,
            minimum=1,
            maximum=scan_cap,
        )
    return out
def _build_exhaustive_meta(
    *,
    base_meta: dict[str, Any],
    limit_plan: dict[str, Any],
    sample_complete: bool,
    exact_count: bool,
    truncated_by: str,
    more_available: bool | str,
    requested_max_pages: int | None = None,
    applied_max_pages: int | None = None,
) -> dict[str, Any]:
    """Assemble the full coverage/limit metadata block for an exhaustive helper.

    Copies *base_meta*, layers on completeness flags and the derived
    can-request-more / next-hint fields, then merges the requested-vs-applied
    limit summary from *limit_plan*.
    """
    meta = dict(base_meta)
    applied_return_limit = int(limit_plan["applied_return_limit"])
    applied_scan_limit = limit_plan.get("applied_scan_limit")
    meta.update(
        {
            # "complete" mirrors "sample_complete" for backward compatibility.
            "complete": sample_complete,
            "exact_count": exact_count,
            "sample_complete": sample_complete,
            "more_available": more_available,
            "can_request_more": _derive_can_request_more(
                sample_complete=sample_complete,
                truncated_by=truncated_by,
            ),
            "truncated_by": truncated_by,
            "next_request_hint": _derive_next_request_hint(
                truncated_by=truncated_by,
                more_available=more_available,
                applied_return_limit=applied_return_limit,
                applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
                applied_max_pages=applied_max_pages,
            ),
        }
    )
    meta.update(
        _derive_limit_metadata(
            requested_return_limit=limit_plan["requested_return_limit"],
            applied_return_limit=applied_return_limit,
            default_limit_used=bool(limit_plan["default_limit_used"]),
            requested_scan_limit=limit_plan.get("requested_scan_limit"),
            applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
            requested_max_pages=requested_max_pages,
            applied_max_pages=applied_max_pages,
        )
    )
    return meta
def _overview_count_only_success(
    *,
    start_calls: int,
    source: str,
    total: int,
    limit_plan: dict[str, Any],
    base_meta: dict[str, Any],
) -> dict[str, Any]:
    """Build a count-only success envelope from an exact overview total.

    Used when an entity overview already provides the exact count, so no rows
    are returned and the sample is trivially complete.
    """
    sample_complete = True
    more_available = False
    truncated_by = "none"
    meta = _build_exhaustive_meta(
        base_meta={
            **base_meta,
            # All count aliases carry the same exact overview total.
            "matched": total,
            "returned": 0,
            "total": total,
            "total_available": total,
            "total_matched": total,
            "truncated": False,
        },
        limit_plan=limit_plan,
        sample_complete=sample_complete,
        exact_count=True,
        truncated_by=truncated_by,
        more_available=more_available,
    )
    return _helper_success(
        start_calls=start_calls,
        source=source,
        items=[],
        meta=meta,
    )
def _build_exhaustive_result_meta(
    *,
    base_meta: dict[str, Any],
    limit_plan: dict[str, Any],
    matched_count: int,
    returned_count: int,
    exact_count: bool,
    count_only: bool = False,
    sample_complete: bool | None = None,
    more_available: bool | str | None = None,
    scan_limit_hit: bool = False,
    page_limit_hit: bool = False,
    truncated_extra: bool = False,
    requested_max_pages: int | None = None,
    applied_max_pages: int | None = None,
) -> dict[str, Any]:
    """Derive the standard result metadata for an exhaustive helper response.

    Resolves sample completeness, truncation cause, and more_available from
    the supplied counts and hit flags (callers may override sample_complete /
    more_available explicitly), then delegates to `_build_exhaustive_meta`.
    """
    applied_return_limit = int(limit_plan["applied_return_limit"])
    if count_only:
        # With no rows requested, completeness equals whether the count is exact.
        effective_sample_complete = exact_count
    else:
        effective_sample_complete = (
            sample_complete
            if isinstance(sample_complete, bool)
            else exact_count and matched_count <= applied_return_limit
        )
    return_limit_hit = False if count_only else (applied_return_limit > 0 and matched_count > applied_return_limit)
    truncated_by = _derive_truncated_by(
        hard_cap=bool(limit_plan.get("hard_cap_applied")),
        scan_limit_hit=scan_limit_hit,
        page_limit_hit=page_limit_hit,
        return_limit_hit=return_limit_hit,
    )
    # truncated_extra lets callers force the truncated flag for causes outside
    # the standard vocabulary.
    truncated = truncated_by != "none" or truncated_extra
    total_value = _as_int(base_meta.get("total"))
    effective_more_available = more_available
    if count_only and exact_count:
        effective_more_available = False
    if effective_more_available is None:
        effective_more_available = _derive_more_available(
            sample_complete=effective_sample_complete,
            exact_count=exact_count,
            returned=returned_count,
            total=total_value,
        )
    return _build_exhaustive_meta(
        base_meta={
            **base_meta,
            "matched": matched_count,
            "returned": returned_count,
            "truncated": truncated,
        },
        limit_plan=limit_plan,
        sample_complete=effective_sample_complete,
        exact_count=exact_count,
        truncated_by=truncated_by,
        more_available=effective_more_available,
        requested_max_pages=requested_max_pages,
        applied_max_pages=applied_max_pages,
    )
def _helper_success(
    *,
    start_calls: int,
    source: str,
    items: list[dict[str, Any]],
    cursor: str | None = None,
    meta: dict[str, Any] | None = None,
    **extra_meta: Any,
) -> dict[str, Any]:
    """Build the success envelope shared by all helpers.

    `item` is populated only when exactly one row was returned; the full list
    always lives in `items`. Extra keyword metadata and an optional cursor are
    folded into the shared meta block.
    """
    meta_out = dict(meta or {})
    meta_out.update(extra_meta)
    if cursor is not None:
        meta_out["cursor"] = cursor
    single = items[0] if len(items) == 1 else None
    return {
        "ok": True,
        "item": single,
        "items": items,
        "meta": _helper_meta(start_calls, source=source, **meta_out),
        "error": None,
    }
def _helper_error(*, start_calls: int, source: str, error: Any, **meta: Any) -> dict[str, Any]:
    """Build the failure envelope shared by all helpers.

    Also records the envelope in the enclosing `latest_helper_error` slot so
    the runtime can surface the most recent helper failure after execution.
    """
    nonlocal latest_helper_error
    envelope = {
        "ok": False,
        "item": None,
        "items": [],
        "meta": _helper_meta(start_calls, source=source, **meta),
        "error": str(error),
    }
    latest_helper_error = envelope
    return envelope
def _project_items(
items: list[dict[str, Any]],
fields: list[str] | None,
aliases: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
if not isinstance(fields, list) or not fields:
return items
wanted = [str(f).strip() for f in fields if str(f).strip()]
if not wanted:
return items
alias_map = {str(k).strip().lower(): str(v).strip() for k, v in (aliases or {}).items() if str(k).strip() and str(v).strip()}
projected: list[dict[str, Any]] = []
for row in items:
out: dict[str, Any] = {}
for key in wanted:
source_key = alias_map.get(key.lower(), key)
value = row.get(source_key)
if value is None:
continue
out[key] = value
projected.append(out)
return projected
# Thin projection wrappers binding `_project_items` to the per-entity alias
# tables (repo / collection / user / actor field-name aliases).
def _project_repo_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Project repo rows using the repo field alias table."""
    return _project_items(items, fields, aliases=_REPO_FIELD_ALIASES)
def _project_collection_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Project collection rows using the collection field alias table."""
    return _project_items(items, fields, aliases=_COLLECTION_FIELD_ALIASES)
def _project_user_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Project user rows using the user field alias table."""
    return _project_items(items, fields, aliases=_USER_FIELD_ALIASES)
def _project_actor_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
    """Project actor rows using the actor field alias table."""
    return _project_items(items, fields, aliases=_ACTOR_FIELD_ALIASES)
def _item_matches_where(item: dict[str, Any], where: dict[str, Any] | None) -> bool:
    """Check one row against a structured `where` clause (AND across keys).

    Conditions per key may be:
    - a dict of operators: eq, in, contains, icontains, gte, lte (all present
      operators must pass);
    - a list/tuple/set: membership test;
    - any other value: plain equality.
    An empty or non-dict *where* matches everything.
    """
    if not isinstance(where, dict) or not where:
        return True
    for key, cond in where.items():
        val = item.get(str(key))
        if isinstance(cond, dict):
            if "eq" in cond and val != cond.get("eq"):
                return False
            if "in" in cond:
                allowed = cond.get("in")
                # A malformed (non-collection) "in" clause is ignored.
                if isinstance(allowed, (list, tuple, set)) and val not in allowed:
                    return False
            if "contains" in cond:
                needle = cond.get("contains")
                # Substring tests require both sides to be strings.
                if not isinstance(val, str) or not isinstance(needle, str) or needle not in val:
                    return False
            if "icontains" in cond:
                needle = cond.get("icontains")
                if not isinstance(val, str) or not isinstance(needle, str) or needle.lower() not in val.lower():
                    return False
            if "gte" in cond:
                v = _as_int(val)
                c = _as_int(cond.get("gte"))
                # Non-numeric operands fail the comparison outright.
                if v is None or c is None or v < c:
                    return False
            if "lte" in cond:
                v = _as_int(val)
                c = _as_int(cond.get("lte"))
                if v is None or c is None or v > c:
                    return False
            continue
        if isinstance(cond, (list, tuple, set)):
            if val not in cond:
                return False
            continue
        if val != cond:
            return False
    return True
def _apply_where(items: list[dict[str, Any]], where: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Filter *items* by the structured `where` clause; no-op for empty clauses."""
    if not isinstance(where, dict) or not where:
        return items
    kept: list[dict[str, Any]] = []
    for row in items:
        if _item_matches_where(row, where):
            kept.append(row)
    return kept
def _helper_item(resp: dict[str, Any]) -> dict[str, Any] | None:
item = resp.get("item")
if isinstance(item, dict):
return item
items = resp.get("items")
if isinstance(items, list) and items and isinstance(items[0], dict):
return items[0]
return None
def _overview_count(item: dict[str, Any] | None, key: str) -> int | None:
if not isinstance(item, dict):
return None
return _as_int(item.get(key))
def _summary_section(
    resp: dict[str, Any],
    *,
    count: int | None = None,
    default_sample: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Condense a helper envelope into a {count, sample, meta} summary section.

    An explicit *count* wins; otherwise an exact count is lifted from the
    response meta (first of total / total_matched / matched) only when the
    meta marks the count as exact. Failed responses keep their error text in
    the section meta and fall back to *default_sample*.
    """
    meta = resp.get("meta")
    section_meta = dict(meta) if isinstance(meta, dict) else {}
    sample = resp.get("items")
    section_sample = sample if isinstance(sample, list) else list(default_sample or [])
    section_count = count
    if section_count is None:
        count_exact = section_meta.get("exact_count") is True or section_meta.get("count_source") in {"overview", "endpoint"}
        if count_exact:
            for key in ("total", "total_matched", "matched"):
                section_count = _as_int(section_meta.get(key))
                if section_count is not None:
                    break
    if resp.get("ok") is not True:
        section_meta["error"] = str(resp.get("error") or "section fetch failed")
        section_sample = list(default_sample or [])
    return {"count": section_count, "sample": section_sample, "meta": section_meta}
async def _resolve_username_or_current(username: str | None) -> tuple[str | None, str | None]:
    """Resolve *username*, falling back to the authenticated user via whoami.

    Returns (username, None) on success or (None, error_message) when no
    username was given and the current user cannot be determined.
    """
    u = str(username or "").strip()
    if u:
        return u, None
    whoami = await hf_whoami()
    if whoami.get("ok") is not True:
        return None, str(whoami.get("error") or "Could not resolve current authenticated user")
    item = _helper_item(whoami)
    resolved = item.get("username") if isinstance(item, dict) else None
    if not isinstance(resolved, str) or not resolved.strip():
        return None, "username was not provided and current authenticated user could not be resolved"
    return resolved.strip(), None
def _normalize_user_likes_sort(sort: str | None) -> tuple[str | None, str | None]:
raw = str(sort or "likedAt").strip()
alias_map = {
"": "likedAt",
"likedat": "likedAt",
"liked_at": "likedAt",
"liked-at": "likedAt",
"recency": "likedAt",
"repolikes": "repoLikes",
"repo_likes": "repoLikes",
"repo-likes": "repoLikes",
"repodownloads": "repoDownloads",
"repo_downloads": "repoDownloads",
"repo-downloads": "repoDownloads",
}
normalized = alias_map.get(raw.lower(), raw)
if normalized not in {"likedAt", "repoLikes", "repoDownloads"}:
return None, "sort must be one of likedAt, repoLikes, repoDownloads"
return normalized, None
def _author_from_any(value: Any) -> str | None:
if isinstance(value, str):
return value
if isinstance(value, dict):
for k in ("name", "username", "user", "login"):
v = value.get(k)
if isinstance(v, str) and v:
return v
return None
def _clean_social_handle(value: Any) -> str | None:
if not isinstance(value, str):
return None
cleaned = value.strip()
if not cleaned:
return None
if re.match(r"^https?://", cleaned, flags=re.IGNORECASE):
return cleaned
return cleaned.lstrip("@")
def _social_url(kind: str, value: Any) -> str | None:
    """Expand a cleaned social handle into a profile URL for *kind*.

    Full http(s) URLs pass through untouched; unknown kinds return the
    cleaned handle unchanged.
    """
    handle = _clean_social_handle(value)
    if handle is None:
        return None
    if re.match(r"^https?://", handle, flags=re.IGNORECASE):
        return handle
    prefix_by_kind = {
        "twitter": "https://twitter.com/",
        "github": "https://github.com/",
        "bluesky": "https://bsky.app/profile/",
    }
    if kind in prefix_by_kind:
        return prefix_by_kind[kind] + handle
    if kind == "linkedin":
        # LinkedIn handles may already carry an in/ or company/ path segment.
        if handle.startswith(("in/", "company/")):
            return f"https://www.linkedin.com/{handle}"
        return f"https://www.linkedin.com/in/{handle}"
    return handle
async def call_api(
    endpoint: str,
    params: dict[str, Any] | None = None,
    method: str = "GET",
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Generated-code entry point for raw Hub API calls.

    Async facade over `_host_raw_call`, which enforces the call budget and
    records the trace entry.
    """
    return _host_raw_call(endpoint, params=params, method=method, json_body=json_body)
async def hf_whoami() -> dict[str, Any]:
    """Return the current authenticated user as a one-item helper envelope.

    Fails fast (without spending budget) when no token can be resolved. The
    item exposes username / fullname / isPro; an unresolvable username yields
    an empty items list on an otherwise-ok envelope.
    """
    start_calls = call_count["n"]
    endpoint = "/api/whoami-v2"
    token = _load_token()
    if token is None:
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=(
                "Current authenticated user is unavailable for this request. "
                "No request-scoped or fallback HF token was found."
            ),
        )
    try:
        payload = _host_hf_call(
            endpoint,
            lambda: _get_hf_api_client().whoami(token=token, cache=True),
        )
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e)
    # Different deployments expose the handle under different keys.
    username = payload.get("name") or payload.get("user") or payload.get("username")
    item = {"username": username, "fullname": payload.get("fullname"), "isPro": payload.get("isPro")}
    items = [item] if isinstance(username, str) and username else []
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        scanned=1,
        matched=len(items),
        returned=len(items),
        truncated=False,
    )
async def hf_user_overview(username: str) -> dict[str, Any]:
    """Fetch a user's profile overview as a single-item helper envelope.

    Primary data comes from HfApi.get_user_overview; when social handles are
    missing and budget remains, a best-effort raw call to the socials endpoint
    fills the gaps. Social fields are exposed both as full URLs and as bare
    handles.
    """
    start_calls = call_count["n"]
    u = str(username or "").strip()
    if not u:
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/overview", error="username is required")
    endpoint = f"/api/users/{u}/overview"
    try:
        obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_user_overview(u))
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e)
    # Attribute names vary across hub versions; try both spellings.
    twitter = getattr(obj, "twitter", None) or getattr(obj, "twitterUsername", None)
    github = getattr(obj, "github", None) or getattr(obj, "githubUsername", None)
    linkedin = getattr(obj, "linkedin", None) or getattr(obj, "linkedinUsername", None)
    bluesky = getattr(obj, "bluesky", None) or getattr(obj, "blueskyUsername", None)
    # Best-effort socials backfill: only when budget allows and something is missing.
    if _budget_remaining() > 0 and any(v in {None, ""} for v in [twitter, github, linkedin, bluesky]):
        socials_ep = f"/api/users/{u}/socials"
        socials_resp = _host_raw_call(socials_ep)
        if socials_resp.get("ok"):
            socials_payload = socials_resp.get("data") if isinstance(socials_resp.get("data"), dict) else {}
            handles = socials_payload.get("socialHandles") if isinstance(socials_payload.get("socialHandles"), dict) else {}
            twitter = twitter or handles.get("twitter")
            github = github or handles.get("github")
            linkedin = linkedin or handles.get("linkedin")
            bluesky = bluesky or handles.get("bluesky")
    orgs_raw = getattr(obj, "orgs", None)
    org_names: list[str] | None = None
    if isinstance(orgs_raw, (list, tuple, set)):
        names = []
        for org in orgs_raw:
            # Orgs may be plain strings or objects with a .name attribute.
            if isinstance(org, str) and org.strip():
                names.append(org.strip())
                continue
            name = getattr(org, "name", None)
            if isinstance(name, str) and name.strip():
                names.append(name.strip())
        org_names = names or None
    twitter_handle = _clean_social_handle(twitter)
    github_handle = _clean_social_handle(github)
    linkedin_handle = _clean_social_handle(linkedin)
    bluesky_handle = _clean_social_handle(bluesky)
    item = {
        "username": obj.username or u,
        "fullname": obj.fullname,
        "bio": getattr(obj, "details", None),
        "avatarUrl": obj.avatar_url,
        "websiteUrl": getattr(obj, "websiteUrl", None),
        "twitter": _social_url("twitter", twitter_handle),
        "github": _social_url("github", github_handle),
        "linkedin": _social_url("linkedin", linkedin_handle),
        "bluesky": _social_url("bluesky", bluesky_handle),
        "twitterHandle": twitter_handle,
        "githubHandle": github_handle,
        "linkedinHandle": linkedin_handle,
        "blueskyHandle": bluesky_handle,
        "followers": _as_int(obj.num_followers),
        "following": _as_int(obj.num_following),
        "likes": _as_int(obj.num_likes),
        "models": _as_int(getattr(obj, "num_models", None)),
        "datasets": _as_int(getattr(obj, "num_datasets", None)),
        "spaces": _as_int(getattr(obj, "num_spaces", None)),
        "discussions": _as_int(getattr(obj, "num_discussions", None)),
        "papers": _as_int(getattr(obj, "num_papers", None)),
        "upvotes": _as_int(getattr(obj, "num_upvotes", None)),
        "orgs": org_names,
        "isPro": obj.is_pro,
    }
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=[item],
        scanned=1,
        matched=1,
        returned=1,
        truncated=False,
    )
async def hf_org_overview(organization: str) -> dict[str, Any]:
    """Fetch an organization's profile overview as a single-item envelope.

    Backed by HfApi.get_organization_overview; counters are normalized to
    ints (or None) via `_as_int`.
    """
    start_calls = call_count["n"]
    org = str(organization or "").strip()
    if not org:
        return _helper_error(
            start_calls=start_calls,
            source="/api/organizations/<o>/overview",
            error="organization is required",
        )
    endpoint = f"/api/organizations/{org}/overview"
    try:
        obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_organization_overview(org))
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e)
    item = {
        "organization": obj.name or org,
        "displayName": obj.fullname,
        "avatarUrl": obj.avatar_url,
        "description": obj.details,
        "websiteUrl": getattr(obj, "websiteUrl", None),
        "followers": _as_int(obj.num_followers),
        "members": _as_int(obj.num_users),
        "models": _as_int(getattr(obj, "num_models", None)),
        "datasets": _as_int(getattr(obj, "num_datasets", None)),
        "spaces": _as_int(getattr(obj, "num_spaces", None)),
    }
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=[item],
        scanned=1,
        matched=1,
        returned=1,
        truncated=False,
    )
async def hf_org_members(
    organization: str,
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """List (or count) an organization's members with coverage metadata.

    Tries the org overview first for an exact member total (and short-circuits
    for unfiltered count-only requests). Otherwise scans the members listing
    up to the applied scan limit, applies the optional `where` filter, and
    reports exactness/truncation in the standard exhaustive-meta shape.
    """
    start_calls = call_count["n"]
    org = str(organization or "").strip()
    if not org:
        return _helper_error(start_calls=start_calls, source="/api/organizations/<o>/members", error="organization is required")
    default_return = _policy_int("hf_org_members", "default_return", 100)
    scan_cap = _policy_int("hf_org_members", "scan_max", GRAPH_SCAN_LIMIT_CAP)
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    has_where = isinstance(where, dict) and bool(where)
    # Overview gives an exact member count when available; failures are soft.
    overview_total: int | None = None
    overview_source = f"/api/organizations/{org}/overview"
    if _budget_remaining() > 0:
        try:
            org_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_organization_overview(org))
            overview_total = _as_int(getattr(org_obj, "num_users", None))
        except Exception:
            overview_total = None
    # Unfiltered count-only requests can be answered from the overview alone.
    if count_only and not has_where and overview_total is not None:
        return _overview_count_only_success(
            start_calls=start_calls,
            source=overview_source,
            total=overview_total,
            limit_plan=limit_plan,
            base_meta={
                "scanned": 1,
                "count_source": "overview",
                "organization": org,
            },
        )
    endpoint = f"/api/organizations/{org}/members"
    try:
        rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_members(org), scan_lim)))
    except Exception as e:
        return _helper_error(start_calls=start_calls, source=endpoint, error=e, organization=org)
    normalized: list[dict[str, Any]] = []
    for row in rows:
        handle = getattr(row, "username", None)
        if not isinstance(handle, str) or not handle:
            continue
        item = {
            "username": handle,
            "fullname": getattr(row, "fullname", None),
            "isPro": getattr(row, "is_pro", None),
            "role": getattr(row, "role", None),
        }
        normalized.append(item)
    normalized = _apply_where(normalized, where)
    observed_total = len(rows)
    # Scan stopped short of the cap => the listing itself was exhausted.
    scan_exhaustive = observed_total < scan_lim
    overview_list_mismatch = (
        overview_total is not None
        and scan_exhaustive
        and observed_total != overview_total
    )
    if has_where:
        # Filtered counts can only be exact when the full listing was scanned.
        exact_count = scan_exhaustive
        total = len(normalized)
        total_matched = len(normalized)
    else:
        if overview_total is not None:
            exact_count = True
            total = overview_total
            total_matched = overview_total
        else:
            exact_count = scan_exhaustive
            total = observed_total
            total_matched = observed_total
    total_available = overview_total if overview_total is not None else observed_total
    items = normalized[:ret_lim]
    scan_limit_hit = not exact_count and observed_total >= scan_lim
    count_source = "overview" if overview_total is not None and not has_where else "scan"
    sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if not exact_count and scan_limit_hit:
        # With a filter we cannot know if unscanned rows would match.
        more_available = "unknown" if has_where else True
    items = _project_user_items(items, fields)
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": observed_total,
            "total": total,
            "total_available": total_available,
            "total_matched": total_matched,
            "count_source": count_source,
            "lower_bound": bool(has_where and not exact_count),
            "overview_total": overview_total,
            "listed_total": observed_total,
            "overview_list_mismatch": overview_list_mismatch,
            "organization": org,
        },
        limit_plan=limit_plan,
        matched_count=len(normalized),
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
    )
    return _helper_success(start_calls=start_calls, source=endpoint, items=items, meta=meta)
async def hf_repo_search(
    query: str | None = None,
    repo_type: str | None = None,
    repo_types: list[str] | None = None,
    author: str | None = None,
    filters: list[str] | None = None,
    sort: str | None = None,
    limit: int = 20,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
    advanced: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Search models/datasets/spaces via HfApi with a unified envelope.

    Accepts either a single `repo_type` or a list of `repo_types` (default:
    models only). Fans out one list_* call per type, normalizes and merges the
    rows, applies `where` filtering, re-sorts, projects `fields`, and clamps
    to the applied limit. `advanced` passes extra list_* kwargs and is only
    allowed with a single repo type.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_repo_search", "default_return", 20)
    max_return = _policy_int("hf_repo_search", "max_return", SELECTIVE_ENDPOINT_RETURN_HARD_CAP)
    if repo_type is not None and repo_types is not None:
        return _helper_error(
            start_calls=start_calls,
            source="/api/repos",
            error="Pass either repo_type or repo_types, not both",
        )
    if repo_types is None:
        if repo_type is None or not str(repo_type).strip():
            requested_repo_types = ["model"]
        else:
            rt = _canonical_repo_type(repo_type, default="")
            if rt not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/repos",
                    error=f"Unsupported repo_type '{repo_type}'",
                )
            requested_repo_types = [rt]
    else:
        raw_types = _coerce_str_list(repo_types)
        if not raw_types:
            return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_types must not be empty")
        requested_repo_types: list[str] = []
        for raw in raw_types:
            rt = _canonical_repo_type(raw, default="")
            if rt not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/repos",
                    error=f"Unsupported repo_type '{raw}'",
                )
            requested_repo_types.append(rt)
    filter_list = _coerce_str_list(filters)
    term = str(query or "").strip()
    author_clean = str(author or "").strip() or None
    requested_limit = limit
    lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)
    limit_meta = _derive_limit_metadata(
        requested_return_limit=requested_limit,
        applied_return_limit=lim,
        # NOTE(review): flags "default used" whenever the requested limit
        # equals the policy default, including explicit requests of that value.
        default_limit_used=limit == default_return,
    )
    hard_cap_applied = bool(limit_meta.get("hard_cap_applied"))
    if advanced is not None and not isinstance(advanced, dict):
        return _helper_error(start_calls=start_calls, source="/api/repos", error="advanced must be a dict when provided")
    if advanced is not None and len(requested_repo_types) != 1:
        return _helper_error(
            start_calls=start_calls,
            source="/api/repos",
            error="advanced may only be used with a single repo_type",
        )
    # Validate the sort spelling once per repo type before spending budget.
    sort_keys: dict[str, str | None] = {}
    for rt in requested_repo_types:
        sort_key, sort_error = _normalize_repo_sort_key(rt, sort)
        if sort_error:
            return _helper_error(start_calls=start_calls, source=f"/api/{rt}s", error=sort_error)
        sort_keys[rt] = sort_key
    all_items: list[dict[str, Any]] = []
    scanned = 0
    source_endpoints: list[str] = []
    limit_boundary_hit = False
    api = _get_hf_api_client()
    for rt in requested_repo_types:
        endpoint = f"/api/{rt}s"
        source_endpoints.append(endpoint)
        extra_args = dict(advanced or {}) if len(requested_repo_types) == 1 else {}
        allowed_extra = _REPO_SEARCH_EXTRA_ARGS.get(rt, set())
        unsupported = sorted(str(k) for k in extra_args.keys() if str(k) not in allowed_extra)
        if unsupported:
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=(
                    f"Unsupported advanced args for repo_type='{rt}': {unsupported}. "
                    f"Allowed advanced args: {sorted(allowed_extra)}"
                ),
            )
        # Accept snake_case card_data as an alias for the API's cardData.
        if "card_data" in extra_args and "cardData" not in extra_args:
            extra_args["cardData"] = extra_args.pop("card_data")
        else:
            extra_args.pop("card_data", None)
        # Default expansion set, unless the caller already controls payload shape.
        if not any(key in extra_args for key in ("expand", "full", "cardData", "fetch_config")):
            extra_args["expand"] = list(_REPO_SEARCH_DEFAULT_EXPAND[rt])
    try:
            # Lambdas are invoked synchronously inside this iteration, so the
            # loop variables they capture are still current.
        if rt == "model":
                payload = _host_hf_call(
                    endpoint,
                    lambda: list(
                        api.list_models(
                            search=term or None,
                            author=author_clean,
                            filter=filter_list or None,
                            sort=sort_keys[rt],  # type: ignore[arg-type]
                            limit=lim,
                            **extra_args,
                        )
                    ),
                )
        elif rt == "dataset":
                payload = _host_hf_call(
                    endpoint,
                    lambda: list(
                        api.list_datasets(
                            search=term or None,
                            author=author_clean,
                            filter=filter_list or None,
                            sort=sort_keys[rt],  # type: ignore[arg-type]
                            limit=lim,
                            **extra_args,
                        )
                    ),
                )
        else:
                payload = _host_hf_call(
                    endpoint,
                    lambda: list(
                        api.list_spaces(
                            search=term or None,
                            author=author_clean,
                            filter=filter_list or None,
                            sort=sort_keys[rt],  # type: ignore[arg-type]
                            limit=lim,
                            **extra_args,
                        )
                    ),
                )
    except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e)
        scanned += len(payload)
        # Hitting the per-type limit means the listing may have more rows.
        if len(payload) >= lim:
            limit_boundary_hit = True
        all_items.extend(_normalize_repo_search_row(row, rt) for row in payload[:lim])
    all_items = _apply_where(all_items, where)
    # Merged multi-type results are re-sorted by the first type's sort key.
    combined_sort_key = next(iter(sort_keys.values()), None)
    all_items = _sort_repo_rows(all_items, combined_sort_key)
    matched = len(all_items)
    all_items = _project_repo_items(all_items[:lim], fields)
    more_available: bool | str = False
    truncated = False
    truncated_by = "none"
    next_request_hint: str | None = None
    if hard_cap_applied and scanned >= lim:
        truncated = True
        truncated_by = "hard_cap"
        more_available = "unknown"
        next_request_hint = f"Increase limit above {lim} to improve coverage"
    elif limit_boundary_hit:
        more_available = "unknown"
        next_request_hint = f"Increase limit above {lim} to check whether more rows exist"
    return _helper_success(
        start_calls=start_calls,
        source=",".join(source_endpoints),
        items=all_items,
        query=term or None,
        repo_types=requested_repo_types,
        filters=filter_list or None,
        sort=combined_sort_key,
        author=author_clean,
        limit=lim,
        scanned=scanned,
        matched=matched,
        returned=len(all_items),
        truncated=truncated,
        truncated_by=truncated_by,
        more_available=more_available,
        limit_boundary_hit=limit_boundary_hit,
        next_request_hint=next_request_hint,
        **limit_meta,
    )
async def _user_graph_helper(
    kind: str,
    username: str,
    pro_only: bool | None,
    return_limit: int | None,
    scan_limit: int | None,
    count_only: bool,
    where: dict[str, Any] | None,
    fields: list[str] | None,
    *,
    helper_name: str,
) -> dict[str, Any]:
    """Shared backend for follower/following graph queries on a user or org.

    Resolves per-helper limit policy, prefers the cheap overview endpoint for
    unfiltered counts, then scans the relation list via HfApi, applies the
    `pro_only` and `where` filters, and returns a stable helper envelope with
    exhaustive-scan metadata (`exact_count`, `more_available`, ...).

    Args:
        kind: relation to list, "followers" or "following" (validated by callers).
        username: handle to inspect; may turn out to be an organization.
        pro_only: tri-state PRO filter — True/False keep only that group,
            None disables the filter.
        return_limit: caller-requested number of rows to return.
        scan_limit: caller-requested scan budget over the upstream list.
        count_only: when True, report counts without returning item rows.
        where: optional post-filter applied via `_apply_where`.
        fields: optional projection applied via `_project_user_items`.
        helper_name: key for `_policy_int` per-helper limit overrides.

    Returns:
        A `_helper_success` / `_helper_error` envelope dict; never raises.
    """
    start_calls = call_count["n"]
    # Per-helper limit policy, falling back to the module-level defaults.
    default_return = _policy_int(helper_name, "default_return", 100)
    scan_cap = _policy_int(helper_name, "scan_max", GRAPH_SCAN_LIMIT_CAP)
    max_return = _policy_int(helper_name, "max_return", EXHAUSTIVE_HELPER_RETURN_HARD_CAP)
    u = str(username or "").strip()
    if not u:
        return _helper_error(start_calls=start_calls, source=f"/api/users/<u>/{kind}", error="username is required")
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=max_return,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    has_where = isinstance(where, dict) and bool(where)
    # Any active filter forces scan-derived counts; the overview total only
    # describes the unfiltered relation.
    filtered = (pro_only is not None) or has_where
    entity_type = "user"
    overview_total: int | None = None
    overview_source = f"/api/users/{u}/overview"
    if _budget_remaining() > 0:
        # Best-effort total from the user overview; on failure try the org
        # overview, and finally fall back to scan-derived counts.
        try:
            user_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_user_overview(u))
            overview_total = _as_int(user_obj.num_followers if kind == "followers" else user_obj.num_following)
        except Exception:
            org_overview_source = f"/api/organizations/{u}/overview"
            try:
                org_obj = _host_hf_call(org_overview_source, lambda: _get_hf_api_client().get_organization_overview(u))
            except Exception:
                overview_total = None
            else:
                entity_type = "organization"
                overview_source = org_overview_source
                if kind != "followers":
                    # Organizations only expose a followers list; "following"
                    # cannot be served for an org handle.
                    return _helper_error(
                        start_calls=start_calls,
                        source=f"/api/organizations/{u}/{kind}",
                        error="organization graph only supports relation='followers'; organizations do not expose a following list",
                        relation=kind,
                        organization=u,
                        entity=u,
                        entity_type=entity_type,
                    )
                overview_total = _as_int(getattr(org_obj, "num_followers", None))
    # Unfiltered count-only requests are answered from the overview alone,
    # without scanning the relation list.
    if count_only and not filtered and overview_total is not None:
        return _overview_count_only_success(
            start_calls=start_calls,
            source=overview_source,
            total=overview_total,
            limit_plan=limit_plan,
            base_meta={
                "scanned": 1,
                "count_source": "overview",
                "relation": kind,
                "pro_only": pro_only,
                "where_applied": has_where,
                "entity": u,
                "entity_type": entity_type,
                "username": u,
                "organization": u if entity_type == "organization" else None,
            },
        )
    endpoint = f"/api/users/{u}/{kind}"
    try:
        if entity_type == "organization":
            endpoint = f"/api/organizations/{u}/followers"
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_followers(u), scan_lim)))
        elif kind == "followers":
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_followers(u), scan_lim)))
        else:
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_following(u), scan_lim)))
    except Exception as e:
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=e,
            relation=kind,
            username=u,
            entity=u,
            entity_type=entity_type,
            organization=u if entity_type == "organization" else None,
        )
    normalized: list[dict[str, Any]] = []
    for row in rows:
        handle = getattr(row, "username", None)
        if not isinstance(handle, str) or not handle:
            # Skip rows without a usable handle.
            continue
        item = {
            "username": handle,
            "fullname": getattr(row, "fullname", None),
            "isPro": getattr(row, "is_pro", None),
        }
        # Tri-state PRO filter; None keeps everything.
        if pro_only is True and item.get("isPro") is not True:
            continue
        if pro_only is False and item.get("isPro") is True:
            continue
        normalized.append(item)
    normalized = _apply_where(normalized, where)
    observed_total = len(rows)
    # The scan is exhaustive when the upstream list ended before the budget.
    scan_exhaustive = observed_total < scan_lim
    # Overview counters can lag the actual list; surface the discrepancy.
    overview_list_mismatch = (
        overview_total is not None
        and scan_exhaustive
        and observed_total != overview_total
    )
    if filtered:
        # Filtered totals are exact only when every upstream row was seen.
        exact_count = scan_exhaustive
        total = len(normalized)
        total_matched = len(normalized)
    else:
        if overview_total is not None:
            exact_count = True
            total = overview_total
            total_matched = overview_total
        else:
            exact_count = scan_exhaustive
            total = observed_total
            total_matched = observed_total
    total_available = overview_total if overview_total is not None else observed_total
    items = normalized[:ret_lim]
    scan_limit_hit = not exact_count and observed_total >= scan_lim
    count_source = "overview" if overview_total is not None and not filtered else "scan"
    sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if not exact_count and scan_limit_hit:
        # With filters active we cannot tell whether unseen rows would match.
        more_available = "unknown" if filtered else True
    items = _project_user_items(items, fields)
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": observed_total,
            "total": total,
            "total_available": total_available,
            "total_matched": total_matched,
            "count_source": count_source,
            "lower_bound": bool(filtered and not exact_count),
            "overview_total": overview_total,
            "listed_total": observed_total,
            "overview_list_mismatch": overview_list_mismatch,
            "relation": kind,
            "pro_only": pro_only,
            "where_applied": has_where,
            "entity": u,
            "entity_type": entity_type,
            "username": u,
            "organization": u if entity_type == "organization" else None,
        },
        limit_plan=limit_plan,
        matched_count=len(normalized),
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
    )
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
async def hf_profile_summary(
    handle: str | None = None,
    include: list[str] | None = None,
    likes_limit: int = 10,
    activity_limit: int = 10,
) -> dict[str, Any]:
    """Build a one-item profile summary for a user or organization handle.

    Tries the user overview first; on success optionally attaches `likes`
    and/or `activity` samples (per `include`). If the handle is not a user,
    falls back to the organization overview (the optional sections are not
    supported for orgs and are reported as ignored). When `handle` is None,
    the current authenticated user is resolved.

    Args:
        handle: target handle; None means "current authenticated user".
        include: optional section list; only "likes" and "activity" are valid.
        likes_limit: max rows in the likes sample (0 means count-only).
        activity_limit: max rows in the activity sample (0 means count-only).

    Returns:
        A helper envelope with a single summary item, or a helper error.
    """
    start_calls = call_count["n"]
    resolved_handle, resolve_error = await _resolve_username_or_current(handle)
    if resolve_error:
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/overview", error=resolve_error)
    if not isinstance(resolved_handle, str):
        return _helper_error(
            start_calls=start_calls,
            source="/api/users/<u>/overview",
            error="handle was not provided and current authenticated user could not be resolved",
        )
    # Normalize and validate the requested optional sections.
    try:
        requested_sections = (
            {part.lower() for part in _coerce_str_list(include) if part.strip()} if include is not None else set()
        )
    except ValueError as e:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            error=e,
        )
    invalid_sections = sorted(requested_sections - {"likes", "activity"})
    if invalid_sections:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            error=f"Unsupported include values: {invalid_sections}",
        )
    likes_lim = _clamp_int(likes_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
    activity_lim = _clamp_int(activity_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
    # Per-section failures are reported here without failing the whole call.
    section_errors: dict[str, str] = {}
    user_overview = await hf_user_overview(resolved_handle)
    if user_overview.get("ok") is True:
        overview_item = _helper_item(user_overview) or {"username": resolved_handle}
        item: dict[str, Any] = {
            "handle": str(overview_item.get("username") or resolved_handle),
            "entity_type": "user",
            "display_name": overview_item.get("fullname") or str(overview_item.get("username") or resolved_handle),
            "bio": overview_item.get("bio"),
            "avatar_url": overview_item.get("avatarUrl"),
            "website_url": overview_item.get("websiteUrl"),
            "twitter_url": overview_item.get("twitter"),
            "github_url": overview_item.get("github"),
            "linkedin_url": overview_item.get("linkedin"),
            "bluesky_url": overview_item.get("bluesky"),
            "followers_count": _overview_count(overview_item, "followers"),
            "following_count": _overview_count(overview_item, "following"),
            "likes_count": _overview_count(overview_item, "likes"),
            "models_count": _overview_count(overview_item, "models"),
            "datasets_count": _overview_count(overview_item, "datasets"),
            "spaces_count": _overview_count(overview_item, "spaces"),
            "discussions_count": _overview_count(overview_item, "discussions"),
            "papers_count": _overview_count(overview_item, "papers"),
            "upvotes_count": _overview_count(overview_item, "upvotes"),
            "organizations": overview_item.get("orgs"),
            "is_pro": overview_item.get("isPro"),
        }
        if "likes" in requested_sections:
            likes = await hf_user_likes(
                username=resolved_handle,
                return_limit=likes_lim,
                scan_limit=USER_SUMMARY_LIKES_SCAN_LIMIT,
                count_only=likes_lim == 0,
                sort="likedAt",
                fields=["liked_at", "repo_id", "repo_type", "repo_author", "repo_url"],
            )
            item["likes_sample"] = likes.get("items") if likes.get("ok") is True else []
            if likes.get("ok") is not True:
                section_errors["likes"] = str(likes.get("error") or "likes fetch failed")
        if "activity" in requested_sections:
            activity = await hf_recent_activity(
                feed_type="user",
                entity=resolved_handle,
                return_limit=activity_lim,
                max_pages=USER_SUMMARY_ACTIVITY_MAX_PAGES,
                count_only=activity_lim == 0,
                fields=["timestamp", "event_type", "repo_type", "repo_id"],
            )
            item["activity_sample"] = activity.get("items") if activity.get("ok") is True else []
            if activity.get("ok") is not True:
                section_errors["activity"] = str(activity.get("error") or "activity fetch failed")
        return _helper_success(
            start_calls=start_calls,
            source=f"/api/users/{resolved_handle}/overview",
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
            handle=resolved_handle,
            entity_type="user",
            include=sorted(requested_sections),
            likes_limit=likes_lim,
            activity_limit=activity_lim,
            section_errors=section_errors or None,
        )
    # Not a user (or the user overview failed): try the organization overview.
    org_overview = await hf_org_overview(resolved_handle)
    if org_overview.get("ok") is True:
        overview_item = _helper_item(org_overview) or {"organization": resolved_handle}
        item = {
            "handle": str(overview_item.get("organization") or resolved_handle),
            "entity_type": "organization",
            "display_name": overview_item.get("displayName") or str(overview_item.get("organization") or resolved_handle),
            "description": overview_item.get("description"),
            "avatar_url": overview_item.get("avatarUrl"),
            "website_url": overview_item.get("websiteUrl"),
            "followers_count": _overview_count(overview_item, "followers"),
            "members_count": _overview_count(overview_item, "members"),
            "models_count": _overview_count(overview_item, "models"),
            "datasets_count": _overview_count(overview_item, "datasets"),
            "spaces_count": _overview_count(overview_item, "spaces"),
        }
        return _helper_success(
            start_calls=start_calls,
            source=f"/api/organizations/{resolved_handle}/overview",
            items=[item],
            scanned=1,
            matched=1,
            returned=1,
            truncated=False,
            handle=resolved_handle,
            entity_type="organization",
            include=[],
            ignored_includes=sorted(requested_sections) or None,
        )
    # Neither overview succeeded: surface the first available upstream error.
    error = user_overview.get("error") or org_overview.get("error") or "profile fetch failed"
    return _helper_error(
        start_calls=start_calls,
        source=f"/api/profiles/{resolved_handle}",
        error=error,
        handle=resolved_handle,
    )
async def hf_user_graph(
    username: str | None = None,
    relation: str = "followers",
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    pro_only: bool | None = None,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Public entry point for the social graph of a user or organization.

    Validates `relation`, resolves the target handle (falling back to the
    current authenticated user when `username` is omitted), then delegates
    all fetching, filtering, and limit handling to `_user_graph_helper`.
    """
    calls_before = call_count["n"]
    normalized_relation = str(relation or "").strip().lower() or "followers"
    if normalized_relation not in {"followers", "following"}:
        return _helper_error(
            start_calls=calls_before,
            source="/api/users/<u>/followers",
            error="relation must be 'followers' or 'following'",
        )
    target_handle, handle_error = await _resolve_username_or_current(username)
    if handle_error:
        return _helper_error(
            start_calls=calls_before,
            source=f"/api/users/<u>/{normalized_relation}",
            error=handle_error,
            relation=normalized_relation,
        )
    if not isinstance(target_handle, str):
        return _helper_error(
            start_calls=calls_before,
            source=f"/api/users/<u>/{normalized_relation}",
            error="username is required",
            relation=normalized_relation,
        )
    return await _user_graph_helper(
        normalized_relation,
        target_handle,
        pro_only,
        return_limit,
        scan_limit,
        count_only,
        where,
        fields,
        helper_name="hf_user_graph",
    )
async def hf_user_likes(
    username: str | None = None,
    repo_types: list[str] | None = None,
    return_limit: int | None = None,
    scan_limit: int | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
    sort: str | None = None,
    ranking_window: int | None = None,
) -> dict[str, Any]:
    """List the repos a user has liked, optionally ranked by repo popularity.

    Raw-API helper over `/api/users/<u>/likes` (not covered by HfApi). Rows
    are normalized to both camelCase and snake_case keys, filtered by
    `repo_types` / `where`, and either kept in liked-order (`sort="likedAt"`)
    or re-ranked by `repoLikes` / `repoDownloads`. Popularity ranking may
    enrich up to `ranking_window` rows with per-repo detail lookups, bounded
    by the remaining external-call budget.

    Args:
        username: handle; None resolves to the current authenticated user.
        repo_types: optional allow-list of "model"/"dataset"/"space".
        return_limit / scan_limit: output and scan budgets (policy-clamped).
        count_only: when True, report counts without item rows.
        where: optional per-item post-filter.
        fields: optional projection applied via `_project_items`.
        sort: one of "likedAt", "repoLikes", "repoDownloads".
        ranking_window: shortlist size for popularity ranking (policy-capped).

    Returns:
        A helper envelope dict; never raises.
    """
    start_calls = call_count["n"]
    # Per-helper policy knobs with module-level fallbacks.
    default_return = _policy_int("hf_user_likes", "default_return", 100)
    scan_cap = _policy_int("hf_user_likes", "scan_max", LIKES_SCAN_LIMIT_CAP)
    ranking_default = _policy_int("hf_user_likes", "ranking_default", LIKES_RANKING_WINDOW_DEFAULT)
    enrich_cap = _policy_int("hf_user_likes", "enrich_max", LIKES_ENRICHMENT_MAX_REPOS)
    resolved_username, resolve_error = await _resolve_username_or_current(username)
    if resolve_error:
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/likes", error=resolve_error)
    if not isinstance(resolved_username, str):
        return _helper_error(start_calls=start_calls, source="/api/users/<u>/likes", error="username is required")
    sort_key, sort_error = _normalize_user_likes_sort(sort)
    if sort_error:
        return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=sort_error)
    if sort_key is None:
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/users/{resolved_username}/likes",
            error="sort must be one of likedAt, repoLikes, repoDownloads",
        )
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
        scan_limit=scan_limit,
        scan_cap=scan_cap,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    scan_lim = int(limit_plan["applied_scan_limit"])
    # Validate and canonicalize the repo-type allow-list up front.
    allowed_repo_types: set[str] | None = None
    try:
        raw_repo_types: list[str] = _coerce_str_list(repo_types) if repo_types is not None else []
    except ValueError as e:
        return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=e)
    if raw_repo_types:
        allowed_repo_types = set()
        for raw in raw_repo_types:
            canonical = _canonical_repo_type(raw, default="")
            if canonical not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source=f"/api/users/{resolved_username}/likes",
                    error=f"Unsupported repo_type '{raw}'",
                )
            allowed_repo_types.add(canonical)
    endpoint = f"/api/users/{resolved_username}/likes"
    resp = _host_raw_call(endpoint, params={"limit": scan_lim})
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=resp.get("error") or "likes fetch failed",
        )
    payload = resp.get("data") if isinstance(resp.get("data"), list) else []
    scanned_rows = payload[:scan_lim]
    # Keep (original_index, item) pairs so ranking ties preserve liked-order.
    matched_rows: list[tuple[int, dict[str, Any]]] = []
    for row in scanned_rows:
        if not isinstance(row, dict):
            continue
        # The API nests repo info in either "repo" or the richer "repoData".
        repo = row.get("repo") if isinstance(row.get("repo"), dict) else {}
        repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}
        repo_id = repo_data.get("id") or repo_data.get("name") or repo.get("name")
        if not isinstance(repo_id, str) or not repo_id:
            continue
        repo_type = _canonical_repo_type(repo_data.get("type") or repo.get("type"), default="")
        if not repo_type:
            repo_type = _canonical_repo_type(repo.get("type"), default="model")
        if allowed_repo_types is not None and repo_type not in allowed_repo_types:
            continue
        repo_author = repo_data.get("author")
        if not isinstance(repo_author, str) and "/" in repo_id:
            # Derive the author from the "owner/name" id when missing.
            repo_author = repo_id.split("/", 1)[0]
        # Expose every field under both camelCase and snake_case aliases.
        item = {
            "likedAt": row.get("likedAt") or row.get("createdAt"),
            "liked_at": row.get("likedAt") or row.get("createdAt"),
            "repoId": repo_id,
            "repo_id": repo_id,
            "repoType": repo_type,
            "repo_type": repo_type,
            "repoAuthor": repo_author,
            "repo_author": repo_author,
            "repoLikes": _as_int(repo_data.get("likes")),
            "repo_likes": _as_int(repo_data.get("likes")),
            "repoDownloads": _as_int(repo_data.get("downloads")),
            "repo_downloads": _as_int(repo_data.get("downloads")),
            "likes": _as_int(repo_data.get("likes")),
            "downloads": _as_int(repo_data.get("downloads")),
            "repo_url": _repo_web_url(repo_type, repo_id),
        }
        if not _item_matches_where(item, where):
            continue
        matched_rows.append((len(matched_rows), item))
    matched = len(matched_rows)
    # The scan saw everything when the feed ended before the scan budget.
    scan_exhaustive = len(payload) < scan_lim
    exact_count = scan_exhaustive
    total_matched = matched
    total = total_matched
    effective_ranking_window: int | None = None
    ranking_complete = sort_key == "likedAt" and exact_count
    enriched = 0
    selected_pairs: list[tuple[int, dict[str, Any]]]
    if count_only:
        selected_pairs = []
        ranking_complete = False if matched > 0 else exact_count
    elif sort_key == "likedAt":
        # Native feed order is already liked-at descending; just slice.
        selected_pairs = matched_rows[:ret_lim]
    else:
        # Popularity ranking: enrich a shortlist with live repo stats, then
        # sort by the requested metric (missing metrics sort last).
        metric = str(sort_key)
        requested_window = ranking_window if ranking_window is not None else ranking_default
        effective_ranking_window = _clamp_int(
            requested_window,
            default=ranking_default,
            minimum=1,
            maximum=enrich_cap,
        )
        shortlist_size = min(effective_ranking_window, matched, scan_lim)
        shortlist = matched_rows[:shortlist_size]
        # Only rows still missing the metric need a detail lookup.
        candidates = [
            pair
            for pair in shortlist
            if pair[1].get(metric) is None
            and isinstance(pair[1].get("repoId"), str)
            and pair[1].get("repoType") in {"model", "dataset", "space"}
        ]
        enrich_budget = min(len(candidates), _budget_remaining(), shortlist_size)
        for _, item in candidates[:enrich_budget]:
            repo_type = str(item.get("repoType"))
            repo_id = str(item.get("repoId"))
            detail_endpoint = f"/api/{_canonical_repo_type(repo_type)}s/{repo_id}"
            try:
                detail = _host_hf_call(
                    detail_endpoint,
                    lambda rt=repo_type, rid=repo_id: (
                        _get_hf_api_client().model_info(rid)
                        if _canonical_repo_type(rt) == "model"
                        else _get_hf_api_client().dataset_info(rid)
                        if _canonical_repo_type(rt) == "dataset"
                        else _get_hf_api_client().space_info(rid)
                    ),
                )
            except Exception:
                # Best-effort enrichment: a failed lookup just leaves the
                # metric missing for that row.
                continue
            likes = _as_int(getattr(detail, "likes", None))
            downloads = _as_int(getattr(detail, "downloads", None))
            if likes is not None:
                item["repoLikes"] = likes
                item["repo_likes"] = likes
                item["likes"] = likes
            if downloads is not None:
                item["repoDownloads"] = downloads
                item["repo_downloads"] = downloads
                item["downloads"] = downloads
            enriched += 1
        def _ranking_key(pair: tuple[int, dict[str, Any]]) -> tuple[int, int, int]:
            # Sort known metrics descending; unknown metrics last; ties keep
            # the original liked-order via the index.
            idx, row = pair
            metric_value = _as_int(row.get(metric))
            if metric_value is None:
                return (1, 0, idx)
            return (0, -metric_value, idx)
        ranked_shortlist = sorted(shortlist, key=_ranking_key)
        selected_pairs = ranked_shortlist[:ret_lim]
        ranking_complete = exact_count and shortlist_size >= matched and len(candidates) <= enrich_budget
    items = _project_items([row for _, row in selected_pairs], fields)
    popularity_present = sum(1 for _, row in selected_pairs if row.get("repoLikes") is not None)
    sample_complete = (
        exact_count
        and ret_lim >= matched
        and (sort_key == "likedAt" or ranking_complete)
        and (not count_only or matched == 0)
    )
    scan_limit_hit = not scan_exhaustive and len(payload) >= scan_lim
    more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
    if scan_limit_hit:
        # With filters active we cannot know if unseen rows would match.
        more_available = "unknown" if (allowed_repo_types is not None or where) else True
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": len(scanned_rows),
            "total": total,
            "total_available": len(payload),
            "total_matched": total_matched,
            "count_source": "scan",
            "lower_bound": not exact_count,
            "enriched": enriched,
            "popularity_present": popularity_present,
            "sort_applied": sort_key,
            "ranking_window": effective_ranking_window,
            "ranking_complete": ranking_complete,
            "username": resolved_username,
        },
        limit_plan=limit_plan,
        matched_count=matched,
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        scan_limit_hit=scan_limit_hit,
        truncated_extra=sort_key != "likedAt" and not ranking_complete,
    )
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        meta=meta,
    )
async def hf_repo_likers(
    repo_id: str,
    repo_type: str,
    return_limit: int | None = None,
    count_only: bool = False,
    pro_only: bool | None = None,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """List the accounts that liked one repository.

    The Hub's /likers endpoint returns the full liker list in a single
    response (no cursor), so this helper fetches once, filters locally via
    `pro_only` and `where`, and slices the output; counts are always exact.
    """
    calls_before = call_count["n"]
    target = str(repo_id or "").strip()
    if not target:
        return _helper_error(start_calls=calls_before, source="/api/repos/<repo>/likers", error="repo_id is required")
    kind = _canonical_repo_type(repo_type, default="")
    if kind not in {"model", "dataset", "space"}:
        return _helper_error(
            start_calls=calls_before,
            source=f"/api/repos/{target}/likers",
            error=f"Unsupported repo_type '{repo_type}'",
            repo_id=target,
        )
    fallback_return = _policy_int("hf_repo_likers", "default_return", 1_000)
    requested_return_limit = return_limit
    default_limit_used = requested_return_limit is None and not count_only
    has_where = isinstance(where, dict) and bool(where)
    endpoint = f"/api/{kind}s/{target}/likers"
    resp = _host_raw_call(endpoint)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=calls_before,
            source=endpoint,
            error=resp.get("error") or "repo likers fetch failed",
            repo_id=target,
            repo_type=kind,
        )
    raw_rows = resp.get("data") if isinstance(resp.get("data"), list) else []
    likers: list[dict[str, Any]] = []
    for raw in raw_rows:
        if not isinstance(raw, dict):
            continue
        handle = raw.get("user") or raw.get("username")
        if not isinstance(handle, str) or not handle:
            continue
        actor_kind = raw.get("type")
        liker = {
            "username": handle,
            "fullname": raw.get("fullname"),
            "type": actor_kind if isinstance(actor_kind, str) and actor_kind else "user",
            "isPro": raw.get("isPro"),
        }
        # Tri-state PRO filter; None keeps everything.
        if pro_only is True and liker.get("isPro") is not True:
            continue
        if pro_only is False and liker.get("isPro") is True:
            continue
        if not _item_matches_where(liker, where):
            continue
        likers.append(liker)
    # /likers is a one-shot full-list endpoint: there is no cursor to follow,
    # so the generic exhaustive hard cap is deliberately not applied here —
    # it would not improve upstream coverage or cost, since the full liker
    # set has already been fetched.
    if count_only:
        applied_return = 0
    elif requested_return_limit is None:
        applied_return = fallback_return
    else:
        try:
            applied_return = max(0, int(requested_return_limit))
        except Exception:
            applied_return = fallback_return
    limit_plan = {
        "requested_return_limit": requested_return_limit,
        "applied_return_limit": applied_return,
        "default_limit_used": default_limit_used,
        "hard_cap_applied": False,
    }
    matched = len(likers)
    items = [] if count_only else likers[:applied_return]
    return_limit_hit = applied_return > 0 and matched > applied_return
    truncated_by = _derive_truncated_by(
        hard_cap=False,
        return_limit_hit=return_limit_hit,
    )
    sample_complete = matched <= applied_return and (not count_only or matched == 0)
    truncated = truncated_by != "none"
    more_available = _derive_more_available(
        sample_complete=sample_complete,
        exact_count=True,
        returned=len(items),
        total=matched,
    )
    items = _project_actor_items(items, fields)
    meta = _build_exhaustive_meta(
        base_meta={
            "scanned": len(raw_rows),
            "matched": matched,
            "returned": len(items),
            "total": matched,
            "total_available": len(raw_rows),
            "total_matched": matched,
            "truncated": truncated,
            "count_source": "likers_list",
            "lower_bound": False,
            "repo_id": target,
            "repo_type": kind,
            "pro_only": pro_only,
            "where_applied": has_where,
            "upstream_pagination": "none",
        },
        limit_plan=limit_plan,
        sample_complete=sample_complete,
        exact_count=True,
        truncated_by=truncated_by,
        more_available=more_available,
    )
    meta["hard_cap_applied"] = False
    return _helper_success(
        start_calls=calls_before,
        source=endpoint,
        items=items,
        meta=meta,
    )
async def hf_recent_activity(
    feed_type: str | None = None,
    entity: str | None = None,
    activity_types: list[str] | None = None,
    repo_types: list[str] | None = None,
    return_limit: int | None = None,
    max_pages: int | None = None,
    start_cursor: str | None = None,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Page through `/api/recent-activity` for one user or org feed.

    Raw-API helper (this endpoint is not covered by huggingface_hub).
    Follows the upstream cursor for up to `max_pages` pages (policy-capped),
    filters events locally by `activity_types` / `repo_types` / `where`, and
    returns up to `return_limit` normalized rows plus the next cursor so the
    caller can resume. Stops early when the external-call budget runs out.

    Args:
        feed_type: "user" or "org"; a bare handle here is treated as a
            user-feed entity for convenience.
        entity: target handle for the feed.
        activity_types: optional case-insensitive event-type allow-list.
        repo_types: optional canonical repo-type allow-list.
        return_limit: output budget (0 under `count_only` means scan-only).
        max_pages: page budget, clamped to the policy cap.
        start_cursor: resume cursor from a previous call.
        count_only: when True, count matches without collecting rows.
        where: optional per-item post-filter.
        fields: optional projection applied via `_project_items`.

    Returns:
        A helper envelope dict including a `cursor` for resumption.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_recent_activity", "default_return", 100)
    page_cap = _policy_int("hf_recent_activity", "page_limit", RECENT_ACTIVITY_PAGE_SIZE)
    pages_cap = _policy_int("hf_recent_activity", "max_pages", RECENT_ACTIVITY_SCAN_MAX_PAGES)
    requested_max_pages = max_pages
    ft = str(feed_type or "").strip().lower()
    ent = str(entity or "").strip()
    # Convenience fallbacks: a handle passed as feed_type, or an entity with
    # no feed_type, is interpreted as a user feed.
    if ft not in {"user", "org"}:
        if ft and not ent:
            ent = ft
            ft = "user"
        elif not ft and ent:
            ft = "user"
    if ft not in {"user", "org"}:
        return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="feed_type must be 'user' or 'org'")
    if not ent:
        return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="entity is required")
    limit_plan = _resolve_exhaustive_limits(
        return_limit=return_limit,
        count_only=count_only,
        default_return=default_return,
        max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
    )
    ret_lim = int(limit_plan["applied_return_limit"])
    page_lim = page_cap
    pages_lim = _clamp_int(requested_max_pages, default=pages_cap, minimum=1, maximum=pages_cap)
    type_filter = {str(t).strip().lower() for t in (activity_types or []) if str(t).strip()}
    repo_filter = {_canonical_repo_type(t, default="") for t in (repo_types or []) if str(t).strip()}
    next_cursor = str(start_cursor).strip() if isinstance(start_cursor, str) and start_cursor.strip() else None
    items: list[dict[str, Any]] = []
    scanned = 0
    matched = 0
    pages = 0
    exhausted_feed = False
    stopped_for_budget = False
    # ret_lim == 0 is the count-only mode: keep paging to count matches
    # without collecting rows.
    while pages < pages_lim and (ret_lim == 0 or len(items) < ret_lim):
        if _budget_remaining() <= 0:
            stopped_for_budget = True
            break
        params: dict[str, Any] = {"feedType": ft, "entity": ent, "limit": page_lim}
        if next_cursor:
            params["cursor"] = next_cursor
        resp = _host_raw_call("/api/recent-activity", params=params)
        if not resp.get("ok"):
            # A first-page failure is a hard error; later failures return the
            # partial results collected so far.
            if pages == 0:
                return _helper_error(
                    start_calls=start_calls,
                    source="/api/recent-activity",
                    error=resp.get("error") or "recent-activity fetch failed",
                )
            break
        payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
        rows = payload.get("recentActivity") if isinstance(payload.get("recentActivity"), list) else []
        cursor_raw = payload.get("cursor")
        next_cursor = cursor_raw if isinstance(cursor_raw, str) and cursor_raw else None
        pages += 1
        if not rows:
            exhausted_feed = True
            break
        for row in rows:
            if not isinstance(row, dict):
                continue
            scanned += 1
            typ = str(row.get("type") or "").strip().lower()
            repo_id = row.get("repoId")
            repo_type = row.get("repoType")
            repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else None
            repo_obj = row.get("repo") if isinstance(row.get("repo"), dict) else None
            # Repo id/type can live in several places depending on event shape.
            if repo_id is None and repo_data is not None:
                repo_id = repo_data.get("id") or repo_data.get("name")
            if repo_id is None and repo_obj is not None:
                repo_id = repo_obj.get("id") or repo_obj.get("name")
            if repo_type is None and repo_data is not None:
                repo_type = repo_data.get("type")
            if repo_type is None and repo_obj is not None:
                repo_type = repo_obj.get("type")
            rt = _canonical_repo_type(repo_type, default="") if repo_type else ""
            if type_filter and typ not in type_filter:
                continue
            if repo_filter and rt not in repo_filter:
                continue
            item = {
                "time": row.get("time"),
                "timestamp": row.get("time"),
                "type": row.get("type"),
                "event_type": row.get("type"),
                "repoType": rt or repo_type,
                "repo_type": rt or repo_type,
                "repoId": repo_id,
                "repo_id": repo_id,
            }
            if not _item_matches_where(item, where):
                continue
            matched += 1
            if len(items) < ret_lim:
                items.append(item)
        if not next_cursor:
            exhausted_feed = True
            break
    items = _project_items(items, fields)
    # Counts are exact only when the feed genuinely ended (not budget/page caps).
    exact_count = exhausted_feed and not stopped_for_budget
    sample_complete = exact_count and ret_lim >= matched and (not count_only or matched == 0)
    page_limit_hit = next_cursor is not None and pages >= pages_lim and not exhausted_feed
    more_available: bool | str = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=matched if exact_count else None)
    if next_cursor is not None:
        more_available = True
    elif stopped_for_budget and not exact_count:
        more_available = "unknown"
    meta = _build_exhaustive_result_meta(
        base_meta={
            "scanned": scanned,
            "total": matched,
            "total_matched": matched,
            "pages": pages,
            "count_source": "scan" if exact_count else "none",
            "lower_bound": not exact_count,
            "page_limit": page_lim,
            "stopped_for_budget": stopped_for_budget,
            "feed_type": ft,
            "entity": ent,
        },
        limit_plan=limit_plan,
        matched_count=matched,
        returned_count=len(items),
        exact_count=exact_count,
        count_only=count_only,
        sample_complete=sample_complete,
        more_available=more_available,
        page_limit_hit=page_limit_hit,
        truncated_extra=stopped_for_budget,
        requested_max_pages=requested_max_pages,
        applied_max_pages=pages_lim,
    )
    return _helper_success(
        start_calls=start_calls,
        source="/api/recent-activity",
        items=items,
        meta=meta,
        cursor=next_cursor,
    )
async def hf_repo_discussions(repo_type: str, repo_id: str, limit: int = 20) -> dict[str, Any]:
    """Return up to `limit` discussion headers for one repository.

    Thin wrapper around `HfApi.get_repo_discussions`. The discussion number
    is exposed under several aliases (num/number/discussionNum/id) so callers
    can address it however they expect.
    """
    calls_before = call_count["n"]
    kind = _canonical_repo_type(repo_type)
    target = str(repo_id or "").strip()
    if "/" not in target:
        return _helper_error(start_calls=calls_before, source="/api/.../discussions", error="repo_id must be owner/name")
    max_rows = _clamp_int(limit, default=20, minimum=1, maximum=SELECTIVE_ENDPOINT_RETURN_HARD_CAP)
    endpoint = f"/api/{kind}s/{target}/discussions"
    try:
        discussions = _host_hf_call(
            endpoint,
            lambda: list(islice(_get_hf_api_client().get_repo_discussions(repo_id=target, repo_type=kind), max_rows)),
        )
    except Exception as exc:
        return _helper_error(start_calls=calls_before, source=endpoint, error=exc)

    def _as_row(disc: Any) -> dict[str, Any]:
        # Normalize one discussion object into a flat, alias-rich dict.
        number = _as_int(getattr(disc, "num", None))
        created = getattr(disc, "created_at", None)
        return {
            "num": number,
            "number": number,
            "discussionNum": number,
            "id": number,
            "title": getattr(disc, "title", None),
            "author": getattr(disc, "author", None),
            "createdAt": str(created) if created is not None else None,
            "status": getattr(disc, "status", None),
        }

    rows = [_as_row(disc) for disc in discussions]
    return _helper_success(
        start_calls=calls_before,
        source=endpoint,
        items=rows,
        scanned=len(rows),
        matched=len(rows),
        returned=len(rows),
        truncated=False,
        total_count=None,
    )
async def hf_repo_discussion_details(repo_type: str, repo_id: str, discussion_num: int) -> dict[str, Any]:
    """Fetch one discussion and summarize its comment events.

    Returns a single-item envelope with the discussion header plus the
    comment count and the most recent comment (author/time/text/html),
    exposed under both camelCase and snake_case aliases.
    """
    calls_before = call_count["n"]
    kind = _canonical_repo_type(repo_type)
    target = str(repo_id or "").strip()
    if "/" not in target:
        return _helper_error(start_calls=calls_before, source="/api/.../discussions/<num>", error="repo_id must be owner/name")
    number = _as_int(discussion_num)
    if number is None:
        return _helper_error(
            start_calls=calls_before,
            source=f"/api/{kind}s/{target}/discussions/<num>",
            error="discussion_num must be an integer",
        )
    endpoint = f"/api/{kind}s/{target}/discussions/{number}"
    try:
        detail = _host_hf_call(
            endpoint,
            lambda: _get_hf_api_client().get_discussion_details(
                repo_id=target,
                discussion_num=int(number),
                repo_type=kind,
            ),
        )
    except Exception as exc:
        return _helper_error(start_calls=calls_before, source=endpoint, error=exc)
    # Collect only the "comment" events; other event kinds (status changes,
    # commits, ...) are ignored.
    comments: list[dict[str, Any]] = []
    raw_events = getattr(detail, "events", None)
    if isinstance(raw_events, list):
        for ev in raw_events:
            if str(getattr(ev, "type", "")).strip().lower() == "comment":
                comments.append(
                    {
                        "author": getattr(ev, "author", None),
                        "createdAt": _dt_to_str(getattr(ev, "created_at", None)),
                        "text": getattr(ev, "content", None),
                        "rendered": getattr(ev, "rendered", None),
                    }
                )
    # Most recent comment by ISO timestamp string (lexicographic compare).
    newest: dict[str, Any] | None = None
    if comments:
        newest = max(comments, key=lambda row: str(row.get("createdAt") or ""))
    item: dict[str, Any] = {
        "num": number,
        "number": number,
        "discussionNum": number,
        "id": number,
        "repo_id": target,
        "repo_type": kind,
        "title": getattr(detail, "title", None),
        "author": getattr(detail, "author", None),
        "createdAt": _dt_to_str(getattr(detail, "created_at", None)),
        "status": getattr(detail, "status", None),
        "url": getattr(detail, "url", None),
        "commentCount": len(comments),
        "latestCommentAuthor": newest.get("author") if newest else None,
        "latestCommentCreatedAt": newest.get("createdAt") if newest else None,
        "latestCommentText": newest.get("text") if newest else None,
        "latestCommentHtml": newest.get("rendered") if newest else None,
        "latest_comment_author": newest.get("author") if newest else None,
        "latest_comment_created_at": newest.get("createdAt") if newest else None,
        "latest_comment_text": newest.get("text") if newest else None,
        "latest_comment_html": newest.get("rendered") if newest else None,
    }
    return _helper_success(
        start_calls=calls_before,
        source=endpoint,
        items=[item],
        scanned=len(comments),
        matched=1,
        returned=1,
        truncated=False,
        total_comments=len(comments),
    )
def _resolve_repo_detail_row(
    api: HfApi,
    repo_id: str,
    attempt_types: list[str],
) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
    """Resolve one repo id to a normalized detail row, probing repo types in order.

    Returns ``(row, None)`` on the first successful lookup, or ``(None, failure)``
    where ``failure`` is a machine-readable error dict when no attempt succeeds.
    """
    rid = str(repo_id or "").strip()
    if "/" not in rid:
        # Bare names are ambiguous; the detail endpoints need owner/name.
        return None, {"repo_id": rid, "error": "repo_id must be owner/name"}
    attempt_errors: list[str] = []
    last_endpoint = "/api/repos"
    for attempt in attempt_types:
        endpoint = f"/api/{attempt}s/{rid}"
        last_endpoint = endpoint
        try:
            if attempt == "model":
                detail = _host_hf_call(endpoint, lambda: api.model_info(rid))
            elif attempt == "dataset":
                detail = _host_hf_call(endpoint, lambda: api.dataset_info(rid))
            else:
                detail = _host_hf_call(endpoint, lambda: api.space_info(rid))
        except Exception as exc:
            attempt_errors.append(f"{attempt}: {exc}")
            continue
        if detail is not None:
            return _normalize_repo_detail_row(detail, attempt, rid), None
        # Lookup "succeeded" but produced no payload: stop probing and report
        # the failure envelope below (mirrors the original break-on-None flow).
        break
    return None, {
        "repo_id": rid,
        "error": "; ".join(attempt_errors[:3]) if attempt_errors else "repo lookup failed",
        "attempted_repo_types": list(attempt_types),
        "source": last_endpoint,
    }
async def hf_repo_details(
    repo_id: str | None = None,
    repo_ids: list[str] | None = None,
    repo_type: str = "auto",
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Fetch normalized detail rows for one or many repos.

    Exactly one of ``repo_id`` / ``repo_ids`` may be given. ``repo_type`` is a
    concrete type (model/dataset/space) or ``"auto"`` to probe all three.
    Per-repo lookup failures are collected in ``failures``; the call only
    errors as a whole when no repo resolves.
    """
    start_calls = call_count["n"]
    if repo_id is not None and repo_ids is not None:
        return _helper_error(
            start_calls=start_calls,
            source="/api/repos",
            error="Pass either repo_id or repo_ids, not both",
        )
    requested_ids = [str(repo_id).strip()] if isinstance(repo_id, str) and str(repo_id).strip() else []
    if repo_ids is not None:
        # Fix: surface coercion failures as a helper error envelope instead of
        # raising, consistent with hf_collection_items' repo_types handling.
        try:
            requested_ids = _coerce_str_list(repo_ids)
        except ValueError as e:
            return _helper_error(start_calls=start_calls, source="/api/repos", error=e)
    if not requested_ids:
        return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_id or repo_ids is required")
    raw_type = str(repo_type or "auto").strip().lower()
    if raw_type in {"", "auto"}:
        base_attempt_types = ["model", "dataset", "space"]
    else:
        canonical_type = _canonical_repo_type(raw_type, default="")
        if canonical_type not in {"model", "dataset", "space"}:
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error=f"Unsupported repo_type '{repo_type}'",
            )
        base_attempt_types = [canonical_type]
    api = _get_hf_api_client()
    items: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []
    for rid in requested_ids:
        row, failure = _resolve_repo_detail_row(api, rid, base_attempt_types)
        if row is None:
            if failure is not None:
                failures.append(failure)
            continue
        items.append(row)
    if not items:
        # Every requested repo failed to resolve: lead with the first failure.
        summary = failures[0]["error"] if failures else "repo lookup failed"
        return _helper_error(
            start_calls=start_calls,
            source="/api/repos",
            error=summary,
            failures=failures,
            repo_type=repo_type,
        )
    items = _project_repo_items(items, fields)
    return _helper_success(
        start_calls=start_calls,
        source="/api/repos",
        items=items,
        repo_type=repo_type,
        requested_repo_ids=requested_ids,
        failures=failures or None,
        matched=len(items),
        returned=len(items),
    )
async def hf_trending(
    repo_type: str = "model",
    limit: int = 20,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Return the Hub's trending repos, enriched with full repo detail rows.

    ``repo_type`` is model/dataset/space or ``"all"``. Rows come from the raw
    ``/api/trending`` endpoint; each row is then enriched via a per-repo detail
    lookup, falling back to the raw trending row when enrichment fails.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_trending", "default_return", 20)
    max_return = _policy_int("hf_trending", "max_return", TRENDING_ENDPOINT_MAX_LIMIT)
    raw_type = str(repo_type or "model").strip().lower()
    if raw_type == "all":
        requested_type = "all"
    else:
        requested_type = _canonical_repo_type(raw_type, default="")
        if requested_type not in {"model", "dataset", "space"}:
            return _helper_error(
                start_calls=start_calls,
                source="/api/trending",
                error=f"Unsupported repo_type '{repo_type}'",
            )
    lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)
    resp = _host_raw_call("/api/trending", params={"type": requested_type, "limit": lim})
    if not resp.get("ok"):
        return _helper_error(start_calls=start_calls, source="/api/trending", error=resp.get("error") or "trending fetch failed")
    payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
    rows = payload.get("recentlyTrending") if isinstance(payload.get("recentlyTrending"), list) else []
    items: list[dict[str, Any]] = []
    # The "all" feed mixes repo types; rows without a recognizable type
    # default to "model".
    default_row_type = requested_type if requested_type != "all" else "model"
    for idx, row in enumerate(rows[:lim], start=1):
        if not isinstance(row, dict):
            continue
        repo = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}
        items.append(_normalize_trending_row(repo, default_row_type, rank=idx))
    api = _get_hf_api_client()
    enriched_items: list[dict[str, Any]] = []
    enrichment_failures: list[dict[str, Any]] = []
    for item in items:
        repo_id = item.get("repo_id")
        if not isinstance(repo_id, str) or not repo_id:
            enriched_items.append(item)
            continue
        item_repo_type = item.get("repo_type")
        if isinstance(item_repo_type, str) and item_repo_type in {"model", "dataset", "space"}:
            attempt_types = [item_repo_type]
        else:
            attempt_types = ["model", "dataset", "space"]
        detail_row, failure = _resolve_repo_detail_row(api, repo_id, attempt_types)
        if detail_row is None:
            # Enrichment is best-effort: keep the raw trending row on failure.
            enriched_items.append(item)
            if failure is not None:
                enrichment_failures.append(failure)
            continue
        merged = dict(detail_row)
        # Carry trending-specific metadata over the detail row.
        trending_score = item.get("trending_score")
        if trending_score is not None:
            merged["trending_score"] = trending_score
        if item.get("trending_rank") is not None:
            merged["trending_rank"] = item.get("trending_rank")
        enriched_items.append(merged)
    items = enriched_items
    items = _apply_where(items, where)
    matched = len(items)
    items = items[:lim]
    # Fix: compute score availability BEFORE field projection; previously a
    # `fields` selection that omitted trending_score made this flag report
    # False even when scores were present in the underlying rows.
    trending_score_available = any(item.get("trending_score") is not None for item in items)
    items = _project_repo_items(items, fields)
    return _helper_success(
        start_calls=start_calls,
        source="/api/trending",
        items=items,
        repo_type=requested_type,
        limit=lim,
        scanned=len(rows),
        matched=matched,
        returned=len(items),
        trending_score_available=trending_score_available,
        ordered_ranking=True,
        failures=enrichment_failures or None,
    )
async def hf_collections_search(
    query: str | None = None,
    owner: str | None = None,
    return_limit: int = 20,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """Search Hub collections by free-text query and/or owner.

    At least one of ``query`` / ``owner`` is required. When only ``owner`` is
    given, the owner name doubles as the search term and results are filtered
    to that owner client-side. ``count_only=True`` returns counts, no rows.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_collections_search", "default_return", 20)
    max_return = _policy_int("hf_collections_search", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT)
    if count_only:
        # count_only is implemented as "return zero rows".
        return_limit = 0
    lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return)
    owner_clean = str(owner or "").strip() or None
    # Over-fetch when counting or owner-filtering so the client-side owner
    # filter sees enough candidates; owner-scoped fetches are capped at 100.
    fetch_lim = max_return if lim == 0 or owner_clean else lim
    if owner_clean:
        fetch_lim = min(fetch_lim, 100)
    term = str(query or "").strip()
    if not term and owner_clean:
        term = owner_clean
    if not term:
        return _helper_error(start_calls=start_calls, source="/api/collections", error="query or owner is required")
    # Fix: `term` is guaranteed non-empty past the guard above, so the old
    # `if term:` gate around the `q` param was dead code.
    params: dict[str, Any] = {"limit": fetch_lim, "q": term}
    if owner_clean:
        params["owner"] = owner_clean
    resp = _host_raw_call("/api/collections", params=params)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source="/api/collections",
            error=resp.get("error") or "collections fetch failed",
        )
    payload = resp.get("data") if isinstance(resp.get("data"), list) else []
    items: list[dict[str, Any]] = []
    for row in payload[:fetch_lim]:
        if not isinstance(row, dict):
            continue
        # Fix: this loop variable previously shadowed the `owner` parameter.
        row_owner = _author_from_any(row.get("owner")) or _author_from_any(row.get("ownerData"))
        if not row_owner and isinstance(row.get("slug"), str) and "/" in str(row.get("slug")):
            # Fall back to the slug prefix when no owner payload is present.
            row_owner = str(row.get("slug")).split("/", 1)[0]
        if owner_clean is not None and row_owner != owner_clean:
            continue
        owner_payload = row.get("owner") if isinstance(row.get("owner"), dict) else {}
        collection_items = row.get("items") if isinstance(row.get("items"), list) else []
        slug = row.get("slug")
        items.append(
            {
                "collection_id": slug,
                "slug": slug,
                "title": row.get("title"),
                "owner": row_owner,
                "owner_type": owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None,
                "description": row.get("description"),
                "gating": row.get("gating"),
                "last_updated": row.get("lastUpdated"),
                "item_count": len(collection_items),
            }
        )
    items = _apply_where(items, where)
    total_matched = len(items)
    items = items[:lim]
    items = _project_collection_items(items, fields)
    # A count-only/full page at fetch_lim may hide more server-side results.
    truncated = (lim > 0 and total_matched > lim) or (lim == 0 and len(payload) >= fetch_lim)
    return _helper_success(
        start_calls=start_calls,
        source="/api/collections",
        items=items,
        scanned=len(payload),
        matched=total_matched,
        returned=len(items),
        total=len(payload),
        total_matched=total_matched,
        total_population=len(payload),
        truncated=truncated,
        complete=not truncated,
        query=term,
        owner=owner_clean,
    )
async def hf_collection_items(
    collection_id: str,
    repo_types: list[str] | None = None,
    return_limit: int = 100,
    count_only: bool = False,
    where: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> dict[str, Any]:
    """List the repos inside one collection, with optional type/where filters.

    ``collection_id`` is the collection slug. ``repo_types`` restricts rows to
    the given canonical types (model/dataset/space); ``count_only`` returns
    counts with an empty ``items`` list.
    """
    start_calls = call_count["n"]
    default_return = _policy_int("hf_collection_items", "default_return", 100)
    max_return = _policy_int("hf_collection_items", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT)
    cid = str(collection_id or "").strip()
    if not cid:
        return _helper_error(
            start_calls=start_calls,
            source="/api/collections/<collection_id>",
            error="collection_id is required",
        )
    if count_only:
        # count_only is implemented as "return zero rows".
        return_limit = 0
    lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return)
    allowed_repo_types: set[str] | None = None
    try:
        raw_repo_types = _coerce_str_list(repo_types) if repo_types is not None else []
    except ValueError as e:
        return _helper_error(start_calls=start_calls, source=f"/api/collections/{cid}", error=e, collection_id=cid)
    if raw_repo_types:
        # Canonicalize each requested type; reject anything unknown outright.
        allowed_repo_types = set()
        for raw in raw_repo_types:
            canonical = _canonical_repo_type(raw, default="")
            if canonical not in {"model", "dataset", "space"}:
                return _helper_error(
                    start_calls=start_calls,
                    source=f"/api/collections/{cid}",
                    error=f"Unsupported repo_type '{raw}'",
                    collection_id=cid,
                )
            allowed_repo_types.add(canonical)
    endpoint = f"/api/collections/{cid}"
    resp = _host_raw_call(endpoint)
    if not resp.get("ok"):
        return _helper_error(
            start_calls=start_calls,
            source=endpoint,
            error=resp.get("error") or "collection fetch failed",
            collection_id=cid,
        )
    payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
    raw_items = payload.get("items") if isinstance(payload.get("items"), list) else []
    owner = _author_from_any(payload.get("owner"))
    owner_payload = payload.get("owner") if isinstance(payload.get("owner"), dict) else {}
    if owner is None and "/" in cid:
        # Fall back to the slug prefix when the payload carries no owner.
        owner = cid.split("/", 1)[0]
    normalized: list[dict[str, Any]] = []
    for row in raw_items:
        if not isinstance(row, dict):
            continue
        item = _normalize_collection_repo_item(row)
        if item is None:
            # Rows that don't normalize to a repo item are dropped silently.
            continue
        repo_type = item.get("repo_type")
        if allowed_repo_types is not None and repo_type not in allowed_repo_types:
            continue
        if not _item_matches_where(item, where):
            continue
        normalized.append(item)
    total_matched = len(normalized)
    items = [] if count_only else normalized[:lim]
    items = _project_repo_items(items, fields)
    # truncated reflects row-limit clipping only; count_only (lim == 0) is
    # reported as complete because all rows were scanned.
    truncated = lim > 0 and total_matched > lim
    return _helper_success(
        start_calls=start_calls,
        source=endpoint,
        items=items,
        scanned=len(raw_items),
        matched=total_matched,
        returned=len(items),
        total=len(raw_items),
        total_matched=total_matched,
        total_population=len(raw_items),
        truncated=truncated,
        complete=not truncated,
        collection_id=cid,
        title=payload.get("title"),
        owner=owner,
        owner_type=owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None,
        repo_types=sorted(allowed_repo_types) if allowed_repo_types is not None else None,
    )
async def hf_runtime_capabilities(section: str | None = None) -> dict[str, Any]:
    """Describe this runtime: helpers, field vocabularies, limits, raw-API rules.

    Purely introspective — makes no live Hub calls. Pass ``section`` to fetch
    a single manifest section; otherwise the full manifest is returned.
    """
    start_calls = call_count["n"]
    # Mark that an internal helper ran so the zero-API-call guard downstream
    # does not reject this run as "no external call made".
    internal_helper_used["used"] = True
    def _render_annotation(annotation: Any) -> str:
        # inspect.Signature.empty means "no annotation"; report it as Any.
        if annotation is inspect.Signature.empty:
            return "Any"
        return str(annotation)
    def _render_default(default: Any) -> str | None:
        if default is inspect.Signature.empty:
            return None
        return repr(default)
    def _signature_payload(fn: Callable[..., Any]) -> dict[str, Any]:
        # Serialize a callable's signature into a JSON-friendly structure.
        signature = inspect.signature(fn)
        parameters: list[dict[str, Any]] = []
        for parameter in signature.parameters.values():
            item: dict[str, Any] = {
                "name": parameter.name,
                "kind": str(parameter.kind).replace("Parameter.", "").lower(),
                "annotation": _render_annotation(parameter.annotation),
                "required": parameter.default is inspect.Signature.empty,
            }
            default = _render_default(parameter.default)
            if default is not None:
                item["default"] = default
            parameters.append(item)
        return {
            "parameters": parameters,
            "returns": _render_annotation(signature.return_annotation),
        }
    helper_payload = {
        name: _signature_payload(fn)
        for name, fn in sorted(helper_functions.items())
    }
    # Top-level manifest keys double as the allowed `section` names.
    manifest: dict[str, Any] = {
        "overview": {
            "helper_count": len(helper_functions),
            "supports_current_user": True,
            "supports_raw_api_fallback": True,
            "helper_result_envelope": {
                "ok": "bool",
                "item": "dict | None",
                "items": "list[dict]",
                "meta": "dict",
                "error": "str | None",
            },
            "raw_result_envelope": {
                "result": "Any",
                "meta": {
                    "ok": "bool",
                    "api_calls": "int",
                    "elapsed_ms": "int",
                    "limits_reached": "bool",
                    "limit_summary": "list[dict]",
                },
            },
        },
        "helpers": helper_payload,
        "fields": {
            "profile": list(PROFILE_CANONICAL_FIELDS),
            "repo": list(REPO_CANONICAL_FIELDS),
            "user": list(USER_CANONICAL_FIELDS),
            "actor": list(ACTOR_CANONICAL_FIELDS),
            "activity": list(ACTIVITY_CANONICAL_FIELDS),
            "collection": list(COLLECTION_CANONICAL_FIELDS),
        },
        "aliases": {
            "repo": dict(sorted(_REPO_FIELD_ALIASES.items())),
            "user": dict(sorted(_USER_FIELD_ALIASES.items())),
            "actor": dict(sorted(_ACTOR_FIELD_ALIASES.items())),
            "collection": dict(sorted(_COLLECTION_FIELD_ALIASES.items())),
            "sort_keys": dict(sorted(_SORT_KEY_ALIASES.items())),
        },
        "limits": {
            "default_timeout_sec": DEFAULT_TIMEOUT_SEC,
            "default_max_calls": DEFAULT_MAX_CALLS,
            "max_calls_limit": MAX_CALLS_LIMIT,
            "output_items_truncation_limit": OUTPUT_ITEMS_TRUNCATION_LIMIT,
            "graph_scan_limit_cap": GRAPH_SCAN_LIMIT_CAP,
            "likes_scan_limit_cap": LIKES_SCAN_LIMIT_CAP,
            "recent_activity_scan_max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
            "trending_endpoint_max_limit": TRENDING_ENDPOINT_MAX_LIMIT,
            "pagination_policy": {
                helper_name: dict(sorted(policy.items()))
                for helper_name, policy in sorted(PAGINATION_POLICY.items())
            },
        },
        "raw_api": {
            "call_api": _signature_payload(call_api),
            "allowed_methods": ["GET", "POST"],
            "allowed_endpoint_patterns": list(ALLOWLIST_PATTERNS),
            "helper_covered_endpoint_patterns": [
                {"pattern": pattern, "helper": helper_name}
                for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS
            ],
        },
        "repo_search": {
            "sort_keys": {
                repo_type: sorted(keys)
                for repo_type, keys in sorted(_REPO_SORT_KEYS.items())
            },
            "extra_args": {
                repo_type: sorted(args)
                for repo_type, args in sorted(_REPO_SEARCH_EXTRA_ARGS.items())
            },
        },
    }
    allowed_sections = sorted(manifest)
    requested = str(section or "").strip().lower()
    if requested:
        if requested not in manifest:
            return _helper_error(
                start_calls=start_calls,
                source="internal://runtime-capabilities",
                error=f"Unsupported section {section!r}. Allowed sections: {allowed_sections}",
                section=section,
                allowed_sections=allowed_sections,
            )
        payload = {
            "section": requested,
            "content": manifest[requested],
            "allowed_sections": allowed_sections,
        }
    else:
        payload = {
            "allowed_sections": allowed_sections,
            **manifest,
        }
    return _helper_success(
        start_calls=start_calls,
        source="internal://runtime-capabilities",
        items=[payload],
        section=requested or None,
    )
# Compile the generated program once; `query` and `max_calls` are the only
# host-provided inputs visible to the sandboxed code.
m = pydantic_monty.Monty(
    code,
    inputs=["query", "max_calls"],
    script_name="monty_agent.py",
    type_check=False,
)
def _collecting_wrapper(helper_name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap helper *fn* so limit-hit summaries are captured as a side channel."""
    async def _instrumented(*args: Any, **kwargs: Any) -> Any:
        outcome = await fn(*args, **kwargs)
        hit = _summarize_limit_hit(helper_name, outcome)
        # Cap the side-channel buffer at 20 entries to bound output size.
        if hit is not None and len(limit_summaries) < 20:
            limit_summaries.append(hit)
        return outcome
    return _instrumented
# Sandbox resource limits for the Monty interpreter itself (separate from the
# external-call budget enforced via call_count).
limits: pydantic_monty.ResourceLimits = {
    "max_duration_secs": float(timeout_sec),
    "max_memory": DEFAULT_MONTY_MAX_MEMORY,
    "max_allocations": DEFAULT_MONTY_MAX_ALLOCATIONS,
    "max_recursion_depth": DEFAULT_MONTY_MAX_RECURSION_DEPTH,
}
helper_functions = _resolve_helper_functions(locals())
try:
    result = await pydantic_monty.run_monty_async(
        m,
        inputs={"query": query, "max_calls": max_calls},
        external_functions={
            "call_api": call_api,
            # Every helper is wrapped so limit-hit summaries are collected.
            **{name: _collecting_wrapper(name, fn) for name, fn in helper_functions.items()},
        },
        limits=limits,
    )
except Exception as e:
    # Any sandbox failure (timeout, resource limit, runtime error) surfaces
    # as a MontyExecutionError carrying the call count and trace so far.
    raise MontyExecutionError(str(e), call_count["n"], trace) from e
if call_count["n"] == 0:
    # Some current-user helpers can fail before any live API call is made
    # (for example when request-scoped auth is unavailable). If generated
    # code either returns that explicit helper error envelope or flattens it
    # into an empty fallback shape, preserve the helper-owned error instead
    # of replacing it with a generic zero-call runtime failure.
    if internal_helper_used["used"]:
        return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
    if isinstance(result, dict) and result.get("ok") is True:
        # Internal-only successes (source "internal://...") are legitimate
        # zero-API-call results.
        meta = result.get("meta") if isinstance(result.get("meta"), dict) else {}
        source = meta.get("source")
        if isinstance(source, str) and source.startswith("internal://"):
            return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
    if latest_helper_error is not None:
        return {"output": _truncate_result_payload(latest_helper_error), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
    if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str):
        return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
    raise MontyExecutionError("Code completed without calling any external API function", call_count["n"], trace)
if not any(step.get("ok") is True for step in trace):
    # Allow explicit helper/live failure envelopes to be returned as-is.
    # This preserves concrete API error context (e.g. repo not found) while
    # still blocking fabricated successful fallback outputs.
    if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str):
        return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
    raise MontyExecutionError(
        "Code completed without a successful API call; refusing non-live fallback result",
        call_count["n"],
        trace,
    )
return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries}
async def hf_hub_query(
    query: str,
    code: str,
    max_calls: int = DEFAULT_MAX_CALLS,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
) -> dict[str, Any]:
    """Use natural-language queries to explore the Hugging Face Hub.
    Best for read-only Hub discovery, lookup, ranking, and relationship questions
    across users, organizations, repositories, activity, followers, likes,
    discussions, and collections.
    """
    if not query or not query.strip():
        raise ValueError("query is required")
    if not code or not code.strip():
        raise ValueError("code is required")
    # Clamp the external-call budget into [1, MAX_CALLS_LIMIT].
    max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT))
    code = code.strip()
    try:
        _validate_generated_code(code)
        run = await _run_with_monty(
            code=code,
            query=query,
            max_calls=max_calls,
            strict_mode=INTERNAL_STRICT_MODE,
            timeout_sec=timeout_sec,
        )
        # Success and failure share the same envelope shape.
        return {"ok": True, "data": run["output"], "error": None, "api_calls": run["api_calls"]}
    except MontyExecutionError as e:
        # Execution errors know how many calls were spent before failing.
        return {"ok": False, "data": None, "error": str(e), "api_calls": e.api_calls}
    except Exception as e:
        return {"ok": False, "data": None, "error": str(e), "api_calls": 0}
async def hf_hub_query_raw(
    query: str,
    code: str,
    max_calls: int = DEFAULT_MAX_CALLS,
    timeout_sec: int = DEFAULT_TIMEOUT_SEC,
) -> Any:
    """Use natural-language queries to explore the Hugging Face Hub in raw mode.
    Best for read-only Hub discovery, lookup, ranking, and relationship
    questions when the caller wants a runtime-owned raw envelope:
    ``result`` contains the direct ``solve(...)`` output and ``meta`` contains
    execution details such as timing, call counts, and limit summaries.
    """
    if not query or not query.strip():
        raise ValueError("query is required")
    if not code or not code.strip():
        raise ValueError("code is required")
    # Clamp the external-call budget into [1, MAX_CALLS_LIMIT].
    max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT))
    code = code.strip()
    started = time.perf_counter()
    def _elapsed_ms() -> int:
        # Wall-clock milliseconds since this call began.
        return int((time.perf_counter() - started) * 1000)
    try:
        _validate_generated_code(code)
        run = await _run_with_monty(
            code=code,
            query=query,
            max_calls=max_calls,
            strict_mode=INTERNAL_STRICT_MODE,
            timeout_sec=timeout_sec,
        )
    except MontyExecutionError as e:
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=e.api_calls,
            elapsed_ms=_elapsed_ms(),
            error=str(e),
        )
    except Exception as e:
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=0,
            elapsed_ms=_elapsed_ms(),
            error=str(e),
        )
    return _wrap_raw_result(
        run["output"],
        ok=True,
        api_calls=run["api_calls"],
        elapsed_ms=_elapsed_ms(),
        limit_summaries=run.get("limit_summaries"),
    )
def _arg_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for one-shot Monty runs."""
    p = argparse.ArgumentParser(description="Monty-backed API chaining tool (v2)")
    p.add_argument("--query", required=True, help="Natural language query")
    p.add_argument("--code", default=None, help="Inline Monty code to execute")
    p.add_argument("--code-file", default=None, help="Path to .py file with Monty code to execute")
    p.add_argument("--max-calls", type=int, default=DEFAULT_MAX_CALLS, help="Max external API/helper calls")
    # Fix: --timeout was the only flag without help text.
    p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SEC, help="End-to-end execution timeout in seconds")
    return p
def main() -> int:
    """CLI entry point: execute one query+code pair and print a JSON envelope."""
    args = _arg_parser().parse_args()
    program = args.code
    if args.code_file:
        # --code-file takes precedence over inline --code.
        with open(args.code_file, "r", encoding="utf-8") as handle:
            program = handle.read()
    if not program:
        print(json.dumps({"ok": False, "error": "Either --code or --code-file is required"}, ensure_ascii=False))
        return 1
    try:
        out = asyncio.run(
            hf_hub_query(
                query=args.query,
                code=program,
                max_calls=args.max_calls,
                timeout_sec=args.timeout,
            )
        )
        print(json.dumps(out, ensure_ascii=False))
        return 0 if out.get("ok") else 1
    except Exception as e:
        print(json.dumps({"ok": False, "error": str(e)}, ensure_ascii=False))
        return 1
if __name__ == "__main__":
    # Script entry point: process exit status mirrors main()'s return code.
    raise SystemExit(main())