Monitoring & Alerting Automation

Production-ready Zabbix scripts, Prometheus exporters, and Grafana automation for enterprise monitoring

Zabbix Low-Level Discovery (LLD) Script

Automatic discovery and monitoring with Zabbix LLD. The example below discovers Docker containers; the same pattern applies to services, databases, and custom resources.

#!/usr/bin/env python3
"""
Zabbix Low-Level Discovery Script for Docker Containers
Automatically discovers and monitors all Docker containers on a host
"""

import json
import docker
import sys
from datetime import datetime

class ZabbixDockerLLD:
    """Docker helper behind the Zabbix low-level discovery script.

    Wraps a Docker SDK client and exposes the operations the command-line
    entry point dispatches to: discovery, resource stats, health status,
    log retrieval and restart. Every method catches its own failures,
    records them with :meth:`log_error` and returns a fallback value, so
    callers never see Docker exceptions.
    """

    # Destination of the messages written by log_error().
    LOG_FILE = '/var/log/zabbix/docker_lld.log'

    def __init__(self):
        """Connect to Docker using the environment's configuration.

        Exits the process with status 1 when the client cannot be created.
        """
        try:
            self.client = docker.from_env()
        except Exception as e:
            self.log_error(f"Failed to connect to Docker: {e}")
            sys.exit(1)

    def discover_containers(self):
        """Discover all Docker containers, running or not.

        Returns:
            ``{"data": [...]}`` with one LLD macro dict per container. The
            list is empty when the containers cannot be listed at all.
        """
        try:
            # all=True: stopped containers are reported too.
            found = self.client.containers.list(all=True)
        except Exception as e:
            self.log_error(f"Discovery failed: {e}")
            return {"data": []}

        containers = []
        for container in found:
            # A failure while describing one container (for example its
            # image lookup raising) must not wipe out the whole result.
            try:
                tags = container.image.tags
                containers.append({
                    "{#CONTAINER_ID}": container.short_id,
                    "{#CONTAINER_NAME}": container.name,
                    "{#CONTAINER_IMAGE}": tags[0] if tags else "none",
                    "{#CONTAINER_STATUS}": container.status,
                    "{#CONTAINER_CREATED}": container.attrs['Created'],
                })
            except Exception as e:
                container_ref = getattr(container, 'short_id', 'unknown')
                self.log_error(f"Discovery skipped container {container_ref}: {e}")

        return {"data": containers}

    def get_container_stats(self, container_id):
        """Get resource usage stats for a specific container.

        Args:
            container_id: Identifier passed to ``client.containers.get``.

        Returns:
            A dict with ``cpu_percent``, ``memory_usage``, ``memory_percent``,
            ``network_rx``, ``network_tx`` and ``pids``, or ``None`` when the
            container is not running or the stats cannot be read.
        """
        try:
            container = self.client.containers.get(container_id)

            if container.status != 'running':
                return None

            stats = container.stats(stream=False)

            # CPU: container usage delta as a share of the system-wide delta
            # between the two samples contained in one stats document.
            cpu_now = stats['cpu_stats']
            cpu_prev = stats['precpu_stats']
            cpu_delta = (cpu_now['cpu_usage']['total_usage']
                         - cpu_prev['cpu_usage']['total_usage'])
            system_delta = cpu_now['system_cpu_usage'] - cpu_prev['system_cpu_usage']
            cpu_percent = (cpu_delta / system_delta) * 100.0 if system_delta > 0 else 0

            # Memory: missing figures or a zero limit are reported as 0
            # instead of discarding the whole sample.
            memory_stats = stats.get('memory_stats') or {}
            memory_usage = memory_stats.get('usage', 0)
            memory_limit = memory_stats.get('limit', 0)
            if memory_limit > 0:
                memory_percent = (memory_usage / memory_limit) * 100.0
            else:
                memory_percent = 0.0

            # Network: totals across every interface listed in the document.
            networks = stats.get('networks') or {}
            network_rx = sum(net['rx_bytes'] for net in networks.values())
            network_tx = sum(net['tx_bytes'] for net in networks.values())

            return {
                'cpu_percent': round(cpu_percent, 2),
                'memory_usage': memory_usage,
                'memory_percent': round(memory_percent, 2),
                'network_rx': network_rx,
                'network_tx': network_tx,
                'pids': (stats.get('pids_stats') or {}).get('current', 0),
            }

        except Exception as e:
            self.log_error(f"Failed to get stats for {container_id}: {e}")
            return None

    def check_container_health(self, container_id):
        """Check container health status.

        Returns:
            The ``State.Health.Status`` value from the container attributes
            (``"unknown"`` when the key is absent), ``"no_healthcheck"`` when
            the container has no health section, or ``"error"`` on failure.
        """
        try:
            container = self.client.containers.get(container_id)

            health = container.attrs.get('State', {}).get('Health', {})

            if not health:
                return "no_healthcheck"

            return health.get('Status', 'unknown')

        except Exception as e:
            self.log_error(f"Health check failed for {container_id}: {e}")
            return "error"

    def get_container_logs(self, container_id, lines=100):
        """Get recent container logs.

        Args:
            container_id: Identifier passed to ``client.containers.get``.
            lines: Number of trailing log lines to request.

        Returns:
            The timestamped log text, or ``""`` on failure. Bytes that are
            not valid UTF-8 are replaced rather than failing the call.
        """
        try:
            container = self.client.containers.get(container_id)
            raw = container.logs(tail=lines, timestamps=True)
            return raw.decode('utf-8', errors='replace')

        except Exception as e:
            self.log_error(f"Failed to get logs for {container_id}: {e}")
            return ""

    def restart_container(self, container_id):
        """Restart a container (for remediation actions).

        Returns:
            ``True`` when the restart call completed, ``False`` otherwise.
        """
        try:
            container = self.client.containers.get(container_id)
            container.restart(timeout=10)
            return True

        except Exception as e:
            self.log_error(f"Failed to restart {container_id}: {e}")
            return False

    @staticmethod
    def log_error(message):
        """Append a timestamped error line to ``LOG_FILE``.

        Falls back to standard error when the log file cannot be opened or
        written, so a logging problem never raises out of an error handler.
        """
        line = f"{datetime.now().isoformat()} - ERROR - {message}\n"
        try:
            with open(ZabbixDockerLLD.LOG_FILE, 'a') as f:
                f.write(line)
        except OSError:
            sys.stderr.write(line)

# Command-line entry point: docker_lld.py <action> [container_id]
if __name__ == '__main__':
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: docker_lld.py [discover|stats|health|logs|restart] [container_id]")
        sys.exit(1)

    action = argv[1]
    # Every action except 'discover' needs a container id as second argument.
    target = argv[2] if len(argv) >= 3 else None

    # The Docker connection is opened before the action is validated.
    lld = ZabbixDockerLLD()

    if action == 'discover':
        # Pretty-printed LLD JSON for the Zabbix discovery rule.
        print(json.dumps(lld.discover_containers(), indent=2))

    elif target is None or action not in ('stats', 'health', 'logs', 'restart'):
        print("Invalid action or missing parameters")
        sys.exit(1)

    elif action == 'stats':
        stats = lld.get_container_stats(target)
        if not stats:
            # No stats available: signal failure through the exit status.
            sys.exit(1)
        print(json.dumps(stats))

    elif action == 'health':
        print(lld.check_container_health(target))

    elif action == 'logs':
        print(lld.get_container_logs(target))

    else:
        # Only 'restart' is left; the exit status reports the outcome.
        sys.exit(0 if lld.restart_container(target) else 1)

Zabbix Configuration:

# UserParameter for Zabbix agent config
UserParameter=docker.discovery,/usr/local/bin/docker_lld.py discover
UserParameter=docker.stats[*],/usr/local/bin/docker_lld.py stats $1
UserParameter=docker.health[*],/usr/local/bin/docker_lld.py health $1
UserParameter=docker.restart[*],/usr/local/bin/docker_lld.py restart $1

Zabbix API Automation

Automate host creation, template management, and mass operations using Zabbix API.

// Zabbix API Automation Framework
import axios, { AxiosInstance } from 'axios';

/**
 * Host definition in the shape this file passes to the `host.create` and
 * `host.update` API methods.
 */
interface ZabbixHost {
  // Host name; the bulk CSV import uses the same value for `host` and `name`.
  host: string;
  // Display name (the usage example passes a human-readable variant of `host`).
  name: string;
  // Host groups, referenced by group id.
  groups: { groupid: string }[];
  // Templates, referenced by template id.
  templates: { templateid: string }[];
  // Interfaces through which the host is reached.
  interfaces: ZabbixInterface[];
}

/**
 * One interface entry of a ZabbixHost. The numeric fields are Zabbix API
 * codes; the callers in this file always pass 1 for each of them.
 */
interface ZabbixInterface {
  // Interface type code; this file uses 1 for the Zabbix agent.
  type: number;
  // NOTE(review): presumably 1 marks the default interface of its type - confirm.
  main: number;
  // NOTE(review): presumably 1 = connect via `ip`, 0 = via `dns` - confirm.
  useip: number;
  // IP address of the host.
  ip: string;
  // DNS name; left empty by the callers in this file.
  dns: string;
  // Port as a string; the callers in this file use '10050'.
  port: string;
}

/**
 * Thin client for the Zabbix JSON-RPC API.
 *
 * Call `login()` first; the token it obtains is attached to every later
 * request. All public methods reject with an `Error` when the HTTP call or
 * the API itself reports a failure, except `login()`, which resolves to
 * `false` instead.
 */
class ZabbixAPI {
  private api: AxiosInstance;
  private authToken: string | null = null;
  // JSON-RPC request id, incremented for every call.
  private requestId: number = 1;

  /**
   * @param url      Base URL of the Zabbix frontend (without `/api_jsonrpc.php`).
   * @param username Account used by `login()`.
   * @param password Password used by `login()`.
   */
  constructor(private url: string, private username: string, private password: string) {
    this.api = axios.create({
      baseURL: url,
      headers: { 'Content-Type': 'application/json' },
    });
  }

  /**
   * Send one JSON-RPC call and return its `result` member.
   *
   * NOTE(review): the token travels in the `auth` body property; newer Zabbix
   * releases favour an `Authorization` header - confirm against the target
   * server version.
   *
   * @throws Error prefixed with "API Request Failed:" for transport errors
   *         and for errors reported by the API.
   */
  private async request(method: string, params: any = {}) {
    const payload = {
      jsonrpc: '2.0',
      method,
      params,
      auth: this.authToken,
      id: this.requestId++,
    };

    try {
      const response = await this.api.post('/api_jsonrpc.php', payload);

      if (response.data.error) {
        // `message` is usually generic; `data` carries the actual reason.
        const { message, data: details } = response.data.error;
        throw new Error(`Zabbix API Error: ${message}${details ? ` ${details}` : ''}`);
      }

      return response.data.result;
    } catch (error: any) {
      throw new Error(`API Request Failed: ${error.message}`);
    }
  }

  /** Authenticate and store the session token. Resolves to `false` on failure. */
  async login(): Promise<boolean> {
    try {
      this.authToken = await this.request('user.login', {
        username: this.username,
        password: this.password,
      });
      return true;
    } catch (error) {
      console.error('Login failed:', error);
      return false;
    }
  }

  /** End the session and forget the stored token. */
  async logout(): Promise<void> {
    await this.request('user.logout');
    this.authToken = null;
  }

  // Host Management

  /** Create a host and return its id. */
  async createHost(hostConfig: ZabbixHost): Promise<string> {
    const result = await this.request('host.create', hostConfig);
    return result.hostids[0];
  }

  /** Update the given properties of a host; `true` when the API reports an updated host id. */
  async updateHost(hostid: string, updates: Partial<ZabbixHost>): Promise<boolean> {
    const result = await this.request('host.update', { hostid, ...updates });
    return result.hostids.length > 0;
  }

  /** Delete a host; `true` when the API reports a deleted host id. */
  async deleteHost(hostid: string): Promise<boolean> {
    const result = await this.request('host.delete', [hostid]);
    return result.hostids.length > 0;
  }

  /** Fetch hosts with their interfaces, groups and templates, optionally filtered. */
  async getHosts(filter?: any): Promise<any[]> {
    return await this.request('host.get', {
      output: 'extend',
      selectInterfaces: 'extend',
      selectGroups: 'extend',
      selectTemplates: 'extend',
      filter,
    });
  }

  // Mass operations

  /** Apply the same property changes to several hosts in one call. */
  async massUpdateHosts(hostids: string[], updates: any): Promise<boolean> {
    const result = await this.request('host.massupdate', {
      hosts: hostids.map(id => ({ hostid: id })),
      ...updates,
    });
    return result.hostids.length > 0;
  }

  /** Set `status` to 0 on the given hosts. */
  async enableHosts(hostids: string[]): Promise<boolean> {
    return await this.massUpdateHosts(hostids, { status: 0 });
  }

  /** Set `status` to 1 on the given hosts. */
  async disableHosts(hostids: string[]): Promise<boolean> {
    return await this.massUpdateHosts(hostids, { status: 1 });
  }

  // Template Management

  /**
   * Set the templates of a host through `host.update`.
   *
   * NOTE(review): `host.update` is documented to replace the currently linked
   * templates with the given list rather than append to it - confirm that
   * callers pass the complete list.
   */
  async linkTemplates(hostid: string, templateids: string[]): Promise<boolean> {
    return await this.updateHost(hostid, {
      templates: templateids.map(id => ({ templateid: id })),
    });
  }

  /** Unlink and clear the given templates from a host; `true` when the host was updated. */
  async unlinkTemplates(hostid: string, templateids: string[]): Promise<boolean> {
    const result = await this.request('host.update', {
      hostid,
      templates_clear: templateids.map(id => ({ templateid: id })),
    });
    // Return the declared boolean, not the raw `{ hostids }` result object.
    return result.hostids.length > 0;
  }

  // Item Management

  /** Create an item and return its id. */
  async createItem(item: any): Promise<string> {
    const result = await this.request('item.create', item);
    return result.itemids[0];
  }

  /** Fetch all items of the given hosts, sorted by name. */
  async getItems(hostids: string[]): Promise<any[]> {
    return await this.request('item.get', {
      output: 'extend',
      hostids,
      sortfield: 'name',
    });
  }

  // Trigger Management

  /** Create a trigger and return its id. */
  async createTrigger(trigger: any): Promise<string> {
    const result = await this.request('trigger.create', trigger);
    return result.triggerids[0];
  }

  /**
   * Fetch problems, newest event id first. `options` is merged last and can
   * override any of the default query parameters.
   */
  async getProblems(options: any = {}): Promise<any[]> {
    return await this.request('problem.get', {
      output: 'extend',
      selectAcknowledges: 'extend',
      recent: true,
      sortfield: ['eventid'],
      sortorder: 'DESC',
      ...options,
    });
  }

  // Maintenance Management

  /** Create a maintenance period and return its id. */
  async createMaintenance(maintenance: any): Promise<string> {
    const result = await this.request('maintenance.create', maintenance);
    return result.maintenanceids[0];
  }

  /** Delete a maintenance period; `true` when the API reports a deleted id. */
  async deleteMaintenance(maintenanceid: string): Promise<boolean> {
    const result = await this.request('maintenance.delete', [maintenanceid]);
    return result.maintenanceids.length > 0;
  }

  /**
   * Bulk host creation from inventory.
   *
   * Each entry is one `hostname,ip,groupName,templateName` line. Blank lines
   * are ignored; lines with a missing field, an unknown group or an unknown
   * template are reported and skipped. A failed `host.create` is logged and
   * does not stop the import.
   *
   * @returns Ids of the hosts that were created.
   */
  async bulkCreateHostsFromCSV(csvData: string[]): Promise<string[]> {
    const hostids: string[] = [];
    // Name -> id lookups, resolved once per distinct name (null = not found).
    const groupIds = new Map<string, string | null>();
    const templateIds = new Map<string, string | null>();

    for (const line of csvData) {
      if (line.trim() === '') {
        continue;
      }

      const [hostname, ip, groupName, templateName] = line.split(',').map(field => field.trim());

      if (!hostname || !ip || !groupName || !templateName) {
        console.error(`Skipping malformed line: ${line}`);
        continue;
      }

      if (!groupIds.has(groupName)) {
        const groups = await this.getHostGroups({ name: groupName });
        groupIds.set(groupName, groups.length > 0 ? groups[0].groupid : null);
      }
      if (!templateIds.has(templateName)) {
        const templates = await this.getTemplates({ name: templateName });
        templateIds.set(templateName, templates.length > 0 ? templates[0].templateid : null);
      }

      const groupid = groupIds.get(groupName);
      const templateid = templateIds.get(templateName);

      if (!groupid || !templateid) {
        console.error(`Skipping ${hostname}: group or template not found`);
        continue;
      }

      const hostConfig: ZabbixHost = {
        host: hostname,
        name: hostname,
        groups: [{ groupid }],
        templates: [{ templateid }],
        interfaces: [
          {
            type: 1, // Agent
            main: 1,
            useip: 1,
            ip,
            dns: '',
            port: '10050',
          },
        ],
      };

      try {
        const hostid = await this.createHost(hostConfig);
        hostids.push(hostid);
        console.log(`Created host: ${hostname} (ID: ${hostid})`);
      } catch (error) {
        console.error(`Failed to create ${hostname}:`, error);
      }
    }

    return hostids;
  }

  /** Look up host groups matching `filter`. */
  private async getHostGroups(filter?: any): Promise<any[]> {
    return await this.request('hostgroup.get', { output: 'extend', filter });
  }

  /** Look up templates matching `filter`. */
  private async getTemplates(filter?: any): Promise<any[]> {
    return await this.request('template.get', { output: 'extend', filter });
  }
}

// Usage Example
// The URL and credentials below are placeholders.
const zabbix = new ZabbixAPI(
  'https://zabbix.example.com',
  'admin',
  'password'
);

// login() reports failure through its return value, so check it before
// sending any authenticated request.
if (!(await zabbix.login())) {
  throw new Error('Zabbix login failed');
}

try {
  // Create a new host
  const hostid = await zabbix.createHost({
    host: 'web-server-01',
    name: 'Web Server 01',
    groups: [{ groupid: '2' }],
    templates: [{ templateid: '10001' }],
    interfaces: [
      {
        type: 1,
        main: 1,
        useip: 1,
        ip: '192.168.1.100',
        dns: '',
        port: '10050',
      },
    ],
  });

  // Get all problems
  const problems = await zabbix.getProblems({ severities: [4, 5] });
  console.log(`Active problems: ${problems.length}`);

  // Bulk enable/disable hosts
  const hostids = ['10084', '10085', '10086'];
  await zabbix.disableHosts(hostids);
} finally {
  // Close the session even when one of the calls above fails.
  await zabbix.logout();
}

Custom Prometheus Exporter

Build custom Prometheus exporters for application-specific metrics collection.

# Custom Prometheus Exporter in Python
from prometheus_client import start_http_server, Gauge, Counter, Histogram, Info
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
import time
import psutil
import requests
from typing import Dict, List

class CustomApplicationExporter:
    """Prometheus exporter that mirrors an application's JSON metrics.

    The constructor declares the metric objects; ``collect_app_metrics``
    refreshes the gauges from ``<app_url>/metrics`` and
    ``collect_system_metrics`` returns a snapshot of host-level figures.
    """

    def __init__(self, app_url: str, port: int = 9100):
        """Store the target URL and port and declare every metric.

        Args:
            app_url: Base URL of the monitored application.
            port: Port number kept on the instance for the HTTP server.
        """
        self.app_url = app_url
        self.port = port

        # Request counters and timings, labelled per method/endpoint.
        self.app_requests_total = Counter(
            'app_requests_total', 'Total number of requests',
            ['method', 'endpoint', 'status'],
        )
        self.app_request_duration = Histogram(
            'app_request_duration_seconds', 'Request duration in seconds',
            ['method', 'endpoint'],
        )

        # Gauges refreshed by collect_app_metrics().
        self.app_active_users = Gauge(
            'app_active_users', 'Number of currently active users',
        )
        self.app_queue_size = Gauge(
            'app_queue_size', 'Size of processing queue',
            ['queue_name'],
        )
        self.app_database_connections = Gauge(
            'app_database_connections', 'Number of active database connections',
            ['database'],
        )
        self.app_cache_hit_ratio = Gauge(
            'app_cache_hit_ratio', 'Cache hit ratio percentage',
        )

        # Static key/value description of the application.
        self.app_info = Info('app_info', 'Application information')

    def collect_app_metrics(self):
        """Refresh the gauges from the application's ``/metrics`` endpoint.

        Responses with a status other than 200 are ignored. Any exception is
        printed and swallowed so that the caller's loop keeps running.
        """
        try:
            reply = requests.get(f"{self.app_url}/metrics", timeout=5)
            if reply.status_code != 200:
                return

            payload = reply.json()

            self.app_active_users.set(payload.get('active_users', 0))

            # One labelled sample per queue and per database.
            for queue_name, size in payload.get('queues', {}).items():
                self.app_queue_size.labels(queue_name=queue_name).set(size)

            for db_name, connections in payload.get('db_connections', {}).items():
                self.app_database_connections.labels(database=db_name).set(connections)

            # The hit ratio is only updated once at least one lookup happened.
            hits = payload.get('cache_hits', 0)
            misses = payload.get('cache_misses', 0)
            lookups = hits + misses
            if lookups > 0:
                self.app_cache_hit_ratio.set((hits / lookups) * 100)

        except Exception as e:
            print(f"Error collecting app metrics: {e}")

    def collect_system_metrics(self):
        """Return a dict of CPU, memory, root-disk and network figures.

        The CPU percentage is sampled over a one-second interval.
        """
        cpu = psutil.cpu_percent(interval=1)
        ram = psutil.virtual_memory()
        root_disk = psutil.disk_usage('/')
        traffic = psutil.net_io_counters()

        return {
            'cpu_percent': cpu,
            'memory_percent': ram.percent,
            'disk_percent': root_disk.percent,
            'network_sent': traffic.bytes_sent,
            'network_recv': traffic.bytes_recv,
        }

class DatabaseMetricsCollector:
    """Custom collector that yields database metric families.

    Every value emitted here is a hard-coded sample; the collector does not
    query a database.
    """

    def collect(self):
        """Called by the Prometheus registry to collect metrics.

        Yields, in order: the ``db_connection_pool_size`` gauge family, the
        ``db_query_duration_seconds`` histogram family (declared without
        samples) and the ``db_slow_queries_total`` counter family.
        """
        # The module-level import only brings in the Gauge and Counter
        # families, so the histogram family is imported here; without it this
        # method raised NameError.
        from prometheus_client.core import HistogramMetricFamily

        # Connection pool sizes per database and pool.
        pool_size = GaugeMetricFamily(
            'db_connection_pool_size',
            'Database connection pool size',
            labels=['database', 'pool']
        )

        pool_size.add_metric(['production', 'write'], 20)
        pool_size.add_metric(['production', 'read'], 50)
        pool_size.add_metric(['analytics', 'read'], 30)

        yield pool_size

        # Query duration histogram; no samples are attached to it here.
        query_duration = HistogramMetricFamily(
            'db_query_duration_seconds',
            'Database query duration',
            labels=['query_type']
        )

        yield query_duration

        # Slow query counts per database and threshold.
        slow_queries = CounterMetricFamily(
            'db_slow_queries_total',
            'Number of slow queries',
            labels=['database', 'threshold']
        )

        slow_queries.add_metric(['production', '1s'], 150)
        slow_queries.add_metric(['production', '5s'], 25)

        yield slow_queries

# Exporter entry point
def main():
    """Start the exporter and refresh the application metrics forever.

    Registers the database collector, publishes the application info, opens
    the HTTP endpoint on the exporter's port and then polls the application
    every 15 seconds. This function does not return.
    """
    exporter = CustomApplicationExporter(app_url='http://localhost:8000', port=9100)

    # Make the custom database collector part of the default registry.
    REGISTRY.register(DatabaseMetricsCollector())

    build_info = {
        'version': '2.5.0',
        'environment': 'production',
        'region': 'us-east-1',
    }
    exporter.app_info.info(build_info)

    # Expose the metrics over HTTP so Prometheus can scrape them.
    listen_port = exporter.port
    start_http_server(listen_port)
    print(f"Exporter started on port {listen_port}")

    poll_interval_seconds = 15
    while True:
        exporter.collect_app_metrics()
        time.sleep(poll_interval_seconds)


if __name__ == '__main__':
    main()

More Monitoring Topics

Grafana Dashboard Automation
AlertManager Configuration
Custom Prometheus Rules
Zabbix Auto-Remediation Actions
Log Aggregation with ELK Stack
APM Integration (New Relic, Datadog)
Synthetic Monitoring Scripts
SLA Monitoring and Reporting
Multi-Region Monitoring
Cost Monitoring and Optimization

Need Custom Monitoring Solutions?

We build comprehensive monitoring and alerting systems for enterprise infrastructure.

Get Free Consultation