Source code for acp_sdk.models.models

import asyncio
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Literal, Optional, Union

from pydantic import AnyUrl, BaseModel, ConfigDict, Field

from acp_sdk.models.errors import ACPError, Error


[docs] class AnyModel(BaseModel): model_config = ConfigDict(extra="allow")
[docs] class Author(BaseModel): name: str email: str | None = None url: AnyUrl | None = None
[docs] class Contributor(BaseModel): name: str email: str | None = None url: AnyUrl | None = None
[docs] class LinkType(str, Enum): SOURCE_CODE = "source-code" CONTAINER_IMAGE = "container-image" HOMEPAGE = "homepage" DOCUMENTATION = "documentation"
[docs] class DependencyType(str, Enum): AGENT = "agent" TOOL = "tool" MODEL = "model"
[docs] class Dependency(BaseModel): type: DependencyType name: str
[docs] class Capability(BaseModel): name: str description: str
[docs] class Metadata(BaseModel): annotations: AnyModel | None = None documentation: str | None = None license: str | None = None programming_language: str | None = None natural_languages: list[str] | None = None framework: str | None = None capabilities: list[Capability] | None = None domains: list[str] | None = None tags: list[str] | None = None created_at: datetime | None = None updated_at: datetime | None = None author: Author | None = None contributors: list[Contributor] | None = None links: list[Link] | None = None dependencies: list[Dependency] | None = None recommended_models: list[str] | None = None model_config = ConfigDict(extra="allow")
[docs] class MessagePart(BaseModel): name: Optional[str] = None content_type: Optional[str] = "text/plain" content: Optional[str] = None content_encoding: Optional[Literal["plain", "base64"]] = "plain" content_url: Optional[AnyUrl] = None model_config = ConfigDict(extra="allow")
[docs] def model_post_init(self, __context: Any) -> None: if self.content is None and self.content_url is None: raise ValueError("Either content or content_url must be provided") if self.content is not None and self.content_url is not None: raise ValueError("Only one of content or content_url can be provided")
[docs] class Artifact(MessagePart): name: str
[docs] class Message(BaseModel): parts: list[MessagePart] created_at: datetime | None = Field(default_factory=lambda: datetime.now(timezone.utc)) completed_at: datetime | None = Field(default_factory=lambda: datetime.now(timezone.utc)) def __add__(self, other: "Message") -> "Message": if not isinstance(other, Message): raise TypeError(f"Cannot concatenate Message with {type(other).__name__}") return Message( parts=self.parts + other.parts, created_at=min(self.created_at, other.created_at) if self.created_at and other.created_at else None, completed_at=max(self.completed_at, other.completed_at) if self.completed_at and other.completed_at else None, ) def __str__(self) -> str: return "".join( part.content for part in self.parts if part.content is not None and part.content_type == "text/plain" )
[docs] def compress(self) -> "Message": def can_be_joined(first: MessagePart, second: MessagePart) -> bool: return ( first.name is None and second.name is None and first.content_type == "text/plain" and second.content_type == "text/plain" and first.content_encoding == "plain" and second.content_encoding == "plain" and first.content_url is None and second.content_url is None ) def join(first: MessagePart, second: MessagePart) -> MessagePart: return MessagePart( name=None, content_type="text/plain", content=first.content + second.content, content_encoding="plain", content_url=None, ) parts: list[MessagePart] = [] for part in self.parts: if len(parts) > 0 and can_be_joined(parts[-1], part): parts[-1] = join(parts[-1], part) else: parts.append(part) return Message(parts=parts, created_at=self.created_at, completed_at=self.completed_at)
AgentName = str SessionId = uuid.UUID RunId = uuid.UUID
[docs] class RunMode(str, Enum): SYNC = "sync" ASYNC = "async" STREAM = "stream"
[docs] class RunStatus(str, Enum): CREATED = "created" IN_PROGRESS = "in-progress" AWAITING = "awaiting" CANCELLING = "cancelling" CANCELLED = "cancelled" COMPLETED = "completed" FAILED = "failed" @property def is_terminal(self) -> bool: terminal_states = {RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.CANCELLED} return self in terminal_states
[docs] class MessageAwaitRequest(BaseModel): type: Literal["message"] = "message" message: Message
[docs] class MessageAwaitResume(BaseModel): type: Literal["message"] = "message" message: Message
AwaitRequest = Union[MessageAwaitRequest] AwaitResume = Union[MessageAwaitResume]
[docs] class Run(BaseModel): run_id: RunId = Field(default_factory=uuid.uuid4) agent_name: AgentName session_id: SessionId | None = None status: RunStatus = RunStatus.CREATED await_request: AwaitRequest | None = None output: list[Message] = [] error: Error | None = None created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) finished_at: datetime | None = None
[docs] def raise_for_status(self) -> "Run": match self.status: case RunStatus.CANCELLED: raise asyncio.CancelledError() case RunStatus.FAILED: raise ACPError(error=self.error) case _: return self
[docs] class MessageCreatedEvent(BaseModel): type: Literal["message.created"] = "message.created" message: Message
[docs] class MessagePartEvent(BaseModel): type: Literal["message.part"] = "message.part" part: MessagePart
[docs] class ArtifactEvent(BaseModel): type: Literal["message.part"] = "message.part" part: Artifact
[docs] class MessageCompletedEvent(BaseModel): type: Literal["message.completed"] = "message.completed" message: Message
[docs] class RunAwaitingEvent(BaseModel): type: Literal["run.awaiting"] = "run.awaiting" run: Run
[docs] class GenericEvent(BaseModel): type: Literal["generic"] = "generic" generic: AnyModel
[docs] class RunCreatedEvent(BaseModel): type: Literal["run.created"] = "run.created" run: Run
[docs] class RunInProgressEvent(BaseModel): type: Literal["run.in-progress"] = "run.in-progress" run: Run
[docs] class RunFailedEvent(BaseModel): type: Literal["run.failed"] = "run.failed" run: Run
[docs] class RunCancelledEvent(BaseModel): type: Literal["run.cancelled"] = "run.cancelled" run: Run
[docs] class RunCompletedEvent(BaseModel): type: Literal["run.completed"] = "run.completed" run: Run
Event = Union[ RunCreatedEvent, RunInProgressEvent, MessageCreatedEvent, ArtifactEvent, MessagePartEvent, MessageCompletedEvent, RunAwaitingEvent, GenericEvent, RunCancelledEvent, RunFailedEvent, RunCompletedEvent, MessagePartEvent, ]
[docs] class Agent(BaseModel): name: str description: str | None = None metadata: Metadata = Metadata()