The dispatcher system is responsible for executing background tasks in AWX. It manages worker processes, consumes messages from queues, and runs Python code to accomplish various tasks.
Overview
AWX uses the dispatcherd library for all task management. This is a dedicated task queue system that distributes work across machines in an AWX installation.
Key Components
┌───────────────────────────────┐
│        Task Publishers        │
│    (Django Views/Signals)     │
└───────────────┬───────────────┘
                │
                │ publish task
                ▼
┌───────────────────────────────┐
│        Message Queues         │
│    (PostgreSQL pg_notify)     │
└───────────────┬───────────────┘
                │
                │ consume
                ▼
┌───────────────────────────────┐
│          awx-manage           │
│          dispatcherd          │
│     (Dispatcher Process)      │
└───────────────┬───────────────┘
                │
                │ distribute to
                ▼
┌───────────────────────────────┐
│          Worker Pool          │
│       (Child Processes)       │
│                               │
│   Worker 1   Worker 2   ...   │
└───────────────────────────────┘
Dispatcherd Library
The dispatcherd library supplies the decorator that registers a function or class as a background task.
Task Decorator
Tasks are decorated using:
from dispatcherd.publish import task

@task()
def my_task(arg1, arg2):
    """A simple task function"""
    return arg1 + arg2
Task Queues and Workers
Task Queue Abstraction
AWX uses a Task Queue abstraction to distribute work:
Input: Unit of work called a Task
Workers: Dedicated processes on every AWX node that monitor queues
Communication: Via distributed queues using PostgreSQL’s pg_notify
Clustered Installations
Clustered AWX installations spread multiple workers across every node, which provides:
- High availability
- Horizontal scaling
Message Types
Direct Messages
Bound directly to a specific named queue.
Example: Launching a Job Template
- AWX looks at available capacity
- Chooses an Execution Node
- Publishes message to node-specific queue
- Dispatcher on that node listens for events
Characteristics:
- Targeted to specific node
- Consumed by one worker process
- Used for job execution, inventory updates, etc.
Shared Direct Queues
Some direct queues are bound by every AWX node.
Example: Inventory deletion task
- Any available node may perform the work
- First available worker processes the task
Fanout Messages
Sent out in a broadcast fashion.
Example: Changing a setting in AWX API
- Message broadcast to every AWX node
- Code runs on every node
- Used for cache invalidation, configuration updates
Characteristics:
- Broadcast to all nodes
- Every node processes the message
- Ensures cluster-wide consistency
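The difference between direct, shared, and fanout delivery can be sketched with a small in-memory model. This is only an illustration: `ToyBroker` and its methods are hypothetical names, and real delivery goes through PostgreSQL's pg_notify rather than Python deques.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for the queue layer (hypothetical, not AWX code)."""

    def __init__(self, nodes):
        self.nodes = list(nodes)
        # One FIFO per queue name; each node also has its own broadcast inbox.
        self.queues = defaultdict(deque)
        self.broadcast_inbox = {node: deque() for node in self.nodes}

    def publish_direct(self, queue, message):
        """Direct message: lands on one named queue and is consumed once."""
        self.queues[queue].append(message)

    def publish_fanout(self, message):
        """Fanout message: every node receives its own copy."""
        for inbox in self.broadcast_inbox.values():
            inbox.append(message)

    def consume(self, node, shared_queue="awx"):
        """Next message for a node: its own queue, then shared, then broadcast."""
        sources = (
            self.queues[node],
            self.queues[shared_queue],
            self.broadcast_inbox[node],
        )
        for source in sources:
            if source:
                return source.popleft()
        return None


broker = ToyBroker(["awx_node_1", "awx_node_2"])
broker.publish_direct("awx_node_1", "run job 9")      # node-specific queue
broker.publish_direct("awx", "delete inventory 3")    # shared direct queue
broker.publish_fanout("setting changed")              # broadcast to all nodes
```

In this model the shared message is taken by whichever node asks first, while the fanout message is seen once by each node.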
Defining Tasks
Function-Based Tasks
Simple functions decorated with @task():
from dispatcherd.publish import task

@task()
def add(a, b):
    """Add two numbers"""
    return a + b
Class-Based Tasks
Classes with a run() method:
from dispatcherd.publish import task

@task()
class Adder:
    def run(self, a, b):
        """Add two numbers"""
        return a + b
Task Location
Tasks are defined in the awx.main.tasks module:
awx/main/tasks/
├── __init__.py
├── jobs.py # Job execution tasks
├── system.py # System tasks
├── callback.py # Callback processing
├── policy.py # Cleanup/maintenance
└── ...
Running Tasks
Publishing Tasks
To run a task in the background:
# Function-based task
add.apply_async([1, 1])
# Class-based task
Adder.apply_async([1, 1])
# With keyword arguments
add.apply_async(args=[1], kwargs={'b': 2})
# To specific queue
add.apply_async([1, 1], queue='awx_node_1')
Calling apply_async() composes a JSON message describing the task:
{
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"args": [1, 1],
"kwargs": {},
"task": "awx.main.tasks.system.add",
"time_pub": 1234567890.123
}
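A message with these fields could be assembled roughly as follows. This is a minimal sketch using only the standard library; `build_message` is a hypothetical helper and not the function dispatcherd uses internally.

```python
import json
import time
import uuid


def build_message(task_path, args=None, kwargs=None):
    """Compose a dict with the same fields as the example message."""
    return {
        "uuid": str(uuid.uuid4()),       # unique ID used to track the task
        "args": list(args or []),
        "kwargs": dict(kwargs or {}),
        "task": task_path,               # dotted path to the task callable
        "time_pub": time.time(),         # publish time, seconds since epoch
    }


message = build_message("awx.main.tasks.system.add", args=[1, 1])
payload = json.dumps(message)            # JSON text that would be published
print(payload)
```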
Task Execution
When a worker receives the message:
- Deserialize: Parse JSON message
- Import: Import the task callable
- Execute: Run the Python code
- Return: Task completes, result may be stored
# Worker executes
awx.main.tasks.system.add(1, 1)
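The deserialize, import, and execute steps can be sketched end to end. To stay runnable outside AWX, this example resolves the standard-library function `operator.add` instead of an AWX task and skips the `awx.` prefix check; `execute_message` is a hypothetical name.

```python
import importlib
import json


def execute_message(raw):
    """Deserialize a message, import the dotted-path callable, and run it."""
    body = json.loads(raw)                                   # deserialize
    module_name, attr = body["task"].rsplit(".", 1)
    func = getattr(importlib.import_module(module_name), attr)  # import
    return func(*body.get("args", []), **body.get("kwargs", {}))  # execute


# operator.add stands in for a real task such as awx.main.tasks.system.add
raw = json.dumps({"task": "operator.add", "args": [1, 1], "kwargs": {}})
result = execute_message(raw)
print(result)  # 2
```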
Dispatcher Implementation
The Dispatcher Process
Every node runs awx-manage dispatcherd:
# Start dispatcher
awx-manage dispatcherd
# With custom pool size
awx-manage dispatcherd --workers 8
Responsibilities:
- Uses kombu library for message consumption
- Consumes from appropriate queues for the node:
- Default shared queue
- Node-specific queue (by hostname)
- Broadcast queue
- Manages pool of child processes
- Distributes inbound messages to workers
Worker Pool
The dispatcher manages a pool of child processes:
from multiprocessing import Process

class WorkerPool:
    def __init__(self):
        self.workers = []
        self.min_workers = 4
        self.max_workers = 60

    def init_workers(self, work_loop):
        """Initialize worker processes"""
        for _ in range(self.min_workers):
            worker = Process(target=work_loop)
            worker.start()
            self.workers.append(worker)
Worker scaling:
- Minimum workers: 4 (default)
- Maximum workers: 60 (default)
- Auto-scales based on load
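The exact scaling rule is not described here. As a rough sketch, assuming a policy that wants one worker per busy or queued task and clamps the result to the configured limits, the target pool size could be computed like this. `target_pool_size` is a hypothetical helper, not part of dispatcherd.

```python
def target_pool_size(busy_workers, queued_tasks, min_workers=4, max_workers=60):
    """Hypothetical policy: one worker per busy or queued task, within limits."""
    demand = busy_workers + queued_tasks
    return max(min_workers, min(max_workers, demand))


print(target_pool_size(0, 0))    # 4: never drops below the minimum
print(target_pool_size(3, 5))    # 8: grows with demand
print(target_pool_size(50, 40))  # 60: capped at the maximum
```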
Task Resolution
The dispatcher resolves tasks by dotted path:
import importlib

def resolve_callable(task):
    """
    Transform dotted notation into callable function:
    awx.main.tasks.system.delete_inventory
    awx.main.tasks.jobs.RunProjectUpdate
    """
    if not task.startswith('awx.'):
        raise ValueError(f'{task} is not a valid awx task')
    module, target = task.rsplit('.', 1)
    module = importlib.import_module(module)
    _call = getattr(module, target, None)
    if not hasattr(_call, 'apply_async'):
        raise ValueError(f'{task} is not decorated with @task()')
    return _call
Running Tasks
Workers execute tasks via run_callable():
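import inspect
import logging

logger = logging.getLogger(__name__)

def run_callable(body):
    """Execute a task from message body"""
    task = body['task']
    uuid = body.get('uuid', '<unknown>')
    args = body.get('args', [])
    kwargs = body.get('kwargs', {})
    # Import and execute
    _call = resolve_callable(task)
    if inspect.isclass(_call):
        # Class-based task: instantiate it and call its run() method
        _call = _call().run
    logger.info(f'task {uuid} starting {task}(*{args})')
    return _call(*args, **kwargs)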
Dispatcher Control
dispatcherctl Command
The awx-manage dispatcherctl command provides debugging capabilities:
Check Status
$ awx-manage dispatcherctl status
awx[pid:9610] workers total=4 min=4 max=60
. worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE]
. worker[pid:9769] sent=5 finished=5 qsize=0 rss=105.141MB [IDLE]
. worker[pid:9782] sent=5 finished=4 qsize=1 rss=110.430MB
- running 0c1deb4d-25ae-49a9-804f-a8afd05aff29 RunJob(*[9])
. worker[pid:9787] sent=3 finished=3 qsize=0 rss=101.824MB [IDLE]
Information shown:
- Worker PIDs
- Tasks sent to each worker
- Tasks completed
- Queue size per worker
- Memory usage (RSS)
- Currently running tasks with UUIDs
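For scripted monitoring, the status text can be turned into structured data. The sketch below is based only on the sample output shown above, so the pattern may need adjusting for other versions; `WORKER_LINE` and `parse_status` are illustrative names.

```python
import re

# Matches lines such as:
# . worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE]
WORKER_LINE = re.compile(
    r"worker\[pid:(?P<pid>\d+)\]\s+sent=(?P<sent>\d+)\s+finished=(?P<finished>\d+)"
    r"\s+qsize=(?P<qsize>\d+)\s+rss=(?P<rss_mb>[\d.]+)MB(?P<idle>\s+\[IDLE\])?"
)


def parse_status(text):
    """Return one dict per worker line found in the status output."""
    workers = []
    for line in text.splitlines():
        match = WORKER_LINE.search(line)
        if match:
            workers.append({
                "pid": int(match["pid"]),
                "sent": int(match["sent"]),
                "finished": int(match["finished"]),
                "qsize": int(match["qsize"]),
                "rss_mb": float(match["rss_mb"]),
                "idle": match["idle"] is not None,
            })
    return workers


sample = """\
awx[pid:9610] workers total=4 min=4 max=60
. worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE]
. worker[pid:9782] sent=5 finished=4 qsize=1 rss=110.430MB
"""
workers = parse_status(sample)
busy = [w["pid"] for w in workers if not w["idle"]]
print(busy)  # [9782]
```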
List Running Tasks
$ awx-manage dispatcherctl running
['eb3b0a83-86da-413d-902a-16d7d530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f']
Returns the UUIDs of currently running tasks. Each UUID corresponds to main_unifiedjob.celery_task_id in the database.
Task Categories
Housekeeping Tasks
Background maintenance and scheduling:
- run_task_manager: Periodic task that schedules jobs
- run_dependency_manager: Creates job dependencies
- run_workflow_manager: Manages workflow execution
See Task Manager documentation for details.
Heartbeats and Capacity
Periodic tasks running on every node:
@task()
def heartbeat():
    """Record heartbeat and capacity"""
    instance = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
    instance.last_seen = now()
    instance.capacity = calculate_capacity()
    instance.save()
Purpose:
- Record node heartbeat
- Calculate and report capacity
- Reap jobs from dead nodes
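Detecting a dead node comes down to comparing each node's last heartbeat with the current time. The sketch below assumes a 120-second grace period, which is an illustrative value and not a documented AWX setting; `find_lost_nodes` is a hypothetical helper working on plain dictionaries instead of Instance rows.

```python
from datetime import datetime, timedelta, timezone


def find_lost_nodes(last_seen_by_host, now, grace=timedelta(seconds=120)):
    """Return hostnames whose last heartbeat is older than the grace period."""
    return sorted(
        host for host, seen in last_seen_by_host.items() if now - seen > grace
    )


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
heartbeats = {
    "awx_node_1": now - timedelta(seconds=30),   # recent heartbeat
    "awx_node_2": now - timedelta(minutes=10),   # stale heartbeat
}
lost = find_lost_nodes(heartbeats, now)
print(lost)  # ['awx_node_2']
```

Jobs still marked as running on a lost node would then be candidates for reaping.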
Job Execution Tasks
Run Ansible playbooks and commands:
@task()
class RunJob:
    """Execute a job template"""
    def run(self, job_id):
        job = Job.objects.get(id=job_id)
        # Prepare ansible-runner
        # Execute playbook
        # Stream events
        pass

@task()
class RunProjectUpdate:
    """Update a project from SCM"""
    def run(self, project_update_id):
        # Clone/update git repo
        pass

@task()
class RunInventoryUpdate:
    """Sync inventory from external source"""
    def run(self, inventory_update_id):
        # Run ansible-inventory
        # Parse and import results
        pass
Administrative Tasks
Maintenance and cleanup:
@task()
def purge_old_stdout_files():
    """Clean up old job output files"""
    pass

@task()
def delete_project_files(project_path):
    """Delete project files from filesystem"""
    pass

@task()
def handle_setting_changes(setting_name):
    """Invalidate cache when setting changes"""
    pass
Notification Tasks
Send notifications:
@task()
def send_notifications(notification_list):
    """Send job notifications"""
    for notification in notification_list:
        # Send via email, Slack, webhook, etc.
        pass
Callback Receiver
Special dispatcher process for handling Ansible callback events:
class AWXConsumerRedis:
    """Consumer for callback events from Redis"""
    def __init__(self, name, worker):
        self.name = name
        self.pool = WorkerPool()
        self.redis = get_redis_client()

    def run(self):
        """Consume callback events"""
        while True:
            # Read events from Redis
            # Process and save to database
            pass
Purpose:
- Receive Ansible events from running jobs
- Process event data
- Save to database for UI display
Task Routing
Queue Selection
Tasks can be routed to specific queues:
# Route to specific node
task.apply_async([args], queue=f'awx_{instance.hostname}')
# Route to default queue (any node)
task.apply_async([args], queue='awx')
# Fanout to all nodes
task.apply_async([args], exchange='broadcast')
Execution Node Selection
For job execution:
- Task Manager selects execution node
- Considers capacity and instance groups
- Routes task to node-specific queue
- Dispatcher on that node processes task
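The selection steps can be sketched as a small function. The policy shown here (pick the node with the most remaining capacity that still fits the job) is an assumption for illustration; the real Task Manager also weighs instance groups. `choose_execution_node` and the dictionary fields are hypothetical, and the queue name follows the `awx_<hostname>` pattern used in the routing example.

```python
def choose_execution_node(nodes, required_capacity):
    """Pick the node with the most free capacity that can fit the job."""
    candidates = [
        n for n in nodes if n["capacity"] - n["consumed"] >= required_capacity
    ]
    if not candidates:
        return None  # nothing fits; the job would stay pending
    return max(candidates, key=lambda n: n["capacity"] - n["consumed"])


nodes = [
    {"hostname": "node_1", "capacity": 100, "consumed": 90},
    {"hostname": "node_2", "capacity": 100, "consumed": 40},
]
chosen = choose_execution_node(nodes, required_capacity=20)
queue_name = f"awx_{chosen['hostname']}" if chosen else None
print(queue_name)  # awx_node_2
```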
Monitoring and Debugging
Check Dispatcher Health
# Check if dispatcher is running
ps aux | grep dispatcherd
# Check worker status
awx-manage dispatcherctl status
# View dispatcher logs
tail -f /var/log/tower/dispatcher.log
Task Tracking
Tasks have UUIDs that can be tracked:
# In Django shell
from awx.main.models import UnifiedJob
# Find job by task UUID
job = UnifiedJob.objects.get(celery_task_id='550e8400-...')
print(f"Status: {job.status}")
print(f"Node: {job.execution_node}")
Common Issues
Dispatcher not running:
# Start dispatcher
awx-manage dispatcherd
# Check for errors
journalctl -u awx-dispatcher
Workers stuck:
# Check worker status
awx-manage dispatcherctl status
# Restart dispatcher if needed
systemctl restart awx-dispatcher
High memory usage:
# Check worker memory
awx-manage dispatcherctl status
# Adjust worker pool size
awx-manage dispatcherd --workers 4
Worker Pool Size
# In settings
DISPATCHER_MIN_WORKERS = 4 # Minimum workers
DISPATCHER_MAX_WORKERS = 60 # Maximum workers
Considerations:
- More workers = more concurrency
- Each worker consumes memory
- Balance based on workload and resources
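A quick way to balance these factors is to estimate how many workers fit in a memory budget. The figures below are assumptions: roughly 106 MB per worker is taken from the sample status output, and the 2048 MB budget is arbitrary. `max_workers_for_memory` is a hypothetical helper.

```python
def max_workers_for_memory(available_mb, rss_per_worker_mb,
                           min_workers=4, max_workers=60):
    """Estimate a worker ceiling from a memory budget, within the limits."""
    fit = int(available_mb // rss_per_worker_mb)
    # Never go below the minimum pool size or above the configured maximum.
    return max(min_workers, min(max_workers, fit))


print(max_workers_for_memory(2048, 106))  # 19
```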
Queue Backlog
Monitor queue depth:
from awx.main.dispatch.pool import WorkerPool

pool = WorkerPool()
for worker in pool.workers:
    print(f"Worker {worker.pid}: Queue size {worker.qsize}")
A consistently large queue size indicates that workers are overloaded.
Task Prioritization
Currently AWX uses FIFO (First In, First Out) for task processing.
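FIFO ordering means tasks leave the queue in the order they were published. A minimal illustration with a standard-library deque (the task names are placeholders):

```python
from collections import deque

pending = deque()
for task_name in ["run_job", "project_update", "inventory_update"]:
    pending.append(task_name)            # tasks arrive in publish order

processed = []
while pending:
    processed.append(pending.popleft())  # the oldest task is taken first

print(processed)  # ['run_job', 'project_update', 'inventory_update']
```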
Future enhancements may include:
- Priority queues
- Task preemption
- Resource-based scheduling
Security Considerations
Task Validation
Only tasks starting with awx. are allowed:
if not task.startswith('awx.'):
    raise ValueError(f'{task} is not a valid awx task')
This check keeps a message from invoking callables outside the awx package.
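The effect of the prefix check can be seen by feeding it a few dotted paths. `validate_task_path` and `is_allowed` are hypothetical wrappers around the same condition; in the dispatcher, the later check for an `apply_async` attribute narrows the set of accepted callables further.

```python
def validate_task_path(task):
    """Reject any dotted path outside the awx package."""
    if not task.startswith('awx.'):
        raise ValueError(f'{task} is not a valid awx task')
    return task


def is_allowed(task):
    """Return True when the path passes the prefix check."""
    try:
        validate_task_path(task)
        return True
    except ValueError:
        return False


print(is_allowed('awx.main.tasks.system.add'))  # True
print(is_allowed('os.system'))                  # False
print(is_allowed('awxmalicious.run'))           # False: the trailing dot matters
```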
Credential Handling
Credentials are never passed in task arguments:
- Retrieved from database within task
- Decrypted at runtime
- Never logged
Process Isolation
Worker processes are isolated:
- Each running task occupies its own worker process
- Failures don’t affect other tasks
- Resource limits via cgroups (in containers)