Source code for oas2mcp.agent.orchestrator

from __future__ import annotations

from pathlib import Path
from typing import Any

from rich.console import Console

from oas2mcp.agent.enhancer.agent import run_operation_enhancer
from oas2mcp.agent.runtime import Oas2McpRuntimeContext
from oas2mcp.agent.summarizer.agent import run_catalog_summarizer
from oas2mcp.agent.surface.agent import run_catalog_surface_planner
from oas2mcp.classify.operations import classify_catalog
from oas2mcp.generate.config import ExportConfig
from oas2mcp.generate.export import export_enhanced_catalog_bundle
from oas2mcp.generate.models import EnhancedCatalog
from oas2mcp.loaders.openapi import load_openapi_spec_dict
from oas2mcp.normalize.spec_to_catalog import spec_dict_to_catalog
from oas2mcp.utils.names import make_catalog_slug


[docs] def run_oas2mcp_pipeline( *, source: str | Path | None = None, source_url: str | None = None, runtime_context: Oas2McpRuntimeContext, summarizer_model: Any | None = None, enhancer_model: Any | None = None, surface_model: Any | None = None, console: Console | None = None, ) -> EnhancedCatalog: """Run the full summarize -> enhance-all pipeline.""" resolved_source = str(source or source_url or runtime_context.source_uri) if console is not None: console.print("[bold blue]Loading OpenAPI spec...[/bold blue]") spec_dict = load_openapi_spec_dict(resolved_source) if console is not None: console.print("[bold blue]Normalizing catalog...[/bold blue]") catalog = spec_dict_to_catalog(spec_dict, source_uri=resolved_source) if console is not None: console.print("[bold blue]Classifying MCP candidates...[/bold blue]") bundle = classify_catalog(catalog) if console is not None: console.print("[bold blue]Running catalog summarizer...[/bold blue]") summary = run_catalog_summarizer( catalog=catalog, runtime_context=runtime_context, model=summarizer_model, ) enhanced_operations = [] total_operations = len(catalog.operations) for index, operation in enumerate(catalog.operations, start=1): if console is not None: console.print( f"[bold yellow]Enhancing operation {index}/{total_operations}:[/bold yellow] {operation.key}" ) enhancement = run_operation_enhancer( catalog=catalog, bundle=bundle, summary=summary, operation=operation, runtime_context=runtime_context, model=enhancer_model, ) enhanced_operations.append(enhancement) partial_catalog = EnhancedCatalog( source_url=resolved_source, catalog_name=catalog.name, catalog_slug=make_catalog_slug(catalog.name), catalog_version=catalog.info.version, summary=summary, operations=enhanced_operations, notes=[ "Generated by the oas2mcp summarize-and-enhance pipeline.", ], ) if console is not None: console.print("[bold blue]Planning shared FastMCP surface...[/bold blue]") surface_plan = run_catalog_surface_planner( enhanced_catalog=partial_catalog, runtime_context=runtime_context, model=surface_model, ) return partial_catalog.model_copy(update={"surface_plan": surface_plan})
[docs] def run_and_export_oas2mcp_pipeline( *, source: str | Path | None = None, source_url: str | None = None, runtime_context: Oas2McpRuntimeContext, export_config: ExportConfig | None = None, summarizer_model: Any | None = None, enhancer_model: Any | None = None, surface_model: Any | None = None, console: Console | None = None, ) -> dict[str, Path]: """Run the pipeline and export enhanced catalog artifacts.""" enhanced_catalog = run_oas2mcp_pipeline( source=source, source_url=source_url, runtime_context=runtime_context, summarizer_model=summarizer_model, enhancer_model=enhancer_model, surface_model=surface_model, console=console, ) if console is not None: console.print("[bold green]Exporting artifacts...[/bold green]") return export_enhanced_catalog_bundle( enhanced_catalog=enhanced_catalog, config=export_config or ExportConfig(), )