Add n8n callback plugin — structured JSON webhook reporter with timing and version diffs

This commit is contained in:
Semaphore
2026-03-10 14:25:52 -07:00
parent 187933b8c0
commit 3d8cc2ada1

View File

@@ -1,9 +1,235 @@
# n8n_reporter.py
# Ansible callback plugin
# Posts structured patch run results to n8n webhook
# Full implementation to follow
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
callback: n8n_reporter
type: notification
short_description: Posts patch run results to n8n webhook
short_description: Posts structured patch run results to n8n webhook
description:
- Posts a structured JSON payload to an n8n webhook at the end of every run
- Includes package version diffs, timing, safety check results, and errors
- Supports both success and failure reporting
'''
import json
import time
import uuid
import os
from datetime import datetime, timezone
from ansible.plugins.callback import CallbackBase
try:
import urllib.request
import urllib.error
HAS_URLLIB = True
except ImportError:
HAS_URLLIB = False
class CallbackModule(CallbackBase):
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'notification'
CALLBACK_NAME = 'n8n_reporter'
CALLBACK_NEEDS_ENABLED = True
def __init__(self):
super(CallbackModule, self).__init__()
self.run_id = str(uuid.uuid4())
self.started_at = time.time()
self.started_iso = datetime.now(timezone.utc).isoformat()
self.errors = []
self.warnings = []
self.host_results = {}
self.play_vars = {}
self.task_timings = {}
self.current_task_start = None
self.current_task_name = None
def v2_playbook_on_start(self, playbook):
self.playbook_name = os.path.basename(playbook._file_name)
def v2_playbook_on_play_start(self, play):
self.current_play = play
vm = play.get_variable_manager()
self.play_vars = vm.get_vars()
def v2_playbook_on_task_start(self, task, is_conditional):
self.current_task_name = task.get_name()
self.current_task_start = time.time()
def v2_runner_on_ok(self, result):
host = result._host.get_name()
if host not in self.host_results:
self.host_results[host] = {
'status': 'success',
'packages_updated': [],
'safety_checks': [],
'reboot_required': False,
'reboot_performed': False,
'errors': []
}
# Capture packages_updated fact if set
if 'ansible_facts' in result._result:
facts = result._result['ansible_facts']
if 'packages_updated' in facts:
self.host_results[host]['packages_updated'] = facts['packages_updated']
if 'host_reboot_required' in facts:
self.host_results[host]['reboot_required'] = facts['host_reboot_required']
if 'snapshot_id' in facts:
self.host_results[host]['snapshot_id'] = facts['snapshot_id']
# Capture assert results as safety checks
if self.current_task_name and any(
kw in self.current_task_name.lower()
for kw in ['assert', 'check', 'verify', 'preflight']
):
self.host_results[host]['safety_checks'].append({
'check': self.current_task_name,
'passed': True
})
def v2_runner_on_failed(self, result, ignore_errors=False):
host = result._host.get_name()
if host not in self.host_results:
self.host_results[host] = {
'status': 'failed',
'packages_updated': [],
'safety_checks': [],
'reboot_required': False,
'reboot_performed': False,
'errors': []
}
error_detail = {
'task': self.current_task_name,
'host': host,
'message': str(result._result.get('msg', result._result.get('stderr', 'Unknown error'))),
'ignored': ignore_errors
}
self.host_results[host]['errors'].append(error_detail)
self.errors.append(error_detail)
if self.current_task_name and any(
kw in self.current_task_name.lower()
for kw in ['assert', 'check', 'verify', 'preflight']
):
self.host_results[host]['safety_checks'].append({
'check': self.current_task_name,
'passed': False,
'error': error_detail['message']
})
if not ignore_errors:
self.host_results[host]['status'] = 'failed'
def v2_runner_on_unreachable(self, result):
host = result._host.get_name()
self.host_results[host] = {
'status': 'unreachable',
'packages_updated': [],
'safety_checks': [],
'reboot_required': False,
'reboot_performed': False,
'errors': [{
'task': 'connectivity',
'host': host,
'message': str(result._result.get('msg', 'Host unreachable')),
'ignored': False
}]
}
self.errors.append({
'task': 'connectivity',
'host': host,
'message': str(result._result.get('msg', 'Host unreachable'))
})
def v2_playbook_on_stats(self, stats):
completed_at = time.time()
completed_iso = datetime.now(timezone.utc).isoformat()
elapsed_seconds = int(completed_at - self.started_at)
# Get config from environment (set via Semaphore Variable Groups)
webhook_url = os.environ.get('N8N_WEBHOOK_URL', '')
client_id = os.environ.get('CLIENT_ID', 'UNKNOWN')
client_name = os.environ.get('CLIENT_NAME', 'Unknown Client')
billing_model = os.environ.get('BILLING_MODEL', 'hybrid')
human_estimate = int(os.environ.get('HUMAN_ESTIMATE_SECONDS', 2700))
# Determine overall status
hosts_ok = stats.processed
overall_status = 'success'
for host in stats.processed:
s = stats.summarize(host)
if s['failures'] > 0 or s['unreachable'] > 0:
overall_status = 'failed' if s['failures'] > 0 else 'unreachable'
break
if s['failures'] == 0 and s['unreachable'] == 0 and overall_status != 'failed':
overall_status = 'success'
# Build per-host payloads
host_payloads = []
for host, result in self.host_results.items():
host_payloads.append({
'host': host,
'status': result.get('status', 'unknown'),
'packages_updated': result.get('packages_updated', []),
'safety_checks': result.get('safety_checks', []),
'reboot_required': result.get('reboot_required', False),
'reboot_performed': result.get('reboot_performed', False),
'snapshot_id': result.get('snapshot_id', None),
'errors': result.get('errors', [])
})
payload = {
'run_id': self.run_id,
'playbook': getattr(self, 'playbook_name', 'unknown'),
'client_id': client_id,
'client_name': client_name,
'billing_model': billing_model,
'overall_status': overall_status,
'timing': {
'started_at': self.started_iso,
'completed_at': completed_iso,
'elapsed_seconds': elapsed_seconds,
'human_estimate_seconds': human_estimate,
'automation_saving_seconds': max(0, human_estimate - elapsed_seconds)
},
'hosts': host_payloads,
'errors': self.errors,
'warnings': self.warnings,
'requires_human_review': len(self.errors) > 0 and overall_status == 'failed'
}
self._post_to_webhook(webhook_url, payload)
def _post_to_webhook(self, webhook_url, payload):
if not webhook_url:
self._display.warning('N8N_WEBHOOK_URL not set — skipping webhook notification')
return
if not HAS_URLLIB:
self._display.warning('urllib not available — cannot post to webhook')
return
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(
webhook_url,
data=data,
headers={
'Content-Type': 'application/json',
'User-Agent': 'ansible-n8n-reporter/1.0'
},
method='POST'
)
with urllib.request.urlopen(req, timeout=30) as response:
self._display.display(
f'n8n webhook posted successfully — status {response.status} — run_id {payload["run_id"]}',
color='green'
)
except Exception as e:
self._display.warning(f'Failed to post to n8n webhook: {str(e)}')
self._display.warning(f'Payload was: {json.dumps(payload, indent=2)}')