Source code for oas2mcp.normalize.spec_to_catalog

"""OpenAPI-to-catalog normalization helpers.

Purpose:
    Convert parsed OpenAPI data into the normalized ``ApiCatalog`` model used by
    ``oas2mcp``.

Design:
    - Accept either LangChain ``OpenAPISpec`` objects or plain dictionaries.
    - Resolve relative server URLs against the original source URI.
    - Flatten OpenAPI path items into reusable normalized operations.
    - Preserve raw fragments where useful for later inspection or enrichment.
    - Prefer normalized model field names over alias names during model
      construction to keep Python call sites valid and readable.

Examples:
    .. code-block:: python

        from oas2mcp.loaders import load_openapi_spec_from_url
        from oas2mcp.normalize import openapi_spec_to_catalog

        source_uri = "https://petstore3.swagger.io/api/v3/openapi.json"
        spec = load_openapi_spec_from_url(source_uri)
        catalog = openapi_spec_to_catalog(spec, source_uri=source_uri)

        print(catalog.name)
        print(catalog.operation_count)
"""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import Any
from urllib.parse import urljoin, urlparse

from langchain_community.utilities.openapi import OpenAPISpec

from oas2mcp.loaders.openapi import dump_openapi_spec
from oas2mcp.models.normalized import (
    ApiCatalog,
    ApiContact,
    ApiInfo,
    ApiLicense,
    ApiMediaType,
    ApiOperation,
    ApiParameter,
    ApiPathItem,
    ApiRequestBody,
    ApiResponse,
    ApiSecurityRequirement,
    ApiSecurityScheme,
    ApiServer,
    ApiTag,
)

_HTTP_METHODS: tuple[str, ...] = (
    "get",
    "post",
    "put",
    "patch",
    "delete",
    "options",
    "head",
    "trace",
)


[docs] def spec_dict_to_catalog( spec_dict: Mapping[str, Any], *, source_uri: str ) -> ApiCatalog: """Normalize a dumped OpenAPI specification into an ``ApiCatalog``. Args: spec_dict: A plain-Python OpenAPI specification mapping. source_uri: The URI from which the spec originated. This is used for source tracking and for resolving relative server URLs. Returns: A normalized ``ApiCatalog``. Raises: ValueError: If ``source_uri`` is empty. Examples: .. code-block:: python from oas2mcp.normalize import spec_dict_to_catalog catalog = spec_dict_to_catalog( { "openapi": "3.1.0", "info": {"title": "Example API", "version": "1.0.0"}, "paths": {}, }, source_uri="https://example.com/openapi.json", ) """ if not source_uri.strip(): raise ValueError("source_uri cannot be empty.") resolved_spec = dict(spec_dict) info_mapping = _as_mapping(resolved_spec.get("info")) components_mapping = _as_mapping(resolved_spec.get("components")) name = _infer_catalog_name(info_mapping, source_uri) info = _normalize_info(info_mapping) servers = _normalize_servers(resolved_spec.get("servers"), source_uri=source_uri) tags = _normalize_tags(resolved_spec.get("tags")) security_schemes = _normalize_security_schemes( components_mapping.get("securitySchemes"), ) global_security = _normalize_security_requirements(resolved_spec.get("security")) component_counts = _build_component_counts(components_mapping) paths, operations = _normalize_paths( resolved_spec.get("paths"), global_security=global_security, ) return ApiCatalog( name=name, source_uri=source_uri, openapi_version=_as_optional_str(resolved_spec.get("openapi")), info=info, servers=servers, tags=tags, security_schemes=security_schemes, global_security=global_security, paths=paths, operations=operations, component_counts=component_counts, raw_spec=resolved_spec, )
[docs] def openapi_spec_to_catalog(spec: OpenAPISpec, *, source_uri: str) -> ApiCatalog: """Normalize a LangChain ``OpenAPISpec`` into an ``ApiCatalog``. Args: spec: The parsed LangChain OpenAPI spec. source_uri: The URI from which the spec originated. Returns: A normalized ``ApiCatalog``. Raises: ValueError: If ``source_uri`` is empty. Examples: .. code-block:: python from oas2mcp.loaders import load_openapi_spec_from_url from oas2mcp.normalize import openapi_spec_to_catalog source_uri = "https://petstore3.swagger.io/api/v3/openapi.json" spec = load_openapi_spec_from_url(source_uri) catalog = openapi_spec_to_catalog(spec, source_uri=source_uri) """ spec_dict = dump_openapi_spec(spec) return spec_dict_to_catalog(spec_dict, source_uri=source_uri)
def _as_mapping(value: Any) -> dict[str, Any]: """Return a plain dictionary for mapping-like input. Args: value: The candidate mapping-like object. Returns: A plain dictionary when the input is mapping-like, otherwise an empty dictionary. Raises: None. Examples: .. code-block:: python assert _as_mapping({"a": 1}) == {"a": 1} assert _as_mapping(None) == {} """ if isinstance(value, Mapping): return dict(value) return {} def _as_sequence(value: Any) -> list[Any]: """Return a plain list for sequence-like input. Args: value: The candidate sequence-like object. Returns: A plain list when the input is sequence-like and not string-like, otherwise an empty list. Raises: None. Examples: .. code-block:: python assert _as_sequence([1, 2]) == [1, 2] assert _as_sequence("abc") == [] """ if isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)): return list(value) return [] def _as_optional_str(value: Any) -> str | None: """Return a cleaned string or ``None``. Args: value: The candidate string value. Returns: A stripped string when the value is a non-empty string, otherwise ``None``. Raises: None. Examples: .. code-block:: python assert _as_optional_str(" hello ") == "hello" assert _as_optional_str("") is None """ if isinstance(value, str): cleaned = value.strip() return cleaned or None return None def _as_bool(value: Any) -> bool: """Return a normalized boolean. Args: value: The candidate value. Returns: The boolean interpretation of ``value``. Raises: None. Examples: .. code-block:: python assert _as_bool(1) is True assert _as_bool(0) is False """ return bool(value) def _infer_catalog_name(info_mapping: Mapping[str, Any], source_uri: str) -> str: """Infer the catalog display name from ``info.title`` or the source host. Args: info_mapping: The normalized ``info`` mapping. source_uri: The original source URI. Returns: The inferred catalog name. Raises: None. Examples: .. code-block:: python name = _infer_catalog_name({"title": "Petstore"}, "https://example.com") assert name == "Petstore" """ title = _as_optional_str(info_mapping.get("title")) if title is not None: return title parsed = urlparse(source_uri) if parsed.netloc: return parsed.netloc return "OpenAPI Catalog" def _normalize_info(info_mapping: Mapping[str, Any]) -> ApiInfo | None: """Normalize the top-level OpenAPI ``info`` block. Args: info_mapping: The raw ``info`` mapping. Returns: A normalized ``ApiInfo`` when enough metadata exists, otherwise ``None``. Raises: None. Examples: .. code-block:: python info = _normalize_info({"title": "API", "version": "1.0.0"}) assert info is not None """ title = _as_optional_str(info_mapping.get("title")) version = _as_optional_str(info_mapping.get("version")) if title is None or version is None: return None contact_mapping = _as_mapping(info_mapping.get("contact")) license_mapping = _as_mapping(info_mapping.get("license")) contact = ( ApiContact( name=_as_optional_str(contact_mapping.get("name")), email=_as_optional_str(contact_mapping.get("email")), url=_as_optional_str(contact_mapping.get("url")), ) if contact_mapping else None ) license_info = ( ApiLicense( name=_as_optional_str(license_mapping.get("name")) or "Unknown", identifier=_as_optional_str(license_mapping.get("identifier")), url=_as_optional_str(license_mapping.get("url")), ) if license_mapping else None ) return ApiInfo( title=title, version=version, summary=_as_optional_str(info_mapping.get("summary")), description=_as_optional_str(info_mapping.get("description")), terms_of_service=_as_optional_str(info_mapping.get("termsOfService")), contact=contact, license=license_info, ) def _normalize_servers( raw_servers: Any, *, source_uri: str, ) -> list[ApiServer]: """Normalize OpenAPI servers and resolve relative URLs. Args: raw_servers: The raw ``servers`` value from the spec. source_uri: The original source URI. Returns: A list of normalized ``ApiServer`` instances. Raises: None. Examples: .. code-block:: python servers = _normalize_servers( [{"url": "/api"}], source_uri="https://example.com/openapi.json", ) assert servers[0].url == "https://example.com/api" """ servers: list[ApiServer] = [] for raw_server in _as_sequence(raw_servers): server_mapping = _as_mapping(raw_server) url = _as_optional_str(server_mapping.get("url")) if url is None: continue resolved_url = urljoin(source_uri, url) variables = _normalize_server_variables(server_mapping.get("variables")) servers.append( ApiServer( url=resolved_url, description=_as_optional_str(server_mapping.get("description")), variables=variables, ) ) if servers: return servers parsed = urlparse(source_uri) if parsed.scheme and parsed.netloc: default_url = f"{parsed.scheme}://{parsed.netloc}" else: default_url = source_uri return [ApiServer(url=default_url, description="Derived from source URI")] def _normalize_server_variables(raw_variables: Any) -> dict[str, dict[str, Any]]: """Normalize server variables into plain dictionaries. Args: raw_variables: The raw server variables mapping. Returns: A normalized mapping of variable names to metadata dictionaries. Raises: None. Examples: .. code-block:: python variables = _normalize_server_variables( {"env": {"default": "prod"}} ) assert variables["env"]["default"] == "prod" """ variables: dict[str, dict[str, Any]] = {} for name, raw_metadata in _as_mapping(raw_variables).items(): variables[str(name)] = dict(_as_mapping(raw_metadata)) return variables def _normalize_tags(raw_tags: Any) -> list[ApiTag]: """Normalize top-level OpenAPI tags. Args: raw_tags: The raw ``tags`` value from the spec. Returns: A list of normalized ``ApiTag`` instances. Raises: None. Examples: .. code-block:: python tags = _normalize_tags([{"name": "pets"}]) assert tags[0].name == "pets" """ tags: list[ApiTag] = [] for raw_tag in _as_sequence(raw_tags): tag_mapping = _as_mapping(raw_tag) name = _as_optional_str(tag_mapping.get("name")) if name is None: continue external_docs_mapping = _as_mapping(tag_mapping.get("externalDocs")) tags.append( ApiTag( name=name, description=_as_optional_str(tag_mapping.get("description")), external_docs_description=_as_optional_str( external_docs_mapping.get("description") ), external_docs_url=_as_optional_str(external_docs_mapping.get("url")), ) ) return tags def _normalize_security_schemes(raw_security_schemes: Any) -> list[ApiSecurityScheme]: """Normalize named security schemes from ``components.securitySchemes``. Args: raw_security_schemes: The raw security schemes mapping. Returns: A list of normalized ``ApiSecurityScheme`` instances. Raises: None. Examples: .. code-block:: python schemes = _normalize_security_schemes( {"apiKeyAuth": {"type": "apiKey", "in": "header", "name": "X-API-Key"}} ) assert schemes[0].name == "apiKeyAuth" """ schemes: list[ApiSecurityScheme] = [] for scheme_name, raw_scheme in _as_mapping(raw_security_schemes).items(): scheme_mapping = _as_mapping(raw_scheme) scheme_type = _as_optional_str(scheme_mapping.get("type")) if scheme_type is None: continue schemes.append( ApiSecurityScheme( name=str(scheme_name), type=scheme_type, description=_as_optional_str(scheme_mapping.get("description")), scheme=_as_optional_str(scheme_mapping.get("scheme")), bearer_format=_as_optional_str(scheme_mapping.get("bearerFormat")), location=_as_optional_str(scheme_mapping.get("in")), parameter_name=_as_optional_str(scheme_mapping.get("name")), open_id_connect_url=_as_optional_str( scheme_mapping.get("openIdConnectUrl") ), flows=_as_mapping(scheme_mapping.get("flows")), ) ) return schemes def _normalize_security_requirements( raw_requirements: Any, ) -> list[ApiSecurityRequirement]: """Normalize OpenAPI security requirement arrays. Args: raw_requirements: The raw security requirement list. Returns: A list of normalized ``ApiSecurityRequirement`` instances. Raises: None. Examples: .. code-block:: python requirements = _normalize_security_requirements([{"bearerAuth": []}]) assert requirements[0].scheme_names == ["bearerAuth"] """ requirements: list[ApiSecurityRequirement] = [] for raw_requirement in _as_sequence(raw_requirements): requirement_mapping = _as_mapping(raw_requirement) if not requirement_mapping: requirements.append(ApiSecurityRequirement(scheme_names=[])) continue requirements.append( ApiSecurityRequirement( scheme_names=[str(key) for key in requirement_mapping.keys()], ) ) return requirements def _build_component_counts(components_mapping: Mapping[str, Any]) -> dict[str, int]: """Build simple counts for common OpenAPI component sections. Args: components_mapping: The raw ``components`` mapping. Returns: A mapping of section names to simple counts. Raises: None. Examples: .. code-block:: python counts = _build_component_counts({"schemas": {"A": {}, "B": {}}}) assert counts["schemas"] == 2 """ section_names = ( "schemas", "responses", "parameters", "examples", "requestBodies", "headers", "securitySchemes", "links", "callbacks", ) counts: dict[str, int] = {} for section_name in section_names: counts[section_name] = len(_as_mapping(components_mapping.get(section_name))) return counts def _normalize_paths( raw_paths: Any, *, global_security: list[ApiSecurityRequirement], ) -> tuple[list[ApiPathItem], list[ApiOperation]]: """Normalize OpenAPI paths into path items and flattened operations. Args: raw_paths: The raw ``paths`` mapping. global_security: The normalized top-level security requirements. Returns: A tuple of ``(path_items, operations)``. Raises: None. Examples: .. code-block:: python path_items, operations = _normalize_paths( {"/pets": {"get": {"responses": {"200": {"description": "ok"}}}}}, global_security=[], ) assert len(path_items) == 1 assert len(operations) == 1 """ path_items: list[ApiPathItem] = [] operations: list[ApiOperation] = [] for raw_path, raw_path_item in _as_mapping(raw_paths).items(): path = _normalize_path_string(str(raw_path)) path_item_mapping = _as_mapping(raw_path_item) path_parameters = _normalize_parameters(path_item_mapping.get("parameters")) path_operations: list[ApiOperation] = [] for method_name in _HTTP_METHODS: raw_operation = path_item_mapping.get(method_name) operation_mapping = _as_mapping(raw_operation) if not operation_mapping: continue operation = _normalize_operation( method=method_name, path=path, operation_mapping=operation_mapping, path_parameters=path_parameters, global_security=global_security, ) path_operations.append(operation) operations.append(operation) path_items.append( ApiPathItem( path=path, parameters=path_parameters, operations=path_operations, ) ) return path_items, operations def _normalize_operation( *, method: str, path: str, operation_mapping: Mapping[str, Any], path_parameters: list[ApiParameter], global_security: list[ApiSecurityRequirement], ) -> ApiOperation: """Normalize a single OpenAPI operation. Args: method: The lowercase HTTP method name. path: The normalized OpenAPI path. operation_mapping: The raw operation mapping. path_parameters: Parameters inherited from the path item. global_security: The normalized top-level security requirements. Returns: A normalized ``ApiOperation``. Raises: None. Examples: .. code-block:: python operation = _normalize_operation( method="get", path="/pets", operation_mapping={"responses": {"200": {"description": "ok"}}}, path_parameters=[], global_security=[], ) assert operation.method == "GET" """ operation_parameters = _normalize_parameters(operation_mapping.get("parameters")) merged_parameters = [*path_parameters, *operation_parameters] request_body_mapping = _as_mapping(operation_mapping.get("requestBody")) request_body = ( _normalize_request_body(request_body_mapping) if request_body_mapping else None ) responses = _normalize_responses(operation_mapping.get("responses")) operation_security = _normalize_security_requirements( operation_mapping.get("security") ) external_docs_mapping = _as_mapping(operation_mapping.get("externalDocs")) return ApiOperation( method=method, path=path, operation_id=_as_optional_str(operation_mapping.get("operationId")), summary=_as_optional_str(operation_mapping.get("summary")), description=_as_optional_str(operation_mapping.get("description")), tags=[ str(tag) for tag in _as_sequence(operation_mapping.get("tags")) if isinstance(tag, str) and tag.strip() ], parameters=_deduplicate_parameters(merged_parameters), request_body=request_body, responses=responses, security=operation_security if operation_security else list(global_security), deprecated=_as_bool(operation_mapping.get("deprecated")), external_docs_description=_as_optional_str( external_docs_mapping.get("description") ), external_docs_url=_as_optional_str(external_docs_mapping.get("url")), raw_operation=dict(operation_mapping), ) def _normalize_parameters(raw_parameters: Any) -> list[ApiParameter]: """Normalize OpenAPI parameters. Args: raw_parameters: The raw parameters list. Returns: A list of normalized ``ApiParameter`` instances. Raises: None. Examples: .. code-block:: python params = _normalize_parameters( [{"name": "id", "in": "path", "required": True, "schema": {"type": "string"}}] ) assert params[0].name == "id" """ parameters: list[ApiParameter] = [] for raw_parameter in _as_sequence(raw_parameters): parameter_mapping = _as_mapping(raw_parameter) if not parameter_mapping: continue name = _as_optional_str(parameter_mapping.get("name")) location = _as_optional_str(parameter_mapping.get("in")) if name is None or location is None: continue schema_mapping = _as_mapping(parameter_mapping.get("schema")) enum_values = list(_as_sequence(schema_mapping.get("enum"))) parameters.append( ApiParameter( name=name, location=location, required=_as_bool(parameter_mapping.get("required")), description=_as_optional_str(parameter_mapping.get("description")), schema_type=_as_optional_str(schema_mapping.get("type")), schema_format=_as_optional_str(schema_mapping.get("format")), default=schema_mapping.get("default"), enum_values=enum_values, raw_schema=schema_mapping, ) ) return parameters def _deduplicate_parameters(parameters: list[ApiParameter]) -> list[ApiParameter]: """Deduplicate parameters by ``(location, name)`` while preserving order. Args: parameters: The candidate parameters. Returns: A de-duplicated parameter list. Raises: None. Examples: .. code-block:: python parameter = ApiParameter(name="id", location="path") result = _deduplicate_parameters([parameter, parameter]) assert len(result) == 1 """ seen: set[tuple[str, str]] = set() deduplicated: list[ApiParameter] = [] for parameter in parameters: key = (parameter.location, parameter.name) if key in seen: continue seen.add(key) deduplicated.append(parameter) return deduplicated def _normalize_request_body(request_body_mapping: Mapping[str, Any]) -> ApiRequestBody: """Normalize an OpenAPI request body. Args: request_body_mapping: The raw request body mapping. Returns: A normalized ``ApiRequestBody``. Raises: None. Examples: .. code-block:: python body = _normalize_request_body({"required": True, "content": {}}) assert body.required is True """ return ApiRequestBody( required=_as_bool(request_body_mapping.get("required")), description=_as_optional_str(request_body_mapping.get("description")), media_types=_normalize_media_types(request_body_mapping.get("content")), ) def _normalize_responses(raw_responses: Any) -> list[ApiResponse]: """Normalize OpenAPI responses. Args: raw_responses: The raw responses mapping. Returns: A list of normalized ``ApiResponse`` instances. Raises: None. Examples: .. code-block:: python responses = _normalize_responses({"200": {"description": "ok"}}) assert responses[0].status_code == "200" """ responses: list[ApiResponse] = [] for status_code, raw_response in _as_mapping(raw_responses).items(): response_mapping = _as_mapping(raw_response) if not response_mapping: continue responses.append( ApiResponse( status_code=str(status_code), description=_as_optional_str(response_mapping.get("description")), media_types=_normalize_media_types(response_mapping.get("content")), headers=_as_mapping(response_mapping.get("headers")), ) ) return responses def _normalize_media_types(raw_content: Any) -> list[ApiMediaType]: """Normalize media-type entries from request or response content maps. Args: raw_content: The raw OpenAPI content mapping. Returns: A list of normalized ``ApiMediaType`` instances. Raises: None. Examples: .. code-block:: python media_types = _normalize_media_types( {"application/json": {"schema": {"type": "object"}}} ) assert media_types[0].content_type == "application/json" """ media_types: list[ApiMediaType] = [] for content_type, raw_media_type in _as_mapping(raw_content).items(): media_mapping = _as_mapping(raw_media_type) schema_mapping = _as_mapping(media_mapping.get("schema")) media_types.append( ApiMediaType( content_type=str(content_type), schema_ref=_as_optional_str(schema_mapping.get("$ref")), schema_type=_as_optional_str(schema_mapping.get("type")), example=media_mapping.get("example"), examples=_as_mapping(media_mapping.get("examples")), raw_schema=schema_mapping, ) ) return media_types def _normalize_path_string(value: str) -> str: """Normalize a path string to begin with ``/``. Args: value: The candidate path string. Returns: A normalized path string. Raises: None. Examples: .. code-block:: python assert _normalize_path_string("pets") == "/pets" """ cleaned = value.strip() if not cleaned: return "/" if not cleaned.startswith("/"): return f"/{cleaned}" return cleaned