codeflash/code_to_optimize/roboflow_original.py

653 lines
22 KiB
Python
Raw Normal View History

2025-04-29 17:57:50 +00:00
import base64
import json
from functools import partial
from typing import Any, Dict, List, Literal, Optional, Type, Union
import requests
from openai import OpenAI
from openai._types import NOT_GIVEN
from pydantic import ConfigDict, Field, model_validator
from inference.core.env import WORKFLOWS_REMOTE_EXECUTION_MAX_STEP_CONCURRENT_REQUESTS, API_BASE_URL
from inference.core.managers.base import ModelManager
from inference.core.utils.image_utils import encode_image_to_jpeg_bytes, load_image
from inference.core.workflows.core_steps.common.utils import run_in_parallel
from inference.core.workflows.core_steps.common.vlms import VLM_TASKS_METADATA
from inference.core.workflows.execution_engine.entities.base import (
Batch,
OutputDefinition,
WorkflowImageData,
)
from inference.core.workflows.execution_engine.entities.types import (
FLOAT_KIND,
IMAGE_KIND,
LANGUAGE_MODEL_OUTPUT_KIND,
LIST_OF_VALUES_KIND,
SECRET_KIND,
STRING_KIND,
ImageInputField,
Selector,
)
from inference.core.workflows.prototypes.block import (
BlockResult,
WorkflowBlock,
WorkflowBlockManifest,
)
SUPPORTED_TASK_TYPES_LIST = [
"unconstrained",
"ocr",
"structured-answering",
"classification",
"multi-label-classification",
"visual-question-answering",
"caption",
"detailed-caption",
]
SUPPORTED_TASK_TYPES = set(SUPPORTED_TASK_TYPES_LIST)
RELEVANT_TASKS_METADATA = {
k: v for k, v in VLM_TASKS_METADATA.items() if k in SUPPORTED_TASK_TYPES
}
RELEVANT_TASKS_DOCS_DESCRIPTION = "\n\n".join(
f"* **{v['name']}** (`{k}`) - {v['description']}"
for k, v in RELEVANT_TASKS_METADATA.items()
)
LONG_DESCRIPTION = f"""
Ask a question to OpenAI's GPT-4 with Vision model.
You can specify arbitrary text prompts or predefined ones, the block supports the following types of prompt:
{RELEVANT_TASKS_DOCS_DESCRIPTION}
You need to provide your OpenAI API key to use the GPT-4 with Vision model.
"""
TaskType = Literal[tuple(SUPPORTED_TASK_TYPES_LIST)]
TASKS_REQUIRING_PROMPT = {
"unconstrained",
"visual-question-answering",
}
TASKS_REQUIRING_CLASSES = {
"classification",
"multi-label-classification",
}
TASKS_REQUIRING_OUTPUT_STRUCTURE = {
"structured-answering",
}
class BlockManifest(WorkflowBlockManifest):
model_config = ConfigDict(
json_schema_extra={
"name": "OpenAI",
"version": "v3",
"short_description": "Run OpenAI's GPT-4 with vision capabilities.",
"long_description": LONG_DESCRIPTION,
"license": "Apache-2.0",
"block_type": "model",
"search_keywords": ["LMM", "VLM", "ChatGPT", "GPT", "OpenAI"],
"is_vlm_block": True,
"task_type_property": "task_type",
"ui_manifest": {
"section": "model",
"icon": "fal fa-atom",
"blockPriority": 5,
"popular": True,
},
},
protected_namespaces=(),
)
type: Literal["roboflow_core/open_ai@v3"]
images: Selector(kind=[IMAGE_KIND]) = ImageInputField
task_type: TaskType = Field(
default="unconstrained",
description="Task type to be performed by model. Value determines required parameters and output response.",
json_schema_extra={
"values_metadata": RELEVANT_TASKS_METADATA,
"recommended_parsers": {
"structured-answering": "roboflow_core/json_parser@v1",
"classification": "roboflow_core/vlm_as_classifier@v1",
"multi-label-classification": "roboflow_core/vlm_as_classifier@v1",
},
"always_visible": True,
},
)
prompt: Optional[Union[Selector(kind=[STRING_KIND]), str]] = Field(
default=None,
description="Text prompt to the OpenAI model",
examples=["my prompt", "$inputs.prompt"],
json_schema_extra={
"relevant_for": {
"task_type": {"values": TASKS_REQUIRING_PROMPT, "required": True},
},
"multiline": True,
},
)
output_structure: Optional[Dict[str, str]] = Field(
default=None,
description="Dictionary with structure of expected JSON response",
examples=[{"my_key": "description"}, "$inputs.output_structure"],
json_schema_extra={
"relevant_for": {
"task_type": {
"values": TASKS_REQUIRING_OUTPUT_STRUCTURE,
"required": True,
},
},
},
)
classes: Optional[Union[Selector(kind=[LIST_OF_VALUES_KIND]), List[str]]] = Field(
default=None,
description="List of classes to be used",
examples=[["class-a", "class-b"], "$inputs.classes"],
json_schema_extra={
"relevant_for": {
"task_type": {
"values": TASKS_REQUIRING_CLASSES,
"required": True,
},
},
},
)
api_key: Union[Selector(kind=[STRING_KIND, SECRET_KIND]), str] = Field(
description="Your OpenAI API key",
examples=["xxx-xxx", "$inputs.openai_api_key"],
private=True,
)
model_version: Union[
Selector(kind=[STRING_KIND]), Literal["gpt-4o", "gpt-4o-mini"]
] = Field(
default="gpt-4o",
description="Model to be used",
examples=["gpt-4o", "$inputs.openai_model"],
)
image_detail: Union[
Selector(kind=[STRING_KIND]), Literal["auto", "high", "low"]
] = Field(
default="auto",
description="Indicates the image's quality, with 'high' suggesting it is of high resolution and should be processed or displayed with high fidelity.",
examples=["auto", "high", "low"],
)
max_tokens: int = Field(
default=450,
description="Maximum number of tokens the model can generate in it's response.",
)
temperature: Optional[Union[float, Selector(kind=[FLOAT_KIND])]] = Field(
default=None,
description="Temperature to sample from the model - value in range 0.0-2.0, the higher - the more "
'random / "creative" the generations are.',
ge=0.0,
le=2.0,
)
max_concurrent_requests: Optional[int] = Field(
default=None,
description="Number of concurrent requests that can be executed by block when batch of input images provided. "
"If not given - block defaults to value configured globally in Workflows Execution Engine. "
"Please restrict if you hit OpenAI limits.",
)
@model_validator(mode="after")
def validate(self) -> "BlockManifest":
if self.task_type in TASKS_REQUIRING_PROMPT and self.prompt is None:
raise ValueError(
f"`prompt` parameter required to be set for task `{self.task_type}`"
)
if self.task_type in TASKS_REQUIRING_CLASSES and self.classes is None:
raise ValueError(
f"`classes` parameter required to be set for task `{self.task_type}`"
)
if (
self.task_type in TASKS_REQUIRING_OUTPUT_STRUCTURE
and self.output_structure is None
):
raise ValueError(
f"`output_structure` parameter required to be set for task `{self.task_type}`"
)
return self
@classmethod
def get_parameters_accepting_batches(cls) -> List[str]:
return ["images"]
@classmethod
def describe_outputs(cls) -> List[OutputDefinition]:
return [
OutputDefinition(
name="output", kind=[STRING_KIND, LANGUAGE_MODEL_OUTPUT_KIND]
),
OutputDefinition(name="classes", kind=[LIST_OF_VALUES_KIND]),
]
@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
return ">=1.4.0,<2.0.0"
class OpenAIBlockV3(WorkflowBlock):
def __init__(
self,
model_manager: ModelManager,
api_key: Optional[str],
):
self._model_manager = model_manager
self._api_key = api_key
@classmethod
def get_init_parameters(cls) -> List[str]:
return ["model_manager", "api_key"]
@classmethod
def get_manifest(cls) -> Type[WorkflowBlockManifest]:
return BlockManifest
@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
return ">=1.3.0,<2.0.0"
def run(
self,
images: Batch[WorkflowImageData],
task_type: TaskType,
prompt: Optional[str],
output_structure: Optional[Dict[str, str]],
classes: Optional[List[str]],
api_key: str,
model_version: str,
image_detail: Literal["low", "high", "auto"],
max_tokens: int,
temperature: Optional[float],
max_concurrent_requests: Optional[int],
) -> BlockResult:
inference_images = [i.to_inference_format() for i in images]
raw_outputs = run_gpt_4v_llm_prompting(
roboflow_api_key=self._api_key,
images=inference_images,
task_type=task_type,
prompt=prompt,
output_structure=output_structure,
classes=classes,
openai_api_key=api_key,
gpt_model_version=model_version,
gpt_image_detail=image_detail,
max_tokens=max_tokens,
temperature=temperature,
max_concurrent_requests=max_concurrent_requests,
)
return [
{"output": raw_output, "classes": classes} for raw_output in raw_outputs
]
def run_gpt_4v_llm_prompting(
images: List[Dict[str, Any]],
task_type: TaskType,
prompt: Optional[str],
output_structure: Optional[Dict[str, str]],
classes: Optional[List[str]],
roboflow_api_key: Optional[str],
openai_api_key: Optional[str],
gpt_model_version: str,
gpt_image_detail: Literal["auto", "high", "low"],
max_tokens: int,
temperature: Optional[int],
max_concurrent_requests: Optional[int],
) -> List[str]:
if task_type not in PROMPT_BUILDERS:
raise ValueError(f"Task type: {task_type} not supported.")
gpt4_prompts = []
for image in images:
loaded_image, _ = load_image(image)
base64_image = base64.b64encode(
encode_image_to_jpeg_bytes(loaded_image)
).decode("ascii")
generated_prompt = PROMPT_BUILDERS[task_type](
base64_image=base64_image,
prompt=prompt,
output_structure=output_structure,
classes=classes,
gpt_image_detail=gpt_image_detail,
)
gpt4_prompts.append(generated_prompt)
return execute_gpt_4v_requests(
roboflow_api_key=roboflow_api_key,
openai_api_key=openai_api_key,
gpt4_prompts=gpt4_prompts,
gpt_model_version=gpt_model_version,
max_tokens=max_tokens,
temperature=temperature,
max_concurrent_requests=max_concurrent_requests,
)
def execute_gpt_4v_requests(
roboflow_api_key:str,
openai_api_key: str,
gpt4_prompts: List[List[dict]],
gpt_model_version: str,
max_tokens: int,
temperature: Optional[float],
max_concurrent_requests: Optional[int],
) -> List[str]:
tasks = [
partial(
execute_gpt_4v_request,
roboflow_api_key=roboflow_api_key,
openai_api_key=openai_api_key,
prompt=prompt,
gpt_model_version=gpt_model_version,
max_tokens=max_tokens,
temperature=temperature,
)
for prompt in gpt4_prompts
]
max_workers = (
max_concurrent_requests
or WORKFLOWS_REMOTE_EXECUTION_MAX_STEP_CONCURRENT_REQUESTS
)
return run_in_parallel(
tasks=tasks,
max_workers=max_workers,
)
def _execute_proxied_openai_request(
roboflow_api_key: str,
openai_api_key: str,
prompt: List[dict],
gpt_model_version: str,
max_tokens: int,
temperature: Optional[float],
) -> str:
"""Executes OpenAI request via Roboflow proxy."""
payload = {
"model": gpt_model_version,
"messages": prompt,
"max_tokens": max_tokens,
"openai_api_key": openai_api_key,
}
if temperature is not None:
payload["temperature"] = temperature
try:
endpoint = f"{API_BASE_URL}/apiproxy/openai?api_key={roboflow_api_key}"
response = requests.post(endpoint, json=payload)
response.raise_for_status()
response_data = response.json()
return response_data["choices"][0]["message"]["content"]
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Failed to connect to Roboflow proxy: {e}") from e
except (KeyError, IndexError) as e:
raise RuntimeError(
f"Invalid response structure from Roboflow proxy: {e} - Response: {response.text}"
) from e
def _execute_openai_request(
openai_api_key: str,
prompt: List[dict],
gpt_model_version: str,
max_tokens: int,
temperature: Optional[float],
) -> str:
"""Executes OpenAI request directly."""
temp_value = temperature if temperature is not None else NOT_GIVEN
try:
client = OpenAI(api_key=openai_api_key)
response = client.chat.completions.create(
model=gpt_model_version,
messages=prompt,
max_tokens=max_tokens,
temperature=temp_value,
)
return response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"OpenAI API request failed: {e}") from e
def execute_gpt_4v_request(
roboflow_api_key: str,
openai_api_key: str,
prompt: List[dict],
gpt_model_version: str,
max_tokens: int,
temperature: Optional[float],
) -> str:
if openai_api_key.startswith("rf_key:account") or openai_api_key.startswith(
"rf_key:user:"
):
return _execute_proxied_openai_request(
roboflow_api_key=roboflow_api_key,
openai_api_key=openai_api_key,
prompt=prompt,
gpt_model_version=gpt_model_version,
max_tokens=max_tokens,
temperature=temperature,
)
else:
return _execute_openai_request(
openai_api_key=openai_api_key,
prompt=prompt,
gpt_model_version=gpt_model_version,
max_tokens=max_tokens,
temperature=temperature,
)
def prepare_unconstrained_prompt(
base64_image: str,
prompt: str,
gpt_image_detail: str,
**kwargs,
) -> List[dict]:
return [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
}
]
def prepare_classification_prompt(
base64_image: str, classes: List[str], gpt_image_detail: str, **kwargs
) -> List[dict]:
serialised_classes = ", ".join(classes)
return [
{
"role": "system",
"content": "You act as single-class classification model. You must provide reasonable predictions. "
"You are only allowed to produce JSON document in Markdown ```json [...]``` markers. "
'Expected structure of json: {"class_name": "class-name", "confidence": 0.4}. '
"`class-name` must be one of the class names defined by user. You are only allowed to return "
"single JSON document, even if there are potentially multiple classes. You are not allowed to return list.",
},
{
"role": "user",
"content": [
{
"type": "text",
"text": f"List of all classes to be recognised by model: {serialised_classes}",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
},
]
def prepare_multi_label_classification_prompt(
base64_image: str, classes: List[str], gpt_image_detail: str, **kwargs
) -> List[dict]:
serialised_classes = ", ".join(classes)
return [
{
"role": "system",
"content": "You act as multi-label classification model. You must provide reasonable predictions. "
"You are only allowed to produce JSON document in Markdown ```json``` markers. "
'Expected structure of json: {"predicted_classes": [{"class": "class-name-1", "confidence": 0.9}, '
'{"class": "class-name-2", "confidence": 0.7}]}. '
"`class-name-X` must be one of the class names defined by user and `confidence` is a float value in range "
"0.0-1.0 that represent how sure you are that the class is present in the image. Only return class names "
"that are visible.",
},
{
"role": "user",
"content": [
{
"type": "text",
"text": f"List of all classes to be recognised by model: {serialised_classes}",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
},
]
def prepare_vqa_prompt(
base64_image: str, prompt: str, gpt_image_detail: str, **kwargs
) -> List[dict]:
return [
{
"role": "system",
"content": "You act as Visual Question Answering model. Your task is to provide answer to question"
"submitted by user. If this is open-question - answer with few sentences, for ABCD question, "
"return only the indicator of the answer.",
},
{
"role": "user",
"content": [
{"type": "text", "text": f"Question: {prompt}"},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
},
]
def prepare_ocr_prompt(
base64_image: str, gpt_image_detail: str, **kwargs
) -> List[dict]:
return [
{
"role": "system",
"content": "You act as OCR model. Your task is to read text from the image and return it in "
"paragraphs representing the structure of texts in the image. You should only return "
"recognised text, nothing else.",
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
},
]
def prepare_caption_prompt(
base64_image: str, gpt_image_detail: str, short_description: bool, **kwargs
) -> List[dict]:
caption_detail_level = "Caption should be short."
if not short_description:
caption_detail_level = "Caption should be extensive."
return [
{
"role": "system",
"content": f"You act as image caption model. Your task is to provide description of the image. "
f"{caption_detail_level}",
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
},
]
def prepare_structured_answering_prompt(
base64_image: str, output_structure: Dict[str, str], gpt_image_detail: str, **kwargs
) -> List[dict]:
output_structure_serialised = json.dumps(output_structure, indent=4)
return [
{
"role": "system",
"content": "You are supposed to produce responses in JSON wrapped in Markdown markers: "
"```json\nyour-response\n```. User is to provide you dictionary with keys and values. "
"Each key must be present in your response. Values in user dictionary represent "
"descriptions for JSON fields to be generated. Provide only JSON Markdown in response.",
},
{
"role": "user",
"content": [
{
"type": "text",
"text": f"Specification of requirements regarding output fields: \n"
f"{output_structure_serialised}",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": gpt_image_detail,
},
},
],
},
]
PROMPT_BUILDERS = {
"unconstrained": prepare_unconstrained_prompt,
"ocr": prepare_ocr_prompt,
"visual-question-answering": prepare_vqa_prompt,
"caption": partial(prepare_caption_prompt, short_description=True),
"detailed-caption": partial(prepare_caption_prompt, short_description=False),
"classification": prepare_classification_prompt,
"multi-label-classification": prepare_multi_label_classification_prompt,
"structured-answering": prepare_structured_answering_prompt,
}