Source code for acp_sdk.server.utils

import asyncio
from collections.abc import AsyncGenerator, Coroutine
from typing import Any, Callable

import httpx
import requests
from pydantic import BaseModel

from acp_sdk.server.bundle import RunBundle
from acp_sdk.server.logging import logger


[docs] def encode_sse(model: BaseModel) -> str: return f"data: {model.model_dump_json()}\n\n"
[docs] async def stream_sse(bundle: RunBundle) -> AsyncGenerator[str]: async for event in bundle.stream(): yield encode_sse(event)
[docs] async def async_request_with_retry( request_func: Callable[[httpx.AsyncClient], Coroutine[Any, Any, httpx.Response]], max_retries: int = 5, backoff_factor: float = 1, ) -> dict[str, Any]: async with httpx.AsyncClient() as client: retries = 0 while retries < max_retries: try: response = await request_func(client) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code in [429, 500, 502, 503, 504, 509]: retries += 1 backoff = backoff_factor * (2 ** (retries - 1)) logger.debug(f"Request retry (try {retries}/{max_retries}), waiting {backoff} seconds...") await asyncio.sleep(backoff) else: logger.debug("A non-retryable error was encountered.") raise except httpx.RequestError: retries += 1 backoff = backoff_factor * (2 ** (retries - 1)) logger.debug(f"Request retry (try {retries}/{max_retries}), waiting {backoff} seconds...") await asyncio.sleep(backoff) raise requests.exceptions.ConnectionError(f"Request failed after {max_retries} retries.")