Source code for acp_sdk.server.context

from concurrent.futures import ThreadPoolExecutor

import janus

from acp_sdk.models import SessionId
from acp_sdk.server.types import RunYield, RunYieldResume


[docs] class Context:
[docs] def __init__( self, *, session_id: SessionId | None = None, executor: ThreadPoolExecutor, yield_queue: janus.Queue[RunYield], yield_resume_queue: janus.Queue[RunYieldResume], ) -> None: self.session_id = session_id self.executor = executor self._yield_queue = yield_queue self._yield_resume_queue = yield_resume_queue
[docs] def yield_sync(self, value: RunYield) -> RunYieldResume: self._yield_queue.sync_q.put(value) return self._yield_resume_queue.sync_q.get()
[docs] async def yield_async(self, value: RunYield) -> RunYieldResume: await self._yield_queue.async_q.put(value) return await self._yield_resume_queue.async_q.get()
[docs] def shutdown(self) -> None: self._yield_queue.shutdown() self._yield_resume_queue.shutdown()