Files
gain/callbacks.py
2025-07-22 13:39:46 +08:00

317 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from pathlib import Path
# from scripts_.step1_notion_sync import create_project_from_page
# from scripts_.step6_assemble_video import run_step6_assemble_video
from utils.paths import PROJECTS_DIR ,get_project_list
from utils import asset_manager
import requests
from project_model import Project
from config import SHARED_ASSETS_DIR
# --- 專案與流程控制 Callbacks ---
def callback_set_project():
"""當使用者從下拉選單選擇一個現有專案時,設定當前的 Project 物件。"""
selected_name = st.session_state.get("project_selector")
if selected_name:
with st.spinner(f"正在載入專案 '{selected_name}'..."):
st.session_state.project = Project(project_name=selected_name)
# 【新】回報操作狀態
st.session_state.operation_status = {
"type": "success",
"message": f"專案 '{selected_name}' 已成功載入。"
}
else:
st.session_state.project = None
def callback_create_project():
"""從 Notion 頁面建立一個新專案的回呼函式。"""
project_name = st.session_state.get("selected_title")
if not project_name:
# 【改動】不再呼叫 st.warning而是設定狀態
st.session_state.operation_status = {"type": "warning", "message": "請先選擇一個 Notion 頁面。"}
return
if project_name in get_project_list():
# 【改動】不再呼叫 st.error而是設定狀態
st.session_state.operation_status = {
"type": "error",
"message": f"專案 '{project_name}' 已存在。請從下方的選單選擇它。"
}
st.session_state.project_selector = project_name
callback_set_project()
st.rerun() # 此處的 rerun 用於強制更新 selectbox是可接受的進階用法
return
page_titles_map = st.session_state.get("page_titles_map", {})
page_id_to_create = page_titles_map.get(project_name)
if not page_id_to_create:
st.session_state.operation_status = {
"type": "error",
"message": f"內部錯誤:找不到 '{project_name}' 的 Notion 頁面 ID。"
}
return
try:
new_project = Project.create_new(project_name=project_name, notion_page_id=page_id_to_create)
st.session_state.project = new_project
st.session_state.project_selector = new_project.name
# 【改動】不再呼叫 st.success而是設定狀態
st.session_state.operation_status = {
"type": "success",
"message": f"專案 '{project_name}' 的框架已成功建立!"
}
st.session_state.selected_title = ""
except Exception as e:
st.session_state.operation_status = {"type": "error", "message": f"建立專案時發生錯誤: {e}"}
def _execute_project_method(method_name: str, spinner_text: str):
"""(新) 輔助函式,用於執行 Project 方法並處理回傳的狀態。"""
project = st.session_state.get('project')
if not project:
st.session_state.operation_status = {"type": "warning", "message": "請先選擇一個專案。"}
return
method_to_call = getattr(project, method_name, None)
if not method_to_call:
st.session_state.operation_status = {"type": "error", "message": f"內部錯誤:找不到方法 {method_name}"}
return
with st.spinner(spinner_text):
method_to_call()
# --- 所有後續的 Callback 現在都變得極其簡潔 ---
def callback_sync_notion():
"""回呼:執行 Notion 同步。"""
_execute_project_method("sync_from_notion", "正在從 Notion 同步...")
def callback_add_zh_ipa():
"""回呼:執行 AI 加註(翻譯與 IPA"""
_execute_project_method("add_translation_and_ipa", "正在呼叫 AI 進行翻譯與音標加註...")
def callback_generate_sentence_audio():
"""回呼:生成音訊片段。"""
_execute_project_method("generate_sentence_audio", "正在生成音訊片段...")
def callback_concatenate_audio():
"""回呼:組合完整音訊。"""
_execute_project_method("concatenate_audio", "正在組合完整音訊...")
def callback_generate_subtitles():
"""回呼:生成 ASS 字幕檔。"""
_execute_project_method("generate_ass_subtitles", "正在生成 ASS 字幕檔...")
def assemble_final_video(logo_video_name, open_video_name, end_video_name):
"""
【新】回呼:執行最終影片的合成。
只接收檔名,內部處理路徑和業務邏輯。
"""
try:
project = st.session_state.get('project')
if not project:
# 如果找不到專案,立即更新狀態並終止函式,避免錯誤
st.session_state.operation_status = {
"type": "error",
"message": "操作失敗:找不到有效的專案。請重新選擇或建立一個專案。"
}
return # 提前退出函式
# 1. 在回呼函式內部,根據檔名組合出完整的路徑
logo_path = SHARED_ASSETS_DIR / logo_video_name if logo_video_name else None
open_path = SHARED_ASSETS_DIR / open_video_name if open_video_name else None
end_path = SHARED_ASSETS_DIR / end_video_name if end_video_name else None
# 2. 將核心業務邏輯委派給 Project 物件
# 我們將在這裡呼叫一個新的 project.assemble_video 方法
with st.spinner("影片合成中,這可能需要幾分鐘,請耐心等候..."):
project.assemble_video(
logo_video=logo_path,
open_video=open_path,
end_video=end_path
)
except Exception as e:
# 無論是上面的 ValueError還是 project.assemble_video 中未被捕捉的錯誤
error_message = f"執行影片合成時發生錯誤: {e}"
# 將統一格式的錯誤訊息回報給 UI讓使用者知道發生了什麼
st.session_state.operation_status = {
"type": "error",
"message": error_message
}
def toggle_all_video_checkboxes( video_files):
"""【新】切換所有影片核取方塊的選取狀態。"""
select_all_state = st.session_state.get('select_all_videos', False)
for video_file in video_files:
st.session_state[f"delete_cb_{video_file.name}"] = select_all_state
def delete_selected_videos():
"""【新】刪除在 UI 中選取的影片。"""
if not st.session_state.project:
st.session_state.operation_status = {"type": "warning", "message": "請先選擇一個專案。"}
return
output_dir = st.session_state.project.paths["output"]
deleted_files_count = 0
errors = []
files_to_delete = [
key.replace("delete_cb_", "", 1)
for key, value in st.session_state.items()
if key.startswith("delete_cb_") and value
]
if not files_to_delete:
st.session_state.operation_status = {"type": "info", "message": "沒有選擇任何要刪除的影片。"}
return
for filename in files_to_delete:
try:
file_path = output_dir / filename
if file_path.exists():
if st.session_state.get("final_video_path") and Path(st.session_state.final_video_path) == file_path:
st.session_state.final_video_path = None
file_path.unlink()
deleted_files_count += 1
del st.session_state[f"delete_cb_{filename}"]
except Exception as e:
errors.append(f"刪除 {filename} 時出錯: {e}")
if errors:
message = f"刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors)
st.session_state.operation_status = {"type": "error", "message": message}
else:
message = f"成功刪除 {deleted_files_count} 個影片。"
st.session_state.operation_status = {"type": "success", "message": message}
def upload_shared_videos():
"""
回呼:處理共享影片的上傳請求。
"""
uploaded_files = st.session_state.get("shared_video_uploader", [])
# 委派任務給 asset_manager
saved_count, errors = asset_manager.save_uploaded_shared_videos(uploaded_files)
# 根據結果更新 UI 狀態
if errors:
message = f"處理完成。成功 {saved_count} 個,失敗 {len(errors)} 個。錯誤: {', '.join(errors)}"
st.session_state.operation_status = {"type": "error", "message": message}
elif saved_count > 0:
message = f"成功上傳 {saved_count} 個新素材到共享庫!"
st.session_state.operation_status = {"type": "success", "message": message}
st.session_state.shared_video_uploader = []
def callback_download_videos(paths: dict):
with st.spinner("正在下載影片,請稍候..."):
videos_to_download = {vid: info for vid, info in st.session_state.selected_videos.items() if info['selected']}
if not videos_to_download:
st.session_state.operation_status = {"success": False, "message": "尚未選擇任何影片。", "source": "download_videos"}
return
download_dir = paths["temp_video"]
download_dir.mkdir(parents=True, exist_ok=True)
# --- 循序命名邏輯 START ---
existing_files = list(download_dir.glob('*.mp4')) + list(download_dir.glob('*.mov'))
existing_numbers = []
for f in existing_files:
try:
# 從檔名 (不含副檔名) 中提取數字
existing_numbers.append(int(f.stem))
except ValueError:
# 忽略那些不是純數字的檔名
continue
# 決定起始編號
start_counter = max(existing_numbers) + 1 if existing_numbers else 1
counter = start_counter
# --- 循序命名邏輯 END ---
total, download_count, errors = len(videos_to_download), 0, []
for i, (video_id, info) in enumerate(videos_to_download.items()):
url = info['url']
original_path = Path(url)
# 使用循序命名
new_video_name = f"{counter:02d}{original_path.suffix}"
save_path = download_dir / new_video_name
try:
if not save_path.exists():
response = requests.get(url, stream=True)
response.raise_for_status()
with open(save_path, 'wb') as f:
f.write(response.content)
download_count += 1
counter += 1 # 只有在成功下載後才增加計數器
except Exception as e:
errors.append(f"下載影片 {original_path.name} 失敗: {e}")
if errors:
final_message = f"任務完成,但有 {len(errors)} 個錯誤。成功處理 {download_count}/{total} 個影片。\n" + "\n".join(errors)
st.session_state.operation_status = {"success": False, "message": final_message, "source": "download_videos"}
else:
final_message = f"任務完成!成功下載 {download_count}/{total} 個影片到專案的 `output/test` 資料夾,並已自動循序命名。"
st.session_state.operation_status = {"success": True, "message": final_message, "source": "download_videos"}
def callback_toggle_preview(video_id):
if st.session_state.active_preview_id == video_id: st.session_state.active_preview_id = None
else: st.session_state.active_preview_id = video_id
def callback_delete_selected_audios(paths: dict):
deleted_files_count = 0
errors = []
files_to_delete_names = []
for key, value in st.session_state.items():
if key.startswith("delete_audio_cb_") and value:
filename = key.replace("delete_audio_cb_", "", 1)
files_to_delete_names.append(filename)
if not files_to_delete_names:
st.session_state.operation_status = {"success": True, "message": "🤔 沒有選擇任何要刪除的音訊。", "source": "delete_audios"}
return
audio_dir = paths["audio"]
output_dir = paths["output"]
for filename in files_to_delete_names:
try:
file_path_audio = audio_dir / filename
file_path_output = output_dir / filename
file_path_to_delete = None
if file_path_audio.exists():
file_path_to_delete = file_path_audio
elif file_path_output.exists():
file_path_to_delete = file_path_output
if file_path_to_delete:
file_path_to_delete.unlink()
deleted_files_count += 1
del st.session_state[f"delete_audio_cb_{filename}"]
else:
errors.append(f"找不到檔案 {filename}")
except Exception as e:
errors.append(f"刪除 {filename} 時出錯: {e}")
if errors:
message = f"❌ 刪除操作有誤。成功刪除 {deleted_files_count} 個檔案,但發生以下錯誤: " + ", ".join(errors)
st.session_state.operation_status = {"success": False, "message": message, "source": "delete_audios"}
else:
message = f"✅ 成功刪除 {deleted_files_count} 個音訊。"
st.session_state.operation_status = {"success": True, "message": message, "source": "delete_audios"}
def toggle_all_audio_checkboxes(audio_files):
select_all_state = st.session_state.get('select_all_audios', False)
for audio_file in audio_files:
st.session_state[f"delete_audio_cb_{audio_file.name}"] = select_all_state