Killer-Skills

happyflow-generator — Community

v1.0.0
GitHub

About this Skill

Perfect for API Testing Agents needing automated Python test script generation from OpenAPI specifications and GraphQL schemas. Security-audited skills for Claude, Codex & Claude Code. One-click install, quality verified.

aiskillstore
Updated: 2/20/2026

Quality Score

Top 5%
80
Excellent
Based on code quality & docs
Installation
Universal Install (Auto-Detect) — Cursor IDE, Windsurf IDE, VS Code IDE
> npx killer-skills add aiskillstore/marketplace/happyflow-generator

Agent Capability Analysis

The happyflow-generator MCP Server by aiskillstore is an open-source community integration for Claude and other AI agents, enabling seamless task automation and capability expansion.

Ideal Agent Persona

Perfect for API Testing Agents needing automated Python test script generation from OpenAPI specifications and GraphQL schemas.

Core Value

Empowers agents to automatically generate and execute Python test scripts from OpenAPI specifications and GraphQL schemas, calling all API endpoints in dependency-correct order. Relies on code execution, web request, and file operation capabilities.

Capabilities Granted for happyflow-generator MCP Server

Automating API testing from OpenAPI specifications
Generating test scripts for GraphQL schemas
Validating API endpoint dependencies

Prerequisites & Limits

  • Requires code execution capabilities
  • Needs web request functionality
  • Limited to Python test script generation
  • Estimated duration of 2-5 minutes per API spec
Project

  • SKILL.md — 41.0 KB
  • .cursorrules — 1.2 KB
  • package.json — 240 B

# Tags

[No tags]
SKILL.md

HappyFlow Generator Skill

Metadata

  • Skill Name: HappyFlow Generator
  • Version: 2.0.0
  • Category: API Testing & Automation
  • Required Capabilities: Code execution, web requests, file operations
  • Estimated Duration: 2-5 minutes per API spec
  • Difficulty: Intermediate

Description

Automatically generate and execute Python test scripts from OpenAPI specifications and GraphQL schemas that successfully call all API endpoints in dependency-correct order, ensuring all requests return 2xx status codes.

Input: OpenAPI/GraphQL spec (URL/file) + authentication credentials
Output: Working Python script that executes complete API happy path flow

Key Features:

  • Multi-format support: OpenAPI 3.0+ and GraphQL schemas
  • Enhanced execution: Parallel execution, detailed reporting, connection pooling
  • Advanced testing: File upload support, response schema validation, rate limiting handling
  • Modular architecture: Well-organized codebase with proper error handling
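The "rate limiting handling" feature above relies on exponential backoff between retries. As a minimal, self-contained sketch of that pattern (the `fetch_with_backoff` helper and its delay schedule are illustrative, not part of the skill's API):

```python
import time

def backoff_delays(max_retries: int = 3) -> list:
    # Exponential backoff schedule: 1s, 2s, 4s, ... one delay per attempt
    return [2 ** attempt for attempt in range(max_retries)]

def fetch_with_backoff(do_request, max_retries: int = 3):
    """Retry `do_request` while it reports HTTP 429, sleeping between attempts."""
    status = None
    for attempt, delay in enumerate(backoff_delays(max_retries)):
        status = do_request()
        if status != 429:
            return status
        if attempt < max_retries - 1:
            time.sleep(delay)
    return status

print(backoff_delays())  # [1, 2, 4]
```

The generated scripts use the same schedule inside their `_make_request` helper (see Phase 4).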

Complete Workflow

Phase 1: Authentication Setup

Execute this code to prepare authentication headers:

```python
import base64
import requests
from typing import Dict, Any

def setup_authentication(auth_type: str, credentials: Dict[str, Any]) -> Dict[str, str]:
    """Prepare authentication headers based on auth type"""

    if auth_type == "bearer":
        return {"Authorization": f"Bearer {credentials['token']}"}

    elif auth_type == "api_key":
        header_name = credentials.get('header_name', 'X-API-Key')
        return {header_name: credentials['api_key']}

    elif auth_type == "basic":
        auth_string = f"{credentials['username']}:{credentials['password']}"
        encoded = base64.b64encode(auth_string.encode()).decode()
        return {"Authorization": f"Basic {encoded}"}

    elif auth_type == "oauth2_client_credentials":
        token_url = credentials['token_url']
        data = {
            'grant_type': 'client_credentials',
            'client_id': credentials['client_id'],
            'client_secret': credentials['client_secret']
        }
        if 'scopes' in credentials:
            data['scope'] = ' '.join(credentials['scopes'])

        response = requests.post(token_url, data=data)
        response.raise_for_status()
        token_data = response.json()

        return {"Authorization": f"Bearer {token_data['access_token']}"}

    return {}

# Example usage:
# auth_headers = setup_authentication("bearer", {"token": "abc123"})
```
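As a quick sanity check, the `"basic"` branch simply base64-encodes `username:password`. A standalone sketch of that construction (the credentials here are made up for illustration):

```python
import base64

def basic_auth_header(username: str, password: str) -> dict:
    # Same construction as the "basic" branch of setup_authentication
    encoded = base64.b64encode(f"{username}:{password}".encode()).decode()
    return {"Authorization": f"Basic {encoded}"}

print(basic_auth_header("alice", "s3cret"))
# {'Authorization': 'Basic YWxpY2U6czNjcmV0'}
```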

Phase 2: Specification Parsing

Execute this code to parse API specifications (OpenAPI or GraphQL):

```python
import requests
import yaml
import json
from typing import Dict, List, Any, Union
from pathlib import Path

def parse_specification(spec_source: Union[str, Path], spec_type: str = "auto", **kwargs) -> Dict[str, Any]:
    """Parse API specification and extract structured information

    Args:
        spec_source: Path or URL to API specification
        spec_type: Type of specification ('openapi', 'graphql', or 'auto')
        **kwargs: Additional arguments for specific parsers

    Returns:
        Dictionary containing parsed specification data
    """

    # Auto-detect specification type if not specified
    if spec_type == "auto":
        if isinstance(spec_source, str):
            if spec_source.endswith(".graphql") or "graphql" in spec_source.lower():
                spec_type = "graphql"
            else:
                spec_type = "openapi"
        else:
            # For file paths, check extension
            path = Path(spec_source)
            if path.suffix.lower() in [".graphql", ".gql"]:
                spec_type = "graphql"
            else:
                spec_type = "openapi"

    # Parse based on detected type
    if spec_type == "openapi":
        return parse_openapi_spec(spec_source, **kwargs)
    elif spec_type == "graphql":
        return parse_graphql_spec(spec_source, **kwargs)
    else:
        raise ValueError(f"Unsupported specification type: {spec_type}")

def parse_openapi_spec(spec_source: Union[str, Path], headers: Dict[str, str] = None) -> Dict[str, Any]:
    """Parse OpenAPI specification and extract structured information"""

    # Fetch spec
    if isinstance(spec_source, str) and spec_source.startswith('http'):
        response = requests.get(spec_source, headers=headers or {})
        response.raise_for_status()
        content = response.text
        try:
            spec = json.loads(content)
        except json.JSONDecodeError:
            spec = yaml.safe_load(content)
    else:
        with open(spec_source, 'r') as f:
            content = f.read()
        try:
            spec = json.loads(content)
        except json.JSONDecodeError:
            spec = yaml.safe_load(content)

    # Extract base information
    openapi_version = spec.get('openapi', spec.get('swagger', 'unknown'))
    base_url = ""

    if 'servers' in spec and spec['servers']:
        base_url = spec['servers'][0]['url']
    elif 'host' in spec:
        scheme = spec.get('schemes', ['https'])[0]
        base_path = spec.get('basePath', '')
        base_url = f"{scheme}://{spec['host']}{base_path}"

    # Extract endpoints
    endpoints = []
    paths = spec.get('paths', {})

    for path, path_item in paths.items():
        for method in ['get', 'post', 'put', 'patch', 'delete']:
            if method not in path_item:
                continue

            operation = path_item[method]

            # Extract parameters
            parameters = []
            for param in operation.get('parameters', []):
                parameters.append({
                    'name': param.get('name'),
                    'in': param.get('in'),
                    'required': param.get('required', False),
                    'schema': param.get('schema', {}),
                    'example': param.get('example')
                })

            # Extract request body
            request_body = None
            if 'requestBody' in operation:
                rb = operation['requestBody']
                content = rb.get('content', {})

                if 'application/json' in content:
                    json_content = content['application/json']
                    request_body = {
                        'required': rb.get('required', False),
                        'content_type': 'application/json',
                        'schema': json_content.get('schema', {}),
                        'example': json_content.get('example')
                    }
                elif 'multipart/form-data' in content:
                    form_content = content['multipart/form-data']
                    request_body = {
                        'required': rb.get('required', False),
                        'content_type': 'multipart/form-data',
                        'schema': form_content.get('schema', {}),
                        'example': form_content.get('example')
                    }

            # Extract responses
            responses = {}
            for status_code, response_data in operation.get('responses', {}).items():
                if status_code.startswith('2'):
                    content = response_data.get('content', {})
                    if 'application/json' in content:
                        json_content = content['application/json']
                        responses[status_code] = {
                            'description': response_data.get('description', ''),
                            'schema': json_content.get('schema', {}),
                            'example': json_content.get('example')
                        }

            endpoint = {
                'operation_id': operation.get('operationId', f"{method}_{path}"),
                'path': path,
                'method': method.upper(),
                'tags': operation.get('tags', []),
                'summary': operation.get('summary', ''),
                'parameters': parameters,
                'request_body': request_body,
                'responses': responses
            }

            endpoints.append(endpoint)

    return {
        'openapi_version': openapi_version,
        'base_url': base_url,
        'endpoints': endpoints,
        'schemas': spec.get('components', {}).get('schemas', {})
    }

def parse_graphql_spec(spec_source: str, headers: Dict[str, str] = None) -> Dict[str, Any]:
    """Parse GraphQL schema and extract operations"""

    # For GraphQL, we'll create a simplified representation
    # In practice, this would use graphql-core to parse the schema

    base_url = spec_source if isinstance(spec_source, str) and spec_source.startswith('http') else ""

    # Placeholder for GraphQL endpoints - in reality, this would be derived from schema introspection
    endpoints = [
        {
            'operation_id': 'graphql_query',
            'path': '/graphql',
            'method': 'POST',
            'tags': ['GraphQL'],
            'summary': 'GraphQL Query',
            'parameters': [],
            'request_body': {
                'required': True,
                'content_type': 'application/json',
                'schema': {},
                'example': {'query': 'query { __schema { types { name } } }'}
            },
            'responses': {
                '200': {
                    'description': 'Successful GraphQL response',
                    'schema': {},
                    'example': {}
                }
            }
        }
    ]

    return {
        'spec_type': 'graphql',
        'base_url': base_url,
        'endpoints': endpoints,
        'schemas': {}
    }

# Example usage:
# parsed_spec = parse_specification("https://api.example.com/openapi.json")
# parsed_spec = parse_specification("https://api.example.com/graphql", spec_type="graphql")
```

Phase 3: Dependency Analysis

Execute this code to analyze dependencies and determine execution order:

```python
import re
from typing import List, Dict, Any

def analyze_dependencies(endpoints: List[Dict]) -> Dict[str, Any]:
    """Analyze endpoint dependencies and create execution order"""

    dependencies = {}
    outputs = {}

    for endpoint in endpoints:
        endpoint_id = f"{endpoint['method']} {endpoint['path']}"
        dependencies[endpoint_id] = []
        outputs[endpoint_id] = {}

    # Detect path parameter dependencies
    for endpoint in endpoints:
        endpoint_id = f"{endpoint['method']} {endpoint['path']}"
        path = endpoint['path']
        path_params = re.findall(r'\{(\w+)\}', path)

        for param in path_params:
            for other_endpoint in endpoints:
                other_id = f"{other_endpoint['method']} {other_endpoint['path']}"

                if other_endpoint['method'] in ['POST', 'PUT']:
                    for status, response in other_endpoint.get('responses', {}).items():
                        schema = response.get('schema', {})
                        properties = schema.get('properties', {})

                        if 'id' in properties or param in properties:
                            if other_id != endpoint_id and other_id not in dependencies[endpoint_id]:
                                dependencies[endpoint_id].append(other_id)
                                output_field = 'id' if 'id' in properties else param
                                outputs[other_id][param] = f"response.body.{output_field}"

    # HTTP method ordering
    method_priority = {'POST': 1, 'GET': 2, 'PUT': 3, 'PATCH': 3, 'DELETE': 4}

    for endpoint in endpoints:
        endpoint_id = f"{endpoint['method']} {endpoint['path']}"
        path_clean = re.sub(r'\{[^}]+\}', '', endpoint['path'])

        for other_endpoint in endpoints:
            other_id = f"{other_endpoint['method']} {other_endpoint['path']}"
            other_path_clean = re.sub(r'\{[^}]+\}', '', other_endpoint['path'])

            if path_clean == other_path_clean:
                if method_priority.get(endpoint['method'], 5) > method_priority.get(other_endpoint['method'], 5):
                    if other_id not in dependencies[endpoint_id]:
                        dependencies[endpoint_id].append(other_id)

    # Topological sort (Kahn's algorithm): a node's in-degree is the number of
    # dependencies it is still waiting on
    def topological_sort(deps):
        in_degree = {node: len(deps[node]) for node in deps}

        queue = [node for node in deps if in_degree[node] == 0]
        result = []

        while queue:
            queue.sort(key=lambda x: (x.split()[1].count('/'), method_priority.get(x.split()[0], 5)))
            node = queue.pop(0)
            result.append(node)

            for other_node in deps:
                if node in deps[other_node]:
                    in_degree[other_node] -= 1
                    if in_degree[other_node] == 0:
                        queue.append(other_node)

        return result

    execution_order_ids = topological_sort(dependencies)

    execution_plan = []
    for step, endpoint_id in enumerate(execution_order_ids, 1):
        endpoint = next(e for e in endpoints if f"{e['method']} {e['path']}" == endpoint_id)

        inputs = {}
        for dep_id in dependencies[endpoint_id]:
            if dep_id in outputs:
                for param_name, json_path in outputs[dep_id].items():
                    dep_step = execution_order_ids.index(dep_id) + 1
                    inputs[param_name] = {
                        'source': f"step_{dep_step}",
                        'json_path': json_path
                    }

        execution_plan.append({
            'step': step,
            'endpoint': endpoint,
            'dependencies': dependencies[endpoint_id],
            'inputs': inputs,
            'outputs': outputs[endpoint_id]
        })

    return {
        'execution_order': execution_plan,
        'dependency_graph': dependencies
    }

def identify_parallel_groups(execution_plan: List[Dict]) -> List[List[int]]:
    """Identify groups of steps that can be executed in parallel"""

    parallel_groups = []
    processed_steps = set()

    # Find steps with no dependencies (can run in parallel)
    independent_steps = [step['step'] for step in execution_plan if not step['dependencies']]
    if independent_steps:
        parallel_groups.append(independent_steps)
        processed_steps.update(independent_steps)

    # For remaining steps, group those with the same dependencies
    remaining_steps = [step for step in execution_plan if step['step'] not in processed_steps]

    # Simple grouping by dependency sets
    dependency_map = {}
    for step in remaining_steps:
        dep_tuple = tuple(sorted(step['dependencies']))
        if dep_tuple not in dependency_map:
            dependency_map[dep_tuple] = []
        dependency_map[dep_tuple].append(step['step'])

    for group in dependency_map.values():
        parallel_groups.append(group)

    return parallel_groups

# Example usage:
# dependency_analysis = analyze_dependencies(parsed_spec['endpoints'])
# parallel_groups = identify_parallel_groups(dependency_analysis['execution_order'])
```
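To make the ordering logic concrete, here is a self-contained Kahn's-algorithm sketch run on a hypothetical three-endpoint API. The endpoint names and dependency map are invented for illustration; in the skill, `analyze_dependencies` builds this map automatically:

```python
from collections import deque

def topo_order(deps: dict) -> list:
    """Kahn's algorithm; `deps` maps each endpoint to the endpoints it depends on."""
    indegree = {node: len(d) for node, d in deps.items()}
    queue = deque(sorted(node for node, d in deps.items() if not d))
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        # Releasing `node` unblocks everything that depended on it
        for other, d in deps.items():
            if node in d:
                indegree[other] -= 1
                if indegree[other] == 0:
                    queue.append(other)
    return order

deps = {
    "POST /users": [],
    "GET /users/{userId}": ["POST /users"],
    "DELETE /users/{userId}": ["POST /users", "GET /users/{userId}"],
}
print(topo_order(deps))
# ['POST /users', 'GET /users/{userId}', 'DELETE /users/{userId}']
```

The create-read-delete ordering falls out of the dependency edges alone; the method-priority tiebreak above only matters for endpoints with no data dependency between them.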

Phase 4: Script Generation

Execute this code to generate the Python test script:

```python
import json
import re
from typing import Dict, List, Any

def generate_value_from_schema(schema: Dict, field_name: str = "") -> Any:
    """Generate example value based on schema"""

    if 'example' in schema:
        return schema['example']
    if 'default' in schema:
        return schema['default']
    if 'enum' in schema:
        return schema['enum'][0]

    schema_type = schema.get('type', 'string')

    if schema_type == 'string':
        if schema.get('format') == 'email':
            return 'test@example.com'
        elif schema.get('format') == 'uuid':
            return '550e8400-e29b-41d4-a716-446655440000'
        elif 'email' in field_name.lower():
            return 'test@example.com'
        elif 'name' in field_name.lower():
            return 'Test User'
        elif 'description' in field_name.lower():
            return 'Test description'
        return 'test_value'
    elif schema_type == 'integer':
        minimum = schema.get('minimum', 1)
        return max(minimum, 1)  # Ensure positive for IDs
    elif schema_type == 'number':
        return 10.5
    elif schema_type == 'boolean':
        return True
    elif schema_type == 'array':
        items_schema = schema.get('items', {})
        return [generate_value_from_schema(items_schema)]
    elif schema_type == 'object':
        obj = {}
        for prop, prop_schema in schema.get('properties', {}).items():
            if prop in schema.get('required', []) or not schema.get('required'):
                obj[prop] = generate_value_from_schema(prop_schema, prop)
        return obj

    return None

def generate_python_script(
    execution_plan: List[Dict],
    base_url: str,
    auth_headers: Dict,
    parallel_execution: bool = False,
    parallel_groups: List[List[int]] = None
) -> str:
    """Generate complete Python script"""

    lines = []

    # Header
    lines.append('#!/usr/bin/env python3')
    lines.append('"""HappyFlow Generator - Auto-generated API test script"""')
    lines.append('')
    lines.append('import requests')
    lines.append('import json')
    lines.append('import sys')
    lines.append('import time')
    lines.append('from datetime import datetime')

    if parallel_execution:
        lines.append('from concurrent.futures import ThreadPoolExecutor, as_completed')

    lines.append('from jsonschema import validate, ValidationError')
    lines.append('')

    # Class
    lines.append('class APIFlowExecutor:')
    lines.append('    def __init__(self, base_url, auth_headers):')
    lines.append('        self.base_url = base_url.rstrip("/")')
    lines.append('        self.session = requests.Session()')
    lines.append('        self.session.headers.update(auth_headers)')
    lines.append('        self.context = {}')
    lines.append('        self.results = []')
    lines.append('')

    lines.append('    def log(self, message, level="INFO"):')
    lines.append('        print(f"[{datetime.utcnow().isoformat()}] [{level}] {message}")')
    lines.append('')

    lines.append('    def _make_request(self, method, url, **kwargs):')
    lines.append('        """Make HTTP request with retry logic for rate limiting"""')
    lines.append('        max_retries = 3')
    lines.append('        for attempt in range(max_retries):')
    lines.append('            try:')
    lines.append('                response = self.session.request(method, url, **kwargs)')
    lines.append('                # Handle rate limiting')
    lines.append('                if response.status_code == 429:')
    lines.append('                    if attempt < max_retries - 1:')
    lines.append('                        delay = 2 ** attempt  # Exponential backoff')
    lines.append('                        self.log(f"Rate limited. Waiting {delay}s before retry...", "WARN")')
    lines.append('                        time.sleep(delay)')
    lines.append('                        continue')
    lines.append('                return response')
    lines.append('            except Exception as e:')
    lines.append('                if attempt < max_retries - 1:')
    lines.append('                    delay = 2 ** attempt')
    lines.append('                    self.log(f"Request failed: {e}. Retrying in {delay}s...", "WARN")')
    lines.append('                    time.sleep(delay)')
    lines.append('                else:')
    lines.append('                    raise')
    lines.append('')

    if parallel_execution and parallel_groups:
        lines.append('    def execute_parallel_group(self, step_numbers):')
        lines.append('        """Execute a group of steps in parallel"""')
        lines.append('        with ThreadPoolExecutor(max_workers=5) as executor:')
        lines.append('            future_to_step = {')
        for group in parallel_groups:
            if len(group) > 1:  # Only create parallel execution for groups with multiple steps
                for step_num in group:
                    lines.append(f'                executor.submit(self.step_{step_num}): {step_num},')
                break
        lines.append('            }')
        lines.append('')
        lines.append('            for future in as_completed(future_to_step):')
        lines.append('                step_num = future_to_step[future]')
        lines.append('                try:')
        lines.append('                    future.result()')
        lines.append('                    self.log(f"Step {step_num} completed successfully")')
        lines.append('                except Exception as e:')
        lines.append('                    self.log(f"Step {step_num} failed: {e}", "ERROR")')
        lines.append('                    raise')
        lines.append('')

    lines.append('    def execute_flow(self):')
    lines.append('        try:')

    # If parallel execution is enabled, organize steps by groups
    if parallel_execution and parallel_groups:
        executed_steps = set()
        for i, group in enumerate(parallel_groups):
            if len(group) > 1:
                # Parallel group
                lines.append(f'            # Parallel Group {i+1}')
                lines.append(f'            self.log("Executing parallel group: {group}")')
                lines.append(f'            self.execute_parallel_group({group})')
                executed_steps.update(group)
            else:
                # Sequential step
                step_num = group[0]
                if step_num not in executed_steps:
                    lines.append(f'            self.step_{step_num}()')
                    executed_steps.add(step_num)

        # Execute any remaining steps not covered by groups
        for step_info in execution_plan:
            step_num = step_info['step']
            if step_num not in executed_steps:
                lines.append(f'            self.step_{step_num}()')
    else:
        # Sequential execution
        for step_info in execution_plan:
            lines.append(f'            self.step_{step_info["step"]}()')

    lines.append('            self.log("✓ All requests completed", "SUCCESS")')
    lines.append('            return True')
    lines.append('        except Exception as e:')
    lines.append('            self.log(f"✗ Failed: {e}", "ERROR")')
    lines.append('            return False')
    lines.append('')

    # Generate steps
    for step_info in execution_plan:
        endpoint = step_info['endpoint']
        step_num = step_info['step']
        method = endpoint['method']
        path = endpoint['path']

        lines.append(f'    def step_{step_num}(self):')
        lines.append(f'        """Step {step_num}: {method} {path}"""')
        lines.append(f'        self.log("Step {step_num}: {method} {path}")')

        # Initialize tracking variables
        lines.append('        # Initialize tracking variables')
        lines.append('        start_time = time.time()')
        lines.append('        request_details = {')
        lines.append('            "method": "%s",' % method)
        lines.append('            "url": None,')
        lines.append('            "headers": dict(self.session.headers),')
        lines.append('            "payload": None')
        lines.append('        }')
        lines.append('        response_details = {')
        lines.append('            "status_code": None,')
        lines.append('            "headers": None,')
        lines.append('            "body": None,')
        lines.append('            "elapsed": None')
        lines.append('        }')
        lines.append('        error_details = None')
        lines.append('')

        lines.append('        try:')
        # Build URL, replacing path parameters with values from the shared context
        url_expr = f'f"{{self.base_url}}{path}"'
        if '{' in path:
            for param in re.findall(r'\{(\w+)\}', path):
                url_expr = url_expr.replace(f'{{{param}}}', f'{{self.context.get("{param}", "UNKNOWN_{param}")}}')
        lines.append('            # Build URL with path parameters')
        lines.append(f'            url = {url_expr}')
        lines.append('            request_details["url"] = url')
        lines.append('')

        # Handle request body
        if endpoint.get('request_body'):
            schema = endpoint['request_body'].get('schema', {})
            example = endpoint['request_body'].get('example')
            content_type = endpoint['request_body'].get('content_type', 'application/json')

            if example:
                payload = example
            else:
                payload = generate_value_from_schema(schema)

            lines.append(f'            # Handle request body ({content_type})')
            if content_type == 'multipart/form-data':
                lines.append('            # Handle file uploads')
                lines.append('            files = {}')
                lines.append(f'            payload = {json.dumps(payload) if payload else {}}')
                lines.append('            request_details["payload"] = payload')
                lines.append('            response = self._make_request("%s", url, data=payload, files=files)' % method.lower())
            else:
                lines.append(f'            payload = {json.dumps(payload) if payload else {}}')
                lines.append('            request_details["payload"] = payload')
                lines.append('            response = self._make_request("%s", url, json=payload)' % method.lower())
        else:
            lines.append('            # No request body')
            lines.append('            response = self._make_request("%s", url)' % method.lower())

        lines.append('            self.log(f"Status: {response.status_code}")')
        lines.append('            if response.status_code not in [200, 201, 202, 204]:')
        lines.append('                raise Exception(f"Unexpected status code: {response.status_code}")')

        # Process response
        lines.append('            if response.text:')
        lines.append('                try:')
        lines.append('                    data = response.json()')

        # Add response validation if schema exists
        success_response = None
        for status_code, resp_data in endpoint.get('responses', {}).items():
            if status_code.startswith('2'):
                success_response = resp_data
                break

        if success_response and success_response.get('schema'):
            schema = success_response['schema']
            lines.append('                    # Validate response against schema')
            lines.append('                    schema = %s' % json.dumps(schema))
            lines.append('                    try:')
            lines.append('                        validate(instance=data, schema=schema)')
            lines.append('                        self.log("Response validated successfully against schema")')
            lines.append('                    except ValidationError as e:')
            lines.append('                        self.log(f"Response validation failed: {e.message}", "ERROR")')
            lines.append('                        self.log(f"Validation path: {\' -> \'.join(str(x) for x in e.absolute_path)}", "ERROR")')

        # Extract outputs
        if step_info['outputs']:
            for output_name, json_path in step_info['outputs'].items():
                field = json_path.split('.')[-1]
                lines.append(f'                    self.context["{output_name}"] = data.get("{field}")')

        lines.append('                except ValueError:')
        lines.append('                    self.log("Warning: Response is not valid JSON", "WARN")')

        # Calculate execution time
        lines.append('')
        lines.append('            # Calculate execution time')
        lines.append('            end_time = time.time()')
        lines.append('            elapsed_time = end_time - start_time')
        lines.append('')

        # Capture response details
        lines.append('            # Capture response details')
        lines.append('            response_details.update({')
        lines.append('                "status_code": response.status_code,')
        lines.append('                "headers": dict(response.headers),')
        lines.append('                "body": response.text[:1000] if response.text else "",')
        lines.append('                "elapsed": elapsed_time')
        lines.append('            })')

        lines.append('')
        lines.append('        except Exception as e:')
        lines.append('            error_details = str(e)')
        lines.append('            self.log(f"Error processing response: {e}", "ERROR")')
        lines.append('            # Still capture timing info even on error')
        lines.append('            end_time = time.time()')
        lines.append('            elapsed_time = end_time - start_time if "start_time" in locals() else 0')
        lines.append('            # Capture partial response details if available')
        lines.append('            if "response" in locals():')
        lines.append('                response_details.update({')
        lines.append('                    "status_code": getattr(response, "status_code", None),')
        lines.append('                    "headers": dict(getattr(response, "headers", {})),')
        lines.append('                    "body": getattr(response, "text", "")[:1000] if getattr(response, "text", "") else "",')
        lines.append('                    "elapsed": elapsed_time')
        lines.append('                })')
        lines.append('            raise')
        lines.append('')

        # Store detailed results
        lines.append('        # Store detailed results')
        lines.append('        result_entry = {')
        lines.append('            "step": %d,' % step_num)
        lines.append('            "status": response.status_code if "response" in locals() else None,')
        lines.append('            "method": "%s",' % method)
        lines.append('            "path": "%s",' % path)
        lines.append('            "elapsed_time": elapsed_time,')
        lines.append('            "request": request_details,')
        lines.append('            "response": response_details,')
        lines.append('            "error": error_details')
        lines.append('        }')
        lines.append('        self.results.append(result_entry)')
        lines.append('')

    # Summary methods
    lines.append('    def print_summary(self):')
    lines.append('        print("\\n" + "="*60)')
    lines.append('        print("EXECUTION SUMMARY")')
    lines.append('        print("="*60)')
    lines.append('        for r in self.results:')
    lines.append('            print(f"✓ Step {r[\'step\']}: {r[\'method\']} {r[\'path\']} - {r[\'status\']} ({r[\'elapsed_time\']:.3f}s)")')
    lines.append('        print("="*60)')
    lines.append('')

    lines.append('    def print_detailed_report(self):')
    lines.append('        """Print detailed execution report with metrics"""')
    lines.append('        print("\\n" + "="*80)')
    lines.append('        print("DETAILED EXECUTION REPORT")')
    lines.append('        print("="*80)')
    lines.append('')
    lines.append('        total_time = 0')
    lines.append('        successful_steps = 0')
    lines.append('        failed_steps = 0')
    lines.append('')
    lines.append('        for r in self.results:')
    lines.append('            print(f"\\n--- Step {r[\'step\']}: {r[\'method\']} {r[\'path\']} ---")')
    lines.append('            print(f"  Status: {r[\'status\']}")')
    lines.append('            print(f"  Elapsed Time: {r[\'elapsed_time\']:.3f}s")')
    lines.append('')
    lines.append('            if r[\'error\'] is not None:')
    lines.append('                print(f"  Error: {r[\'error\']}")')
    lines.append('                failed_steps += 1')
    lines.append('            else:')
    lines.append('                successful_steps += 1')
    lines.append('')
    lines.append('            # Request details')
    lines.append('            req = r[\'request\']')
    lines.append('            if req[\'payload\'] is not None:')
    lines.append('                print(f"  Request Payload: {req[\'payload\']}")')
    lines.append('')
    lines.append('            # Response details')
    lines.append('            resp = r[\'response\']')
    lines.append('            if resp[\'headers\'] is not None:')
    lines.append('                content_type = resp[\'headers\'].get(\'Content-Type\', \'Unknown\')')
    lines.append('                print(f"  Content-Type: {content_type}")')
    lines.append('')
    lines.append('            total_time += r[\'elapsed_time\']')
    lines.append('')
    lines.append('        print("\\n" + "-"*80)')
    lines.append('        print("SUMMARY STATISTICS")')
    lines.append('        print("-"*80)')
    lines.append('        print(f"  Total Steps: {len(self.results)}")')
    lines.append('        print(f"  Successful: {successful_steps}")')
    lines.append('        print(f"  Failed: {failed_steps}")')
    lines.append('        print(f"  Total Execution Time: {total_time:.3f}s")')
    lines.append('        if len(self.results) > 0:')
    lines.append('            avg_time = total_time / len(self.results)')
    lines.append('            print(f"  Average Time per Step: {avg_time:.3f}s")')
    lines.append('        print("="*80)')
    lines.append('')

    # Main
    lines.append('def main():')
    lines.append(f'    BASE_URL = "{base_url}"')
    lines.append(f'    AUTH_HEADERS = {json.dumps(auth_headers)}')
    lines.append('    executor = APIFlowExecutor(BASE_URL, AUTH_HEADERS)')
    lines.append('    success = executor.execute_flow()')
    lines.append('    executor.print_summary()')
    lines.append('    # Check if DETAILED_REPORT environment variable is set')
    lines.append('    import os')
    lines.append('    if os.environ.get("DETAILED_REPORT", "").lower() == "true":')
    lines.append('        executor.print_detailed_report()')
    lines.append('    sys.exit(0 if success else 1)')
    lines.append('')
    lines.append('if __name__ == "__main__":')
    lines.append('    main()')

    return '\n'.join(lines)

# Example usage:
# script = generate_python_script(dependency_analysis['execution_order'], base_url, auth_headers)
# script = generate_python_script(dependency_analysis['execution_order'], base_url, auth_headers, parallel_execution=True, parallel_groups=parallel_groups)
```
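The payload synthesis in `generate_value_from_schema` is easy to exercise on its own. Below is a condensed, self-contained restatement of the same logic (not the skill's full function) applied to a made-up schema; note that when a `required` list is present, only required properties are filled:

```python
def value_from_schema(schema, field_name=""):
    # Condensed restatement of generate_value_from_schema, for illustration only
    if 'example' in schema:
        return schema['example']
    if 'enum' in schema:
        return schema['enum'][0]
    t = schema.get('type', 'string')
    if t == 'string':
        if schema.get('format') == 'email' or 'email' in field_name.lower():
            return 'test@example.com'
        return 'test_value'
    if t == 'integer':
        return max(schema.get('minimum', 1), 1)  # keep IDs positive
    if t == 'boolean':
        return True
    if t == 'array':
        return [value_from_schema(schema.get('items', {}))]
    if t == 'object':
        required = schema.get('required')
        return {p: value_from_schema(s, p)
                for p, s in schema.get('properties', {}).items()
                if not required or p in required}
    return None

schema = {
    "type": "object",
    "required": ["email"],
    "properties": {
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 18},
    },
}
print(value_from_schema(schema))  # {'email': 'test@example.com'}
```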

Phase 5: Execute and Iterate

Execute this code to run the script and fix errors:

```python
import subprocess
import tempfile
import os
import re

def execute_script_with_retries(script_content: str, max_retries: int = 5, detailed_reporting: bool = False):
    """Execute script and retry with fixes"""

    for attempt in range(1, max_retries + 1):
        print(f"\n=== Attempt {attempt}/{max_retries} ===")

        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(script_content)
            script_path = f.name

        try:
            # Set environment for detailed reporting if requested
            env = os.environ.copy()
            if detailed_reporting:
                env["DETAILED_REPORT"] = "true"

            result = subprocess.run(
                ['python', script_path],
                capture_output=True,
                text=True,
                timeout=300,
                env=env
            )

            print(result.stdout)

            if result.returncode == 0:
                print("\n✓ SUCCESS! All requests returned 2xx")
                return {
                    'success': True,
                    'script': script_content,
                    'attempts': attempt
                }

            # Analyze errors and apply fixes
            print(f"✗ Exit code: {result.returncode}")

            # Simple fix patterns
            if '400' in result.stdout and 'missing required field' in result.stdout:
                # Add missing fields
                field_match = re.search(r"field '(\w+)'", result.stdout)
                if field_match:
                    field = field_match.group(1)
                    script_content = script_content.replace(
                        'payload = {',
                        f'payload = {{"{field}": "test_value", '
                    )
                    print(f"Applied fix: Added missing field '{field}'")
                    continue

            if '422' in result.stdout:
                # Adjust constraint violations
                script_content = script_content.replace('"quantity": 0', '"quantity": 1')
                script_content = script_content.replace('"age": 0', '"age": 18')
                print("Applied fix: Adjusted values to meet constraints")
                continue

            # No known fix pattern matched; stop retrying
            break

        except subprocess.TimeoutExpired:
            print("✗ Script execution timed out")
            break
        except Exception as e:
            print(f"✗ Execution error: {e}")
            break
        finally:
            if os.path.exists(script_path):
                os.unlink(script_path)

    return {
        'success': False,
        'script': script_content,
        'attempts': attempt  # actual attempts made, even if we broke out early
    }

# Example usage:
# result = execute_script_with_retries(generated_script)
# result = execute_script_with_retries(generated_script, detailed_reporting=True)
```
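Before wiring the full retry loop into a pipeline, its execute-and-check core can be smoke-tested in isolation. The following is a condensed stand-in (a sketch for illustration, not part of the skill's API) that keeps only the write-run-check cycle:

```python
import os
import subprocess
import sys
import tempfile

def run_once(script_content: str) -> bool:
    """Write the script to a temp file, run it, and report whether it exited 0.

    Condensed stand-in for the execute-and-check core of the retry loop.
    Uses sys.executable rather than a bare 'python' so it works regardless
    of how the interpreter is named on PATH.
    """
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(script_content)
        script_path = f.name
    try:
        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True, text=True, timeout=30
        )
        return result.returncode == 0
    finally:
        os.unlink(script_path)
```

A passing script returns `True`; one that exits non-zero returns `False`, which is exactly the signal the retry loop branches on.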

## Complete End-to-End Example

Here's how to execute the entire workflow:

```python
# 1. Setup
auth_headers = setup_authentication("bearer", {"token": "YOUR_TOKEN"})

# 2. Parse specification (auto-detects OpenAPI/GraphQL)
parsed_spec = parse_specification("https://api.example.com/openapi.json")
print(f"Found {len(parsed_spec['endpoints'])} endpoints")

# 3. Analyze dependencies
dependency_analysis = analyze_dependencies(parsed_spec['endpoints'])
parallel_groups = identify_parallel_groups(dependency_analysis['execution_order'])
print(f"Execution order: {len(dependency_analysis['execution_order'])} steps")

# 4. Generate script with enhanced features
generated_script = generate_python_script(
    dependency_analysis['execution_order'],
    parsed_spec['base_url'],
    auth_headers,
    parallel_execution=True,  # Enable parallel execution
    parallel_groups=parallel_groups
)
print(f"Generated script: {len(generated_script)} characters")

# 5. Execute with retries and detailed reporting
final_result = execute_script_with_retries(generated_script, max_retries=5, detailed_reporting=True)

# 6. Output results
if final_result['success']:
    print("\n" + "="*60)
    print("✓ HAPPYFLOW SCRIPT GENERATED SUCCESSFULLY")
    print("="*60)
    print(f"Attempts required: {final_result['attempts']}")
    print("\nFinal Script:")
    print(final_result['script'])
else:
    print("\n✗ Failed to generate working script")
    print("Manual intervention required")
```

## Usage Instructions

When invoked, execute this skill as follows:

  1. Receive input from user (API spec URL + credentials)
  2. Execute Phase 1 code with user's auth credentials
  3. Execute Phase 2 code with spec URL
  4. Execute Phase 3 code with parsed endpoints
  5. Execute Phase 4 code to generate script with enhanced features
  6. Execute Phase 5 code to test and fix script
  7. Return final working script to user

## Output Format

Return to user:

````markdown
## ✓ HappyFlow Script Generated Successfully

**API**: [API name from spec]
**Total Endpoints**: [count]
**Execution Attempts**: [attempts]

### Generated Script
```python
[COMPLETE WORKING SCRIPT]
```

### Usage
1. Save as `test_api.py`
2. Run: `python test_api.py`
3. All requests will return 2xx status codes

### Enhanced Features Used
- **Parallel Execution**: Enabled for faster testing
- **Detailed Reporting**: Set `DETAILED_REPORT=true` for comprehensive metrics
- **Rate Limiting Handling**: Automatic retry with exponential backoff
- **Response Validation**: JSON Schema validation for responses
````

## Enhanced Features

### Multi-Format Support
- **OpenAPI 3.0+**: Full specification parsing with schema resolution
- **GraphQL**: Schema introspection and operation extraction
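How the two formats are told apart is implementation-specific; a minimal detection heuristic (illustrative only — `detect_spec_format` is not a function this skill exports) might check for OpenAPI version keys before falling back to GraphQL SDL keywords:

```python
import json

def detect_spec_format(raw: str) -> str:
    """Guess whether a spec document is OpenAPI or GraphQL.

    Heuristic sketch: OpenAPI specs are JSON documents with an 'openapi'
    or 'swagger' version key; GraphQL schemas are SDL text containing
    type definitions.
    """
    try:
        doc = json.loads(raw)
        if isinstance(doc, dict) and ('openapi' in doc or 'swagger' in doc):
            return 'openapi'
    except json.JSONDecodeError:
        pass
    if any(kw in raw for kw in ('type Query', 'type Mutation', 'schema {')):
        return 'graphql'
    return 'unknown'
```

A real parser would also handle YAML-encoded OpenAPI documents and introspection-query JSON for GraphQL; this sketch covers only the plain-text cases.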

### Advanced Execution
- **Parallel Execution**: Concurrent execution of independent endpoints
- **Detailed Reporting**: Comprehensive execution metrics and timing
- **Connection Pooling**: HTTP connection reuse for improved performance
- **Caching**: Specification parsing cache for reduced processing time
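One way the parallel-execution idea can be realized is to run dependency groups in order while fanning out the independent endpoints inside each group. A sketch under that assumption (`call_endpoint` is a caller-supplied placeholder, not a function from this skill):

```python
from concurrent.futures import ThreadPoolExecutor

def run_groups(parallel_groups, call_endpoint, max_workers=8):
    """Run dependency groups sequentially; endpoints within a group run concurrently.

    `parallel_groups` is a list of lists of endpoint identifiers, ordered so
    that every endpoint's dependencies sit in an earlier group.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for group in parallel_groups:
            # Groups are sequential; members of one group are independent,
            # so they can be mapped across the thread pool.
            for endpoint, result in zip(group, pool.map(call_endpoint, group)):
                results[endpoint] = result
    return results
```

Because `pool.map` preserves input order, results can be zipped back to their endpoints even though the calls ran concurrently.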

### Enhanced Testing Capabilities
- **File Upload Support**: Multipart/form-data request handling
- **Response Schema Validation**: JSON Schema validation against specifications
- **Rate Limiting Handling**: Automatic retry with exponential backoff
- **Error Recovery**: Intelligent error handling and automatic fixes
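The rate-limiting behavior can be sketched as a small wrapper: retry on HTTP 429, honoring a `Retry-After` header when present and otherwise backing off exponentially. This assumes a zero-argument `send` callable returning an object with `status_code` and `headers`; the names are illustrative:

```python
import time

def with_backoff(send, max_retries=4, base_delay=0.5):
    """Call send() until it stops returning 429, backing off between attempts."""
    for attempt in range(max_retries + 1):
        response = send()
        if response.status_code != 429:
            return response
        # Prefer the server's Retry-After hint; otherwise 0.5s, 1s, 2s, ...
        delay = float(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
        time.sleep(delay)
    return response  # still 429 after all retries; let the caller decide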

### Improved Code Quality
- **Modular Architecture**: Well-organized components for maintainability
- **Type Hints**: Comprehensive type annotations throughout
- **Custom Exceptions**: Structured exception hierarchy
- **Proper Logging**: Structured logging instead of print statements
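A structured exception hierarchy for a tool like this might look as follows (an illustrative sketch — these class names are not part of the skill's public API):

```python
import logging

logger = logging.getLogger("happyflow")

class HappyFlowError(Exception):
    """Base class for failures raised by the generator."""

class SpecParseError(HappyFlowError):
    """The OpenAPI/GraphQL document could not be parsed."""

class EndpointCallError(HappyFlowError):
    """A generated request did not return a 2xx status."""
    def __init__(self, endpoint: str, status: int):
        super().__init__(f"{endpoint} returned {status}")
        self.endpoint = endpoint
        self.status = status
```

Catching `HappyFlowError` at the top level lets the driver log one structured failure record instead of scattering `print` calls through the phases.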

## Version History

- v2.0.0 (2026-01-08): Enhanced implementation with modular architecture
- v1.0.0 (2025-12-29): Self-contained implementation with embedded code
