import base64
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from openai import OpenAI
from prompter.image_data import url_to_b64
from prompter.schemas import (
Assistant,
Block,
Document,
Image,
LLMResponse,
Prompt,
System,
Text,
Tool,
ToolCall,
ToolUse,
User,
)
@dataclass
class OpenAIParams:
    """Default request parameters for OpenAI chat-completions calls."""

    max_tokens: int = 1024  # upper bound on generated completion tokens
    model: str = "gpt-4o"  # model identifier sent to the API
    temperature: float = 0.0  # 0.0 keeps sampling (near-)deterministic
class OpenAIExecutor:
    """Execute ``Prompt`` objects against the OpenAI chat-completions API.

    Translates the project's block-based conversation schema into OpenAI's
    message / tool wire format, sends the request through an ``OpenAI``
    client, and parses the response into an ``LLMResponse``.
    """

    class Converters:
        """Stateless translators from prompter schema objects to OpenAI dicts."""

        @staticmethod
        def block_to_openai_messages(block: Block) -> list[dict[str, Any]]:
            """Convert one conversation block into one or more OpenAI messages.

            Raises:
                ValueError: if ``block`` is not a recognized block type.
            """
            block_converters = {
                User: OpenAIExecutor.Converters._convert_user_block,
                Assistant: OpenAIExecutor.Converters._convert_assistant_block,
                ToolUse: OpenAIExecutor.Converters._convert_tool_use_block,
                ToolCall: OpenAIExecutor.Converters._convert_tool_call_block,
                System: OpenAIExecutor.Converters._convert_system_block,
            }
            # isinstance-based dispatch (rather than a type() lookup) so
            # subclasses of the schema types are still routed correctly.
            for block_type, converter in block_converters.items():
                if isinstance(block, block_type):
                    return converter(block)
            raise ValueError(f"Unknown block type: {type(block)}")

        @staticmethod
        def _convert_user_block(block: User) -> list[dict[str, Any]]:
            """Wrap a User block as a single ``role: user`` message."""
            return [OpenAIExecutor.Converters._build_user_message(block)]

        @staticmethod
        def _convert_assistant_block(block: Assistant) -> list[dict[str, Any]]:
            """Wrap an Assistant block as a single ``role: assistant`` message."""
            content = OpenAIExecutor.Converters._collapse_content(block.content)
            return [{"role": "assistant", "content": content}]

        @staticmethod
        def _convert_tool_use_block(block: ToolUse) -> list[dict[str, Any]]:
            """Emit the assistant tool-call message plus (if any) its result.

            OpenAI expects the call and the ``role: tool`` result as two
            separate messages; the result message is omitted when the block
            carries neither a result nor an error.
            """
            messages = [
                OpenAIExecutor.Converters._create_tool_call_message(
                    block.id, block.name, block.arguments
                )
            ]
            tool_result = OpenAIExecutor.Converters._create_tool_result(
                block.id, block.result, block.error
            )
            if tool_result:
                messages.append(tool_result)
            return messages

        @staticmethod
        def _convert_tool_call_block(block: ToolCall) -> list[dict[str, Any]]:
            """Emit an assistant tool-call message with no result attached."""
            return [
                OpenAIExecutor.Converters._create_tool_call_message(
                    block.id, block.name, block.arguments
                )
            ]

        @staticmethod
        def _convert_system_block(block: System) -> list[dict[str, Any]]:
            """Wrap a System block as a ``role: system`` message."""
            return [{"role": "system", "content": block.content}]

        @staticmethod
        def _build_user_message(block: User) -> dict[str, Any]:
            """Build the ``role: user`` message dict for a User block."""
            content = OpenAIExecutor.Converters._collapse_content(block.content)
            return {"role": "user", "content": content}

        @staticmethod
        def _collapse_content(content: list) -> str | list[dict[str, Any]]:
            """Collapse a lone text item to a bare string, else convert fully.

            A single ``str`` or ``Text`` item becomes a plain string
            (OpenAI's compact content form); anything else becomes a
            structured content-part list. Shared by the user and assistant
            converters, which previously duplicated this logic verbatim.
            """
            if len(content) == 1:
                item = content[0]
                if isinstance(item, str):
                    return item
                elif isinstance(item, Text):
                    return item.content
            return OpenAIExecutor.Converters.content_list_to_openai(content)

        @staticmethod
        def _process_user_content(content: list) -> str | list[dict[str, Any]]:
            """Backward-compatible alias for ``_collapse_content``."""
            return OpenAIExecutor.Converters._collapse_content(content)

        @staticmethod
        def _process_assistant_content(content: list) -> str | list[dict[str, Any]]:
            """Backward-compatible alias for ``_collapse_content``."""
            return OpenAIExecutor.Converters._collapse_content(content)

        @staticmethod
        def _create_tool_call_message(
            tool_id: str, name: str, arguments: Any
        ) -> dict[str, Any]:
            """Build an assistant message containing exactly one tool call.

            ``arguments`` may be a dict (serialized here) or already a JSON
            string (passed through) — OpenAI requires a string.
            """
            args_str = (
                json.dumps(arguments) if isinstance(arguments, dict) else arguments
            )
            return {
                "role": "assistant",
                "tool_calls": [
                    {
                        "id": tool_id,
                        "type": "function",
                        "function": {
                            "name": name,
                            "arguments": args_str,
                        },
                    }
                ],
            }

        @staticmethod
        def _create_tool_result(
            tool_id: str, result: Any, error: Optional[str]
        ) -> Optional[dict[str, Any]]:
            """Build the ``role: tool`` result message, or None if no content."""
            content = OpenAIExecutor.Converters._format_tool_result_content(
                result, error
            )
            if content is None:
                return None
            return {
                "role": "tool",
                "tool_call_id": tool_id,
                "content": content,
            }

        @staticmethod
        def _format_tool_result_content(
            result: Any, error: Optional[str]
        ) -> Optional[str]:
            """Render a tool outcome as a string; a truthy error wins."""
            if error:
                return error
            if result is not None:
                return result if isinstance(result, str) else json.dumps(result)
            return None

        @staticmethod
        def content_list_to_openai(content: list) -> list[dict[str, Any]]:
            """Convert mixed content items to OpenAI structured content parts.

            Unrecognized item types are silently skipped (pre-existing
            behavior); Document items raise NotImplementedError.
            """
            content_converters = {
                Text: lambda item: {"type": "text", "text": item.content},
                str: lambda item: {"type": "text", "text": item},
                Image: OpenAIExecutor.Converters._convert_image,
                Document: lambda item: OpenAIExecutor.Converters._handle_document(),
            }
            result = []
            for item in content:
                for content_type, converter in content_converters.items():
                    if isinstance(item, content_type):
                        result.append(converter(item))
                        break
            return result

        @staticmethod
        def _convert_image(image: Image) -> dict[str, Any]:
            """Convert an Image to an ``image_url`` content part.

            Data URLs and http(s) URLs pass through untouched (the two
            cases were previously identical duplicate branches); any other
            source is treated as a local file path and inlined as base64.
            """
            source = image.source
            if not source.startswith(("data:", "http://", "https://")):
                source = OpenAIExecutor.Converters._file_to_data_url(image)
            return OpenAIExecutor.Converters._create_image_url_dict(
                source, image.detail
            )

        @staticmethod
        def _create_image_url_dict(url: str, detail: str) -> dict[str, Any]:
            """Build the OpenAI ``image_url`` content-part dict."""
            return {
                "type": "image_url",
                "image_url": {
                    "url": url,
                    "detail": detail,
                },
            }

        @staticmethod
        def _file_to_data_url(image: Image) -> str:
            """Read a local image file and encode it as a base64 data URL."""
            path = Path(image.source)
            with open(path, "rb") as f:
                data = base64.b64encode(f.read()).decode("utf-8")
            return f"data:{image.media_type};base64,{data}"

        @staticmethod
        def _handle_document() -> None:
            """Documents have no OpenAI mapping here yet — always raises."""
            raise NotImplementedError("Document support not implemented")

        @staticmethod
        def _extract_tool_parameters(params: Any) -> dict[str, Any]:
            """Normalize tool parameters to a JSON-schema dict.

            Accepts a pydantic-style model (via ``model_json_schema``), a raw
            dict, or anything falsy/unknown (empty object schema).
            """
            if not params:
                return {"type": "object", "properties": {}}
            if hasattr(params, "model_json_schema"):
                return params.model_json_schema()
            if isinstance(params, dict):
                return params
            return {"type": "object", "properties": {}}

        @staticmethod
        def tool_to_openai(tool: Tool) -> dict[str, Any]:
            """Convert a Tool definition to OpenAI's function-tool dict.

            NOTE(review): restored — this method is referenced by
            ``_convert_tools`` and the module-level compatibility aliases but
            was absent from the source as received (likely lost in
            extraction). Assumes ``Tool`` exposes ``name``, ``description``
            and ``parameters`` — confirm against ``prompter.schemas``.
            """
            return {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": getattr(tool, "description", "") or "",
                    "parameters": OpenAIExecutor.Converters._extract_tool_parameters(
                        getattr(tool, "parameters", None)
                    ),
                },
            }

        @staticmethod
        def flatten_messages(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
            """Flatten one level of nested message lists into a flat list."""
            flattened: list[dict[str, Any]] = []
            for msg in messages:
                if isinstance(msg, list):
                    flattened.extend(msg)
                else:
                    flattened.append(msg)
            return flattened

    @staticmethod
    def parse_openai_response(response, tools: list[Tool]) -> LLMResponse:
        """Parse a raw chat-completions response into an ``LLMResponse``.

        Only the first choice is consulted; missing text content becomes "".
        """
        message = response.choices[0].message
        return LLMResponse(
            raw_response=response,
            tools=tools,
            _text_content=message.content or "",
            _tool_calls=OpenAIExecutor._extract_tool_calls(message),
        )

    @staticmethod
    def _extract_tool_calls(message) -> list[ToolCall]:
        """Extract ``ToolCall`` objects from a response message (may be [])."""
        tool_calls = []
        if getattr(message, "tool_calls", None):
            for tc in message.tool_calls:
                tool_calls.append(
                    ToolCall(
                        name=tc.function.name,
                        arguments=OpenAIExecutor._parse_tool_arguments(
                            tc.function.arguments
                        ),
                        id=tc.id,
                    )
                )
        return tool_calls

    @staticmethod
    def _parse_tool_arguments(arguments: Any) -> Any:
        """Decode a JSON argument string; pass anything else through.

        Malformed JSON is returned verbatim rather than raised, so a bad
        model output degrades to a string instead of crashing parsing.
        """
        if isinstance(arguments, str):
            try:
                return json.loads(arguments)
            except json.JSONDecodeError:
                return arguments
        return arguments

    def __init__(self, client=None, params: Optional[OpenAIParams] = None):
        """Create an executor with an optional client and default params.

        Args:
            client: a pre-configured OpenAI client; a default one is built
                when omitted (reads credentials from the environment).
            params: default request parameters; ``OpenAIParams()`` if omitted.
        """
        self.client = client or OpenAI()
        self.params = params or OpenAIParams()

    def execute(
        self, prompt: Prompt, params: Optional[OpenAIParams] = None
    ) -> LLMResponse:
        """Run ``prompt`` against the API and return the parsed response."""
        params = params or self.params
        messages = self._build_messages(prompt)
        kwargs = self._build_api_kwargs(params, messages, prompt)
        response = self.client.chat.completions.create(**kwargs)
        return self.parse_openai_response(response, prompt.tools or [])

    def _build_messages(self, prompt: Prompt) -> list[dict[str, Any]]:
        """Convert the conversation to messages, injecting the system prompt."""
        messages = []
        for block in prompt.conversation:
            messages.extend(self.Converters.block_to_openai_messages(block))
        # Only inject prompt.system when the conversation did not already
        # contribute its own system message.
        if prompt.system and not self._has_system_message(messages):
            messages.insert(0, {"role": "system", "content": prompt.system})
        return self.Converters.flatten_messages(messages)

    def _has_system_message(self, messages: list[dict[str, Any]]) -> bool:
        """True if any message already carries the system role."""
        return any(msg.get("role") == "system" for msg in messages)

    def _build_api_kwargs(
        self,
        params: OpenAIParams,
        messages: list[dict[str, Any]],
        prompt: Prompt,
    ) -> dict[str, Any]:
        """Assemble keyword arguments for ``chat.completions.create``."""
        kwargs: dict[str, Any] = {
            "model": params.model,
            "messages": messages,
            "max_tokens": params.max_tokens,
            "temperature": params.temperature,
        }
        if prompt.tools:
            kwargs["tools"] = self._convert_tools(prompt.tools)
            # tool_choice is only meaningful when tools are present.
            tool_choice = self._format_tool_choice(prompt.tool_choice)
            if tool_choice:
                kwargs["tool_choice"] = tool_choice
        response_format = self._format_response_format(prompt.response_format)
        if response_format:
            kwargs["response_format"] = response_format
        return kwargs

    def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
        """Convert Tool definitions to OpenAI function-tool dicts."""
        return [self.Converters.tool_to_openai(tool) for tool in tools]

    def _format_tool_choice(self, tool_choice: Optional[str]) -> Optional[Any]:
        """Map a tool-choice string to OpenAI's ``tool_choice`` argument.

        The literals ``required``/``none``/``auto`` pass through; any other
        string names a specific function to force; None yields None.
        """
        if tool_choice in ("required", "none", "auto"):
            return tool_choice
        if isinstance(tool_choice, str):
            return {
                "type": "function",
                "function": {"name": tool_choice},
            }
        return None

    def _format_response_format(self, response_format: Any) -> Optional[dict[str, Any]]:
        """Map a response-format spec to OpenAI's ``response_format`` argument.

        A pydantic-style model becomes a strict ``json_schema``; any other
        truthy value requests generic ``json_object`` mode; falsy yields None.
        """
        if not response_format:
            return None
        if hasattr(response_format, "model_json_schema"):
            return {
                "type": "json_schema",
                "json_schema": {
                    "name": response_format.__name__,
                    "schema": response_format.model_json_schema(),
                    "strict": True,
                },
            }
        return {"type": "json_object"}
# Compatibility layer for existing code
# Module-level aliases preserving the pre-refactor flat API, so callers can
# keep importing these names directly from this module.
block_to_openai_messages = OpenAIExecutor.Converters.block_to_openai_messages
content_list_to_openai = OpenAIExecutor.Converters.content_list_to_openai
# NOTE(review): Converters.tool_to_openai is not defined in the source as
# received (possibly lost in extraction); this alias raises AttributeError at
# import time unless that method exists — confirm.
tool_to_openai = OpenAIExecutor.Converters.tool_to_openai
flatten_messages = OpenAIExecutor.Converters.flatten_messages
parse_openai_response = OpenAIExecutor.parse_openai_response