diff --git a/.gitignore b/.gitignore index 16319d5..c4b1db0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ projects/ shared_assets/ black-abode-267208-b3fc82ecef45.json black-abode-267208-edc57f25cd47.json - +.streamlit/ +.venv/ +.vscode/ \ No newline at end of file diff --git a/app.py b/app.py index f40251c..24c92c2 100644 --- a/app.py +++ b/app.py @@ -3,31 +3,29 @@ import time from pathlib import Path import json +from ui_fragments import tab0_data_processing,tab1_asset_generation,tab2_online_video_search,tab3_video_composition + # --- 本地模組匯入 --- -from utils.paths import get_project_paths, SHARED_ASSETS_DIR, PROJECTS_DIR -from utils.callbacks import callback_set_project, callback_create_project, callback_run_step,callback_assemble_final_video -from utils.helpers import get_notion_page_titles, display_global_status_message -from ui_fragments.audio_manager import audio_management_fragment -from ui_fragments.video_manager import video_management_fragment -from ui_fragments.video_search import video_search_fragment +from utils.paths import get_project_paths, SHARED_ASSETS_DIR, PROJECTS_DIR,get_project_list +import callbacks +from utils.helpers import get_notion_page_titles, display_operation_status +# from ui_fragments.audio_manager import audio_management_fragment +# from ui_fragments.video_manager import video_management_fragment +# from ui_fragments.video_search import video_search_fragment # --- 匯入外部處理腳本 --- # 雖然這些腳本主要在 callbacks 中被呼叫,但在此處匯入有助於理解全貌 -from scripts.step1_notion_sync import update_project_from_notion -from scripts.step2_translate_ipa import run_step2_translate_ipa -from scripts.step3_generate_audio import run_step3_generate_audio -from scripts.step4_concatenate_audio import run_step4_concatenate_audio -from scripts.step5_generate_ass import run_step5_generate_ass -from scripts.step6_assemble_video import run_step6_assemble_video +# from scripts.step1_notion_sync import update_project_from_notion +# from scripts.step2_translate_ipa import run_step2_translate_ipa +# from scripts.step3_generate_audio import run_step3_generate_audio +# from scripts.step4_concatenate_audio import run_step4_concatenate_audio +# from scripts.step5_generate_ass import run_step5_generate_ass +# from scripts.step6_assemble_video import run_step6_assemble_video # --- Streamlit UI 設定 --- st.set_page_config(layout="wide", page_title="英語影片自動化工作流程") # --- 狀態初始化 --- -if 'current_project' not in st.session_state: - st.session_state.current_project = None -if 'final_video_path' not in st.session_state: - st.session_state.final_video_path = None if 'show_video' not in st.session_state: st.session_state.show_video = False if 'operation_status' not in st.session_state: @@ -41,9 +39,14 @@ if 'selected_videos' not in st.session_state: if 'active_preview_id' not in st.session_state: st.session_state.active_preview_id = None +if 'project' not in st.session_state: + st.session_state.project = None # 'project' 現在儲存的是 Project 物件或 None +if 'project_selector' not in st.session_state: + st.session_state.project_selector = None # 'project_selector' 用來追蹤 selectbox 的狀態 + # --- UI 介面 --- st.title("🎬 英語影片自動化工作流程") -display_global_status_message() +display_operation_status() # --- 側邊欄 --- with st.sidebar: st.header("API & Project Control") @@ -56,161 +59,126 @@ with st.sidebar: st.header("1. 建立新專案") try: if notion_api_key and notion_database_id: - with st.spinner("載入 Notion 頁面..."): - page_titles_map = get_notion_page_titles(notion_api_key, notion_database_id) - st.session_state.page_titles_map = page_titles_map - st.selectbox("選擇一個 Notion 頁面來建立專案:", options=[""] + list(page_titles_map.keys()), index=0, key="selected_title", placeholder="選擇一個頁面...") + page_titles_map = get_notion_page_titles(notion_api_key, notion_database_id) + st.session_state.page_titles_map = page_titles_map + st.selectbox("選擇 Notion 頁面以建立新專案:", + options=[""] + list(page_titles_map.keys()), + key="selected_title") + if st.session_state.selected_title: - st.button(f"從 '{st.session_state.selected_title}' 建立專案", on_click=callback_create_project) + st.button(f"從 '{st.session_state.selected_title}' 建立專案", + on_click=callbacks.callback_create_project) # 綁定到標準回呼 else: st.warning("請在 Streamlit secrets 中設定 Notion API 金鑰和資料庫 ID。") except Exception as e: st.error(f"無法載入 Notion 頁面: {e}") - + st.divider() st.header("2. 選擇現有專案") - existing_projects = [p.name for p in PROJECTS_DIR.iterdir() if p.is_dir()] - selected_project_idx = existing_projects.index(st.session_state.current_project) + 1 if st.session_state.current_project in existing_projects else 0 + + # 1. 介面邏輯變得極簡:直接呼叫工具函式 + existing_projects = get_project_list() + + # 2. UI 只負責綁定 key 和 on_change 事件 st.selectbox( "或選擇一個現有專案:", options=[""] + existing_projects, - index=selected_project_idx, - key="project_selector", - on_change=callback_set_project, + key="project_selector", # selectbox 的狀態由 st.session_state.project_selector 控制 + on_change=callbacks.callback_set_project, # 綁定到標準回呼 help="選擇您已經建立的專案。" ) +# --- 主畫面 ---# 1. 從 session_state 中獲取 Project 物件。這是與專案互動的唯一入口。 +project = st.session_state.get('project') -# --- 主畫面 --- -if not st.session_state.current_project: +# 2. 判斷的依據變成了 project 物件是否存在,而不是一個字串。 +if not project: + # 如果沒有專案,提示使用者。這部分的邏輯是完美的。 st.info("👈 請在側邊欄建立新專案或選擇一個現有專案以開始。") else: - paths = get_project_paths(st.session_state.current_project) - project_path = paths["root"] + st.header(f"目前專案:`{project.name}`") - st.header(f"目前專案:`{st.session_state.current_project}`") + tab_names = ["1. 資料處理 & AI 加註", "2. 素材生成", " 線上素材搜尋", "3. 影片合成"] + active_tab = st.radio("選擇工作流程步驟:", + options=tab_names, + key="main_tabs_radio", + horizontal=True, + label_visibility="collapsed") + st.markdown("---") - tab_names = ["1. 資料處理 & AI 加註", "2. 素材生成", "2.5. 線上素材搜尋", "3. 影片合成"] - active_tab = st.radio("選擇工作流程步驟:", options=tab_names, key="main_tabs_radio", horizontal=True, label_visibility="collapsed") - st.markdown("---") - - # --- 分頁內容 --- +# # --- 分頁內容 --- if active_tab == tab_names[0]: - data_file = paths["data"] - if data_file.exists(): - with st.container(): - col1, col2 = st.columns([3, 1]) - with col1: - st.subheader("專案資料") - with col2: - st.button("🔄 從 Notion 同步更新", on_click=callback_run_step, args=(update_project_from_notion,), kwargs={"source": "sync_notion", "spinner_text": "正在同步...", "api_key": notion_api_key, "project_path": project_path}, help="從 Notion 抓取此專案的最新資料並覆寫本地檔案。") - with st.expander("預覽專案資料 (data.json)", expanded=False): - st.json(json.loads(data_file.read_text(encoding="utf-8"))) - st.divider() - st.subheader("AI 自動加註") - st.button("2. 添加翻譯與 IPA 音標", on_click=callback_run_step, args=(run_step2_translate_ipa,), kwargs={"source": "run_translate", "spinner_text": "正在調用 AI...", "project_path": project_path, "google_creds_path": google_creds_for_translate_path, "notion_api_key": notion_api_key, "notion_database_id": notion_database_id}, disabled=not google_creds_for_translate_path) - if not google_creds_for_translate_path: - st.warning("請在 secrets.toml 中提供 Google Cloud 翻譯認證檔案的路徑。") - else: - st.warning("找不到 data.json 檔案。請先從 Notion 同步。") - st.divider() - st.info("✍️ **人工檢查點**:\n\n1. 請檢查 `Notion` 中的內容是否準確。\n2. 修改完成後,**請務必點擊上方的「從 Notion 同步更新」按鈕**。") - - elif active_tab == tab_names[1]: with st.container(border=True): - st.subheader("步驟 3.1: 生成單句音訊") - st.button("執行生成", on_click=callback_run_step, args=(run_step3_generate_audio,), kwargs={"source": "gen_audio", "spinner_text": "正在生成音訊...", "project_path": project_path, "google_creds_path": google_creds_for_TTS_path}, help="根據 data.json 中的每一句英文,生成對應的 .wav 音訊檔並存放在 audio 資料夾。") - - st.divider() + tab0_data_processing.render_tab(project) - st.markdown("##### 步驟 3.2: 組合完整音訊") - audio_folder, single_audio_exists = paths["audio"], False - if audio_folder.exists() and any(audio_folder.iterdir()): - single_audio_exists = True - - if not single_audio_exists: - st.info("請先執行步驟 3.1 以生成單句音訊。") - st.button("執行組合", on_click=callback_run_step, args=(run_step4_concatenate_audio,), kwargs={"source": "concat_audio", "spinner_text": "正在組合音訊...", "project_path": project_path}, help="將 audio 資料夾中所有的 .wav 檔,按順序組合成一個名為 combined_audio.wav 的檔案。", disabled=not single_audio_exists) - - combined_audio_path = paths["combined_audio"] - if combined_audio_path.exists(): - st.success("🎉 完整音訊已組合成功!") - st.audio(str(combined_audio_path)) - - st.divider() - - with st.container(border=True): - st.subheader("步驟 4: 生成 ASS 字幕檔") - if not single_audio_exists: - st.info("請先執行步驟 3.1 以生成字幕所需的時間戳。") - st.button("📝 生成 .ass 字幕檔", on_click=callback_run_step, args=(run_step5_generate_ass,), kwargs={"source": "gen_ass", "spinner_text": "正在生成字幕檔...", "project_path": project_path}, disabled=not single_audio_exists) - - st.divider() - audio_management_fragment(paths) - - elif active_tab == tab_names[2]: - video_search_fragment(paths) + if active_tab == tab_names[1]: + tab1_asset_generation.render_tab(project, callbacks) - elif active_tab == tab_names[3]: - video_management_fragment(paths) + + if active_tab == tab_names[2]: + tab2_online_video_search.render_tab(project,callbacks) + + if active_tab == tab_names[3]: + tab3_video_composition.render_tab(project, callbacks) + st.divider() - st.subheader("步驟 6.1: 管理與選擇共享影片素材") - shared_videos = [""] + [f.name for f in sorted(SHARED_ASSETS_DIR.glob('*.mp4'))] + [f.name for f in sorted(SHARED_ASSETS_DIR.glob('*.mov'))] - with st.container(border=True): - st.markdown("##### 從共享素材庫中選擇影片") - c1, c2 = st.columns(2) - with c1: - logo_selection = st.selectbox("選擇 Logo 影片:", shared_videos, key="logo_select") - open_selection = st.selectbox("選擇開場影片:", shared_videos, key="open_select") - with c2: - end_selection = st.selectbox("選擇結尾影片:", shared_videos, key="end_select") +# st.subheader("步驟 6.1: 管理與選擇共享影片素材") +# shared_videos = [""] + [f.name for f in sorted(SHARED_ASSETS_DIR.glob('*.mp4'))] + [f.name for f in sorted(SHARED_ASSETS_DIR.glob('*.mov'))] +# with st.container(border=True): +# st.markdown("##### 從共享素材庫中選擇影片") +# c1, c2 = st.columns(2) +# with c1: +# logo_selection = st.selectbox("選擇 Logo 影片:", shared_videos, key="logo_select") +# open_selection = st.selectbox("選擇開場影片:", shared_videos, key="open_select") +# with c2: +# end_selection = st.selectbox("選擇結尾影片:", shared_videos, key="end_select") - with st.expander("**上傳新的共享素材**"): - st.markdown("上傳的影片將存入 `shared_assets` 公共資料夾,可供所有專案重複使用。") - uploaded_files = st.file_uploader("上傳新的影片到共享素材庫", type=["mp4", "mov"], accept_multiple_files=True, label_visibility="collapsed") - if uploaded_files: - for uploaded_file in uploaded_files: - with open(SHARED_ASSETS_DIR / uploaded_file.name, "wb") as f: - f.write(uploaded_file.getbuffer()) - st.success(f"成功上傳 {len(uploaded_files)} 個檔案到共享素材庫!") - time.sleep(1) - st.rerun() +# with st.expander("**上傳新的共享素材**"): +# st.markdown("上傳的影片將存入 `shared_assets` 公共資料夾,可供所有專案重複使用。") +# uploaded_files = st.file_uploader("上傳新的影片到共享素材庫", type=["mp4", "mov"], accept_multiple_files=True, label_visibility="collapsed") +# if uploaded_files: +# for uploaded_file in uploaded_files: +# with open(SHARED_ASSETS_DIR / uploaded_file.name, "wb") as f: +# f.write(uploaded_file.getbuffer()) +# st.success(f"成功上傳 {len(uploaded_files)} 個檔案到共享素材庫!") +# time.sleep(1) +# st.rerun() - st.divider() - st.subheader("步驟 6.2: 執行最終影片合成") - all_videos_selected = all([logo_selection, open_selection, end_selection]) - if not all_videos_selected: - st.info("請從上方的下拉選單中選擇所有四個影片以啟用合成按鈕。") +# st.divider() +# st.subheader("步驟 6.2: 執行最終影片合成") +# all_videos_selected = all([logo_selection, open_selection, end_selection]) +# if not all_videos_selected: +# st.info("請從上方的下拉選單中選擇所有四個影片以啟用合成按鈕。") - st.button( - "🎬 合成最終影片", - on_click=callback_assemble_final_video, - kwargs={ - "paths": paths, - "source": "assemble_video", - "spinner_text": "影片合成中,請稍候...", - "project_path": paths['root'], # 傳遞給 run_step6_assemble_video - "logo_video": SHARED_ASSETS_DIR / logo_selection if logo_selection else None, - "open_video": SHARED_ASSETS_DIR / open_selection if open_selection else None, - "end_video": SHARED_ASSETS_DIR / end_selection if end_selection else None - }, - disabled=not all_videos_selected - ) +# st.button( +# "🎬 合成最終影片", +# on_click=callback_assemble_final_video, +# kwargs={ +# "paths": paths, +# "source": "assemble_video", +# "spinner_text": "影片合成中,請稍候...", +# "project_path": paths['root'], # 傳遞給 run_step6_assemble_video +# "logo_video": SHARED_ASSETS_DIR / logo_selection if logo_selection else None, +# "open_video": SHARED_ASSETS_DIR / open_selection if open_selection else None, +# "end_video": SHARED_ASSETS_DIR / end_selection if end_selection else None +# }, +# disabled=not all_videos_selected +# ) - if st.session_state.final_video_path and Path(st.session_state.final_video_path).exists(): - st.divider() - st.header("🎉 影片製作完成!") - st.checkbox("顯示/隱藏影片", key="show_video", value=True) - if st.session_state.show_video: - st.video(st.session_state.final_video_path) +# if st.session_state.final_video_path and Path(st.session_state.final_video_path).exists(): +# st.divider() +# st.header("🎉 影片製作完成!") +# st.checkbox("顯示/隱藏影片", key="show_video", value=True) +# if st.session_state.show_video: +# st.video(st.session_state.final_video_path) - with open(st.session_state.final_video_path, "rb") as file: - st.download_button( - "📥 下載專案影片", - data=file, - file_name=Path(st.session_state.final_video_path).name, - mime="video/mp4", - use_container_width=True - ) +# with open(st.session_state.final_video_path, "rb") as file: +# st.download_button( +# "📥 下載專案影片", +# data=file, +# file_name=Path(st.session_state.final_video_path).name, +# mime="video/mp4", +# use_container_width=True +# ) diff --git a/callbacks.py b/callbacks.py new file mode 100644 index 0000000..9c93617 --- /dev/null +++ b/callbacks.py @@ -0,0 +1,329 @@ +import streamlit as st +from pathlib import Path +from scripts.step1_notion_sync import create_project_from_page +from scripts.step6_assemble_video import run_step6_assemble_video +from utils.paths import PROJECTS_DIR ,get_project_list +from utils import asset_manager +import requests +from project_model import Project +from config import SHARED_ASSETS_DIR + +# --- 專案與流程控制 Callbacks --- +def callback_set_project(): + """當使用者從下拉選單選擇一個現有專案時,設定當前的 Project 物件。""" + selected_name = st.session_state.get("project_selector") + if selected_name: + with st.spinner(f"正在載入專案 '{selected_name}'..."): + st.session_state.project = Project(project_name=selected_name) + # 【新】回報操作狀態 + st.session_state.operation_status = { + "type": "success", + "message": f"專案 '{selected_name}' 已成功載入。" + } + else: + st.session_state.project = None + +def callback_create_project(): + """從 Notion 頁面建立一個新專案的回呼函式。""" + project_name = st.session_state.get("selected_title") + if not project_name: + # 【改動】不再呼叫 st.warning,而是設定狀態 + st.session_state.operation_status = {"type": "warning", "message": "請先選擇一個 Notion 頁面。"} + return + + if project_name in get_project_list(): + # 【改動】不再呼叫 st.error,而是設定狀態 + st.session_state.operation_status = { + "type": "error", + "message": f"專案 '{project_name}' 已存在。請從下方的選單選擇它。" + } + st.session_state.project_selector = project_name + callback_set_project() + st.rerun() # 此處的 rerun 用於強制更新 selectbox,是可接受的進階用法 + return + + page_titles_map = st.session_state.get("page_titles_map", {}) + page_id_to_create = page_titles_map.get(project_name) + + if not page_id_to_create: + st.session_state.operation_status = { + "type": "error", + "message": f"內部錯誤:找不到 '{project_name}' 的 Notion 頁面 ID。" + } + return + + try: + new_project = Project.create_new(project_name=project_name, notion_page_id=page_id_to_create) + st.session_state.project = new_project + st.session_state.project_selector = new_project.name + # 【改動】不再呼叫 st.success,而是設定狀態 + st.session_state.operation_status = { + "type": "success", + "message": f"專案 '{project_name}' 的框架已成功建立!" + } + st.session_state.selected_title = "" + + except Exception as e: + st.session_state.operation_status = {"type": "error", "message": f"建立專案時發生錯誤: {e}"} + +def _execute_project_method(method_name: str, spinner_text: str): + """(新) 輔助函式,用於執行 Project 方法並處理回傳的狀態。""" + project = st.session_state.get('project') + if not project: + st.session_state.operation_status = {"type": "warning", "message": "請先選擇一個專案。"} + return + + method_to_call = getattr(project, method_name, None) + if not method_to_call: + st.session_state.operation_status = {"type": "error", "message": f"內部錯誤:找不到方法 {method_name}。"} + return + + with st.spinner(spinner_text): + success= method_to_call() + st.session_state.operation_status = {"type": "success" if success else "error"} + +# --- 所有後續的 Callback 現在都變得極其簡潔 --- + +def callback_sync_notion(): + """回呼:執行 Notion 同步。""" + _execute_project_method("sync_from_notion", "正在從 Notion 同步...") + +def callback_add_zh_ipa(): + """回呼:執行 AI 加註(翻譯與 IPA)。""" + _execute_project_method("add_translation_and_ipa", "正在呼叫 AI 進行翻譯與音標加註...") + +def callback_generate_sentence_audio(): + """回呼:生成音訊片段。""" + _execute_project_method("generate_sentence_audio", "正在生成音訊片段...") + +def callback_concatenate_audio(): + """回呼:組合完整音訊。""" + _execute_project_method("concatenate_audio", "正在組合完整音訊...") + +def callback_generate_subtitles(): + """回呼:生成 ASS 字幕檔。""" + _execute_project_method("generate_ass_subtitles", "正在生成 ASS 字幕檔...") + + +def assemble_final_video(logo_video_name, open_video_name, end_video_name): + """ + 【新】回呼:執行最終影片的合成。 + 只接收檔名,內部處理路徑和業務邏輯。 + """ + try: + project = st.session_state.get('project') + if not project: + # 如果找不到專案,立即更新狀態並終止函式,避免錯誤 + st.session_state.operation_status = { + "type": "error", + "message": "操作失敗:找不到有效的專案。請重新選擇或建立一個專案。" + } + return # 提前退出函式 + # 1. 在回呼函式內部,根據檔名組合出完整的路徑 + logo_path = SHARED_ASSETS_DIR / logo_video_name if logo_video_name else None + open_path = SHARED_ASSETS_DIR / open_video_name if open_video_name else None + end_path = SHARED_ASSETS_DIR / end_video_name if end_video_name else None + + # 2. 將核心業務邏輯委派給 Project 物件 + # 我們將在這裡呼叫一個新的 project.assemble_video 方法 + with st.spinner("影片合成中,這可能需要幾分鐘,請耐心等候..."): + project.assemble_video( + logo_video=logo_path, + open_video=open_path, + end_video=end_path + ) + + except Exception as e: + # 無論是上面的 ValueError,還是 project.assemble_video 中未被捕捉的錯誤 + error_message = f"執行影片合成時發生錯誤: {e}" + + + # 將統一格式的錯誤訊息回報給 UI,讓使用者知道發生了什麼 + st.session_state.operation_status = { + "type": "error", + "message": error_message + } + +def callback_run_step(step_function, source="unknown", **kwargs): + spinner_text = kwargs.pop("spinner_text", "正在處理中,請稍候...") + with st.spinner(spinner_text): + result = step_function(**kwargs) + if len(result) == 2: success, msg = result + elif len(result) == 3: success, msg, _ = result + else: success, msg = False, "步驟函式回傳格式不符!" + st.session_state.operation_status = {"success": success, "message": msg, "source": source} + if step_function == run_step6_assemble_video and success: + st.session_state.final_video_path = str(result[2]) + st.session_state.show_video = True + +def toggle_all_video_checkboxes(self, video_files): + """【新】切換所有影片核取方塊的選取狀態。""" + select_all_state = st.session_state.get('select_all_videos', False) + for video_file in video_files: + st.session_state[f"delete_cb_{video_file.name}"] = select_all_state + +def delete_selected_videos(self): + """【新】刪除在 UI 中選取的影片。""" + if not self.project: + st.session_state.operation_status = {"type": "warning", "message": "請先選擇一個專案。"} + return + + output_dir = self.project.paths["output"] + deleted_files_count = 0 + errors = [] + files_to_delete = [ + key.replace("delete_cb_", "", 1) + for key, value in st.session_state.items() + if key.startswith("delete_cb_") and value + ] + + if not files_to_delete: + st.session_state.operation_status = {"type": "info", "message": "沒有選擇任何要刪除的影片。"} + return + + for filename in files_to_delete: + try: + file_path = output_dir / filename + if file_path.exists(): + if st.session_state.get("final_video_path") and Path(st.session_state.final_video_path) == file_path: + st.session_state.final_video_path = None + file_path.unlink() + deleted_files_count += 1 + del st.session_state[f"delete_cb_{filename}"] + except Exception as e: + errors.append(f"刪除 {filename} 時出錯: {e}") + + if errors: + message = f"刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors) + st.session_state.operation_status = {"type": "error", "message": message} + else: + message = f"成功刪除 {deleted_files_count} 個影片。" + st.session_state.operation_status = {"type": "success", "message": message} + +def upload_shared_videos(): + """ + 回呼:處理共享影片的上傳請求。 + """ + uploaded_files = st.session_state.get("shared_video_uploader", []) + + # 委派任務給 asset_manager + saved_count, errors = asset_manager.save_uploaded_shared_videos(uploaded_files) + + # 根據結果更新 UI 狀態 + if errors: + message = f"處理完成。成功 {saved_count} 個,失敗 {len(errors)} 個。錯誤: {', '.join(errors)}" + st.session_state.operation_status = {"type": "error", "message": message} + elif saved_count > 0: + message = f"成功上傳 {saved_count} 個新素材到共享庫!" + st.session_state.operation_status = {"type": "success", "message": message} + + st.session_state.shared_video_uploader = [] + +def callback_download_videos(paths: dict): + with st.spinner("正在下載影片,請稍候..."): + videos_to_download = {vid: info for vid, info in st.session_state.selected_videos.items() if info['selected']} + if not videos_to_download: + st.session_state.operation_status = {"success": False, "message": "尚未選擇任何影片。", "source": "download_videos"} + return + + download_dir = paths["output"] / "test" + download_dir.mkdir(parents=True, exist_ok=True) + + # --- 循序命名邏輯 START --- + existing_files = list(download_dir.glob('*.mp4')) + list(download_dir.glob('*.mov')) + existing_numbers = [] + for f in existing_files: + try: + # 從檔名 (不含副檔名) 中提取數字 + existing_numbers.append(int(f.stem)) + except ValueError: + # 忽略那些不是純數字的檔名 + continue + + # 決定起始編號 + start_counter = max(existing_numbers) + 1 if existing_numbers else 1 + counter = start_counter + # --- 循序命名邏輯 END --- + + total, download_count, errors = len(videos_to_download), 0, [] + + for i, (video_id, info) in enumerate(videos_to_download.items()): + url = info['url'] + original_path = Path(url) + + # 使用循序命名 + new_video_name = f"{counter}{original_path.suffix}" + save_path = download_dir / new_video_name + + try: + if not save_path.exists(): + response = requests.get(url, stream=True) + response.raise_for_status() + with open(save_path, 'wb') as f: + f.write(response.content) + download_count += 1 + counter += 1 # 只有在成功下載後才增加計數器 + except Exception as e: + errors.append(f"下載影片 {original_path.name} 失敗: {e}") + + if errors: + final_message = f"任務完成,但有 {len(errors)} 個錯誤。成功處理 {download_count}/{total} 個影片。\n" + "\n".join(errors) + st.session_state.operation_status = {"success": False, "message": final_message, "source": "download_videos"} + else: + final_message = f"任務完成!成功下載 {download_count}/{total} 個影片到專案的 `output/test` 資料夾,並已自動循序命名。" + st.session_state.operation_status = {"success": True, "message": final_message, "source": "download_videos"} + +def callback_toggle_preview(video_id): + if st.session_state.active_preview_id == video_id: st.session_state.active_preview_id = None + else: st.session_state.active_preview_id = video_id + + +def callback_delete_selected_audios(paths: dict): + deleted_files_count = 0 + errors = [] + + files_to_delete_names = [] + for key, value in st.session_state.items(): + if key.startswith("delete_audio_cb_") and value: + filename = key.replace("delete_audio_cb_", "", 1) + files_to_delete_names.append(filename) + + if not files_to_delete_names: + st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的音訊。", "source": "delete_audios"} + return + + audio_dir = paths["audio"] + output_dir = paths["output"] + + for filename in files_to_delete_names: + try: + file_path_audio = audio_dir / filename + file_path_output = output_dir / filename + + file_path_to_delete = None + if file_path_audio.exists(): + file_path_to_delete = file_path_audio + elif file_path_output.exists(): + file_path_to_delete = file_path_output + + if file_path_to_delete: + file_path_to_delete.unlink() + deleted_files_count += 1 + del st.session_state[f"delete_audio_cb_{filename}"] + else: + errors.append(f"找不到檔案 {filename}。") + + except Exception as e: + errors.append(f"刪除 {filename} 時出錯: {e}") + + if errors: + message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors) + st.session_state.operation_status = {"success": False, "message": message, "source": "delete_audios"} + else: + message = f"✅ 成功刪除 {deleted_files_count} 個音訊。" + st.session_state.operation_status = {"success": True, "message": message, "source": "delete_audios"} + +def toggle_all_audio_checkboxes(audio_files): + select_all_state = st.session_state.get('select_all_audios', False) + for audio_file in audio_files: + st.session_state[f"delete_audio_cb_{audio_file.name}"] = select_all_state \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..baf78e6 --- /dev/null +++ b/config.py @@ -0,0 +1,15 @@ +# config.py + +from pathlib import Path + +# 專案的根目錄,可以根據您的實際結構調整 +# __file__ 是目前檔案 (config.py) 的路徑 +# .parent 會取得其所在的目錄 +# .parent.parent 會取得上上層目錄 +APP_ROOT = Path(__file__).parent + +# 所有專案的根目錄 +PROJECTS_DIR = APP_ROOT / "projects" + +# 共享素材庫的目錄 +SHARED_ASSETS_DIR = APP_ROOT / "shared_assets" \ No newline at end of file diff --git a/project_model.py b/project_model.py new file mode 100644 index 0000000..7a06b17 --- /dev/null +++ b/project_model.py @@ -0,0 +1,916 @@ +from utils.paths import PROJECTS_DIR,get_project_paths +from utils.helpers import get_media_duration,get_media_info +from pathlib import Path +from typing import List, Dict, Optional +import streamlit as st # 為了存取 secrets +from scripts import step1_notion_sync, step2_translate_ipa +import json +from notion_client import Client +from google.cloud import translate_v2 as translate +import eng_to_ipa +from google.cloud import texttospeech +from pydub import AudioSegment +from google.api_core.exceptions import GoogleAPICallError + +import re # 用於正規表示式匹配檔案名稱 +import pysubs2 # 專業字幕處理函式庫 +import librosa # 專業音訊分析函式庫 +import subprocess +import logging +import math +import os +# 設定日誌 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +class Project: + """ + 代表一個獨立的影片專案,封裝其檔案結構和資料。 + """ + def __init__(self, project_name: str, page_id: str | None = None): + """ + 初始化一個 Project 實例。注意:這不會在磁碟上建立檔案。 + 若要建立新專案,請使用 Project.create_new()。 + + Args: + project_name (str): 專案的名稱,對應其目錄名稱。 + """ + if not project_name or not isinstance(project_name, str): + raise ValueError("專案名稱必須是一個非空的字串。") + self.name = project_name + self.root_path: Path = PROJECTS_DIR / self.name + self.paths: dict[str, Path] = get_project_paths(self.name) + self.data = self._load_data() + if page_id: + self.page_id = page_id + elif self.data and 'id' in self.data: + self.page_id = self.data['id'] + else: + self.page_id = None + + + def _load_data(self) -> dict | None: + """嘗試從 data.json 載入資料,若失敗則回傳 None。""" + data_path = self.paths.get('data') + if data_path and data_path.exists(): + try: + return json.loads(data_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, FileNotFoundError): + return None + return None + + def create_directories(self): + """ + 確保專案所需的所有目錄都存在。 + 這個方法是「冪等」的,即使目錄已存在,重複呼叫也不會出錯。 + """ + print(f"正在為專案 '{self.name}' 檢查並建立目錄結構...") + + # 根據 get_project_paths 的定義,我們知道 'audio' 和 'output' 是需要建立的目錄 + # 將需要建立的目錄鍵名放入一個列表中,方便管理 + required_dirs = ['root','audio', 'output'] + + for dir_key in required_dirs: + directory_path = self.paths.get(dir_key) + if directory_path: + # 使用 exist_ok=True 避免在目錄已存在時拋出錯誤 + # 使用 parents=True 確保即使未來路徑變為巢狀結構也能成功建立 + directory_path.mkdir(parents=True, exist_ok=True) + print(f" - 目錄 '{directory_path}' 已確認存在。") + else: + print(f" - 警告:在路徑設定中找不到鍵名 '{dir_key}'。") + + @classmethod + def create_new(cls, project_name: str, notion_page_id: str) -> 'Project': + """ + 在檔案系統上建立一個新的專案目錄結構。 + + Args: + project_name (str): 新專案的名稱。 + + Returns: + Project: 新建立專案的實例。 + + Raises: + FileExistsError: 如果同名專案已存在。 + """ + project = cls(project_name, page_id=notion_page_id) + project.create_directories() + return project + + def __repr__(self) -> str: + return f"" + + def __str__(self) -> str: + return self.name + + def sync_from_notion(self) -> bool: + """ + 從 Notion 同步更新專案的 data.json 檔案。 + 這個方法是自給自足的,它知道自己的 page_id 和如何獲取 API Key。 + """ + if not self.page_id: + st.session_state.operation_status = { + "type": "error", + "message": "內部錯誤:此專案沒有關聯的 Notion 頁面 ID。" + } + return False + target_page_id = self.page_id + + api_key = st.secrets.get("NOTION_API_KEY") + if not api_key: + st.session_state.operation_status = { + "type": "error", + "message": "錯誤:找不到 Notion API Key。" + } + return False + + try: + client = Client(auth=api_key) + print(f"正在從 Notion 更新頁面 ID: {target_page_id}") + page_data = client.pages.retrieve(page_id=target_page_id) + + # 這裡可以重用 step1 中的 extract_property_value 邏輯 + # 最好是將該函式也移入 Project 類別作為一個私有方法 _extract_property_value + properties = page_data['properties'] + updated_entry = {"id": page_data['id']} + for prop_name, prop_data in properties.items(): + updated_entry[prop_name] = self._extract_property_value(prop_data) + + # 更新 data.json 檔案 + with open(self.paths['data'], 'w', encoding='utf-8') as f: + json.dump(updated_entry, f, ensure_ascii=False, indent=2) + + # 同時更新記憶體中的資料,保持同步 + self.data = updated_entry + self.data = self._load_data() + st.session_state.operation_status = { + "type": "success", + "message": f"專案 '{self.name}' 已成功從 Notion 同步更新。" + } + return True + + except Exception as e: + st.session_state.operation_status = { + "type": "error", + "message": f"更新專案 '{self.name}' 時發生錯誤: {e}" + } + return False + + def _extract_property_value(self,property_data): + """從 Notion 頁面屬性中提取純文字值。""" + prop_type = property_data.get('type') + if prop_type == 'title': + return property_data['title'][0]['plain_text'] if property_data.get('title') else "" + elif prop_type == 'rich_text': + return "\n".join([text['plain_text'] for text in property_data.get('rich_text', [])]) + elif prop_type == 'select' and property_data.get('select'): + return property_data['select']['name'] + elif prop_type == 'date' and property_data.get('date'): + return property_data['date']['start'] + return None + + @classmethod + def create_from_notion(cls, page_id: str, page_title: str): + """ + 工廠方法:從一個 Notion 頁面完整建立一個新專案。 + """ + api_key = st.secrets.get("NOTION_API_KEY") + if not api_key: + raise ValueError("無法建立專案,缺少 Notion API Key。") + + # 1. 先使用既有的 create_new 建立專案實體和目錄結構 + project = cls.create_new(project_name=page_title) + + # 2. 執行從 Notion 抓取資料並寫入 data.json 的邏輯 + try: + client = Client(auth=api_key) + page_data = client.pages.retrieve(page_id=page_id) + properties = page_data['properties'] + + # 使用 _extract_property_value 來解析資料 + page_entry = {"id": page_data['id']} + for prop_name, prop_data in properties.items(): + page_entry[prop_name] = project._extract_property_value(prop_data) + + # 將資料寫入檔案並更新物件狀態 + with open(project.paths['data'], 'w', encoding='utf-8') as f: + json.dump(page_entry, f, ensure_ascii=False, indent=2) + project.data = page_entry + + return project # 回傳完整初始化的專案物件 + + except Exception as e: + # 如果出錯,可能需要考慮刪除已建立的空資料夾,以保持系統乾淨 + # (這部分屬於錯誤處理的細化) + raise IOError(f"從 Notion 頁面 (ID: {page_id}) 抓取資料時失敗: {e}") + + + _translate_client = None + + @property + def translate_client(self): + """ + 延遲初始化 (Lazy Initialization) Google Translate 客戶端。 + 只有在第一次真正需要翻譯時,才會建立客戶端實例, + 並將其儲存起來以供後續重複使用,避免重複認證。 + """ + if self._translate_client is None: + creds_path = st.secrets.get("GOOGLE_CREDS_TRANSLATE_PATH") + if not creds_path: + raise ValueError("未在 secrets 中設定 Google Cloud 翻譯認證檔案路徑。") + + print("正在初始化 Google Translate 客戶端...") + self._translate_client = translate.Client.from_service_account_json(creds_path) + + return self._translate_client + + def add_translation_and_ipa(self) -> bool: + """ + 為專案資料添加翻譯和 IPA 音標。 + 這是協調器,負責整個流程的控制。 + """ + # 1. 檢查前置條件:確保核心資料已存在 + if not self.data or "en" not in self.data: + st.session_state.operation_status = { + "type": "error", + "message": "錯誤:專案資料不完整或缺少英文原文 ('en')。" + } + return False + + # 2. 準備處理資料 + # 從 `data.json` 讀取英文句子列表[1] + english_sentences = self.data["en"].strip().split('\n') + + translated_sentences = [] + ipa_sentences = [] + + # 3. 遍歷每一句,呼叫輔助方法進行處理 + for sentence in english_sentences: + if not sentence: + continue + + # 呼叫封裝好的翻譯方法 + translated = self._call_google_translate(sentence) + translated_sentences.append(translated if translated is not None else "[翻譯失敗]") + + # 呼叫封裝好的 IPA 轉換方法 + ipa = self._get_ipa_for_text(sentence) + ipa_sentences.append(ipa if ipa is not None else "[轉換失敗]") + + # 4. 將處理結果更新回 self.data + self.data['zh'] = "\n".join(translated_sentences) + self.data['ipa'] = "\n".join(ipa_sentences) + + try: + # 首先,將更新後的資料寫回本地 data.json + with open(self.paths['data'], 'w', encoding='utf-8') as f: + json.dump(self.data, f, ensure_ascii=False, indent=2) + + # 然後,呼叫新方法將結果同步回 Notion + if self._update_single_notion_page(): + st.session_state.operation_status = { + "type": "success", + "message": "AI 加註完成,並已成功寫回 Notion!🚀" + } + return True + else: + st.session_state.operation_status = { + "type": "warning", + "message": "AI 加註已完成,但寫回 Notion 時失敗。請稍後手動同步。" + } + return False + + except IOError as e: + st.session_state.operation_status = { + "type": "error", + "message": f"寫入本地 data.json 時發生錯誤: {e}" + } + return False + + def _update_single_notion_page(self) -> bool: + """ + (私有方法) 將 self.data 中的 'zh' 和 'ipa' 欄位更新回對應的 Notion 頁面。 + """ + if not self.page_id: + st.session_state.operation_status = { + "type": "error", + "message": "無法更新 Notion:專案缺少 page_id。" + } + return False + + api_key = st.secrets.get("NOTION_API_KEY") + if not api_key: + st.session_state.operation_status = { + "type": "error", + "message": "無法更新 Notion:缺少 API Key。" + } + return False + + try: + client = Client(auth=api_key) + + # 準備要更新的屬性 + # 注意:這裡的 "中文翻譯" 和 "IPA音標" 必須與您 Notion Database 中的欄位名稱完全一致! + + properties_to_update = { + # 假設您在 Notion 中的欄位名稱就是 "zh" + # 如果不是,請修改 "zh" 為您實際的欄位名稱,例如 "中文翻譯" + "zh": { + "rich_text": [ + { + "type": "text", + "text": { + "content": self.data.get('zh', '') + } + } + ] + }, + # 假設您在 Notion 中的欄位名稱就是 "ipa" + # 如果不是,請修改 "ipa" 為您實際的欄位名稱,例如 "IPA" + "ipa": { + "rich_text": [ + { + "type": "text", + "text": { + "content": self.data.get('ipa', '') + } + } + ] + } + } + print(f"正在將 AI 加註結果寫回 Notion 頁面: {self.page_id}") + client.pages.update(page_id=self.page_id, properties=properties_to_update) + return True + + except Exception as e: + st.session_state.operation_status = { + "type": "error", + "message": f"更新 Notion 頁面時發生 API 錯誤: {e}" + } + return False + + def _call_google_translate(self, text: str) -> str | None: + """ + (輔助) 呼叫 Google Translate API。 + 成功時返回翻譯字串,失敗時在後台記錄錯誤並返回 None。 + """ + try: + result = self.translate_client.translate(text, target_language="zh-TW") + return result.get('translatedText') + except GoogleAPICallError as e: + # 不直接顯示 UI 警告,改為在後台記錄,讓呼叫者決定如何處理 + print(f"警告:Google API 呼叫失敗 - {e.message}") + return None + except Exception as e: + # 同上,記錄詳細錯誤 + print(f"錯誤:翻譯 '{text[:20]}...' 時發生未預期的錯誤: {e}") + return None + + + def _get_ipa_for_text(self, text: str) -> str | None: + """ + 為給定的英文文字獲取 IPA 音標。 + 失敗時在後台記錄錯誤並返回 None。 + """ + if not text: + return "" + + try: + # ... (您原有的 IPA 轉換邏輯保持不變) ... + ipa_word_list = eng_to_ipa.convert(text, keep_punct=True) + if not isinstance(ipa_word_list, list): + return str(ipa_word_list) + + final_ipa_string = ' '.join(ipa_word_list) + final_ipa_string = final_ipa_string.replace('*', '') + final_ipa_string = final_ipa_string.replace(' ,', ',').replace(' .', '.').replace(' ?', '?').replace(' !', '!') + + return final_ipa_string + + except Exception as e: + print(f"警告:無法為 '{text[:20]}...' 獲取 IPA 音標: {e}") + return None + + + + _tts_client = None # 為 TTS Client 新增一個儲存屬性 + + @property + def tts_client(self): + """延遲初始化 Google Text-to-Speech 客戶端,高效且僅在需要時執行一次。""" + if self._tts_client is None: + creds_path = st.secrets.get("GOOGLE_CREDS_TTS_PATH") + if not creds_path: + raise ValueError("未在 secrets 中設定 Google Cloud TTS 認證檔案路徑。") + self._tts_client = texttospeech.TextToSpeechClient.from_service_account_json(creds_path) + return self._tts_client + + def generate_sentence_audio(self) -> bool: + """ + 為專案中每一對中英文句子,生成一個包含多種聲音的「教學音訊片段」。 + """ + # 1. 檢查前置條件 + if not self.data or "en" not in self.data or "zh" not in self.data: + st.session_state.operation_status = { + "type": "error", + "message": "錯誤:專案資料不完整,必須同時包含 'en' 和 'zh' 欄位。" + } + return False + + # 2. 讀取並配對中英文句子 + english_sentences = [line for line in self.data["en"].strip().split('\n') if line] + chinese_sentences = [line for line in self.data["zh"].strip().split('\n') if line] + + if len(english_sentences) != len(chinese_sentences): + st.session_state.operation_status = { + "type": "error", + "message": "en zh數量不符合。" + } + return False + # 3. 從 data.json 讀取 voice 設定,如果沒有則使用預設值 + voice_config = { + 'english_voice_1': self.data.get("english_voice_1", "en-US-Wavenet-I"), + 'english_voice_2': self.data.get("english_voice_2", "en-US-Wavenet-F"), + 'chinese_voice': self.data.get("chinese_voice", "cmn-TW-Wavenet-B") + } + print("使用的語音設定:", voice_config) # 提供除錯資訊 + + audio_dir = self.paths['audio'] + audio_dir.mkdir(parents=True, exist_ok=True) + + success_count = 0 + total_count = min(len(english_sentences), len(chinese_sentences)) + if total_count == 0: + print("沒有找到任何可處理的句子對。") + return False + + progress_bar = st.progress(0, text="正在生成教學音訊片段...") + + for i in range(total_count): + item_en = english_sentences[i] + item_zh = chinese_sentences[i] + output_path = audio_dir / f"{i:03d}.wav" + + # 4. 呼叫新的 SSML 生成器 + ssml_content = self._generate_ssml(item_en, item_zh, voice_config) + + # 5. 呼叫 TTS API (無需修改) + if self._call_google_tts(ssml_content, output_path,default_voice_name=voice_config['english_voice_1']): + success_count += 1 + + progress_bar.progress((i + 1) / total_count, text=f"正在生成教學音訊... ({i+1}/{total_count})") + + progress_bar.empty() + + if success_count > 0: + st.session_state.operation_status = { + "type": "success", + "message": "音訊生成成功!。" + } + return True + else: + st.session_state.operation_status = { + "type": "error", + "message": "音訊生成失敗!。" + } + return False + + + def _generate_ssml(self, item_en: str, item_zh: str, voice_config: dict) -> str: + """ + (新輔助方法) 根據模板,生成包含多種聲音的教學 SSML。 + """ + # 對文本進行 XML 轉義,防止特殊字元破壞 SSML 結構 + safe_en = item_en.replace('&', '&').replace('<', '<').replace('>', '>') + safe_zh = item_zh.replace('&', '&').replace('<', '<').replace('>', '>') + + return f""" + + + + {safe_en} + + + + {safe_en} + + + + {safe_zh} + + + + {safe_en} + + + +""" + + def _call_google_tts(self, ssml_content: str, output_path: Path, default_voice_name: str) -> bool: + """ + (輔助方法已更新) 現在只接收 SSML 內容,不再需要 voice_name 參數。 + """ + try: + synthesis_input = texttospeech.SynthesisInput(ssml=ssml_content) + + voice_params = texttospeech.VoiceSelectionParams( + # 從 'en-US-Wavenet-D' 中提取 'en-US' + language_code='-'.join(default_voice_name.split('-')[:2]), + name=default_voice_name + ) + audio_config = texttospeech.AudioConfig( + audio_encoding=texttospeech.AudioEncoding.LINEAR16, + sample_rate_hertz=24000 + ) + + response = self.tts_client.synthesize_speech( + input=synthesis_input, + voice=voice_params, + audio_config=audio_config + ) + with open(output_path, "wb") as out: + out.write(response.audio_content) + return True + except Exception as e: + st.session_state.operation_status = { + "type": "error", + "message": "音訊生成發生錯誤!" + } + return False + + def concatenate_audio(self) -> bool: + """ + 將 audio 資料夾中所有獨立的 .wav 檔,按檔名順序拼接成一個完整的音訊檔。 + """ + # 1. 檢查前置條件:使用我們之前建立的狀態檢查方法 + if not self.has_sentence_audio(): + st.session_state.operation_status = { + "type": "error", + "message": "錯誤:在 audio 資料夾中找不到任何 .wav 檔案,無法進行組合。" + } + return False + + audio_dir = self.paths['audio'] + output_path = self.paths['combined_audio'] + + try: + # 2. 獲取所有 .wav 檔案並進行排序 + # 使用 sorted() 確保檔案是按字母順序(例如 000.wav, 001.wav, ...)處理的 + wav_files = sorted(audio_dir.glob("*.wav")) + + if not wav_files: + st.session_state.operation_status = { + "type": "error", + "message": " audio 資料夾中找到了目錄,但沒有找到 .wav 檔案。" + } + return False + + # 3. 初始化一個空的 AudioSegment 作為拼接的基礎 + # 這是比「拿第一個檔案當基礎」更穩健的做法 + combined_audio = AudioSegment.empty() + + # 4. 遍歷所有音訊檔並依次拼接 + for wav_file in wav_files: + # 讀取單個 .wav 檔案 + segment = AudioSegment.from_wav(wav_file) + + # 使用 `+` 運算子將音訊片段拼接到末尾 + combined_audio += segment + + # 5. 匯出拼接好的完整音訊檔 + # format="wav" 明確指定輸出格式 + print(f"正在將組合音訊匯出到: {output_path}") + combined_audio.export(output_path, format="wav") + + return True + + except FileNotFoundError: + st.session_state.operation_status = { + "type": "error", + "message": "FFmpeg 未安裝或未在系統路徑中。Pydub 需要它來處理音訊。" + } + print("Pydub 錯誤:請確保 FFmpeg 已安裝並在系統的 PATH 環境變數中。") + return False + except Exception as e: + st.session_state.operation_status = { + "type": "error", + "message": f"組合音訊時發生未預期的錯誤: {e}" + } + print(f"組合音訊時出錯: {e}") + return False + + def generate_ass_subtitles(self) -> bool: + """ + 根據 data.json 的內容和 audio/ 目錄中每個音訊檔的時長, + 生成一個包含多種樣式和四層字幕的 .ass 檔案。 + 此版本精確複製了新的 step5_generate_ass.py 的邏輯[1]。 + """ + # 1. 檢查前置條件 + if not self.has_sentence_audio(): + st.session_state.operation_status = { + "type": "error", + "message": "錯誤:找不到單句音訊檔,無法生成字幕。" + } + return False + + if not self.data: + st.session_state.operation_status = { + "type": "error", + "message": "錯誤:專案資料未載入。" + } + return False + + # 2. 準備路徑和資料 + ass_path = self.paths['ass_file'] + audio_dir = self.paths['audio'] + + try: + # 從 self.data 讀取各語言的文本行 + en_lines = [line.strip() for line in self.data.get("en", "").split('\n') if line.strip()] + zh_lines = [line.strip() for line in self.data.get("zh", "").split('\n') if line.strip()] + ipa_lines = [line.strip() for line in self.data.get("ipa", "").split('\n') if line.strip()] + + # 3. 使用正規表示式獲取並排序音訊檔案 + file_pattern = r"(\d{3})\.wav" # 根據您的腳本,檔名應為 vocab_00.wav, vocab_01.wav 等 + pattern = re.compile(file_pattern) + wav_files = sorted( + [p for p in audio_dir.iterdir() if p.is_file() and pattern.fullmatch(p.name)], + key=lambda p: int(pattern.fullmatch(p.name).group(1)) + ) + + # 4. 嚴格檢查資料數量是否一致 + if not (len(wav_files) == len(en_lines) == len(zh_lines) == len(ipa_lines)): + msg = f"錯誤:資料數量不一致!音訊({len(wav_files)}), EN({len(en_lines)}), ZH({len(zh_lines)}), IPA({len(ipa_lines)})" + st.session_state.operation_status = {"type": "error", "message": msg} + return False + + total_files = len(wav_files) + if total_files == 0: + st.session_state.operation_status = {"type": "warning", "message": "找不到任何匹配的音訊檔來生成字幕。"} + return False + + # 5. 使用 pysubs2 建立 .ass 檔案物件 + subs = pysubs2.SSAFile() + subs.info["PlayResX"] = "1920" + subs.info["PlayResY"] = "1080" + subs.info["Title"] = self.name + + # 6. 定義所有需要的樣式 (從您的腳本中精確複製)[1] + subs.styles["EN"] = pysubs2.SSAStyle(fontname="Noto Sans", fontsize=140, primarycolor=pysubs2.Color(255, 248, 231), outlinecolor=pysubs2.Color(255, 248, 231), outline=2, alignment=pysubs2.Alignment.TOP_CENTER, marginv=280) + subs.styles["IPA"] = pysubs2.SSAStyle(fontname="Noto Sans", fontsize=110, primarycolor=pysubs2.Color(255, 140, 0), outlinecolor=pysubs2.Color(255, 140, 0), outline=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=340) + subs.styles["ZH"] = pysubs2.SSAStyle(fontname="Noto Sans TC", fontsize=140, primarycolor=pysubs2.Color(102, 128, 153), outlinecolor=pysubs2.Color(102, 128, 153), outline=1, alignment=pysubs2.Alignment.TOP_CENTER, marginv=440) + subs.styles["NUMBER"] = pysubs2.SSAStyle(fontname="Segoe UI Symbol", fontsize=120, primarycolor=pysubs2.Color(144, 144, 144), outlinecolor=pysubs2.Color(144, 144, 144), bold=True, outline=1, borderstyle=1,alignment=pysubs2.Alignment.TOP_RIGHT, marginl=0, marginr=260, marginv=160) + + # 7. 遍歷音訊檔,生成四層字幕事件 + st.session_state.operation_status = { + "type": "progress", "value": 0, "message": "正在準備生成字幕..." + } + current_time_ms = 0 + for i, wav_path in enumerate(wav_files): + # 使用 librosa 獲取精確時長 (秒),並轉換為毫秒 + duration_ms = int(librosa.get_duration(path=str(wav_path)) * 1000) + + start_time = current_time_ms + end_time = current_time_ms + duration_ms + + # 為每一層字幕建立一個事件 + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=en_lines[i], style="EN")) + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=f"[{ipa_lines[i]}]", style="IPA")) + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=zh_lines[i], style="ZH")) + subs.append(pysubs2.SSAEvent(start=start_time, end=end_time, text=i, style="NUMBER")) + + current_time_ms = end_time + st.session_state.operation_status = { + "type": "progress", + "value": (i + 1) / total_files, + "message": f"正在處理字幕... ({i+1}/{total_files})" + } + # 8. 儲存 .ass 檔案 + subs.save(str(ass_path)) + st.session_state.operation_status = { + "type": "success", + "message": f"ASS 字幕檔已成功生成並儲存於 {ass_path.name}!🎉" + } + return True + + except Exception as e: + st.session_state.operation_status = { + "type": "error", + "message": f"生成 ASS 字幕時發生未預期的錯誤: {e}" + } + return False + + def has_sentence_audio(self) -> bool: + """檢查 audio 資料夾是否存在且包含 .wav 檔案。""" + audio_dir = self.paths.get('audio') + return audio_dir and audio_dir.exists() and any(f.suffix == '.wav' for f in audio_dir.iterdir()) + + def has_combined_audio(self) -> bool: + """檢查組合後的音訊檔是否存在。""" + combined_path = self.paths.get('combined_audio') + return combined_path and combined_path.exists() + + + + def assemble_video(self, logo_video: Path, open_video: Path, end_video: Path): + """ + 使用 FFmpeg 的 xfade 濾鏡組裝最終影片,並在過程中統一影片屬性與進行色彩校正。 + """ + # --- 標準化設定 --- + TARGET_WIDTH = 1920 + TARGET_HEIGHT = 1080 + TARGET_FPS = 30 + + def escape_ffmpeg_path_for_filter(path: Path) -> str: + """為在 FFmpeg 濾鏡圖中安全使用而轉義路徑。""" + path_str = str(path.resolve()) + if os.name == 'nt': + return path_str.replace('\\', '\\\\').replace(':', '\\:') + else: + return path_str.replace("'", "'\\\\\\''") + try: + # --- 1. 路徑與檔案檢查 --- + output_dir = self.paths['output'] + temp_video_dir = self.paths['temp_video'] + audio_path = self.paths['combined_audio'] + ass_path = self.paths['ass_file'] + final_video_path = self.paths['final_video'] + bg_final_path = output_dir / "bg_final.mp4" + transition_duration = 1.0 + print("111") + for file_path in [logo_video, open_video, end_video, audio_path, ass_path]: + if not file_path or not file_path.exists(): + st.session_state.operation_status = { + "type": "error", + "message": f"缺少必需的檔案: {e}" + } + return + print("222") + + base_videos = sorted([p for p in temp_video_dir.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']]) + if not base_videos: + st.session_state.operation_status = { + "type": "error", + "message": f"資料夾中沒有影片可供合成。: {e}" + } + return + print("333") + + + p_loop_count, error_msg = self.calculate_loop_count(transition_duration=transition_duration) + if error_msg: + # 如果計算出錯,拋出一個錯誤讓外層的 except 捕捉 + raise ValueError(f"計算循環次數時失敗: {error_msg}") + + print("444") + # --- 2. 一步式生成主要內容影片 (bg_final.mp4) --- + if not bg_final_path.exists(): + print(f"⚙️ 準備一步式生成主要內容影片 (循環 {p_loop_count} 次)...") + + looped_video_list = base_videos * p_loop_count + inputs_cmd = [] + for video_path in looped_video_list: + inputs_cmd.extend(["-i", str(video_path)]) + + inputs_cmd.extend(["-i", str(audio_path)]) + ass_path_str = escape_ffmpeg_path_for_filter(ass_path) + + filter_parts = [] + stream_count = len(looped_video_list) + + # 【核心修改】定義色彩校正濾鏡 + color_correction_filter = "eq=brightness=-0.05:contrast=0.95:saturation=0.7" + + # 將標準化 (尺寸、影格率) 與色彩校正合併 + for i in range(stream_count): + filter_parts.append( + f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS},{color_correction_filter}[v_scaled_{i}]" + ) + + last_v_out = "[v_scaled_0]" + for i in range(stream_count - 1): + offset = sum(get_media_duration(v) for v in looped_video_list[:i+1]) - (i + 1) * transition_duration + v_in_next = f"[v_scaled_{i+1}]" + v_out_current = f"[v_out{i+1}]" if i < stream_count - 2 else "[bg_video_stream]" + filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}") + last_v_out = v_out_current + + drawbox_filter = "drawbox=w=iw*0.8:h=ih*0.8:x=(iw-iw*0.8)/2:y=(ih-ih*0.8)/2:color=0x808080@0.7:t=fill" + filter_parts.append(f"[bg_video_stream]{drawbox_filter}[v_with_mask]") + filter_parts.append(f"[v_with_mask]ass=filename='{ass_path_str}'[final_v]") + filter_complex_str = ";".join(filter_parts) + + ffmpeg_main_content_cmd = [ + "ffmpeg", "-y", *inputs_cmd, + "-filter_complex", filter_complex_str, + "-map", "[final_v]", + "-map", f"{stream_count}:a:0", + "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast", + "-c:a", "aac", "-b:a", "192k", + "-shortest", + str(bg_final_path) + ] + + print("🚀 正在執行 FFmpeg 主要內容合成指令 (含色彩校正)...") + result = subprocess.run(ffmpeg_main_content_cmd, check=True, capture_output=True, text=True, encoding='utf-8') + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr) + print(f"✅ 主要內容影片已生成: {bg_final_path}") + + # --- 3. 合成最終的影片序列 (logo -> open -> bg_final -> end) --- + if final_video_path.exists(): + print(f"✅ 最終影片已存在,跳過組裝: {final_video_path}") + return True, f"✅ 影片已存在於 {final_video_path}", final_video_path + + print("🎬 開始動態生成 xfade 合成指令...") + videos_to_concat = [logo_video, open_video, bg_final_path, end_video] + final_inputs_cmd, final_filter_parts, video_infos = [], [], [] + + for video_path in videos_to_concat: + final_inputs_cmd.extend(["-i", str(video_path)]) + info = get_media_info(video_path) + video_infos.append(info) + + for i, info in enumerate(video_infos): + if not info["has_audio"]: + duration = info['duration'] + final_filter_parts.append(f"anullsrc=r=44100:cl=stereo:d={duration:.4f}[silent_a_{i}]") + + for i in range(len(videos_to_concat)): + # 注意:這裡我們不對 logo/open/end 影片進行調色,以保留它們的原始風格 + final_filter_parts.append( + f"[{i}:v]scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease,pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={TARGET_FPS}[v_final_scaled_{i}]" + ) + + last_v_out = "[v_final_scaled_0]" + last_a_out = "[0:a]" if video_infos[0]["has_audio"] else "[silent_a_0]" + + for i in range(len(videos_to_concat) - 1): + offset = sum(info['duration'] for info in video_infos[:i+1]) - (i + 1) * transition_duration + v_in_next = f"[v_final_scaled_{i+1}]" + a_in_next = f"[{i+1}:a]" if video_infos[i+1]["has_audio"] else f"[silent_a_{i+1}]" + + v_out_current = f"[v_out{i+1}]" if i < len(videos_to_concat) - 2 else "[video]" + a_out_current = f"[a_out{i+1}]" if i < len(videos_to_concat) - 2 else "[audio]" + + final_filter_parts.append(f"{last_v_out}{v_in_next}xfade=transition=fade:duration={transition_duration}:offset={offset:.4f}{v_out_current}") + final_filter_parts.append(f"{last_a_out}{a_in_next}acrossfade=d={transition_duration}{a_out_current}") + + last_v_out, last_a_out = v_out_current, a_out_current + + final_filter_complex_str = ";".join(final_filter_parts) + + ffmpeg_concat_cmd = [ + "ffmpeg", "-y", *final_inputs_cmd, + "-filter_complex", final_filter_complex_str, + "-map", "[video]", "-map", "[audio]", + "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast", + "-movflags", "+faststart", + "-c:a", "aac", "-b:a", "192k", + str(final_video_path) + ] + + print("🚀 執行 FFmpeg 最終序列合成指令...") + result = subprocess.run(ffmpeg_concat_cmd, check=True, capture_output=True, text=True, encoding='utf-8') + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, result.args, stderr=result.stderr) + + print(f"✅ 影片已成功組裝並儲存至 {final_video_path}") + return True, f"✅ 影片已成功組裝並儲存至 {final_video_path}", final_video_path + + except subprocess.CalledProcessError as e: + error_output = e.stderr if e.stderr else str(e) + return False, f"❌ FFmpeg 執行失敗:\n{error_output}", None + except Exception as e: + return False, f"❌ 發生未預期錯誤: {e}", None + + + # 【新增】計算循環次數 p 的函式 + def calculate_loop_count(self, transition_duration: float) -> tuple[int | None, str | None]: + """ + 根據音訊長度和一系列影片,計算所需的最小循環次數 p。 + 成功時回傳 (p, None),失敗時回傳 (None, error_message)。 + """ + print("aaa") + m_prime = get_media_duration(self.paths['combined_audio']) + if m_prime is None: + return None, "無法讀取音訊檔案長度。" + print("bbb") + video_folder=self.paths['temp_video'] + video_paths = [p for p in video_folder.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']] + video_lengths = [get_media_duration(p) for p in video_paths] + video_lengths = [l for l in video_lengths if l is not None and l > 0] + print("ccc") + if not video_lengths: + return None, "在 `test` 資料夾中找不到有效的影片檔案。" + print("ddd") + n = len(video_lengths) + m = sum(video_lengths) + tr = transition_duration + + denominator = m - (n - 1) * tr + + if denominator <= 0: + return None, f"影片的有效長度 ({denominator:.2f}s) 小於或等於零,無法進行循環。請增加影片時長或減少轉場時間。" + + p = math.ceil(m_prime / denominator) + return p, None diff --git a/ui_fragments/audio_manager.py b/ui_fragments/audio_manager.py index a1fce8f..433bb39 100644 --- a/ui_fragments/audio_manager.py +++ b/ui_fragments/audio_manager.py @@ -1,5 +1,5 @@ import streamlit as st -from utils.callbacks import toggle_all_audio_checkboxes, callback_delete_selected_audios +from callbacks import toggle_all_audio_checkboxes, callback_delete_selected_audios @st.fragment def audio_management_fragment(paths: dict): diff --git a/ui_fragments/tab0_data_processing.py b/ui_fragments/tab0_data_processing.py new file mode 100644 index 0000000..3eb0d2c --- /dev/null +++ b/ui_fragments/tab0_data_processing.py @@ -0,0 +1,38 @@ +# ui_fragments/tab_data_processing.py + +import streamlit as st +import callbacks +from project_model import Project # 引入 Project 以便進行型別註記 + +@st.fragment +def render_tab(project: Project): + """ + 負責顯示「1. 資料處理 & AI 加註」Tab 的所有 UI 元件和邏輯。 + """ + # 根據 project.data 的狀態來決定顯示哪個 UI + if project.data: + # --- 狀態一:專案「已同步」--- + st.session_state.operation_status = { + "success": True, + "message": "✅ 專案資料已就緒。" + } + with st.container(border=True): + col1, col2 = st.columns([3, 1]) + with col1: + st.subheader("專案資料與 AI 加註") + with col2: + st.button("🔄 從 Notion 同步更新", on_click=callbacks.callback_sync_notion) + with st.expander("預覽專案資料", expanded=False): + st.json(project.data) + st.divider() + st.subheader("AI 自動加註") + # ... (AI 加註的按鈕和邏輯) ... + st.button("執行 AI 翻譯與音標加註並寫回 Notion", on_click=callbacks.callback_add_zh_ipa) + + else: + # --- 狀態二:專案「僅初始化」--- + st.info("ℹ️ 這個專案的框架已建立,請執行第一步來同步核心資料。") + st.button("🚀 步驟一:從 Notion 抓取專案資料", on_click=callbacks.callback_sync_notion, type="primary") + + st.divider() + st.info("✍️ **人工檢查點**:\n\n1. 請確保 `Notion` 中的內容是您想要的最終版本。\n2. 修改完成後,記得點擊上方的同步按鈕。") \ No newline at end of file diff --git a/ui_fragments/tab1_asset_generation.py b/ui_fragments/tab1_asset_generation.py new file mode 100644 index 0000000..9457024 --- /dev/null +++ b/ui_fragments/tab1_asset_generation.py @@ -0,0 +1,101 @@ +# utils/tab1_asset_generation.py + +import streamlit as st + + +@st.fragment +def render_tab(project, callbacks): + """ + 渲染「素材生成」標籤頁的 UI。 + + 這個函式負責處理所有與音效生成和字幕檔生成相關的介面元件。 + + Args: + project: 存放專案狀態和路徑的物件。 + callbacks: 存放按鈕回呼函式的物件。 + """ + # --- 音效生成區塊 --- + with st.container(border=True): + st.subheader("音效生成") + + # 步驟 3.1 + st.markdown("##### 步驟 3.1: 生成單句音訊") + st.button("執行生成", on_click=callbacks.callback_generate_sentence_audio) + + st.divider() + + # 步驟 3.2 + st.markdown("##### 步驟 3.2: 組合完整音訊") + can_concatenate = project.has_sentence_audio() + + if not can_concatenate: + st.info("請先執行步驟 3.1 以生成單句音訊。") + + st.button("執行組合", on_click=callbacks.callback_concatenate_audio, disabled=not can_concatenate) + + if project.has_combined_audio(): + st.session_state.operation_status = { + "success": True, + "message": "🎉 完整音訊已組合成功!" + } + st.audio(str(project.paths['combined_audio'])) + + audio_management_fragment(project,callbacks) + + st.divider() + + # --- 字幕生成區塊 --- + with st.container(border=True): + st.subheader("步驟 4: 生成 ASS 字幕檔") + can_generate_ass = project.has_sentence_audio() + + if not can_generate_ass: + st.info("請先執行步驟 3.1 以生成字幕所需的時間戳。") + + st.button("📝 生成 .ass 字幕檔", on_click=callbacks.callback_generate_subtitles, disabled=not can_generate_ass) + + st.divider() + + +@st.fragment +def audio_management_fragment(project,callbacks): + """A self-contained fragment for managing and deleting project audio files.""" + with st.expander("🎧 管理專案音訊素材"): + paths = project.paths + + audio_dir = paths["audio"] + output_dir = paths["output"] + + audio_files = [] + if audio_dir.exists(): + audio_files.extend(sorted(audio_dir.glob('*.wav'))) + if output_dir.exists(): + audio_files.extend(sorted(output_dir.glob('*.wav'))) + + if not audio_files: + st.info("專案的 `audio` 和 `output` 資料夾中目前沒有音訊檔。") + else: + all_selected = all(st.session_state.get(f"delete_audio_cb_{f.name}", False) for f in audio_files) + if st.session_state.get('select_all_audios', False) != all_selected: + st.session_state.select_all_audios = all_selected + + st.markdown("勾選您想要刪除的音訊檔案,然後點擊下方的按鈕。") + + st.checkbox( + "全選/取消全選", + key="select_all_audios", + on_change=callbacks.toggle_all_audio_checkboxes, + args=(audio_files,) + ) + + with st.form("delete_audios_form"): + for audio_file in audio_files: + file_size_kb = audio_file.stat().st_size / 1024 + label = f"**{audio_file.parent.name}/{audio_file.name}** ({file_size_kb:.2f} KB)" + st.checkbox(label, key=f"delete_audio_cb_{audio_file.name}") + + submitted = st.form_submit_button("🟥 確認刪除選取的音訊", use_container_width=True, type="primary") + + if submitted: + callbacks.callback_delete_selected_audios(paths) + st.rerun(scope="fragment") \ No newline at end of file diff --git a/ui_fragments/tab2_online_video_search.py b/ui_fragments/tab2_online_video_search.py new file mode 100644 index 0000000..08bee38 --- /dev/null +++ b/ui_fragments/tab2_online_video_search.py @@ -0,0 +1,142 @@ +# utils/tab2_search_video.py + +import streamlit as st +import os +from pathlib import Path +from utils.helpers import analyze_ass_for_keywords, search_pixabay_videos, search_pexels_videos + +@st.fragment +def render_tab(project,callbacks): + """一個用於從多個來源搜尋並下載影片的獨立UI片段。""" + paths = project.paths + st.header("步驟 5: 從線上圖庫搜尋影片素材") + pixabay_api_key = st.secrets.get("PIXABAY_API_KEY") + pexels_api_key = st.secrets.get("PEXELS_API_KEY") + + if not pixabay_api_key and not pexels_api_key: + st.warning("請至少在 Streamlit secrets 中設定 `PIXABAY_API_KEY` 或 `PEXELS_API_KEY` 以使用此功能。") + return + + st.subheader("5.1 搜尋影片") + + # 【新增】影片來源選擇 + available_sources = [] + if pixabay_api_key: available_sources.append("Pixabay") + if pexels_api_key: available_sources.append("Pexels") + + if not available_sources: + st.session_state.operation_status = { + "success": False, + "message": "沒有可用的影片來源 API 金鑰。" + } + return + + source_choice = st.radio( + "選擇影片來源:", + options=available_sources, + horizontal=True, + key="video_source_choice" + ) + + keywords = analyze_ass_for_keywords(paths) + default_query = keywords[0] if keywords else "" + + with st.form(key="search_form"): + col1, col2 = st.columns([4, 1]) + with col1: + st.text_input( + "Search Videos", + value=default_query, + key="search_query_input", + placeholder="輸入關鍵字後按 Enter 或點擊右側按鈕搜尋", + label_visibility="collapsed" + ) + with col2: + submitted = st.form_submit_button("🔍 搜尋影片", use_container_width=True) + + if submitted: + with st.spinner("正在搜尋並過濾影片..."): + query = st.session_state.get("search_query_input", "") + + # 【修改】根據選擇呼叫對應的 API + if source_choice == "Pixabay": + success, message, results = search_pixabay_videos(pixabay_api_key, query) + elif source_choice == "Pexels": + success, message, results = search_pexels_videos(pexels_api_key, query) + else: + success, message, results = False, "未知的影片來源", [] + + st.session_state.operation_status = {"success": success, "message": message, "source": "search_videos"} + + # --- 【修改】資料標準化 --- + standardized_results = [] + if success and results: + if source_choice == "Pixabay": + for v in results: + try: + standardized_results.append({ + 'id': f"pixabay-{v['id']}", + 'thumbnail_url': v['videos']['tiny']['thumbnail'], + 'video_url': v['videos']['large']['url'], + 'preview_url': v['videos']['tiny']['url'], + 'width': v['videos']['large']['width'], + 'height': v['videos']['large']['height'] + }) + except KeyError: continue + elif source_choice == "Pexels": + for v in results: + try: + # 尋找合適的影片檔案連結 + video_link_hd = next((f['link'] for f in v['video_files'] if f.get('quality') == 'hd'), None) + video_link_sd = next((f['link'] for f in v['video_files'] if f.get('quality') == 'sd'), None) + + # 優先使用 HD 畫質,若無則用 SD,再沒有就用第一個 + final_video_url = video_link_hd or video_link_sd or v['video_files'][0]['link'] + + standardized_results.append({ + 'id': f"pexels-{v['id']}", + 'thumbnail_url': v['image'], + 'video_url': final_video_url, + 'preview_url': video_link_sd or final_video_url, # 預覽用 SD 或更高畫質 + 'width': v['width'], + 'height': v['height'] + }) + except (KeyError, IndexError): continue + + st.session_state.search_results = standardized_results + st.session_state.selected_videos = {str(v['id']): {"url": v['video_url'], "selected": False} for v in standardized_results} + st.session_state.active_preview_id = None + + if keywords and (keywords[1] or keywords[2]): + st.caption(f"建議關鍵字: `{keywords[1]}` `{keywords[2]}`") + + st.divider() + + if st.session_state.search_results: + st.subheader("5.2 選擇影片並下載") + st.button("📥 下載選取的影片到專案素材庫", on_click=callbacks.callback_download_videos, args=(paths,), help="將下方勾選的影片下載至目前專案的 `output/test` 資料夾,並自動循序命名。") + + num_cols = 5 + cols = st.columns(num_cols) + # 【修改】使用標準化後的結果進行渲染 + for i, video in enumerate(st.session_state.search_results): + with cols[i % num_cols]: + with st.container(border=True): + media_placeholder = st.empty() + video_id_str = str(video['id']) + + if st.session_state.active_preview_id == video_id_str: + media_placeholder.video(video['preview_url']) + preview_button_text, preview_button_type = "⏹️ 停止預覽", "secondary" + else: + media_placeholder.image(video['thumbnail_url'], use_container_width=True) + preview_button_text, preview_button_type = "▶️ 預覽", "primary" + + st.caption(f"ID: {video_id_str}") + st.caption(f"尺寸: {video['width']}x{video['height']}") + + is_selected = st.checkbox("選取", key=f"select_{video_id_str}", value=st.session_state.selected_videos.get(video_id_str, {}).get('selected', False)) + if video_id_str in st.session_state.selected_videos: + st.session_state.selected_videos[video_id_str]['selected'] = is_selected + + st.button(preview_button_text, key=f"preview_{video_id_str}", on_click=callbacks.callback_toggle_preview, args=(video_id_str,), use_container_width=True, type=preview_button_type) \ No newline at end of file diff --git a/ui_fragments/tab3_video_composition.py b/ui_fragments/tab3_video_composition.py new file mode 100644 index 0000000..415e9ab --- /dev/null +++ b/ui_fragments/tab3_video_composition.py @@ -0,0 +1,124 @@ + +import streamlit as st +from pathlib import Path +from config import SHARED_ASSETS_DIR + +@st.fragment +def render_tab(project, callbacks): + """渲染「最終影片合成」標籤頁的 UI。""" + + _render_video_management_section(project, callbacks) + _render_shared_asset_uploader(callbacks) + _render_shared_asset_selection(callbacks) + + st.divider() + +@st.fragment +def _render_video_management_section(project, callbacks): + """ + 一個私有的輔助函式,用於渲染影片管理的 UI 部分。 + """ + with st.expander("🎬 管理專案影片素材", expanded=False): + paths = project.paths + output_dir = paths.get("output", Path("./output")) # 使用 .get 提供預設值 + video_files = [] + + if output_dir.exists(): + video_files = sorted( + [f for f in output_dir.iterdir() if f.suffix.lower() in ['.mp4', '.mov']], + key=lambda f: f.stat().st_mtime, + reverse=True + ) + + if not video_files: + st.info("專案輸出資料夾 (`/output`) 中目前沒有影片檔。") + else: + st.markdown("勾選您想要刪除的影片,然後點擊下方的按鈕。") + + st.checkbox( + "全選/取消全選", + key="select_all_videos", + on_change=callbacks.toggle_all_video_checkboxes, + args=(video_files,) + ) + + for video_file in video_files: + file_size_mb = video_file.stat().st_size / (1024 * 1024) + st.checkbox( + f"**{video_file.name}** ({file_size_mb:.2f} MB)", + key=f"delete_cb_{video_file.name}" + ) + + st.button( + "🟥 確認刪除選取的影片", + on_click=callbacks.delete_selected_videos, + use_container_width=True, + type="primary" + ) + +def _render_shared_asset_selection(callbacks): + """ + 一個私有的輔助函式,用於渲染共享素材選擇和上傳的 UI。 + """ + st.subheader("步驟 3.2: 選擇共享影片素材") + st.subheader("輔助工具") + + + # 【改動 1】更健壯的檔案列表讀取方式 + try: + shared_videos = [""] + sorted( + [f.name for f in SHARED_ASSETS_DIR.glob('*') if f.suffix.lower() in ['.mp4', '.mov']] + ) + except Exception as e: + st.error(f"無法讀取共享素材庫 '{SHARED_ASSETS_DIR}': {e}") + shared_videos = [""] + + with st.container(border=True): + st.markdown("##### 從共享素材庫中選擇影片") + c1, c2 = st.columns(2) + with c1: + logo_selection = st.selectbox("選擇 Logo 影片:", shared_videos, key="logo_select") + open_selection = st.selectbox("選擇開場影片:", shared_videos, key="open_select") + with c2: + end_selection = st.selectbox("選擇結尾影片:", shared_videos, key="end_select") + + st.subheader("步驟 3.2: 執行最終影片合成") + all_videos_selected = all([logo_selection, open_selection, end_selection]) + + if not all_videos_selected: + st.info("請從上方的下拉選單中選擇所有 Logo、開場和結尾影片以啟用合成按鈕。") + + + # 【核心改動】st.button 的呼叫現在非常簡潔 + st.button( + "🎬 合成最終影片", + on_click=callbacks.assemble_final_video, + kwargs={ + # 只傳遞 UI 直接相關的、最少的資訊 (檔名) + "logo_video_name": logo_selection, + "open_video_name": open_selection, + "end_video_name": end_selection + }, + disabled=not all_videos_selected, + use_container_width=True + ) + + + # 將選擇的結果回傳給主函式 + return logo_selection, open_selection, end_selection + +def _render_shared_asset_uploader(callbacks): + """一個私有的輔助函式,用於渲染共享素材上傳工具。""" + with st.expander("📂 管理與上傳共享素材", expanded=False): + st.markdown("這裡上傳的影片將存入 `shared_assets` 公共資料夾,可供所有專案重複使用。") + + st.file_uploader( + "上傳新的影片到共享素材庫", + type=["mp4", "mov"], + accept_multiple_files=True, + label_visibility="collapsed", + key="shared_video_uploader", + on_change=callbacks.upload_shared_videos + ) + + \ No newline at end of file diff --git a/ui_fragments/video_manager.py b/ui_fragments/video_manager.py index 6994908..19cd429 100644 --- a/ui_fragments/video_manager.py +++ b/ui_fragments/video_manager.py @@ -1,5 +1,5 @@ import streamlit as st -from utils.callbacks import toggle_all_video_checkboxes, callback_delete_selected_videos +from callbacks import toggle_all_video_checkboxes, callback_delete_selected_videos @st.fragment def video_management_fragment(paths): diff --git a/ui_fragments/video_search.py b/ui_fragments/video_search.py index 320f89f..290a2e4 100644 --- a/ui_fragments/video_search.py +++ b/ui_fragments/video_search.py @@ -1,6 +1,6 @@ import streamlit as st from utils.helpers import analyze_ass_for_keywords, search_pixabay_videos, search_pexels_videos -from utils.callbacks import callback_download_videos, callback_toggle_preview +from callbacks import callback_download_videos, callback_toggle_preview @st.fragment diff --git a/utils/asset_manager.py b/utils/asset_manager.py new file mode 100644 index 0000000..a4ee9ec --- /dev/null +++ b/utils/asset_manager.py @@ -0,0 +1,45 @@ +# utils/asset_manager.py + +import streamlit as st +from pathlib import Path +from config import SHARED_ASSETS_DIR + + +def list_shared_videos() -> list[str]: + """列出所有共享影片的檔案名稱。""" + try: + if not SHARED_ASSETS_DIR.exists(): + return [] + + return sorted( + [f.name for f in SHARED_ASSETS_DIR.glob('*') if f.suffix.lower() in ['.mp4', '.mov']] + ) + except Exception as e: + # 可以在這裡記錄錯誤日誌 + st.error(f"讀取共享素材庫時出錯: {e}") + return [] + +def save_uploaded_shared_videos(uploaded_files: list) -> tuple[int, list[str]]: + """ + 將上傳的檔案儲存到共享素材庫。 + 回傳 (成功數量, 錯誤訊息列表)。 + """ + if not uploaded_files: + return 0, [] + + saved_count = 0 + errors = [] + + try: + SHARED_ASSETS_DIR.mkdir(parents=True, exist_ok=True) + for uploaded_file in uploaded_files: + try: + with open(SHARED_ASSETS_DIR / uploaded_file.name, "wb") as f: + f.write(uploaded_file.getbuffer()) + saved_count += 1 + except Exception as e: + errors.append(f"儲存 '{uploaded_file.name}' 時出錯: {e}") + except Exception as e: + errors.append(f"無法建立共享資料夾 '{SHARED_ASSETS_DIR}': {e}") + + return saved_count, errors diff --git a/utils/callbacks.py b/utils/callbacks.py deleted file mode 100644 index 73875b4..0000000 --- a/utils/callbacks.py +++ /dev/null @@ -1,251 +0,0 @@ -import streamlit as st -from pathlib import Path -from scripts.step1_notion_sync import create_project_from_page -from utils.helpers import calculate_loop_count -from scripts.step6_assemble_video import run_step6_assemble_video -from utils.paths import PROJECTS_DIR -import requests - -# --- 專案與流程控制 Callbacks --- -def callback_set_project(): - st.session_state.current_project = st.session_state.project_selector - st.session_state.final_video_path = None - st.session_state.show_video = False - st.session_state.operation_status = {} - st.session_state.active_preview_id = None - -def callback_create_project(): - page_id = st.session_state.page_titles_map[st.session_state.selected_title] - with st.spinner("正在建立專案..."): - success, msg, new_project_name = create_project_from_page(api_key=st.secrets.get("NOTION_API_KEY"), page_id=page_id, project_name=st.session_state.selected_title, projects_dir=PROJECTS_DIR) - if success: - st.session_state.current_project = new_project_name - st.session_state.selected_title = "" - st.session_state.operation_status = {"success": success, "message": msg, "source": "create_project"} - -def callback_assemble_final_video(paths, **kwargs): - """ - 影片合成的專用「準備」回呼。 - 它會先計算循環次數 p,然後將任務轉交給通用的 callback_run_step。 - """ - # 步驟 1: 執行前置計算 - st.write("⏳ 正在計算影片循環次數...") # 提供即時回饋 - audio_path = paths["combined_audio"] - video_folder = paths["output"] / "test" - - p, error_msg = calculate_loop_count(audio_path, video_folder, 1.0) - - # 步驟 2: 檢查前置計算的結果 - if error_msg: - st.session_state.operation_status = {"success": False, "message": f"❌ 無法合成影片: {error_msg}", "source": "assemble_video"} - return # 如果計算失敗,直接終止流程 - - # 步驟 3: 將計算結果加入到參數中,並轉交給通用執行器 - kwargs["p_loop_count"] = p - - # 現在,我們呼叫標準的執行回呼,讓它來處理 spinner 和後續流程 - callback_run_step(run_step6_assemble_video, **kwargs) - -def callback_run_step(step_function, source="unknown", **kwargs): - spinner_text = kwargs.pop("spinner_text", "正在處理中,請稍候...") - with st.spinner(spinner_text): - result = step_function(**kwargs) - if len(result) == 2: success, msg = result - elif len(result) == 3: success, msg, _ = result - else: success, msg = False, "步驟函式回傳格式不符!" - st.session_state.operation_status = {"success": success, "message": msg, "source": source} - if step_function == run_step6_assemble_video and success: - st.session_state.final_video_path = str(result[2]) - st.session_state.show_video = True - -def callback_delete_selected_videos(paths: dict): - output_dir = paths["output"] - deleted_files_count = 0 - errors = [] - - files_to_delete = [] - for key, value in st.session_state.items(): - if key.startswith("delete_cb_") and value: - filename = key.replace("delete_cb_", "", 1) - files_to_delete.append(filename) - - if not files_to_delete: - st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的影片。", "source": "delete_videos"} - return - - for filename in files_to_delete: - try: - file_path = output_dir / filename - if file_path.exists(): - if st.session_state.get("final_video_path") and Path(st.session_state.final_video_path) == file_path: - st.session_state.final_video_path = None - st.session_state.show_video = False - - file_path.unlink() - deleted_files_count += 1 - del st.session_state[f"delete_cb_{filename}"] - except Exception as e: - errors.append(f"刪除 {filename} 時出錯: {e}") - - if errors: - message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors) - st.session_state.operation_status = {"success": False, "message": message, "source": "delete_videos"} - else: - message = f"✅ 成功刪除 {deleted_files_count} 個影片。" - st.session_state.operation_status = {"success": True, "message": message, "source": "delete_videos"} - -def toggle_all_video_checkboxes(video_files): - select_all_state = st.session_state.get('select_all_videos', False) - for video_file in video_files: - st.session_state[f"delete_cb_{video_file.name}"] = select_all_state - -def callback_delete_selected_audios(paths: dict): - deleted_files_count = 0 - errors = [] - - files_to_delete_names = [] - for key, value in st.session_state.items(): - if key.startswith("delete_audio_cb_") and value: - filename = key.replace("delete_audio_cb_", "", 1) - files_to_delete_names.append(filename) - - if not files_to_delete_names: - st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的音訊。", "source": "delete_audios"} - return - - audio_dir = paths["audio"] - output_dir = paths["output"] - - for filename in files_to_delete_names: - try: - file_path_audio = audio_dir / filename - file_path_output = output_dir / filename - - file_path_to_delete = None - if file_path_audio.exists(): - file_path_to_delete = file_path_audio - elif file_path_output.exists(): - file_path_to_delete = file_path_output - - if file_path_to_delete: - file_path_to_delete.unlink() - deleted_files_count += 1 - del st.session_state[f"delete_audio_cb_{filename}"] - else: - errors.append(f"找不到檔案 {filename}。") - - except Exception as e: - errors.append(f"刪除 {filename} 時出錯: {e}") - - if errors: - message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors) - st.session_state.operation_status = {"success": False, "message": message, "source": "delete_audios"} - else: - message = f"✅ 成功刪除 {deleted_files_count} 個音訊。" - st.session_state.operation_status = {"success": True, "message": message, "source": "delete_audios"} - -def toggle_all_audio_checkboxes(audio_files): - select_all_state = st.session_state.get('select_all_audios', False) - for audio_file in audio_files: - st.session_state[f"delete_audio_cb_{audio_file.name}"] = select_all_state - -def callback_toggle_preview(video_id): - if st.session_state.active_preview_id == video_id: st.session_state.active_preview_id = None - else: st.session_state.active_preview_id = video_id - -def callback_download_videos(paths: dict): - with st.spinner("正在下載影片,請稍候..."): - videos_to_download = {vid: info for vid, info in st.session_state.selected_videos.items() if info['selected']} - if not videos_to_download: - st.session_state.operation_status = {"success": False, "message": "尚未選擇任何影片。", "source": "download_videos"} - return - - download_dir = paths["output"] / "test" - download_dir.mkdir(parents=True, exist_ok=True) - - # --- 循序命名邏輯 START --- - existing_files = list(download_dir.glob('*.mp4')) + list(download_dir.glob('*.mov')) - existing_numbers = [] - for f in existing_files: - try: - # 從檔名 (不含副檔名) 中提取數字 - existing_numbers.append(int(f.stem)) - except ValueError: - # 忽略那些不是純數字的檔名 - continue - - # 決定起始編號 - start_counter = max(existing_numbers) + 1 if existing_numbers else 1 - counter = start_counter - # --- 循序命名邏輯 END --- - - total, download_count, errors = len(videos_to_download), 0, [] - - for i, (video_id, info) in enumerate(videos_to_download.items()): - url = info['url'] - original_path = Path(url) - - # 使用循序命名 - new_video_name = f"{counter}{original_path.suffix}" - save_path = download_dir / new_video_name - - try: - if not save_path.exists(): - response = requests.get(url, stream=True) - response.raise_for_status() - with open(save_path, 'wb') as f: - f.write(response.content) - download_count += 1 - counter += 1 # 只有在成功下載後才增加計數器 - except Exception as e: - errors.append(f"下載影片 {original_path.name} 失敗: {e}") - - if errors: - final_message = f"任務完成,但有 {len(errors)} 個錯誤。成功處理 {download_count}/{total} 個影片。\n" + "\n".join(errors) - st.session_state.operation_status = {"success": False, "message": final_message, "source": "download_videos"} - else: - final_message = f"任務完成!成功下載 {download_count}/{total} 個影片到專案的 `output/test` 資料夾,並已自動循序命名。" - st.session_state.operation_status = {"success": True, "message": final_message, "source": "download_videos"} - -@st.fragment -def video_management_fragment(paths: dict): - """A self-contained fragment for managing and deleting project videos.""" - with st.expander("🎬 管理專案影片素材"): - output_dir = paths["output"] - video_files = [] - if output_dir.exists(): - video_files = sorted( - [f for f in output_dir.iterdir() if f.suffix.lower() in ['.mp4', '.mov']], - key=lambda f: f.stat().st_mtime, - reverse=True - ) - - if not video_files: - st.info("專案輸出資料夾 (`/output`) 中目前沒有影片檔。") - else: - all_selected = all(st.session_state.get(f"delete_cb_{f.name}", False) for f in video_files) - if st.session_state.get('select_all_videos', False) != all_selected: - st.session_state.select_all_videos = all_selected - - st.markdown("勾選您想要刪除的影片,然後點擊下方的按鈕。") - - st.checkbox( - "全選/取消全選", - key="select_all_videos", - on_change=toggle_all_video_checkboxes, - args=(video_files,) - ) - - with st.form("delete_videos_form"): - for video_file in video_files: - file_size_mb = video_file.stat().st_size / (1024 * 1024) - st.checkbox( - f"**{video_file.name}** ({file_size_mb:.2f} MB)", - key=f"delete_cb_{video_file.name}" - ) - - submitted = st.form_submit_button("🟥 確認刪除選取的影片", use_container_width=True, type="primary") - - if submitted: - callback_delete_selected_videos(paths) - st.rerun(scope="fragment") diff --git a/utils/helpers.py b/utils/helpers.py index a6243d8..1a532dc 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -6,7 +6,7 @@ from pathlib import Path from notion_client import Client from notion_client.helpers import iterate_paginated_api import subprocess -import math + import librosa def get_media_info(media_path: Path) -> dict: @@ -42,48 +42,26 @@ def get_media_duration(file_path: Path) -> float | None: print(f"無法讀取檔案 {file_path} 的時長: {e}") return None -# 【新增】計算循環次數 p 的函式 -def calculate_loop_count(audio_path: Path, video_folder: Path, transition_duration: float) -> tuple[int | None, str | None]: + +def display_operation_status(): """ - 根據音訊長度和一系列影片,計算所需的最小循環次數 p。 - 成功時回傳 (p, None),失敗時回傳 (None, error_message)。 + 檢查 session_state 中是否有操作狀態訊息, + 並使用 st.toast 顯示它,然後清除。 """ - m_prime = get_media_duration(audio_path) - if m_prime is None: - return None, "無法讀取音訊檔案長度。" - - video_paths = [p for p in video_folder.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']] - video_lengths = [get_media_duration(p) for p in video_paths] - video_lengths = [l for l in video_lengths if l is not None and l > 0] - - if not video_lengths: - return None, "在 `test` 資料夾中找不到有效的影片檔案。" - - n = len(video_lengths) - m = sum(video_lengths) - tr = transition_duration - - denominator = m - (n - 1) * tr - - if denominator <= 0: - return None, f"影片的有效長度 ({denominator:.2f}s) 小於或等於零,無法進行循環。請增加影片時長或減少轉場時間。" - - p = math.ceil(m_prime / denominator) - return p, None -def display_global_status_message(): - """ - 檢查會話狀態中是否存在任何操作狀態,如果存在則顯示它。 - 顯示後會立即清除狀態,以確保訊息只顯示一次。 - """ - if 'operation_status' in st.session_state and st.session_state.operation_status: + if "operation_status" in st.session_state and st.session_state.operation_status: status = st.session_state.operation_status - # 根據成功與否,選擇對應的訊息元件 - if status.get("success"): - st.success(status["message"], icon="✅") + message_type = status.get("type", "info") + message = status.get("message", "") + + if message_type == "success": + st.toast(f"✅ {message}") + elif message_type == "error": + st.toast(f"❌ {message}") else: - st.error(status["message"], icon="❌") - # 清除狀態,避免在下一次刷新時重複顯示 - st.session_state.operation_status = {} + st.toast(f"ℹ️ {message}") + + # 清除狀態,防止重複顯示 + del st.session_state.operation_status def analyze_ass_for_keywords(paths: dict) -> list: diff --git a/utils/paths.py b/utils/paths.py index 4fed02e..1a2f55e 100644 --- a/utils/paths.py +++ b/utils/paths.py @@ -7,15 +7,35 @@ PROJECTS_DIR.mkdir(exist_ok=True) SHARED_ASSETS_DIR.mkdir(exist_ok=True) def get_project_paths(project_name: str) -> dict: - """為給定專案回傳一個包含所有重要路徑的字典。""" + """ + 為給定專案回傳一個包含所有重要路徑的字典。 + 這是一個純粹的、無副作用的函式,是路徑規則的「單一事實來源」。 + """ if not project_name: return {} + project_root = PROJECTS_DIR / project_name + output_dir = project_root / "output" # 將所有生成物放在 output 資料夾 + return { "root": project_root, "data": project_root / "data.json", - "audio": project_root / "audio", - "output": project_root / "output", - "combined_audio": project_root / "output" / "combined_audio.wav", - "ass_file": project_root / "output" / f"{project_name}.ass" - } \ No newline at end of file + "audio": project_root / "audio", # 原始音訊片段 + "output": output_dir, # 輸出的根目錄 + "temp_video":output_dir / "temp_video", + "combined_audio": output_dir / "combined_audio.wav", + "ass_file": output_dir / "subtitles.ass", + "final_video": output_dir / f"{project_name}_final_video.mp4" + } +def get_project_list() -> list[str]: + """ + 掃描專案根目錄並回傳所有專案名稱的列表。 + 這將成為獲取專案列表的「單一事實來源」。 + """ + if not PROJECTS_DIR.is_dir(): + PROJECTS_DIR.mkdir() # 如果根目錄不存在,就建立它 + return [] + # 確保只回傳目錄,並過濾掉像 .DS_Store 這樣的隱藏檔案 + project_names = [d.name for d in PROJECTS_DIR.iterdir() if d.is_dir() and not d.name.startswith('.')] + project_names.sort() # 排序以獲得一致的顯示順序 + return project_names \ No newline at end of file