#!/usr/bin/env python3
"""
FastAPI Server for N8N Workflow Documentation
High-performance API with sub-100ms response times.
"""

from fastapi import FastAPI, HTTPException, Query, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel, validator
from typing import Optional, List, Dict, Any
import json
import os
import asyncio
from pathlib import Path
import uvicorn

from workflow_db import WorkflowDatabase

# Initialize FastAPI app
app = FastAPI(
    title="N8N Workflow Documentation API",
    description="Fast API for browsing and searching workflow documentation",
    version="2.0.0"
)

# Add middleware for performance
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize database
db = WorkflowDatabase()


# Response models
class WorkflowSummary(BaseModel):
    id: Optional[int] = None
    filename: str
    name: str
    active: bool
    description: str = ""
    trigger_type: str = "Manual"
    complexity: str = "low"
    node_count: int = 0
    integrations: List[str] = []
    tags: List[str] = []
    created_at: Optional[str] = None
    updated_at: Optional[str] = None

    class Config:
        # Allow conversion of int to bool for active field
        validate_assignment = True

    @validator('active', pre=True)
    def convert_active(cls, v):
        if isinstance(v, int):
            return bool(v)
        return v


class SearchResponse(BaseModel):
    workflows: List[WorkflowSummary]
    total: int
    page: int
    per_page: int
    pages: int
    query: str
    filters: Dict[str, Any]


class StatsResponse(BaseModel):
    total: int
    active: int
    inactive: int
    triggers: Dict[str, int]
    complexity: Dict[str, int]
    total_nodes: int
    unique_integrations: int
    last_indexed: str


@app.get("/")
async def root():
    """Serve the main documentation page."""
    static_dir = Path("static")
    index_file = static_dir / "index.html"
    if not index_file.exists():
        return HTMLResponse("""
            Static files not found. Please ensure the static directory exists with index.html
            Current directory: """ + str(Path.cwd()) + """
        """)
    return FileResponse(str(index_file))


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "message": "N8N Workflow API is running"}


@app.get("/api/stats", response_model=StatsResponse)
async def get_stats():
    """Get workflow database statistics."""
    try:
        stats = db.get_stats()
        return StatsResponse(**stats)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")


@app.get("/api/workflows", response_model=SearchResponse)
async def search_workflows(
    q: str = Query("", description="Search query"),
    trigger: str = Query("all", description="Filter by trigger type"),
    complexity: str = Query("all", description="Filter by complexity"),
    active_only: bool = Query(False, description="Show only active workflows"),
    page: int = Query(1, ge=1, description="Page number"),
    per_page: int = Query(20, ge=1, le=100, description="Items per page")
):
    """Search and filter workflows with pagination."""
    try:
        offset = (page - 1) * per_page
        workflows, total = db.search_workflows(
            query=q,
            trigger_filter=trigger,
            complexity_filter=complexity,
            active_only=active_only,
            limit=per_page,
            offset=offset
        )

        # Convert to Pydantic models with error handling
        workflow_summaries = []
        for workflow in workflows:
            try:
                # Remove extra fields that aren't in the model
                clean_workflow = {
                    'id': workflow.get('id'),
                    'filename': workflow.get('filename', ''),
                    'name': workflow.get('name', ''),
                    'active': workflow.get('active', False),
                    'description': workflow.get('description', ''),
                    'trigger_type': workflow.get('trigger_type', 'Manual'),
                    'complexity': workflow.get('complexity', 'low'),
                    'node_count': workflow.get('node_count', 0),
                    'integrations': workflow.get('integrations', []),
                    'tags': workflow.get('tags', []),
                    'created_at': workflow.get('created_at'),
                    'updated_at': workflow.get('updated_at')
                }
                workflow_summaries.append(WorkflowSummary(**clean_workflow))
            except Exception as e:
                print(f"Error converting workflow {workflow.get('filename', 'unknown')}: {e}")
                # Continue with other workflows instead of failing completely
                continue

        pages = (total + per_page - 1) // per_page  # Ceiling division

        return SearchResponse(
            workflows=workflow_summaries,
            total=total,
            page=page,
            per_page=per_page,
            pages=pages,
            query=q,
            filters={
                "trigger": trigger,
                "complexity": complexity,
                "active_only": active_only
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error searching workflows: {str(e)}")


@app.get("/api/workflows/{filename}")
async def get_workflow_detail(filename: str):
    """Get detailed workflow information including raw JSON."""
    try:
        # Get workflow metadata from database
        workflows, _ = db.search_workflows(f'filename:"{filename}"', limit=1)
        if not workflows:
            raise HTTPException(status_code=404, detail="Workflow not found")

        workflow_meta = workflows[0]

        # Load raw JSON from file
        file_path = os.path.join("workflows", filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="Workflow file not found")

        with open(file_path, 'r', encoding='utf-8') as f:
            raw_json = json.load(f)

        return {
            "metadata": workflow_meta,
            "raw_json": raw_json
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error loading workflow: {str(e)}")


@app.get("/api/workflows/{filename}/download")
async def download_workflow(filename: str):
    """Download workflow JSON file."""
    file_path = os.path.join("workflows", filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="Workflow file not found")

    return FileResponse(
        file_path,
        media_type="application/json",
        filename=filename
    )


@app.get("/api/workflows/{filename}/diagram")
async def get_workflow_diagram(filename: str):
    """Get Mermaid diagram code for workflow visualization."""
    try:
        file_path = os.path.join("workflows", filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="Workflow file not found")

        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        nodes = data.get('nodes', [])
        connections = data.get('connections', {})

        # Generate Mermaid diagram
        diagram = generate_mermaid_diagram(nodes, connections)

        return {"diagram": diagram}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating diagram: {str(e)}")


def generate_mermaid_diagram(nodes: List[Dict], connections: Dict) -> str:
    """Generate Mermaid.js flowchart code from workflow nodes and connections."""
    if not nodes:
        return "graph TD\n EmptyWorkflow[No nodes found in workflow]"

    # Create mapping for node names to ensure valid mermaid IDs
    mermaid_ids = {}
    for i, node in enumerate(nodes):
        node_id = f"node{i}"
        node_name = node.get('name', f'Node {i}')
        mermaid_ids[node_name] = node_id

    # Start building the mermaid diagram
    mermaid_code = ["graph TD"]

    # Add nodes with styling
    for node in nodes:
        node_name = node.get('name', 'Unnamed')
        node_id = mermaid_ids[node_name]
        node_type = node.get('type', '').replace('n8n-nodes-base.', '')

        # Determine node style based on type
        style = ""
        if any(x in node_type.lower() for x in ['trigger', 'webhook', 'cron']):
            style = "fill:#b3e0ff,stroke:#0066cc"  # Blue for triggers
        elif any(x in node_type.lower() for x in ['if', 'switch']):
            style = "fill:#ffffb3,stroke:#e6e600"  # Yellow for conditional nodes
        elif any(x in node_type.lower() for x in ['function', 'code']):
            style = "fill:#d9b3ff,stroke:#6600cc"  # Purple for code nodes
        elif 'error' in node_type.lower():
            style = "fill:#ffb3b3,stroke:#cc0000"  # Red for error handlers
        else:
            style = "fill:#d9d9d9,stroke:#666666"  # Gray for other nodes

        # Add node with label (escaping special characters)
        clean_name = node_name.replace('"', "'")
        clean_type = node_type.replace('"', "'")
        label = f"{clean_name}