"""Streamlit callbacks and fragments for project control and media management."""

import contextlib
from pathlib import Path
from urllib.parse import urlparse

import requests
import streamlit as st

from scripts.step1_notion_sync import create_project_from_page
from scripts.step6_assemble_video import run_step6_assemble_video
from utils.helpers import calculate_loop_count
from utils.paths import PROJECTS_DIR

# Seconds to wait for the server (connect and each read) before a download
# is reported as failed instead of blocking the UI indefinitely.
_DOWNLOAD_TIMEOUT_SECONDS = 30
# Size of each chunk written to disk while streaming a download.
_DOWNLOAD_CHUNK_BYTES = 1024 * 1024


# --- Project and workflow control callbacks ---

def callback_set_project():
    """Switch to the project picked in the selector and reset per-project UI state."""
    st.session_state.current_project = st.session_state.project_selector
    st.session_state.final_video_path = None
    st.session_state.show_video = False
    st.session_state.operation_status = {}
    st.session_state.active_preview_id = None


def callback_create_project():
    """Create a project from the selected Notion page title and record the outcome.

    On success the new project becomes the current project and the title
    selection is cleared. The result is always written to
    ``st.session_state.operation_status``.
    """
    page_id = st.session_state.page_titles_map[st.session_state.selected_title]
    with st.spinner("正在建立專案..."):
        success, msg, new_project_name = create_project_from_page(
            api_key=st.secrets.get("NOTION_API_KEY"),
            page_id=page_id,
            project_name=st.session_state.selected_title,
            projects_dir=PROJECTS_DIR,
        )
    if success:
        st.session_state.current_project = new_project_name
        st.session_state.selected_title = ""
    st.session_state.operation_status = {
        "success": success,
        "message": msg,
        "source": "create_project",
    }


def callback_assemble_final_video(paths, **kwargs):
    """Prepare and launch the final video assembly step.

    Computes the loop count ``p`` first and then hands the job over to the
    generic ``callback_run_step``.

    Args:
        paths: Mapping that provides at least ``"combined_audio"`` and
            ``"output"`` (a path object supporting ``/``).
        **kwargs: Forwarded to ``callback_run_step`` (and from there to
            ``run_step6_assemble_video``) with ``p_loop_count`` added.
    """
    # Step 1: run the pre-computation.
    st.write("⏳ 正在計算影片循環次數...")  # immediate feedback for the user
    audio_path = paths["combined_audio"]
    video_folder = paths["output"] / "test"
    p, error_msg = calculate_loop_count(audio_path, video_folder, 1.0)

    # Step 2: abort the whole flow if the pre-computation failed.
    if error_msg:
        st.session_state.operation_status = {
            "success": False,
            "message": f"❌ 無法合成影片: {error_msg}",
            "source": "assemble_video",
        }
        return

    # Step 3: add the computed value and delegate to the generic runner,
    # which handles the spinner and the follow-up state changes.
    kwargs["p_loop_count"] = p
    # Tag the outcome with the same source the failure path above uses, so
    # success and failure are reported consistently. A caller-supplied
    # ``source`` still takes precedence.
    kwargs.setdefault("source", "assemble_video")
    callback_run_step(run_step6_assemble_video, **kwargs)


def callback_run_step(step_function, source="unknown", **kwargs):
    """Run a step function under a spinner and store its outcome.

    Args:
        step_function: Callable invoked with ``**kwargs``. It is expected to
            return ``(success, message)`` or ``(success, message, extra)``.
        source: Label stored in ``operation_status["source"]``.
        **kwargs: Arguments for ``step_function``. ``spinner_text`` is
            consumed here and not forwarded.
    """
    spinner_text = kwargs.pop("spinner_text", "正在處理中,請稍候...")
    with st.spinner(spinner_text):
        result = step_function(**kwargs)

    extra = None
    has_extra = False
    # Guard against results that are not sized sequences (e.g. ``None``) so a
    # misbehaving step is reported instead of raising inside the callback.
    if isinstance(result, (tuple, list)) and len(result) == 2:
        success, msg = result
    elif isinstance(result, (tuple, list)) and len(result) == 3:
        success, msg, extra = result
        has_extra = True
    else:
        success, msg = False, "步驟函式回傳格式不符!"

    st.session_state.operation_status = {
        "success": success,
        "message": msg,
        "source": source,
    }

    # Only publish the video path when the step actually returned one;
    # indexing a 2-tuple here would raise IndexError.
    if step_function == run_step6_assemble_video and success and has_extra:
        st.session_state.final_video_path = str(extra)
        st.session_state.show_video = True


# --- Media deletion callbacks ---

def _checked_filenames(prefix):
    """Return the file names whose ``<prefix><name>`` checkbox key is truthy."""
    return [
        key[len(prefix):]
        for key, value in st.session_state.items()
        if key.startswith(prefix) and value
    ]


def _set_status(success, message, source):
    """Store an operation result in ``st.session_state.operation_status``."""
    st.session_state.operation_status = {
        "success": success,
        "message": message,
        "source": source,
    }


def callback_delete_selected_videos(paths: dict):
    """Delete every video in ``paths["output"]`` whose delete checkbox is ticked.

    Files that no longer exist are skipped silently. Per-file failures are
    collected and reported together in the operation status.
    """
    output_dir = paths["output"]
    files_to_delete = _checked_filenames("delete_cb_")

    if not files_to_delete:
        _set_status(True, "🤔 沒有選擇任何要刪除的影片。", "delete_videos")
        return

    deleted_files_count = 0
    errors = []
    for filename in files_to_delete:
        try:
            file_path = output_dir / filename
            if file_path.exists():
                # Stop showing the final video if it is the file being removed.
                final_video = st.session_state.get("final_video_path")
                if final_video and Path(final_video) == file_path:
                    st.session_state.final_video_path = None
                    st.session_state.show_video = False
                file_path.unlink()
                deleted_files_count += 1
                del st.session_state[f"delete_cb_{filename}"]
        except Exception as e:
            errors.append(f"刪除 {filename} 時出錯: {e}")

    if errors:
        message = (
            f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: "
            + ", ".join(errors)
        )
        _set_status(False, message, "delete_videos")
    else:
        message = f"✅ 成功刪除 {deleted_files_count} 個影片。"
        _set_status(True, message, "delete_videos")


def toggle_all_video_checkboxes(video_files):
    """Copy the "select all" checkbox state onto every video delete checkbox."""
    select_all_state = st.session_state.get('select_all_videos', False)
    for video_file in video_files:
        st.session_state[f"delete_cb_{video_file.name}"] = select_all_state


def callback_delete_selected_audios(paths: dict):
    """Delete every ticked audio file, looking in ``audio`` first, then ``output``.

    A ticked file found in neither folder is reported as an error.
    """
    files_to_delete_names = _checked_filenames("delete_audio_cb_")

    if not files_to_delete_names:
        _set_status(True, "🤔 沒有選擇任何要刪除的音訊。", "delete_audios")
        return

    audio_dir = paths["audio"]
    output_dir = paths["output"]
    deleted_files_count = 0
    errors = []
    for filename in files_to_delete_names:
        try:
            file_path_audio = audio_dir / filename
            file_path_output = output_dir / filename
            file_path_to_delete = None
            if file_path_audio.exists():
                file_path_to_delete = file_path_audio
            elif file_path_output.exists():
                file_path_to_delete = file_path_output

            if file_path_to_delete:
                file_path_to_delete.unlink()
                deleted_files_count += 1
                del st.session_state[f"delete_audio_cb_{filename}"]
            else:
                errors.append(f"找不到檔案 {filename}。")
        except Exception as e:
            errors.append(f"刪除 {filename} 時出錯: {e}")

    if errors:
        message = (
            f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: "
            + ", ".join(errors)
        )
        _set_status(False, message, "delete_audios")
    else:
        message = f"✅ 成功刪除 {deleted_files_count} 個音訊。"
        _set_status(True, message, "delete_audios")


def toggle_all_audio_checkboxes(audio_files):
    """Copy the "select all" checkbox state onto every audio delete checkbox."""
    select_all_state = st.session_state.get('select_all_audios', False)
    for audio_file in audio_files:
        st.session_state[f"delete_audio_cb_{audio_file.name}"] = select_all_state


def callback_toggle_preview(video_id):
    """Open the preview for ``video_id``, or close it if it is already open."""
    if st.session_state.active_preview_id == video_id:
        st.session_state.active_preview_id = None
    else:
        st.session_state.active_preview_id = video_id


# --- Video download ---

def _next_sequence_number(download_dir):
    """Return one more than the highest purely numeric .mp4/.mov stem, or 1."""
    existing_numbers = []
    for pattern in ('*.mp4', '*.mov'):
        for existing_file in download_dir.glob(pattern):
            try:
                existing_numbers.append(int(existing_file.stem))
            except ValueError:
                # File names that are not plain integers do not take part
                # in the numbering.
                continue
    return max(existing_numbers, default=0) + 1


def _stream_to_file(url, save_path):
    """Download ``url`` to ``save_path`` in chunks; remove the file on failure."""
    try:
        with requests.get(
            url, stream=True, timeout=_DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                # Write chunk by chunk so a large video is never held in
                # memory as a whole.
                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_BYTES):
                    f.write(chunk)
    except Exception:
        # Do not leave a truncated file behind: it would look like a valid
        # video and would occupy a sequence number.
        with contextlib.suppress(OSError):
            save_path.unlink()
        raise


def callback_download_videos(paths: dict):
    """Download the selected videos into ``output/test`` with sequential names.

    Reads ``st.session_state.selected_videos`` (entries with ``'selected'``
    and ``'url'`` keys), saves each selected video as ``<n><suffix>`` where
    ``n`` continues the numbering already present in the folder, and stores
    a summary in ``st.session_state.operation_status``.
    """
    with st.spinner("正在下載影片,請稍候..."):
        videos_to_download = {
            vid: info
            for vid, info in st.session_state.selected_videos.items()
            if info['selected']
        }
        if not videos_to_download:
            _set_status(False, "尚未選擇任何影片。", "download_videos")
            return

        download_dir = paths["output"] / "test"
        download_dir.mkdir(parents=True, exist_ok=True)

        counter = _next_sequence_number(download_dir)
        total = len(videos_to_download)
        download_count = 0
        errors = []

        for info in videos_to_download.values():
            url = info['url']
            # Use only the URL path so a query string or fragment does not
            # leak into the file suffix (e.g. ".mp4?token=...").
            remote_path = Path(urlparse(url).path)
            suffix = remote_path.suffix

            # Advance to the next free name instead of silently skipping the
            # video when a file with this number already exists.
            save_path = download_dir / f"{counter}{suffix}"
            while save_path.exists():
                counter += 1
                save_path = download_dir / f"{counter}{suffix}"

            try:
                _stream_to_file(url, save_path)
            except Exception as e:
                errors.append(f"下載影片 {remote_path.name} 失敗: {e}")
            else:
                download_count += 1
                counter += 1  # only consume a number after a successful download

        if errors:
            final_message = (
                f"任務完成,但有 {len(errors)} 個錯誤。成功處理 {download_count}/{total} 個影片。\n"
                + "\n".join(errors)
            )
            _set_status(False, final_message, "download_videos")
        else:
            final_message = (
                f"任務完成!成功下載 {download_count}/{total} 個影片到專案的 `output/test` 資料夾,並已自動循序命名。"
            )
            _set_status(True, final_message, "download_videos")


# --- UI fragments ---

@st.fragment
def video_management_fragment(paths: dict):
    """A self-contained fragment for managing and deleting project videos."""
    with st.expander("🎬 管理專案影片素材"):
        output_dir = paths["output"]
        video_files = []
        if output_dir.exists():
            # Newest files first.
            video_files = sorted(
                [f for f in output_dir.iterdir() if f.suffix.lower() in ['.mp4', '.mov']],
                key=lambda f: f.stat().st_mtime,
                reverse=True,
            )

        if not video_files:
            st.info("專案輸出資料夾 (`/output`) 中目前沒有影片檔。")
        else:
            # Keep the "select all" box in sync with the individual boxes
            # before the widget is created.
            all_selected = all(
                st.session_state.get(f"delete_cb_{f.name}", False) for f in video_files
            )
            if st.session_state.get('select_all_videos', False) != all_selected:
                st.session_state.select_all_videos = all_selected

            st.markdown("勾選您想要刪除的影片,然後點擊下方的按鈕。")
            st.checkbox(
                "全選/取消全選",
                key="select_all_videos",
                on_change=toggle_all_video_checkboxes,
                args=(video_files,),
            )

            with st.form("delete_videos_form"):
                for video_file in video_files:
                    file_size_mb = video_file.stat().st_size / (1024 * 1024)
                    st.checkbox(
                        f"**{video_file.name}** ({file_size_mb:.2f} MB)",
                        key=f"delete_cb_{video_file.name}",
                    )
                submitted = st.form_submit_button(
                    "🟥 確認刪除選取的影片",
                    use_container_width=True,
                    type="primary",
                )

            if submitted:
                callback_delete_selected_videos(paths)
                st.rerun(scope="fragment")