189 lines
8.1 KiB
Python
189 lines
8.1 KiB
Python
import streamlit as st
|
|
import json
|
|
import requests
|
|
import time
|
|
from pathlib import Path
|
|
from notion_client import Client
|
|
from notion_client.helpers import iterate_paginated_api
|
|
import subprocess
|
|
import math
|
|
import librosa
|
|
|
|
def get_media_info(media_path: Path) -> dict:
|
|
"""使用 ffprobe 獲取媒體檔案的詳細資訊 (時長、是否有音訊)。"""
|
|
cmd = [
|
|
"ffprobe", "-v", "error", "-show_entries", "format=duration:stream=codec_type",
|
|
"-of", "json", str(media_path)
|
|
]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
|
info = json.loads(result.stdout)
|
|
duration = float(info.get("format", {}).get("duration", 0))
|
|
has_audio = any(s.get("codec_type") == "audio" for s in info.get("streams", []))
|
|
if duration == 0: raise ValueError("無法獲取或時長為 0")
|
|
return {"duration": duration, "has_audio": has_audio}
|
|
except (subprocess.CalledProcessError, json.JSONDecodeError, ValueError) as e:
|
|
print(f"❌ 無法獲取媒體資訊 {media_path}: {e}")
|
|
raise
|
|
|
|
def get_media_duration(file_path: Path) -> float | None:
|
|
"""使用 ffprobe 或 librosa 獲取媒體檔案的時長(秒)。"""
|
|
try:
|
|
if file_path.suffix.lower() in ['.wav', '.mp3', '.aac']:
|
|
return librosa.get_duration(path=file_path)
|
|
else:
|
|
cmd = [
|
|
"ffprobe", "-v", "error", "-show_entries", "format=duration",
|
|
"-of", "default=noprint_wrappers=1:nokey=1", str(file_path)
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
|
return float(result.stdout.strip())
|
|
except Exception as e:
|
|
print(f"無法讀取檔案 {file_path} 的時長: {e}")
|
|
return None
|
|
|
|
# 【新增】計算循環次數 p 的函式
|
|
def calculate_loop_count(audio_path: Path, video_folder: Path, transition_duration: float) -> tuple[int | None, str | None]:
|
|
"""
|
|
根據音訊長度和一系列影片,計算所需的最小循環次數 p。
|
|
成功時回傳 (p, None),失敗時回傳 (None, error_message)。
|
|
"""
|
|
m_prime = get_media_duration(audio_path)
|
|
if m_prime is None:
|
|
return None, "無法讀取音訊檔案長度。"
|
|
|
|
video_paths = [p for p in video_folder.iterdir() if p.is_file() and p.suffix.lower() in ['.mp4', '.mov']]
|
|
video_lengths = [get_media_duration(p) for p in video_paths]
|
|
video_lengths = [l for l in video_lengths if l is not None and l > 0]
|
|
|
|
if not video_lengths:
|
|
return None, "在 `test` 資料夾中找不到有效的影片檔案。"
|
|
|
|
n = len(video_lengths)
|
|
m = sum(video_lengths)
|
|
tr = transition_duration
|
|
|
|
denominator = m - (n - 1) * tr
|
|
|
|
if denominator <= 0:
|
|
return None, f"影片的有效長度 ({denominator:.2f}s) 小於或等於零,無法進行循環。請增加影片時長或減少轉場時間。"
|
|
|
|
p = math.ceil(m_prime / denominator)
|
|
return p, None
|
|
def display_global_status_message():
|
|
"""
|
|
檢查會話狀態中是否存在任何操作狀態,如果存在則顯示它。
|
|
顯示後會立即清除狀態,以確保訊息只顯示一次。
|
|
"""
|
|
if 'operation_status' in st.session_state and st.session_state.operation_status:
|
|
status = st.session_state.operation_status
|
|
# 根據成功與否,選擇對應的訊息元件
|
|
if status.get("success"):
|
|
st.success(status["message"], icon="✅")
|
|
else:
|
|
st.error(status["message"], icon="❌")
|
|
# 清除狀態,避免在下一次刷新時重複顯示
|
|
st.session_state.operation_status = {}
|
|
|
|
|
|
def analyze_ass_for_keywords(paths: dict) -> list:
|
|
data_file = paths["data"]
|
|
if not data_file.exists(): return ["", "", ""]
|
|
try:
|
|
data = json.loads(data_file.read_text(encoding="utf-8"))
|
|
full_text = " ".join([item.get('english', '') for item in data.get('script', [])])
|
|
words = [word.strip(".,!?") for word in full_text.lower().split() if len(word) > 4]
|
|
if not words: return ["nature", "technology", "business"]
|
|
unique_words = list(dict.fromkeys(words))
|
|
suggestions = unique_words[:3]
|
|
while len(suggestions) < 3: suggestions.append("")
|
|
return suggestions
|
|
except Exception: return ["nature", "technology", "business"]
|
|
|
|
def search_pixabay_videos(api_key, query, target_count=20, buffer=2):
|
|
if not api_key: return False, "請在 Streamlit secrets 中設定 PIXABAY_API_KEY。", []
|
|
if not query.strip(): return False, "請輸入搜尋關鍵字。", []
|
|
url = "https://pixabay.com/api/videos/"
|
|
valid_hits, page, per_page_request, max_pages = [], 1, 50, 5
|
|
while len(valid_hits) < (target_count + buffer) and page <= max_pages:
|
|
params = {"key": api_key, "q": query, "per_page": per_page_request, "safesearch": "true", "page": page}
|
|
try:
|
|
response = requests.get(url, params=params)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if not data.get("hits"): break
|
|
for video in data["hits"]:
|
|
try:
|
|
video_details = video.get('videos', {}).get('large', {})
|
|
width, height = video_details.get('width', 0), video_details.get('height', 0)
|
|
if width > 0 and height > 0 and width >= height:
|
|
valid_hits.append(video)
|
|
if len(valid_hits) >= (target_count + buffer): break
|
|
except (KeyError, TypeError): continue
|
|
if len(valid_hits) >= (target_count + buffer): break
|
|
page += 1
|
|
time.sleep(1)
|
|
except requests.RequestException as e: return False, f"API 請求失敗: {e}", []
|
|
|
|
final_results = valid_hits[:target_count]
|
|
if len(final_results) > 0: return True, f"成功找到並過濾出 {len(final_results)} 個橫式影片。", final_results
|
|
else: return True, "找不到符合條件的橫式影片,請嘗試其他關鍵字。", []
|
|
def search_pexels_videos(api_key: str, query: str, target_count: int = 20) -> tuple[bool, str, list]:
|
|
"""
|
|
從 Pexels API 搜尋橫向影片。
|
|
|
|
Args:
|
|
api_key (str): Pexels API 金鑰。
|
|
query (str): 搜尋關鍵字。
|
|
target_count (int): 目標搜尋結果數量。
|
|
|
|
Returns:
|
|
tuple[bool, str, list]: (成功狀態, 訊息, 影片結果列表)
|
|
"""
|
|
if not api_key:
|
|
return False, "請在 Streamlit secrets 中設定 PEXELS_API_KEY。", []
|
|
if not query.strip():
|
|
return False, "請輸入搜尋關鍵字。", []
|
|
|
|
url = "https://api.pexels.com/v1/videos/search"
|
|
headers = {"Authorization": api_key}
|
|
params = {
|
|
"query": query,
|
|
"per_page": target_count + 5, # 多取一些以過濾
|
|
"orientation": 'landscape'
|
|
}
|
|
|
|
try:
|
|
response = requests.get(url, headers=headers, params=params)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
videos = data.get("videos", [])
|
|
if not videos:
|
|
return True, "在 Pexels 找不到符合條件的影片,請嘗試其他關鍵字。", []
|
|
|
|
final_results = videos[:target_count]
|
|
return True, f"成功從 Pexels 找到 {len(final_results)} 個橫式影片。", final_results
|
|
|
|
except requests.RequestException as e:
|
|
# Pexels 的錯誤訊息通常在 response body 中
|
|
error_info = ""
|
|
if e.response is not None:
|
|
try:
|
|
error_info = e.response.json().get('error', str(e))
|
|
except json.JSONDecodeError:
|
|
error_info = e.response.text
|
|
return False, f"Pexels API 請求失敗: {error_info}", []
|
|
|
|
def get_notion_page_titles(api_key: str, database_id: str) -> dict:
|
|
"""獲取 Notion 資料庫中所有頁面的標題和對應的 page_id。"""
|
|
client = Client(auth=api_key)
|
|
pages = list(iterate_paginated_api(client.databases.query, database_id=database_id))
|
|
|
|
title_map = {}
|
|
for page in pages:
|
|
title_property = page['properties'].get('Name', {})
|
|
if title_property.get('title'):
|
|
title = title_property['title'][0]['plain_text']
|
|
title_map[title] = page['id']
|
|
return title_map |