Source code for acp_sdk.server.utils

import asyncio
from collections.abc import AsyncGenerator, Coroutine
from typing import Any, Callable

import httpx
import requests
from pydantic import BaseModel

from acp_sdk.models import RunStatus
from acp_sdk.server.executor import RunData
from acp_sdk.server.logging import logger
from acp_sdk.server.store import Store


def encode_sse(model: BaseModel) -> str:
    """Serialize a model as a single Server-Sent Events ``data`` frame.

    Args:
        model: Object whose ``model_dump_json()`` output becomes the payload.

    Returns:
        The JSON payload prefixed with ``data: `` and terminated by the
        blank line that ends an SSE event.
    """
    payload = model.model_dump_json()
    return f"data: {payload}\n\n"
async def watch_util_stop(
    run_data: RunData,
    store: Store[RunData],
    *,
    ready: asyncio.Event | None = None,
) -> AsyncGenerator[RunData]:
    """Yield run snapshots until the run starts awaiting or the watch ends.

    Every item produced by ``run_data.watch(store, ready=ready)`` is yielded
    to the caller. The snapshot whose run status is ``RunStatus.AWAITING`` is
    still yielded, and iteration stops right after it.

    Args:
        run_data: Run whose ``watch`` stream is consumed.
        store: Store handed through to ``run_data.watch``.
        ready: Optional event handed through to ``run_data.watch``.

    Yields:
        Each snapshot produced by the underlying watch, in order.
    """
    updates = run_data.watch(store, ready=ready)
    async for snapshot in updates:
        yield snapshot
        is_awaiting = snapshot.run.status == RunStatus.AWAITING
        if is_awaiting:
            # The awaiting snapshot has already been delivered; stop here.
            return
async def wait_util_stop(
    run_data: RunData,
    store: Store[RunData],
    *,
    ready: asyncio.Event | None = None,
) -> RunData:
    """Drain ``watch_util_stop`` and return the last snapshot it produced.

    Args:
        run_data: Run to watch; also the fallback result.
        store: Store handed through to ``watch_util_stop``.
        ready: Optional event handed through to ``watch_util_stop``.

    Returns:
        The final snapshot yielded by ``watch_util_stop``, or ``run_data``
        itself when the watch yields nothing.
    """
    # Start from the input so an empty stream still returns a value.
    latest = run_data
    async for snapshot in watch_util_stop(run_data, store, ready=ready):
        latest = snapshot
    return latest
async def stream_sse(
    run_data: RunData,
    store: Store[RunData],
    idx: int,
    *,
    ready: asyncio.Event | None = None,
) -> AsyncGenerator[str]:
    """Stream a run's events as SSE frames, starting at event index ``idx``.

    For each snapshot from ``watch_util_stop``, the events not yet emitted
    (those at or after the current cursor) are encoded with ``encode_sse``
    and yielded. The cursor then moves to the end of that snapshot's event
    list, so each event is sent once.

    Args:
        run_data: Run whose events are streamed.
        store: Store handed through to ``watch_util_stop``.
        idx: Index of the first event to emit.
        ready: Optional event handed through to ``watch_util_stop``.

    Yields:
        One SSE-encoded string per new event.
    """
    cursor = idx
    async for snapshot in watch_util_stop(run_data, store, ready=ready):
        events = snapshot.events
        pending = events[cursor:]
        # Advance before yielding so the cursor reflects this snapshot only.
        cursor = len(events)
        for event in pending:
            yield encode_sse(event)
async def async_request_with_retry(
    request_func: Callable[[httpx.AsyncClient], Coroutine[Any, Any, httpx.Response]],
    max_retries: int = 5,
    backoff_factor: float = 1,
) -> dict[str, Any]:
    """Run an HTTP request with exponential-backoff retries.

    A fresh ``httpx.AsyncClient`` is opened and passed to ``request_func``.
    The call is retried when it raises ``httpx.RequestError`` or when the
    response status is one of 429, 500, 502, 503, 504 or 509. The wait before
    retry number ``k`` (1-based) is ``backoff_factor * 2 ** (k - 1)`` seconds.
    No wait is performed after the final failed attempt.

    Args:
        request_func: Coroutine function that performs the request with the
            supplied client and returns the response.
        max_retries: Maximum number of attempts before giving up.
        backoff_factor: Multiplier applied to the exponential backoff delay.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        httpx.HTTPStatusError: If the response status is an error that is not
            in the retryable set; re-raised immediately.
        requests.exceptions.ConnectionError: If every attempt failed (or
            ``max_retries`` is not positive). The last httpx error, if any,
            is attached as ``__cause__``.
    """
    retryable_status_codes = frozenset({429, 500, 502, 503, 504, 509})
    last_error: Exception | None = None

    async with httpx.AsyncClient() as client:
        for attempt in range(1, max_retries + 1):
            try:
                response = await request_func(client)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code not in retryable_status_codes:
                    logger.debug("A non-retryable error was encountered.")
                    raise
                last_error = e
            except httpx.RequestError as e:
                last_error = e

            # Sleeping after the final attempt would only delay the failure.
            if attempt == max_retries:
                break

            backoff = backoff_factor * (2 ** (attempt - 1))
            logger.debug(f"Request retry (try {attempt}/{max_retries}), waiting {backoff} seconds...")
            await asyncio.sleep(backoff)

    # Chain the last transport/status error so the root cause is not lost.
    raise requests.exceptions.ConnectionError(f"Request failed after {max_retries} retries.") from last_error