"""Implements a FastAPI server to run the gradio interface. Note that some types in this
module use the Optional/Union notation so that they work correctly with pydantic."""

from __future__ import annotations

import asyncio
import contextlib
import hashlib
import inspect
import io
import json
import math
import mimetypes
import os
import secrets
import sys
import time
import traceback
import warnings
from collections.abc import AsyncIterator, Callable, Sequence
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    Union,
    cast,
)

import fastapi
import httpx
import markupsafe
import orjson
from fastapi import (
    APIRouter,
    BackgroundTasks,
    Body,
    Depends,
    FastAPI,
    HTTPException,
    status,
)
from fastapi.responses import (
    FileResponse,
    HTMLResponse,
    JSONResponse,
    PlainTextResponse,
    Response,
    StreamingResponse,
)
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from gradio_client import utils as client_utils
from gradio_client.documentation import document
from gradio_client.snippet import generate_code_snippets
from gradio_client.utils import ServerMessage
from hf_gradio.cli import _condense_info, generate_cli_snippet
from jinja2.exceptions import TemplateNotFound
from python_multipart.multipart import parse_options_header
from starlette.background import BackgroundTask
from starlette.datastructures import UploadFile as StarletteUploadFile
from starlette.formparsers import MultiPartException
from starlette.responses import RedirectResponse

import gradio
from gradio import (
    caching,
    route_utils,
    themes,
    utils,
)
from gradio.brotli_middleware import BrotliMiddleware
from gradio.context import Context
from gradio.data_classes import (
    CancelBody,
    ComponentServerBlobBody,
    ComponentServerJSONBody,
    DataWithFiles,
    DeveloperPath,
    JsonData,
    PredictBody,
    PredictBodyInternal,
    ResetBody,
    SimplePredictBody,
    UserProvidedPath,
    VibeCodeBody,
    VibeEditBody,
)
from gradio.exceptions import Error, InvalidPathError
from gradio.helpers import special_args
from gradio.i18n import I18n
from gradio.node_server import (
    start_node_server,
)
from gradio.oauth import attach_oauth
from gradio.route_utils import (  # noqa: F401
    API_PREFIX,
    BUILD_PATH_LIB,
    DEFAULT_TEMP_DIR,
    STATIC_PATH_LIB,
    STATIC_TEMPLATE_LIB,
    VERSION,
    XSS_SAFE_MIMETYPES,
    CustomCORSMiddleware,
    FileUploadProgress,
    FileUploadProgressNotQueuedError,
    FileUploadProgressNotTrackedError,
    GradioMultiPartParser,
    GradioUploadFile,
    Request,
    compare_passwords_securely,
    create_lifespan_handler,
    favicon,
    file_fetch,
    file_response,
    move_uploaded_files_to_cache,
    routes_safe_join,
    upload_fn,
)
from gradio.screen_recording_utils import process_video_with_ffmpeg
from gradio.server_messages import (
    CloseStreamMessage,
    EstimationMessage,
    EventMessage,
    HeartbeatMessage,
    ProcessCompletedMessage,
    ProcessGeneratingMessage,
    UnexpectedErrorMessage,
)
from gradio.state_holder import StateHolder
from gradio.themes import ThemeClass as Theme
from gradio.utils import (
    cancel_tasks,
    get_node_path,
    get_upload_folder,
    safe_aclose_iterator,
)

if TYPE_CHECKING:
    from gradio.blocks import Block

import difflib
import re
import shutil
import tempfile

mimetypes.init()

# Lookup table of the themes listed below, keyed by each instance's `name`.
# Every theme is instantiated once when this module is imported; an instance
# whose `name` is None is left out of the mapping.
BUILT_IN_THEMES: dict[str, Theme] = {
    t.name: t  # type: ignore
    for t in [
        themes.Base(),
        themes.Default(),
        themes.Monochrome(),
        themes.Soft(),
        themes.Glass(),
        themes.Origin(),
        themes.Citrus(),
        themes.Ocean(),
        themes.Mario(),
    ]
    if t.name is not None
}


class ORJSONResponse(JSONResponse):
    """JSON response whose body is produced by `orjson` instead of `json`."""

    media_type = "application/json"

    @staticmethod
    def default(content: Any) -> str:
        """Fallback passed to `orjson.dumps` as its `default=` hook.

        `JsonData` instances are converted with `model_dump()`; every other
        object is converted with `str()`.
        """
        return content.model_dump() if isinstance(content, JsonData) else str(content)

    @staticmethod
    def _render(content: Any) -> bytes:
        """Serialize `content` to JSON bytes using the options shared by all callers."""
        # With OPT_PASSTHROUGH_DATETIME set, datetime objects are handed to
        # `default` rather than being formatted by orjson itself.
        serialization_options = (
            orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_PASSTHROUGH_DATETIME
        )
        return orjson.dumps(
            content,
            option=serialization_options,
            default=ORJSONResponse.default,
        )

    def render(self, content: Any) -> bytes:
        """Return the response body for `content` (delegates to `_render`)."""
        return ORJSONResponse._render(content)

    @staticmethod
    def _render_str(content: Any) -> str:
        """Same as `_render`, but with the bytes decoded as UTF-8 text."""
        encoded = ORJSONResponse._render(content)
        return str(encoded, "utf-8")


def toorjson(value):
    """Serialize `value` to JSON text and wrap it in `markupsafe.Markup`.

    The characters `<`, `>`, `&` and `'` are rewritten as JSON `\\uXXXX`
    escapes, so none of them appears literally in the returned markup
    (presumably so the JSON can be inlined in an HTML template -- confirm
    against the templates that use this filter).
    """
    # None of the replacement strings contains a character that is itself
    # replaced, so a single translation pass gives the same text as applying
    # the four substitutions one after another.
    escapes = {
        ord("<"): "\\u003c",
        ord(">"): "\\u003e",
        ord("&"): "\\u0026",
        ord("'"): "\\u0027",
    }
    json_text = ORJSONResponse._render_str(value)
    return markupsafe.Markup(json_text.translate(escapes))


# Jinja2 environment rooted at STATIC_TEMPLATE_LIB; `toorjson` is registered
# as a template filter under the same name.
templates = Jinja2Templates(directory=STATIC_TEMPLATE_LIB)
templates.env.filters["toorjson"] = toorjson

# Shared transport keeps the connection pool warm without sharing an
# `httpx.AsyncClient` (and therefore a cookie jar) across `/proxy=` requests.
# A single shared `AsyncClient` would persist `Set-Cookie` headers from one
# proxied response and replay them on subsequent requests to any sibling
# `*.hf.space` URL — see GHSA-2mr9-9r47-px2g.
_proxy_transport = httpx.AsyncHTTPTransport(
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
    ),
)


# Single module-level `FileUploadProgress` instance.
file_upload_statuses = FileUploadProgress()


class App(FastAPI):
    """
    FastAPI App Wrapper
    """

    app_port = None

    def __init__(
        self,
        auth_dependency: Callable[[fastapi.Request], str | None] | None = None,
        **kwargs,
    ):
        """
        Set up the per-app attributes and then initialize the FastAPI base class.

        Parameters:
            auth_dependency: Optional callable that takes a `fastapi.Request` and returns a `str` or `None`. It is stored as `self.auth_dependency`.
            **kwargs: Passed through to `FastAPI.__init__`. `docs_url` and `redoc_url` are set to `None` unless the caller provides them.
        """
        # `tokens` and `auth` start out empty; `configure_app` assigns both again.
        self.tokens = {}
        self.auth = None
        # Random, URL-safe value generated once per App instance.
        self.analytics_key = secrets.token_urlsafe(16)
        self.monitoring_enabled = False
        # Stays None until `configure_app` is called; see `get_blocks`.
        self.blocks: gradio.Blocks | None = None
        self.state_holder = StateHolder()
        self.iterators: dict[str, AsyncIterator] = {}
        self.iterators_to_reset: set[str] = set()
        self.lock = utils.safe_get_lock()
        self.stop_event = utils.safe_get_stop_event()
        # Random, URL-safe values generated once per App instance.
        self.cookie_id = secrets.token_urlsafe(32)
        self.queue_token = secrets.token_urlsafe(32)
        self.startup_events_triggered = False
        self.uploaded_file_dir = get_upload_folder()
        self.change_count: int = 0
        self.change_type: Literal["reload", "error"] | None = None
        self.reload_error_message: str | None = None
        # Tasks recorded here are cancelled by `_cancel_asyncio_tasks`.
        self._asyncio_tasks: list[asyncio.Task] = []
        self.auth_dependency = auth_dependency
        self.api_info = None
        self.static_worker_pool = None  # Set by launch() when num_workers > 0
        self.all_app_info = None
        self._static_prefixes: tuple[
            str, ...
        ] = ()  # Populated by enable_static_workers

        # Allow user to manually set `docs_url` and `redoc_url`
        # when instantiating an App; when they're not set, disable docs and redoc.
        kwargs.setdefault("docs_url", None)
        kwargs.setdefault("redoc_url", None)
        self.custom_component_hashes: dict[str, str] = {}
        super().__init__(**kwargs)

    def configure_app(self, blocks: gradio.Blocks) -> None:
        auth = blocks.auth
        if auth is not None:
            if not callable(auth):
                self.auth = {account[0]: account[1] for account in auth}  # type: ignore
            else:
                self.auth = auth
        else:
            self.auth = None
        self.blocks = blocks
        self.cwd = os.getcwd()
        self.favicon_path = blocks.favicon_path
        self.tokens = {}
        self.root_path = blocks.root_path or ""
        self.state_holder.set_blocks(blocks)

    def get_blocks(self) -> gradio.Blocks:
        if self.blocks is None:
            raise ValueError("No Blocks has been configured for this app.")
        return self.blocks

    def build_proxy_request(self, url_path):
        """
        Validate a proxy target and return what is needed to request it.

        Parameters:
            url_path: The URL to proxy to; parsed with `httpx.URL`.
        Returns:
            A `(url, headers)` tuple: the parsed `httpx.URL` and a dict that holds a Bearer `Authorization` header when `Context.token` is set, and is empty otherwise.
        Raises:
            PermissionError: If the host matches none of the hosts in `self.blocks.proxy_urls`, or does not end with ".hf.space".
        """
        target = httpx.URL(url_path)
        assert self.blocks  # noqa: S101
        # Don't proxy a URL unless it's a URL specifically loaded by the user using
        # gr.load() to prevent SSRF or harvesting of HF tokens by malicious Spaces.
        for root in self.blocks.proxy_urls:
            if target.host == httpx.URL(root).host:
                break
        else:
            raise PermissionError("This URL cannot be proxied.")
        # Only allow proxying to Hugging Face Space URLs to prevent SSRF
        # via malicious proxy_url values in untrusted configs.
        if not target.host.endswith(".hf.space"):
            raise PermissionError("This URL cannot be proxied.")
        # Only the URL and headers are returned, so no `httpx.AsyncClient`
        # (or cookie jar) is shared across calls (see GHSA-2mr9-9r47-px2g).
        headers = (
            {}
            if Context.token is None
            else {"Authorization": f"Bearer {Context.token}"}
        )
        return target, headers

    def _cancel_asyncio_tasks(self):
        for task in self._asyncio_tasks:
            task.cancel()
        self._asyncio_tasks = []

    @staticmethod
    def setup_mcp_server(
        blocks: gradio.Blocks,
        app_kwargs: dict[str, Any],
        mcp_server: bool | None = None,
    ):
        """
        Optionally create an MCP server for `blocks`, then rebuild `blocks.config`.

        Parameters:
            blocks: The Blocks instance that receives `mcp_server_obj` and `mcp_server` (or `mcp_error` if creation fails) and whose `config` is regenerated.
            app_kwargs: Keyword arguments for the app. When the MCP server is created, its "lifespan" entry is replaced by a wrapper that enters the MCP server's lifespan followed by any lifespan that was already present.
            mcp_server: Whether to enable the MCP server. When `None`, the `GRADIO_MCP_SERVER` environment variable decides (enabled only if it equals "true", ignoring case).
        Returns:
            The MCP sub-path, `API_PREFIX + "/mcp"`, whether or not the server was enabled.
        Raises:
            ImportError: If the server is enabled but `gradio.mcp` cannot be imported.
        """
        mcp_subpath = API_PREFIX + "/mcp"
        if mcp_server is None:
            env_flag = os.environ.get("GRADIO_MCP_SERVER", "False")
            mcp_server = env_flag.lower() == "true"
        if mcp_server:
            try:
                import gradio.mcp
            except ImportError as import_err:
                raise ImportError(
                    'In order to use `mcp_server=True`, you must install gradio with the `mcp` extra. Please install it with `pip install "gradio[mcp]"`'
                ) from import_err
            try:
                blocks.mcp_server_obj = gradio.mcp.GradioMCPServer(blocks)
                blocks.mcp_server = True
                # Lifespan already present in the kwargs, if any.
                user_lifespan = app_kwargs.get("lifespan")

                @contextlib.asynccontextmanager
                async def _lifespan(app: App):
                    # The MCP lifespan is entered first, then the pre-existing one;
                    # the exit stack unwinds them when the app shuts down.
                    async with contextlib.AsyncExitStack() as stack:
                        mcp_obj = blocks.mcp_server_obj
                        if mcp_obj:
                            await stack.enter_async_context(mcp_obj.lifespan(app))
                        if user_lifespan is not None:
                            await stack.enter_async_context(user_lifespan(app))
                        yield

                app_kwargs["lifespan"] = _lifespan
            except Exception as launch_err:
                # Best effort: record the failure on `blocks` instead of raising.
                blocks.mcp_server = False
                blocks.mcp_error = f"Error launching MCP server: {launch_err}"

        # Because the config should include the fact that the MCP server is enabled
        blocks.config = blocks.get_config_file()
        return mcp_subpath

    @staticmethod
    def create_app(
        blocks: gradio.Blocks,
        app: App | None = None,
        app_kwargs: dict[str, Any] | None = None,
        auth_dependency: Callable[[fastapi.Request], str | None] | None = None,
        strict_cors: bool = True,
        mcp_server: bool | None = None,
        debug: bool = False,
    ) -> App:
        app_kwargs = app_kwargs or {}
        app_kwargs.setdefault("default_response_class", ORJSONResponse)
        mcp_subpath = App.setup_mcp_server(blocks, app_kwargs, mcp_server)

        delete_cache = blocks.delete_cache or (None, None)
        if app is None:
            app_kwargs["lifespan"] = create_lifespan_handler(
                app_kwargs.get("lifespan", None), *delete_cache
            )
            app = App(auth_dependency=auth_dependency, **app_kwargs, debug=debug)
        else:
            app.router.lifespan_context = create_lifespan_handler(
                app_kwargs.get("lifespan", None), *delete_cache
            )
        if blocks.mcp_server_obj:
            blocks.mcp_server_obj.launch_mcp_on_sse(app, mcp_subpath, blocks.root_path)
        router = APIRouter(prefix=API_PREFIX)

        app.configure_app(blocks)

        app.add_middleware(CustomCORSMiddleware, strict_cors=strict_cors)  # type: ignore
        app.add_middleware(
            BrotliMiddleware,  # type: ignore
            quality=4,
            excluded_handlers=[mcp_subpath],
        )

        # Note: In the current architecture, Node is the front proxy and
        # routes requests to Python. The old conditional_routing_middleware
        # that proxied Python -> Node is no longer needed.

        @router.get("/user")
        @router.get("/user/")
        def get_current_user(request: fastapi.Request) -> str | None:
            if app.auth_dependency is not None:
                return app.auth_dependency(request)
            token = request.cookies.get(
                f"access-token-{app.cookie_id}"
            ) or request.cookies.get(f"access-token-unsecure-{app.cookie_id}")
            return app.tokens.get(token)

        @router.get("/login_check")
        @router.get("/login_check/")
        def login_check(user: str = Depends(get_current_user)):
            if (app.auth is None and app.auth_dependency is None) or user is not None:
                return
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail={
                    "error": "Not authenticated",
                    "auth_message": blocks.auth_message,
                },
            )

        @router.get("/token")
        @router.get("/token/")
        def get_token(request: fastapi.Request) -> dict:
            token = request.cookies.get(f"access-token-{app.cookie_id}")
            return {"token": token, "user": app.tokens.get(token)}

        @router.get("/app_id")
        @router.get("/app_id/")
        def app_id(request: fastapi.Request) -> dict:  # noqa: ARG001
            return {"app_id": app.get_blocks().app_id}

        @router.get("/dev/reload", dependencies=[Depends(login_check)])
        async def notify_changes(
            request: fastapi.Request,
        ):
            async def reload_checker(request: fastapi.Request):
                heartbeat_rate = 15
                check_rate = 0.05
                last_heartbeat = time.perf_counter()
                current_count = app.change_count

                while True:
                    if await request.is_disconnected():
                        return

                    if app.change_count != current_count:
                        current_count = app.change_count
                        msg = (
                            json.dumps(f"{app.reload_error_message}")
                            if app.change_type == "error"
                            else "{}"
                        )
                        yield f"""event: {app.change_type}\ndata: {msg}\n\n"""

                    await asyncio.sleep(check_rate)
                    if time.perf_counter() - last_heartbeat > heartbeat_rate:
                        yield """event: heartbeat\ndata: {}\n\n"""
                        last_heartbeat = time.perf_counter()

            return StreamingResponse(
                reload_checker(request),
                media_type="text/event-stream",
            )

        @app.post("/login")
        @app.post("/login/")
        async def login(
            request: fastapi.Request, form_data: OAuth2PasswordRequestForm = Depends()
        ):
            username, password = form_data.username.strip(), form_data.password
            if app.auth is None:
                root = route_utils.get_root_url(
                    request=request,
                    route_path="/login",
                    root_path=app.root_path,
                )
                return RedirectResponse(url=root, status_code=status.HTTP_302_FOUND)
            if (
                not callable(app.auth)
                and username in app.auth  # type: ignore
                and compare_passwords_securely(password, app.auth[username])  # type: ignore
            ) or (
                callable(app.auth)
                and (
                    await app.auth(username, password)
                    if inspect.iscoroutinefunction(app.auth)
                    else app.auth(username, password)
                )
            ):  # type: ignore
                token = secrets.token_urlsafe(16)
                app.tokens[token] = username
                response = JSONResponse(content={"success": True})
                response.set_cookie(
                    key=f"access-token-{app.cookie_id}",
                    value=token,
                    httponly=True,
                    samesite="none",
                    secure=True,
                )
                response.set_cookie(
                    key=f"access-token-unsecure-{app.cookie_id}",
                    value=token,
                    httponly=True,
                )
                return response
            else:
                raise HTTPException(status_code=400, detail="Incorrect credentials.")

        ###############
        # OAuth Routes
        ###############

        # Define OAuth routes if the app expects it (i.e. a LoginButton is defined).
        # It allows users to "Sign in with HuggingFace". Otherwise, add the default
        # logout route.
        if app.blocks is not None and app.blocks.expects_oauth:
            attach_oauth(app)
        else:

            @app.get("/logout")
            def logout(
                request: fastapi.Request,
                user: str = Depends(get_current_user),
                all_session: bool = True,
            ):
                root = route_utils.get_root_url(
                    request=request,
                    route_path="/logout",
                    root_path=app.root_path,
                )
                response = RedirectResponse(url=root, status_code=status.HTTP_302_FOUND)
                response.delete_cookie(key=f"access-token-{app.cookie_id}", path="/")
                response.delete_cookie(
                    key=f"access-token-unsecure-{app.cookie_id}", path="/"
                )
                if all_session:
                    # Delete the tokens of all sessions associated with the current user.
                    for token in list(app.tokens.keys()):
                        if app.tokens[token] == user:
                            del app.tokens[token]
                # Delete only the token associated with the current session.
                elif request.cookies.get(f"access-token-{app.cookie_id}") in app.tokens:
                    del app.tokens[request.cookies.get(f"access-token-{app.cookie_id}")]
                return response

        ###############
        # Main Routes
        ###############

        @app.get("/svelte/{path:path}")
        async def _(path: str):
            svelte_path = DeveloperPath(str(Path(BUILD_PATH_LIB) / "svelte"))
            return file_response(svelte_path, UserProvidedPath(path))

        def attach_page(page):
            @app.get(f"/{page}", response_class=HTMLResponse)
            def page_route(
                request: fastapi.Request,
                user: str = Depends(get_current_user),
                deep_link: str = "",
            ):
                return main(request, user, page, deep_link)

            @app.get(f"/{page}/")
            def page_redirect():
                return RedirectResponse(
                    url=f"/{page}", status_code=status.HTTP_301_MOVED_PERMANENTLY
                )

        for pageset in blocks.pages:
            page = pageset[0]
            if page != "":
                attach_page(page)

        def load_deep_link(
            deep_link: str, config: dict[str, Any], page: str | None = None
        ):
            components = config["components"]
            try:
                user_path = Path("deep_links") / deep_link / "state.json"
                path = Path(
                    routes_safe_join(
                        DeveloperPath(app.uploaded_file_dir),
                        UserProvidedPath(str(user_path)),
                    )
                )
                if path.exists():
                    components = orjson.loads(path.read_bytes())
                    deep_link_state = "valid"
                else:
                    deep_link_state = "invalid"
            except (FileNotFoundError, OSError, orjson.JSONDecodeError):
                deep_link_state = "invalid"
                components = []
            if page:
                components = [
                    component
                    for component in components
                    if component["id"] in config["page"][page]["components"]
                ]
            return components, deep_link_state

        @app.head("/", response_class=HTMLResponse)
        @app.get("/", response_class=HTMLResponse)
        def main(
            request: fastapi.Request,
            user: str = Depends(get_current_user),
            page: str = "",
            deep_link: str = "",
        ):
            mimetypes.add_type("application/javascript", ".js")
            blocks = app.get_blocks()
            root = route_utils.get_root_url(
                request=request,
                route_path=f"/{page}",
                root_path=app.root_path
                or request.scope.get("root_path")
                or blocks.custom_mount_path,
            )
            if (app.auth is None and app.auth_dependency is None) or user is not None:
                config = utils.safe_deepcopy(blocks.config)
                deep_link_state = "none"
                components = [
                    component
                    for component in config["components"]
                    if component["id"] in config["page"][page]["components"]
                ]
                if deep_link:
                    components, deep_link_state = load_deep_link(
                        deep_link,
                        config,  # type: ignore
                        page,
                    )
                config["username"] = user
                config["deep_link_state"] = deep_link_state
                config["components"] = components  # type: ignore
                config["dependencies"] = [
                    dependency
                    for dependency in config.get("dependencies", [])
                    if dependency["id"] in config["page"][page]["dependencies"]
                ]
                config["layout"] = config["page"][page]["layout"]
                config["current_page"] = page
                # Update root after loading the deep link state (if applicable)
                # so that static files are served from the correct root
                config = route_utils.update_root_in_config(config, root)
            elif app.auth_dependency:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail={
                        "error": "Not authenticated",
                        "auth_message": blocks.auth_message,
                    },
                )
            else:
                config = {
                    "auth_required": True,
                    "auth_message": blocks.auth_message,
                    "space_id": blocks.space_id,
                    "root": root,
                    "page": {"": {"layout": {}}},
                    "pages": [""],
                    "components": [],
                    "dependencies": [],
                    "current_page": "",
                }

            try:
                template = (
                    "frontend/share.html" if blocks.share else "frontend/index.html"
                )
                gradio_api_info = api_info(request)
                resp = templates.TemplateResponse(
                    request=request,
                    name=template,
                    context={
                        "config": config,
                        "gradio_api_info": gradio_api_info,
                    },
                )
                return resp
            except TemplateNotFound as err:
                if blocks.share:
                    raise ValueError(
                        "Did you install Gradio from source files? Share mode only "
                        "works when Gradio is installed through the pip package."
                    ) from err
                else:
                    raise ValueError(
                        "Did you install Gradio from source files? You need to build "
                        "the frontend by running /scripts/build_frontend.sh"
                    ) from err

        @app.get("/gradio_api/deep_link")
        def deep_link(session_hash: str):
            if session_hash in app.state_holder:
                components = [
                    utils.safe_deepcopy(c)
                    for c in app.state_holder[session_hash].components
                ]
                components_json = orjson.dumps(
                    components,
                    option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_PASSTHROUGH_DATETIME,
                    default=str,
                )
                deep_link = route_utils.create_url_safe_hash(components_json)
                directory = Path(app.uploaded_file_dir) / "deep_links" / deep_link
                directory.mkdir(parents=True, exist_ok=True)
                with open(directory / "state.json", "wb") as f:
                    f.write(components_json)
                return deep_link
            else:
                return ""

        @router.get("/info/", dependencies=[Depends(login_check)])
        @router.get("/info", dependencies=[Depends(login_check)])
        def api_info(request: fastapi.Request):
            all_endpoints = request.query_params.get("all_endpoints", False)
            if all_endpoints:
                if not app.all_app_info:
                    app.all_app_info = app.get_blocks().get_api_info(all_endpoints=True)
                return app.all_app_info
            if not app.api_info:
                api_info = utils.safe_deepcopy(app.get_blocks().get_api_info())
                api_info = cast(dict[str, Any], api_info)
                api_info = route_utils.update_example_values_to_use_public_url(api_info)
                root = route_utils.get_root_url(
                    request=request,
                    route_path=f"{API_PREFIX}/info",
                    root_path=app.root_path,
                )
                space_id = app.get_blocks().space_id
                cli_snippets = generate_cli_snippet(api_info["named_endpoints"])
                for k, v in cli_snippets.items():
                    cli_snippets[k] = v.replace("{space_id}", space_id or str(root))
                api_prefix = API_PREFIX + "/"
                for ep_name, ep_info in api_info.get("named_endpoints", {}).items():
                    ep_info["code_snippets"] = generate_code_snippets(
                        ep_name,
                        ep_info,
                        str(root),
                        space_id=space_id,
                        api_prefix=api_prefix,
                    )
                    ep_info["code_snippets"]["cli"] = cli_snippets[ep_name]
                app.api_info = api_info
            return app.api_info

        @router.get("/openapi.json", dependencies=[Depends(login_check)])
        def openapi_schema(request: fastapi.Request):
            """Generate an OpenAPI schema from the Gradio app's API info."""
            info = api_info(request)
            info_simple = _condense_info(info, url_only=True)
            schema = {
                "openapi": "3.0.2",
                "info": {
                    "title": getattr(app.get_blocks(), "title", "Gradio App"),
                    "description": getattr(app.get_blocks(), "description", ""),
                    "version": VERSION,
                },
                "paths": {
                    "/gradio_api/upload": {
                        "post": {
                            "summary": "Upload File",
                            "operationId": "upload_file_upload_post",
                            "parameters": [
                                {
                                    "name": "upload_id",
                                    "in": "query",
                                    "required": False,
                                    "schema": {
                                        "type": "string",
                                        "nullable": True,
                                        "default": None,
                                    },
                                    "description": "Optional ID to track upload progress",
                                }
                            ],
                            "requestBody": {
                                "required": True,
                                "content": {
                                    "multipart/form-data": {
                                        "schema": {
                                            "type": "object",
                                            "properties": {
                                                "files": {
                                                    "type": "array",
                                                    "items": {
                                                        "type": "string",
                                                        "format": "binary",
                                                    },
                                                    "description": "One or more files to upload",
                                                }
                                            },
                                            "required": ["files"],
                                        }
                                    }
                                },
                            },
                            "responses": {
                                "200": {
                                    "description": "List of file paths where the uploaded files were saved",
                                    "content": {
                                        "application/json": {
                                            "schema": {
                                                "type": "array",
                                                "items": {"type": "string"},
                                            }
                                        }
                                    },
                                },
                                "400": {
                                    "description": "Invalid content type or invalid file name",
                                    "content": {
                                        "text/plain": {"schema": {"type": "string"}}
                                    },
                                },
                                "413": {
                                    "description": "File exceeds maximum allowed size",
                                    "content": {
                                        "text/plain": {"schema": {"type": "string"}}
                                    },
                                },
                            },
                            "security": [{"login_check": []}],
                        }
                    }
                },
                "components": {"schemas": {}},
            }

            for endpoint_path, endpoint_info in info.get("named_endpoints", {}).items():  # type: ignore
                if endpoint_info.get("api_visibility", "public") == "private":
                    continue
                endpoint_name = endpoint_path.strip("/").replace("/", "_")
                has_file_params = any(
                    p.get("type", {}).get("type") == "filepath"
                    for p in info_simple[endpoint_path].get("parameters", [])
                )
                summary = (
                    endpoint_info.get("description", "") or f"Endpoint {endpoint_path}"
                )
                if has_file_params:
                    summary += '. File inputs must first be uploaded via POST /gradio_api/upload (multipart/form-data with a "files" field). Use the returned path in the request body as {"path": "<uploaded_path>", "meta": {"_type": "gradio.FileData"}}.'
                path_item = {
                    "post": {
                        "summary": summary,
                        "description": endpoint_info.get("description", ""),
                        "operationId": endpoint_name,
                        "requestBody": {
                            "required": True,
                            "content": {
                                "application/json": {
                                    "schema": {"type": "object", "properties": {}}
                                },
                            },
                        },
                        "responses": {
                            "200": {
                                "description": "Successful response",
                                "content": {
                                    "application/json": {
                                        "schema": {
                                            "type": "object",
                                            "properties": {
                                                "event_id": {"type": "string"}
                                            },
                                        }
                                    }
                                },
                            }
                        },
                    }
                }

                request_properties = path_item["post"]["requestBody"]["content"][  # type: ignore
                    "application/json"
                ]["schema"]["properties"]  # type: ignore
                for param in info_simple[endpoint_path].get("parameters", []):
                    param_name = param["name"]
                    param_type = param.get("type", {})

                    if "additional_description" in param_type:
                        param_type = dict(param_type)
                        param_type.pop("additional_description", None)

                    if "properties" in param_type and "type" not in param_type:
                        param_type = dict(param_type)
                        param_type["type"] = "object"

                    if param_type.get("type") == "filepath":
                        param_type = dict(param_type)
                        param_type["type"] = "string"
                        param_type["format"] = "filepath"

                    request_properties[param_name] = param_type  # type: ignore

                    if "example_input" in param:
                        if (
                            "examples"
                            not in path_item["post"]["requestBody"]["content"][  # type: ignore
                                "application/json"
                            ]
                        ):
                            path_item["post"]["requestBody"]["content"][  # type: ignore
                                "application/json"
                            ]["examples"] = {"example1": {"value": {}}}
                        path_item["post"]["requestBody"]["content"]["application/json"][  # type: ignore
                            "examples"
                        ]["example1"]["value"][param_name] = param["example_input"]  # type: ignore

                returns_info = []
                for i, ret in enumerate(info_simple[endpoint_path].get("returns", [])):
                    ret_name = f"output_{i}" if i > 0 else "output"
                    ret_type = ret.get("type", {})
                    desc = ""
                    returns_info.append(
                        f"{ret_name} ({ret_type})" + "" if not desc else f"desc: {desc}"
                    )  # type: ignore
                path_item["post"]["description"] += (  # type: ignore
                    f"Output must be fetched from GET /gradio_api/call{endpoint_path}/{{event_id}}. Returns an array of {len(returns_info)} elements of the following format: {returns_info}"
                )

                schema["paths"][f"/gradio_api/call/v2{endpoint_path}"] = path_item  # type: ignore

                get_path = f"/gradio_api/call{endpoint_path}/{{event_id}}"
                schema["paths"][get_path] = {  # type: ignore
                    "get": {
                        "summary": f"Fetch results for {endpoint_path}",
                        "description": "Returns a stream of server-sent events (SSE). The final event has `event: complete` with `data` containing a JSON array of outputs.",
                        "operationId": f"{endpoint_name}_get",
                        "parameters": [
                            {
                                "name": "event_id",
                                "in": "path",
                                "required": True,
                                "schema": {"type": "string"},
                                "description": "The event_id returned by the POST request",
                            }
                        ],
                        "responses": {
                            "200": {
                                "description": "SSE stream with event: complete containing a JSON array of outputs",
                                "content": {
                                    "text/event-stream": {"schema": {"type": "string"}}
                                },
                            }
                        },
                    }
                }

            return schema

        @app.get("/config/", dependencies=[Depends(login_check)])
        @app.get("/config", dependencies=[Depends(login_check)])
        def get_config(request: fastapi.Request, deep_link: str = ""):
            """Serve a per-request copy of the Blocks config as JSON.

            Parameters:
                request: the incoming request, used to resolve the root URL and the current user.
                deep_link: when non-empty, it is handed to ``load_deep_link`` and the returned
                    components and state are stored in the config.
            Returns:
                An ``ORJSONResponse`` with the config after it has been passed through
                ``route_utils.update_root_in_config``.
            """
            # Work on a copy so the per-request fields below never touch the shared config.
            config = utils.safe_deepcopy(app.get_blocks().config)
            effective_root_path = (
                app.root_path
                or request.scope.get("root_path")
                or blocks.custom_mount_path
            )
            root = route_utils.get_root_url(
                request=request,
                route_path="/config",
                root_path=effective_root_path,
            )
            config["username"] = get_current_user(request)
            if deep_link:
                components, deep_link_state = load_deep_link(deep_link, config, page="")  # type: ignore
                config["components"] = components  # type: ignore
                config["deep_link_state"] = deep_link_state
            # Translations are only included when the blocks carry a truthy i18n instance.
            i18n = getattr(blocks, "i18n_instance", None)
            config["i18n_translations"] = i18n.translations_dict if i18n else None
            config = route_utils.update_root_in_config(config, root)
            return ORJSONResponse(content=config)

        @app.get("/static/{path:path}")
        async def static_resource(path: str):
            """Serve a file under ``STATIC_PATH_LIB``; ``path`` is treated as user-provided."""
            requested = UserProvidedPath(path)
            return file_response(STATIC_PATH_LIB, requested)

        @router.get("/custom_component/{id}/{environment}/{type}/{file_name}")
        def custom_component_path(
            id: str,
            environment: Literal["client", "server"],
            type: str,
            file_name: str,
            req: fastapi.Request,
        ):
            """Serve a template file belonging to a custom component, with ETag revalidation.

            Parameters:
                id: component class id, matched against ``get_component_class_id()`` of every known component.
                environment: must be "client" or "server".
                type: sub-directory of the component's template directory.
                file_name: name of the file inside that sub-directory.
                req: the incoming request; only its ``if-none-match`` header is read.
            Returns:
                A 304 response when the client's ``if-none-match`` header equals the file's digest,
                otherwise a ``FileResponse`` for the resolved path.
            Raises:
                HTTPException: 404 when the environment is unsupported, no component matches ``id``,
                    the component's module has no file, or the requested path is rejected by ``safe_join``.
            """
            if environment not in ["client", "server"]:
                raise HTTPException(
                    status_code=404, detail="Environment not supported."
                )
            components = utils.get_all_components()
            location = next(
                (item for item in components if item.get_component_class_id() == id),
                None,
            )
            if location is None:
                raise HTTPException(status_code=404, detail="Component not found.")

            module_name = location.__module__
            module_path = sys.modules[module_name].__file__

            if module_path is None:
                raise HTTPException(status_code=404, detail="Component not found.")

            try:
                requested_path = utils.safe_join(
                    location.TEMPLATE_DIR,
                    UserProvidedPath(f"{type}/{file_name}"),
                )
            except InvalidPathError:
                raise HTTPException(
                    status_code=404, detail="Component not found."
                ) from None

            path = routes_safe_join(
                DeveloperPath(str(Path(module_path).parent)),
                UserProvidedPath(requested_path),
            )

            # Uncomment when we support custom component SSR
            # if environment == "server":
            #     return PlainTextResponse(path)

            key = f"{id}-{type}-{file_name}"

            # The digest is computed once per key and then reused from
            # app.custom_component_hashes. Hash the raw bytes so that assets which
            # are not valid UTF-8 do not fail while being decoded.
            version = app.custom_component_hashes.get(key)
            if version is None:
                version = hashlib.sha256(Path(path).read_bytes()).hexdigest()
                app.custom_component_hashes[key] = version

            headers = {"Cache-Control": "max-age=0, must-revalidate"}
            if version:
                headers["ETag"] = version

            if version and req.headers.get("if-none-match") == version:
                return PlainTextResponse(status_code=304, headers=headers)

            return FileResponse(path, headers=headers)

        @app.get("/assets/{path:path}")
        async def build_resource(path: str):
            """Serve a file under ``BUILD_PATH_LIB``; ``path`` is treated as user-provided."""
            requested = UserProvidedPath(path)
            return file_response(BUILD_PATH_LIB, requested)

        @app.get("/favicon.ico")
        async def _():
            """Respond with the favicon configured on the current blocks."""
            return favicon(app.get_blocks().favicon_path)

        @router.head("/proxy={url_path:path}", dependencies=[Depends(login_check)])
        @router.get("/proxy={url_path:path}", dependencies=[Depends(login_check)])
        async def reverse_proxy(url_path: str):
            """Stream the response of an upstream GET request for ``url_path`` back to the caller.

            The target URL and headers come from ``app.build_proxy_request``; a
            ``PermissionError`` raised while preparing the request is reported as HTTP 400.
            When the MIME type guessed from ``url_path`` is not in ``XSS_SAFE_MIMETYPES``,
            the response is forced to be an ``application/octet-stream`` attachment.
            """
            # Adapted from: https://github.com/tiangolo/fastapi/issues/1788
            try:
                client = httpx.AsyncClient(
                    transport=_proxy_transport,
                    timeout=httpx.Timeout(10.0),
                )
                target_url, upstream_headers = app.build_proxy_request(url_path)
                upstream_request = client.build_request(
                    "GET", target_url, headers=upstream_headers
                )
            except PermissionError as err:
                raise HTTPException(status_code=400, detail=str(err)) from err

            upstream_response = await client.send(upstream_request, stream=True)

            guessed_type = mimetypes.guess_type(url_path)[0]
            if guessed_type not in XSS_SAFE_MIMETYPES:
                # Force a download instead of letting the browser render the content.
                upstream_response.headers.update(
                    {
                        "Content-Disposition": "attachment",
                        "Content-Type": "application/octet-stream",
                    }
                )
            return StreamingResponse(
                upstream_response.aiter_raw(),
                status_code=upstream_response.status_code,
                headers=upstream_response.headers,  # type: ignore
                # Close the upstream response once the body has been streamed.
                background=BackgroundTask(upstream_response.aclose),
            )

        @router.head("/file={path_or_url:path}", dependencies=[Depends(login_check)])
        @router.get("/file={path_or_url:path}", dependencies=[Depends(login_check)])
        async def file(path_or_url: str, request: fastapi.Request):
            """Delegate to ``file_fetch`` with the current blocks and the app's upload directory."""
            return file_fetch(
                path_or_url, request, app.get_blocks(), app.uploaded_file_dir
            )

        @router.post("/stream/{event_id}")
        async def _(event_id: str, body: PredictBody, request: fastapi.Request):
            """Attach new input data to a queued event and signal it.

            Parameters:
                event_id: id of the event to update, looked up in the queue's ``event_ids_to_events``.
                body: the new payload; it is converted to a ``PredictBodyInternal`` carrying ``request``.
                request: the incoming request.
            Returns:
                ``{"msg": "success"}`` once the event's data is replaced and its signal is set.
            Raises:
                HTTPException: 404 when no event with ``event_id`` is registered.
            """
            event = app.get_blocks()._queue.event_ids_to_events.get(event_id)
            if event is None:
                # An unknown or stale id is a client error, not an internal server error.
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Event not found.",
                )
            body = PredictBodyInternal(**body.model_dump(), request=request)  # type: ignore
            event.data = body
            event.signal.set()
            return {"msg": "success"}

        @router.post("/stream/{event_id}/close")
        async def _(event_id: str):
            """Mark a queued streaming event as closed and signal it.

            Parameters:
                event_id: id of the event to close, looked up in the queue's ``event_ids_to_events``.
            Returns:
                ``{"msg": "success"}`` after ``run_time`` is set to infinity, ``closed`` is set
                to True and the event's signal is set.
            Raises:
                HTTPException: 404 when no event with ``event_id`` is registered.
            """
            event = app.get_blocks()._queue.event_ids_to_events.get(event_id)
            if event is None:
                # An unknown or stale id is a client error, not an internal server error.
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Event not found.",
                )
            event.run_time = math.inf
            event.closed = True
            event.signal.set()
            return {"msg": "success"}

        @router.get("/stream/{session_hash}/{run}/{component_id}/playlist.m3u8")
        async def _(session_hash: str, run: int, component_id: int):
            """Render the HLS playlist for a pending media stream.

            Returns a 404 response when no stream is registered for the given
            session/run/component; otherwise an ``application/vnd.apple.mpegurl``
            response listing every segment collected so far.
            """
            session_streams = app.get_blocks().pending_streams[session_hash]
            stream: route_utils.MediaStream | None = session_streams.get(run, {}).get(
                component_id, None
            )

            if not stream:
                return Response(status_code=404)

            parts = [
                f"#EXTM3U\n#EXT-X-PLAYLIST-TYPE:EVENT\n#EXT-X-TARGETDURATION:{stream.max_duration}\n#EXT-X-VERSION:4\n#EXT-X-MEDIA-SEQUENCE:0\n"
            ]

            for segment in stream.segments:
                parts.append(f"#EXTINF:{segment['duration']:.3f},\n")
                parts.append(f"{segment['id']}{segment['extension']}\n")  # type: ignore
                # HLS expects the start time of the video segments to be continuous
                # Instead of re-encoding the user video chunks, we add a discontinuity tag
                if segment["extension"] == ".ts":
                    parts.append("#EXT-X-DISCONTINUITY\n")

            if stream.ended:
                parts.append("#EXT-X-ENDLIST\n")

            return Response(
                content="".join(parts), media_type="application/vnd.apple.mpegurl"
            )

        @router.get("/stream/{session_hash}/{run}/{component_id}/{segment_id}.{ext}")
        async def _(
            session_hash: str, run: int, component_id: int, segment_id: str, ext: str
        ):
            """Return the raw bytes of one segment of a pending media stream.

            Responds with 400 for extensions other than "aac"/"ts" and with 404 when
            either the stream or the segment cannot be found.
            """
            if ext not in ("aac", "ts"):
                return Response(status_code=400, content="Unsupported file extension")

            session_streams = app.get_blocks().pending_streams[session_hash]
            stream: route_utils.MediaStream | None = session_streams.get(run, {}).get(
                component_id, None
            )

            if not stream:
                return Response(status_code=404, content="Stream not found")

            # Linear scan for the first segment whose id matches.
            for candidate in stream.segments:
                if candidate["id"] == segment_id:  # type: ignore
                    break
            else:
                return Response(status_code=404, content="Segment not found")

            media_type = "audio/aac" if ext == "aac" else "video/MP2T"
            return Response(content=candidate["data"], media_type=media_type)

        @router.get("/stream/{session_hash}/{run}/{component_id}/playlist-file")
        async def _(session_hash: str, run: int, component_id: int):
            """Return the whole stream as a single file, combining its segments on first use.

            Responds with 404 when no stream is registered for the given
            session/run/component. The combined file path is stored on the stream so
            later requests reuse it.
            """
            session_streams = app.get_blocks().pending_streams[session_hash]
            stream: route_utils.MediaStream | None = session_streams.get(run, {}).get(
                component_id, None
            )

            if not stream:
                return Response(status_code=404)

            if stream.combined_file:
                return FileResponse(stream.combined_file)

            segment_payloads = [segment["data"] for segment in stream.segments]
            component = app.get_blocks().get_component(component_id)
            combined = await component.combine_stream(  # type: ignore
                segment_payloads,
                only_file=True,
                desired_output_format=stream.desired_output_format,
            )
            stream.combined_file = combined.path
            return FileResponse(stream.combined_file)

        @router.get("/file/{path:path}", dependencies=[Depends(login_check)])
        async def file_deprecated(path: str, request: fastapi.Request):
            """Serve ``/file/{path}`` by forwarding to the ``file`` handler."""
            response = await file(path, request)
            return response

        @router.post("/reset/")
        @router.post("/reset")
        async def reset_iterator(body: ResetBody):  # noqa: ARG001
            """Accept a reset request and always report success.

            The body is validated but otherwise ignored.
            """
            # No-op, all the cancelling/reset logic handled by /cancel
            result = {"success": True}
            return result

        @router.get("/heartbeat/{session_hash}")
        def heartbeat(
            session_hash: str,
            request: fastapi.Request,
            background_tasks: BackgroundTasks,
            username: str = Depends(get_current_user),
        ):
            """Clients make a persistent connection to this endpoint to keep the session alive.
            When the client disconnects, the session state is deleted.

            Parameters:
                session_hash: the session this heartbeat connection belongs to.
                request: the incoming request; reused to build the request objects for unload handlers.
                background_tasks: FastAPI background task collection that receives the unload handlers.
                username: the current user, resolved through ``get_current_user``.
            Returns:
                A ``text/event-stream`` ``StreamingResponse`` that emits ``data: ALIVE`` periodically.
            """
            heartbeat_rate = utils.get_heartbeat_rate()

            async def iterator():
                # One task for the whole stream: completes when app.stop_event is set.
                stop_stream_task = asyncio.create_task(app.stop_event.wait())
                while True:
                    try:
                        yield "data: ALIVE\n\n"
                        # We need to close the heartbeat connections as soon as the server stops
                        # otherwise the server can take forever to close
                        wait_task = asyncio.create_task(asyncio.sleep(heartbeat_rate))
                        done, _ = await asyncio.wait(
                            [wait_task, stop_stream_task],
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        if stop_stream_task in done:
                            # Reuse the cancellation path below for server shutdown.
                            raise asyncio.CancelledError()
                    except asyncio.CancelledError:
                        # Reached either via the explicit raise above or when a
                        # CancelledError is raised inside the try block.
                        if not stop_stream_task.done():
                            stop_stream_task.cancel()

                        req = Request(request, username, session_hash=session_hash)
                        root_path = route_utils.get_root_url(
                            request=request,
                            route_path=f"{API_PREFIX}/heartbeat/{session_hash}",
                            root_path=app.root_path,
                        )
                        body = PredictBodyInternal(
                            session_hash=session_hash, data=[], request=request
                        )
                        # Indices of every function that has at least one target
                        # whose second element is "unload".
                        unload_fn_indices = [
                            i
                            for i, dep in app.get_blocks().fns.items()
                            if any(t for t in dep.targets if t[1] == "unload")
                        ]
                        for fn_index in unload_fn_indices:
                            # The task running this loop has been cancelled
                            # so we add tasks in the background
                            background_tasks.add_task(
                                route_utils.call_process_api,
                                app=app,
                                body=body,
                                gr_request=req,
                                fn=app.get_blocks().fns[fn_index],
                                root_path=root_path,
                            )
                        # This will mark the state to be deleted in an hour
                        if session_hash in app.state_holder.session_data:
                            app.state_holder.session_data[session_hash].is_closed = True
                        caching.clear_session_caches(session_hash)
                        # Signal every event still pending for this session, with
                        # its run_time set to infinity.
                        for (
                            event_id
                        ) in app.get_blocks()._queue.pending_event_ids_session.get(
                            session_hash, []
                        ):
                            event = app.get_blocks()._queue.event_ids_to_events[
                                event_id
                            ]
                            event.run_time = math.inf
                            event.signal.set()
                        return

            return StreamingResponse(iterator(), media_type="text/event-stream")

        # had to use '/run' endpoint for Colab compatibility, '/api' supported for backwards compatibility
        @router.post("/run/{api_name}", dependencies=[Depends(login_check)])
        @router.post("/run/{api_name}/", dependencies=[Depends(login_check)])
        @router.post("/api/{api_name}", dependencies=[Depends(login_check)])
        @router.post("/api/{api_name}/", dependencies=[Depends(login_check)])
        async def predict(
            api_name: str,
            body: PredictBody,
            request: fastapi.Request,
            username: str = Depends(get_current_user),
        ):
            """Run the function registered under ``api_name`` directly and return its output.

            Parameters:
                api_name: name of the API endpoint to run.
                body: the prediction payload.
                request: the incoming request.
                username: the current user, resolved through ``get_current_user``.
            Returns:
                An ``ORJSONResponse`` with the output on success, or a ``JSONResponse`` with
                status 500 and an error payload when processing raises.
            Raises:
                HTTPException: 404 when the API is not open and the function is queued.
            """
            internal_body = PredictBodyInternal(**body.model_dump(), request=request)  # type: ignore
            fn = route_utils.get_fn(
                blocks=app.get_blocks(), api_name=api_name, body=internal_body
            )

            direct_calls_blocked = not app.get_blocks().api_open and fn.queue
            if direct_calls_blocked:
                raise HTTPException(
                    detail="This API endpoint does not accept direct HTTP POST requests. Please join the queue to use this API.",
                    status_code=status.HTTP_404_NOT_FOUND,
                )

            gr_request = route_utils.compile_gr_request(
                internal_body,
                fn=fn,
                username=username,
                request=request,
            )
            root_path = route_utils.get_root_url(
                request=request,
                route_path=request.url.path,
                root_path=app.root_path,
            )
            try:
                output = await route_utils.call_process_api(
                    app=app,
                    body=internal_body,
                    gr_request=gr_request,
                    fn=fn,
                    root_path=root_path,
                )
            except BaseException as error:
                content = utils.error_payload(error, app.get_blocks().show_error)
                # Tracebacks are printed unless the error is an Error that opts out.
                suppress_traceback = (
                    isinstance(error, Error) and not error.print_exception
                )
                if not suppress_traceback:
                    traceback.print_exc()
                return JSONResponse(
                    content=content,
                    status_code=500,
                )
            else:
                return ORJSONResponse(output)

        def prepare_simple_api_data(body: PredictBody, fn: Any) -> None:
            """Pad ``body.data`` in place with ``None`` for inputs flagged ``skip_api``.

            Nothing happens when ``body.data`` already has one value per input of
            ``fn``, or when its length does not equal the number of inputs whose
            ``skip_api`` is falsy. Otherwise the supplied values are assigned, in
            order, to the non-skipped inputs and every skipped input gets ``None``.
            """
            supplied_count = len(body.data)
            if supplied_count == len(fn.inputs):
                return
            exposed_count = sum(1 for block in fn.inputs if not block.skip_api)
            if supplied_count != exposed_count:
                return
            remaining = iter(body.data)
            padded = []
            for block in fn.inputs:
                padded.append(None if block.skip_api else next(remaining))
            body.data = padded

        @router.post("/call/v2/{api_name}", dependencies=[Depends(login_check)])
        @router.post("/call/v2/{api_name}/", dependencies=[Depends(login_check)])
        async def _(
            api_name: str,
            body: dict[str, Any],
            request: fastapi.Request,
            username: str = Depends(get_current_user),
        ):
            """Queue a call to ``api_name`` whose arguments arrive as a JSON object.

            The body is turned into an argument list with ``client_utils.construct_args``
            using the endpoint's parameter info from ``app.api_info``, then pushed onto
            the queue via ``queue_join_helper``.
            """
            endpoint_info = app.api_info["named_endpoints"]["/" + api_name]  # type: ignore
            parameters_info = endpoint_info["parameters"]
            processed_args = client_utils.construct_args(parameters_info, (), body)
            simple_body = SimplePredictBody(data=processed_args)
            full_body = PredictBody(**simple_body.model_dump(), simple_format=True)  # type: ignore
            fn = route_utils.get_fn(
                blocks=app.get_blocks(), api_name=api_name, body=full_body
            )
            prepare_simple_api_data(full_body, fn)
            full_body.fn_index = fn._id
            result = await queue_join_helper(full_body, request, username)
            return result

        @router.post("/call/{api_name}", dependencies=[Depends(login_check)])
        @router.post("/call/{api_name}/", dependencies=[Depends(login_check)])
        async def simple_predict_post(
            api_name: str,
            body: SimplePredictBody,
            request: fastapi.Request,
            username: str = Depends(get_current_user),
        ):
            """Queue a call to ``api_name`` from a simple-format body.

            The body is expanded into a ``PredictBody`` flagged ``simple_format=True``,
            bound to the resolved function, and pushed onto the queue via
            ``queue_join_helper``.
            """
            payload = body.model_dump()
            full_body = PredictBody(**payload, simple_format=True)  # type: ignore
            fn = route_utils.get_fn(
                blocks=app.get_blocks(), api_name=api_name, body=full_body
            )
            prepare_simple_api_data(full_body, fn)
            full_body.fn_index = fn._id
            result = await queue_join_helper(full_body, request, username)
            return result

        @router.post("/queue/join", dependencies=[Depends(login_check)])
        async def queue_join(
            body: PredictBody,
            request: fastapi.Request,
            username: str = Depends(get_current_user),
        ):
            """Push a prediction request onto the queue.

            Raises:
                HTTPException: 400 when the body carries no session hash.
            """
            if body.session_hash is not None:
                return await queue_join_helper(body, request, username)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Session hash not found.",
            )

        async def queue_join_helper(
            body: PredictBody,
            request: fastapi.Request,
            username: str,
        ):
            """Push ``body`` onto the blocks queue and report the resulting event id.

            Returns:
                ``{"event_id": ...}`` when the push succeeds.
            Raises:
                HTTPException: 503 when the queue is stopped; otherwise, when the push
                    fails, a status derived from the state returned by the queue, with
                    the queue's second return value as the detail.
            """
            blocks = app.get_blocks()

            # Make sure the queue knows which app it serves.
            if blocks._queue.server_app is None:
                blocks._queue.set_server_app(app)

            if blocks._queue.stopped:
                raise HTTPException(
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                    detail="Queue is stopped.",
                )

            internal_body = PredictBodyInternal(**body.model_dump(), request=request)  # type: ignore
            success, event_id, state = await blocks._queue.push(
                body=internal_body, request=request, username=username
            )
            if success:
                return {"event_id": event_id}

            status_by_state = {
                "queue_full": status.HTTP_503_SERVICE_UNAVAILABLE,
                "validator_error": status.HTTP_422_UNPROCESSABLE_ENTITY,
                "error": status.HTTP_400_BAD_REQUEST,
                "success": status.HTTP_200_OK,
            }
            raise HTTPException(status_code=status_by_state[state], detail=event_id)

        @router.post("/cancel")
        async def cancel_event(body: CancelBody):
            """Cancel a running or queued event and release its iterator.

            Parameters:
                body: identifies the event through ``session_hash``, ``fn_index`` and ``event_id``.
            Returns:
                ``{"success": True}`` in every case.
            """
            await cancel_tasks({f"{body.session_hash}_{body.fn_index}"})
            blocks = app.get_blocks()
            # Need to complete the job so that the client disconnects
            session_open = (
                body.session_hash in blocks._queue.pending_messages_per_session
            )
            event_running = (
                body.event_id
                in blocks._queue.pending_event_ids_session.get(body.session_hash, {})
            )
            await blocks._queue.remove_from_queue(body.event_id)
            if session_open and event_running:
                message = ProcessCompletedMessage(
                    output={}, success=True, event_id=body.event_id
                )
                blocks._queue.pending_messages_per_session[
                    body.session_hash
                ].put_nowait(message)
            if body.event_id in app.iterators:
                async with app.lock:
                    try:
                        await safe_aclose_iterator(app.iterators[body.event_id])
                    except Exception:
                        # Best-effort close: a failure here must not fail the cancel.
                        pass
                    # The membership check above ran before the lock was acquired, so
                    # a duplicate /cancel may already have removed the entry; pop()
                    # tolerates that where `del` would raise KeyError.
                    app.iterators.pop(body.event_id, None)
                    app.iterators_to_reset.add(body.event_id)
            return {"success": True}

        @router.get(
            "/call/v2/{api_name}/{event_id}", dependencies=[Depends(login_check)]
        )
        @router.get("/call/{api_name}/{event_id}", dependencies=[Depends(login_check)])
        async def simple_predict_get(
            request: fastapi.Request,
            event_id: str,
        ):
            """Stream the messages of a queued call as named server-sent events.

            The session to stream is the one recorded on the event; when the event id
            is unknown, the event id itself is used as the session hash.
            """

            def process_msg(message: EventMessage) -> str | None:
                """Format ``message`` as an SSE frame, or return None for other message types."""
                dumped = message.model_dump()
                if isinstance(
                    message, (ProcessCompletedMessage, ProcessGeneratingMessage)
                ):
                    if message.success:
                        event = (
                            "complete"
                            if isinstance(message, ProcessCompletedMessage)
                            else "generating"
                        )
                        data = dumped["output"].get("data")
                    else:
                        event = "error"
                        data = dumped["output"]
                elif isinstance(message, HeartbeatMessage):
                    event, data = "heartbeat", None
                elif isinstance(message, UnexpectedErrorMessage):
                    event, data = "error", message.message
                else:
                    return None
                return f"event: {event}\ndata: {json.dumps(data)}\n\n"

            queued_event = app.get_blocks()._queue.event_ids_to_events.get(event_id)
            if queued_event:
                session_hash = queued_event.session_hash
            else:
                session_hash = event_id
            return await queue_data_helper(request, session_hash, process_msg)

        @router.get("/queue/data", dependencies=[Depends(login_check)])
        async def queue_data(
            request: fastapi.Request,
            session_hash: str,
        ):
            """Stream every queue message of ``session_hash`` as JSON server-sent events."""

            def process_msg(message: EventMessage) -> str:
                """Serialize the dumped message with orjson into a single ``data:`` frame."""
                payload = orjson.dumps(message.model_dump(), default=str)
                return f"data: {payload.decode('utf-8')}\n\n"

            return await queue_data_helper(request, session_hash, process_msg)

        async def queue_data_helper(
            request: fastapi.Request,
            session_hash: str,
            process_msg: Callable[[EventMessage], str | None],
        ):
            blocks = app.get_blocks()
            heartbeat_rate = utils.get_heartbeat_rate()

            async def heartbeat():
                while blocks.is_running:
                    await asyncio.sleep(heartbeat_rate)
                    # It's possible the event has finished by the time
                    # the heartbeat wakes up
                    queue = blocks._queue.pending_messages_per_session.get(session_hash)
                    if queue:
                        await queue.put(HeartbeatMessage())

            async def sse_stream(request: fastapi.Request):
                """Async generator producing the server-sent-event stream for one session.

                Repeatedly pulls messages from the session's pending-message queue,
                converts each with ``process_msg`` and yields the result. The stream
                ends when the client disconnects, when the server reports it has
                stopped, or when a completed event leaves the session with no
                pending event ids. Any exception is reported to the client as an
                ``UnexpectedErrorMessage`` before being re-raised.
                """
                # Run the heartbeat concurrently for as long as the stream is alive.
                heartbeat_task = asyncio.create_task(heartbeat())
                try:
                    while True:
                        # Client went away: drop its events and stop the heartbeat.
                        if await request.is_disconnected():
                            await blocks._queue.clean_events(session_hash=session_hash)
                            heartbeat_task.cancel()
                            return

                        # No message queue for this session: raise a 404, which the
                        # handler below reports with session_not_found=True.
                        if (
                            session_hash
                            not in blocks._queue.pending_messages_per_session
                        ):
                            raise HTTPException(
                                status_code=status.HTTP_404_NOT_FOUND,
                            )

                        message = None
                        try:
                            messages = blocks._queue.pending_messages_per_session[
                                session_hash
                            ]
                            # Wait at most 10 seconds so the disconnect and stopped
                            # checks are re-evaluated even when no message arrives.
                            message = await asyncio.wait_for(messages.get(), timeout=10)
                        except (TimeoutError, asyncio.TimeoutError):
                            pass

                        # A stopped queue overrides whatever message was fetched.
                        if blocks._queue.stopped:
                            message = UnexpectedErrorMessage(
                                message="Server stopped unexpectedly.",
                                success=False,
                            )
                        if message:
                            response = process_msg(message)
                            if response is not None:
                                yield response
                            if (
                                isinstance(message, ProcessCompletedMessage)
                                and message.event_id
                            ):
                                # It's possible that the event_id has already been removed
                                # for example, the user sent two duplicate `/cancel` requests.
                                # The first one would have removed the event_id from pending_event_ids_session
                                if (
                                    message.event_id
                                    in (
                                        blocks._queue.pending_event_ids_session[
                                            session_hash
                                        ]
                                    )
                                ):
                                    blocks._queue.pending_event_ids_session[
                                        session_hash
                                    ].remove(message.event_id)
                                # Close the stream when the server stopped, or when this
                                # was the last pending event of the session.
                                if message.msg == ServerMessage.server_stopped or (
                                    message.msg == ServerMessage.process_completed
                                    and (
                                        len(
                                            blocks._queue.pending_event_ids_session[
                                                session_hash
                                            ]
                                        )
                                        == 0
                                    )
                                ):
                                    message = CloseStreamMessage()
                                    response = process_msg(message)
                                    if response is not None:
                                        yield response
                                    heartbeat_task.cancel()
                                    return
                except BaseException as e:
                    # Report the failure to the client; an HTTPException here is the
                    # "session not found" case raised above.
                    message = UnexpectedErrorMessage(
                        message=str(e),
                        session_not_found=isinstance(e, HTTPException),
                    )
                    response = process_msg(message)
                    # On cancellation, discard the session's queue and its events.
                    if isinstance(e, asyncio.CancelledError):
                        del blocks._queue.pending_messages_per_session[session_hash]
                        await blocks._queue.clean_events(session_hash=session_hash)
                    if response is not None:
                        yield response
                    heartbeat_task.cancel()
                    raise e

            return StreamingResponse(
                sse_stream(request),
                media_type="text/event-stream",
            )

        async def get_item_or_file(
            request: fastapi.Request,
        ) -> Union[ComponentServerJSONBody, ComponentServerBlobBody]:
            """Parse a component-server request body into its typed representation.

            Parameters:
                request: The incoming request. A ``multipart/form-data`` content
                    type is parsed as a form (uploaded files plus plain fields);
                    anything else is parsed as JSON.
            Returns:
                A ``ComponentServerBlobBody`` for multipart requests, otherwise a
                ``ComponentServerJSONBody``.
            Raises:
                HTTPException: 400 when the JSON body cannot be parsed/validated,
                    or when a multipart form lacks one of the required routing
                    fields (``component_id``, ``session_hash``, ``fn_name``).
            """
            content_type = request.headers.get("Content-Type")

            if isinstance(content_type, str) and content_type.startswith(
                "multipart/form-data"
            ):
                files = []
                data = {}
                async with request.form() as form:
                    for key, value in form.items():
                        if (
                            isinstance(value, list)
                            and len(value) > 1
                            and isinstance(value[0], StarletteUploadFile)
                        ):
                            # A list of uploads: collect the files, and keep any
                            # non-file entries under an index-suffixed key.
                            for i, v in enumerate(value):
                                if isinstance(v, StarletteUploadFile):
                                    filename = v.filename
                                    contents = await v.read()
                                    files.append((filename, contents))
                                else:
                                    data[f"{key}-{i}"] = v
                        elif isinstance(value, StarletteUploadFile):
                            filename = value.filename
                            contents = await value.read()
                            files.append((filename, contents))
                        else:
                            data[key] = value

                # Reject incomplete forms explicitly instead of letting a KeyError
                # bubble up as an opaque 500, mirroring the 400 of the JSON branch.
                missing = [
                    field
                    for field in ("component_id", "session_hash", "fn_name")
                    if field not in data
                ]
                if missing:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Missing form field(s): {', '.join(missing)}.",
                    )

                return ComponentServerBlobBody(
                    data=DataWithFiles(data=data, files=files),
                    component_id=data["component_id"],
                    session_hash=data["session_hash"],
                    fn_name=data["fn_name"],
                )
            else:
                try:
                    data = await request.json()
                    return ComponentServerJSONBody(
                        data=data["data"],
                        component_id=data["component_id"],
                        session_hash=data["session_hash"],
                        fn_name=data["fn_name"],
                    )

                except Exception:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail="Invalid JSON body.",
                    ) from None

        @router.post(
            "/component_server",
            dependencies=[Depends(login_check)],
        )
        @router.post(
            "/component_server/",
            dependencies=[Depends(login_check)],
        )
        async def component_server(
            request: fastapi.Request,
        ):
            """Invoke a server function exposed by a component and return its result.

            The target block is taken from the session state when it holds the
            component id, otherwise from the app's blocks. Only attributes flagged
            with ``_is_server_fn`` may be called; anything else yields a 404.
            """
            payload = await get_item_or_file(request)
            session_state = app.state_holder[payload.session_hash]
            target_id = payload.component_id
            block: Block = (
                session_state[target_id]
                if target_id in session_state
                else app.get_blocks().blocks[target_id]
            )

            server_fn = getattr(block, payload.fn_name, None)
            callable_from_client = server_fn is not None and getattr(
                server_fn, "_is_server_fn", False
            )
            if not callable_from_client:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Function not found.",
                )

            call_args, *_ = special_args(
                server_fn,
                [payload.data],
                request,  # type: ignore
                None,
            )
            # Coroutine functions are awaited; plain functions are called directly.
            if not inspect.iscoroutinefunction(server_fn):
                return server_fn(*call_args)
            return await server_fn(*call_args)

        @router.get(
            "/queue/status",
            dependencies=[Depends(login_check)],
            response_model=EstimationMessage,
        )
        async def get_queue_status():
            """Return the current status of the app's event queue."""
            event_queue = app.get_blocks()._queue
            return event_queue.get_status()

        @router.get("/upload_progress")
        async def get_upload_progress(upload_id: str, request: fastapi.Request):
            """Stream upload progress for ``upload_id`` as server-sent events.

            Responds with a 404 if the upload does not become tracked within three
            seconds; otherwise streams "update", "heartbeat" and finally "done"
            messages.
            """

            async def sse_stream(request: fastapi.Request):
                # Seconds between heartbeats, and between polls of an empty queue.
                heartbeat_interval = 15
                poll_interval = 0.05

                last_beat = time.perf_counter()
                finished = False
                while True:
                    # Stop tracking once the client is gone or "done" has been sent.
                    if await request.is_disconnected() or finished:
                        file_upload_statuses.stop_tracking(upload_id)
                        return

                    try:
                        if not file_upload_statuses.is_done(upload_id):
                            update = file_upload_statuses.pop(upload_id)
                            payload = {
                                "msg": "update",
                                "orig_name": update.filename,
                                "chunk_size": update.chunk_size,
                            }
                        else:
                            payload = {"msg": "done"}
                            finished = True
                        yield f"data: {json.dumps(payload)}\n\n"
                    except FileUploadProgressNotTrackedError:
                        return
                    except FileUploadProgressNotQueuedError:
                        # Nothing queued yet: wait briefly, emitting a heartbeat
                        # when the stream has been quiet for too long.
                        await asyncio.sleep(poll_interval)
                        quiet_for = time.perf_counter() - last_beat
                        if quiet_for > heartbeat_interval:
                            payload = {"msg": "heartbeat"}
                            yield f"data: {json.dumps(payload)}\n\n"
                            last_beat = time.perf_counter()

            try:
                await asyncio.wait_for(
                    file_upload_statuses.is_tracked(upload_id), timeout=3
                )
            except (asyncio.TimeoutError, TimeoutError):
                return PlainTextResponse("Upload not found", status_code=404)
            else:
                return StreamingResponse(
                    sse_stream(request),
                    media_type="text/event-stream",
                )

        def set_upload_trace(session_hash: str, start: float):
            """Record how long a file upload took as a profiling trace.

            Does nothing unless profiling is enabled. ``start`` is a
            ``time.monotonic()`` reading taken when the upload began.
            """
            import uuid

            from gradio.profiling import PROFILING_ENABLED, RequestTrace, collector

            if not PROFILING_ENABLED:
                return

            upload_trace = RequestTrace(
                event_id=str(uuid.uuid4()),
                fn_name="gradio_file_upload",
                session_hash=session_hash,
            )
            elapsed_seconds = time.monotonic() - start
            upload_trace.upload_ms = elapsed_seconds * 1000
            collector.add(upload_trace)

        @router.post("/upload", dependencies=[Depends(login_check)])
        async def upload_file(
            request: fastapi.Request,
            bg_tasks: BackgroundTasks,
            upload_id: str | None = None,
        ):
            """Receive uploaded files and return their server-side paths.

            Files that still need to be moved into the cache are handled in a
            background task. Multipart errors produce a plain-text response: 413
            when the size limit was exceeded, 400 otherwise.
            """
            start = time.monotonic() if PROFILING_ENABLED else None

            size_limit = (
                math.inf if blocks.max_file_size is None else blocks.max_file_size
            )
            # Progress is only reported when the client supplied an upload id.
            progress_tracker = file_upload_statuses if upload_id else None
            try:
                output_files, files_to_copy, locations = await upload_fn(
                    request,
                    app.uploaded_file_dir,
                    size_limit,
                    upload_id,
                    force_move=False,
                    upload_progress=progress_tracker,
                )
            except MultiPartException as exc:
                too_large = "maximum allowed size" in exc.message
                return PlainTextResponse(
                    exc.message, status_code=413 if too_large else 400
                )

            if files_to_copy:
                bg_tasks.add_task(
                    move_uploaded_files_to_cache, files_to_copy, locations
                )
            if PROFILING_ENABLED:
                session_hash = request.headers.get("session_hash", "")
                bg_tasks.add_task(set_upload_trace, session_hash, start)
            blocks.upload_file_set.update(output_files)

            return output_files

        @router.get("/startup-events")
        async def startup_events():
            """Run the app's startup events once.

            Returns True when this call ran them, False if they had already run.
            """
            if app.startup_events_triggered:
                return False

            app.get_blocks().run_startup_events()
            await app.get_blocks().run_extra_startup_events()
            app.startup_events_triggered = True
            return True

        @router.get("/theme.css", response_class=PlainTextResponse)
        @app.get("/theme.css", response_class=PlainTextResponse)
        def theme_css():
            """Serve the current blocks' theme stylesheet as ``text/css``."""
            stylesheet = app.get_blocks().theme_css
            return PlainTextResponse(stylesheet, media_type="text/css")

        @app.get("/robots.txt", response_class=PlainTextResponse)
        def robots_txt():
            """Serve robots.txt: disallow all crawling for shared apps, allow it otherwise."""
            is_shared = app.get_blocks().share
            return (
                "User-agent: *\nDisallow: /"
                if is_shared
                else "User-agent: *\nDisallow: "
            )

        @app.get("/pwa_icon")
        @app.get("/pwa_icon/{size}")
        async def pwa_icon(size: int | None = None):
            """Serve the app's favicon, optionally resized to ``size`` x ``size`` PNG.

            Parameters:
                size: Edge length in pixels of the square PNG to produce. When
                    omitted, the favicon file is returned unchanged.
            Raises:
                HTTPException: 404 when no favicon is configured; 400 when
                    ``size`` is not a positive integer.
            """
            blocks = app.get_blocks()
            favicon_path = blocks.favicon_path
            if favicon_path is None:
                raise HTTPException(status_code=404)

            if size is None:
                return FileResponse(favicon_path)

            # A non-positive size cannot be rendered; report it as a client
            # error instead of letting the resize failure surface as a 500.
            if size < 1:
                raise HTTPException(
                    status_code=400, detail="Icon size must be a positive integer."
                )
            # NOTE(review): `size` is client-controlled and has no upper bound, so
            # a very large value makes the server allocate a correspondingly large
            # image. Consider capping it.

            import PIL.Image

            png_buffer = io.BytesIO()
            # The context manager closes the underlying favicon file handle,
            # which was previously left open after each request.
            with PIL.Image.open(favicon_path) as img:
                img.resize((size, size)).save(png_buffer, format="PNG")
            png_buffer.seek(0)

            return StreamingResponse(png_buffer, media_type="image/png")

        @app.get("/manifest.json")
        def manifest_json():
            """Serve the web app manifest (name, icons, start URL, display mode).

            The icon list depends on the configured favicon: the bundled logo when
            none is set, the favicon itself for SVGs, and 192px/512px PNG
            renditions served by the ``pwa_icon`` route for anything else.
            """
            favicon = blocks.favicon_path
            if isinstance(favicon, Path):
                favicon = str(favicon)

            if favicon is None:
                icons = [
                    {
                        "src": "static/img/logo_nosize.svg",
                        "sizes": "any",
                        "type": "image/svg+xml",
                        "purpose": "any",
                    },
                ]
            elif favicon.endswith(".svg"):
                icons = [
                    {
                        "src": app.url_path_for("pwa_icon"),
                        "sizes": "any",
                        "type": "image/svg+xml",
                        "purpose": "any",
                    },
                ]
            else:
                icons = [
                    {
                        "src": app.url_path_for("pwa_icon", size=edge),
                        "sizes": f"{edge}x{edge}",
                        "type": "image/png",
                        "purpose": "any",
                    }
                    for edge in (192, 512)
                ]

            # NOTE: Required members: https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Guides/Making_PWAs_installable#required_manifest_members
            manifest = {
                "name": app.get_blocks().title or "Gradio",
                "icons": icons,
                "start_url": "./",
                "display": "standalone",
            }
            return ORJSONResponse(
                content=manifest,
                media_type="application/manifest+json",
            )

        @app.get("/monitoring", dependencies=[Depends(login_check)])
        async def analytics_login(request: fastapi.Request):
            """Print the keyed monitoring URL to the server console.

            The URL is never sent to the client; the response only points the
            caller at the console. Responds with 403 when monitoring is disabled.
            """
            if not blocks.enable_monitoring:
                raise HTTPException(
                    status_code=403, detail="Monitoring is not enabled."
                )

            base_url = route_utils.get_root_url(
                request=request,
                route_path=f"{API_PREFIX}/monitoring",
                root_path=app.root_path,
            )
            print(f"* Monitoring URL: {base_url}/monitoring/{app.analytics_key} *")
            return HTMLResponse("See console for monitoring URL.")

        @app.get("/monitoring/summary")
        async def _():
            """Return the queue's cached event-analytics summary."""
            event_queue = app.get_blocks()._queue
            return event_queue.cached_event_analytics_summary

        @app.get("/monitoring/{key}")
        async def analytics_dashboard(key: str):
            """Redirect to the monitoring dashboard when ``key`` matches the analytics key.

            The dashboard app is mounted and started on the first successful
            request. Responds with 403 when monitoring is disabled or the key is
            wrong.
            """
            if not blocks.enable_monitoring:
                raise HTTPException(
                    status_code=403, detail="Monitoring is not enabled."
                )
            if not compare_passwords_securely(key, app.analytics_key):
                raise HTTPException(status_code=403, detail="Invalid key.")

            analytics_url = f"/monitoring/{app.analytics_key}/dashboard"
            if not app.monitoring_enabled:
                # First authorised visit: mount the dashboard and hand it the
                # live analytics data.
                from gradio.monitoring_dashboard import data
                from gradio.monitoring_dashboard import demo as dashboard

                mount_gradio_app(
                    app, dashboard, path=analytics_url, mcp_server=False
                )
                dashboard._queue.start()
                data["data"] = app.get_blocks()._queue.event_analytics
                app.monitoring_enabled = True

            return RedirectResponse(
                url=analytics_url, status_code=status.HTTP_302_FOUND
            )

        @router.post("/process_recording", dependencies=[Depends(login_check)])
        async def process_recording(
            request: fastapi.Request,
        ):
            """Post-process an uploaded screen recording and return it as an MP4.

            The multipart form must contain a ``video`` file and may contain
            ``remove_segment_start``/``remove_segment_end`` (used only when both
            are present) and ``zoom_effects`` (a JSON string; invalid JSON is
            treated as an empty list). When ffmpeg is not installed, or processing
            fails, the unprocessed upload is returned instead.

            Raises:
                HTTPException: 400 when the content type is not
                    ``multipart/form-data`` or no video file was provided.
            """
            try:
                content_type_header = request.headers.get("Content-Type")
                content_type: bytes
                content_type, _ = parse_options_header(content_type_header or "")
                if content_type != b"multipart/form-data":
                    raise HTTPException(status_code=400, detail="Invalid content type.")

                app = request.app
                max_file_size = (
                    app.get_blocks().max_file_size
                    if hasattr(app, "get_blocks")
                    else None
                )
                max_file_size = max_file_size if max_file_size is not None else math.inf

                multipart_parser = GradioMultiPartParser(
                    request.headers,
                    request.stream(),
                    max_files=1,
                    max_fields=10,
                    max_file_size=max_file_size,
                )
                form = await multipart_parser.parse()
            except MultiPartException as exc:
                code = 413 if "maximum allowed size" in exc.message else 400
                return PlainTextResponse(exc.message, status_code=code)

            video_files = form.getlist("video")
            if not video_files or not isinstance(video_files[0], GradioUploadFile):
                raise HTTPException(status_code=400, detail="No video file provided")

            video_file = video_files[0]

            params = {}
            if (
                form.get("remove_segment_start") is not None
                and form.get("remove_segment_end") is not None
            ):
                params["remove_segment_start"] = form.get("remove_segment_start")
                params["remove_segment_end"] = form.get("remove_segment_end")

            zoom_effects_json = form.get("zoom_effects")
            if zoom_effects_json:
                try:
                    params["zoom_effects"] = json.loads(str(zoom_effects_json))
                except json.JSONDecodeError:
                    params["zoom_effects"] = []

            # Persist the upload to a named temp file so it can be handed to
            # ffmpeg and/or served back; delete=False because it outlives the
            # `with` block and is removed by a background task.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=".mp4", dir=DEFAULT_TEMP_DIR
            ) as input_file:
                video_file.file.seek(0)
                shutil.copyfileobj(video_file.file, input_file)
                input_path = input_file.name

            if shutil.which("ffmpeg") is None:
                return FileResponse(
                    input_path,
                    media_type="video/mp4",
                    filename="gradio-screen-recording.mp4",
                    background=BackgroundTask(lambda: cleanup_files([input_path])),
                )

            # mkstemp returns an *open* OS-level descriptor along with the path.
            # Only the path is needed, so close the descriptor right away;
            # discarding it leaked one file descriptor per request.
            output_fd, output_path = tempfile.mkstemp(
                suffix="_processed.mp4", dir=DEFAULT_TEMP_DIR
            )
            os.close(output_fd)

            try:
                processed_path, temp_files = await process_video_with_ffmpeg(
                    input_path, output_path, params
                )

                return FileResponse(
                    processed_path,
                    media_type="video/mp4",
                    filename="gradio-screen-recording.mp4",
                    background=BackgroundTask(lambda: cleanup_files(temp_files)),
                )
            except Exception:
                # Best-effort fallback: serve the original recording. The output
                # temp file is not served on this path, so remove it as well
                # rather than leaving it behind in the temp directory.
                return FileResponse(
                    input_path,
                    media_type="video/mp4",
                    filename="gradio-screen-recording.mp4",
                    background=BackgroundTask(
                        lambda: cleanup_files([input_path, output_path])
                    ),
                )

        # Directory where the vibe-edit routes store snapshots of the demo file
        # (one "<hash>.py" per edit) so that an edit can be undone later.
        vibe_edit_history_dir = Path(DEFAULT_TEMP_DIR) / "vibe_edit_history"
        vibe_edit_history_dir.mkdir(exist_ok=True, parents=True)
        # Running "User:/Assistant:" transcript of vibe edits. Held in a dict so
        # the nested route handlers can replace the string without `nonlocal`.
        chat_history = {"history": ""}
        # Maps a snapshot hash to the transcript as it was when that snapshot was
        # taken, so undoing an edit also rolls the transcript back.
        hash_to_chat_history = {}

        def limit_chat_history(history: str, max_pairs: int = 5) -> str:
            """Limit chat history in the prompt to the last max_pairs user-assistant pairs.

            The history is split on the ``"\\nUser: "`` separator that precedes
            every user turn. If the split yields no more than ``max_pairs`` pieces
            the history is returned unchanged; otherwise only the last
            ``max_pairs`` turns are kept and re-joined.

            Parameters:
                history: The accumulated transcript.
                max_pairs: Maximum number of most recent user turns (with their
                    assistant replies) to keep. Values <= 0 keep nothing.
            Returns:
                The trimmed transcript, or "" when the history is blank or
                ``max_pairs`` is not positive.
            """
            # Guard max_pairs <= 0 explicitly: `lst[-0:]` is the *whole* list, so
            # a limit of zero used to return the entire history.
            if max_pairs <= 0 or not history.strip():
                return ""

            user_messages = history.split("\nUser: ")
            if len(user_messages) <= max_pairs:
                return history

            recent_messages = user_messages[-max_pairs:]
            # Splitting consumed the "User: " prefix of every kept turn, so it is
            # always restored here, even when a prompt itself starts with "User: ".
            return "User: " + "\nUser: ".join(recent_messages)

        # Patterns used to clean raw model output before it is treated as code.

        # Any control token of the form "<|...|>" (no ">" inside the token).
        control_token_re = re.compile(r"<\|[^>]*\|>")
        # Marker sequence that opens the assistant's "final" channel message.
        final_start_re = re.compile(
            r"<\|start\|>assistant<\|channel\|>final<\|message\|>", re.IGNORECASE
        )
        # Marker that terminates a message.
        end_re = re.compile(r"<\|end\|>", re.IGNORECASE)
        # A <reasoning>...</reasoning> block; the inner text is captured as "body".
        # DOTALL lets the body span multiple lines.
        reasoning_block_re = re.compile(
            r"<\s*reasoning\s*>\s*(?P<body>.*?)\s*<\s*/\s*reasoning\s*>",
            re.IGNORECASE | re.DOTALL,
        )
        # A <think>...</think> block, possibly spanning multiple lines.
        think_block_re = re.compile(
            r"<\s*think\s*>.*?<\s*/\s*think\s*>", re.IGNORECASE | re.DOTALL
        )

        # Remove analysis and weird markers from gpt-oss
        def clean_out_markers(raw: str) -> str:
            """Return the usable text of a raw model response.

            When the response contains the "final" channel start marker, only the
            text after it (up to the first end marker, if any) is kept. Otherwise
            all control tokens are removed. The result is stripped of surrounding
            whitespace; an empty input is returned unchanged.
            """
            if not raw:
                return raw

            final_marker = final_start_re.search(raw)
            if final_marker is None:
                return control_token_re.sub("", raw).strip()

            final_text = raw[final_marker.end() :]
            terminator = end_re.search(final_text)
            if terminator is not None:
                final_text = final_text[: terminator.start()]
            return final_text.strip()

        def strip_think_blocks(text: str) -> str:
            """Return ``text`` with every <think> ... </think> block deleted."""
            without_think = think_block_re.sub("", text)
            return without_think

        def split_reasoning_code(text: str) -> tuple[str, str]:
            """
            Separate the explanation from the code in a model response.

            Returns a ``(reasoning, code)`` tuple. ``reasoning`` is the stripped
            body of every <reasoning>...</reasoning> block, in order of first
            appearance with exact duplicates and empty bodies dropped, joined by
            blank lines. ``code`` is the text with all such blocks removed.
            """
            stripped_bodies = (
                match.group("body").strip()
                for match in reasoning_block_re.finditer(text)
            )
            # dict.fromkeys keeps first-seen order while discarding duplicates.
            unique_bodies = dict.fromkeys(body for body in stripped_bodies if body)

            reasoning_text = "\n\n".join(unique_bodies).strip()
            code_text = reasoning_block_re.sub("", text).strip()
            return reasoning_text, code_text

        # The inference client is only created in vibe mode, so huggingface_hub
        # is imported lazily here. The vibe routes below check
        # `blocks.vibe_mode` before they touch `inference_client`.
        if blocks.vibe_mode:
            from huggingface_hub import InferenceClient

            inference_client = InferenceClient()

        @router.post("/vibe-edit/")
        @router.post("/vibe-edit")
        async def vibe_edit(body: VibeEditBody):
            if not blocks.vibe_mode:
                raise HTTPException(
                    status_code=403,
                    detail="Vibe editor is not enabled. Use --vibe flag to enable.",
                )

            from gradio.http_server import GRADIO_WATCH_DEMO_PATH

            with open(GRADIO_WATCH_DEMO_PATH) as f:
                original_code = f.read()

            snapshot_hash = secrets.token_hex(16)
            snapshot_file = vibe_edit_history_dir / f"{snapshot_hash}.py"

            with open(snapshot_file, "w") as f:
                f.write(original_code)

            hash_to_chat_history[snapshot_hash] = chat_history["history"]

            content = ""
            limited_history = limit_chat_history(chat_history["history"])

            prompt = f"""
You are a code generator for Gradio apps. Given the following existing code and prompt, return the full new code.
Existing code:
```python
{original_code}
```

Prompt:
{body.prompt}

History:
{limited_history if limited_history else "No chat history."}
"""

            system_prompt = load_system_prompt()
            content = (
                inference_client.chat_completion(
                    model="openai/gpt-oss-120b",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=10000,
                )
                .choices[0]
                .message.content
            )

            if content is None:
                raise HTTPException(status_code=500, detail="Error generating code")

            content = clean_out_markers(content)
            content = strip_think_blocks(content)

            chat_history["history"] += f"\nUser: {body.prompt}\nAssistant: {content}\n"

            reasoning, content = split_reasoning_code(content)

            if "```python\n" in content:
                start = content.index("```python\n") + len("```python\n")
                end = content.find("\n```", start)
                content = content[start:end] if end != -1 else content[start:]

            # Calculate diff stats
            original_lines = original_code.splitlines(keepends=True)
            new_lines = content.splitlines(keepends=True)
            diff = list(difflib.unified_diff(original_lines, new_lines, n=0))

            lines_added = 0
            lines_removed = 0
            for line in diff:
                if line.startswith("+") and not line.startswith("+++"):
                    lines_added += 1
                elif line.startswith("-") and not line.startswith("---"):
                    lines_removed += 1

            with open(GRADIO_WATCH_DEMO_PATH, "w") as f:
                f.write(content)

            return {
                "hash": snapshot_hash,
                "diff_stats": {
                    "lines_added": lines_added,
                    "lines_removed": lines_removed,
                },
                "reasoning": reasoning,
            }

        @router.post("/undo-vibe-edit/")
        @router.post("/undo-vibe-edit")
        async def undo_vibe_edit(hash: str = Body(..., embed=True)):
            """Restore the demo file and chat history from a vibe-edit snapshot.

            Parameters:
                hash: Snapshot identifier previously returned by the vibe-edit
                    route (a lowercase hex token).
            Returns:
                ``{"success": True}`` once the demo file has been restored.
            Raises:
                HTTPException: 403 when vibe mode is disabled; 404 when ``hash``
                    is not a well-formed snapshot id or no such snapshot exists.
            """
            if not blocks.vibe_mode:
                raise HTTPException(
                    status_code=403,
                    detail="Vibe editor is not enabled. Use --vibe flag to enable.",
                )

            from gradio.http_server import GRADIO_WATCH_DEMO_PATH

            # `hash` comes straight from the request body and is used to build a
            # filesystem path. Snapshot ids are produced by secrets.token_hex, so
            # anything that is not plain lowercase hex cannot be a real snapshot.
            # Rejecting it prevents path traversal ("../x", absolute paths) from
            # copying an arbitrary .py file over the demo file.
            if not hash or any(ch not in "0123456789abcdef" for ch in hash):
                raise HTTPException(status_code=404, detail="Snapshot not found")

            snapshot_file = vibe_edit_history_dir / f"{hash}.py"

            if not snapshot_file.exists():
                raise HTTPException(status_code=404, detail="Snapshot not found")

            # Restore the file from the snapshot
            with open(snapshot_file) as f:
                saved_content = f.read()

            with open(GRADIO_WATCH_DEMO_PATH, "w") as f:
                f.write(saved_content)

            # Roll the transcript back too; unknown hashes reset it to empty.
            chat_history["history"] = hash_to_chat_history.get(hash, "")

            return {"success": True}

        @router.get("/vibe-code/")
        @router.get("/vibe-code")
        async def get_vibe_code():
            """Return the current contents of the watched demo file as ``{"code": ...}``.

            Responds with 403 when vibe mode is off, 404 when the demo file is
            missing, and 500 for any other read error.
            """
            if not blocks.vibe_mode:
                raise HTTPException(
                    status_code=403,
                    detail="Vibe editor is not enabled. Use --vibe flag to enable.",
                )

            from gradio.http_server import GRADIO_WATCH_DEMO_PATH

            try:
                with open(GRADIO_WATCH_DEMO_PATH) as demo_file:
                    return {"code": demo_file.read()}
            except FileNotFoundError:
                raise HTTPException(
                    status_code=404, detail="Demo file not found"
                ) from None
            except Exception as err:
                raise HTTPException(
                    status_code=500, detail=f"Error reading file: {str(err)}"
                ) from err

        @router.post("/vibe-code/")
        @router.post("/vibe-code")
        async def update_vibe_code(body: VibeCodeBody):
            """Overwrite the watched demo file with ``body.code``.

            Responds with 403 when vibe mode is off and 500 when the write fails.
            """
            if not blocks.vibe_mode:
                raise HTTPException(
                    status_code=403,
                    detail="Vibe editor is not enabled. Use --vibe flag to enable.",
                )

            from gradio.http_server import GRADIO_WATCH_DEMO_PATH

            try:
                with open(GRADIO_WATCH_DEMO_PATH, "w") as demo_file:
                    demo_file.write(body.code)
            except Exception as err:
                raise HTTPException(
                    status_code=500, detail=f"Error writing file: {str(err)}"
                ) from err
            else:
                return {"success": True}

        @router.post("/vibe-starter-queries/")
        @router.post("/vibe-starter-queries")
        async def get_vibe_starter_queries():
            if not blocks.vibe_mode:
                raise HTTPException(
                    status_code=403,
                    detail="Vibe editor is not enabled. Use --vibe flag to enable.",
                )

            from gradio.http_server import GRADIO_WATCH_DEMO_PATH

            with open(GRADIO_WATCH_DEMO_PATH) as f:
                code = f.read()

            prompt = f"""
You are a prompt generator for a gradio vibe editor. Given the following existing code, return a list of starter queries that can be used to generate a new code.
Existing code:
```python
{code}
```
"""

            system_prompt = load_system_prompt(starter_queries=True)
            content = (
                inference_client.chat_completion(
                    model="openai/gpt-oss-120b",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=10000,
                )
                .choices[0]
                .message.content
            )

            if content is None:
                raise HTTPException(status_code=500, detail="Error generating code")

            content = strip_think_blocks(content)
            content = clean_out_markers(content)

            starter_queries = content.split("\n")

            return {
                "starter_queries": starter_queries,
            }

        def cleanup_files(files):
            """Delete each existing path in ``files``, best-effort.

            Falsy entries and paths that do not exist are skipped. A failure on
            one file is printed and does not stop the remaining deletions.
            """
            for path in files:
                try:
                    if not path or not os.path.exists(path):
                        continue
                    os.remove(path)
                except Exception as err:
                    print(f"Error cleaning up file {path}: {str(err)}")

        # Imported into the enclosing scope: besides guarding the routes below,
        # this name is read by the `upload_file` handler defined earlier, which
        # only runs after this import has executed.
        from gradio.profiling import PROFILING_ENABLED

        # The profiling endpoints are only registered when profiling is enabled.
        if PROFILING_ENABLED:
            from gradio.profiling import collector

            @router.get("/profiling/traces")
            async def profiling_traces(
                last_n: int | None = None,
            ):
                # Return collected traces; `last_n` is forwarded to the collector.
                return ORJSONResponse(collector.get_all(last_n=last_n))

            @router.get("/profiling/summary")
            async def profiling_summary():
                # Return the collector's summary.
                return ORJSONResponse(collector.get_summary())

            @router.post("/profiling/clear")
            async def profiling_clear():
                # Clear the collector.
                collector.clear()
                return ORJSONResponse({"status": "cleared"})

        # Attach every route registered on `router` above to the app.
        app.include_router(router)
        return app


########
# Helper functions
########


def load_system_prompt(starter_queries: bool = False) -> str:
    """Build the system prompt sent to the LLM for code generation or starter queries.

    The prompt consists of a fixed block of rules, followed by the text fetched from
    https://www.gradio.app/llms.txt, followed by the same block of rules again.
    If the fetch fails for any reason (network error, non-success HTTP status, ...),
    the fetched section is left empty and only the doubled rules are returned.

    Parameters:
        starter_queries: If True, use the rules for suggesting starter queries for an existing app instead of the rules for generating Gradio code.
    Returns:
        The assembled system prompt as a single string.
    """
    prompt_rules = """Generate code for using the Gradio python library.

The following RULES must be followed.  Whenever you are forming a response, ensure all rules have been followed otherwise start over.

RULES:
Respond with code written in valid Python syntax, along with one coherent explanation surrounded by <reasoning> tags.
Any text that is not code, should be surrounded by one large <reasoning> tag.
Never include backticks in your response such as ``` or ```python.
Do not include any code that is not necessary for the app to run.
Respond with a full Gradio app.
Respond with a full Gradio app using correct syntax and features of the latest Gradio version. DO NOT write code that doesn't follow the signatures listed.
Do not add comments explaining the code, unless they are very necessary to understand the code.
Make sure the code includes all necessary imports.
Clearly explain the changes, summary, or reasoning for the code you respond with, inside one large <reasoning> tag. Make sure it's easy to parse. Use markdown formatting when it makes sense, including bullet points if there are multiple changes.


Here's an example of a valid response:

<reasoning>
I created a simple Gradio app that greets the user. It defines a function then creates a gradio interface and launches it.
</reasoning>

import gradio as gr

def greet(name):
    return "Hello " + name + "!"

demo = gr.Interface(fn=greet, inputs="textbox", outputs="textbox")

demo.launch()
"""
    if starter_queries:
        prompt_rules = """
        You are a prompt generator for a gradio vibe editor.

        Given python code of a gradio app, return a list of starter queries that can be used to generate new code.
        Make sure the queries are short, useful and actually possible with Gradio.
        The queries should be really simple and easy to understand.
        You should respond with at most three queries, each on a new line. Do not include any other text.
        Make sure the features you suggest are actually supported by Gradio, and documented in the docs section below.
        Never suggest a query with more than one gradio feature or concept.
        You may suggest queries that are not related to Gradio, but they must be related to the existing code and app. Never suggest queries that require external packages or libraries other than gradio.
        Don't suggest adding a clear button if the app is an Interface, because Interface already has a clear button.

        Here's an example of a gradio app:

        ```python
        import gradio as gr

        def greet(name):
            return "Hello " + name + "!"

        demo = gr.Interface(fn=greet, inputs="textbox", outputs="textbox")

        demo.launch()
        ```

        Here's an example of a valid response:
        Add a title to the app
        Add examples
        Rewrite this app using Blocks

        Here's an example of another valid response:
        Add another textbox for name
        Change the theme
        Greet the user in many languages

        """
    # Fetching the docs is best-effort: any failure falls back to a prompt that
    # contains the rules only.
    try:
        with httpx.Client() as client:
            response = client.get("https://www.gradio.app/llms.txt")
            # Without this check, the body of an error or redirect response
            # (e.g. an HTML 404/500 page) would be embedded in the prompt as if
            # it were the Gradio documentation.
            response.raise_for_status()
            system_prompt = response.text
    except Exception:
        system_prompt = ""
    # The rules are placed both before and after the fetched docs.
    system_prompt = prompt_rules + system_prompt + prompt_rules
    return system_prompt


def get_types(cls_set: list[type]) -> tuple[list[str], list[str]]:
    """Extract per-class descriptions and `value` types from class docstrings.

    Parameters:
        cls_set: The classes whose docstrings should be scanned.
    Returns:
        A tuple `(docset, types)`. `docset` contains one entry per class: the text after the last ":" on the second line of its docstring, or "" if the docstring is missing or has only one line. `types` contains, for every docstring line that includes "value (", the text between "value (" and the next ")".
    """
    docset: list[str] = []
    types: list[str] = []
    for cls in cls_set:
        doc_lines = (inspect.getdoc(cls) or "").split("\n")
        types.extend(
            line.split("value (")[1].split(")")[0]
            for line in doc_lines
            if "value (" in line
        )
        # A missing or single-line docstring has no second line; indexing
        # doc_lines[1] directly would raise IndexError, so fall back to "".
        second_line = doc_lines[1] if len(doc_lines) > 1 else ""
        docset.append(second_line.split(":")[-1])
    return docset, types


@document()
def mount_gradio_app(
    app: fastapi.FastAPI,
    blocks: gradio.Blocks,
    path: str,
    server_name: str = "0.0.0.0",
    server_port: int = 7860,
    footer_links: (
        list[Literal["api", "gradio", "settings"] | dict[str, str]] | None
    ) = None,
    app_kwargs: dict[str, Any] | None = None,
    *,
    auth: Callable | tuple[str, str] | list[tuple[str, str]] | None = None,
    auth_message: str | None = None,
    auth_dependency: Callable[[fastapi.Request], str | None] | None = None,
    root_path: str | None = None,
    allowed_paths: list[str] | None = None,
    blocked_paths: list[str] | None = None,
    favicon_path: str | None = None,
    show_error: bool = True,
    max_file_size: str | int | None = None,
    ssr_mode: bool | None = None,
    node_server_name: str | None = None,
    node_port: int | None = None,
    enable_monitoring: bool | None = None,
    pwa: bool | None = None,
    i18n: I18n | None = None,
    mcp_server: bool | None = None,
    theme: Theme | str | None = None,
    css: str | None = None,
    css_paths: str | Path | Sequence[str | Path] | None = None,
    js: str | Literal[True] | None = None,
    head: str | None = None,
    head_paths: str | Path | Sequence[str | Path] | None = None,
) -> fastapi.FastAPI:
    """Mount a gradio.Blocks to an existing FastAPI application.

    Parameters:
        app: The parent FastAPI application.
        blocks: The blocks object we want to mount to the parent app.
        path: The path at which the gradio application will be mounted, e.g. "/gradio".
        server_name: The server name on which the Gradio app will be run.
        server_port: The port on which the Gradio app will be run.
        app_kwargs: Additional keyword arguments to pass to the underlying FastAPI app as a dictionary of parameter keys and argument values. For example, `{"docs_url": "/docs"}`
        auth: If provided, username and password (or list of username-password tuples) required to access the gradio app. Can also provide function that takes username and password and returns True if valid login.
        auth_message: If provided, HTML message provided on login page for this gradio app.
        auth_dependency: A function that takes a FastAPI request and returns a string user ID or None. If the function returns None for a specific request, that user is not authorized to access the gradio app (they will see a 401 Unauthorized response). To be used with external authentication systems like OAuth. Cannot be used with `auth`.
        root_path: The subpath corresponding to the public deployment of this FastAPI application. For example, if the application is served at "https://example.com/myapp", the `root_path` should be set to "/myapp". A full URL beginning with http:// or https:// can be provided, which will be used in its entirety. Normally, this does not need to provided (even if you are using a custom `path`). However, if you are serving the FastAPI app behind a proxy, the proxy may not provide the full path to the Gradio app in the request headers. In which case, you can provide the root path here.
        allowed_paths: List of complete filepaths or parent directories that this gradio app is allowed to serve. Must be absolute paths. Warning: if you provide directories, any files in these directories or their subdirectories are accessible to all users of your app.
        blocked_paths: List of complete filepaths or parent directories that this gradio app is not allowed to serve (i.e. users of your app are not allowed to access). Must be absolute paths. Warning: takes precedence over `allowed_paths` and all other directories exposed by Gradio by default.
        favicon_path: If a path to a file (.png, .gif, or .ico) is provided, it will be used as the favicon for this gradio app's page.
        show_error: If True, any errors in the gradio app will be displayed in an alert modal and printed in the browser console log. Otherwise, errors will only be visible in the terminal session running the Gradio app.
        max_file_size: The maximum file size in bytes that can be uploaded. Can be a string of the form "<value><unit>", where value is any positive integer and unit is one of "b", "kb", "mb", "gb", "tb". If None, no limit is set.
        footer_links: The links to display in the footer of the app. Accepts a list, where each element of the list must be one of "api", "gradio", or "settings" corresponding to the API docs, "built with Gradio", and settings pages respectively. If None, all three links will be shown in the footer. An empty list means that no footer is shown.
        ssr_mode: If True, the Gradio app will be rendered using server-side rendering mode, which is typically more performant and provides better SEO, but this requires Node 20+ to be installed on the system. If False, the app will be rendered using client-side rendering mode. If None, will use GRADIO_SSR_MODE environment variable or default to False.
        node_server_name: The name of the Node server to use for SSR. If None, will use GRADIO_NODE_SERVER_NAME environment variable or search for a node binary in the system.
        i18n: If provided, the i18n instance to use for this gradio app.
        node_port: The port on which the Node server should run. If None, will use GRADIO_NODE_SERVER_PORT environment variable or find a free port.
        enable_monitoring: Monitoring setting for the mounted gradio app. The value (including None) is copied onto the blocks object unchanged.
        pwa: Progressive web app setting for the mounted gradio app. If None, the `pwa` value already set on the blocks object is left untouched.
        mcp_server: If True, the MCP server will be launched on the gradio app. If None, will use GRADIO_MCP_SERVER environment variable or default to False.
        theme: A Theme object or a string representing a theme. If a string, will look for a built-in theme with that name (e.g. "soft" or "default"), or will attempt to load a theme from the Hugging Face Hub (e.g. "gradio/monochrome"). If None, will use the Default theme.
        css: Custom css as a code string. This css will be included in the demo webpage.
        css_paths: Custom css as a pathlib.Path to a css file or a list of such paths. This css files will be read, concatenated, and included in the demo webpage. If the `css` parameter is also set, the css from `css` will be included first.
        js: Custom js as a code string. The custom js should be in the form of a single js function. This function will automatically be executed when the page loads. For more flexibility, use the head parameter to insert js inside <script> tags.
        head: Custom html code to insert into the head of the demo webpage. This can be used to add custom meta tags, multiple scripts, stylesheets, etc. to the page.
        head_paths: Custom html code as a pathlib.Path to a html file or a list of such paths. This html files will be read, concatenated, and included in the head of the demo webpage. If the `head` parameter is also set, the html from `head` will be included first.
    Example:
        from fastapi import FastAPI
        import gradio as gr
        app = FastAPI()
        @app.get("/")
        def read_main():
            return {"message": "This is your main app"}
        io = gr.Interface(lambda x: "Hello, " + x + "!", "textbox", "textbox")
        app = gr.mount_gradio_app(app, io, path="/gradio")
        # Then run `uvicorn run:app` from the terminal and navigate to http://localhost:8000/gradio.
    """
    # The favicon is still stored on `blocks` below; this only warns the caller
    # that it has no effect when the app is not mounted at the root path.
    if favicon_path is not None and path != "/":
        warnings.warn(
            "The 'favicon_path' parameter is set but will be ignored because 'path' is not '/'. "
            "Please add the favicon directly to your FastAPI app."
        )

    # Copy the mount-time settings onto the blocks object.
    blocks.dev_mode = False
    if footer_links is None:
        footer_links = ["api", "gradio", "settings"]
    blocks.footer_links = footer_links
    blocks.max_file_size = utils._parse_file_size(max_file_size)
    blocks.config = blocks.get_config_file()
    blocks.validate_queue_settings()
    blocks.custom_mount_path = path
    blocks.server_port = server_port
    blocks.server_name = server_name
    blocks.enable_monitoring = enable_monitoring
    # `pwa` and `i18n` only override the blocks' existing values when given.
    if pwa is not None:
        blocks.pwa = pwa
    if i18n is not None:
        blocks.i18n_instance = i18n
    if auth is not None and auth_dependency is not None:
        raise ValueError(
            "You cannot provide both `auth` and `auth_dependency` in mount_gradio_app(). Please choose one."
        )
    # A single (username, password) pair is wrapped in a list; callables,
    # lists of pairs, and falsy values are stored as given.
    if (
        auth
        and not callable(auth)
        and not isinstance(auth[0], tuple)
        and not isinstance(auth[0], list)
    ):
        blocks.auth = [auth]
    else:
        blocks.auth = auth
    blocks.auth_message = auth_message
    blocks.favicon_path = favicon_path
    blocks.allowed_paths = allowed_paths or []
    blocks.blocked_paths = blocked_paths or []
    blocks.show_error = show_error

    # Falsy inputs were already replaced by [] above, so these checks reject
    # truthy non-list values such as a bare string or a tuple.
    if not isinstance(blocks.allowed_paths, list):
        raise ValueError("`allowed_paths` must be a list of directories.")
    if not isinstance(blocks.blocked_paths, list):
        raise ValueError("`blocked_paths` must be a list of directories.")

    if root_path is not None:
        blocks.root_path = root_path

    blocks.ssr_mode = blocks._resolve_ssr_mode(ssr_mode)

    # The Node server is only started when SSR mode resolved to a truthy value.
    if blocks.ssr_mode:
        # Note: get_node_path() is evaluated even when GRADIO_NODE_PATH is set,
        # because it is passed as the default argument to os.environ.get().
        blocks.node_path = os.environ.get("GRADIO_NODE_PATH", get_node_path())

        blocks.node_server_name = node_server_name
        blocks.node_port = node_port
        # start_node_server returns the server name, process, and port to
        # store; these overwrite the requested values assigned just above.
        blocks.node_server_name, blocks.node_process, blocks.node_port = (
            start_node_server(
                server_name=blocks.node_server_name,
                server_port=blocks.node_port,
                node_path=blocks.node_path,
            )
        )
    # Page appearance: None/empty inputs are normalised to "" or [] before the
    # blocks object derives its theme variables from them.
    blocks.theme = utils.get_theme(theme)
    blocks.css = css or ""
    blocks.js = js or ""
    blocks.head = head or ""
    blocks.head_paths = head_paths or []
    blocks.css_paths = css_paths or []
    blocks._set_html_css_theme_variables()

    blocks.transpile_to_js()
    gradio_app = App.create_app(
        blocks,
        app_kwargs=app_kwargs,
        auth_dependency=auth_dependency,
        mcp_server=mcp_server,
    )
    # Keep a handle on the parent's lifespan so it can be wrapped rather than
    # replaced outright.
    old_lifespan = app.router.lifespan_context

    @contextlib.asynccontextmanager
    async def new_lifespan(app: FastAPI):
        # Nesting order: parent lifespan -> gradio app lifespan -> gradio
        # startup events, then yield the parent's state to the server.
        async with old_lifespan(
            app
        ) as state:  # Insert the startup events inside the FastAPI context manager
            async with gradio_app.router.lifespan_context(gradio_app):
                gradio_app.get_blocks().run_startup_events()
                await gradio_app.get_blocks().run_extra_startup_events()
                yield state

    app.router.lifespan_context = new_lifespan  # type: ignore

    # Mount the gradio app as a sub-application of the parent at `path`.
    app.mount(path, gradio_app)
    return app


# Route names reserved for Gradio's own endpoints (theme, PWA assets, auth pages,
# static files, API, monitoring, ...).
# NOTE(review): the consumer of this list is not visible in this part of the file;
# presumably it is used to distinguish built-in routes from user-defined ones —
# verify against the usage site before adding or removing entries.
INTERNAL_ROUTES = [
    "theme.css",
    "robots.txt",
    "pwa_icon",
    "manifest.json",
    "login",
    "logout",
    "svelte",
    "config",
    "static",
    "assets",
    "favicon.ico",
    "gradio_api",
    "monitoring",
]
