#!/usr/bin/env python3
"""
⚠️ DEPRECATED: N8N Workflow Documentation Generator (Legacy System)
🚨 WARNING: This script generates a 71MB HTML file that is extremely slow to load.
It has been replaced by a modern FastAPI system that's 700x smaller and 10x faster.
👉 USE THE NEW SYSTEM INSTEAD:
1. pip install fastapi uvicorn
2. python3 api_server.py
3. Open http://localhost:8000
📊 PERFORMANCE COMPARISON:
Old System (this script): 71MB, 10+ seconds load time, poor mobile support
New System (api_server.py): <100KB, <1 second load time, excellent mobile support
⚡ The new system provides:
- Instant full-text search with ranking
- Real-time filtering and statistics
- Professional responsive design
- Sub-100ms response times
- Dark/light theme support
This legacy script is kept for backwards compatibility only.
For the best experience, please use the new FastAPI documentation system.
Usage (NOT RECOMMENDED): python generate_documentation.py
"""
import json
import os
import glob
import datetime
import re  # NOTE(review): `re` appears unused in the visible portion of this module — confirm before removing
from typing import Dict, List, Any, Optional, Tuple, Set
# Constants
DEFAULT_WORKFLOWS_DIR = "workflows"


class WorkflowAnalyzer:
    """Analyzes n8n workflow JSON files and generates documentation data.

    Scans ``workflows_dir`` for exported n8n ``*.json`` workflow files and
    derives, per workflow: trigger type, integration list, a complexity
    rating, step-by-step descriptions, and a Mermaid flowchart. Aggregate
    statistics across all workflows are accumulated in ``self.stats``.
    """

    def __init__(self, workflows_dir: str = DEFAULT_WORKFLOWS_DIR):
        self.workflows_dir = workflows_dir
        self.workflows: List[Dict[str, Any]] = []
        # Aggregate counters; 'integrations' starts as a set and is
        # converted to a sorted-free list in _calculate_stats().
        self.stats: Dict[str, Any] = {
            'total': 0,
            'active': 0,
            'inactive': 0,
            'triggers': {},
            'complexity': {'low': 0, 'medium': 0, 'high': 0},
            'total_nodes': 0,
            'integrations': set()
        }

    def analyze_all_workflows(self) -> Dict[str, Any]:
        """Analyze all workflow files and return comprehensive data.

        Returns:
            Dict with 'workflows' (per-file metadata), 'stats' (aggregates)
            and an ISO-8601 'timestamp'. A missing or empty directory yields
            the empty-data structure from _get_empty_data().
        """
        if not os.path.exists(self.workflows_dir):
            print(f"Warning: Workflows directory '{self.workflows_dir}' not found.")
            return self._get_empty_data()

        json_files = glob.glob(os.path.join(self.workflows_dir, "*.json"))
        if not json_files:
            print(f"Warning: No JSON files found in '{self.workflows_dir}' directory.")
            return self._get_empty_data()

        print(f"Found {len(json_files)} workflow files. Analyzing...")

        for file_path in json_files:
            try:
                workflow_data = self._analyze_workflow_file(file_path)
                if workflow_data:
                    self.workflows.append(workflow_data)
            except Exception as e:
                # One corrupt file must not abort the whole documentation run.
                print(f"Error analyzing {file_path}: {str(e)}")
                continue

        self._calculate_stats()

        return {
            'workflows': self.workflows,
            'stats': self.stats,
            'timestamp': datetime.datetime.now().isoformat()
        }

    def _analyze_workflow_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Analyze a single workflow file and extract metadata.

        Returns None (after printing a diagnostic) when the file cannot be
        parsed as UTF-8 JSON.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error reading {file_path}: {str(e)}")
            return None

        filename = os.path.basename(file_path)

        # Basic metadata straight from the export, with safe defaults.
        workflow = {
            'filename': filename,
            'name': data.get('name', filename.replace('.json', '')),
            'id': data.get('id', 'unknown'),
            'active': data.get('active', False),
            'nodes': data.get('nodes', []),
            'connections': data.get('connections', {}),
            'tags': data.get('tags', []),
            'settings': data.get('settings', {}),
            'createdAt': data.get('createdAt', ''),
            'updatedAt': data.get('updatedAt', ''),
            'versionId': data.get('versionId', '')
        }

        node_count = len(workflow['nodes'])
        workflow['nodeCount'] = node_count

        # Complexity is rated purely on node count: <=5 low, <=15 medium.
        if node_count <= 5:
            complexity = 'low'
        elif node_count <= 15:
            complexity = 'medium'
        else:
            complexity = 'high'
        workflow['complexity'] = complexity

        # Find trigger type and integrations
        trigger_type, integrations = self._analyze_nodes(workflow['nodes'])
        workflow['triggerType'] = trigger_type
        workflow['integrations'] = list(integrations)

        # Generate human-readable description
        workflow['description'] = self._generate_description(workflow, trigger_type, integrations)

        # Extract authored notes or synthesize a step-by-step process
        steps = self._extract_or_generate_steps(workflow['nodes'], workflow['connections'])
        workflow['steps'] = steps

        # Debug logging
        if steps:
            print(f"Found/Generated {len(steps)} steps in workflow: {workflow['name']}")

        # Generate workflow diagram code using mermaid.js (rendered on-demand)
        workflow['diagram'] = self._generate_workflow_diagram(workflow['nodes'], workflow['connections'])

        # Keep the raw JSON so the documentation viewer can show the original.
        workflow['rawJson'] = json.dumps(data, indent=2)

        return workflow

    def _analyze_nodes(self, nodes: List[Dict]) -> Tuple[str, Set[str]]:
        """Analyze nodes to determine trigger type and integrations.

        Returns:
            (trigger_type, integrations) where trigger_type is one of
            'Manual', 'Webhook', 'Scheduled', 'Complex'.
        """
        trigger_type = 'Manual'
        integrations: Set[str] = set()

        for node in nodes:
            node_type = node.get('type', '')
            node_name = node.get('name', '')

            # Determine trigger type
            if 'webhook' in node_type.lower() or 'webhook' in node_name.lower():
                trigger_type = 'Webhook'
            elif 'cron' in node_type.lower() or 'schedule' in node_type.lower():
                trigger_type = 'Scheduled'
            elif 'trigger' in node_type.lower() and trigger_type == 'Manual':
                if 'manual' not in node_type.lower():
                    trigger_type = 'Webhook'  # Most non-manual triggers are webhook-based

            # Extract integrations from core node types
            if node_type.startswith('n8n-nodes-base.'):
                service = node_type.replace('n8n-nodes-base.', '')
                # Clean up service names (strip Trigger suffixes)
                service = service.replace('Trigger', '').replace('trigger', '')
                # Skip structural/utility nodes that aren't real integrations.
                if service and service not in ['set', 'function', 'if', 'switch', 'merge', 'stickyNote']:
                    integrations.add(service.title())

        # Large, integration-heavy workflows are labelled 'Complex' regardless
        # of trigger mechanism.
        if len(nodes) > 10 and len(integrations) > 3:
            trigger_type = 'Complex'

        return trigger_type, integrations

    def _generate_description(self, workflow: Dict, trigger_type: str, integrations: Set[str]) -> str:
        """Generate a descriptive one-sentence summary of the workflow."""
        name = workflow['name']
        node_count = workflow['nodeCount']

        # Start with trigger description
        trigger_descriptions = {
            'Webhook': "Webhook-triggered automation that",
            'Scheduled': "Scheduled automation that",
            'Complex': "Complex multi-step automation that",
        }
        desc = trigger_descriptions.get(trigger_type, "Manual workflow that")

        # Add functionality based on name and integrations
        if integrations:
            main_services = list(integrations)[:3]  # Top 3 services
            if len(main_services) == 1:
                desc += f" integrates with {main_services[0]}"
            elif len(main_services) == 2:
                desc += f" connects {main_services[0]} and {main_services[1]}"
            else:
                desc += f" orchestrates {', '.join(main_services[:-1])}, and {main_services[-1]}"

        # Infer workflow purpose from keywords in the name
        name_lower = name.lower()
        if 'create' in name_lower:
            desc += " to create new records"
        elif 'update' in name_lower:
            desc += " to update existing data"
        elif 'sync' in name_lower:
            desc += " to synchronize data"
        elif 'notification' in name_lower or 'alert' in name_lower:
            desc += " for notifications and alerts"
        elif 'backup' in name_lower:
            desc += " for data backup operations"
        elif 'monitor' in name_lower:
            desc += " for monitoring and reporting"
        else:
            desc += " for data processing"

        desc += f". Uses {node_count} nodes"
        if len(integrations) > 3:
            desc += f" and integrates with {len(integrations)} services"
        desc += "."

        return desc

    def _extract_or_generate_steps(self, nodes: List[Dict], connections: Dict) -> List[Dict]:
        """Extract notes from nodes or generate steps from workflow structure.

        Author-written node notes take precedence; synthesized descriptions
        are only used when no node carries a note.
        """
        # First, try to extract existing notes
        nodes_with_notes = []
        for node in nodes:
            note = node.get('notes')
            if note:
                nodes_with_notes.append({
                    'name': node.get('name', ''),
                    'type': node.get('type', ''),
                    'note': note,
                    'id': node.get('id', '')
                })

        # If we have notes, use them verbatim
        if nodes_with_notes:
            return nodes_with_notes

        # Otherwise, generate steps from workflow structure
        return self._generate_steps_from_structure(nodes, connections)

    def _generate_steps_from_structure(self, nodes: List[Dict], connections: Dict) -> List[Dict]:
        """Generate step descriptions from workflow node structure and connections."""
        if not nodes:
            return []

        # Map nodes by ID for easy lookup during traversal
        node_map = {node.get('id', ''): node for node in nodes}

        # Find the starting node (trigger or first node)
        start_node = self._find_start_node(nodes, connections)
        if not start_node:
            # Fallback: just describe all nodes in file order
            return self._generate_basic_steps(nodes)

        # Follow the workflow path from the start node
        steps: List[Dict] = []
        visited: Set[str] = set()
        self._traverse_workflow(start_node, node_map, connections, steps, visited)

        return steps

    def _find_start_node(self, nodes: List[Dict], connections: Dict) -> Optional[Dict]:
        """Find the starting node of the workflow (trigger or node with no inputs)."""
        # Look for trigger nodes first
        for node in nodes:
            node_type = node.get('type', '').lower()
            if any(trigger in node_type for trigger in ['trigger', 'webhook', 'cron', 'schedule', 'manual']):
                return node

        # Collect nodes that are targets of some connection; n8n connections
        # reference targets by node *name*, not ID.
        target_nodes = set()
        for source_connections in connections.values():
            if isinstance(source_connections, dict) and 'main' in source_connections:
                for main_connections in source_connections['main']:
                    if isinstance(main_connections, list):
                        for connection in main_connections:
                            if isinstance(connection, dict) and 'node' in connection:
                                target_nodes.add(connection['node'])

        # Return first node that's never a target (i.e. has no inputs)
        for node in nodes:
            if node.get('name', '') not in target_nodes:
                return node

        # Fallback: return first node
        return nodes[0] if nodes else None

    def _traverse_workflow(self, current_node: Dict, node_map: Dict, connections: Dict, steps: List[Dict], visited: set):
        """Traverse the workflow following connections and generate step descriptions.

        Depth-first recursion; ``visited`` (node IDs) guards against cycles.
        """
        node_name = current_node.get('name', '')
        node_id = current_node.get('id', '')

        if node_id in visited:
            return
        visited.add(node_id)

        # Generate step description for current node
        step_description = self._generate_step_description(current_node)
        if step_description:
            steps.append({
                'name': node_name,
                'type': current_node.get('type', ''),
                'note': step_description,
                'id': node_id
            })

        # Follow outgoing 'main' connections (keyed by source node name)
        if node_name in connections:
            node_connections = connections[node_name]
            if isinstance(node_connections, dict) and 'main' in node_connections:
                for main_connections in node_connections['main']:
                    if isinstance(main_connections, list):
                        for connection in main_connections:
                            if isinstance(connection, dict) and 'node' in connection:
                                next_node_name = connection['node']
                                # Targets are referenced by name, so resolve
                                # by scanning the node map.
                                next_node = None
                                for node in node_map.values():
                                    if node.get('name') == next_node_name:
                                        next_node = node
                                        break
                                if next_node:
                                    self._traverse_workflow(next_node, node_map, connections, steps, visited)

    def _generate_step_description(self, node: Dict) -> str:
        """Generate a meaningful description for a workflow node based on its type and parameters."""
        node_type = node.get('type', '')
        node_name = node.get('name', '')
        parameters = node.get('parameters', {})

        # Clean up node type (drop package prefix and Trigger suffix)
        clean_type = node_type.replace('n8n-nodes-base.', '').replace('Trigger', '').replace('trigger', '')

        # Generate description based on node type
        if 'webhook' in node_type.lower():
            return "Receives incoming webhook requests to trigger the workflow"
        elif 'cron' in node_type.lower() or 'schedule' in node_type.lower():
            return "Runs on a scheduled basis to trigger the workflow automatically"
        elif 'manual' in node_type.lower():
            return "Manual trigger to start the workflow execution"
        elif 'http' in node_type.lower():
            url = parameters.get('url', '')
            method = parameters.get('method', 'GET')
            return f"Makes {method} HTTP request" + (f" to {url}" if url else "")
        elif 'set' in node_type.lower():
            return "Sets and transforms data values for use in subsequent steps"
        elif clean_type.lower() == 'if':
            # Exact match: a substring test would wrongly match types such
            # as 'shopify' or 'notification'.
            return "Evaluates conditions to determine workflow path"
        elif 'switch' in node_type.lower():
            return "Routes workflow execution based on multiple conditions"
        elif 'function' in node_type.lower() or 'code' in node_type.lower():
            return "Executes custom JavaScript code for data processing"
        elif 'merge' in node_type.lower():
            return "Combines data from multiple workflow branches"
        elif 'split' in node_type.lower():
            return "Splits data into multiple items for parallel processing"
        elif 'filter' in node_type.lower():
            return "Filters data based on specified conditions"
        elif 'gmail' in node_type.lower():
            operation = parameters.get('operation', 'send')
            return f"Performs Gmail {operation} operation"
        elif 'slack' in node_type.lower():
            return "Sends message or performs action in Slack"
        elif 'discord' in node_type.lower():
            return "Sends message or performs action in Discord"
        elif 'telegram' in node_type.lower():
            return "Sends message or performs action in Telegram"
        elif 'airtable' in node_type.lower():
            operation = parameters.get('operation', 'create')
            return f"Performs Airtable {operation} operation on records"
        elif 'google' in node_type.lower():
            if 'sheets' in node_type.lower():
                return "Reads from or writes to Google Sheets"
            elif 'drive' in node_type.lower():
                return "Manages files in Google Drive"
            elif 'calendar' in node_type.lower():
                return "Manages Google Calendar events"
            else:
                return f"Integrates with Google {clean_type} service"
        elif 'microsoft' in node_type.lower():
            if 'outlook' in node_type.lower():
                return "Manages Microsoft Outlook emails"
            elif 'excel' in node_type.lower():
                return "Works with Microsoft Excel files"
            else:
                return f"Integrates with Microsoft {clean_type} service"
        elif 'openai' in node_type.lower():
            return "Processes data using OpenAI AI models"
        elif 'anthropic' in node_type.lower():
            return "Processes data using Anthropic Claude AI"
        elif 'database' in node_type.lower() or 'mysql' in node_type.lower() or 'postgres' in node_type.lower():
            return "Executes database operations"
        elif 'wait' in node_type.lower():
            return "Pauses workflow execution for specified duration"
        elif 'error' in node_type.lower():
            return "Handles errors and stops workflow execution"
        else:
            # Generic description based on service name
            service_name = clean_type.title()
            return f"Integrates with {service_name} to process data"

    def _generate_basic_steps(self, nodes: List[Dict]) -> List[Dict]:
        """Generate basic steps when workflow structure is unclear."""
        steps = []
        for i, node in enumerate(nodes, 1):
            description = self._generate_step_description(node)
            if description:
                steps.append({
                    'name': node.get('name', f'Step {i}'),
                    'type': node.get('type', ''),
                    'note': description,
                    'id': node.get('id', '')
                })
        return steps

    def _calculate_stats(self):
        """Calculate statistics from analyzed workflows.

        Side effect: converts stats['integrations'] from a set to a list so
        the stats dict is JSON-serializable, and adds 'unique_integrations'.
        """
        self.stats['total'] = len(self.workflows)

        for workflow in self.workflows:
            # Active/inactive count
            if workflow['active']:
                self.stats['active'] += 1
            else:
                self.stats['inactive'] += 1

            # Trigger type count
            trigger = workflow['triggerType']
            self.stats['triggers'][trigger] = self.stats['triggers'].get(trigger, 0) + 1

            # Complexity count
            complexity = workflow['complexity']
            self.stats['complexity'][complexity] += 1

            # Node count
            self.stats['total_nodes'] += workflow['nodeCount']

            # Integrations
            self.stats['integrations'].update(workflow['integrations'])

        # Convert integrations set to count + JSON-friendly list
        self.stats['unique_integrations'] = len(self.stats['integrations'])
        self.stats['integrations'] = list(self.stats['integrations'])

    def _get_empty_data(self) -> Dict[str, Any]:
        """Return empty data structure when no workflows found."""
        return {
            'workflows': [],
            'stats': {
                'total': 0,
                'active': 0,
                'inactive': 0,
                'triggers': {},
                'complexity': {'low': 0, 'medium': 0, 'high': 0},
                'total_nodes': 0, 'unique_integrations': 0,
                'integrations': []
            },
            'timestamp': datetime.datetime.now().isoformat()
        }

    def _generate_workflow_diagram(self, nodes: List[Dict], connections: Dict) -> str:
        """
        Generate a mermaid.js workflow diagram showing node connections.

        Args:
            nodes: List of workflow nodes
            connections: Dictionary of workflow connections

        Returns:
            str: Mermaid.js flowchart markup
        """
        if not nodes:
            return "graph TD\n EmptyWorkflow[No nodes found in workflow]"

        # Map node names to generated IDs so Mermaid identifiers are valid.
        mermaid_ids = {}
        for i, node in enumerate(nodes):
            node_id = f"node{i}"
            node_name = node.get('name', f'Node {i}')
            mermaid_ids[node_name] = node_id

        # Start building the mermaid diagram
        mermaid_code = ["graph TD"]

        # Add nodes with styling
        for node in nodes:
            node_name = node.get('name', 'Unnamed')
            node_id = mermaid_ids[node_name]
            node_type = node.get('type', '').replace('n8n-nodes-base.', '')

            # Determine node style based on type
            lowered = node_type.lower()
            if any(x in lowered for x in ['trigger', 'webhook', 'cron']):
                style = "fill:#b3e0ff,stroke:#0066cc"  # Blue for triggers
            elif lowered == 'if' or 'switch' in lowered:
                # Exact match for 'if' to avoid hitting e.g. 'shopify'.
                style = "fill:#ffffb3,stroke:#e6e600"  # Yellow for conditional nodes
            elif any(x in lowered for x in ['function', 'code']):
                style = "fill:#d9b3ff,stroke:#6600cc"  # Purple for code nodes
            elif 'error' in lowered:
                style = "fill:#ffb3b3,stroke:#cc0000"  # Red for error handlers
            else:
                style = "fill:#d9d9d9,stroke:#666666"  # Gray for other nodes

            # Escape double quotes, which would break the Mermaid label.
            clean_name = node_name.replace('"', "'")
            clean_type = node_type.replace('"', "'")
            # Use an HTML <br> (not "\n") so Mermaid renders a line break.
            label = f"{clean_name}<br>({clean_type})"
            mermaid_code.append(f"    {node_id}[\"{label}\"]")
            mermaid_code.append(f"    style {node_id} {style}")

        # Add connections between nodes based on n8n connection structure
        for source_name, source_connections in connections.items():
            if source_name not in mermaid_ids:
                continue
            if isinstance(source_connections, dict) and 'main' in source_connections:
                main_connections = source_connections['main']
                for i, output_connections in enumerate(main_connections):
                    if not isinstance(output_connections, list):
                        continue
                    for connection in output_connections:
                        if not isinstance(connection, dict) or 'node' not in connection:
                            continue
                        target_name = connection['node']
                        if target_name not in mermaid_ids:
                            continue
                        # Label arrows with the output index when the source
                        # has multiple outputs (e.g. IF true/false branches).
                        label = f" -->|{i}| " if len(main_connections) > 1 else " --> "
                        mermaid_code.append(f"    {mermaid_ids[source_name]}{label}{mermaid_ids[target_name]}")

        # Format the final mermaid diagram code
        return "\n".join(mermaid_code)
def generate_html_documentation(data: Dict[str, Any]) -> str:
"""Generate the complete HTML documentation with embedded data."""
# Convert Python data to JavaScript with proper escaping
js_data = json.dumps(data, indent=2, ensure_ascii=False)
# Escape any script tags and HTML entities in the JSON data
js_data = js_data.replace('', '<\\/script>').replace('