# Performance Optimization
Maximize the speed and efficiency of your video processing workflows with the VFX MCP Server. This guide covers performance tuning, optimization strategies, and best practices for handling large-scale video processing tasks.
## Understanding Performance Factors

### System Resources Impact

Video processing performance depends on several key factors:
- CPU Performance: Single-core speed affects encoding/decoding
- Memory (RAM): Determines how much video data can be processed in memory
- Storage Speed: I/O speed affects file reading/writing operations
- Video Properties: Resolution, codec, bitrate, and duration all impact processing time
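Most of these factors can be sampled directly from Python's standard library before kicking off a large job. The snippet below is an illustrative sketch (not part of the server API) that gathers core count and disk headroom:

```python
import os
import shutil

def system_snapshot(path: str = ".") -> dict:
    """Gather a quick snapshot of the resources that drive processing speed."""
    disk = shutil.disk_usage(path)
    return {
        "cpu_cores": os.cpu_count() or 1,      # upper bound on useful parallelism
        "disk_free_gb": disk.free / 1024**3,   # headroom for temp/output files
        "disk_total_gb": disk.total / 1024**3,
    }

snap = system_snapshot()
print(f"{snap['cpu_cores']} cores, {snap['disk_free_gb']:.1f} GB free")
```

For live CPU and memory percentages, prefer the server's own `tools://capabilities` resource shown in the next section, which reports them directly.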
### Checking System Capabilities

```python
async def check_system_performance():
    """Analyze current system performance and capabilities."""
    # Get server capabilities
    capabilities = await session.read_resource("tools://capabilities")

    system_info = capabilities['system_resources']
    limits = capabilities['limits']
    ffmpeg_info = capabilities['ffmpeg_info']

    print("System Performance Analysis")
    print("=" * 50)

    # Current resource usage
    print(f"Current CPU Usage: {system_info['cpu_usage']:.1f}%")
    print(f"Current Memory Usage: {system_info['memory_usage']:.1f}%")
    print(f"Available Disk Space: {system_info['disk_space_free']}")

    # Performance limits
    print("\nPerformance Limits:")
    print(f"Max File Size: {limits['max_file_size']}")
    print(f"Max Resolution: {limits['max_resolution']}")
    print(f"Concurrent Operations: {limits['concurrent_operations']}")
    print(f"Temp Storage Limit: {limits['temp_storage_limit']}")

    # FFmpeg capabilities
    print("\nFFmpeg Performance:")
    print(f"Version: {ffmpeg_info['version']}")
    print(f"Hardware Encoding: {capabilities['feature_flags'].get('hardware_encoding', False)}")
    print(f"GPU Acceleration: {capabilities['feature_flags'].get('gpu_acceleration', False)}")

    # Performance recommendations
    recommendations = []

    if system_info['cpu_usage'] > 80:
        recommendations.append("High CPU usage - consider reducing concurrent operations")
    if system_info['memory_usage'] > 85:
        recommendations.append("High memory usage - process smaller files or restart server")
    if not capabilities['feature_flags'].get('hardware_encoding', False):
        recommendations.append("Hardware encoding not available - processing may be slower")

    if recommendations:
        print("\n💡 Recommendations:")
        for rec in recommendations:
            print(f"  • {rec}")
    else:
        print("\n✅ System performance looks good!")

    return capabilities

# Usage
performance_info = await check_system_performance()
```
## ⚡ Speed Optimization Strategies

### 1. Choose Optimal Quality Settings

Different quality settings dramatically affect processing speed:
```python
# Performance vs quality trade-offs
QUALITY_PERFORMANCE_MAP = {
    "ultra": {
        "relative_speed": 0.3,  # 30% of base speed
        "use_case": "Archive, master copies",
        "file_size": "Largest"
    },
    "high": {
        "relative_speed": 0.6,  # 60% of base speed
        "use_case": "Final output, professional delivery",
        "file_size": "Large"
    },
    "medium": {
        "relative_speed": 1.0,  # Base speed (100%)
        "use_case": "Social media, web delivery",
        "file_size": "Medium"
    },
    "low": {
        "relative_speed": 2.0,  # 200% of base speed
        "use_case": "Previews, drafts, testing",
        "file_size": "Small"
    }
}

async def optimize_quality_for_use_case(use_case: str):
    """Get the optimal quality setting for a specific use case."""
    quality_map = {
        "preview": "low",
        "draft": "low",
        "social_media": "medium",
        "web": "medium",
        "professional": "high",
        "broadcast": "high",
        "archive": "ultra",
        "master": "ultra"
    }

    recommended_quality = quality_map.get(use_case, "medium")
    performance_info = QUALITY_PERFORMANCE_MAP[recommended_quality]

    print(f"Use case: {use_case}")
    print(f"Recommended quality: {recommended_quality}")
    print(f"Expected speed: {performance_info['relative_speed']}x")
    print(f"File size: {performance_info['file_size']}")

    return recommended_quality

# Usage
quality = await optimize_quality_for_use_case("social_media")
```
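The relative-speed multipliers above also let you forecast wall-clock time for a job. A minimal sketch, assuming a baseline rate you have measured on your own hardware (the 0.5 seconds-of-processing-per-second-of-footage default is a placeholder, not a server guarantee):

```python
# Mirrors the relative_speed values from QUALITY_PERFORMANCE_MAP above
RELATIVE_SPEED = {"ultra": 0.3, "high": 0.6, "medium": 1.0, "low": 2.0}

def estimate_processing_seconds(duration_s: float, quality: str,
                                baseline_rate: float = 0.5) -> float:
    """Estimate wall-clock processing time.

    baseline_rate is the measured seconds of processing per second of
    footage at 'medium' quality on your hardware (placeholder default).
    """
    return duration_s * baseline_rate / RELATIVE_SPEED.get(quality, 1.0)

# A 60s clip at 'medium' takes 30s at the placeholder baseline;
# 'ultra' takes roughly 3.3x longer, 'low' roughly half the time.
print(estimate_processing_seconds(60, "medium"))  # 30.0
```

Calibrate `baseline_rate` once per machine (time a short test encode) and the estimates become useful for batch planning.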
### 2. Leverage Copy Mode Operations

When possible, use operations that don't require re-encoding:
```python
import time

async def fast_copy_operations_demo():
    """Demonstrate fast operations that use copy mode."""
    print("⚡ Fast Copy Mode Operations:")

    # Trimming (no re-encoding when formats match)
    start_time = time.time()

    await session.call_tool("trim_video", {
        "input_path": "source.mp4",
        "output_path": "trimmed_fast.mp4",
        "start_time": 10,
        "duration": 60  # Uses copy mode - very fast!
    })

    trim_time = time.time() - start_time
    print(f"  ✅ Trim (copy mode): {trim_time:.1f}s")

    # Concatenation (when formats match)
    start_time = time.time()

    await session.call_tool("concatenate_videos", {
        "input_paths": ["clip1.mp4", "clip2.mp4"],
        "output_path": "combined_fast.mp4",
        "transition_duration": 0.0  # No transitions = copy mode
    })

    concat_time = time.time() - start_time
    print(f"  ✅ Concatenate (copy mode): {concat_time:.1f}s")

    print("\n💡 Copy mode is used when:")
    print("  • Input and output formats match")
    print("  • No transcoding is required")
    print("  • No effects or filters are applied")

# Usage
await fast_copy_operations_demo()
```
### 3. Optimize File I/O

Storage speed significantly impacts performance:
```python
async def optimize_file_io():
    """Optimize file I/O for better performance."""
    print("File I/O Optimization Tips:")

    # Check the current temp directory
    capabilities = await session.read_resource("tools://capabilities")
    temp_dir = capabilities['system_resources']['temp_directory']
    print(f"Current temp directory: {temp_dir}")

    # Recommendations for different storage types
    storage_recommendations = {
        "SSD": {
            "performance": "Excellent",
            "tips": [
                "Use for input, output, and temp files",
                "Enable concurrent operations",
                "Process multiple files simultaneously"
            ]
        },
        "NVMe": {
            "performance": "Outstanding",
            "tips": [
                "Maximum concurrent operations",
                "Use for real-time processing",
                "Best for large file workflows"
            ]
        },
        "HDD": {
            "performance": "Good",
            "tips": [
                "Process files sequentially",
                "Use SSD for temp files if available",
                "Avoid too many concurrent operations"
            ]
        },
        "Network": {
            "performance": "Variable",
            "tips": [
                "Copy files locally before processing",
                "Use local temp directory",
                "Process in smaller batches"
            ]
        }
    }

    for storage_type, info in storage_recommendations.items():
        print(f"\n{storage_type} Storage:")
        print(f"  Performance: {info['performance']}")
        for tip in info['tips']:
            print(f"  • {tip}")

# Usage
await optimize_file_io()
```
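To find out which of the categories above your storage actually falls into, a quick sequential-write micro-benchmark is often more telling than the drive's spec sheet. A standard-library sketch (the default size is deliberately small; increase it for a more stable reading):

```python
import os
import tempfile
import time

def disk_write_throughput_mb_s(size_mb: int = 64) -> float:
    """Measure sequential write throughput of the temp directory in MB/s."""
    chunk = os.urandom(1024 * 1024)  # 1 MB of incompressible data
    fd, path = tempfile.mkstemp()
    try:
        start = time.perf_counter()
        with os.fdopen(fd, "wb") as f:
            for _ in range(size_mb):
                f.write(chunk)
            f.flush()
            os.fsync(f.fileno())  # force data to disk, not just the page cache
        elapsed = time.perf_counter() - start
    finally:
        os.remove(path)
    return size_mb / elapsed

print(f"~{disk_write_throughput_mb_s(16):.0f} MB/s sequential write")
```

As a rough guide, sustained results in the hundreds of MB/s suggest SSD-class storage; results under ~150 MB/s behave more like the HDD or network cases above.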
## Workflow Optimization

### Batch Processing Best Practices
```python
import time

async def optimized_batch_processing(video_files: list):
    """Implement optimized batch processing strategies."""
    print("Optimized Batch Processing")

    # Strategy 1: Sort by file size (process smaller files first)
    print("\nAnalyzing file sizes...")

    file_info = []
    for video_file in video_files:
        info = await session.call_tool("get_video_info", {
            "video_path": video_file
        })

        # Convert file size to MB for sorting
        size_parts = info['file_size'].split()
        size_mb = float(size_parts[0])
        if size_parts[1] == 'GB':
            size_mb *= 1024

        file_info.append({
            "file": video_file,
            "size_mb": size_mb,
            "duration": info['duration']
        })

    # Sort by size (smallest first for faster feedback)
    file_info.sort(key=lambda x: x['size_mb'])

    print("Processing order (smallest to largest):")
    for i, entry in enumerate(file_info, 1):
        print(f"  {i}. {entry['file']}: {entry['size_mb']:.1f}MB, {entry['duration']:.1f}s")

    # Strategy 2: Use optimal settings for the batch
    batch_settings = {
        "quality": "medium",    # Good balance for batch processing
        "concurrent_limit": 2   # Conservative for stability
    }

    print(f"\n⚙️ Batch settings: {batch_settings}")

    # Strategy 3: Process with progress tracking
    # (use a distinct loop variable so the file_info list is not shadowed)
    results = []
    total_time = 0

    for i, entry in enumerate(file_info, 1):
        file_start = time.time()
        video_file = entry['file']

        print(f"\n[{i}/{len(file_info)}] Processing: {video_file}")

        try:
            # Example operation: resize to 720p
            await session.call_tool("resize_video", {
                "input_path": video_file,
                "output_path": f"optimized_{video_file}",
                "width": 1280,
                "height": 720,
                "quality": batch_settings["quality"]
            })

            file_time = time.time() - file_start
            total_time += file_time

            results.append({
                "file": video_file,
                "status": "success",
                "time": file_time
            })

            # Estimate remaining time
            remaining_files = len(file_info) - i
            if i > 1:  # After the first file
                avg_time = total_time / i
                estimated_remaining = avg_time * remaining_files
                print(f"  ✅ Completed in {file_time:.1f}s (Est. remaining: {estimated_remaining:.1f}s)")
            else:
                print(f"  ✅ Completed in {file_time:.1f}s")

        except Exception as e:
            file_time = time.time() - file_start
            total_time += file_time

            results.append({
                "file": video_file,
                "status": "error",
                "error": str(e),
                "time": file_time
            })

            print(f"  ❌ Failed in {file_time:.1f}s: {e}")

    # Final statistics
    successful = len([r for r in results if r['status'] == 'success'])
    print(f"\nBatch Processing Statistics:")
    print(f"Total time: {total_time:.1f}s")
    print(f"Successful: {successful}/{len(file_info)}")
    print(f"Average time per file: {total_time/len(file_info):.1f}s")

    return results

# Usage
video_files = ["small.mp4", "large.mp4", "medium.mp4"]
batch_results = await optimized_batch_processing(video_files)
```
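The size conversion above only handles MB and GB. If your `get_video_info` build reports other units (the exact string format is not guaranteed here), a more general parser is a safer drop-in. This is an illustrative helper, not part of the server API:

```python
# Binary unit multipliers for sizes reported as e.g. "850.5 MB" or "1.2 GB"
_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}

def parse_size_to_mb(size_str: str) -> float:
    """Parse a '<value> <unit>' size string into megabytes."""
    value, unit = size_str.strip().split()
    return float(value) * _UNITS[unit.upper()] / 1024**2

print(parse_size_to_mb("1.5 GB"))  # 1536.0
```

Swapping this in for the inline `size_parts` logic makes the sort order correct even for byte- or terabyte-sized entries.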
### Smart Preprocessing

```python
async def smart_preprocessing_workflow(input_path: str, target_use: str):
    """Intelligently preprocess a video based on its intended use."""
    print(f"Smart preprocessing for: {target_use}")

    # Analyze the input video
    info = await session.call_tool("get_video_info", {
        "video_path": input_path
    })

    print(f"Input: {info['width']}x{info['height']}, {info['duration']:.1f}s, {info['file_size']}")

    # Define target specifications
    target_specs = {
        "social_media": {
            "max_resolution": "1080x1080",
            "max_duration": 60,
            "target_bitrate": "2M",
            "quality": "medium"
        },
        "web": {
            "max_resolution": "1280x720",
            "max_duration": 300,
            "target_bitrate": "1.5M",
            "quality": "medium"
        },
        "professional": {
            "max_resolution": "1920x1080",
            "max_duration": None,
            "target_bitrate": "5M",
            "quality": "high"
        },
        "archive": {
            "max_resolution": None,
            "max_duration": None,
            "target_bitrate": None,
            "quality": "ultra"
        }
    }

    specs = target_specs.get(target_use, target_specs["web"])
    current_file = input_path

    # Step 1: Duration optimization
    if specs["max_duration"] and info["duration"] > specs["max_duration"]:
        print(f"⏱️ Trimming to {specs['max_duration']}s...")

        trimmed_file = f"preprocessed_trimmed_{input_path}"
        await session.call_tool("trim_video", {
            "input_path": current_file,
            "output_path": trimmed_file,
            "start_time": 0,
            "duration": specs["max_duration"]
        })
        current_file = trimmed_file

    # Step 2: Resolution optimization
    if specs["max_resolution"]:
        max_w, max_h = map(int, specs["max_resolution"].split('x'))

        if info["width"] > max_w or info["height"] > max_h:
            print(f"Resizing to {specs['max_resolution']}...")

            resized_file = f"preprocessed_resized_{input_path}"
            await session.call_tool("resize_video", {
                "input_path": current_file,
                "output_path": resized_file,
                "width": max_w,
                "height": max_h,
                "quality": specs["quality"]
            })
            current_file = resized_file

    # Step 3: Quality optimization (if nothing else was done)
    if current_file == input_path and specs["quality"] != "ultra":
        print(f"⚙️ Optimizing quality ({specs['quality']})...")

        optimized_file = f"preprocessed_optimized_{input_path}"
        await session.call_tool("resize_video", {
            "input_path": current_file,
            "output_path": optimized_file,
            "scale_factor": 1.0,  # Keep the same size
            "quality": specs["quality"]
        })
        current_file = optimized_file

    # Check the final result
    if current_file != input_path:
        final_info = await session.call_tool("get_video_info", {
            "video_path": current_file
        })

        print("✅ Preprocessing complete:")
        print(f"  Output: {current_file}")
        print(f"  Final: {final_info['width']}x{final_info['height']}, {final_info['duration']:.1f}s, {final_info['file_size']}")

        # Calculate the size reduction
        # (note: this assumes both size strings use the same unit)
        original_size = float(info['file_size'].split()[0])
        final_size = float(final_info['file_size'].split()[0])
        reduction = (1 - final_size / original_size) * 100

        if reduction > 0:
            print(f"  Size reduction: {reduction:.1f}%")
    else:
        print("✅ No preprocessing needed - video already optimal")

    return current_file

# Usage
preprocessed = await smart_preprocessing_workflow("large_video.mp4", "social_media")
```
## Hardware Optimization

### Multi-Core Processing
```python
async def optimize_for_multicore():
    """Optimize processing for multi-core systems."""
    # Check available cores/threads
    capabilities = await session.read_resource("tools://capabilities")
    concurrent_ops = capabilities['limits']['concurrent_operations']

    print("Multi-Core Optimization")
    print(f"Max concurrent operations: {concurrent_ops}")

    # Strategies for different core counts
    if concurrent_ops >= 8:
        strategy = "aggressive_parallel"
        recommendation = "Use maximum parallelization"
    elif concurrent_ops >= 4:
        strategy = "moderate_parallel"
        recommendation = "Use moderate parallelization"
    else:
        strategy = "sequential"
        recommendation = "Process sequentially to avoid overload"

    strategies = {
        "aggressive_parallel": {
            "batch_size": 8,
            "worker_count": concurrent_ops,
            "memory_buffer": "large"
        },
        "moderate_parallel": {
            "batch_size": 4,
            "worker_count": concurrent_ops // 2,
            "memory_buffer": "medium"
        },
        "sequential": {
            "batch_size": 1,
            "worker_count": 1,
            "memory_buffer": "small"
        }
    }

    config = strategies[strategy]

    print(f"\n⚙️ Recommended strategy: {strategy}")
    print(f"Recommendation: {recommendation}")
    print(f"Configuration: {config}")

    return config

# Usage
optimization_config = await optimize_for_multicore()
```
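Whichever `worker_count` the strategy picks, you still need something to enforce it. One common pattern is an `asyncio.Semaphore` gate around each task. The sketch below uses dummy coroutines, since the real tool calls depend on your `session` object:

```python
import asyncio

async def run_with_limit(coro_factories, max_concurrent: int):
    """Run coroutine factories with at most max_concurrent in flight."""
    sem = asyncio.Semaphore(max_concurrent)

    async def guarded(factory):
        async with sem:          # waits while max_concurrent tasks are running
            return await factory()

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(guarded(f) for f in coro_factories))

async def fake_resize(name: str) -> str:
    await asyncio.sleep(0.01)    # stand-in for a real resize_video call
    return f"resized_{name}"

files = ["a.mp4", "b.mp4", "c.mp4"]
results = asyncio.run(run_with_limit([lambda n=n: fake_resize(n) for n in files], 2))
print(results)  # ['resized_a.mp4', 'resized_b.mp4', 'resized_c.mp4']
```

Passing factories (`lambda: coro()`) rather than live coroutine objects matters: a coroutine created eagerly would start consuming resources before the semaphore admits it.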
### Memory Management
```python
async def optimize_memory_usage():
    """Optimize memory usage for large video processing."""
    capabilities = await session.read_resource("tools://capabilities")
    memory_usage = capabilities['system_resources']['memory_usage']

    print("Memory Optimization")
    print(f"Current memory usage: {memory_usage:.1f}%")

    # Memory optimization strategies
    if memory_usage > 80:
        print("High memory usage - implementing conservative strategies")
        strategies = [
            "Process one video at a time",
            "Use 'low' or 'medium' quality settings",
            "Clear temp files frequently",
            "Consider restarting the server if needed"
        ]
    elif memory_usage > 60:
        print("Moderate memory usage - balanced approach")
        strategies = [
            "Limit concurrent operations to 2-3",
            "Use 'medium' quality for batch processing",
            "Monitor memory during long operations"
        ]
    else:
        print("Good memory availability - can use full capabilities")
        strategies = [
            "Full concurrent processing available",
            "Can use 'high' or 'ultra' quality",
            "Batch processing recommended"
        ]

    print("\n💡 Memory optimization strategies:")
    for strategy in strategies:
        print(f"  • {strategy}")

    # Memory-efficient processing example
    print("\nMemory-efficient batch processing:")

    if memory_usage > 80:
        batch_config = {"max_concurrent": 1, "quality": "low", "chunk_size": 1}
    elif memory_usage > 60:
        batch_config = {"max_concurrent": 2, "quality": "medium", "chunk_size": 3}
    else:
        batch_config = {"max_concurrent": 4, "quality": "high", "chunk_size": 5}

    print(f"  Recommended config: {batch_config}")

    return batch_config

# Usage
memory_config = await optimize_memory_usage()
```
## Performance Monitoring

### Real-Time Performance Tracking
```python
import time

class PerformanceMonitor:
    def __init__(self):
        self.start_time = None
        self.operations = []
        self.current_operation = None

    async def start_monitoring(self):
        """Start performance monitoring."""
        self.start_time = time.time()
        print("Performance monitoring started")

    async def log_operation_start(self, operation: str, details: dict = None):
        """Log the start of an operation."""
        self.current_operation = {
            "operation": operation,
            "start_time": time.time(),
            "details": details or {}
        }
        print(f"⏱️ Started: {operation}")

    async def log_operation_end(self, success: bool = True, error: str = None):
        """Log the end of an operation."""
        if not self.current_operation:
            return

        end_time = time.time()
        operation_time = end_time - self.current_operation["start_time"]

        operation_log = {
            **self.current_operation,
            "end_time": end_time,
            "duration": operation_time,
            "success": success,
            "error": error
        }

        self.operations.append(operation_log)

        status = "✅" if success else "❌"
        print(f"{status} Completed: {self.current_operation['operation']} ({operation_time:.1f}s)")

        self.current_operation = None

    async def get_performance_report(self):
        """Generate a comprehensive performance report."""
        if not self.operations:
            print("No operations recorded")
            return

        total_time = time.time() - self.start_time
        successful_ops = [op for op in self.operations if op['success']]
        failed_ops = [op for op in self.operations if not op['success']]

        print(f"\nPerformance Report")
        print(f"{'='*50}")
        print(f"Total monitoring time: {total_time:.1f}s")
        print(f"Total operations: {len(self.operations)}")
        print(f"Successful: {len(successful_ops)}")
        print(f"Failed: {len(failed_ops)}")

        # Operation breakdown (built up front so the return value is always valid)
        operation_types = {}
        for op in successful_ops:
            operation_types.setdefault(op['operation'], []).append(op['duration'])

        if successful_ops:
            durations = [op['duration'] for op in successful_ops]
            avg_duration = sum(durations) / len(durations)

            print(f"\nOperation Performance:")
            print(f"  Average duration: {avg_duration:.1f}s")
            print(f"  Fastest operation: {min(durations):.1f}s")
            print(f"  Slowest operation: {max(durations):.1f}s")
            print(f"  Throughput: {len(successful_ops) / total_time * 60:.1f} ops/minute")

        if operation_types:
            print(f"\nOperation Type Breakdown:")
            for op_type, durations in operation_types.items():
                avg_time = sum(durations) / len(durations)
                print(f"  {op_type}: {len(durations)} ops, avg {avg_time:.1f}s")

        # Failed operations
        if failed_ops:
            print(f"\nFailed Operations:")
            for op in failed_ops:
                print(f"  ❌ {op['operation']}: {op.get('error', 'Unknown error')}")

        return {
            "total_time": total_time,
            "total_operations": len(self.operations),
            "successful": len(successful_ops),
            "failed": len(failed_ops),
            "operation_types": operation_types
        }

# Usage example
async def monitored_batch_processing():
    """Example of batch processing with performance monitoring."""
    monitor = PerformanceMonitor()
    await monitor.start_monitoring()

    video_files = ["video1.mp4", "video2.mp4", "video3.mp4"]

    for video_file in video_files:
        await monitor.log_operation_start("resize_video", {"file": video_file})

        try:
            await session.call_tool("resize_video", {
                "input_path": video_file,
                "output_path": f"resized_{video_file}",
                "width": 1280,
                "height": 720,
                "quality": "medium"
            })
            await monitor.log_operation_end(success=True)
        except Exception as e:
            await monitor.log_operation_end(success=False, error=str(e))

    # Generate the final report
    report = await monitor.get_performance_report()
    return report

# Run monitored processing
performance_report = await monitored_batch_processing()
```
## 💡 Performance Tips Summary

### Quick Optimization Checklist
```python
def performance_optimization_checklist():
    """Quick checklist for optimizing video processing performance."""
    checklist = {
        "File Management": [
            "✅ Use SSD storage for temp files",
            "✅ Clean up temp files regularly",
            "✅ Use absolute file paths",
            "✅ Keep input/output on the same drive when possible"
        ],
        "Quality Settings": [
            "✅ Use 'low' for previews and testing",
            "✅ Use 'medium' for social media and web",
            "✅ Use 'high' for professional output",
            "✅ Use 'ultra' only for archival/master copies"
        ],
        "Processing Strategy": [
            "✅ Process smaller files first in batches",
            "✅ Use copy mode when possible (no re-encoding)",
            "✅ Limit concurrent operations based on system resources",
            "✅ Monitor memory and CPU usage during processing"
        ],
        "Workflow Optimization": [
            "✅ Combine multiple operations when possible",
            "✅ Use preprocessing for consistent input formats",
            "✅ Implement error recovery in batch processing",
            "✅ Cache analysis results to avoid recomputation"
        ]
    }

    print("⚡ Performance Optimization Checklist")
    print("=" * 50)

    for category, items in checklist.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  {item}")

    return checklist

# Display the checklist
checklist = performance_optimization_checklist()
```
## Next Steps

Now you have the knowledge to optimize your video processing workflows! Continue with:
- Batch Processing Guide - Apply these optimizations to batch workflows
- Common Workflows - Optimize complete production pipelines
- Troubleshooting Guide - Debug performance issues
- FAQ - Common performance questions
Questions about performance? Check our troubleshooting guide or explore the system capabilities documentation.