Source code for acp_sdk.server.session
import uuid
from collections.abc import Iterator
from acp_sdk.models import Message, SessionId
from acp_sdk.models.models import RunStatus
from acp_sdk.server.bundle import RunBundle
[docs]
class Session:
[docs]
def __init__(self, id: SessionId | None = None) -> None:
self.id: SessionId = id or uuid.uuid4()
self.bundles: list[RunBundle] = []
[docs]
def append(self, bundle: RunBundle) -> None:
self.bundles.append(bundle)
[docs]
def history(self) -> Iterator[Message]:
for bundle in self.bundles:
if bundle.run.status == RunStatus.COMPLETED:
yield from bundle.input
yield from bundle.run.output