Monitoring & Alerting Automation
Production-ready Zabbix scripts, Prometheus exporters, and Grafana automation for enterprise monitoring
Zabbix Low-Level Discovery (LLD) Script
Automatic discovery and monitoring of services, Docker containers, databases, and custom resources with Zabbix LLD.
#!/usr/bin/env python3
"""
Zabbix Low-Level Discovery Script for Docker Containers
Automatically discovers and monitors all Docker containers on a host
"""
import json
import docker
import sys
from datetime import datetime
class ZabbixDockerLLD:
    """Docker helper used by the Zabbix agent UserParameters.

    Wraps a Docker client created from the environment and exposes the
    operations driven by the command-line entry point: low-level discovery,
    resource statistics, health status, log retrieval and restart.
    """

    # File that ``log_error`` appends to; kept as a class attribute so it can
    # be redirected without editing the method.
    LOG_FILE = '/var/log/zabbix/docker_lld.log'

    def __init__(self):
        """Connect to Docker; log the failure and exit with status 1 on error."""
        try:
            self.client = docker.from_env()
        except Exception as e:
            self.log_error(f"Failed to connect to Docker: {e}")
            sys.exit(1)

    def discover_containers(self):
        """Discover all Docker containers, including stopped ones.

        Returns:
            A dict of the form ``{"data": [...]}`` where each entry maps the
            LLD macros ``{#CONTAINER_ID}``, ``{#CONTAINER_NAME}``,
            ``{#CONTAINER_IMAGE}``, ``{#CONTAINER_STATUS}`` and
            ``{#CONTAINER_CREATED}`` to the container's values. If anything
            fails, the error is logged and ``{"data": []}`` is returned.
        """
        containers = []
        try:
            # all=True also lists containers that are not running.
            for container in self.client.containers.list(all=True):
                # Look the image up once per container instead of twice.
                tags = container.image.tags
                containers.append({
                    "{#CONTAINER_ID}": container.short_id,
                    "{#CONTAINER_NAME}": container.name,
                    "{#CONTAINER_IMAGE}": tags[0] if tags else "none",
                    "{#CONTAINER_STATUS}": container.status,
                    "{#CONTAINER_CREATED}": container.attrs['Created'],
                })
        except Exception as e:
            self.log_error(f"Discovery failed: {e}")
            return {"data": []}
        return {"data": containers}

    def get_container_stats(self, container_id):
        """Get resource usage stats for a specific container.

        Args:
            container_id: ID or name passed to ``containers.get``.

        Returns:
            A dict with ``cpu_percent``, ``memory_usage``, ``memory_percent``,
            ``network_rx``, ``network_tx`` and ``pids``; ``None`` when the
            container is not running or the stats could not be read.
        """
        try:
            container = self.client.containers.get(container_id)
            if container.status != 'running':
                return None
            stats = container.stats(stream=False)

            # CPU: container usage delta relative to the system usage delta
            # between the previous and the current sample.
            cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
                stats['precpu_stats']['cpu_usage']['total_usage']
            system_delta = stats['cpu_stats']['system_cpu_usage'] - \
                stats['precpu_stats']['system_cpu_usage']
            cpu_percent = (cpu_delta / system_delta) * 100.0 if system_delta > 0 else 0

            # Memory: tolerate missing keys and a zero limit so that the CPU
            # and network figures are still reported instead of failing the
            # whole call with KeyError / ZeroDivisionError.
            memory_stats = stats.get('memory_stats') or {}
            memory_usage = memory_stats.get('usage', 0)
            memory_limit = memory_stats.get('limit', 0)
            memory_percent = (memory_usage / memory_limit) * 100.0 if memory_limit > 0 else 0.0

            # Network: totals summed over every interface in the report.
            networks = stats.get('networks', {})
            network_rx = sum(net['rx_bytes'] for net in networks.values())
            network_tx = sum(net['tx_bytes'] for net in networks.values())

            pids_stats = stats.get('pids_stats') or {}
            return {
                'cpu_percent': round(cpu_percent, 2),
                'memory_usage': memory_usage,
                'memory_percent': round(memory_percent, 2),
                'network_rx': network_rx,
                'network_tx': network_tx,
                'pids': pids_stats.get('current', 0),
            }
        except Exception as e:
            self.log_error(f"Failed to get stats for {container_id}: {e}")
            return None

    def check_container_health(self, container_id):
        """Check container health status.

        Returns:
            The ``State.Health.Status`` value from the container attributes
            (``'unknown'`` if the status key is absent), ``"no_healthcheck"``
            when there is no health section, or ``"error"`` on failure.
        """
        try:
            container = self.client.containers.get(container_id)
            # An absent or empty Health section means no health check is set.
            health = container.attrs.get('State', {}).get('Health', {})
            if not health:
                return "no_healthcheck"
            return health.get('Status', 'unknown')
        except Exception as e:
            self.log_error(f"Health check failed for {container_id}: {e}")
            return "error"

    def get_container_logs(self, container_id, lines=100):
        """Get recent container logs.

        Args:
            container_id: ID or name passed to ``containers.get``.
            lines: Number of trailing log lines to request.

        Returns:
            The timestamped log text, or ``""`` if the logs could not be read.
        """
        try:
            container = self.client.containers.get(container_id)
            raw = container.logs(tail=lines, timestamps=True)
            # Containers can emit arbitrary bytes; replace undecodable ones
            # rather than discarding the whole log on a UnicodeDecodeError.
            return raw.decode('utf-8', errors='replace')
        except Exception as e:
            self.log_error(f"Failed to get logs for {container_id}: {e}")
            return ""

    def restart_container(self, container_id):
        """Restart a container (for remediation actions).

        Returns:
            ``True`` when the restart call succeeded, ``False`` otherwise.
        """
        try:
            container = self.client.containers.get(container_id)
            container.restart(timeout=10)
            return True
        except Exception as e:
            self.log_error(f"Failed to restart {container_id}: {e}")
            return False

    @staticmethod
    def log_error(message):
        """Append a timestamped error line to ``LOG_FILE``.

        This runs inside ``except`` handlers, so it must never raise itself:
        if the log file cannot be opened or written, the line goes to
        standard error instead.
        """
        line = f"{datetime.now().isoformat()} - ERROR - {message}\n"
        try:
            with open(ZabbixDockerLLD.LOG_FILE, 'a', encoding='utf-8') as f:
                f.write(line)
        except OSError:
            sys.stderr.write(line)
# Command-line entry point
if __name__ == '__main__':
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: docker_lld.py [discover|stats|health|logs|restart] [container_id]")
        sys.exit(1)

    action = cli_args[0]
    # Every action except 'discover' needs a container id as second argument.
    target = cli_args[1] if len(cli_args) > 1 else None
    lld = ZabbixDockerLLD()

    if action == 'discover':
        # Emit the discovery JSON for Zabbix LLD
        print(json.dumps(lld.discover_containers(), indent=2))
    elif target is None or action not in ('stats', 'health', 'logs', 'restart'):
        print("Invalid action or missing parameters")
        sys.exit(1)
    elif action == 'stats':
        container_stats = lld.get_container_stats(target)
        if not container_stats:
            sys.exit(1)
        print(json.dumps(container_stats))
    elif action == 'health':
        print(lld.check_container_health(target))
    elif action == 'logs':
        print(lld.get_container_logs(target))
    else:
        # Only 'restart' remains; the exit status reports the outcome.
        sys.exit(0 if lld.restart_container(target) else 1)
# Zabbix Configuration:
# UserParameter for Zabbix agent config
UserParameter=docker.discovery,/usr/local/bin/docker_lld.py discover
UserParameter=docker.stats[*],/usr/local/bin/docker_lld.py stats $1
UserParameter=docker.health[*],/usr/local/bin/docker_lld.py health $1
UserParameter=docker.restart[*],/usr/local/bin/docker_lld.py restart $1
Zabbix API Automation
Automate host creation, template management, and mass operations using Zabbix API.
// Zabbix API Automation Framework
import axios, { AxiosInstance } from 'axios';
/**
 * Host definition in the shape this file passes to `host.create`.
 */
interface ZabbixHost {
  /** Host name, e.g. 'web-server-01' in the usage example. */
  host: string;
  /** Display name, e.g. 'Web Server 01' in the usage example. */
  name: string;
  /** Host groups the host belongs to, referenced by group ID. */
  groups: Array<{ groupid: string }>;
  /** Templates attached to the host, referenced by template ID. */
  templates: Array<{ templateid: string }>;
  /** Interfaces through which the host is reached. */
  interfaces: Array<ZabbixInterface>;
}
/**
 * A single host interface entry inside `ZabbixHost.interfaces`.
 */
interface ZabbixInterface {
  /** Interface type code; this file uses 1 for the agent interface. */
  type: number;
  /** Numeric flag; the examples in this file always pass 1. NOTE(review): presumably marks the default interface - confirm. */
  main: number;
  /** Numeric flag; the examples in this file always pass 1. NOTE(review): presumably selects `ip` over `dns` - confirm. */
  useip: number;
  /** DNS name of the host; the examples pass an empty string. */
  dns: string;
  /** IP address of the host. */
  ip: string;
  /** Port number as a string, e.g. '10050' in the examples. */
  port: string;
}
/**
 * Thin client for the Zabbix JSON-RPC API.
 *
 * Every public method posts one JSON-RPC call to `/api_jsonrpc.php` under the
 * configured base URL and returns data extracted from the `result` field.
 * Call `login()` first so that later requests carry the session token.
 */
class ZabbixAPI {
  private api: AxiosInstance;
  private authToken: string | null = null;
  private requestId: number = 1;

  /**
   * @param url      Base URL of the Zabbix frontend.
   * @param username Account used by `login()`.
   * @param password Password used by `login()`.
   */
  constructor(private url: string, private username: string, private password: string) {
    this.api = axios.create({
      baseURL: url,
      headers: { 'Content-Type': 'application/json' },
    });
  }

  /**
   * Send one JSON-RPC request and return its `result`.
   *
   * @param method API method name, e.g. 'host.get'.
   * @param params Method parameters (object or array).
   * @throws Error when the transport fails or the response carries an `error`.
   */
  private async request(method: string, params: any = {}) {
    const data = {
      jsonrpc: '2.0',
      method,
      params,
      auth: this.authToken,
      id: this.requestId++,
    };
    try {
      const response = await this.api.post('/api_jsonrpc.php', data);
      if (response.data.error) {
        // Include the error's `data` field when present: `message` alone is
        // often too generic to diagnose the failure.
        const { message, data: detail } = response.data.error;
        throw new Error(`Zabbix API Error: ${message}${detail ? ` ${detail}` : ''}`);
      }
      return response.data.result;
    } catch (error: any) {
      throw new Error(`API Request Failed: ${error.message}`);
    }
  }

  /**
   * Authenticate and store the session token.
   *
   * @returns true on success; false (after logging the error) on failure.
   */
  async login(): Promise<boolean> {
    try {
      this.authToken = await this.request('user.login', {
        username: this.username,
        password: this.password,
      });
      return true;
    } catch (error) {
      console.error('Login failed:', error);
      return false;
    }
  }

  /** End the session and forget the stored token. */
  async logout(): Promise<void> {
    await this.request('user.logout');
    this.authToken = null;
  }

  // Host Management

  /** Create a host and return the ID of the new host. */
  async createHost(hostConfig: ZabbixHost): Promise<string> {
    const result = await this.request('host.create', hostConfig);
    return result.hostids[0];
  }

  /** Update a host; resolves to true when the API reports an updated host ID. */
  async updateHost(hostid: string, updates: Partial<ZabbixHost>): Promise<boolean> {
    const result = await this.request('host.update', { hostid, ...updates });
    return result.hostids.length > 0;
  }

  /** Delete a host; resolves to true when the API reports a deleted host ID. */
  async deleteHost(hostid: string): Promise<boolean> {
    const result = await this.request('host.delete', [hostid]);
    return result.hostids.length > 0;
  }

  /** Fetch hosts with their interfaces, groups and templates, optionally filtered. */
  async getHosts(filter?: any): Promise<any[]> {
    return await this.request('host.get', {
      output: 'extend',
      selectInterfaces: 'extend',
      selectGroups: 'extend',
      selectTemplates: 'extend',
      filter,
    });
  }

  // Mass operations

  /** Apply the same property updates to several hosts in one call. */
  async massUpdateHosts(hostids: string[], updates: any): Promise<boolean> {
    const result = await this.request('host.massupdate', {
      hosts: hostids.map(id => ({ hostid: id })),
      ...updates,
    });
    return result.hostids.length > 0;
  }

  /** Set `status: 0` on the given hosts. */
  async enableHosts(hostids: string[]): Promise<boolean> {
    return await this.massUpdateHosts(hostids, { status: 0 });
  }

  /** Set `status: 1` on the given hosts. */
  async disableHosts(hostids: string[]): Promise<boolean> {
    return await this.massUpdateHosts(hostids, { status: 1 });
  }

  // Template Management

  /**
   * Set the templates of a host through `host.update`.
   *
   * NOTE(review): the Zabbix documentation describes the `templates` property
   * of `host.update` as replacing the currently linked templates - confirm
   * that replacing, rather than adding, is what callers expect here.
   */
  async linkTemplates(hostid: string, templateids: string[]): Promise<boolean> {
    return await this.updateHost(hostid, {
      templates: templateids.map(id => ({ templateid: id })),
    });
  }

  /**
   * Unlink and clear templates from a host.
   *
   * @returns true when the API reports an updated host ID, matching the
   *          other host-updating methods.
   */
  async unlinkTemplates(hostid: string, templateids: string[]): Promise<boolean> {
    const result = await this.request('host.update', {
      hostid,
      templates_clear: templateids.map(id => ({ templateid: id })),
    });
    return result.hostids.length > 0;
  }

  // Item Management

  /** Create an item and return its ID. */
  async createItem(item: any): Promise<string> {
    const result = await this.request('item.create', item);
    return result.itemids[0];
  }

  /** Fetch all items of the given hosts, sorted by name. */
  async getItems(hostids: string[]): Promise<any[]> {
    return await this.request('item.get', {
      output: 'extend',
      hostids,
      sortfield: 'name',
    });
  }

  // Trigger Management

  /** Create a trigger and return its ID. */
  async createTrigger(trigger: any): Promise<string> {
    const result = await this.request('trigger.create', trigger);
    return result.triggerids[0];
  }

  /**
   * Fetch problems, newest event ID first.
   *
   * @param options Extra `problem.get` parameters; they override the defaults.
   */
  async getProblems(options: any = {}): Promise<any[]> {
    return await this.request('problem.get', {
      output: 'extend',
      selectAcknowledges: 'extend',
      recent: true,
      sortfield: ['eventid'],
      sortorder: 'DESC',
      ...options,
    });
  }

  // Maintenance Management

  /** Create a maintenance period and return its ID. */
  async createMaintenance(maintenance: any): Promise<string> {
    const result = await this.request('maintenance.create', maintenance);
    return result.maintenanceids[0];
  }

  /** Delete a maintenance period; resolves to true when the API reports a deleted ID. */
  async deleteMaintenance(maintenanceid: string): Promise<boolean> {
    const result = await this.request('maintenance.delete', [maintenanceid]);
    return result.maintenanceids.length > 0;
  }

  // Bulk host creation from inventory

  /**
   * Create one host per CSV row of the form `hostname,ip,group,template`.
   *
   * Rows whose group or template cannot be found, and rows that fail to be
   * created, are logged and skipped. Blank rows are ignored and rows with
   * missing fields are logged and skipped.
   *
   * @param csvData CSV rows without a header line.
   * @returns IDs of the hosts that were created.
   */
  async bulkCreateHostsFromCSV(csvData: string[]): Promise<string[]> {
    const hostids: string[] = [];
    // Per-call caches: each distinct group/template name is resolved once
    // instead of issuing two API lookups for every row.
    const groupIdByName = new Map<string, string | null>();
    const templateIdByName = new Map<string, string | null>();

    for (const line of csvData) {
      const [hostname, ip, groupName, templateName] = line
        .split(',')
        .map(field => field.trim());

      // A missing field would become an `undefined` filter value, which is
      // dropped during JSON serialisation and would match every group or
      // template - so reject incomplete rows up front.
      if (!hostname || !ip || !groupName || !templateName) {
        if (line.trim() !== '') {
          console.error(`Skipping malformed line: ${line}`);
        }
        continue;
      }

      if (!groupIdByName.has(groupName)) {
        const groups = await this.getHostGroups({ name: groupName });
        groupIdByName.set(groupName, groups.length > 0 ? groups[0].groupid : null);
      }
      if (!templateIdByName.has(templateName)) {
        const templates = await this.getTemplates({ name: templateName });
        templateIdByName.set(
          templateName,
          templates.length > 0 ? templates[0].templateid : null,
        );
      }

      const groupid = groupIdByName.get(groupName) ?? null;
      const templateid = templateIdByName.get(templateName) ?? null;
      if (groupid === null || templateid === null) {
        console.error(`Skipping ${hostname}: group or template not found`);
        continue;
      }

      const hostConfig: ZabbixHost = {
        host: hostname,
        name: hostname,
        groups: [{ groupid }],
        templates: [{ templateid }],
        interfaces: [
          {
            type: 1, // Agent
            main: 1,
            useip: 1,
            ip,
            dns: '',
            port: '10050',
          },
        ],
      };

      try {
        const hostid = await this.createHost(hostConfig);
        hostids.push(hostid);
        console.log(`Created host: ${hostname} (ID: ${hostid})`);
      } catch (error) {
        console.error(`Failed to create ${hostname}:`, error);
      }
    }
    return hostids;
  }

  /** Look up host groups matching the given filter. */
  private async getHostGroups(filter?: any): Promise<any[]> {
    return await this.request('hostgroup.get', { output: 'extend', filter });
  }

  /** Look up templates matching the given filter. */
  private async getTemplates(filter?: any): Promise<any[]> {
    return await this.request('template.get', { output: 'extend', filter });
  }
}
// Usage Example
// The URL and credentials below are placeholders.
const zabbix = new ZabbixAPI(
  'https://zabbix.example.com',
  'admin',
  'password'
);

// login() reports failure by returning false instead of throwing, so the
// result must be checked before issuing further calls.
if (!(await zabbix.login())) {
  throw new Error('Zabbix login failed');
}

try {
  // Create a new host
  const hostid = await zabbix.createHost({
    host: 'web-server-01',
    name: 'Web Server 01',
    groups: [{ groupid: '2' }],
    templates: [{ templateid: '10001' }],
    interfaces: [
      {
        type: 1,
        main: 1,
        useip: 1,
        ip: '192.168.1.100',
        dns: '',
        port: '10050',
      },
    ],
  });

  // Get all problems
  const problems = await zabbix.getProblems({ severities: [4, 5] });
  console.log(`Active problems: ${problems.length}`);

  // Bulk enable/disable hosts
  const hostids = ['10084', '10085', '10086'];
  await zabbix.disableHosts(hostids);
} finally {
  // Always end the session, even when one of the calls above throws.
  await zabbix.logout();
}
// Custom Prometheus Exporter
Build custom Prometheus exporters for application-specific metrics collection.
# Custom Prometheus Exporter in Python
from prometheus_client import start_http_server, Gauge, Counter, Histogram, Info
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
import time
import psutil
import requests
from typing import Dict, List
class CustomApplicationExporter:
    """Custom Prometheus exporter for monitoring application metrics.

    Creates the application metric objects on construction and refreshes the
    gauges from the application's ``/metrics`` JSON endpoint on demand.
    """

    def __init__(self, app_url: str, port: int = 9100):
        """Create the exporter and its metric objects.

        Args:
            app_url: Base URL of the application to poll.
            port: Port stored on the instance for the metrics HTTP server.
        """
        self.app_url = app_url
        self.port = port

        # Define metrics
        self.app_requests_total = Counter(
            'app_requests_total',
            'Total number of requests',
            ['method', 'endpoint', 'status']
        )
        self.app_request_duration = Histogram(
            'app_request_duration_seconds',
            'Request duration in seconds',
            ['method', 'endpoint']
        )
        self.app_active_users = Gauge(
            'app_active_users',
            'Number of currently active users'
        )
        self.app_queue_size = Gauge(
            'app_queue_size',
            'Size of processing queue',
            ['queue_name']
        )
        self.app_database_connections = Gauge(
            'app_database_connections',
            'Number of active database connections',
            ['database']
        )
        self.app_cache_hit_ratio = Gauge(
            'app_cache_hit_ratio',
            'Cache hit ratio percentage'
        )
        self.app_info = Info('app_info', 'Application information')

    def collect_app_metrics(self):
        """Collect metrics from the application API and update the gauges.

        Reads ``active_users``, ``queues``, ``db_connections``,
        ``cache_hits`` and ``cache_misses`` from the JSON document served at
        ``<app_url>/metrics``. Collection is best-effort: any failure,
        including a non-200 response, is printed and the gauges keep their
        previous values.
        """
        try:
            response = requests.get(f"{self.app_url}/metrics", timeout=5)
            if response.status_code != 200:
                # Report the failed poll instead of skipping it silently.
                print(
                    "Error collecting app metrics: "
                    f"unexpected HTTP status {response.status_code}"
                )
                return

            data = response.json()

            # Update gauges
            self.app_active_users.set(data.get('active_users', 0))

            # Update queue metrics
            for queue_name, size in data.get('queues', {}).items():
                self.app_queue_size.labels(queue_name=queue_name).set(size)

            # Update database connections
            for db_name, connections in data.get('db_connections', {}).items():
                self.app_database_connections.labels(database=db_name).set(connections)

            # Update cache hit ratio; left untouched when there were no
            # lookups, which avoids a division by zero.
            cache_hits = data.get('cache_hits', 0)
            cache_misses = data.get('cache_misses', 0)
            total = cache_hits + cache_misses
            if total > 0:
                hit_ratio = (cache_hits / total) * 100
                self.app_cache_hit_ratio.set(hit_ratio)
        except Exception as e:
            print(f"Error collecting app metrics: {e}")

    def collect_system_metrics(self):
        """Collect system-level metrics.

        Returns:
            A dict with ``cpu_percent``, ``memory_percent``, ``disk_percent``
            (for ``/``), ``network_sent`` and ``network_recv``.
        """
        # CPU usage, sampled with a one-second interval
        cpu_percent = psutil.cpu_percent(interval=1)
        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent
        # Network I/O
        net_io = psutil.net_io_counters()
        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory_percent,
            'disk_percent': disk_percent,
            'network_sent': net_io.bytes_sent,
            'network_recv': net_io.bytes_recv,
        }
class DatabaseMetricsCollector:
    """Collector for database-specific metrics.

    All values emitted here are hard-coded sample figures.
    """

    def collect(self):
        """Yield the database metric families; called by the Prometheus registry."""
        # HistogramMetricFamily is not part of the module-level imports, so
        # bring it into scope here; without it this method raises NameError.
        from prometheus_client.core import HistogramMetricFamily

        # Connection pool sizes
        pool_size = GaugeMetricFamily(
            'db_connection_pool_size',
            'Database connection pool size',
            labels=['database', 'pool']
        )
        pool_size.add_metric(['production', 'write'], 20)
        pool_size.add_metric(['production', 'read'], 50)
        pool_size.add_metric(['analytics', 'read'], 30)
        yield pool_size

        # Query duration histogram; no samples are added yet, so the family
        # is yielded empty.
        query_duration = HistogramMetricFamily(
            'db_query_duration_seconds',
            'Database query duration',
            labels=['query_type']
        )
        yield query_duration

        # Slow queries counter
        slow_queries = CounterMetricFamily(
            'db_slow_queries_total',
            'Number of slow queries',
            labels=['database', 'threshold']
        )
        slow_queries.add_metric(['production', '1s'], 150)
        slow_queries.add_metric(['production', '5s'], 25)
        yield slow_queries
# Exporter entry point
def main():
    """Start the exporter and refresh the application metrics forever.

    Builds the application exporter, registers the database collector,
    publishes the application info, starts the metrics HTTP server and then
    polls the application every 15 seconds.
    """
    app_exporter = CustomApplicationExporter(
        app_url='http://localhost:8000',
        port=9100,
    )

    # Hook the custom collector into the registry
    REGISTRY.register(DatabaseMetricsCollector())

    # Publish static application information
    build_info = {
        'version': '2.5.0',
        'environment': 'production',
        'region': 'us-east-1',
    }
    app_exporter.app_info.info(build_info)

    # Serve the metrics endpoint for Prometheus to scrape
    listen_port = app_exporter.port
    start_http_server(listen_port)
    print(f"Exporter started on port {listen_port}")

    # Poll the application at a fixed interval
    poll_interval_seconds = 15
    while True:
        app_exporter.collect_app_metrics()
        time.sleep(poll_interval_seconds)


if __name__ == '__main__':
    main()
# More Monitoring Topics
Grafana Dashboard Automation
AlertManager Configuration
Custom Prometheus Rules
Zabbix Auto-Remediation Actions
Log Aggregation with ELK Stack
APM Integration (New Relic, Datadog)
Synthetic Monitoring Scripts
SLA Monitoring and Reporting
Multi-Region Monitoring
Cost Monitoring and Optimization
Need Custom Monitoring Solutions?
We build comprehensive monitoring and alerting systems for enterprise infrastructure.
Get Free Consultation