Need help in debugging ML Backend automatization

I implemented the code according to the documentation. After launching everything works in principle, but there are some problems with integration and loading predictions back to the studio.

Error : ml-backend | [2024-11-16 21:15:43,846] [ERROR] [model::process_task::106] Task 10044: Object of type PredictionValue is not JSON serializable

I don’t understand why the studio server doesn’t have time to process requests from the backend (maybe it does, but I’d like to know about best practices on this issue).

Also I would be grateful if you could tell me how to realize correctly the batch processing of tasks.

**The markup model itself works in manual mode

model.py

import os
import requests
import numpy as np
from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.response import ModelResponse
from uuid import uuid4
from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.response import ModelResponse
from label_studio_sdk.converter import brush
from label_studio_sdk._extensions.label_studio_tools.core.utils.io import get_local_path
from typing import List, Dict, Optional
import logging
import time
from PIL import Image
import cv2

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)



class NewModel(LabelStudioMLBase):
    """Custom ML Backend model for Label Studio.
    
    This model generates a mask by removing the white background from images.
    
    Attributes:
        DEVICE (str): Device type for computation, e.g., 'cuda'.
        LOWER_WHITE (tuple): Lower bound for white color in HSV space.
        UPPER_WHITE (tuple): Upper bound for white color in HSV space.
        KERNEL_SIZE (tuple): Size of the kernel for morphological operations.
    """

    DEVICE = os.getenv('DEVICE', 'cuda')
    LOWER_WHITE = (0, 0, 200)      # Lower bound for white in HSV
    UPPER_WHITE = (180, 20, 255)   # Upper bound for white in HSV
    KERNEL_SIZE = (5, 5)           # Kernel size for morphological operations

    def __init__(self, **kwargs):
        super(NewModel, self).__init__(**kwargs)
        self.label_studio_url = os.getenv("LABEL_STUDIO_URL")
        self.api_key = os.getenv("LABEL_STUDIO_API_KEY")
        self.project_id = os.getenv("LABEL_STUDIO_PROJECT_ID")
        self.batch_size = 100

        # Инициализация предсказаний для задач без предсказаний
        self.initialize_predictions()

    def initialize_predictions(self):
        """Инициализация предсказаний для задач, не имеющих предсказаний."""
        logger.info("Инициализация предсказаний для задач без аннотаций...")

        offset = 0
        while True:
            # Получение очередной партии задач
            tasks = self.fetch_project_tasks(offset, self.batch_size)
            if not tasks:
                logger.info("Больше задач для обработки нет.")
                break

            # Фильтрация задач без предсказаний
            tasks_without_predictions = [
                task for task in tasks if "predictions" not in task or not task["predictions"]
            ]
            logger.info(f"Обработка партии: найдено {len(tasks_without_predictions)} задач без предсказаний.")

            # Если нет задач без предсказаний, переход к следующей партии
            if not tasks_without_predictions:
                offset += self.batch_size
                continue

            # Обработка задач по одной
            for task in tasks_without_predictions:
                logger.info(f"Обработка задачи ID: {task['id']}")
                self.process_task(task)
            
            offset += self.batch_size
            logger.info(f"Партия завершена. Переход к следующей партии (offset={offset}).")
            time.sleep(1)  # Небольшая задержка для предотвращения перегрузки API

    def fetch_project_tasks(self, offset: int, limit: int):
        """Получить задачи проекта партиями через API."""
        endpoint = f"{self.label_studio_url}/api/projects/{self.project_id}/tasks"
        headers = {"Authorization": f"Token {self.api_key}"}
        params = {"offset": offset, "limit": limit}

        try:
            response = requests.get(endpoint, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Ошибка при получении задач: {e}")
            return []

    def process_task(self, task: Dict):
        """Обработка задачи для создания предсказания."""
        try:
            prediction = self.predict([task])
            logger.info(f"Предсказание для задачи {task['id']} успешно создано.")
        except Exception as e:
            logger.error(f"Ошибка при обработке задачи {task['id']}: {e}")



    def predict(self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs) -> ModelResponse:
        """Make predictions based on the input tasks.
        
        Args:
            tasks (List[Dict]): The list of tasks containing image data.
            context (Optional[Dict]): Optional context for the prediction.

        Returns:
            ModelResponse: The response containing predictions.
        """
        logger.debug(f"Tasks received: {tasks}")
        logger.debug(f"Context received: {context}")

        from_name, to_name, value = self.get_first_tag_occurence('BrushLabels', 'Image')
        logger.debug(f"From name: {from_name}, To name: {to_name}, Value: {value}")

        # Устанавливаем значения по умолчанию для ширины и высоты, если `context` пустой
        if context and context.get('result'):
            image_width = context['result'][0]['original_width']
            image_height = context['result'][0]['original_height']
        else:
            logger.warning("Context is empty; using default image dimensions.")
            image_width, image_height = 1920, 1080  # Default dimensions for the mask

        # Проверка URL изображения
        img_url = tasks[0]['data'].get(value)
        if not img_url:
            logger.error("Image URL not found in tasks data.")
            return ModelResponse(predictions=[])

        logger.debug(f"Image URL: {img_url}")
        mask = self.set_image(img_url, tasks[0].get('id'))

        predictions = self.get_results(
            mask=mask,
            width=image_width,
            height=image_height,
            from_name=from_name,
            to_name=to_name,
            label="object"  # Default label
        )
        
        return ModelResponse(predictions=predictions)

    def set_image(self, image_url, task_id):
        """Set the image for processing and create a mask based on the white background removal.
        
        Args:
            image_url (str): URL of the image to process.
            task_id (str): The task identifier for the image.

        Returns:
            ndarray: The generated mask image.
        """
        logger.debug(f"Получение локального пути для URL изображения: {image_url}")
        image_path = get_local_path(image_url, task_id=task_id)
        logger.debug(f"Путь к изображению: {image_path}")

        # Открываем изображение и конвертируем его в формат BGR для OpenCV
        image = Image.open(image_path).convert("RGB")
        image_np = np.array(image)
        image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)

        # Переводим изображение в цветовое пространство HSV
        hsv = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2HSV)
        logger.debug("Изображение переведено в HSV цветовое пространство")

        # Создаем маску для белых участков
        white_mask = cv2.inRange(hsv, self.LOWER_WHITE, self.UPPER_WHITE)
        logger.debug(f"Маска белых областей создана. Размер маски: {white_mask.shape}")

        # Инвертируем маску для выделения объекта, а не фона
        object_mask = cv2.bitwise_not(white_mask)
        logger.debug("Маска инвертирована для выделения объекта")

        # Применяем морфологическое закрытие для устранения шумов
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, self.KERNEL_SIZE)
        object_mask = cv2.morphologyEx(object_mask, cv2.MORPH_CLOSE, kernel)
        logger.debug("Применено морфологическое закрытие для устранения шумов")

        return object_mask

    def get_results(self, mask, width, height, from_name, to_name, label):
        """Generate results based on the processed mask.
        
        Args:
            mask (ndarray): The mask image where the object is highlighted.
            width (int): The width of the original image.
            height (int): The height of the original image.
            from_name (str): The label source name.
            to_name (str): The label target name.
            label (str): The label for the detected object.

        Returns:
            List[Dict]: A list containing the formatted results.
        """
        label_id = str(uuid4())[:4]
        rle = brush.mask2rle(mask)

        return [{
            'result': [{
                'id': label_id,
                'from_name': from_name,
                'to_name': to_name,
                'original_width': width,
                'original_height': height,
                'image_rotation': 0,
                'value': {
                    'format': 'rle',
                    'rle': rle,
                    'brushlabels': [label],
                },
                'type': 'brushlabels',
                'readonly': False
            }],
            'model_version': self.get('model_version'),
            'score': 1.0  # Assuming maximum confidence
        }]