diff --git a/callback_plugins/n8n_reporter.py b/callback_plugins/n8n_reporter.py index 59f065f..8b066f1 100644 --- a/callback_plugins/n8n_reporter.py +++ b/callback_plugins/n8n_reporter.py @@ -1,9 +1,235 @@ -# n8n_reporter.py -# Ansible callback plugin -# Posts structured patch run results to n8n webhook -# Full implementation to follow +# -*- coding: utf-8 -*- +from __future__ import absolute_import, division, print_function +__metaclass__ = type + DOCUMENTATION = ''' callback: n8n_reporter type: notification - short_description: Posts patch run results to n8n webhook + short_description: Posts structured patch run results to n8n webhook + description: + - Posts a structured JSON payload to an n8n webhook at the end of every run + - Includes package version diffs, timing, safety check results, and errors + - Supports both success and failure reporting ''' + +import json +import time +import uuid +import os +from datetime import datetime, timezone +from ansible.plugins.callback import CallbackBase + +try: + import urllib.request + import urllib.error + HAS_URLLIB = True +except ImportError: + HAS_URLLIB = False + + +class CallbackModule(CallbackBase): + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'notification' + CALLBACK_NAME = 'n8n_reporter' + CALLBACK_NEEDS_ENABLED = True + + def __init__(self): + super(CallbackModule, self).__init__() + self.run_id = str(uuid.uuid4()) + self.started_at = time.time() + self.started_iso = datetime.now(timezone.utc).isoformat() + self.errors = [] + self.warnings = [] + self.host_results = {} + self.play_vars = {} + self.task_timings = {} + self.current_task_start = None + self.current_task_name = None + + def v2_playbook_on_start(self, playbook): + self.playbook_name = os.path.basename(playbook._file_name) + + def v2_playbook_on_play_start(self, play): + self.current_play = play + vm = play.get_variable_manager() + self.play_vars = vm.get_vars() + + def v2_playbook_on_task_start(self, task, is_conditional): + self.current_task_name = task.get_name() + self.current_task_start = time.time() + + def v2_runner_on_ok(self, result): + host = result._host.get_name() + if host not in self.host_results: + self.host_results[host] = { + 'status': 'success', + 'packages_updated': [], + 'safety_checks': [], + 'reboot_required': False, + 'reboot_performed': False, + 'errors': [] + } + + # Capture packages_updated fact if set + if 'ansible_facts' in result._result: + facts = result._result['ansible_facts'] + if 'packages_updated' in facts: + self.host_results[host]['packages_updated'] = facts['packages_updated'] + if 'host_reboot_required' in facts: + self.host_results[host]['reboot_required'] = facts['host_reboot_required'] + if 'snapshot_id' in facts: + self.host_results[host]['snapshot_id'] = facts['snapshot_id'] + + # Capture assert results as safety checks + if self.current_task_name and any( + kw in self.current_task_name.lower() + for kw in ['assert', 'check', 'verify', 'preflight'] + ): + self.host_results[host]['safety_checks'].append({ + 'check': self.current_task_name, + 'passed': True + }) + + def v2_runner_on_failed(self, result, ignore_errors=False): + host = result._host.get_name() + if host not in self.host_results: + self.host_results[host] = { + 'status': 'failed', + 'packages_updated': [], + 'safety_checks': [], + 'reboot_required': False, + 'reboot_performed': False, + 'errors': [] + } + + error_detail = { + 'task': self.current_task_name, + 'host': host, + 'message': str(result._result.get('msg', result._result.get('stderr', 'Unknown error'))), + 'ignored': ignore_errors + } + + self.host_results[host]['errors'].append(error_detail) + self.errors.append(error_detail) + + if self.current_task_name and any( + kw in self.current_task_name.lower() + for kw in ['assert', 'check', 'verify', 'preflight'] + ): + self.host_results[host]['safety_checks'].append({ + 'check': self.current_task_name, + 'passed': False, + 'error': error_detail['message'] + }) + + if not ignore_errors: + self.host_results[host]['status'] = 'failed' + + def v2_runner_on_unreachable(self, result): + host = result._host.get_name() + self.host_results[host] = { + 'status': 'unreachable', + 'packages_updated': [], + 'safety_checks': [], + 'reboot_required': False, + 'reboot_performed': False, + 'errors': [{ + 'task': 'connectivity', + 'host': host, + 'message': str(result._result.get('msg', 'Host unreachable')), + 'ignored': False + }] + } + self.errors.append({ + 'task': 'connectivity', + 'host': host, + 'message': str(result._result.get('msg', 'Host unreachable')) + }) + + def v2_playbook_on_stats(self, stats): + completed_at = time.time() + completed_iso = datetime.now(timezone.utc).isoformat() + elapsed_seconds = int(completed_at - self.started_at) + + # Get config from environment (set via Semaphore Variable Groups) + webhook_url = os.environ.get('N8N_WEBHOOK_URL', '') + client_id = os.environ.get('CLIENT_ID', 'UNKNOWN') + client_name = os.environ.get('CLIENT_NAME', 'Unknown Client') + billing_model = os.environ.get('BILLING_MODEL', 'hybrid') + human_estimate = int(os.environ.get('HUMAN_ESTIMATE_SECONDS', 2700)) + + # Determine overall status + hosts_ok = stats.processed + overall_status = 'success' + for host in stats.processed: + s = stats.summarize(host) + if s['failures'] > 0 or s['unreachable'] > 0: + overall_status = 'failed' if s['failures'] > 0 else 'unreachable' + break + if s['failures'] == 0 and s['unreachable'] == 0 and overall_status != 'failed': + overall_status = 'success' + + # Build per-host payloads + host_payloads = [] + for host, result in self.host_results.items(): + host_payloads.append({ + 'host': host, + 'status': result.get('status', 'unknown'), + 'packages_updated': result.get('packages_updated', []), + 'safety_checks': result.get('safety_checks', []), + 'reboot_required': result.get('reboot_required', False), + 'reboot_performed': result.get('reboot_performed', False), + 'snapshot_id': result.get('snapshot_id', None), + 'errors': result.get('errors', []) + }) + + payload = { + 'run_id': self.run_id, + 'playbook': getattr(self, 'playbook_name', 'unknown'), + 'client_id': client_id, + 'client_name': client_name, + 'billing_model': billing_model, + 'overall_status': overall_status, + 'timing': { + 'started_at': self.started_iso, + 'completed_at': completed_iso, + 'elapsed_seconds': elapsed_seconds, + 'human_estimate_seconds': human_estimate, + 'automation_saving_seconds': max(0, human_estimate - elapsed_seconds) + }, + 'hosts': host_payloads, + 'errors': self.errors, + 'warnings': self.warnings, + 'requires_human_review': len(self.errors) > 0 and overall_status == 'failed' + } + + self._post_to_webhook(webhook_url, payload) + + def _post_to_webhook(self, webhook_url, payload): + if not webhook_url: + self._display.warning('N8N_WEBHOOK_URL not set — skipping webhook notification') + return + + if not HAS_URLLIB: + self._display.warning('urllib not available — cannot post to webhook') + return + + try: + data = json.dumps(payload).encode('utf-8') + req = urllib.request.Request( + webhook_url, + data=data, + headers={ + 'Content-Type': 'application/json', + 'User-Agent': 'ansible-n8n-reporter/1.0' + }, + method='POST' + ) + with urllib.request.urlopen(req, timeout=30) as response: + self._display.display( + f'n8n webhook posted successfully — status {response.status} — run_id {payload["run_id"]}', + color='green' + ) + except Exception as e: + self._display.warning(f'Failed to post to n8n webhook: {str(e)}') + self._display.warning(f'Payload was: {json.dumps(payload, indent=2)}')