Recently we faced the problem of how to fairly distribute computational resources for our imaging service using Celery. The imaging service performs basic operations on images, such as resizing, and we use Celery to distribute the work among our workers. The service works with chunks of 1,000 images that are processed as a batch.
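For context, here is a minimal sketch of the worker side. The task name process_images and the broker URL are illustrative assumptions, not our actual code:

from celery import Celery

celery = Celery("imaging", broker="amqp://guest:guest@localhost//")


@celery.task
def process_images(images):
    # Process one chunk of up to 1,000 images (resize etc.);
    # the real transformations are omitted here.
    for image in images:
        ...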
We wanted to avoid a situation where one large client uses all the processors on a worker while smaller clients have to wait. To achieve this we needed a dedicated queue for each client: a Celery worker then takes tasks from all its queues in round-robin fashion. The problem is that we do not know which clients will be using our service in the future, so we need to add their queues dynamically. To dynamically add a new queue and a consumer for it, call the following function:
from ..celery import celery as celery_app


def establish_celery_queue(image_domain, destination_workers: list):
    celery_app.control.add_consumer(
        queue="domain_{}".format(image_domain),
        exchange='celery',
        routing_key=image_domain,
        reply=True,  # wait for the consumers to be properly created
        destination=destination_workers,  # notify only specific workers
        options={
            'queue_durable': True,
            'exchange_durable': True,
            'auto_delete': False,
        },
    )
This function uses app.control.add_consumer, which tells the workers to start consuming from a queue that identifies the client, e.g. “domain_eshop.com”. The only thing left is to call it whenever we receive a message from a client we don't know yet (we can keep the known clients in a set, for example).
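A minimal sketch of that dispatch logic might look as follows; the names known_domains and submit_image_batch are hypothetical, and process_images is the assumed task from the sketch above:

known_domains = set()  # hypothetical registry of clients seen so far


def submit_image_batch(image_domain, images, destination_workers):
    if image_domain not in known_domains:
        # First message from this client: create its queue and
        # tell the workers to start consuming from it.
        establish_celery_queue(image_domain, destination_workers)
        known_domains.add(image_domain)
    # Route the batch to the client's dedicated queue.
    process_images.apply_async(
        args=[images],
        queue="domain_{}".format(image_domain),
        routing_key=image_domain,
    )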
This works great, but if the application is restarted we are not connected to the existing queues until a new message arrives from that particular client. We therefore need to re-add all the consumers automatically when the service starts. To find out which queues already exist in RabbitMQ, we look them up through the RabbitMQ management API. We used the Python library PyRabbit, which provides access to the management API (don't forget to give the management tag to the user you connect with). On every application startup just call connect_domain_queues and it will re-add all the consumers from the previous run.
from pyrabbit.api import Client


def connect_domain_queues(host, management_port, login, password,
                          virtual_host, destination_workers):
    # Connect to the RabbitMQ management API (usually on port 15672).
    client = Client("{}:{}".format(host, management_port), login, password)
    for queue in client.get_queues(virtual_host):
        # Re-attach consumers only for our per-client queues.
        if queue["name"].startswith("domain_"):
            domain = queue["name"][len("domain_"):]
            establish_celery_queue(domain, destination_workers)
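Called from a startup hook, it might look like this; the host, credentials, and worker names below are illustrative values, not our actual configuration:

connect_domain_queues(
    host="localhost",
    management_port=15672,  # default port of the management plugin
    login="guest",
    password="guest",
    virtual_host="/",
    destination_workers=["celery@worker1"],
)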
This recipe lets us serve multiple clients fairly, so that larger clients do not block smaller ones.