Inferencia de aprendizaje automático asincrónico con Celery, Redis y Florence 2 | de Youness Mansar | julio de 2024

Inferencia de aprendizaje automático asincrónico con Celery, Redis y Florence 2 |  de Youness Mansar |  julio de 2024

Un tutorial sencillo para comenzar con la inferencia de ML asincrónica

Foto por Fabien BELLANGER seguro desempaquetar

La mayoría de los tutoriales sobre servicios de aprendizaje automático se centran en el servicio sincrónico en tiempo real, lo que ayuda a responder a las solicitudes de predicción de inmediato. Sin embargo, este enfoque puede experimentar picos de tráfico y no es ideal para tareas de larga duración. También requiere máquinas más potentes para responder rápidamente y, si el cliente o el servidor falla, el resultado de la predicción generalmente se pierde.

En esta publicación de blog, mostraremos cómo ejecutar un modelo de aprendizaje automático como trabajador asincrónico usando Celery y Redis. Usaremos el modelo base Florence 2, un modelo de lenguaje Vision conocido por su impresionante rendimiento. Este tutorial proporcionará un ejemplo mínimo pero funcional que puede adaptar y ampliar para sus propios casos de uso.

Puedes ver una demostración de la aplicación aquí:

El corazón de nuestra solución se basa en Celery, una biblioteca de Python que implementa esta lógica cliente/trabajador por nosotros. Nos permite distribuir el trabajo computacional entre muchos trabajadores, mejorando la escalabilidad de su caso de uso de inferencia de ML a cargas altas e impredecibles.

El proceso funciona de la siguiente manera:

  1. El cliente envía una tarea con ciertos parámetros a una cola administrada por el corredor (Redis en nuestro ejemplo).
  2. Uno o más trabajadores monitorean constantemente la cola y recogen las tareas a medida que llegan. Luego los ejecuta y guarda el resultado en el almacenamiento backend.
  3. El cliente puede recuperar el resultado de la tarea utilizando su ID, ya sea consultando el backend o suscribiéndose al canal de tareas.

Comencemos con un ejemplo simplificado:

Imagen del autor

Primero, ejecute Redis:

docker run -p 6379:6379 redis

Aquí está el código de trabajador:

from celery import Celery
# Configure Celery to use Redis as the broker and backend
app = Celery(
"tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)
# Define a simple task
@app.task
def add(x, y):
return x + y
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info"])

Y el código de cliente:

from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
add = app.signature(task_name)
print("Gotten Task")
# Send a task to the worker
result = add.delay(4, 6)
print("Waiting for Task")
result.wait()
# Get the result
print(f"Result: {result.result}")

Esto da el resultado que esperamos: «Resultado: 10»

Ahora pasemos al caso de uso real: servir a Florence 2.

Crearemos una aplicación de subtítulos de imágenes de contenedores múltiples que utilice Redis para la cola de trabajos, Celery para la distribución de trabajos y un volumen local o Google Cloud Storage para el posible almacenamiento de imágenes. La aplicación está diseñada con algunos componentes básicos: inferencia de modelos, distribución de tareas, interacción con el cliente y almacenamiento de archivos.

Presentación de la arquitectura:

Imagen del autor
  1. Cliente: Inicia solicitudes de subtítulos de imágenes enviándolas al trabajador (a través del intermediario).
  2. Obrero: Recibe solicitudes, carga imágenes, realiza inferencias utilizando el modelo previamente entrenado y devuelve resultados.
  3. Decir de nuevo: Actúa como intermediario de mensajes facilitando la comunicación entre cliente y trabajador.
  4. Almacenamiento de archivos: Almacenamiento temporal de archivos de imagen

Distribución de componentes:

1. Inferencia del modelo (model.py):

  • Dependencias e inicialización:
import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor
model = Florence2ForConditionalGeneration.from_pretrained(
"microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")
  • Importa bibliotecas necesarias para el procesamiento de imágenes, solicitudes web, interacción con Google Cloud Storage y registro.
  • Inicializa el modelo y el procesador Florence-2 previamente entrenado para la generación de subtítulos de imágenes.
  • Descarga de imagen (download_image):
def download_image(url):
if url.startswith(" or url.startswith("https://"):
# Handle HTTP/HTTPS URLs
# ... (code to download image from URL) ...
elif url.startswith("gs://"):
# Handle Google Cloud Storage paths
# ... (code to download image from GCS) ...
else:
# Handle local file paths
# ... (code to open image from local path) ...
  • Descarga la imagen desde la URL proporcionada.
  • Admite URL HTTP/HTTPS, rutas de almacenamiento de Google Cloud (gs://) y rutas de archivos locales.
  • Ejecutar inferencia (run_inference):
def run_inference(url, task_prompt):
# ... (code to download image using download_image function) ...
try:
# ... (code to open and process the image) ...
inputs = processor(text=task_prompt, images=image, return_tensors="pt")
except ValueError:
# ... (error handling) ...
# ... (code to generate captions using the model) ...
generated_ids = model.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
# ... (model generation parameters) ...
)
# ... (code to decode generated captions) ...
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
# ... (code to post-process generated captions) ...
parsed_answer = processor.post_process_generation(
generated_text, task=task_prompt, image_size=(image.width, image.height)
)
return parsed_answer

Orquesta el proceso de subtítulos de imágenes:

  • Descarga la imagen usando download_image.
  • Prepara la imagen y el mensaje de tarea para el modelo.
  • Genera leyendas utilizando la plantilla Florence-2 cargada.
  • Decodifica y posprocesa los subtítulos generados.
  • Devuelve el título final.

2. Distribución de tareas (worker.py):

import os
from celery import Celery
# ... other imports ...
# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery configurations) ...
  • Configura Celery para usar Redis como intermediario de mensajes para la distribución de tareas.
  • Definición de tarea (inference_task):
@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
# ... (logging and error handling) ...
return run_inference(url, task_prompt)
  • Define el inference_task el cual será realizado por los trabajadores de Apio.
  • Esta tarea llama a la run_inference funcion de model.py.
  • Ejecución de trabajadores:
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
  • Inicia un trabajador de Apio que escucha y ejecuta tareas.

3. Interacción con el cliente (client.py):

import os
from celery import Celery
# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
  • Establece una conexión con Celery utilizando Redis como intermediario de mensajes.
  • Envío de tareas (send_inference_task):
def send_inference_task(url, task_prompt):
task = inference_task.delay(url, task_prompt)
print(f"Task sent with ID: {task.id}")
# Wait for the result
result = task.get(timeout=120)
return result
  • Envía una tarea de subtítulos de imágenes (inference_task) al trabajador del apio.
  • Espera a que el trabajador complete la tarea y recupera el resultado.

Integración de Docker (docker-compose.yml):

  • Define una configuración de múltiples contenedores usando Docker Compose:
  • decir de nuevo: Ejecuta el servidor Redis para la intermediación de mensajes.
  • modelo: Crea e implementa el trabajador de inferencia de modelos.
  • solicitud: Crea e implementa la aplicación cliente.
Imagen de flor por RoonZ es seguro desempaquetar
  • flor: Ejecuta una herramienta de monitoreo de tareas de Apio basada en la web.
Imagen del autor

Puede ejecutar la pila completa usando:

docker-compose up

¡Y ahí lo tienes! Acabamos de explorar una guía completa sobre cómo construir un sistema de inferencia de aprendizaje automático asincrónico usando Celery, Redis y Florence 2. Este tutorial mostró cómo usar efectivamente Celery para distribución de tareas, Redis para intermediación de mensajes y Florence 2 para subtitulado de imágenes. Al adoptar flujos de trabajo asincrónicos, puede manejar grandes volúmenes de consultas, mejorar el rendimiento y fortalecer la resiliencia general de sus aplicaciones de inferencia de ML. La configuración de Docker Compose proporcionada le permite ejecutar todo el sistema por su cuenta con un solo comando.

¿Listo para el siguiente paso? La implementación de esta arquitectura en la nube puede presentar algunos desafíos. ¡Déjame saber en los comentarios si quieres ver un artículo de seguimiento sobre la implementación de la nube!

Codificado: https://github.com/CVxTz/celery_ml_deploy
Manifestación: