Files
gain/utils/callbacks.py
2025-07-08 15:27:03 +08:00

252 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from pathlib import Path
from scripts.step1_notion_sync import create_project_from_page
from utils.helpers import calculate_loop_count
from scripts.step6_assemble_video import run_step6_assemble_video
from utils.paths import PROJECTS_DIR
import requests
# --- 專案與流程控制 Callbacks ---
def callback_set_project():
st.session_state.current_project = st.session_state.project_selector
st.session_state.final_video_path = None
st.session_state.show_video = False
st.session_state.operation_status = {}
st.session_state.active_preview_id = None
def callback_create_project():
page_id = st.session_state.page_titles_map[st.session_state.selected_title]
with st.spinner("正在建立專案..."):
success, msg, new_project_name = create_project_from_page(api_key=st.secrets.get("NOTION_API_KEY"), page_id=page_id, project_name=st.session_state.selected_title, projects_dir=PROJECTS_DIR)
if success:
st.session_state.current_project = new_project_name
st.session_state.selected_title = ""
st.session_state.operation_status = {"success": success, "message": msg, "source": "create_project"}
def callback_assemble_final_video(paths, **kwargs):
"""
影片合成的專用「準備」回呼。
它會先計算循環次數 p然後將任務轉交給通用的 callback_run_step。
"""
# 步驟 1: 執行前置計算
st.write("⏳ 正在計算影片循環次數...") # 提供即時回饋
audio_path = paths["combined_audio"]
video_folder = paths["output"] / "test"
p, error_msg = calculate_loop_count(audio_path, video_folder, 1.0)
# 步驟 2: 檢查前置計算的結果
if error_msg:
st.session_state.operation_status = {"success": False, "message": f"❌ 無法合成影片: {error_msg}", "source": "assemble_video"}
return # 如果計算失敗,直接終止流程
# 步驟 3: 將計算結果加入到參數中,並轉交給通用執行器
kwargs["p_loop_count"] = p
# 現在,我們呼叫標準的執行回呼,讓它來處理 spinner 和後續流程
callback_run_step(run_step6_assemble_video, **kwargs)
def callback_run_step(step_function, source="unknown", **kwargs):
spinner_text = kwargs.pop("spinner_text", "正在處理中,請稍候...")
with st.spinner(spinner_text):
result = step_function(**kwargs)
if len(result) == 2: success, msg = result
elif len(result) == 3: success, msg, _ = result
else: success, msg = False, "步驟函式回傳格式不符!"
st.session_state.operation_status = {"success": success, "message": msg, "source": source}
if step_function == run_step6_assemble_video and success:
st.session_state.final_video_path = str(result[2])
st.session_state.show_video = True
def callback_delete_selected_videos(paths: dict):
output_dir = paths["output"]
deleted_files_count = 0
errors = []
files_to_delete = []
for key, value in st.session_state.items():
if key.startswith("delete_cb_") and value:
filename = key.replace("delete_cb_", "", 1)
files_to_delete.append(filename)
if not files_to_delete:
st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的影片。", "source": "delete_videos"}
return
for filename in files_to_delete:
try:
file_path = output_dir / filename
if file_path.exists():
if st.session_state.get("final_video_path") and Path(st.session_state.final_video_path) == file_path:
st.session_state.final_video_path = None
st.session_state.show_video = False
file_path.unlink()
deleted_files_count += 1
del st.session_state[f"delete_cb_{filename}"]
except Exception as e:
errors.append(f"刪除 {filename} 時出錯: {e}")
if errors:
message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors)
st.session_state.operation_status = {"success": False, "message": message, "source": "delete_videos"}
else:
message = f"✅ 成功刪除 {deleted_files_count} 個影片。"
st.session_state.operation_status = {"success": True, "message": message, "source": "delete_videos"}
def toggle_all_video_checkboxes(video_files):
select_all_state = st.session_state.get('select_all_videos', False)
for video_file in video_files:
st.session_state[f"delete_cb_{video_file.name}"] = select_all_state
def callback_delete_selected_audios(paths: dict):
deleted_files_count = 0
errors = []
files_to_delete_names = []
for key, value in st.session_state.items():
if key.startswith("delete_audio_cb_") and value:
filename = key.replace("delete_audio_cb_", "", 1)
files_to_delete_names.append(filename)
if not files_to_delete_names:
st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的音訊。", "source": "delete_audios"}
return
audio_dir = paths["audio"]
output_dir = paths["output"]
for filename in files_to_delete_names:
try:
file_path_audio = audio_dir / filename
file_path_output = output_dir / filename
file_path_to_delete = None
if file_path_audio.exists():
file_path_to_delete = file_path_audio
elif file_path_output.exists():
file_path_to_delete = file_path_output
if file_path_to_delete:
file_path_to_delete.unlink()
deleted_files_count += 1
del st.session_state[f"delete_audio_cb_{filename}"]
else:
errors.append(f"找不到檔案 {filename}")
except Exception as e:
errors.append(f"刪除 {filename} 時出錯: {e}")
if errors:
message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors)
st.session_state.operation_status = {"success": False, "message": message, "source": "delete_audios"}
else:
message = f"✅ 成功刪除 {deleted_files_count} 個音訊。"
st.session_state.operation_status = {"success": True, "message": message, "source": "delete_audios"}
def toggle_all_audio_checkboxes(audio_files):
select_all_state = st.session_state.get('select_all_audios', False)
for audio_file in audio_files:
st.session_state[f"delete_audio_cb_{audio_file.name}"] = select_all_state
def callback_toggle_preview(video_id):
if st.session_state.active_preview_id == video_id: st.session_state.active_preview_id = None
else: st.session_state.active_preview_id = video_id
def callback_download_videos(paths: dict):
with st.spinner("正在下載影片,請稍候..."):
videos_to_download = {vid: info for vid, info in st.session_state.selected_videos.items() if info['selected']}
if not videos_to_download:
st.session_state.operation_status = {"success": False, "message": "尚未選擇任何影片。", "source": "download_videos"}
return
download_dir = paths["output"] / "test"
download_dir.mkdir(parents=True, exist_ok=True)
# --- 循序命名邏輯 START ---
existing_files = list(download_dir.glob('*.mp4')) + list(download_dir.glob('*.mov'))
existing_numbers = []
for f in existing_files:
try:
# 從檔名 (不含副檔名) 中提取數字
existing_numbers.append(int(f.stem))
except ValueError:
# 忽略那些不是純數字的檔名
continue
# 決定起始編號
start_counter = max(existing_numbers) + 1 if existing_numbers else 1
counter = start_counter
# --- 循序命名邏輯 END ---
total, download_count, errors = len(videos_to_download), 0, []
for i, (video_id, info) in enumerate(videos_to_download.items()):
url = info['url']
original_path = Path(url)
# 使用循序命名
new_video_name = f"{counter}{original_path.suffix}"
save_path = download_dir / new_video_name
try:
if not save_path.exists():
response = requests.get(url, stream=True)
response.raise_for_status()
with open(save_path, 'wb') as f:
f.write(response.content)
download_count += 1
counter += 1 # 只有在成功下載後才增加計數器
except Exception as e:
errors.append(f"下載影片 {original_path.name} 失敗: {e}")
if errors:
final_message = f"任務完成,但有 {len(errors)} 個錯誤。成功處理 {download_count}/{total} 個影片。\n" + "\n".join(errors)
st.session_state.operation_status = {"success": False, "message": final_message, "source": "download_videos"}
else:
final_message = f"任務完成!成功下載 {download_count}/{total} 個影片到專案的 `output/test` 資料夾,並已自動循序命名。"
st.session_state.operation_status = {"success": True, "message": final_message, "source": "download_videos"}
@st.fragment
def video_management_fragment(paths: dict):
"""A self-contained fragment for managing and deleting project videos."""
with st.expander("🎬 管理專案影片素材"):
output_dir = paths["output"]
video_files = []
if output_dir.exists():
video_files = sorted(
[f for f in output_dir.iterdir() if f.suffix.lower() in ['.mp4', '.mov']],
key=lambda f: f.stat().st_mtime,
reverse=True
)
if not video_files:
st.info("專案輸出資料夾 (`/output`) 中目前沒有影片檔。")
else:
all_selected = all(st.session_state.get(f"delete_cb_{f.name}", False) for f in video_files)
if st.session_state.get('select_all_videos', False) != all_selected:
st.session_state.select_all_videos = all_selected
st.markdown("勾選您想要刪除的影片,然後點擊下方的按鈕。")
st.checkbox(
"全選/取消全選",
key="select_all_videos",
on_change=toggle_all_video_checkboxes,
args=(video_files,)
)
with st.form("delete_videos_form"):
for video_file in video_files:
file_size_mb = video_file.stat().st_size / (1024 * 1024)
st.checkbox(
f"**{video_file.name}** ({file_size_mb:.2f} MB)",
key=f"delete_cb_{video_file.name}"
)
submitted = st.form_submit_button("🟥 確認刪除選取的影片", use_container_width=True, type="primary")
if submitted:
callback_delete_selected_videos(paths)
st.rerun(scope="fragment")