I implemented the code according to the documentation. After launching, everything works in principle, but there are problems with the integration and with loading predictions back into the studio.

Error:
ml-backend | [2024-11-16 21:15:43,846] [ERROR] [model::process_task::106] Task 10044: Object of type PredictionValue is not JSON serializable

I don't understand why the studio server does not seem to keep up with the requests coming from the backend (maybe it does, but I would like to know about best practices for this). I would also be grateful if you could tell me how to correctly implement batch processing of tasks.
**The labeling model itself works correctly in manual mode.**
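For reference, this is roughly the shape of the request I expect to end up at the studio when a prediction is pushed back outside the normal /predict flow. This is only a sketch: the POST to /api/predictions and the idea of dumping the pydantic response objects to plain dicts first (to avoid the PredictionValue serialization error) are my assumptions, not something I have verified.

```python
# Sketch: turn the ModelResponse into plain JSON types before posting it back.
# Assumption: ModelResponse / PredictionValue are pydantic models, so model_dump()
# should convert them into dicts that the JSON encoder can handle.
import requests


def post_prediction(label_studio_url, api_key, task_id, model_response):
    prediction = model_response.predictions[0]
    if hasattr(prediction, "model_dump"):  # pydantic object -> plain dict
        prediction = prediction.model_dump()
    payload = {
        "task": task_id,
        "result": prediction["result"],
        "score": prediction.get("score", 1.0),
        "model_version": prediction.get("model_version"),
    }
    response = requests.post(
        f"{label_studio_url}/api/predictions",
        headers={"Authorization": f"Token {api_key}"},
        json=payload,
    )
    response.raise_for_status()
    return response.json()
```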
model.py
import os
import logging
import time
from typing import List, Dict, Optional
from uuid import uuid4

import cv2
import numpy as np
import requests
from PIL import Image

from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.response import ModelResponse
from label_studio_sdk.converter import brush
from label_studio_sdk._extensions.label_studio_tools.core.utils.io import get_local_path

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class NewModel(LabelStudioMLBase):
    """Custom ML Backend model for Label Studio.

    This model generates a mask by removing the white background from images.

    Attributes:
        DEVICE (str): Device type for computation, e.g., 'cuda'.
        LOWER_WHITE (tuple): Lower bound for white color in HSV space.
        UPPER_WHITE (tuple): Upper bound for white color in HSV space.
        KERNEL_SIZE (tuple): Size of the kernel for morphological operations.
    """

    DEVICE = os.getenv('DEVICE', 'cuda')
    LOWER_WHITE = (0, 0, 200)     # Lower bound for white in HSV
    UPPER_WHITE = (180, 20, 255)  # Upper bound for white in HSV
    KERNEL_SIZE = (5, 5)          # Kernel size for morphological operations

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.label_studio_url = os.getenv("LABEL_STUDIO_URL")
        self.api_key = os.getenv("LABEL_STUDIO_API_KEY")
        self.project_id = os.getenv("LABEL_STUDIO_PROJECT_ID")
        self.batch_size = 100
        # Initialize predictions for tasks that do not have any yet
        self.initialize_predictions()
    def initialize_predictions(self):
        """Create predictions for tasks that do not have any yet."""
        logger.info("Initializing predictions for tasks without annotations...")
        offset = 0
        while True:
            # Fetch the next batch of tasks
            tasks = self.fetch_project_tasks(offset, self.batch_size)
            if not tasks:
                logger.info("No more tasks to process.")
                break

            # Keep only the tasks that have no predictions
            tasks_without_predictions = [
                task for task in tasks if "predictions" not in task or not task["predictions"]
            ]
            logger.info(f"Processing batch: found {len(tasks_without_predictions)} tasks without predictions.")

            # If every task in this batch already has predictions, move on to the next batch
            if not tasks_without_predictions:
                offset += self.batch_size
                continue

            # Process the tasks one by one
            for task in tasks_without_predictions:
                logger.info(f"Processing task ID: {task['id']}")
                self.process_task(task)

            offset += self.batch_size
            logger.info(f"Batch finished. Moving to the next batch (offset={offset}).")
            time.sleep(1)  # Small delay to avoid overloading the API
    def fetch_project_tasks(self, offset: int, limit: int):
        """Fetch project tasks in batches via the API."""
        endpoint = f"{self.label_studio_url}/api/projects/{self.project_id}/tasks"
        headers = {"Authorization": f"Token {self.api_key}"}
        params = {"offset": offset, "limit": limit}
        try:
            response = requests.get(endpoint, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch tasks: {e}")
            return []
    def process_task(self, task: Dict):
        """Run prediction for a single task."""
        try:
            prediction = self.predict([task])
            logger.info(f"Prediction for task {task['id']} created successfully.")
        except Exception as e:
            logger.error(f"Failed to process task {task['id']}: {e}")
    def predict(self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs) -> ModelResponse:
        """Make predictions based on the input tasks.

        Args:
            tasks (List[Dict]): The list of tasks containing image data.
            context (Optional[Dict]): Optional context for the prediction.

        Returns:
            ModelResponse: The response containing predictions.
        """
        logger.debug(f"Tasks received: {tasks}")
        logger.debug(f"Context received: {context}")

        from_name, to_name, value = self.get_first_tag_occurence('BrushLabels', 'Image')
        logger.debug(f"From name: {from_name}, To name: {to_name}, Value: {value}")

        # Fall back to default width/height when the context is empty
        if context and context.get('result'):
            image_width = context['result'][0]['original_width']
            image_height = context['result'][0]['original_height']
        else:
            logger.warning("Context is empty; using default image dimensions.")
            image_width, image_height = 1920, 1080  # Default dimensions for the mask

        # Check the image URL
        img_url = tasks[0]['data'].get(value)
        if not img_url:
            logger.error("Image URL not found in task data.")
            return ModelResponse(predictions=[])
        logger.debug(f"Image URL: {img_url}")

        mask = self.set_image(img_url, tasks[0].get('id'))
        predictions = self.get_results(
            mask=mask,
            width=image_width,
            height=image_height,
            from_name=from_name,
            to_name=to_name,
            label="object"  # Default label
        )
        return ModelResponse(predictions=predictions)
    def set_image(self, image_url, task_id):
        """Set the image for processing and create a mask based on white background removal.

        Args:
            image_url (str): URL of the image to process.
            task_id (str): The task identifier for the image.

        Returns:
            ndarray: The generated mask image.
        """
        logger.debug(f"Resolving local path for image URL: {image_url}")
        image_path = get_local_path(image_url, task_id=task_id)
        logger.debug(f"Image path: {image_path}")

        # Open the image and convert it to BGR for OpenCV
        image = Image.open(image_path).convert("RGB")
        image_np = np.array(image)
        image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)

        # Convert the image to the HSV color space
        hsv = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2HSV)
        logger.debug("Image converted to the HSV color space")

        # Build a mask of the white regions
        white_mask = cv2.inRange(hsv, self.LOWER_WHITE, self.UPPER_WHITE)
        logger.debug(f"White-region mask created. Mask shape: {white_mask.shape}")

        # Invert the mask to select the object rather than the background
        object_mask = cv2.bitwise_not(white_mask)
        logger.debug("Mask inverted to select the object")

        # Apply morphological closing to remove noise
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, self.KERNEL_SIZE)
        object_mask = cv2.morphologyEx(object_mask, cv2.MORPH_CLOSE, kernel)
        logger.debug("Morphological closing applied to remove noise")

        return object_mask
    def get_results(self, mask, width, height, from_name, to_name, label):
        """Generate results based on the processed mask.

        Args:
            mask (ndarray): The mask image where the object is highlighted.
            width (int): The width of the original image.
            height (int): The height of the original image.
            from_name (str): The label source name.
            to_name (str): The label target name.
            label (str): The label for the detected object.

        Returns:
            List[Dict]: A list containing the formatted results.
        """
        label_id = str(uuid4())[:4]
        rle = brush.mask2rle(mask)
        return [{
            'result': [{
                'id': label_id,
                'from_name': from_name,
                'to_name': to_name,
                'original_width': width,
                'original_height': height,
                'image_rotation': 0,
                'value': {
                    'format': 'rle',
                    'rle': rle,
                    'brushlabels': [label],
                },
                'type': 'brushlabels',
                'readonly': False
            }],
            'model_version': self.get('model_version'),
            'score': 1.0  # Assuming maximum confidence
        }]
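For the batch question, this is the direction I have in mind on top of the methods above, instead of creating predictions one request at a time from __init__. Again only a sketch: it reuses the hypothetical post_prediction helper from the earlier snippet and assumes the tasks endpoint simply returns an empty list once the offset runs past the last task.

```python
# Sketch: walk the project page by page and push predictions back in batches.
# Assumes `model` is a NewModel instance and `post_prediction` is the hypothetical
# helper from the snippet above.
import time


def run_batch(model, batch_size=100, delay=1.0):
    offset = 0
    while True:
        tasks = model.fetch_project_tasks(offset, batch_size)
        if not tasks:
            break  # no more tasks in the project
        for task in tasks:
            if task.get("predictions"):  # skip tasks that already have predictions
                continue
            model_response = model.predict([task])  # ModelResponse for one task
            post_prediction(
                model.label_studio_url,
                model.api_key,
                task["id"],
                model_response,
            )
        offset += batch_size
        time.sleep(delay)  # small pause so the studio server can keep up
```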