Files
gain/project_model.py
2025-07-15 14:11:39 +08:00

917 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from utils.paths import PROJECTS_DIR,get_project_paths
from utils.helpers import get_media_duration,get_media_info
from pathlib import Path
from typing import List, Dict, Optional
import streamlit as st # 為了存取 secrets
from scripts import step1_notion_sync, step2_translate_ipa
import json
from notion_client import Client
from google.cloud import translate_v2 as translate
import eng_to_ipa
from google.cloud import texttospeech
from pydub import AudioSegment
from google.api_core.exceptions import GoogleAPICallError
import re # 用於正規表示式匹配檔案名稱
import pysubs2 # 專業字幕處理函式庫
import librosa # 專業音訊分析函式庫
import subprocess
import logging
import math
import os
# 設定日誌
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Project:
"""
代表一個獨立的影片專案,封裝其檔案結構和資料。
"""
def __init__(self, project_name: str, page_id: str | None = None):
"""
初始化一個 Project 實例。注意:這不會在磁碟上建立檔案。
若要建立新專案,請使用 Project.create_new()。
Args:
project_name (str): 專案的名稱,對應其目錄名稱。
"""
if not project_name or not isinstance(project_name, str):
raise ValueError("專案名稱必須是一個非空的字串。")
self.name = project_name
self.root_path: Path = PROJECTS_DIR / self.name
self.paths: dict[str, Path] = get_project_paths(self.name)
self.data = self._load_data()
if page_id:
self.page_id = page_id
elif self.data and 'id' in self.data:
self.page_id = self.data['id']
else:
self.page_id = None
def _load_data(self) -> dict | None:
"""嘗試從 data.json 載入資料,若失敗則回傳 None。"""
data_path = self.paths.get('data')
if data_path and data_path.exists():
try:
return json.loads(data_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, FileNotFoundError):
return None
return None
def create_directories(self):
"""
確保專案所需的所有目錄都存在。
這個方法是「冪等」的,即使目錄已存在,重複呼叫也不會出錯。
"""
print(f"正在為專案 '{self.name}' 檢查並建立目錄結構...")
# 根據 get_project_paths 的定義,我們知道 'audio' 和 'output' 是需要建立的目錄
# 將需要建立的目錄鍵名放入一個列表中,方便管理
required_dirs = ['root','audio', 'output']
for dir_key in required_dirs:
directory_path = self.paths.get(dir_key)
if directory_path:
# 使用 exist_ok=True 避免在目錄已存在時拋出錯誤
# 使用 parents=True 確保即使未來路徑變為巢狀結構也能成功建立
directory_path.mkdir(parents=True, exist_ok=True)
print(f" - 目錄 '{directory_path}' 已確認存在。")
else:
print(f" - 警告:在路徑設定中找不到鍵名 '{dir_key}'")
@classmethod
def create_new(cls, project_name: str, notion_page_id: str) -> 'Project':
"""
在檔案系統上建立一個新的專案目錄結構。
Args:
project_name (str): 新專案的名稱。
Returns:
Project: 新建立專案的實例。
Raises:
FileExistsError: 如果同名專案已存在。
"""
project = cls(project_name, page_id=notion_page_id)
project.create_directories()
return project
def __repr__(self) -> str:
return f"<Project(name='{self.name}')>"
def __str__(self) -> str:
return self.name
def sync_from_notion(self) -> bool:
"""
從 Notion 同步更新專案的 data.json 檔案。
這個方法是自給自足的,它知道自己的 page_id 和如何獲取 API Key。
"""
if not self.page_id:
st.session_state.operation_status = {
"type": "error",
"message": "內部錯誤:此專案沒有關聯的 Notion 頁面 ID。"
}
return False
target_page_id = self.page_id
api_key = st.secrets.get("NOTION_API_KEY")
if not api_key:
st.session_state.operation_status = {
"type": "error",
"message": "錯誤:找不到 Notion API Key。"
}
return False
try:
client = Client(auth=api_key)
print(f"正在從 Notion 更新頁面 ID: {target_page_id}")
page_data = client.pages.retrieve(page_id=target_page_id)
# 這裡可以重用 step1 中的 extract_property_value 邏輯
# 最好是將該函式也移入 Project 類別作為一個私有方法 _extract_property_value
properties = page_data['properties']
updated_entry = {"id": page_data['id']}
for prop_name, prop_data in properties.items():
updated_entry[prop_name] = self._extract_property_value(prop_data)
# 更新 data.json 檔案
with open(self.paths['data'], 'w', encoding='utf-8') as f:
json.dump(updated_entry, f, ensure_ascii=False, indent=2)
# 同時更新記憶體中的資料,保持同步
self.data = updated_entry
self.data = self._load_data()
st.session_state.operation_status = {
"type": "success",
"message": f"專案 '{self.name}' 已成功從 Notion 同步更新。"
}
return True
except Exception as e:
st.session_state.operation_status = {
"type": "error",
"message": f"更新專案 '{self.name}' 時發生錯誤: {e}"
}
return False
def _extract_property_value(self,property_data):
"""從 Notion 頁面屬性中提取純文字值。"""
prop_type = property_data.get('type')
if prop_type == 'title':
return property_data['title'][0]['plain_text'] if property_data.get('title') else ""
elif prop_type == 'rich_text':
return "\n".join([text['plain_text'] for text in property_data.get('rich_text', [])])
elif prop_type == 'select' and property_data.get('select'):
return property_data['select']['name']
elif prop_type == 'date' and property_data.get('date'):
return property_data['date']['start']
return None
@classmethod
def create_from_notion(cls, page_id: str, page_title: str):
"""
工廠方法:從一個 Notion 頁面完整建立一個新專案。
"""
api_key = st.secrets.get("NOTION_API_KEY")
if not api_key:
raise ValueError("無法建立專案,缺少 Notion API Key。")
# 1. 先使用既有的 create_new 建立專案實體和目錄結構
project = cls.create_new(project_name=page_title)
# 2. 執行從 Notion 抓取資料並寫入 data.json 的邏輯
try:
client = Client(auth=api_key)
page_data = client.pages.retrieve(page_id=page_id)
properties = page_data['properties']
# 使用 _extract_property_value 來解析資料
page_entry = {"id": page_data['id']}
for prop_name, prop_data in properties.items():
page_entry[prop_name] = project._extract_property_value(prop_data)
# 將資料寫入檔案並更新物件狀態
with open(project.paths['data'], 'w', encoding='utf-8') as f:
json.dump(page_entry, f, ensure_ascii=False, indent=2)
project.data = page_entry
return project # 回傳完整初始化的專案物件
except Exception as e:
# 如果出錯,可能需要考慮刪除已建立的空資料夾,以保持系統乾淨
# (這部分屬於錯誤處理的細化)
raise IOError(f"從 Notion 頁面 (ID: {page_id}) 抓取資料時失敗: {e}")
_translate_client = None
@property
def translate_client(self):
"""
延遲初始化 (Lazy Initialization) Google Translate 客戶端。
只有在第一次真正需要翻譯時,才會建立客戶端實例,
並將其儲存起來以供後續重複使用,避免重複認證。
"""
if self._translate_client is None:
creds_path = st.secrets.get("GOOGLE_CREDS_TRANSLATE_PATH")
if not creds_path:
raise ValueError("未在 secrets 中設定 Google Cloud 翻譯認證檔案路徑。")
print("正在初始化 Google Translate 客戶端...")
self._translate_client = translate.Client.from_service_account_json(creds_path)
return self._translate_client
def add_translation_and_ipa(self) -> bool:
"""
為專案資料添加翻譯和 IPA 音標。
這是協調器,負責整個流程的控制。
"""
# 1. 檢查前置條件:確保核心資料已存在
if not self.data or "en" not in self.data:
st.session_state.operation_status = {
"type": "error",
"message": "錯誤:專案資料不完整或缺少英文原文 ('en')。"
}
return False
# 2. 準備處理資料
# 從 `data.json` 讀取英文句子列表[1]
english_sentences = self.data["en"].strip().split('\n')
translated_sentences = []
ipa_sentences = []
# 3. 遍歷每一句,呼叫輔助方法進行處理
for sentence in english_sentences:
if not sentence:
continue
# 呼叫封裝好的翻譯方法
translated = self._call_google_translate(sentence)
translated_sentences.append(translated if translated is not None else "[翻譯失敗]")
# 呼叫封裝好的 IPA 轉換方法
ipa = self._get_ipa_for_text(sentence)
ipa_sentences.append(ipa if ipa is not None else "[轉換失敗]")
# 4. 將處理結果更新回 self.data
self.data['zh'] = "\n".join(translated_sentences)
self.data['ipa'] = "\n".join(ipa_sentences)
try:
# 首先,將更新後的資料寫回本地 data.json
with open(self.paths['data'], 'w', encoding='utf-8') as f:
json.dump(self.data, f, ensure_ascii=False, indent=2)
# 然後,呼叫新方法將結果同步回 Notion
if self._update_single_notion_page():
st.session_state.operation_status = {
"type": "success",
"message": "AI 加註完成,並已成功寫回 Notion🚀"
}
return True
else:
st.session_state.operation_status = {
"type": "warning",
"message": "AI 加註已完成,但寫回 Notion 時失敗。請稍後手動同步。"
}
return False
except IOError as e:
st.session_state.operation_status = {
"type": "error",
"message": f"寫入本地 data.json 時發生錯誤: {e}"
}
return False
def _update_single_notion_page(self) -> bool:
"""
(私有方法) 將 self.data 中的 'zh''ipa' 欄位更新回對應的 Notion 頁面。
"""
if not self.page_id:
st.session_state.operation_status = {
"type": "error",
"message": "無法更新 Notion專案缺少 page_id。"
}
return False
api_key = st.secrets.get("NOTION_API_KEY")
if not api_key:
st.session_state.operation_status = {
"type": "error",
"message": "無法更新 Notion缺少 API Key。"
}
return False
try:
client = Client(auth=api_key)
# 準備要更新的屬性
# 注意:這裡的 "中文翻譯" 和 "IPA音標" 必須與您 Notion Database 中的欄位名稱完全一致!
properties_to_update = {
# 假設您在 Notion 中的欄位名稱就是 "zh"
# 如果不是,請修改 "zh" 為您實際的欄位名稱,例如 "中文翻譯"
"zh": {
"rich_text": [
{
"type": "text",
"text": {
"content": self.data.get('zh', '')
}
}
]
},
# 假設您在 Notion 中的欄位名稱就是 "ipa"
# 如果不是,請修改 "ipa" 為您實際的欄位名稱,例如 "IPA"
"ipa": {
"rich_text": [
{
"type": "text",
"text": {
"content": self.data.get('ipa', '')
}
}
]
}
}
print(f"正在將 AI 加註結果寫回 Notion 頁面: {self.page_id}")
client.pages.update(page_id=self.page_id, properties=properties_to_update)
return True
except Exception as e:
st.session_state.operation_status = {
"type": "error",
"message": f"更新 Notion 頁面時發生 API 錯誤: {e}"
}
return False
def _call_google_translate(self, text: str) -> str | None:
"""
(輔助) 呼叫 Google Translate API。
成功時返回翻譯字串,失敗時在後台記錄錯誤並返回 None。
"""
try:
result = self.translate_client.translate(text, target_language="zh-TW")
return result.get('translatedText')
except GoogleAPICallError as e:
# 不直接顯示 UI 警告,改為在後台記錄,讓呼叫者決定如何處理
print(f"警告Google API 呼叫失敗 - {e.message}")
return None
except Exception as e:
# 同上,記錄詳細錯誤
print(f"錯誤:翻譯 '{text[:20]}...' 時發生未預期的錯誤: {e}")
return None
def _get_ipa_for_text(self, text: str) -> str | None:
"""
為給定的英文文字獲取 IPA 音標。
失敗時在後台記錄錯誤並返回 None。
"""
if not text:
return ""
try:
# ... (您原有的 IPA 轉換邏輯保持不變) ...
ipa_word_list = eng_to_ipa.convert(text, keep_punct=True)
if not isinstance(ipa_word_list, list):
return str(ipa_word_list)
final_ipa_string = ' '.join(ipa_word_list)
final_ipa_string = final_ipa_string.replace('*', '')
final_ipa_string = final_ipa_string.replace(' ,', ',').replace(' .', '.').replace(' ?', '?').replace(' !', '!')
return final_ipa_string
except Exception as e:
print(f"警告:無法為 '{text[:20]}...' 獲取 IPA 音標: {e}")
return None
_tts_client = None # 為 TTS Client 新增一個儲存屬性
@property
def tts_client(self):
"""延遲初始化 Google Text-to-Speech 客戶端,高效且僅在需要時執行一次。"""
if self._tts_client is None:
creds_path = st.secrets.get("GOOGLE_CREDS_TTS_PATH")
if not creds_path:
raise ValueError("未在 secrets 中設定 Google Cloud TTS 認證檔案路徑。")
self._tts_client = texttospeech.TextToSpeechClient.from_service_account_json(creds_path)
return self._tts_client
def generate_sentence_audio(self) -> bool:
"""
為專案中每一對中英文句子,生成一個包含多種聲音的「教學音訊片段」。
"""
# 1. 檢查前置條件
if not self.data or "en" not in self.data or "zh" not in self.data:
st.session_state.operation_status = {
"type": "error",
"message": "錯誤:專案資料不完整,必須同時包含 'en''zh' 欄位。"
}
return False
# 2. 讀取並配對中英文句子
english_sentences = [line for line in self.data["en"].strip().split('\n') if line]
chinese_sentences = [line for line in self.data["zh"].strip().split('\n') if line]
if len(english_sentences) != len(chinese_sentences):
st.session_state.operation_status = {
"type": "error",
"message": "en zh數量不符合。"
}
return False
# 3. 從 data.json 讀取 voice 設定,如果沒有則使用預設值
voice_config = {
'english_voice_1': self.data.get("english_voice_1", "en-US-Wavenet-I"),
'english_voice_2': self.data.get("english_voice_2", "en-US-Wavenet-F"),
'chinese_voice': self.data.get("chinese_voice", "cmn-TW-Wavenet-B")
}
print("使用的語音設定:", voice_config) # 提供除錯資訊
audio_dir = self.paths['audio']
audio_dir.mkdir(parents=True, exist_ok=True)
success_count = 0
total_count = min(len(english_sentences), len(chinese_sentences))
if total_count == 0:
print("沒有找到任何可處理的句子對。")
return False
progress_bar = st.progress(0, text="正在生成教學音訊片段...")
for i in range(total_count):
item_en = english_sentences[i]
item_zh = chinese_sentences[i]
output_path = audio_dir / f"{i:03d}.wav"
# 4. 呼叫新的 SSML 生成器
ssml_content = self._generate_ssml(item_en, item_zh, voice_config)
# 5. 呼叫 TTS API (無需修改)
if self._call_google_tts(ssml_content, output_path,default_voice_name=voice_config['english_voice_1']):
success_count += 1
progress_bar.progress((i + 1) / total_count, text=f"正在生成教學音訊... ({i+1}/{total_count})")
progress_bar.empty()
if success_count > 0:
st.session_state.operation_status = {
"type": "success",
"message": "音訊生成成功!。"
}
return True
else:
st.session_state.operation_status = {
"type": "error",
"message": "音訊生成失敗!。"
}
return False
def _generate_ssml(self, item_en: str, item_zh: str, voice_config: dict) -> str:
"""
(新輔助方法) 根據模板,生成包含多種聲音的教學 SSML。
"""
# 對文本進行 XML 轉義,防止特殊字元破壞 SSML 結構
safe_en = item_en.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
safe_zh = item_zh.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
return f"""
<speak>
<break time="2s"/>
<voice name="{voice_config['english_voice_1']}">
<prosody rate="medium" pitch="medium">{safe_en}</prosody>
</voice>
<break time="2s"/>
<voice name="{voice_config['english_voice_2']}">
<prosody rate="70%" pitch="medium">{safe_en}</prosody>
</voice>
<break time="2s"/>
<voice name="{voice_config['chinese_voice']}">
<prosody rate="medium" pitch="+2st">{safe_zh}</prosody>
</voice>
<break time="1.5s"/>
<voice name="{voice_config['english_voice_2']}">
<prosody rate="110%" pitch="medium">{safe_en}</prosody>
</voice>
<break time="1s"/>
</speak>
"""
def _call_google_tts(self, ssml_content: str, output_path: Path, default_voice_name: str) -> bool:
"""
(輔助方法已更新) 現在只接收 SSML 內容,不再需要 voice_name 參數。
"""
try:
synthesis_input = texttospeech.SynthesisInput(ssml=ssml_content)
voice_params = texttospeech.VoiceSelectionParams(
# 從 'en-US-Wavenet-D' 中提取 'en-US'
language_code='-'.join(default_voice_name.split('-')[:2]),
name=default_voice_name
)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=24000
)
response = self.tts_client.synthesize_speech(
input=synthesis_input,
voice=voice_params,
audio_config=audio_config
)
with open(output_path, "wb") as out:
out.write(response.audio_content)
return True
except Exception as e:
st.session_state.operation_status = {
"type": "error",
"message": "音訊生成發生錯誤!"
}
return False
def concatenate_audio(self) -> bool:
"""
將 audio 資料夾中所有獨立的 .wav 檔,按檔名順序拼接成一個完整的音訊檔。
"""
# 1. 檢查前置條件:使用我們之前建立的狀態檢查方法
if not self.has_sentence_audio():
st.session_state.operation_status = {
"type": "error",
"message": "錯誤:在 audio 資料夾中找不到任何 .wav 檔案,無法進行組合。"
}
return False
audio_dir = self.paths['audio']
output_path = self.paths['combined_audio']
try:
# 2. 獲取所有 .wav 檔案並進行排序
# 使用 sorted() 確保檔案是按字母順序(例如 000.wav, 001.wav, ...)處理的
wav_files = sorted(audio_dir.glob("*.wav"))
if not wav_files:
st.session_state.operation_status = {
"type": "error",
"message": " audio 資料夾中找到了目錄,但沒有找到 .wav 檔案。"
}
return False
# 3. 初始化一個空的 AudioSegment 作為拼接的基礎
# 這是比「拿第一個檔案當基礎」更穩健的做法
combined_audio = AudioSegment.empty()
# 4. 遍歷所有音訊檔並依次拼接
for wav_file in wav_files:
# 讀取單個 .wav 檔案
segment = AudioSegment.from_wav(wav_file)
# 使用 `+` 運算子將音訊片段拼接到末尾
combined_audio += segment
# 5. 匯出拼接好的完整音訊檔
# format="wav" 明確指定輸出格式
print(f"正在將組合音訊匯出到: {output_path}")
combined_audio.export(output_path, format="wav")
return True
except FileNotFoundError:
st.session_state.operation_status = {
"type": "error",
"message": "FFmpeg 未安裝或未在系統路徑中。Pydub 需要它來處理音訊。"
}
print("Pydub 錯誤:請確保 FFmpeg 已安裝並在系統的 PATH 環境變數中。")
return False
except Exception as e:
st.session_state.operation_status = {
"type": "error",
"message": f"組合音訊時發生未預期的錯誤: {e}"
}
print(f"組合音訊時出錯: {e}")
return False
def generate_ass_subtitles(self) -> bool:
"""
根據 data.json 的內容和 audio/ 目錄中每個音訊檔的時長,
生成一個包含多種樣式和四層字幕的 .ass 檔案。
此版本精確複製了新的 step5_generate_ass.py 的邏輯[1]。
"""
# 1. 檢查前置條件
if not self.has_sentence_audio():
st.session_state.operation_status = {
"type": "error",
"message": "錯誤:找不到單句音訊檔,無法生成字幕。"
}
return False
if not self.data:
st.session_state.operation_status = {
"type": "error",
"message": "錯誤:專案資料未載入。"
}
return False
# 2. 準備路徑和資料
ass_path = self.paths['ass_file']
audio_dir = self.paths['audio']
try:
# 從 self.data 讀取各語言的文本行
en_lines = [line.strip() for line in self.data.get("en", "").split('\n') if line.strip()]
zh_lines = [line.strip() for line in self.data.get("zh", "").split('\n') if line.strip()]
ipa_lines = [line.strip() for line in self.data.get("ipa", "").split('\n') if line.strip()]
# 3. 使用正規表示式獲取並排序音訊檔案
file_pattern = r"(\d{3})\.wav" # 根據您的腳本,檔名應為 vocab_00.wav, vocab_01.wav 等
pattern = re.compile(file_pattern)
wav_files = sorted(
[p for p in audio_dir.iterdir() if p.is_file() and pattern.fullmatch(p.name)],
key=lambda p: int(pattern.fullmatch(p.name).group(1))
)
# 4. 嚴格檢查資料數量是否一致
if not (len(wav_files) == len(en_lines) == len(zh_lines) == len(ipa_lines)):
msg = f"錯誤:資料數量不一致!音訊({len(wav_files)}), EN({len(en_lines)}), ZH({len(zh_lines)}), IPA({len(ipa_lines)})"
st.session_state.operation_status = {"type": "error", "message": msg}
return False
total_files = len(wav_files)
if total_files == 0:
st.session_state.operation_status = {"type": "warning", "message": "找不到任何匹配的音訊檔來生成字幕。"}
return False
# 5. 使用 pysubs2 建立 .ass 檔案物件
subs = pysubs2.SSAFile()
subs.info["PlayResX"] = "1920"
subs.info["PlayResY"] = "1080"
subs.info["Title"] = self.name
# 6. 定義所有需要的樣式 (從您的腳本中精確複製)[1]
subs.styles["EN"] = pysubs2.SSAStyle(fontname="Noto Sans", fontsize=140, primarycolor=pysubs2.Color(255, 248, 231), outlinecolor=pysubs2.Color(255, 248, 231), outline=2, alignment=pysubs2.Alignment.TOP_CENTER, marginv=280)
subs.styles["IPA"] = pysubs2.SSAStyle(fontname="Noto Sans", fontsize=110, primarycolor=pysubs2.Color(255, 140, 0), outlinecolor=pysubs2.Color(255, 140, 0), outline=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=340)
subs.styles["ZH"] = pysubs2.SSAStyle(fontname="Noto Sans TC", fontsize=140, primarycolor=pysubs2.Color(102, 128, 153), outlinecolor=pysubs2.Color(102, 128, 153), outline=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=440)
subs.styles["NUMBER"] = pysubs2.SSAStyle(fontname="Segoe UI Symbol", fontsize=120, primarycolor=pysubs2.Color(144, 144, 144), outlinecolor=pysubs2.Color(144, 144, 144), bold=True, outline=1, borderstyle=1,alignment=pysubs2.Alignment.TOP_RIGHT, marginl=0, marginr=260, marginv=160)
# 7. 遍歷音訊檔,生成四層字幕事件
st.session_state.operation_status = {
"type": "progress", "value": 0, "message": "正在準備生成字幕..."
}
current_time_ms = 0
for i, wav_path in enumerate(wav_files):
# 使用 librosa 獲取精確時長 (秒),並轉換為毫秒
duration_ms = int(librosa.get_duration(path=str(wav_path)) * 1000)
start_time = current_time_ms
end_time = current_time_ms + duration_ms
# 為每一層字幕建立一個事件
subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=en_lines[i], style="EN"))
subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=f"[{ipa_lines[i]}]", style="IPA"))
subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=zh_lines[i], style="ZH"))
subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=i, style="NUMBER"))
current_time_ms = end_time
st.session_state.operation_status = {
"type": "progress",
"value": (i + 1) / total_files,
"message": f"正在處理字幕... ({i+1}/{total_files})"
}
# 8. 儲存 .ass 檔案
subs.save(str(ass_path))
st.session_state.operation_status = {
"type": "success",
"message": f"ASS 字幕檔已成功生成並儲存於 {ass_path.name}!🎉"
}
return True
except Exception as e:
st.session_state.operation_status = {
"type": "error",
"message": f"生成 ASS 字幕時發生未預期的錯誤: {e}"
}
return False
def has_sentence_audio(self) -> bool:
"""檢查 audio 資料夾是否存在且包含 .wav 檔案。"""
audio_dir = self.paths.get('audio')
return audio_dir and audio_dir.exists() and any(f.suffix == '.wav' for f in audio_dir.iterdir())
def has_combined_audio(self) -> bool:
"""檢查組合後的音訊檔是否存在。"""
combined_path = self.paths.get('combined_audio')
return combined_path and combined_path.exists()
def assemble_video(self, logo_video: Path, open_video: Path, end_video: Path):
"""
使用 FFmpeg 的 xfade 濾鏡組裝最終影片,並在過程中統一影片屬性與進行色彩校正。
"""
# --- 標準化設定 ---
TARGET_WIDTH = 1920
TARGET_HEIGHT = 1080
TARGET_FPS = 30
def escape_ffmpeg_path_for_filter(path: Path) -> str:
"""為在 FFmpeg 濾鏡圖中安全使用而轉義路徑。"""
path_str = str(path.resolve())
if os.name == 'nt':
return path_str.replace('\\', '\\\\').replace(':', '\\:')
else:
return path_str.replace("'", "'\\\\\\''")
try:
# --- 1. 路徑與檔案檢查 ---
output_dir = self.paths['output']
temp_video_dir = self.paths['temp_video']
audio_path = self.paths['combined_audio']
ass_path = self.paths['ass_file']
final_video_path = self.paths['final_video']
bg_final_path = output_dir / "bg_final.mp4"
transition_duration = 1.0
print("111")
for file_path in [logo_video, open_video, end_video, audio_path, ass_path]:
if not file_path or not file_path.exists():
st.session_state.operation_status = {
"type": "error",
"message": f"缺少必需的檔案: {e}"
}
return
print("222")
base_videos = sorted([p for p in temp_video_dir.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']])
if not base_videos:
st.session_state.operation_status = {
"type": "error",
"message": f"資料夾中沒有影片可供合成。: {e}"
}
return
print("333")
p_loop_count, error_msg = self.calculate_loop_count(transition_duration=transition_duration)
if error_msg:
# 如果計算出錯,拋出一個錯誤讓外層的 except 捕捉
raise ValueError(f"計算循環次數時失敗: {error_msg}")
print("444")
# --- 2. 一步式生成主要內容影片 (bg_final.mp4) ---
if not bg_final_path.exists():
print(f"⚙️ 準備一步式生成主要內容影片 (循環 {p_loop_count} 次)...")
looped_video_list = base_videos * p_loop_count
inputs_cmd = []
for video_path in looped_video_list:
inputs_cmd.extend(["-i", str(video_path)])
inputs_cmd.extend(["-i", str(audio_path)])
ass_path_str = escape_ffmpeg_path_for_filter(ass_path)
filter_parts = []
stream_count = len(looped_video_list)
# 【核心修改】定義色彩校正濾鏡
color_correction_filter = "eq=brightness=-0.05:contrast=0.95:saturation=0.7"
# 將標準化 (尺寸、影格率) 與色彩校正合併
for i in range(stream_count):
filter_parts.append(
f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS},{color_correction_filter}[v_scaled_{i}]"
)
last_v_out = "[v_scaled_0]"
for i in range(stream_count - 1):
offset = sum(get_media_duration(v) for v in looped_video_list[:i+1]) - (i + 1) * transition_duration
v_in_next = f"[v_scaled_{i+1}]"
v_out_current = f"[v_out{i+1}]" if i < stream_count - 2 else "[bg_video_stream]"
filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}")
last_v_out = v_out_current
drawbox_filter = "drawbox=w=iw*0.8:h=ih*0.8:x=(iw-iw*0.8)/2:y=(ih-ih*0.8)/2:color=0x808080@0.7:t=fill"
filter_parts.append(f"[bg_video_stream]{drawbox_filter}[v_with_mask]")
filter_parts.append(f"[v_with_mask]ass=filename='{ass_path_str}'[final_v]")
filter_complex_str = ";".join(filter_parts)
ffmpeg_main_content_cmd = [
"ffmpeg", "-y", *inputs_cmd,
"-filter_complex", filter_complex_str,
"-map", "[final_v]",
"-map", f"{stream_count}:a:0",
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast",
"-c:a", "aac", "-b:a", "192k",
"-shortest",
str(bg_final_path)
]
print("🚀 正在執行 FFmpeg 主要內容合成指令 (含色彩校正)...")
result = subprocess.run(ffmpeg_main_content_cmd, check=True, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr)
print(f"✅ 主要內容影片已生成: {bg_final_path}")
# --- 3. 合成最終的影片序列 (logo -> open -> bg_final -> end) ---
if final_video_path.exists():
print(f"✅ 最終影片已存在,跳過組裝: {final_video_path}")
return True, f"✅ 影片已存在於 {final_video_path}", final_video_path
print("🎬 開始動態生成 xfade 合成指令...")
videos_to_concat = [logo_video, open_video, bg_final_path, end_video]
final_inputs_cmd, final_filter_parts, video_infos = [], [], []
for video_path in videos_to_concat:
final_inputs_cmd.extend(["-i", str(video_path)])
info = get_media_info(video_path)
video_infos.append(info)
for i, info in enumerate(video_infos):
if not info["has_audio"]:
duration = info['duration']
final_filter_parts.append(f"anullsrc=r=44100:cl=stereo:d={duration:.4f}[silent_a_{i}]")
for i in range(len(videos_to_concat)):
# 注意:這裡我們不對 logo/open/end 影片進行調色,以保留它們的原始風格
final_filter_parts.append(
f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS}[v_final_scaled_{i}]"
)
last_v_out = "[v_final_scaled_0]"
last_a_out = "[0:a]" if video_infos[0]["has_audio"] else "[silent_a_0]"
for i in range(len(videos_to_concat) - 1):
offset = sum(info['duration'] for info in video_infos[:i+1]) - (i + 1) * transition_duration
v_in_next = f"[v_final_scaled_{i+1}]"
a_in_next = f"[{i+1}:a]" if video_infos[i+1]["has_audio"] else f"[silent_a_{i+1}]"
v_out_current = f"[v_out{i+1}]" if i < len(videos_to_concat) - 2 else "[video]"
a_out_current = f"[a_out{i+1}]" if i < len(videos_to_concat) - 2 else "[audio]"
final_filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}")
final_filter_parts.append(f"{last_a_out}{a_in_next}acrossfade=d={transition_duration}{a_out_current}")
last_v_out, last_a_out = v_out_current, a_out_current
final_filter_complex_str = ";".join(final_filter_parts)
ffmpeg_concat_cmd = [
"ffmpeg", "-y", *final_inputs_cmd,
"-filter_complex", final_filter_complex_str,
"-map", "[video]", "-map", "[audio]",
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast",
"-movflags", "+faststart",
"-c:a", "aac", "-b:a", "192k",
str(final_video_path)
]
print("🚀 執行 FFmpeg 最終序列合成指令...")
result = subprocess.run(ffmpeg_concat_cmd, check=True, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr)
print(f"✅ 影片已成功組裝並儲存至 {final_video_path}")
return True, f"✅ 影片已成功組裝並儲存至 {final_video_path}", final_video_path
except subprocess.CalledProcessError as e:
error_output = e.stderr if e.stderr else str(e)
return False, f"❌ FFmpeg 執行失敗:\n{error_output}", None
except Exception as e:
return False, f"❌ 發生未預期錯誤: {e}", None
# 【新增】計算循環次數 p 的函式
def calculate_loop_count(self, transition_duration: float) -> tuple[int | None, str | None]:
"""
根據音訊長度和一系列影片,計算所需的最小循環次數 p。
成功時回傳 (p, None),失敗時回傳 (None, error_message)。
"""
print("aaa")
m_prime = get_media_duration(self.paths['combined_audio'])
if m_prime is None:
return None, "無法讀取音訊檔案長度。"
print("bbb")
video_folder=self.paths['temp_video']
video_paths = [p for p in video_folder.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']]
video_lengths = [get_media_duration(p) for p in video_paths]
video_lengths = [l for l in video_lengths if l is not None and l > 0]
print("ccc")
if not video_lengths:
return None, "在 `test` 資料夾中找不到有效的影片檔案。"
print("ddd")
n = len(video_lengths)
m = sum(video_lengths)
tr = transition_duration
denominator = m - (n - 1) * tr
if denominator <= 0:
return None, f"影片的有效長度 ({denominator:.2f}s) 小於或等於零,無法進行循環。請增加影片時長或減少轉場時間。"
p = math.ceil(m_prime / denominator)
return p, None