#!/usr/bin/env python3
"""
N8N Workflow Documentation Generator

This script analyzes n8n workflow JSON files and generates a comprehensive
HTML documentation page. It performs static analysis of the workflow files
to extract metadata, categorize workflows, and create an interactive
documentation interface.

Usage: python generate_documentation.py
"""

import json
import os
import glob
import datetime
from typing import Dict, List, Any, Optional, Tuple, Set

# Constants
DEFAULT_WORKFLOWS_DIR = "workflows"


class WorkflowAnalyzer:
    """Analyzes n8n workflow JSON files and generates documentation data."""

    def __init__(self, workflows_dir: str = DEFAULT_WORKFLOWS_DIR):
        # Directory scanned for *.json workflow exports.
        self.workflows_dir = workflows_dir
        # Per-workflow metadata dicts accumulated by analyze_all_workflows().
        self.workflows: List[Dict[str, Any]] = []
        # Aggregate statistics; 'integrations' starts as a set and is
        # converted to a list in _calculate_stats() for JSON serializability.
        self.stats: Dict[str, Any] = {
            'total': 0,
            'active': 0,
            'inactive': 0,
            'triggers': {},
            'complexity': {'low': 0, 'medium': 0, 'high': 0},
            'total_nodes': 0,
            'integrations': set()
        }

    def analyze_all_workflows(self) -> Dict[str, Any]:
        """Analyze all workflow files and return comprehensive data.

        Returns a dict with 'workflows' (list of per-file metadata),
        'stats' (aggregates), and 'timestamp'. Missing directory or an
        empty directory yields the empty-data structure rather than raising.
        """
        if not os.path.exists(self.workflows_dir):
            print(f"Warning: Workflows directory '{self.workflows_dir}' not found.")
            return self._get_empty_data()

        json_files = glob.glob(os.path.join(self.workflows_dir, "*.json"))
        if not json_files:
            print(f"Warning: No JSON files found in '{self.workflows_dir}' directory.")
            return self._get_empty_data()

        print(f"Found {len(json_files)} workflow files. Analyzing...")

        for file_path in json_files:
            try:
                workflow_data = self._analyze_workflow_file(file_path)
                if workflow_data:
                    self.workflows.append(workflow_data)
            except Exception as e:
                # Best-effort batch processing: one bad file must not abort
                # the whole run, so log and move on.
                print(f"Error analyzing {file_path}: {str(e)}")
                continue

        self._calculate_stats()

        return {
            'workflows': self.workflows,
            'stats': self.stats,
            'timestamp': datetime.datetime.now().isoformat()
        }

    def _analyze_workflow_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Analyze a single workflow file and extract metadata.

        Returns None (after logging) when the file is not valid JSON/UTF-8.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error reading {file_path}: {str(e)}")
            return None

        filename = os.path.basename(file_path)

        # Extract basic metadata, defaulting every field so downstream code
        # never needs to guard against missing keys.
        workflow = {
            'filename': filename,
            'name': data.get('name', filename.replace('.json', '')),
            'id': data.get('id', 'unknown'),
            'active': data.get('active', False),
            'nodes': data.get('nodes', []),
            'connections': data.get('connections', {}),
            'tags': data.get('tags', []),
            'settings': data.get('settings', {}),
            'createdAt': data.get('createdAt', ''),
            'updatedAt': data.get('updatedAt', ''),
            'versionId': data.get('versionId', '')
        }

        # Analyze nodes
        node_count = len(workflow['nodes'])
        workflow['nodeCount'] = node_count

        # Determine complexity purely from node count (<=5 low, <=15 medium).
        if node_count <= 5:
            complexity = 'low'
        elif node_count <= 15:
            complexity = 'medium'
        else:
            complexity = 'high'
        workflow['complexity'] = complexity

        # Find trigger type and integrations
        trigger_type, integrations = self._analyze_nodes(workflow['nodes'])
        workflow['triggerType'] = trigger_type
        workflow['integrations'] = list(integrations)

        # Generate description
        workflow['description'] = self._generate_description(workflow, trigger_type, integrations)

        # Extract or generate step-by-step process
        steps = self._extract_or_generate_steps(workflow['nodes'], workflow['connections'])
        workflow['steps'] = steps

        # Debug logging
        if steps:
            print(f"Found/Generated {len(steps)} steps in workflow: {workflow['name']}")

        # Extract raw JSON for viewer
        workflow['rawJson'] = json.dumps(data, indent=2)

        return workflow

    def _analyze_nodes(self, nodes: List[Dict]) -> Tuple[str, Set[str]]:
        """Analyze nodes to determine trigger type and integrations.

        Returns (trigger_type, integrations) where trigger_type is one of
        'Manual', 'Webhook', 'Scheduled', or 'Complex', and integrations is
        the set of cleaned third-party service names used by the workflow.
        """
        trigger_type = 'Manual'
        integrations = set()

        for node in nodes:
            node_type = node.get('type', '')
            node_name = node.get('name', '')

            # Determine trigger type
            if 'webhook' in node_type.lower() or 'webhook' in node_name.lower():
                trigger_type = 'Webhook'
            elif 'cron' in node_type.lower() or 'schedule' in node_type.lower():
                trigger_type = 'Scheduled'
            elif 'trigger' in node_type.lower() and trigger_type == 'Manual':
                if 'manual' not in node_type.lower():
                    trigger_type = 'Webhook'  # Most non-manual triggers are webhook-based

            # Extract integrations from the n8n node-type namespace.
            if node_type.startswith('n8n-nodes-base.'):
                service = node_type.replace('n8n-nodes-base.', '')
                # Clean up service names
                service = service.replace('Trigger', '').replace('trigger', '')
                # Skip built-in utility nodes that are not real integrations.
                if service and service not in ['set', 'function', 'if', 'switch', 'merge', 'stickyNote']:
                    integrations.add(service.title())

        # Determine if complex based on node variety and count
        if len(nodes) > 10 and len(integrations) > 3:
            trigger_type = 'Complex'

        return trigger_type, integrations

    def _generate_description(self, workflow: Dict, trigger_type: str,
                              integrations: Set[str]) -> str:
        """Generate a descriptive summary of the workflow."""
        name = workflow['name']
        node_count = workflow['nodeCount']

        # Start with trigger description
        trigger_descriptions = {
            'Webhook': "Webhook-triggered automation that",
            'Scheduled': "Scheduled automation that",
            'Complex': "Complex multi-step automation that",
        }
        desc = trigger_descriptions.get(trigger_type, "Manual workflow that")

        # Add functionality based on name and integrations
        if integrations:
            main_services = list(integrations)[:3]  # Top 3 services
            if len(main_services) == 1:
                desc += f" integrates with {main_services[0]}"
            elif len(main_services) == 2:
                desc += f" connects {main_services[0]} and {main_services[1]}"
            else:
                desc += f" orchestrates {', '.join(main_services[:-1])}, and {main_services[-1]}"

        # Add workflow purpose hints from name
        name_lower = name.lower()
        if 'create' in name_lower:
            desc += " to create new records"
        elif 'update' in name_lower:
            desc += " to update existing data"
        elif 'sync' in name_lower:
            desc += " to synchronize data"
        elif 'notification' in name_lower or 'alert' in name_lower:
            desc += " for notifications and alerts"
        elif 'backup' in name_lower:
            desc += " for data backup operations"
        elif 'monitor' in name_lower:
            desc += " for monitoring and reporting"
        else:
            desc += " for data processing"

        desc += f". Uses {node_count} nodes"
        if len(integrations) > 3:
            desc += f" and integrates with {len(integrations)} services"
        desc += "."

        return desc

    def _extract_or_generate_steps(self, nodes: List[Dict], connections: Dict) -> List[Dict]:
        """Extract notes from nodes or generate steps from workflow structure.

        Author-written node notes take precedence; synthesized descriptions
        are only a fallback.
        """
        # First, try to extract existing notes
        nodes_with_notes = []
        for node in nodes:
            note = node.get('notes')
            if note:
                nodes_with_notes.append({
                    'name': node.get('name', ''),
                    'type': node.get('type', ''),
                    'note': note,
                    'id': node.get('id', '')
                })

        # If we have notes, use them
        if nodes_with_notes:
            return nodes_with_notes

        # Otherwise, generate steps from workflow structure
        return self._generate_steps_from_structure(nodes, connections)

    def _generate_steps_from_structure(self, nodes: List[Dict], connections: Dict) -> List[Dict]:
        """Generate step descriptions from workflow node structure and connections."""
        if not nodes:
            return []

        # Create a map of nodes by ID for easy lookup
        node_map = {node.get('id', ''): node for node in nodes}

        # Find the starting node (trigger or first node)
        start_node = self._find_start_node(nodes, connections)
        if not start_node:
            # Fallback: just describe all nodes in order
            return self._generate_basic_steps(nodes)

        # Follow the workflow path
        steps: List[Dict] = []
        visited: set = set()
        self._traverse_workflow(start_node, node_map, connections, steps, visited)

        return steps

    def _find_start_node(self, nodes: List[Dict], connections: Dict) -> Optional[Dict]:
        """Find the starting node of the workflow (trigger or node with no inputs)."""
        # Look for trigger nodes first
        for node in nodes:
            node_type = node.get('type', '').lower()
            if any(trigger in node_type for trigger in
                   ['trigger', 'webhook', 'cron', 'schedule', 'manual']):
                return node

        # Find nodes that are not targets of any connections.
        # n8n connection shape: {source_name: {'main': [[{'node': target}, ...], ...]}}
        target_nodes = set()
        for source_connections in connections.values():
            if isinstance(source_connections, dict) and 'main' in source_connections:
                for main_connections in source_connections['main']:
                    if isinstance(main_connections, list):
                        for connection in main_connections:
                            if isinstance(connection, dict) and 'node' in connection:
                                target_nodes.add(connection['node'])

        # Return first node that's not a target
        for node in nodes:
            if node.get('name', '') not in target_nodes:
                return node

        # Fallback: return first node
        return nodes[0] if nodes else None

    def _traverse_workflow(self, current_node: Dict, node_map: Dict, connections: Dict,
                           steps: List[Dict], visited: set):
        """Traverse the workflow following connections and generate step descriptions.

        Depth-first walk; 'visited' (node ids) guards against cycles.
        Appends one step dict per reachable node into 'steps'.
        """
        node_name = current_node.get('name', '')
        node_id = current_node.get('id', '')

        if node_id in visited:
            return
        visited.add(node_id)

        # Generate step description for current node
        step_description = self._generate_step_description(current_node)
        if step_description:
            steps.append({
                'name': node_name,
                'type': current_node.get('type', ''),
                'note': step_description,
                'id': node_id
            })

        # Find next nodes (connections are keyed by source node *name*)
        if node_name in connections:
            node_connections = connections[node_name]
            if isinstance(node_connections, dict) and 'main' in node_connections:
                for main_connections in node_connections['main']:
                    if isinstance(main_connections, list):
                        for connection in main_connections:
                            if isinstance(connection, dict) and 'node' in connection:
                                next_node_name = connection['node']
                                # node_map is keyed by id, so resolve the
                                # target by name with a linear scan.
                                next_node = None
                                for node in node_map.values():
                                    if node.get('name') == next_node_name:
                                        next_node = node
                                        break
                                if next_node:
                                    self._traverse_workflow(next_node, node_map,
                                                            connections, steps, visited)

    def _generate_step_description(self, node: Dict) -> str:
        """Generate a meaningful description for a workflow node based on its type and parameters."""
        node_type = node.get('type', '')
        node_name = node.get('name', '')
        parameters = node.get('parameters', {})

        # Clean up node type
        clean_type = node_type.replace('n8n-nodes-base.', '').replace('Trigger', '').replace('trigger', '')

        # Generate description based on node type. Order matters: earlier
        # substring checks shadow later ones (e.g. 'set' before 'if').
        if 'webhook' in node_type.lower():
            return "Receives incoming webhook requests to trigger the workflow"
        elif 'cron' in node_type.lower() or 'schedule' in node_type.lower():
            return "Runs on a scheduled basis to trigger the workflow automatically"
        elif 'manual' in node_type.lower():
            return "Manual trigger to start the workflow execution"
        elif 'http' in node_type.lower() or 'httpRequest' in node_type:
            url = parameters.get('url', '')
            method = parameters.get('method', 'GET')
            return f"Makes {method} HTTP request" + (f" to {url}" if url else "")
        elif 'set' in node_type.lower():
            return "Sets and transforms data values for use in subsequent steps"
        elif 'if' in node_type.lower():
            return "Evaluates conditions to determine workflow path"
        elif 'switch' in node_type.lower():
            return "Routes workflow execution based on multiple conditions"
        elif 'function' in node_type.lower() or 'code' in node_type.lower():
            return "Executes custom JavaScript code for data processing"
        elif 'merge' in node_type.lower():
            return "Combines data from multiple workflow branches"
        elif 'split' in node_type.lower():
            return "Splits data into multiple items for parallel processing"
        elif 'filter' in node_type.lower():
            return "Filters data based on specified conditions"
        elif 'gmail' in node_type.lower():
            operation = parameters.get('operation', 'send')
            return f"Performs Gmail {operation} operation"
        elif 'slack' in node_type.lower():
            return "Sends message or performs action in Slack"
        elif 'discord' in node_type.lower():
            return "Sends message or performs action in Discord"
        elif 'telegram' in node_type.lower():
            return "Sends message or performs action in Telegram"
        elif 'airtable' in node_type.lower():
            operation = parameters.get('operation', 'create')
            return f"Performs Airtable {operation} operation on records"
        elif 'google' in node_type.lower():
            if 'sheets' in node_type.lower():
                return "Reads from or writes to Google Sheets"
            elif 'drive' in node_type.lower():
                return "Manages files in Google Drive"
            elif 'calendar' in node_type.lower():
                return "Manages Google Calendar events"
            else:
                return f"Integrates with Google {clean_type} service"
        elif 'microsoft' in node_type.lower():
            if 'outlook' in node_type.lower():
                return "Manages Microsoft Outlook emails"
            elif 'excel' in node_type.lower():
                return "Works with Microsoft Excel files"
            else:
                return f"Integrates with Microsoft {clean_type} service"
        elif 'openai' in node_type.lower():
            return "Processes data using OpenAI AI models"
        elif 'anthropic' in node_type.lower():
            return "Processes data using Anthropic Claude AI"
        elif 'database' in node_type.lower() or 'mysql' in node_type.lower() or 'postgres' in node_type.lower():
            return "Executes database operations"
        elif 'wait' in node_type.lower():
            return "Pauses workflow execution for specified duration"
        elif 'error' in node_type.lower():
            return "Handles errors and stops workflow execution"
        else:
            # Generic description based on service name
            service_name = clean_type.title()
            return f"Integrates with {service_name} to process data"

    def _generate_basic_steps(self, nodes: List[Dict]) -> List[Dict]:
        """Generate basic steps when workflow structure is unclear."""
        steps = []
        for i, node in enumerate(nodes, 1):
            description = self._generate_step_description(node)
            if description:
                steps.append({
                    'name': node.get('name', f'Step {i}'),
                    'type': node.get('type', ''),
                    'note': description,
                    'id': node.get('id', '')
                })
        return steps

    def _calculate_stats(self):
        """Calculate statistics from analyzed workflows."""
        self.stats['total'] = len(self.workflows)

        for workflow in self.workflows:
            # Active/inactive count
            if workflow['active']:
                self.stats['active'] += 1
            else:
                self.stats['inactive'] += 1

            # Trigger type count
            trigger = workflow['triggerType']
            self.stats['triggers'][trigger] = self.stats['triggers'].get(trigger, 0) + 1

            # Complexity count
            complexity = workflow['complexity']
            self.stats['complexity'][complexity] += 1

            # Node count
            self.stats['total_nodes'] += workflow['nodeCount']

            # Integrations
            self.stats['integrations'].update(workflow['integrations'])

        # Convert integrations set to count and a JSON-serializable list.
        self.stats['unique_integrations'] = len(self.stats['integrations'])
        self.stats['integrations'] = list(self.stats['integrations'])

    def _get_empty_data(self) -> Dict[str, Any]:
        """Return empty data structure when no workflows found."""
        return {
            'workflows': [],
            'stats': {
                'total': 0,
                'active': 0,
                'inactive': 0,
                'triggers': {},
                'complexity': {'low': 0, 'medium': 0, 'high': 0},
                'total_nodes': 0,
                'unique_integrations': 0,
                'integrations': []
            },
            'timestamp': datetime.datetime.now().isoformat()
        }


def generate_html_documentation(data: Dict[str, Any]) -> str:
    """Generate the complete HTML documentation with embedded data.

    The analysis data is serialized to JSON and embedded in a <script> tag.
    """
    # Convert Python data to JavaScript with proper escaping
    js_data = json.dumps(data, indent=2, ensure_ascii=False)

    # A literal "</script>" inside the JSON would terminate the embedding
    # <script> element early, so escape the slash. (The original source was
    # corrupted here — it called .replace('', ...), which would mangle the
    # JSON; the intent, escaping the closing script tag, is restored.)
    js_data = js_data.replace('</script>', '<\\/script>')

    # NOTE(review): the original interactive HTML template was lost when this
    # file was mangled; this is a minimal self-contained replacement that
    # preserves the function's contract (returns a full HTML page with the
    # workflow data embedded as WORKFLOW_DATA).
    html_template = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>N8N Workflow Documentation</title>
</head>
<body>
<h1>N8N Workflow Documentation</h1>
<p>Generated: {data.get('timestamp', '')}</p>
<script>
const WORKFLOW_DATA = {js_data};
</script>
</body>
</html>
"""
    return html_template


def main():
    """Main function to generate the workflow documentation."""
    print("🔍 N8N Workflow Documentation Generator")
    print("=" * 50)

    # Initialize analyzer
    analyzer = WorkflowAnalyzer()

    # Analyze workflows
    data = analyzer.analyze_all_workflows()

    # Generate HTML
    print("📝 Generating HTML documentation...")
    html_content = generate_html_documentation(data)

    # Write HTML file
    output_file = "workflow-documentation.html"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_content)

    print(f"✅ Documentation generated successfully!")
    print(f"📄 Output file: {output_file}")
    print(f"📊 Analyzed {data['stats']['total']} workflows")
    print(f"🔗 Open {output_file} in your browser to view the documentation")


if __name__ == "__main__":
    main()