Guide
This guide covers all the features and usage patterns of django-vtasks.
Defining Tasks
Create a tasks.py in your Django app and use the @task decorator:
# myapp/tasks.py
from django_vtasks import task
@task
def send_email(to: str, subject: str, body: str):
"""Send an email."""
# Your email sending logic
pass
@task
async def process_image(image_id: int):
"""Process an image asynchronously."""
# Async tasks are processed natively without thread overhead
pass
Note
You can also use from django.tasks import task, but this won't allow vtasks-specific features like unique.
Enqueueing Tasks
Basic Enqueue
Both sync and async contexts are supported. Async performs better as it fully leverages the asyncio loop.
from myapp.tasks import send_email
# Synchronous
result = send_email.enqueue("user@example.com", "Hello", "Welcome!")
# Asynchronous (preferred)
result = await send_email.aenqueue("user@example.com", "Hello", "Welcome!")
Using the .using() API
The .using() method lets you configure task execution options:
# Route to a specific queue
await send_email.using(queue_name="emails").aenqueue(
"user@example.com", "Hello", "Welcome!"
)
# Set priority
await send_email.using(priority=10).aenqueue(
"admin@example.com", "Urgent", "Server down!"
)
# Combine multiple options
await send_email.using(
queue_name="high-priority",
priority=10,
unique=True,
).aenqueue("admin@example.com", "Alert", "Important message")
Bulk Enqueueing
For high-performance scenarios where you need to dispatch many tasks at once, use enqueue_many or aenqueue_many:
from django.tasks import task_backends
from myapp.tasks import process_user, cleanup_job
tasks_to_send = [
# (task, args, kwargs)
(process_user, (user1.id,), {}),
(process_user, (user2.id,), {}),
(cleanup_job, (), {"queue_name": "low_priority"}),
]
# Async context
await task_backends["default"].aenqueue_many(tasks_to_send)
# Sync context (usually within a transaction)
task_backends["default"].enqueue_many(tasks_to_send)
Tasks are automatically grouped by queue name and sent in optimized batches using Valkey's variadic LPUSH or Postgres's bulk_create.
Unique Tasks
Prevent duplicate tasks from being enqueued using the unique parameter. This is useful for tasks that should only run once at a time.
Mutex Mode (Default)
The lock is released as soon as the task finishes:
# Auto-generated unique key from task name + args
result = await send_email.using(unique=True).aenqueue(
"user@example.com", "Welcome", "..."
)
# If the task is already queued or running, this returns None
result2 = await send_email.using(unique=True).aenqueue(
"user@example.com", "Welcome", "..."
)
# result2 is None (rejected as duplicate)
Custom Unique Keys
Use a custom key for finer control over uniqueness:
result = await send_email.using(
unique=True,
unique_key=f"email-user-{user_id}"
).aenqueue("user@example.com", "Welcome", "...")
Throttling Mode
Keep the lock for a specified TTL even after the task completes. This is useful for rate-limiting:
# Only allow 1 API call per 60 seconds
await fetch_data.using(
unique=True,
unique_key=f"api-fetch-{user_id}",
ttl=60,
remove_unique_on_complete=False, # Throttle mode
).aenqueue(user_id)
Handling Rejections
Always check for None when using unique tasks:
result = await send_email.using(unique=True).aenqueue(user.id)
if result is None:
# Task was rejected (duplicate found)
return JsonResponse({"error": "Email already being sent"}, status=429)
else:
# Task was enqueued successfully
return JsonResponse({"task_id": result.id})
Backend Support for Unique Tasks
| Mode | Database (PostgreSQL) | Database (SQLite) | Database (MySQL) | Valkey |
|---|---|---|---|---|
| Mutex | Yes | Yes | No | Yes |
| Throttling | No | No | No | Yes |
Priority
Tasks can be assigned a priority to influence execution order:
# High priority task (processed first)
await send_email.using(priority=10).aenqueue(user.id)
# Low priority task
await send_email.using(priority=-10).aenqueue(user.id)
Database Backend: Supports full integer sorting between -100 and 100 (default 0). Tasks are ordered by priority (descending), then by creation time.
Valkey Backend: Supports binary priority:
- High Priority (> 0): Tasks are pushed to the front of the queue ("Express Lane"). Multiple high-priority tasks are processed LIFO.
- Normal Priority (<= 0): Tasks are pushed to the back and processed FIFO.
Periodic Tasks
Schedule tasks to run on a cron-like schedule:
# settings.py
from django_vtasks.scheduler import crontab
VTASKS_SCHEDULE = {
"daily_report": {
"task": "myapp.tasks.report",
"schedule": crontab(hour=5, minute=0), # Runs at 5:00 AM
},
"cleanup": {
"task": "myapp.tasks.cleanup",
"schedule": 3600, # Runs every hour (in seconds)
},
}
Run the worker with the scheduler enabled:
python manage.py runworker --scheduler
Scheduler Safety
For deployments running multiple scheduler instances, you must use one of these backends:
ValkeyTaskBackendDatabaseTaskBackendwith PostgreSQL or MySQL
Using SQLite with multiple schedulers may result in duplicate task runs.
Batch Processing
For high-throughput scenarios, batch processing allows the worker to fetch multiple tasks at once and process them in a single function call.
1. Configure a Batch Queue
# settings.py
VTASKS_BATCH_QUEUES = {
"batch_queue": {
"count": 100, # Max tasks to fetch at once
"timeout": 5.0, # Max seconds to wait for tasks
}
}
2. Create a Batch Processing Task
Your task must accept a list of task dictionaries:
@task
def process_widgets_batch(tasks: list[dict]):
"""Processes a batch of widgets."""
widget_ids = [task["kwargs"]["widget_id"] for task in tasks]
Widget.objects.filter(id__in=widget_ids).update(processed=True)
print(f"Processed {len(widget_ids)} widgets")
3. Enqueue to the Batch Queue
for i in range(10):
process_widgets_batch.using(queue_name="batch_queue").enqueue(widget_id=i)
The worker collects tasks (up to count or until timeout), groups them by task type, and calls your function with the batch.
Metrics & Observability
django-vtasks provides optional Prometheus metrics.
Installation
pip install "django-vtasks[metrics]"
Available Metrics
All metrics are prefixed with vtasks_.
| Metric | Type | Labels | Description |
|---|---|---|---|
tasks_submitted_total |
Counter | task_name, queue |
Tasks enqueued |
tasks_processed_total |
Counter | task_name, queue, status |
Tasks processed |
task_duration_seconds |
Histogram | task_name, queue |
Execution time |
active_tasks |
Gauge | queue |
Currently processing |
queue_depth |
Gauge | queue |
Tasks waiting |
Enabling Metrics
For standalone workers:
python manage.py runworker --metrics-port 9100
For embedded workers, metrics are exposed via your application's /metrics endpoint (e.g., using django-prometheus).
Queue Management
Clearing Queues
For debugging or emergencies, clear tasks from queues:
# Clear a specific queue
python manage.py clear_queue --backend-alias=default --queue=default
# Clear all queues
python manage.py clear_queue --backend-alias=default --all-queues --force
# Clear failed tasks (DLQ)
python manage.py clear_queue --backend-alias=default --failed --force
Testing
For unit tests, use the immediate backend to execute tasks synchronously:
from django.test import TestCase, override_settings
@override_settings(
TASKS={
"default": {
"BACKEND": "django_vtasks.backends.immediate.ImmediateBackend",
}
}
)
class MyTaskTests(TestCase):
def test_something(self):
# Tasks run immediately and synchronously
my_task.enqueue()
# ... assertions
Testing Batch Tasks
The ImmediateBackend stores batch queue tasks in memory. Use flush_batch() to process them:
@override_settings(
TASKS={"default": {"BACKEND": "django_vtasks.backends.immediate.ImmediateBackend"}},
VTASKS_BATCH_QUEUES={"batch_queue": {"count": 100, "timeout": 5.0}}
)
class MyBatchingTest(TestCase):
def test_batching(self):
backend = task_backends["default"]
for i in range(5):
process_batch.using(queue_name="batch_queue").enqueue(item_id=i)
# Tasks are stored, not executed
self.assertEqual(len(backend.pending_batches["batch_queue"]), 5)
# Manually trigger batch processing
backend.flush_batch("batch_queue")
# Now verify your batch logic ran
self.assertEqual(len(backend.pending_batches["batch_queue"]), 0)