# -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = ''' callback: n8n_reporter type: notification short_description: Posts structured patch run results to n8n webhook description: - Posts a structured JSON payload to an n8n webhook at the end of every run - Includes package version diffs, timing, safety check results, and errors - Supports both success and failure reporting ''' import json import time import uuid import os from datetime import datetime, timezone from ansible.plugins.callback import CallbackBase try: import urllib.request import urllib.error HAS_URLLIB = True except ImportError: HAS_URLLIB = False class CallbackModule(CallbackBase): CALLBACK_VERSION = 2.0 CALLBACK_TYPE = 'notification' CALLBACK_NAME = 'n8n_reporter' CALLBACK_NEEDS_ENABLED = True def __init__(self): super(CallbackModule, self).__init__() self.run_id = str(uuid.uuid4()) self.started_at = time.time() self.started_iso = datetime.now(timezone.utc).isoformat() self.errors = [] self.warnings = [] self.host_results = {} self.play_vars = {} self.task_timings = {} self.current_task_start = None self.current_task_name = None def v2_playbook_on_start(self, playbook): self.playbook_name = os.path.basename(playbook._file_name) def v2_playbook_on_play_start(self, play): self.current_play = play vm = play.get_variable_manager() self.play_vars = vm.get_vars() def v2_playbook_on_task_start(self, task, is_conditional): self.current_task_name = task.get_name() self.current_task_start = time.time() def v2_runner_on_ok(self, result): host = result._host.get_name() if host not in self.host_results: self.host_results[host] = { 'status': 'success', 'packages_updated': [], 'safety_checks': [], 'reboot_required': False, 'reboot_performed': False, 'errors': [] } # Capture packages_updated fact if set if 'ansible_facts' in result._result: facts = result._result['ansible_facts'] if 'packages_updated' in facts: self.host_results[host]['packages_updated'] = facts['packages_updated'] if 'host_reboot_required' in facts: self.host_results[host]['reboot_required'] = facts['host_reboot_required'] if 'snapshot_id' in facts: self.host_results[host]['snapshot_id'] = facts['snapshot_id'] # Capture assert results as safety checks if self.current_task_name and any( kw in self.current_task_name.lower() for kw in ['assert', 'check', 'verify', 'preflight'] ): self.host_results[host]['safety_checks'].append({ 'check': self.current_task_name, 'passed': True }) def v2_runner_on_failed(self, result, ignore_errors=False): host = result._host.get_name() if host not in self.host_results: self.host_results[host] = { 'status': 'failed', 'packages_updated': [], 'safety_checks': [], 'reboot_required': False, 'reboot_performed': False, 'errors': [] } error_detail = { 'task': self.current_task_name, 'host': host, 'message': str(result._result.get('msg', result._result.get('stderr', 'Unknown error'))), 'ignored': ignore_errors } self.host_results[host]['errors'].append(error_detail) self.errors.append(error_detail) if self.current_task_name and any( kw in self.current_task_name.lower() for kw in ['assert', 'check', 'verify', 'preflight'] ): self.host_results[host]['safety_checks'].append({ 'check': self.current_task_name, 'passed': False, 'error': error_detail['message'] }) if not ignore_errors: self.host_results[host]['status'] = 'failed' def v2_runner_on_unreachable(self, result): host = result._host.get_name() self.host_results[host] = { 'status': 'unreachable', 'packages_updated': [], 'safety_checks': [], 'reboot_required': False, 'reboot_performed': False, 'errors': [{ 'task': 'connectivity', 'host': host, 'message': str(result._result.get('msg', 'Host unreachable')), 'ignored': False }] } self.errors.append({ 'task': 'connectivity', 'host': host, 'message': str(result._result.get('msg', 'Host unreachable')) }) def v2_playbook_on_stats(self, stats): completed_at = time.time() completed_iso = datetime.now(timezone.utc).isoformat() elapsed_seconds = int(completed_at - self.started_at) # Get config from environment (set via Semaphore Variable Groups) webhook_url = self.play_vars.get("N8N_WEBHOOK_URL", os.environ.get("N8N_WEBHOOK_URL", "")) client_id = self.play_vars.get("CLIENT_ID", os.environ.get("CLIENT_ID", "UNKNOWN")) client_name = self.play_vars.get("CLIENT_NAME", os.environ.get("CLIENT_NAME", "Unknown Client")) billing_model = self.play_vars.get("BILLING_MODEL", os.environ.get("BILLING_MODEL", "hybrid")) human_estimate = int(self.play_vars.get("HUMAN_ESTIMATE_SECONDS", os.environ.get("HUMAN_ESTIMATE_SECONDS", 2700))) # Determine overall status hosts_ok = stats.processed overall_status = 'success' for host in stats.processed: s = stats.summarize(host) if s['failures'] > 0 or s['unreachable'] > 0: overall_status = 'failed' if s['failures'] > 0 else 'unreachable' break if s['failures'] == 0 and s['unreachable'] == 0 and overall_status != 'failed': overall_status = 'success' # Build per-host payloads host_payloads = [] for host, result in self.host_results.items(): host_payloads.append({ 'host': host, 'status': result.get('status', 'unknown'), 'packages_updated': result.get('packages_updated', []), 'safety_checks': result.get('safety_checks', []), 'reboot_required': result.get('reboot_required', False), 'reboot_performed': result.get('reboot_performed', False), 'snapshot_id': result.get('snapshot_id', None), 'errors': result.get('errors', []) }) payload = { 'run_id': self.run_id, 'playbook': getattr(self, 'playbook_name', 'unknown'), 'client_id': client_id, 'client_name': client_name, 'billing_model': billing_model, 'overall_status': overall_status, 'timing': { 'started_at': self.started_iso, 'completed_at': completed_iso, 'elapsed_seconds': elapsed_seconds, 'human_estimate_seconds': human_estimate, 'automation_saving_seconds': max(0, human_estimate - elapsed_seconds) }, 'hosts': host_payloads, 'errors': self.errors, 'warnings': self.warnings, 'requires_human_review': len(self.errors) > 0 and overall_status == 'failed' } self._post_to_webhook(webhook_url, payload) def _post_to_webhook(self, webhook_url, payload): if not webhook_url: self._display.warning('N8N_WEBHOOK_URL not set — skipping webhook notification') return if not HAS_URLLIB: self._display.warning('urllib not available — cannot post to webhook') return try: data = json.dumps(payload).encode('utf-8') req = urllib.request.Request( webhook_url, data=data, headers={ 'Content-Type': 'application/json', 'User-Agent': 'ansible-n8n-reporter/1.0' }, method='POST' ) with urllib.request.urlopen(req, timeout=30) as response: self._display.display( f'n8n webhook posted successfully — status {response.status} — run_id {payload["run_id"]}', color='green' ) except Exception as e: self._display.warning(f'Failed to post to n8n webhook: {str(e)}') self._display.warning(f'Payload was: {json.dumps(payload, indent=2)}') # patched - see v2_playbook_on_play_start for var reading