Files
gain/scripts/step6_assemble_video.py
2025-07-08 15:27:03 +08:00

167 lines
8.0 KiB
Python

import subprocess
from pathlib import Path
import os
from utils.helpers import get_media_info, get_media_duration
# --- 標準化設定 ---
TARGET_WIDTH = 1920
TARGET_HEIGHT = 1080
TARGET_FPS = 30
def escape_ffmpeg_path_for_filter(path: Path) -> str:
"""為在 FFmpeg 濾鏡圖中安全使用而轉義路徑。"""
path_str = str(path.resolve())
if os.name == 'nt':
return path_str.replace('\\', '\\\\').replace(':', '\\:')
else:
return path_str.replace("'", "'\\\\\\''")
def run_step6_assemble_video(
project_path: Path,
logo_video: Path,
open_video: Path,
end_video: Path,
p_loop_count: int,
transition_duration: float = 1.0
):
"""
使用 FFmpeg 的 xfade 濾鏡組裝最終影片,並在過程中統一影片屬性與進行色彩校正。
"""
try:
# --- 1. 路徑與檔案檢查 ---
output_dir = project_path / "output"
test_dir = output_dir / "test"
audio_path = output_dir / "combined_audio.wav"
ass_path = output_dir / "subtitles.ass"
bg_final_path = output_dir / "bg_final.mp4"
final_video_path = output_dir / "final_video_full_sequence.mp4"
for file_path in [logo_video, open_video, end_video, audio_path, ass_path]:
if not file_path or not file_path.exists():
return False, f"❌ 找不到必要的輸入檔案: {file_path}", None
base_videos = sorted([p for p in test_dir.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']])
if not base_videos:
return False, "❌ `test` 資料夾中沒有影片可供合成。", None
# --- 2. 一步式生成主要內容影片 (bg_final.mp4) ---
if not bg_final_path.exists():
print(f"⚙️ 準備一步式生成主要內容影片 (循環 {p_loop_count} 次)...")
looped_video_list = base_videos * p_loop_count
inputs_cmd = []
for video_path in looped_video_list:
inputs_cmd.extend(["-i", str(video_path)])
inputs_cmd.extend(["-i", str(audio_path)])
ass_path_str = escape_ffmpeg_path_for_filter(ass_path)
filter_parts = []
stream_count = len(looped_video_list)
# 【核心修改】定義色彩校正濾鏡
color_correction_filter = "eq=brightness=-0.05:contrast=0.95:saturation=0.7"
# 將標準化 (尺寸、影格率) 與色彩校正合併
for i in range(stream_count):
filter_parts.append(
f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS},{color_correction_filter}[v_scaled_{i}]"
)
last_v_out = "[v_scaled_0]"
for i in range(stream_count - 1):
offset = sum(get_media_duration(v) for v in looped_video_list[:i+1]) - (i + 1) * transition_duration
v_in_next = f"[v_scaled_{i+1}]"
v_out_current = f"[v_out{i+1}]" if i < stream_count - 2 else "[bg_video_stream]"
filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}")
last_v_out = v_out_current
drawbox_filter = "drawbox=w=iw*0.8:h=ih*0.8:x=(iw-iw*0.8)/2:y=(ih-ih*0.8)/2:color=0x808080@0.7:t=fill"
filter_parts.append(f"[bg_video_stream]{drawbox_filter}[v_with_mask]")
filter_parts.append(f"[v_with_mask]ass=filename='{ass_path_str}'[final_v]")
filter_complex_str = ";".join(filter_parts)
ffmpeg_main_content_cmd = [
"ffmpeg", "-y", *inputs_cmd,
"-filter_complex", filter_complex_str,
"-map", "[final_v]",
"-map", f"{stream_count}:a:0",
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast",
"-c:a", "aac", "-b:a", "192k",
"-shortest",
str(bg_final_path)
]
print("🚀 正在執行 FFmpeg 主要內容合成指令 (含色彩校正)...")
result = subprocess.run(ffmpeg_main_content_cmd, check=True, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr)
print(f"✅ 主要內容影片已生成: {bg_final_path}")
# --- 3. 合成最終的影片序列 (logo -> open -> bg_final -> end) ---
if final_video_path.exists():
print(f"✅ 最終影片已存在,跳過組裝: {final_video_path}")
return True, f"✅ 影片已存在於 {final_video_path}", final_video_path
print("🎬 開始動態生成 xfade 合成指令...")
videos_to_concat = [logo_video, open_video, bg_final_path, end_video]
final_inputs_cmd, final_filter_parts, video_infos = [], [], []
for video_path in videos_to_concat:
final_inputs_cmd.extend(["-i", str(video_path)])
info = get_media_info(video_path)
video_infos.append(info)
for i, info in enumerate(video_infos):
if not info["has_audio"]:
duration = info['duration']
final_filter_parts.append(f"anullsrc=r=44100:cl=stereo:d={duration:.4f}[silent_a_{i}]")
for i in range(len(videos_to_concat)):
# 注意:這裡我們不對 logo/open/end 影片進行調色,以保留它們的原始風格
final_filter_parts.append(
f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS}[v_final_scaled_{i}]"
)
last_v_out = "[v_final_scaled_0]"
last_a_out = "[0:a]" if video_infos[0]["has_audio"] else "[silent_a_0]"
for i in range(len(videos_to_concat) - 1):
offset = sum(info['duration'] for info in video_infos[:i+1]) - (i + 1) * transition_duration
v_in_next = f"[v_final_scaled_{i+1}]"
a_in_next = f"[{i+1}:a]" if video_infos[i+1]["has_audio"] else f"[silent_a_{i+1}]"
v_out_current = f"[v_out{i+1}]" if i < len(videos_to_concat) - 2 else "[video]"
a_out_current = f"[a_out{i+1}]" if i < len(videos_to_concat) - 2 else "[audio]"
final_filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}")
final_filter_parts.append(f"{last_a_out}{a_in_next}acrossfade=d={transition_duration}{a_out_current}")
last_v_out, last_a_out = v_out_current, a_out_current
final_filter_complex_str = ";".join(final_filter_parts)
ffmpeg_concat_cmd = [
"ffmpeg", "-y", *final_inputs_cmd,
"-filter_complex", final_filter_complex_str,
"-map", "[video]", "-map", "[audio]",
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast",
"-movflags", "+faststart",
"-c:a", "aac", "-b:a", "192k",
str(final_video_path)
]
print("🚀 執行 FFmpeg 最終序列合成指令...")
result = subprocess.run(ffmpeg_concat_cmd, check=True, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr)
print(f"✅ 影片已成功組裝並儲存至 {final_video_path}")
return True, f"✅ 影片已成功組裝並儲存至 {final_video_path}", final_video_path
except subprocess.CalledProcessError as e:
error_output = e.stderr if e.stderr else str(e)
return False, f"❌ FFmpeg 執行失敗:\n{error_output}", None
except Exception as e:
return False, f"❌ 發生未預期錯誤: {e}", None