From dab062549a4cf69e613071a3dc7a2a32faf1c965 Mon Sep 17 00:00:00 2001 From: bruce Date: Tue, 8 Jul 2025 15:27:03 +0800 Subject: [PATCH] add project --- .gitignore | 2 + .streamlit/secrets.toml | 8 + app.py | 216 +++++++++++++++++++++++++ scripts/__init__.py | 0 scripts/step1_notion_sync.py | 87 ++++++++++ scripts/step2_translate_ipa.py | 155 ++++++++++++++++++ scripts/step3_generate_audio.py | 104 ++++++++++++ scripts/step4_concatenate_audio.py | 78 +++++++++ scripts/step5_generate_ass.py | 107 ++++++++++++ scripts/step6_assemble_video.py | 166 +++++++++++++++++++ ui_fragments/audio_manager.py | 43 +++++ ui_fragments/video_manager.py | 45 ++++++ ui_fragments/video_search.py | 136 ++++++++++++++++ utils/__init__.py | 0 utils/callbacks.py | 251 +++++++++++++++++++++++++++++ utils/helpers.py | 189 ++++++++++++++++++++++ utils/paths.py | 21 +++ 17 files changed, 1608 insertions(+) create mode 100644 .gitignore create mode 100644 .streamlit/secrets.toml create mode 100644 app.py create mode 100644 scripts/__init__.py create mode 100644 scripts/step1_notion_sync.py create mode 100644 scripts/step2_translate_ipa.py create mode 100644 scripts/step3_generate_audio.py create mode 100644 scripts/step4_concatenate_audio.py create mode 100644 scripts/step5_generate_ass.py create mode 100644 scripts/step6_assemble_video.py create mode 100644 ui_fragments/audio_manager.py create mode 100644 ui_fragments/video_manager.py create mode 100644 ui_fragments/video_search.py create mode 100644 utils/__init__.py create mode 100644 utils/callbacks.py create mode 100644 utils/helpers.py create mode 100644 utils/paths.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b4f9491 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +projects/ \ No newline at end of file diff --git a/.streamlit/secrets.toml b/.streamlit/secrets.toml new file mode 100644 index 0000000..4a75b2d --- /dev/null +++ b/.streamlit/secrets.toml @@ -0,0 +1,8 @@ +# .streamlit/secrets.toml + +NOTION_API_KEY = "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" +NOTION_DATABASE_ID = "2176c85b7c80800db30fd1403338069d" +GOOGLE_CREDS_TRANSLATE_PATH = "D:\\Python_tool\\video_workflow\\black-abode-267208-edc57f25cd47.json" +GOOGLE_CREDS_TTS_PATH = "D:\\Python_tool\\video_workflow\\black-abode-267208-b3fc82ecef45.json" +PIXABAY_API_KEY = "51169279-fa11876cb5b46d49afd0aec5d" +PEXELS_API_KEY = "q5TZIBjveEsJTe2KUCGVimPQIdPHKKOOMNJxm2M2vKk1F8QCfsi7xogv" \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..f40251c --- /dev/null +++ b/app.py @@ -0,0 +1,216 @@ +import streamlit as st +import time +from pathlib import Path +import json + +# --- 本地模組匯入 --- +from utils.paths import get_project_paths, SHARED_ASSETS_DIR, PROJECTS_DIR +from utils.callbacks import callback_set_project, callback_create_project, callback_run_step,callback_assemble_final_video +from utils.helpers import get_notion_page_titles, display_global_status_message +from ui_fragments.audio_manager import audio_management_fragment +from ui_fragments.video_manager import video_management_fragment +from ui_fragments.video_search import video_search_fragment + +# --- 匯入外部處理腳本 --- +# 雖然這些腳本主要在 callbacks 中被呼叫,但在此處匯入有助於理解全貌 +from scripts.step1_notion_sync import update_project_from_notion +from scripts.step2_translate_ipa import run_step2_translate_ipa +from scripts.step3_generate_audio import run_step3_generate_audio +from scripts.step4_concatenate_audio import run_step4_concatenate_audio +from scripts.step5_generate_ass import run_step5_generate_ass +from scripts.step6_assemble_video import run_step6_assemble_video + +# --- Streamlit UI 設定 --- +st.set_page_config(layout="wide", page_title="英語影片自動化工作流程") + +# --- 狀態初始化 --- +if 'current_project' not in st.session_state: + st.session_state.current_project = None +if 'final_video_path' not in st.session_state: + st.session_state.final_video_path = None +if 'show_video' not in st.session_state: + st.session_state.show_video = False +if 'operation_status' not in st.session_state: + st.session_state.operation_status = {} +if 'search_query' not in st.session_state: + st.session_state.search_query = "" +if 'search_results' not in st.session_state: + st.session_state.search_results = [] +if 'selected_videos' not in st.session_state: + st.session_state.selected_videos = {} +if 'active_preview_id' not in st.session_state: + st.session_state.active_preview_id = None + +# --- UI 介面 --- +st.title("🎬 英語影片自動化工作流程") +display_global_status_message() +# --- 側邊欄 --- +with st.sidebar: + st.header("API & Project Control") + notion_api_key = st.secrets.get("NOTION_API_KEY") + notion_database_id = st.secrets.get("NOTION_DATABASE_ID") + google_creds_for_translate_path = st.secrets.get("GOOGLE_CREDS_TRANSLATE_PATH") + google_creds_for_TTS_path = st.secrets.get("GOOGLE_CREDS_TTS_PATH") + st.divider() + + st.header("1. 建立新專案") + try: + if notion_api_key and notion_database_id: + with st.spinner("載入 Notion 頁面..."): + page_titles_map = get_notion_page_titles(notion_api_key, notion_database_id) + st.session_state.page_titles_map = page_titles_map + st.selectbox("選擇一個 Notion 頁面來建立專案:", options=[""] + list(page_titles_map.keys()), index=0, key="selected_title", placeholder="選擇一個頁面...") + if st.session_state.selected_title: + st.button(f"從 '{st.session_state.selected_title}' 建立專案", on_click=callback_create_project) + else: + st.warning("請在 Streamlit secrets 中設定 Notion API 金鑰和資料庫 ID。") + except Exception as e: + st.error(f"無法載入 Notion 頁面: {e}") + + st.divider() + + st.header("2. 選擇現有專案") + existing_projects = [p.name for p in PROJECTS_DIR.iterdir() if p.is_dir()] + selected_project_idx = existing_projects.index(st.session_state.current_project) + 1 if st.session_state.current_project in existing_projects else 0 + st.selectbox( + "或選擇一個現有專案:", + options=[""] + existing_projects, + index=selected_project_idx, + key="project_selector", + on_change=callback_set_project, + help="選擇您已經建立的專案。" + ) + +# --- 主畫面 --- +if not st.session_state.current_project: + st.info("👈 請在側邊欄建立新專案或選擇一個現有專案以開始。") +else: + paths = get_project_paths(st.session_state.current_project) + project_path = paths["root"] + + st.header(f"目前專案:`{st.session_state.current_project}`") + + tab_names = ["1. 資料處理 & AI 加註", "2. 素材生成", "2.5. 線上素材搜尋", "3. 影片合成"] + active_tab = st.radio("選擇工作流程步驟:", options=tab_names, key="main_tabs_radio", horizontal=True, label_visibility="collapsed") + st.markdown("---") + + # --- 分頁內容 --- + if active_tab == tab_names[0]: + data_file = paths["data"] + if data_file.exists(): + with st.container(): + col1, col2 = st.columns([3, 1]) + with col1: + st.subheader("專案資料") + with col2: + st.button("🔄 從 Notion 同步更新", on_click=callback_run_step, args=(update_project_from_notion,), kwargs={"source": "sync_notion", "spinner_text": "正在同步...", "api_key": notion_api_key, "project_path": project_path}, help="從 Notion 抓取此專案的最新資料並覆寫本地檔案。") + with st.expander("預覽專案資料 (data.json)", expanded=False): + st.json(json.loads(data_file.read_text(encoding="utf-8"))) + st.divider() + st.subheader("AI 自動加註") + st.button("2. 添加翻譯與 IPA 音標", on_click=callback_run_step, args=(run_step2_translate_ipa,), kwargs={"source": "run_translate", "spinner_text": "正在調用 AI...", "project_path": project_path, "google_creds_path": google_creds_for_translate_path, "notion_api_key": notion_api_key, "notion_database_id": notion_database_id}, disabled=not google_creds_for_translate_path) + if not google_creds_for_translate_path: + st.warning("請在 secrets.toml 中提供 Google Cloud 翻譯認證檔案的路徑。") + else: + st.warning("找不到 data.json 檔案。請先從 Notion 同步。") + st.divider() + st.info("✍️ **人工檢查點**:\n\n1. 請檢查 `Notion` 中的內容是否準確。\n2. 修改完成後,**請務必點擊上方的「從 Notion 同步更新」按鈕**。") + + elif active_tab == tab_names[1]: + with st.container(border=True): + st.subheader("步驟 3.1: 生成單句音訊") + st.button("執行生成", on_click=callback_run_step, args=(run_step3_generate_audio,), kwargs={"source": "gen_audio", "spinner_text": "正在生成音訊...", "project_path": project_path, "google_creds_path": google_creds_for_TTS_path}, help="根據 data.json 中的每一句英文,生成對應的 .wav 音訊檔並存放在 audio 資料夾。") + + st.divider() + + st.markdown("##### 步驟 3.2: 組合完整音訊") + audio_folder, single_audio_exists = paths["audio"], False + if audio_folder.exists() and any(audio_folder.iterdir()): + single_audio_exists = True + + if not single_audio_exists: + st.info("請先執行步驟 3.1 以生成單句音訊。") + st.button("執行組合", on_click=callback_run_step, args=(run_step4_concatenate_audio,), kwargs={"source": "concat_audio", "spinner_text": "正在組合音訊...", "project_path": project_path}, help="將 audio 資料夾中所有的 .wav 檔,按順序組合成一個名為 combined_audio.wav 的檔案。", disabled=not single_audio_exists) + + combined_audio_path = paths["combined_audio"] + if combined_audio_path.exists(): + st.success("🎉 完整音訊已組合成功!") + st.audio(str(combined_audio_path)) + + st.divider() + + with st.container(border=True): + st.subheader("步驟 4: 生成 ASS 字幕檔") + if not single_audio_exists: + st.info("請先執行步驟 3.1 以生成字幕所需的時間戳。") + st.button("📝 生成 .ass 字幕檔", on_click=callback_run_step, args=(run_step5_generate_ass,), kwargs={"source": "gen_ass", "spinner_text": "正在生成字幕檔...", "project_path": project_path}, disabled=not single_audio_exists) + + st.divider() + audio_management_fragment(paths) + + elif active_tab == tab_names[2]: + video_search_fragment(paths) + + elif active_tab == tab_names[3]: + video_management_fragment(paths) + st.divider() + + st.subheader("步驟 6.1: 管理與選擇共享影片素材") + shared_videos = [""] + [f.name for f in sorted(SHARED_ASSETS_DIR.glob('*.mp4'))] + [f.name for f in sorted(SHARED_ASSETS_DIR.glob('*.mov'))] + with st.container(border=True): + st.markdown("##### 從共享素材庫中選擇影片") + c1, c2 = st.columns(2) + with c1: + logo_selection = st.selectbox("選擇 Logo 影片:", shared_videos, key="logo_select") + open_selection = st.selectbox("選擇開場影片:", shared_videos, key="open_select") + with c2: + end_selection = st.selectbox("選擇結尾影片:", shared_videos, key="end_select") + + with st.expander("**上傳新的共享素材**"): + st.markdown("上傳的影片將存入 `shared_assets` 公共資料夾,可供所有專案重複使用。") + uploaded_files = st.file_uploader("上傳新的影片到共享素材庫", type=["mp4", "mov"], accept_multiple_files=True, label_visibility="collapsed") + if uploaded_files: + for uploaded_file in uploaded_files: + with open(SHARED_ASSETS_DIR / uploaded_file.name, "wb") as f: + f.write(uploaded_file.getbuffer()) + st.success(f"成功上傳 {len(uploaded_files)} 個檔案到共享素材庫!") + time.sleep(1) + st.rerun() + + st.divider() + st.subheader("步驟 6.2: 執行最終影片合成") + all_videos_selected = all([logo_selection, open_selection, end_selection]) + if not all_videos_selected: + st.info("請從上方的下拉選單中選擇所有四個影片以啟用合成按鈕。") + + st.button( + "🎬 合成最終影片", + on_click=callback_assemble_final_video, + kwargs={ + "paths": paths, + "source": "assemble_video", + "spinner_text": "影片合成中,請稍候...", + "project_path": paths['root'], # 傳遞給 run_step6_assemble_video + "logo_video": SHARED_ASSETS_DIR / logo_selection if logo_selection else None, + "open_video": SHARED_ASSETS_DIR / open_selection if open_selection else None, + "end_video": SHARED_ASSETS_DIR / end_selection if end_selection else None + }, + disabled=not all_videos_selected + ) + + + if st.session_state.final_video_path and Path(st.session_state.final_video_path).exists(): + st.divider() + st.header("🎉 影片製作完成!") + st.checkbox("顯示/隱藏影片", key="show_video", value=True) + if st.session_state.show_video: + st.video(st.session_state.final_video_path) + + with open(st.session_state.final_video_path, "rb") as file: + st.download_button( + "📥 下載專案影片", + data=file, + file_name=Path(st.session_state.final_video_path).name, + mime="video/mp4", + use_container_width=True + ) diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/step1_notion_sync.py b/scripts/step1_notion_sync.py new file mode 100644 index 0000000..39bbd83 --- /dev/null +++ b/scripts/step1_notion_sync.py @@ -0,0 +1,87 @@ +# scripts/step1_notion_sync.py +import os +import json +import re +from pathlib import Path +from notion_client import Client +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def extract_property_value(property_data): + """從 Notion 頁面屬性中提取純文字值。""" + prop_type = property_data.get('type') + if prop_type == 'title': + return property_data['title'][0]['plain_text'] if property_data.get('title') else "" + elif prop_type == 'rich_text': + return "\n".join([text['plain_text'] for text in property_data.get('rich_text', [])]) + elif prop_type == 'select' and property_data.get('select'): + return property_data['select']['name'] + elif prop_type == 'date' and property_data.get('date'): + return property_data['date']['start'] + return None + + + +def create_project_from_page(api_key: str, page_id: str, project_name: str, projects_dir: Path): + """根據單一 Notion 頁面建立一個新的本地專案資料夾。""" + sanitized_name = re.sub(r'[\\/*?:"<>|]', "", project_name).replace(" ", "_").lower() + project_path = projects_dir / sanitized_name + output_json_path = project_path / "data.json" + + if project_path.exists(): + return False, f"⚠️ 專案 '{sanitized_name}' 已存在。", None + + project_path.mkdir(parents=True, exist_ok=True) + + try: + client = Client(auth=api_key) + page_data = client.pages.retrieve(page_id=page_id) + properties = page_data['properties'] + page_entry = {"id": page_data['id']} + for prop_name, prop_data in properties.items(): + page_entry[prop_name] = extract_property_value(prop_data) + + with open(output_json_path, 'w', encoding='utf-8') as f: + json.dump(page_entry, f, ensure_ascii=False, indent=2) + return True, f"✅ 專案 '{sanitized_name}' 建立成功!", sanitized_name + except Exception as e: + return False, f"❌ 處理頁面 (ID: {page_id}) 時出錯: {e}", None + +# --- 新增的更新函式 --- +def update_project_from_notion(api_key: str, project_path: Path): + """ + 從 Notion 更新單一專案的 data.json 檔案。 + 它會讀取本地 data.json 以取得 page_id,然後抓取最新資料。 + """ + data_file = project_path / "data.json" + if not data_file.exists(): + return False, f"❌ 錯誤:在專案 '{project_path.name}' 中找不到 data.json。" + + try: + with open(data_file, 'r', encoding='utf-8') as f: + local_data = json.load(f) + + page_id = local_data.get('id') + if not page_id: + return False, "❌ 錯誤:data.json 檔案中缺少 Notion page_id。" + + # 使用 page_id 從 Notion API 抓取最新資料 [4][6] + logging.info(f"正在從 Notion 更新頁面 ID: {page_id}") + client = Client(auth=api_key) + page_data = client.pages.retrieve(page_id=page_id) + + properties = page_data['properties'] + updated_entry = {"id": page_data['id']} + for prop_name, prop_data in properties.items(): + updated_entry[prop_name] = extract_property_value(prop_data) + + # 覆寫本地的 data.json 檔案 + with open(data_file, 'w', encoding='utf-8') as f: + json.dump(updated_entry, f, ensure_ascii=False, indent=2) + + return True, f"✅ 專案 '{project_path.name}' 已成功從 Notion 同步更新!" + + except Exception as e: + logging.error(f"更新專案 '{project_path.name}' 時發生錯誤: {e}") + return False, f"❌ 更新失敗: {e}" diff --git a/scripts/step2_translate_ipa.py b/scripts/step2_translate_ipa.py new file mode 100644 index 0000000..f0aa100 --- /dev/null +++ b/scripts/step2_translate_ipa.py @@ -0,0 +1,155 @@ +# scripts/step2_translate_ipa.py + +import os +import json +from pathlib import Path +from google.cloud import translate_v2 as translate +from notion_client import Client +from notion_client.errors import APIResponseError +import eng_to_ipa as e2i +import logging + +def _update_single_notion_page(client, page_id: str, zh_text: str, ipa_text: str): + """ + 將翻譯和 IPA 結果更新回指定的 Notion 頁面。 + + Args: + client: 已初始化的 Notion Client。 + page_id (str): 要更新的 Notion 頁面 ID。 + zh_text (str): 中文翻譯文字。 + ipa_text (str): IPA 音標文字。 + """ + try: + # --- ✨ 核心修改處:建立符合 Notion API 規範的 properties 物件 --- + properties_payload = { + # 假設您在 Notion 中的欄位名稱就是 "zh" + # 如果不是,請修改 "zh" 為您實際的欄位名稱,例如 "中文翻譯" + "zh": { + "rich_text": [ + { + "type": "text", + "text": { + "content": zh_text + } + } + ] + }, + # 假設您在 Notion 中的欄位名稱就是 "ipa" + # 如果不是,請修改 "ipa" 為您實際的欄位名稱,例如 "IPA" + "ipa": { + "rich_text": [ + { + "type": "text", + "text": { + "content": ipa_text + } + } + ] + } + } + + logging.info(f"正在更新 Notion 頁面 ID: {page_id}") + # 發送更新請求 + client.pages.update(page_id=page_id, properties=properties_payload) + logging.info(f"✅ 成功將結果寫回 Notion 頁面 {page_id}") + + except Exception as e: + # 捕捉並印出詳細錯誤,這對於除錯至關重要 + logging.error(f"❌ 更新 Notion 頁面 {page_id} 失敗: {e}") + + +def run_step2_translate_ipa( + project_path: Path, + google_creds_path: str, + notion_api_key: str = None, + notion_database_id: str = None +): + """ + 讀取專案中的 data.json,為其添加中文翻譯和 IPA 音標, + 並將結果寫回 data.json,同時可選擇性地更新回 Notion。 + Args: + project_path (Path): 專案資料夾的路徑。 + google_creds_path (str): Google Cloud 認證 JSON 檔案的路徑。 + notion_api_key (str, optional): Notion API Key. Defaults to None. + notion_database_id (str, optional): Notion Database ID. Defaults to None. + Returns: + tuple[bool, str]: (操作是否成功, 附帶的訊息)。 + """ + data_file = project_path / "data.json" + if not data_file.exists(): + return False, f"錯誤:在專案 '{project_path.name}' 中找不到 data.json 檔案。" + + # 1. 初始化 API 客戶端 + try: + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = google_creds_path + translate_client = translate.Client() + except Exception as e: + return False, f"初始化 Google Translate 客戶端失敗: {e}" + + notion_client = None + if notion_api_key and notion_database_id: + try: + notion_client = Client(auth=notion_api_key) + except Exception as e: + return False, f"初始化 Notion 客戶端失敗: {e}" + + # 2. 讀取並處理資料 + try: + with data_file.open("r", encoding="utf-8") as f: + data = json.load(f) + except Exception as e: + return False, f"讀取或解析 data.json 失敗: {e}" + + # --- ✨ 核心修改處:移除迴圈,直接處理 data 物件 --- + + # 使用 .get() 安全地獲取英文原文,避免 KeyError [7][14] + # 'en' 是您在 data.json 中定義的鍵 + english_text = data.get("en") + + if not english_text or not isinstance(english_text, str): + return False, "在 data.json 中找不到 'en' 鍵或其內容不是文字。" + + # --- 翻譯成中文 --- + # 將英文原文按行切分,並過濾掉空行 + english_lines = [line.strip() for line in english_text.strip().split('\n') if line.strip()] + translated_lines = [] + + # 迴圈遍歷每一行英文,單獨進行翻譯 + for line in english_lines: + try: + result = translate_client.translate(line, target_language="zh-TW") + translated_lines.append(result.get("translatedText", "")) + except Exception as e: + print(f"翻譯 '{line}' 時出錯: {e}") + translated_lines.append("") # 如果單行出錯,則添加空字串 + # 將翻譯好的各行重新用換行符組合起來 + translated_text = '\n'.join(translated_lines) + + # --- 生成 IPA 音標 --- + try: + english_lines = [line.strip() for line in english_text.strip().split('\n') if line.strip()] + ipa_lines = [e2i.convert(line) for line in english_lines] + ipa_text = '\n'.join(ipa_lines) + except Exception as e: + print(f"獲取 '{english_text}' 的 IPA 時出錯: {e}") + ipa_text = "" + + # 將結果存回主 data 字典中 + data["zh"] = translated_text + data["ipa"] = ipa_text + + print(f"已處理: '{english_text[:30]}...' -> '{translated_text[:30]}...' | '{ipa_text[:30]}...'") + + # --- 如果提供了 Notion 憑證,則更新回 Notion --- + page_id = data.get("id") + if notion_client and page_id: + _update_single_notion_page(notion_client, page_id, translated_text, ipa_text) + + # 3. 將更新後的單一物件寫回 JSON 檔案 + try: + with data_file.open("w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + except Exception as e: + return False, f"寫回更新後的 data.json 失敗: {e}" + + return True, f"✅ 專案 '{project_path.name}' 的翻譯與 IPA 已成功處理!" diff --git a/scripts/step3_generate_audio.py b/scripts/step3_generate_audio.py new file mode 100644 index 0000000..781b968 --- /dev/null +++ b/scripts/step3_generate_audio.py @@ -0,0 +1,104 @@ +import json +import html +from pathlib import Path +from google.cloud import texttospeech +from google.oauth2 import service_account + +def generate_ssml_content(item_en, item_zh, english_voice_1, english_voice_2, chinese_voice): + return f""" + + + + {item_en} + + + + {item_en} + + + + {item_zh} + + + + {item_en} + + + + """ +def run_step3_generate_audio( + project_path: Path, + google_creds_path, + english_voice_1: str = "en-US-Wavenet-I", + english_voice_2: str = "en-US-Wavenet-F", + chinese_voice: str = "cmn-TW-Wavenet-B", +): + """ + 為每個詞彙項目生成獨立的音訊檔案。 + """ + try: + # 1. 定義路徑 + json_file_path = project_path / "data.json" + output_audio_folder = project_path / "audio" + output_audio_folder.mkdir(parents=True, exist_ok=True) + + # 2. 從 JSON 檔案載入資料 + if not json_file_path.exists(): + return False, f"錯誤:找不到 JSON 檔案 {json_file_path}" + + with open(json_file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + # --- ✨ 核心修改處:將字串分割成列表 --- + + # 首先獲取完整的字串,如果鍵不存在則返回空字串 + en_text = data.get("en", "") + zh_text = data.get("zh", "") + + # 使用換行符 '\n' 將字串分割成列表,並過濾掉空行 + en_lines = [line.strip() for line in en_text.split('\n') if line.strip()] + zh_lines = [line.strip() for line in zh_text.split('\n') if line.strip()] + + # 現在 en_lines 和 zh_lines 是我們期望的列表格式了 + + # 進行驗證 + if not en_lines or not zh_lines or len(en_lines) != len(zh_lines): + return False, "錯誤:JSON 檔案中的英文和中文句子列表為空或長度不匹配。" + + # 3. 初始化 Google Text-to-Speech 客戶端 + creds = service_account.Credentials.from_service_account_file(google_creds_path) + client = texttospeech.TextToSpeechClient(credentials=creds) + + # 4. 迴圈遍歷每個詞彙項目並合成音訊 + total_files = len(en_lines) + for i, (item_en, item_zh) in enumerate(zip(en_lines, zh_lines)): + print(f"正在處理第 {i+1}/{total_files} 個檔案: {item_en}") + + safe_item_en = html.escape(item_en) + safe_item_zh = html.escape(item_zh) + + ssml_content = generate_ssml_content(safe_item_en, safe_item_zh, english_voice_1, english_voice_2, chinese_voice) + synthesis_input = texttospeech.SynthesisInput(ssml=ssml_content) + + voice_params = texttospeech.VoiceSelectionParams(language_code="en-US") + audio_config = texttospeech.AudioConfig( + audio_encoding=texttospeech.AudioEncoding.LINEAR16, + sample_rate_hertz=24000 + ) + + response = client.synthesize_speech( + input=synthesis_input, + voice=voice_params, + audio_config=audio_config + ) + + output_file = output_audio_folder / f"vocab_{i:02d}.wav" + with open(output_file, "wb") as out: + out.write(response.audio_content) + + return True, f"成功!已在 '{output_audio_folder}' 資料夾中生成 {total_files} 個音訊檔案。" + + except Exception as e: + error_message = f"生成音訊時發生未預期的錯誤: {e}" + print(error_message) + return False, error_message \ No newline at end of file diff --git a/scripts/step4_concatenate_audio.py b/scripts/step4_concatenate_audio.py new file mode 100644 index 0000000..4d9a823 --- /dev/null +++ b/scripts/step4_concatenate_audio.py @@ -0,0 +1,78 @@ +# scripts/step4_concatenate_audio.py +import re +from pathlib import Path +from pydub import AudioSegment + +def _concatenate_audio_files(audio_folder: Path, file_pattern: str, output_path: Path, delay_ms: int = 500): + """ + Internal helper function to find, sort, and concatenate audio files based on a pattern. + This is based on the script you provided [1]. + """ + if not audio_folder.is_dir(): + raise FileNotFoundError(f"Audio source folder '{audio_folder}' does not exist.") + + # Find all files matching the pattern and extract the number for sorting + compiled_pattern = re.compile(file_pattern) + matching_files = [] + for filepath in audio_folder.iterdir(): + if filepath.is_file(): + match = compiled_pattern.match(filepath.name) + if match and match.group(1).isdigit(): + matching_files.append((filepath, int(match.group(1)))) + + if not matching_files: + raise FileNotFoundError(f"No files matching pattern '{file_pattern}' found in '{audio_folder}'.") + + # Sort files numerically based on the extracted number + matching_files.sort(key=lambda x: x[1]) + + print("Found and sorted the following files for concatenation:") + for file_path, _ in matching_files: + print(f"- {file_path.name}") + + # Start with a silent segment (delay) + combined_audio = AudioSegment.silent(duration=delay_ms) + + # Concatenate all sorted audio files + for audio_file_path, _ in matching_files: + try: + segment = AudioSegment.from_file(audio_file_path) + combined_audio += segment + except Exception as e: + print(f"Warning: Could not process file '{audio_file_path.name}'. Skipping. Error: {e}") + + # End with a silent segment (delay) + combined_audio += AudioSegment.silent(duration=2000) + # Export the final combined audio file + output_path.parent.mkdir(parents=True, exist_ok=True) + combined_audio.export(output_path, format="mp3") + print(f"Successfully concatenated audio to '{output_path}'") + + +def run_step4_concatenate_audio(project_path: Path): + """ + Main function for Step 4. Finds all 'vocab_xx.wav' files in the project's + audio folder, concatenates them, and saves the result as a single MP3. + """ + try: + audio_folder = project_path / "audio" + output_dir = project_path / "output" + output_wav_path = output_dir / "combined_audio.wav" + + # Define the pattern for the audio files created in Step 3 + file_pattern = r"vocab_(\d{2})\.wav" + + _concatenate_audio_files( + audio_folder=audio_folder, + file_pattern=file_pattern, + output_path=output_wav_path, + delay_ms=0 # Start with a 1-second delay + ) + + return True, f"✅ Audio successfully concatenated and saved to '{output_wav_path}'" + + except FileNotFoundError as e: + return False, f"❌ Error: {e}", None + except Exception as e: + return False, f"❌ An unexpected error occurred during audio concatenation: {e}", None + diff --git a/scripts/step5_generate_ass.py b/scripts/step5_generate_ass.py new file mode 100644 index 0000000..8227006 --- /dev/null +++ b/scripts/step5_generate_ass.py @@ -0,0 +1,107 @@ +# scripts/step5_generate_ass.py +import json +import re +import librosa +from pathlib import Path +import pysubs2 # 使用 pysubs2 函式庫 + +def number_to_unicode_circled(num): + """ + 將數字 1-10 轉換為 Unicode 帶圈數字符號。 + """ + if num < 0: + return None + if num == 0: + return '\u24ea' + result = '' + digits = [] + while num > 0: + digits.append(num % 10) + num //= 10 + digits.reverse() + for d in digits: + if d == 0: + result += '\u24ea' + else: + result += chr(0x2460 + d - 1) + return result + +def run_step5_generate_ass(project_path: Path): + """ + 根據專案中的音訊檔案和 JSON 文本生成一個多層的 .ass 字幕檔。 + + Args: + project_path (Path): 專案的根目錄路徑。 + + Returns: + tuple[bool, str, Path | None]: (操作是否成功, 附帶的訊息, 輸出檔案的路徑或 None)。 + """ + try: + # 1. 定義路徑 + json_file_path = project_path / "data.json" + audio_folder = project_path / "audio" + output_dir = project_path / "output" + final_ass_path = output_dir / "subtitles.ass" + + # 確保輸出資料夾存在 + output_dir.mkdir(parents=True, exist_ok=True) + + # 2. 載入 JSON 資料並分割成行 + with open(json_file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + en_lines = [line.strip() for line in data.get("en", "").split('\n') if line.strip()] + zh_lines = [line.strip() for line in data.get("zh", "").split('\n') if line.strip()] + ipa_lines = [line.strip() for line in data.get("ipa", "").split('\n') if line.strip()] + number_list = [number_to_unicode_circled(i + 1) for i in range(len(en_lines))] + + # 3. 獲取所有音訊檔案並排序 + file_pattern = r"vocab_(\d{2})\.wav" + pattern = re.compile(file_pattern) + wav_files = sorted( + [p for p in audio_folder.iterdir() if p.is_file() and pattern.fullmatch(p.name)], + key=lambda p: int(pattern.fullmatch(p.name).group(1)) + ) + + # 4. 檢查數量是否一致 + if not (len(wav_files) == len(en_lines) == len(zh_lines) == len(ipa_lines)): + msg = f"錯誤:音訊({len(wav_files)}), EN({len(en_lines)}), ZH({len(zh_lines)}), IPA({len(ipa_lines)}) 數量不一致!" + return False, msg, None + + # 5. 使用 pysubs2 建立 .ass 檔案 + subs = pysubs2.SSAFile() + # 設定畫布解析度(與影片一致) + subs.info["PlayResX"] = "1920" + subs.info["PlayResY"] = "1080" + + # 從您的 gen_ass.py 中複製樣式定義 [1] + subs.styles["EN"] = pysubs2.SSAStyle(fontname="Noto Sans", fontsize=140, primarycolor=pysubs2.Color (255, 248, 231), outlinecolor=pysubs2.Color (255, 248, 231),outline=2, borderstyle=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=280) + subs.styles["IPA"] = pysubs2.SSAStyle(fontname="Noto Sans", fontsize=110, primarycolor=pysubs2.Color(255,140,0), outlinecolor=pysubs2.Color(255,140,0), outline=1, borderstyle=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=340) + subs.styles["ZH"] = pysubs2.SSAStyle(fontname="Noto Sans TC", fontsize=140, primarycolor=pysubs2.Color(102,128,153), outlinecolor=pysubs2.Color(102,128,153), outline=1, borderstyle=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=440) + subs.styles["NUMBER"] = pysubs2.SSAStyle(fontname="Segoe UI Symbol", fontsize=120, primarycolor=pysubs2.Color(204, 136, 0), outlinecolor=pysubs2.Color(204, 136, 0), bold=True, scalex=120, outline=1, borderstyle=1, alignment=pysubs2.Alignment.TOP_RIGHT, marginl=0, marginr=260, marginv=160) + + # 6. 遍歷音訊檔,生成字幕事件 + current_time_ms = 0 + for i, wav_path in enumerate(wav_files): + duration_s = librosa.get_duration(path=str(wav_path)) + duration_ms = int(duration_s * 1000) + + start_time = current_time_ms + end_time = current_time_ms + duration_ms + + # 建立四層字幕事件 + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=en_lines[i], style="EN")) + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=f"[{ipa_lines[i]}]", style="IPA")) + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=zh_lines[i], style="ZH")) + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=number_list[i], style="NUMBER")) + + current_time_ms = end_time + + # 7. 儲存 .ass 檔案 + subs.save(str(final_ass_path)) + + return True, f"✅ ASS 字幕檔已成功生成並儲存至 '{final_ass_path}'", final_ass_path + + except Exception as e: + return False, f"❌ 在生成 ASS 字幕時發生未預期的錯誤: {e}", None + diff --git a/scripts/step6_assemble_video.py b/scripts/step6_assemble_video.py new file mode 100644 index 0000000..89f7d52 --- /dev/null +++ b/scripts/step6_assemble_video.py @@ -0,0 +1,166 @@ +import subprocess +from pathlib import Path +import os +from utils.helpers import get_media_info, get_media_duration + +# --- 標準化設定 --- +TARGET_WIDTH = 1920 +TARGET_HEIGHT = 1080 +TARGET_FPS = 30 + +def escape_ffmpeg_path_for_filter(path: Path) -> str: + """為在 FFmpeg 濾鏡圖中安全使用而轉義路徑。""" + path_str = str(path.resolve()) + if os.name == 'nt': + return path_str.replace('\\', '\\\\').replace(':', '\\:') + else: + return path_str.replace("'", "'\\\\\\''") + +def run_step6_assemble_video( + project_path: Path, + logo_video: Path, + open_video: Path, + end_video: Path, + p_loop_count: int, + transition_duration: float = 1.0 +): + """ + 使用 FFmpeg 的 xfade 濾鏡組裝最終影片,並在過程中統一影片屬性與進行色彩校正。 + """ + try: + # --- 1. 路徑與檔案檢查 --- + output_dir = project_path / "output" + test_dir = output_dir / "test" + audio_path = output_dir / "combined_audio.wav" + ass_path = output_dir / "subtitles.ass" + bg_final_path = output_dir / "bg_final.mp4" + final_video_path = output_dir / "final_video_full_sequence.mp4" + + for file_path in [logo_video, open_video, end_video, audio_path, ass_path]: + if not file_path or not file_path.exists(): + return False, f"❌ 找不到必要的輸入檔案: {file_path}", None + + base_videos = sorted([p for p in test_dir.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']]) + if not base_videos: + return False, "❌ `test` 資料夾中沒有影片可供合成。", None + + # --- 2. 一步式生成主要內容影片 (bg_final.mp4) --- + if not bg_final_path.exists(): + print(f"⚙️ 準備一步式生成主要內容影片 (循環 {p_loop_count} 次)...") + + looped_video_list = base_videos * p_loop_count + inputs_cmd = [] + for video_path in looped_video_list: + inputs_cmd.extend(["-i", str(video_path)]) + + inputs_cmd.extend(["-i", str(audio_path)]) + ass_path_str = escape_ffmpeg_path_for_filter(ass_path) + + filter_parts = [] + stream_count = len(looped_video_list) + + # 【核心修改】定義色彩校正濾鏡 + color_correction_filter = "eq=brightness=-0.05:contrast=0.95:saturation=0.7" + + # 將標準化 (尺寸、影格率) 與色彩校正合併 + for i in range(stream_count): + filter_parts.append( + f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS},{color_correction_filter}[v_scaled_{i}]" + ) + + last_v_out = "[v_scaled_0]" + for i in range(stream_count - 1): + offset = sum(get_media_duration(v) for v in looped_video_list[:i+1]) - (i + 1) * transition_duration + v_in_next = f"[v_scaled_{i+1}]" + v_out_current = f"[v_out{i+1}]" if i < stream_count - 2 else "[bg_video_stream]" + filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}") + last_v_out = v_out_current + + drawbox_filter = "drawbox=w=iw*0.8:h=ih*0.8:x=(iw-iw*0.8)/2:y=(ih-ih*0.8)/2:color=0x808080@0.7:t=fill" + filter_parts.append(f"[bg_video_stream]{drawbox_filter}[v_with_mask]") + filter_parts.append(f"[v_with_mask]ass=filename='{ass_path_str}'[final_v]") + filter_complex_str = ";".join(filter_parts) + + ffmpeg_main_content_cmd = [ + "ffmpeg", "-y", *inputs_cmd, + "-filter_complex", filter_complex_str, + "-map", "[final_v]", + "-map", f"{stream_count}:a:0", + "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast", + "-c:a", "aac", "-b:a", "192k", + "-shortest", + str(bg_final_path) + ] + + print("🚀 正在執行 FFmpeg 主要內容合成指令 (含色彩校正)...") + result = subprocess.run(ffmpeg_main_content_cmd, check=True, capture_output=True, text=True, encoding='utf-8') + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr) + print(f"✅ 主要內容影片已生成: {bg_final_path}") + + # --- 3. 合成最終的影片序列 (logo -> open -> bg_final -> end) --- + if final_video_path.exists(): + print(f"✅ 最終影片已存在,跳過組裝: {final_video_path}") + return True, f"✅ 影片已存在於 {final_video_path}", final_video_path + + print("🎬 開始動態生成 xfade 合成指令...") + videos_to_concat = [logo_video, open_video, bg_final_path, end_video] + final_inputs_cmd, final_filter_parts, video_infos = [], [], [] + + for video_path in videos_to_concat: + final_inputs_cmd.extend(["-i", str(video_path)]) + info = get_media_info(video_path) + video_infos.append(info) + + for i, info in enumerate(video_infos): + if not info["has_audio"]: + duration = info['duration'] + final_filter_parts.append(f"anullsrc=r=44100:cl=stereo:d={duration:.4f}[silent_a_{i}]") + + for i in range(len(videos_to_concat)): + # 注意:這裡我們不對 logo/open/end 影片進行調色,以保留它們的原始風格 + final_filter_parts.append( + f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS}[v_final_scaled_{i}]" + ) + + last_v_out = "[v_final_scaled_0]" + last_a_out = "[0:a]" if video_infos[0]["has_audio"] else "[silent_a_0]" + + for i in range(len(videos_to_concat) - 1): + offset = sum(info['duration'] for info in video_infos[:i+1]) - (i + 1) * transition_duration + v_in_next = f"[v_final_scaled_{i+1}]" + a_in_next = f"[{i+1}:a]" if video_infos[i+1]["has_audio"] else f"[silent_a_{i+1}]" + + v_out_current = f"[v_out{i+1}]" if i < len(videos_to_concat) - 2 else "[video]" + a_out_current = f"[a_out{i+1}]" if i < len(videos_to_concat) - 2 else "[audio]" + + final_filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}") + final_filter_parts.append(f"{last_a_out}{a_in_next}acrossfade=d={transition_duration}{a_out_current}") + + last_v_out, last_a_out = v_out_current, a_out_current + + final_filter_complex_str = ";".join(final_filter_parts) + + ffmpeg_concat_cmd = [ + "ffmpeg", "-y", *final_inputs_cmd, + "-filter_complex", final_filter_complex_str, + "-map", "[video]", "-map", "[audio]", + "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast", + "-movflags", "+faststart", + "-c:a", "aac", "-b:a", "192k", + str(final_video_path) + ] + + print("🚀 執行 FFmpeg 最終序列合成指令...") + result = subprocess.run(ffmpeg_concat_cmd, check=True, capture_output=True, text=True, encoding='utf-8') + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr) + + print(f"✅ 影片已成功組裝並儲存至 {final_video_path}") + return True, f"✅ 影片已成功組裝並儲存至 {final_video_path}", final_video_path + + except subprocess.CalledProcessError as e: + error_output = e.stderr if e.stderr else str(e) + return False, f"❌ FFmpeg 執行失敗:\n{error_output}", None + except Exception as e: + return False, f"❌ 發生未預期錯誤: {e}", None diff --git a/ui_fragments/audio_manager.py b/ui_fragments/audio_manager.py new file mode 100644 index 0000000..a1fce8f --- /dev/null +++ b/ui_fragments/audio_manager.py @@ -0,0 +1,43 @@ +import streamlit as st +from utils.callbacks import toggle_all_audio_checkboxes, callback_delete_selected_audios + +@st.fragment +def audio_management_fragment(paths: dict): + """A self-contained fragment for managing and deleting project audio files.""" + with st.expander("🎧 管理專案音訊素材"): + audio_dir = paths["audio"] + output_dir = paths["output"] + + audio_files = [] + if audio_dir.exists(): + audio_files.extend(sorted(audio_dir.glob('*.wav'))) + if output_dir.exists(): + audio_files.extend(sorted(output_dir.glob('*.wav'))) + + if not audio_files: + st.info("專案的 `audio` 和 `output` 資料夾中目前沒有音訊檔。") + else: + all_selected = all(st.session_state.get(f"delete_audio_cb_{f.name}", False) for f in audio_files) + if st.session_state.get('select_all_audios', False) != all_selected: + st.session_state.select_all_audios = all_selected + + st.markdown("勾選您想要刪除的音訊檔案,然後點擊下方的按鈕。") + + st.checkbox( + "全選/取消全選", + key="select_all_audios", + on_change=toggle_all_audio_checkboxes, + args=(audio_files,) + ) + + with st.form("delete_audios_form"): + for audio_file in audio_files: + file_size_kb = audio_file.stat().st_size / 1024 + label = f"**{audio_file.parent.name}/{audio_file.name}** ({file_size_kb:.2f} KB)" + st.checkbox(label, key=f"delete_audio_cb_{audio_file.name}") + + submitted = st.form_submit_button("🟥 確認刪除選取的音訊", use_container_width=True, type="primary") + + if submitted: + callback_delete_selected_audios(paths) + st.rerun(scope="fragment") \ No newline at end of file diff --git a/ui_fragments/video_manager.py b/ui_fragments/video_manager.py new file mode 100644 index 0000000..6994908 --- /dev/null +++ b/ui_fragments/video_manager.py @@ -0,0 +1,45 @@ +import streamlit as st +from utils.callbacks import toggle_all_video_checkboxes, callback_delete_selected_videos + +@st.fragment +def video_management_fragment(paths): + """A self-contained fragment for managing and deleting project videos.""" + with st.expander("🎬 管理專案影片素材"): + output_dir = paths["output"] + video_files = [] + if output_dir.exists(): + video_files = sorted( + [f for f in output_dir.iterdir() if f.suffix.lower() in ['.mp4', '.mov']], + key=lambda f: f.stat().st_mtime, + reverse=True + ) + + if not video_files: + st.info("專案輸出資料夾 (`/output`) 中目前沒有影片檔。") + else: + all_selected = all(st.session_state.get(f"delete_cb_{f.name}", False) for f in video_files) + if st.session_state.get('select_all_videos', False) != all_selected: + st.session_state.select_all_videos = all_selected + + st.markdown("勾選您想要刪除的影片,然後點擊下方的按鈕。") + + st.checkbox( + "全選/取消全選", + key="select_all_videos", + on_change=toggle_all_video_checkboxes, + args=(video_files,) + ) + + with st.form("delete_videos_form"): + for video_file in video_files: + file_size_mb = video_file.stat().st_size / (1024 * 1024) + st.checkbox( + f"**{video_file.name}** ({file_size_mb:.2f} MB)", + key=f"delete_cb_{video_file.name}" + ) + + submitted = st.form_submit_button("🟥 確認刪除選取的影片", use_container_width=True, type="primary") + + if submitted: + callback_delete_selected_videos(paths) + st.rerun(scope="fragment") \ No newline at end of file diff --git a/ui_fragments/video_search.py b/ui_fragments/video_search.py new file mode 100644 index 0000000..320f89f --- /dev/null +++ b/ui_fragments/video_search.py @@ -0,0 +1,136 @@ +import streamlit as st +from utils.helpers import analyze_ass_for_keywords, search_pixabay_videos, search_pexels_videos +from utils.callbacks import callback_download_videos, callback_toggle_preview + + +@st.fragment +def video_search_fragment(paths: dict): + """一個用於從多個來源搜尋並下載影片的獨立UI片段。""" + st.header("步驟 5: 從線上圖庫搜尋影片素材") + pixabay_api_key = st.secrets.get("PIXABAY_API_KEY") + pexels_api_key = st.secrets.get("PEXELS_API_KEY") + + if not pixabay_api_key and not pexels_api_key: + st.warning("請至少在 Streamlit secrets 中設定 `PIXABAY_API_KEY` 或 `PEXELS_API_KEY` 以使用此功能。") + return + + st.subheader("5.1 搜尋影片") + + # 【新增】影片來源選擇 + available_sources = [] + if pixabay_api_key: available_sources.append("Pixabay") + if pexels_api_key: available_sources.append("Pexels") + + if not available_sources: + st.error("沒有可用的影片來源 API 金鑰。") + return + + source_choice = st.radio( + "選擇影片來源:", + options=available_sources, + horizontal=True, + key="video_source_choice" + ) + + keywords = analyze_ass_for_keywords(paths) + default_query = keywords[0] if keywords else "" + + with st.form(key="search_form"): + col1, col2 = st.columns([4, 1]) + with col1: + st.text_input( + "Search Videos", + value=default_query, + key="search_query_input", + placeholder="輸入關鍵字後按 Enter 或點擊右側按鈕搜尋", + label_visibility="collapsed" + ) + with col2: + submitted = st.form_submit_button("🔍 搜尋影片", use_container_width=True) + + if submitted: + with st.spinner("正在搜尋並過濾影片..."): + query = st.session_state.get("search_query_input", "") + + # 【修改】根據選擇呼叫對應的 API + if source_choice == "Pixabay": + success, message, results = search_pixabay_videos(pixabay_api_key, query) + elif source_choice == "Pexels": + success, message, results = search_pexels_videos(pexels_api_key, query) + else: + success, message, results = False, "未知的影片來源", [] + + st.session_state.operation_status = {"success": success, "message": message, "source": "search_videos"} + + # --- 【修改】資料標準化 --- + standardized_results = [] + if success and results: + if source_choice == "Pixabay": + for v in results: + try: + standardized_results.append({ + 'id': f"pixabay-{v['id']}", + 'thumbnail_url': v['videos']['tiny']['thumbnail'], + 'video_url': v['videos']['large']['url'], + 'preview_url': v['videos']['tiny']['url'], + 'width': v['videos']['large']['width'], + 'height': v['videos']['large']['height'] + }) + except KeyError: continue + elif source_choice == "Pexels": + for v in results: + try: + # 尋找合適的影片檔案連結 + video_link_hd = next((f['link'] for f in v['video_files'] if f.get('quality') == 'hd'), None) + video_link_sd = next((f['link'] for f in v['video_files'] if f.get('quality') == 'sd'), None) + + # 優先使用 HD 畫質,若無則用 SD,再沒有就用第一個 + final_video_url = video_link_hd or video_link_sd or v['video_files'][0]['link'] + + standardized_results.append({ + 'id': f"pexels-{v['id']}", + 'thumbnail_url': v['image'], + 'video_url': final_video_url, + 'preview_url': video_link_sd or final_video_url, # 預覽用 SD 或更高畫質 + 'width': v['width'], + 'height': v['height'] + }) + except (KeyError, IndexError): continue + + st.session_state.search_results = standardized_results + st.session_state.selected_videos = {str(v['id']): {"url": v['video_url'], "selected": False} for v in standardized_results} + st.session_state.active_preview_id = None + + if keywords and (keywords[1] or keywords[2]): + st.caption(f"建議關鍵字: `{keywords[1]}` `{keywords[2]}`") + + st.divider() + + if st.session_state.search_results: + st.subheader("5.2 選擇影片並下載") + st.button("📥 下載選取的影片到專案素材庫", on_click=callback_download_videos, args=(paths,), help="將下方勾選的影片下載至目前專案的 `output/test` 資料夾,並自動循序命名。") + + num_cols = 5 + cols = st.columns(num_cols) + # 【修改】使用標準化後的結果進行渲染 + for i, video in enumerate(st.session_state.search_results): + with cols[i % num_cols]: + with st.container(border=True): + media_placeholder = st.empty() + video_id_str = str(video['id']) + + if st.session_state.active_preview_id == video_id_str: + media_placeholder.video(video['preview_url']) + preview_button_text, preview_button_type = "⏹️ 停止預覽", "secondary" + else: + media_placeholder.image(video['thumbnail_url'], use_container_width=True) + preview_button_text, preview_button_type = "▶️ 預覽", "primary" + + st.caption(f"ID: {video_id_str}") + st.caption(f"尺寸: {video['width']}x{video['height']}") + + is_selected = st.checkbox("選取", key=f"select_{video_id_str}", value=st.session_state.selected_videos.get(video_id_str, {}).get('selected', False)) + if video_id_str in st.session_state.selected_videos: + st.session_state.selected_videos[video_id_str]['selected'] = is_selected + + st.button(preview_button_text, key=f"preview_{video_id_str}", on_click=callback_toggle_preview, args=(video_id_str,), use_container_width=True, type=preview_button_type) \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/callbacks.py b/utils/callbacks.py new file mode 100644 index 0000000..73875b4 --- /dev/null +++ b/utils/callbacks.py @@ -0,0 +1,251 @@ +import streamlit as st +from pathlib import Path +from scripts.step1_notion_sync import create_project_from_page +from utils.helpers import calculate_loop_count +from scripts.step6_assemble_video import run_step6_assemble_video +from utils.paths import PROJECTS_DIR +import requests + +# --- 專案與流程控制 Callbacks --- +def callback_set_project(): + st.session_state.current_project = st.session_state.project_selector + st.session_state.final_video_path = None + st.session_state.show_video = False + st.session_state.operation_status = {} + st.session_state.active_preview_id = None + +def callback_create_project(): + page_id = st.session_state.page_titles_map[st.session_state.selected_title] + with st.spinner("正在建立專案..."): + success, msg, new_project_name = create_project_from_page(api_key=st.secrets.get("NOTION_API_KEY"), page_id=page_id, project_name=st.session_state.selected_title, projects_dir=PROJECTS_DIR) + if success: + st.session_state.current_project = new_project_name + st.session_state.selected_title = "" + st.session_state.operation_status = {"success": success, "message": msg, "source": "create_project"} + +def callback_assemble_final_video(paths, **kwargs): + """ + 影片合成的專用「準備」回呼。 + 它會先計算循環次數 p,然後將任務轉交給通用的 callback_run_step。 + """ + # 步驟 1: 執行前置計算 + st.write("⏳ 正在計算影片循環次數...") # 提供即時回饋 + audio_path = paths["combined_audio"] + video_folder = paths["output"] / "test" + + p, error_msg = calculate_loop_count(audio_path, video_folder, 1.0) + + # 步驟 2: 檢查前置計算的結果 + if error_msg: + st.session_state.operation_status = {"success": False, "message": f"❌ 無法合成影片: {error_msg}", "source": "assemble_video"} + return # 如果計算失敗,直接終止流程 + + # 步驟 3: 將計算結果加入到參數中,並轉交給通用執行器 + kwargs["p_loop_count"] = p + + # 現在,我們呼叫標準的執行回呼,讓它來處理 spinner 和後續流程 + callback_run_step(run_step6_assemble_video, **kwargs) + +def callback_run_step(step_function, source="unknown", **kwargs): + spinner_text = kwargs.pop("spinner_text", "正在處理中,請稍候...") + with st.spinner(spinner_text): + result = step_function(**kwargs) + if len(result) == 2: success, msg = result + elif len(result) == 3: success, msg, _ = result + else: success, msg = False, "步驟函式回傳格式不符!" + st.session_state.operation_status = {"success": success, "message": msg, "source": source} + if step_function == run_step6_assemble_video and success: + st.session_state.final_video_path = str(result[2]) + st.session_state.show_video = True + +def callback_delete_selected_videos(paths: dict): + output_dir = paths["output"] + deleted_files_count = 0 + errors = [] + + files_to_delete = [] + for key, value in st.session_state.items(): + if key.startswith("delete_cb_") and value: + filename = key.replace("delete_cb_", "", 1) + files_to_delete.append(filename) + + if not files_to_delete: + st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的影片。", "source": "delete_videos"} + return + + for filename in files_to_delete: + try: + file_path = output_dir / filename + if file_path.exists(): + if st.session_state.get("final_video_path") and Path(st.session_state.final_video_path) == file_path: + st.session_state.final_video_path = None + st.session_state.show_video = False + + file_path.unlink() + deleted_files_count += 1 + del st.session_state[f"delete_cb_{filename}"] + except Exception as e: + errors.append(f"刪除 {filename} 時出錯: {e}") + + if errors: + message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors) + st.session_state.operation_status = {"success": False, "message": message, "source": "delete_videos"} + else: + message = f"✅ 成功刪除 {deleted_files_count} 個影片。" + st.session_state.operation_status = {"success": True, "message": message, "source": "delete_videos"} + +def toggle_all_video_checkboxes(video_files): + select_all_state = st.session_state.get('select_all_videos', False) + for video_file in video_files: + st.session_state[f"delete_cb_{video_file.name}"] = select_all_state + +def callback_delete_selected_audios(paths: dict): + deleted_files_count = 0 + errors = [] + + files_to_delete_names = [] + for key, value in st.session_state.items(): + if key.startswith("delete_audio_cb_") and value: + filename = key.replace("delete_audio_cb_", "", 1) + files_to_delete_names.append(filename) + + if not files_to_delete_names: + st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的音訊。", "source": "delete_audios"} + return + + audio_dir = paths["audio"] + output_dir = paths["output"] + + for filename in files_to_delete_names: + try: + file_path_audio = audio_dir / filename + file_path_output = output_dir / filename + + file_path_to_delete = None + if file_path_audio.exists(): + file_path_to_delete = file_path_audio + elif file_path_output.exists(): + file_path_to_delete = file_path_output + + if file_path_to_delete: + file_path_to_delete.unlink() + deleted_files_count += 1 + del st.session_state[f"delete_audio_cb_{filename}"] + else: + errors.append(f"找不到檔案 {filename}。") + + except Exception as e: + errors.append(f"刪除 {filename} 時出錯: {e}") + + if errors: + message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors) + st.session_state.operation_status = {"success": False, "message": message, "source": "delete_audios"} + else: + message = f"✅ 成功刪除 {deleted_files_count} 個音訊。" + st.session_state.operation_status = {"success": True, "message": message, "source": "delete_audios"} + +def toggle_all_audio_checkboxes(audio_files): + select_all_state = st.session_state.get('select_all_audios', False) + for audio_file in audio_files: + st.session_state[f"delete_audio_cb_{audio_file.name}"] = select_all_state + +def callback_toggle_preview(video_id): + if st.session_state.active_preview_id == video_id: st.session_state.active_preview_id = None + else: st.session_state.active_preview_id = video_id + +def callback_download_videos(paths: dict): + with st.spinner("正在下載影片,請稍候..."): + videos_to_download = {vid: info for vid, info in st.session_state.selected_videos.items() if info['selected']} + if not videos_to_download: + st.session_state.operation_status = {"success": False, "message": "尚未選擇任何影片。", "source": "download_videos"} + return + + download_dir = paths["output"] / "test" + download_dir.mkdir(parents=True, exist_ok=True) + + # --- 循序命名邏輯 START --- + existing_files = list(download_dir.glob('*.mp4')) + list(download_dir.glob('*.mov')) + existing_numbers = [] + for f in existing_files: + try: + # 從檔名 (不含副檔名) 中提取數字 + existing_numbers.append(int(f.stem)) + except ValueError: + # 忽略那些不是純數字的檔名 + continue + + # 決定起始編號 + start_counter = max(existing_numbers) + 1 if existing_numbers else 1 + counter = start_counter + # --- 循序命名邏輯 END --- + + total, download_count, errors = len(videos_to_download), 0, [] + + for i, (video_id, info) in enumerate(videos_to_download.items()): + url = info['url'] + original_path = Path(url) + + # 使用循序命名 + new_video_name = f"{counter}{original_path.suffix}" + save_path = download_dir / new_video_name + + try: + if not save_path.exists(): + response = requests.get(url, stream=True) + response.raise_for_status() + with open(save_path, 'wb') as f: + f.write(response.content) + download_count += 1 + counter += 1 # 只有在成功下載後才增加計數器 + except Exception as e: + errors.append(f"下載影片 {original_path.name} 失敗: {e}") + + if errors: + final_message = f"任務完成,但有 {len(errors)} 個錯誤。成功處理 {download_count}/{total} 個影片。\n" + "\n".join(errors) + st.session_state.operation_status = {"success": False, "message": final_message, "source": "download_videos"} + else: + final_message = f"任務完成!成功下載 {download_count}/{total} 個影片到專案的 `output/test` 資料夾,並已自動循序命名。" + st.session_state.operation_status = {"success": True, "message": final_message, "source": "download_videos"} + +@st.fragment +def video_management_fragment(paths: dict): + """A self-contained fragment for managing and deleting project videos.""" + with st.expander("🎬 管理專案影片素材"): + output_dir = paths["output"] + video_files = [] + if output_dir.exists(): + video_files = sorted( + [f for f in output_dir.iterdir() if f.suffix.lower() in ['.mp4', '.mov']], + key=lambda f: f.stat().st_mtime, + reverse=True + ) + + if not video_files: + st.info("專案輸出資料夾 (`/output`) 中目前沒有影片檔。") + else: + all_selected = all(st.session_state.get(f"delete_cb_{f.name}", False) for f in video_files) + if st.session_state.get('select_all_videos', False) != all_selected: + st.session_state.select_all_videos = all_selected + + st.markdown("勾選您想要刪除的影片,然後點擊下方的按鈕。") + + st.checkbox( + "全選/取消全選", + key="select_all_videos", + on_change=toggle_all_video_checkboxes, + args=(video_files,) + ) + + with st.form("delete_videos_form"): + for video_file in video_files: + file_size_mb = video_file.stat().st_size / (1024 * 1024) + st.checkbox( + f"**{video_file.name}** ({file_size_mb:.2f} MB)", + key=f"delete_cb_{video_file.name}" + ) + + submitted = st.form_submit_button("🟥 確認刪除選取的影片", use_container_width=True, type="primary") + + if submitted: + callback_delete_selected_videos(paths) + st.rerun(scope="fragment") diff --git a/utils/helpers.py b/utils/helpers.py new file mode 100644 index 0000000..a6243d8 --- /dev/null +++ b/utils/helpers.py @@ -0,0 +1,189 @@ +import streamlit as st +import json +import requests +import time +from pathlib import Path +from notion_client import Client +from notion_client.helpers import iterate_paginated_api +import subprocess +import math +import librosa + +def get_media_info(media_path: Path) -> dict: + """使用 ffprobe 獲取媒體檔案的詳細資訊 (時長、是否有音訊)。""" + cmd = [ + "ffprobe", "-v", "error", "-show_entries", "format=duration:stream=codec_type", + "-of", "json", str(media_path) + ] + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + info = json.loads(result.stdout) + duration = float(info.get("format", {}).get("duration", 0)) + has_audio = any(s.get("codec_type") == "audio" for s in info.get("streams", [])) + if duration == 0: raise ValueError("無法獲取或時長為 0") + return {"duration": duration, "has_audio": has_audio} + except (subprocess.CalledProcessError, json.JSONDecodeError, ValueError) as e: + print(f"❌ 無法獲取媒體資訊 {media_path}: {e}") + raise + +def get_media_duration(file_path: Path) -> float | None: + """使用 ffprobe 或 librosa 獲取媒體檔案的時長(秒)。""" + try: + if file_path.suffix.lower() in ['.wav', '.mp3', '.aac']: + return librosa.get_duration(path=file_path) + else: + cmd = [ + "ffprobe", "-v", "error", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", str(file_path) + ] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + return float(result.stdout.strip()) + except Exception as e: + print(f"無法讀取檔案 {file_path} 的時長: {e}") + return None + +# 【新增】計算循環次數 p 的函式 +def calculate_loop_count(audio_path: Path, video_folder: Path, transition_duration: float) -> tuple[int | None, str | None]: + """ + 根據音訊長度和一系列影片,計算所需的最小循環次數 p。 + 成功時回傳 (p, None),失敗時回傳 (None, error_message)。 + """ + m_prime = get_media_duration(audio_path) + if m_prime is None: + return None, "無法讀取音訊檔案長度。" + + video_paths = [p for p in video_folder.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']] + video_lengths = [get_media_duration(p) for p in video_paths] + video_lengths = [l for l in video_lengths if l is not None and l > 0] + + if not video_lengths: + return None, "在 `test` 資料夾中找不到有效的影片檔案。" + + n = len(video_lengths) + m = sum(video_lengths) + tr = transition_duration + + denominator = m - (n - 1) * tr + + if denominator <= 0: + return None, f"影片的有效長度 ({denominator:.2f}s) 小於或等於零,無法進行循環。請增加影片時長或減少轉場時間。" + + p = math.ceil(m_prime / denominator) + return p, None +def display_global_status_message(): + """ + 檢查會話狀態中是否存在任何操作狀態,如果存在則顯示它。 + 顯示後會立即清除狀態,以確保訊息只顯示一次。 + """ + if 'operation_status' in st.session_state and st.session_state.operation_status: + status = st.session_state.operation_status + # 根據成功與否,選擇對應的訊息元件 + if status.get("success"): + st.success(status["message"], icon="✅") + else: + st.error(status["message"], icon="❌") + # 清除狀態,避免在下一次刷新時重複顯示 + st.session_state.operation_status = {} + + +def analyze_ass_for_keywords(paths: dict) -> list: + data_file = paths["data"] + if not data_file.exists(): return ["", "", ""] + try: + data = json.loads(data_file.read_text(encoding="utf-8")) + full_text = " ".join([item.get('english', '') for item in data.get('script', [])]) + words = [word.strip(".,!?") for word in full_text.lower().split() if len(word) > 4] + if not words: return ["nature", "technology", "business"] + unique_words = list(dict.fromkeys(words)) + suggestions = unique_words[:3] + while len(suggestions) < 3: suggestions.append("") + return suggestions + except Exception: return ["nature", "technology", "business"] + +def search_pixabay_videos(api_key, query, target_count=20, buffer=2): + if not api_key: return False, "請在 Streamlit secrets 中設定 PIXABAY_API_KEY。", [] + if not query.strip(): return False, "請輸入搜尋關鍵字。", [] + url = "https://pixabay.com/api/videos/" + valid_hits, page, per_page_request, max_pages = [], 1, 50, 5 + while len(valid_hits) < (target_count + buffer) and page <= max_pages: + params = {"key": api_key, "q": query, "per_page": per_page_request, "safesearch": "true", "page": page} + try: + response = requests.get(url, params=params) + response.raise_for_status() + data = response.json() + if not data.get("hits"): break + for video in data["hits"]: + try: + video_details = video.get('videos', {}).get('large', {}) + width, height = video_details.get('width', 0), video_details.get('height', 0) + if width > 0 and height > 0 and width >= height: + valid_hits.append(video) + if len(valid_hits) >= (target_count + buffer): break + except (KeyError, TypeError): continue + if len(valid_hits) >= (target_count + buffer): break + page += 1 + time.sleep(1) + except requests.RequestException as e: return False, f"API 請求失敗: {e}", [] + + final_results = valid_hits[:target_count] + if len(final_results) > 0: return True, f"成功找到並過濾出 {len(final_results)} 個橫式影片。", final_results + else: return True, "找不到符合條件的橫式影片,請嘗試其他關鍵字。", [] +def search_pexels_videos(api_key: str, query: str, target_count: int = 20) -> tuple[bool, str, list]: + """ + 從 Pexels API 搜尋橫向影片。 + + Args: + api_key (str): Pexels API 金鑰。 + query (str): 搜尋關鍵字。 + target_count (int): 目標搜尋結果數量。 + + Returns: + tuple[bool, str, list]: (成功狀態, 訊息, 影片結果列表) + """ + if not api_key: + return False, "請在 Streamlit secrets 中設定 PEXELS_API_KEY。", [] + if not query.strip(): + return False, "請輸入搜尋關鍵字。", [] + + url = "https://api.pexels.com/v1/videos/search" + headers = {"Authorization": api_key} + params = { + "query": query, + "per_page": target_count + 5, # 多取一些以過濾 + "orientation": 'landscape' + } + + try: + response = requests.get(url, headers=headers, params=params) + response.raise_for_status() + data = response.json() + + videos = data.get("videos", []) + if not videos: + return True, "在 Pexels 找不到符合條件的影片,請嘗試其他關鍵字。", [] + + final_results = videos[:target_count] + return True, f"成功從 Pexels 找到 {len(final_results)} 個橫式影片。", final_results + + except requests.RequestException as e: + # Pexels 的錯誤訊息通常在 response body 中 + error_info = "" + if e.response is not None: + try: + error_info = e.response.json().get('error', str(e)) + except json.JSONDecodeError: + error_info = e.response.text + return False, f"Pexels API 請求失敗: {error_info}", [] + +def get_notion_page_titles(api_key: str, database_id: str) -> dict: + """獲取 Notion 資料庫中所有頁面的標題和對應的 page_id。""" + client = Client(auth=api_key) + pages = list(iterate_paginated_api(client.databases.query, database_id=database_id)) + + title_map = {} + for page in pages: + title_property = page['properties'].get('Name', {}) + if title_property.get('title'): + title = title_property['title'][0]['plain_text'] + title_map[title] = page['id'] + return title_map \ No newline at end of file diff --git a/utils/paths.py b/utils/paths.py new file mode 100644 index 0000000..4fed02e --- /dev/null +++ b/utils/paths.py @@ -0,0 +1,21 @@ +from pathlib import Path + +PROJECTS_DIR = Path("projects") +SHARED_ASSETS_DIR = Path("shared_assets") + +PROJECTS_DIR.mkdir(exist_ok=True) +SHARED_ASSETS_DIR.mkdir(exist_ok=True) + +def get_project_paths(project_name: str) -> dict: + """為給定專案回傳一個包含所有重要路徑的字典。""" + if not project_name: + return {} + project_root = PROJECTS_DIR / project_name + return { + "root": project_root, + "data": project_root / "data.json", + "audio": project_root / "audio", + "output": project_root / "output", + "combined_audio": project_root / "output" / "combined_audio.wav", + "ass_file": project_root / "output" / f"{project_name}.ass" + } \ No newline at end of file