Message Processing¶
How messages flow through the broker and workers.
Message Lifecycle¶
- Enqueue - Actor sends message to broker
- Route - Broker puts message in appropriate queue
- Consume - Worker pulls message from queue
- Process - Actor function executes
- Acknowledge - Worker ACKs or NACKs message
Sending Messages¶
Immediate Send¶
@dramatiq.actor
def send_email(email: str):
pass
# Basic send
send_email.send("user@example.com")
# Equivalent to
send_email.send_with_options(args=("user@example.com",))
Delayed Send¶
# Delay 5 seconds
send_email.send_with_options(
args=("user@example.com",),
delay=5000 # milliseconds
)
Priority Send¶
Message Format¶
Messages are JSON-serialized by default:
{
"queue_name": "default",
"actor_name": "send_email",
"args": ["user@example.com"],
"kwargs": {},
"options": {
"eta": 1234567890,
"broker_priority": 5
},
"message_id": "unique-id",
"message_timestamp": 1234567890
}
Acknowledgments¶
Auto-ACK (Default)¶
Worker automatically ACKs after successful processing:
Manual ACK¶
Control acknowledgment in middleware:
class ManualAckMiddleware:
def after_process_message(self, broker, message, *, result=None, exception=None):
if exception:
message.nack() # Reject and requeue
else:
message.ack() # Acknowledge
Check ACK Status¶
from dramatiq import Message
def process_message(broker, message: Message):
# Do something
if message.acknowledged:
print("Already ACKed")
Message TTL¶
Queue-Level TTL¶
Set TTL on entire queue (not recommended for work queues):
Message-Level TTL¶
Set TTL per message via expiration property (used for delays):
# Dramatiq sets this automatically for delays
channel.basic_publish(
body=message.encode(),
properties={"expiration": "5000"} # 5 seconds
)
Dead Letter Queue¶
Failed messages go to dead letter queue (DLQ):
# For queue "tasks":
# - tasks - Main queue
# - tasks.DQ - Delay queue
# - tasks.XQ - Dead letter queue (failed messages)
Inspecting DLQ¶
def check_dlq(broker, queue_name):
dlq_name = broker.topology.get_dead_letter_queue_name(queue_name)
_, count, _ = broker.get_queue_message_counts(queue_name)
print(f"Messages in DLQ: {count}")
Reprocessing DLQ¶
Manually move messages from DLQ back to main queue via RabbitMQ management UI or:
# Via rabbitmqadmin
rabbitmqadmin get queue=tasks.XQ requeue=false count=10 | \
rabbitmqadmin publish routing_key=tasks
Message Persistence¶
Messages are persistent by default (delivery_mode=2):
This ensures messages survive RabbitMQ restart.
Confirm Delivery¶
See Delivery Guarantees for how to configure message delivery confirmation.
Message Routing¶
Standard Flow¶
Delayed Flow¶
Failed Message Flow¶
Queue Depths¶
Monitor queue depths:
def monitor_queues(broker):
for queue_name in broker.get_declared_queues():
main, delay, dlq = broker.get_queue_message_counts(queue_name)
print(f"{queue_name}:")
print(f" Main: {main}")
print(f" Delay: {delay}")
print(f" DLQ: {dlq}")
Troubleshooting¶
Messages Not Processing¶
Check: 1. Worker is running 2. Worker consuming from correct queue 3. No exceptions in worker logs 4. RabbitMQ connection is healthy
Messages Stuck in Delay Queue¶
Check: - Delay queue has dead-letter configuration - Dead-letter routing key matches main queue name - Main queue exists
Messages Going to DLQ¶
Check worker logs for exceptions. Common causes: - Unhandled exceptions - Message deserialization errors - Actor not found
Best Practices¶
- Use delays sparingly - Don't delay millions of messages
- Monitor queue depths - Alert on growing queues
- Check DLQ regularly - Failed messages need attention
- Set reasonable TTLs - Don't let messages sit forever
- Enable confirm_delivery - Ensure reliable delivery
- Handle exceptions - Log errors before they reach DLQ
Next Steps¶
- Examples - Message processing examples
- Troubleshooting - Fix message issues
- Configuration - Configure message handling