import streamlit as st
import json
import requests
import time
from pathlib import Path
from notion_client import Client
from notion_client.helpers import iterate_paginated_api
import subprocess
import librosa


def get_media_info(media_path: Path) -> dict:
    """Use ffprobe to read detailed media info (duration, whether an audio stream exists)."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration:stream=codec_type",
        "-of", "json",
        str(media_path)
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        info = json.loads(result.stdout)
        duration = float(info.get("format", {}).get("duration", 0))
        has_audio = any(s.get("codec_type") == "audio" for s in info.get("streams", []))
        if duration == 0:
            raise ValueError("Duration is unavailable or zero")
        return {"duration": duration, "has_audio": has_audio}
    except (subprocess.CalledProcessError, json.JSONDecodeError, ValueError) as e:
        print(f"❌ Failed to read media info for {media_path}: {e}")
        raise


def get_media_duration(file_path: Path) -> float | None:
    """Get a media file's duration in seconds via librosa (audio) or ffprobe (everything else)."""
    try:
        if file_path.suffix.lower() in ['.wav', '.mp3', '.aac']:
            return librosa.get_duration(path=file_path)
        else:
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(file_path)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return float(result.stdout.strip())
    except Exception as e:
        print(f"Failed to read the duration of {file_path}: {e}")
        return None


def display_operation_status():
    """
    Check session_state for an operation status message,
    show it with st.toast, then clear it.
    """
    if "operation_status" in st.session_state and st.session_state.operation_status:
        status = st.session_state.operation_status
        message_type = status.get("type", "info")
        message = status.get("message", "")
        print(message)
        if message_type == "success":
            st.toast(f"✅ {message}")
        elif message_type == "error":
            st.toast(f"❌ {message}")
        else:
            st.toast(f"ℹ️ {message}")
        # Clear the status so it is not shown again on the next rerun
        del st.session_state.operation_status


def analyze_ass_for_keywords(paths: dict) -> list:
    """Suggest up to three stock-footage search keywords from the generated script data."""
    data_file = paths["data"]
    if not data_file.exists():
        return ["", "", ""]
    try:
        data = json.loads(data_file.read_text(encoding="utf-8"))
        full_text = " ".join([item.get('english', '') for item in data.get('script', [])])
        words = [word.strip(".,!?") for word in full_text.lower().split() if len(word) > 4]
        if not words:
            return ["nature", "technology", "business"]
        unique_words = list(dict.fromkeys(words))
        suggestions = unique_words[:3]
        while len(suggestions) < 3:
            suggestions.append("")
        return suggestions
    except Exception:
        return ["nature", "technology", "business"]


def search_pixabay_videos(api_key, query, target_count=20, buffer=2):
    """Search Pixabay for landscape videos at least 12 seconds long, paging until enough hits are collected."""
    if not api_key:
        return False, "Please set PIXABAY_API_KEY in the Streamlit secrets.", []
    if not query.strip():
        return False, "Please enter a search keyword.", []
    url = "https://pixabay.com/api/videos/"
    valid_hits, page, per_page_request, max_pages = [], 1, 50, 5
    while len(valid_hits) < (target_count + buffer) and page <= max_pages:
        params = {"key": api_key, "q": query, "per_page": per_page_request, "safesearch": "true", "page": page}
        try:
            response = requests.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            if not data.get("hits"):
                break
            for video in data["hits"]:
                try:
                    # Skip clips that are too short; keep only landscape orientation.
                    if video.get('duration', 0) < 12:
                        continue
                    video_details = video.get('videos', {}).get('large', {})
                    width, height = video_details.get('width', 0), video_details.get('height', 0)
                    if width > 0 and height > 0 and width >= height:
                        valid_hits.append(video)
                        if len(valid_hits) >= (target_count + buffer):
                            break
                except (KeyError, TypeError):
                    continue
            if len(valid_hits) >= (target_count + buffer):
                break
            page += 1
            time.sleep(1)
        except requests.RequestException as e:
            return False, f"API request failed: {e}", []
    final_results = valid_hits[:target_count]
    if len(final_results) > 0:
        return True, f"Found and filtered {len(final_results)} landscape videos.", final_results
    else:
        return True, "No matching landscape videos were found; please try other keywords.", []
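

# A minimal, hypothetical helper that is not part of the original flow: it sketches how a hit
# returned by search_pixabay_videos (or search_pexels_videos below) might be streamed to disk.
# The exact URL field depends on the provider: Pixabay exposes it under
# hit["videos"]["large"]["url"], Pexels under an entry of hit["video_files"][i]["link"].
def download_video_file(url: str, dest: Path, chunk_size: int = 1 << 20) -> Path:
    """Stream a remote video file to dest and return the written path."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(dest, "wb") as fh:
            for chunk in response.iter_content(chunk_size=chunk_size):
                fh.write(chunk)
    return dest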


def search_pexels_videos(api_key: str, query: str, target_count: int = 20) -> tuple[bool, str, list]:
    """
    Search the Pexels API for landscape videos.

    Args:
        api_key (str): Pexels API key.
        query (str): Search keyword.
        target_count (int): Target number of results.

    Returns:
        tuple[bool, str, list]: (success flag, message, list of video results)
    """
    if not api_key:
        return False, "Please set PEXELS_API_KEY in the Streamlit secrets.", []
    if not query.strip():
        return False, "Please enter a search keyword.", []
    url = "https://api.pexels.com/videos/search"
    headers = {"Authorization": api_key}
    params = {
        "query": query,
        "per_page": target_count + 5,  # fetch a few extra so filtering still yields enough
        "orientation": "landscape",
        "min_duration": 12
    }
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        videos = data.get("videos", [])
        if not videos:
            return True, "No matching videos were found on Pexels; please try other keywords.", []
        filtered_videos = [
            v for v in videos
            if v.get("duration", 0) >= 12
        ]
        if not filtered_videos:
            return True, "Videos were found, but none is at least 12 seconds long.", []
        # Keep only the target number of results
        final_results = filtered_videos[:target_count]
        return True, f"Found {len(final_results)} videos matching the criteria.", final_results
    except requests.RequestException as e:
        # Pexels usually puts the error message in the response body
        error_info = ""
        if e.response is not None:
            try:
                error_info = e.response.json().get('error', str(e))
            except json.JSONDecodeError:
                error_info = e.response.text
        return False, f"Pexels API request failed: {error_info}", []


def get_notion_page_titles(api_key: str, database_id: str) -> dict:
    """Fetch the title and page_id of every page in a Notion database."""
    client = Client(auth=api_key)
    pages = list(iterate_paginated_api(client.databases.query, database_id=database_id))
    title_map = {}
    for page in pages:
        title_property = page['properties'].get('Name', {})
        if title_property.get('title'):
            title = title_property['title'][0]['plain_text']
            title_map[title] = page['id']
    return title_map
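

# Hypothetical usage sketch only; the Streamlit pages that actually call these helpers are not
# shown here. It assumes a .streamlit/secrets.toml defining PEXELS_API_KEY, as the messages
# above imply.
if __name__ == "__main__":
    ok, msg, hits = search_pexels_videos(st.secrets.get("PEXELS_API_KEY", ""), "nature", target_count=5)
    print(msg)
    for hit in hits:
        # id, duration and url are standard fields of a Pexels video object.
        print(hit.get("id"), hit.get("duration"), hit.get("url"))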