Source code for oas2mcp.loaders.openapi

"""OpenAPI loading helpers.

Purpose:
    Load OpenAPI specifications from URLs, local files, or raw text into plain
    Python dictionaries.

Design:
    - Avoid runtime dependence on LangChain's ``OpenAPISpec`` parser for now.
    - Parse JSON first, then fall back to YAML.
    - Return plain dictionaries so the normalization layer can remain stable.
    - Provide backward-compatible helper names during the transition away from
      LangChain's ``OpenAPISpec`` path.

Examples:
    .. code-block:: python

        from oas2mcp.loaders.openapi import load_openapi_spec_dict_from_url

        spec_dict = load_openapi_spec_dict_from_url(
            "https://petstore3.swagger.io/api/v3/openapi.json",
        )
        print(spec_dict["openapi"])
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import httpx
import yaml

_OPENAPI_TEXT_PREFIXES: tuple[str, ...] = (
    "{",
    "openapi:",
    "swagger:",
    "info:",
    "paths:",
)


[docs] def load_openapi_spec_dict_from_url( url: str, *, timeout: float = 30.0, ) -> dict[str, Any]: """Load an OpenAPI specification dictionary from a URL. Args: url: The URL of the OpenAPI specification. timeout: Request timeout in seconds. Returns: The parsed OpenAPI specification as a plain dictionary. Raises: httpx.HTTPError: If the request fails. ValueError: If the response body is empty or cannot be parsed. Examples: .. code-block:: python spec_dict = load_openapi_spec_dict_from_url( "https://petstore3.swagger.io/api/v3/openapi.json", ) """ with httpx.Client(follow_redirects=True, timeout=timeout) as client: response = client.get(url) response.raise_for_status() text = response.text return load_openapi_spec_dict_from_text(text)
[docs] def load_openapi_spec_dict_from_text(text: str) -> dict[str, Any]: """Load an OpenAPI specification dictionary from raw text. Args: text: Raw JSON or YAML OpenAPI text. Returns: The parsed OpenAPI specification as a plain dictionary. Raises: ValueError: If the text is empty or cannot be parsed into a dictionary. Examples: .. code-block:: python spec_dict = load_openapi_spec_dict_from_text( '{"openapi":"3.1.0","info":{"title":"A","version":"1"},"paths":{}}' ) """ if not text.strip(): raise ValueError("OpenAPI text was empty.") try: data = json.loads(text) except json.JSONDecodeError: try: data = yaml.safe_load(text) except yaml.YAMLError as exc: raise ValueError("Failed to parse OpenAPI text as JSON or YAML.") from exc if not isinstance(data, dict): raise ValueError("Parsed OpenAPI content was not a dictionary.") return normalize_api_description_dict(data)
[docs] def load_openapi_spec_dict_from_file( path: str | Path, *, encoding: str = "utf-8", ) -> dict[str, Any]: """Load an OpenAPI specification dictionary from a local file. Args: path: The file path to the OpenAPI specification. encoding: Text encoding used when reading the file. Returns: The parsed OpenAPI specification as a plain dictionary. Raises: FileNotFoundError: If the file does not exist. ValueError: If the file is empty or cannot be parsed. Examples: .. code-block:: python spec_dict = load_openapi_spec_dict_from_file("openapi.json") """ resolved_path = Path(path).expanduser().resolve() text = resolved_path.read_text(encoding=encoding) return load_openapi_spec_dict_from_text(text)
[docs] def load_openapi_spec_dict( source: str | Path, *, timeout: float = 30.0, encoding: str = "utf-8", ) -> dict[str, Any]: """Load an OpenAPI-compatible specification from a URL, file, or raw text. Args: source: URL, local file path, ``file://`` URI, or raw JSON/YAML text. timeout: Request timeout used for URL loading. encoding: Text encoding used when reading local files. Returns: The parsed specification as a plain dictionary. Raises: FileNotFoundError: If the source looks like a local file but is missing. ValueError: If the source cannot be parsed as JSON or YAML. Examples: .. code-block:: python spec_dict = load_openapi_spec_dict("openapi.yaml") spec_dict = load_openapi_spec_dict("https://example.com/openapi.json") """ if isinstance(source, Path): return load_openapi_spec_dict_from_file(source, encoding=encoding) stripped = source.strip() if _looks_like_http_url(stripped): return load_openapi_spec_dict_from_url(stripped, timeout=timeout) if stripped.startswith("file://"): return load_openapi_spec_dict_from_file( Path(urlparse(stripped).path), encoding=encoding, ) candidate_path = Path(stripped).expanduser() if candidate_path.exists(): return load_openapi_spec_dict_from_file(candidate_path, encoding=encoding) if _looks_like_openapi_text(stripped): return load_openapi_spec_dict_from_text(stripped) if _looks_like_missing_spec_file(stripped): raise FileNotFoundError(f"OpenAPI source file was not found: {source!r}") raise ValueError( "OpenAPI source must be a URL, local file, file:// URI, or raw JSON/YAML text." )
[docs] def normalize_api_description_dict(data: dict[str, Any]) -> dict[str, Any]: """Normalize common API-description variants into an OpenAPI-style mapping. Args: data: Parsed JSON/YAML API description. Returns: A dictionary shaped like an OpenAPI document. Raises: ValueError: If the document is not an OpenAPI-style mapping. """ if "openapi" in data: return data swagger_version = data.get("swagger") if isinstance(swagger_version, str) and swagger_version.startswith("2."): return _convert_swagger2_to_openapi(data) raise ValueError( "API description must declare either an 'openapi' version or a Swagger 'swagger: 2.x' version." )
[docs] def dump_openapi_spec(spec: Any) -> dict[str, Any]: """Convert a spec-like object into a plain dictionary. Args: spec: A plain dictionary or model-like object. Returns: A plain dictionary representation of the spec. Raises: TypeError: If ``spec`` cannot be converted into a dictionary. Examples: .. code-block:: python data = dump_openapi_spec({"openapi": "3.1.0"}) assert data["openapi"] == "3.1.0" """ if isinstance(spec, dict): return dict(spec) if hasattr(spec, "model_dump"): dumped = spec.model_dump() if isinstance(dumped, dict): return dumped if hasattr(spec, "dict"): dumped = spec.dict() if isinstance(dumped, dict): return dumped raise TypeError("OpenAPI spec could not be converted into a dictionary.")
def _looks_like_http_url(value: str) -> bool: """Return whether a string looks like an HTTP(S) URL.""" parsed = urlparse(value) return parsed.scheme in {"http", "https"} and bool(parsed.netloc) def _looks_like_missing_spec_file(value: str) -> bool: """Return whether a string looks like a spec file path that does not exist.""" suffixes = (".json", ".yaml", ".yml") normalized = value.lower() if normalized.endswith(suffixes): return True return "/" in value or value.startswith(".") or value.startswith("~") def _looks_like_openapi_text(value: str) -> bool: """Return whether a string looks like raw JSON or YAML API description text.""" lowered = value.lstrip().lower() if "\n" in value or "\r" in value: return True return any(lowered.startswith(prefix) for prefix in _OPENAPI_TEXT_PREFIXES) def _convert_swagger2_to_openapi(swagger_spec: dict[str, Any]) -> dict[str, Any]: """Convert a Swagger 2.0 document into an OpenAPI 3.x-compatible mapping.""" openapi_spec: dict[str, Any] = { "openapi": "3.0.3", "info": dict(swagger_spec.get("info") or {}), "paths": _convert_swagger2_paths(swagger_spec), } servers = _build_swagger2_servers(swagger_spec) if servers: openapi_spec["servers"] = servers tags = swagger_spec.get("tags") if tags is not None: openapi_spec["tags"] = tags security = swagger_spec.get("security") if security is not None: openapi_spec["security"] = security components = _convert_swagger2_components(swagger_spec) if components: openapi_spec["components"] = components return openapi_spec def _build_swagger2_servers(swagger_spec: dict[str, Any]) -> list[dict[str, str]]: """Build OpenAPI-style ``servers`` entries from Swagger 2 host metadata.""" host = swagger_spec.get("host") if not isinstance(host, str) or not host.strip(): return [] base_path = swagger_spec.get("basePath") normalized_base_path = "" if isinstance(base_path, str) and base_path.strip(): normalized_base_path = ( base_path if base_path.startswith("/") else f"/{base_path}" ) schemes = swagger_spec.get("schemes") if not isinstance(schemes, list) or not schemes: schemes = ["https"] servers: list[dict[str, str]] = [] for scheme in schemes: if not isinstance(scheme, str) or not scheme.strip(): continue servers.append({"url": f"{scheme}://{host}{normalized_base_path}"}) return servers def _convert_swagger2_components(swagger_spec: dict[str, Any]) -> dict[str, Any]: """Convert Swagger 2 reusable sections into OpenAPI ``components``.""" components: dict[str, Any] = {} definitions = swagger_spec.get("definitions") if isinstance(definitions, dict) and definitions: components["schemas"] = definitions parameters = swagger_spec.get("parameters") if isinstance(parameters, dict) and parameters: components["parameters"] = parameters responses = swagger_spec.get("responses") if isinstance(responses, dict) and responses: components["responses"] = { name: _convert_swagger2_response( response, produces=_collect_media_types( swagger_spec.get("produces"), "application/json" ), ) for name, response in responses.items() } security_definitions = swagger_spec.get("securityDefinitions") if isinstance(security_definitions, dict) and security_definitions: components["securitySchemes"] = { name: _convert_swagger2_security_scheme(scheme) for name, scheme in security_definitions.items() } return components def _convert_swagger2_paths(swagger_spec: dict[str, Any]) -> dict[str, Any]: """Convert Swagger 2 path items into OpenAPI 3 path items.""" raw_paths = swagger_spec.get("paths") if not isinstance(raw_paths, dict): return {} default_consumes = _collect_media_types( swagger_spec.get("consumes"), "application/json", ) default_produces = _collect_media_types( swagger_spec.get("produces"), "application/json", ) converted_paths: dict[str, Any] = {} for path, raw_path_item in raw_paths.items(): if not isinstance(raw_path_item, dict): continue path_item = dict(raw_path_item) path_level_consumes = _collect_media_types( path_item.get("consumes"), default_consumes[0], ) path_level_produces = _collect_media_types( path_item.get("produces"), default_produces[0], ) converted_path_item: dict[str, Any] = {} for key, value in path_item.items(): lowered_key = str(key).lower() if lowered_key in { "get", "post", "put", "patch", "delete", "options", "head", "trace", }: converted_path_item[lowered_key] = _convert_swagger2_operation( value, consumes=( _collect_media_types( getattr(value, "get", lambda *_: None)("consumes"), path_level_consumes[0], ) if isinstance(value, dict) else path_level_consumes ), produces=( _collect_media_types( getattr(value, "get", lambda *_: None)("produces"), path_level_produces[0], ) if isinstance(value, dict) else path_level_produces ), ) elif lowered_key not in {"consumes", "produces", "swagger"}: converted_path_item[key] = value converted_paths[str(path)] = converted_path_item return converted_paths def _convert_swagger2_operation( raw_operation: Any, *, consumes: list[str], produces: list[str], ) -> dict[str, Any]: """Convert one Swagger 2 operation object.""" operation = dict(raw_operation) if isinstance(raw_operation, dict) else {} raw_parameters = operation.get("parameters") parameters = raw_parameters if isinstance(raw_parameters, list) else [] remaining_parameters: list[Any] = [] body_parameters: list[dict[str, Any]] = [] form_parameters: list[dict[str, Any]] = [] for parameter in parameters: if not isinstance(parameter, dict): remaining_parameters.append(parameter) continue location = parameter.get("in") if location == "body": body_parameters.append(parameter) continue if location == "formData": form_parameters.append(parameter) continue remaining_parameters.append(parameter) converted = { key: value for key, value in operation.items() if key not in {"parameters", "consumes", "produces"} } converted["parameters"] = remaining_parameters request_body = _build_swagger2_request_body( body_parameters=body_parameters, form_parameters=form_parameters, consumes=consumes, ) if request_body is not None: converted["requestBody"] = request_body responses = operation.get("responses") if isinstance(responses, dict): converted["responses"] = { status_code: _convert_swagger2_response(response, produces=produces) for status_code, response in responses.items() } return converted def _build_swagger2_request_body( *, body_parameters: list[dict[str, Any]], form_parameters: list[dict[str, Any]], consumes: list[str], ) -> dict[str, Any] | None: """Build an OpenAPI 3 ``requestBody`` from Swagger 2 parameters.""" if body_parameters: body_parameter = body_parameters[0] schema = ( body_parameter.get("schema") if isinstance(body_parameter, dict) else None ) content = { media_type: {"schema": dict(schema) if isinstance(schema, dict) else {}} for media_type in consumes } return { "required": bool(body_parameter.get("required")), "description": body_parameter.get("description"), "content": content, } if not form_parameters: return None required = [ parameter["name"] for parameter in form_parameters if isinstance(parameter, dict) and isinstance(parameter.get("name"), str) and bool(parameter.get("required")) ] properties: dict[str, Any] = {} has_file_param = False for parameter in form_parameters: if not isinstance(parameter, dict): continue name = parameter.get("name") if not isinstance(name, str) or not name: continue schema: dict[str, Any] = { key: value for key, value in parameter.items() if key in { "type", "format", "items", "enum", "default", "description", } } if schema.get("type") == "file": has_file_param = True schema["type"] = "string" schema["format"] = "binary" properties[name] = schema content_type = "multipart/form-data" if has_file_param else consumes[0] return { "required": bool(required), "content": { content_type: { "schema": { "type": "object", "properties": properties, "required": required, } } }, } def _convert_swagger2_response( raw_response: Any, *, produces: list[str], ) -> dict[str, Any]: """Convert a Swagger 2 response object into OpenAPI 3-style content.""" response = dict(raw_response) if isinstance(raw_response, dict) else {} schema = response.pop("schema", None) if isinstance(schema, dict): response["content"] = { media_type: {"schema": dict(schema)} for media_type in produces } return response def _convert_swagger2_security_scheme(raw_scheme: Any) -> dict[str, Any]: """Convert a Swagger 2 security scheme to an OpenAPI 3-compatible shape.""" scheme = dict(raw_scheme) if isinstance(raw_scheme, dict) else {} scheme_type = scheme.get("type") if scheme_type == "basic": return { "type": "http", "scheme": "basic", **{key: value for key, value in scheme.items() if key not in {"type"}}, } if scheme_type == "oauth2": flow = scheme.get("flow") scopes = dict(scheme.get("scopes") or {}) flow_name = { "implicit": "implicit", "password": "password", "application": "clientCredentials", "accessCode": "authorizationCode", }.get(str(flow), "clientCredentials") flow_payload: dict[str, Any] = {"scopes": scopes} if scheme.get("authorizationUrl"): flow_payload["authorizationUrl"] = scheme["authorizationUrl"] if scheme.get("tokenUrl"): flow_payload["tokenUrl"] = scheme["tokenUrl"] converted = { key: value for key, value in scheme.items() if key not in { "flow", "authorizationUrl", "tokenUrl", "scopes", } } converted["flows"] = {flow_name: flow_payload} return converted return scheme def _collect_media_types(raw_media_types: Any, default: str) -> list[str]: """Normalize media type lists with a deterministic default.""" if isinstance(raw_media_types, list): collected = [ media_type for media_type in raw_media_types if isinstance(media_type, str) and media_type.strip() ] if collected: return collected return [default] # --------------------------------------------------------------------------- # Backward-compatible aliases # ---------------------------------------------------------------------------
[docs] def load_openapi_spec_from_url( url: str, *, timeout: float = 30.0, ) -> dict[str, Any]: """Backward-compatible alias for URL-based spec loading.""" return load_openapi_spec_dict_from_url(url, timeout=timeout)
[docs] def load_openapi_spec_from_text(text: str) -> dict[str, Any]: """Backward-compatible alias for text-based spec loading.""" return load_openapi_spec_dict_from_text(text)
[docs] def load_openapi_spec_from_file( path: str | Path, *, encoding: str = "utf-8", ) -> dict[str, Any]: """Backward-compatible alias for file-based spec loading.""" return load_openapi_spec_dict_from_file(path, encoding=encoding)
[docs] def load_openapi_spec( source: str | Path, *, timeout: float = 30.0, encoding: str = "utf-8", ) -> dict[str, Any]: """Backward-compatible alias for generic spec loading.""" return load_openapi_spec_dict( source, timeout=timeout, encoding=encoding, )