#!/usr/bin/env python3
"""
Media Automator
Orchestrates the pipeline: qBittorrent → namer → Whisparr / Stash → Whisparr
"""

import os
import shutil
import time
import threading
import logging
import json
import traceback
from datetime import datetime
from pathlib import Path
from collections import deque
from typing import Optional

import requests
import yaml
import psutil

# =============================================================================
# Logging Setup
# =============================================================================

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("automator")


# =============================================================================
# Config Loader
# =============================================================================

class Config:
    """Thin wrapper around the parsed YAML configuration file."""

    def __init__(self, path: str = "/config/config.yml"):
        with open(path, encoding="utf-8") as f:
            # safe_load returns None for an empty file; fall back to an empty
            # mapping so the lookups below keep working.
            self._data = yaml.safe_load(f) or {}

    def __getattr__(self, name):
        """Return the top-level section `name`, or {} when it is absent."""
        # __getattr__ only runs for attributes that were not found normally.
        # Refusing private names stops infinite recursion on `self._data`
        # when it has not been set yet.
        if name.startswith("_"):
            raise AttributeError(name)
        return self._data.get(name, {})

    def get(self, *keys, default=None):
        """Walk nested keys; return `default` when the path cannot be followed."""
        d = self._data
        for k in keys:
            if not isinstance(d, dict):
                return default
            d = d.get(k, default)
        return d


# =============================================================================
# Shared State (thread-safe with a lock)
# =============================================================================

class State:
    """Central state store for the automator.
    Accessed by both the pipeline threads and the web UI."""

    def __init__(self, history_size: int = 100):
        self._lock = threading.Lock()
        self.history: deque = deque(maxlen=history_size)
        self.active_jobs: dict = {}  # hash → {stage, name, started}
        self.paused: bool = False
        self.pause_reason: str = ""
        self.stats = {
            "total_processed": 0,
            "whisparr_imported": 0,
            "stash_identified": 0,
            "errors": 0,
            "started_at": datetime.utcnow().isoformat(),
        }

    def add_job(self, hash_: str, name: str, stage: str):
        """Register a new active job keyed by torrent hash."""
        with self._lock:
            self.active_jobs[hash_] = {
                "name": name,
                "stage": stage,
                "started": datetime.utcnow().isoformat(),
            }

    def update_job(self, hash_: str, stage: str):
        """Update the stage of an active job; unknown hashes are ignored."""
        with self._lock:
            if hash_ in self.active_jobs:
                self.active_jobs[hash_]["stage"] = stage

    def complete_job(self, hash_: str, outcome: str):
        """Move a job from the active set to history and update the counters.

        `outcome` is "success" or a free-form error description. Anything
        other than "success" is also counted in the "errors" stat.
        """
        with self._lock:
            job = self.active_jobs.pop(hash_, None)
            if job:
                job["completed"] = datetime.utcnow().isoformat()
                job["outcome"] = outcome
                self.history.appendleft(job)
                self.stats["total_processed"] += 1
                if outcome != "success":
                    self.stats["errors"] += 1

    def set_pause(self, reason: str = ""):
        """Pause with `reason`; an empty reason clears the pause."""
        with self._lock:
            self.paused = bool(reason)
            self.pause_reason = reason

    def snapshot(self) -> dict:
        """Return a copy of the current state for the web UI."""
        with self._lock:
            return {
                "paused": self.paused,
                "pause_reason": self.pause_reason,
                # Copy each job dict as well, so the caller never reads a dict
                # that update_job is mutating.
                "active_jobs": {h: dict(j) for h, j in self.active_jobs.items()},
                "history": [dict(j) for j in self.history],
                "stats": dict(self.stats),
            }


# =============================================================================
# Discord Notifier
# =============================================================================

class Discord:
    """Sends embed notifications to a Discord webhook (best effort)."""

    COLORS = {"INFO": 0x5865F2, "WARNING": 0xFEE75C, "ERROR": 0xED4245, "SUCCESS": 0x57F287}
    LEVELS = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}

    # Embed size limits documented by Discord; longer values are truncated
    # here rather than sent as-is.
    MAX_TITLE = 256
    MAX_DESCRIPTION = 4096

    def __init__(self, cfg: Config):
        self.webhook = cfg.get("discord", "webhook_url", default="")
        # Accept "warning" as well as "WARNING"; unknown values fall back to INFO.
        level_name = str(cfg.get("discord", "notify_level", default="INFO")).upper()
        self.min_level = self.LEVELS.get(level_name, 1)

    def _send(self, title: str, message: str, level: str = "INFO"):
        """Post one embed. Failures are logged and never raised."""
        # "SUCCESS" has no LEVELS entry, so it is filtered like INFO.
        level_val = self.LEVELS.get(level, 1)
        if level_val < self.min_level or not self.webhook:
            return
        color = self.COLORS.get(level, 0x5865F2)
        payload = {
            "embeds": [{
                "title": str(title)[: self.MAX_TITLE],
                "description": str(message)[: self.MAX_DESCRIPTION],
                "color": color,
                "timestamp": datetime.utcnow().isoformat(),
                "footer": {"text": "Media Automator"},
            }]
        }
        try:
            resp = requests.post(self.webhook, json=payload, timeout=10)
            # Surface HTTP errors in the log instead of dropping them silently.
            resp.raise_for_status()
        except Exception as e:
            log.warning(f"Discord notification failed: {e}")

    def info(self, title: str, msg: str):
        self._send(title, msg, "INFO")

    def warn(self, title: str, msg: str):
        self._send(title, msg, "WARNING")

    def error(self, title: str, msg: str):
        self._send(title, msg, "ERROR")

    def success(self, title: str, msg: str):
        self._send(title, msg, "SUCCESS")


# =============================================================================
# qBittorrent Client
# =============================================================================

class QBitClient:
    """Minimal qBittorrent WebUI API (v2) client using a cookie session."""

    def __init__(self, cfg: Config):
        qb = cfg.qbittorrent
        self.base = f"http://{qb['host']}:{qb['port']}"
        self.user = qb["username"]
        self.pwd = qb["password"]
        self.cat_downloading = qb["category_downloading"]
        self.cat_copying = qb["category_copying"]
        self.cat_seeding = qb["category_seeding"]
        self._session = requests.Session()
        self._logged_in = False

    def _login(self):
        """Authenticate the session; raise RuntimeError when rejected."""
        r = self._session.post(
            f"{self.base}/api/v2/auth/login",
            data={"username": self.user, "password": self.pwd},
            timeout=10,
        )
        self._logged_in = r.text == "Ok."
        if not self._logged_in:
            raise RuntimeError(f"qBittorrent login failed: {r.text}")

    def _request(self, method: str, path: str, **kwargs):
        """Send an authenticated request, logging in again once on HTTP 403."""
        if not self._logged_in:
            self._login()
        url = f"{self.base}{path}"
        r = self._session.request(method, url, timeout=15, **kwargs)
        if r.status_code == 403:
            # A 403 after a successful login is taken to mean the session
            # cookie is no longer valid. Without this retry every later call
            # would keep failing until the process is restarted.
            self._logged_in = False
            self._login()
            r = self._session.request(method, url, timeout=15, **kwargs)
        r.raise_for_status()
        return r

    def _get(self, path: str, **params):
        """GET `path` with query params and return the decoded JSON body."""
        return self._request("GET", path, params=params).json()

    def _post(self, path: str, data: dict):
        """POST form data to `path` and return the response object."""
        return self._request("POST", path, data=data)

    def get_completed_torrents(self) -> list:
        """Return all torrents in the 'downloading' category that are 100% done."""
        try:
            torrents = self._get(
                "/api/v2/torrents/info",
                category=self.cat_downloading,
                filter="completed",
            )
            return [t for t in torrents if t.get("progress", 0) >= 1.0]
        except Exception as e:
            log.error(f"qBit: failed to get torrents: {e}")
            return []

    def get_torrent_files(self, hash_: str) -> list:
        """Return list of file paths within a torrent."""
        try:
            return self._get("/api/v2/torrents/files", hash=hash_)
        except Exception as e:
            log.error(f"qBit: failed to get files for {hash_}: {e}")
            return []

    def set_category(self, hash_: str, category: str):
        """Assign `category` to the torrent; request errors propagate."""
        self._post("/api/v2/torrents/setCategory", {"hashes": hash_, "category": category})
        log.info(f"qBit: set category '{category}' on {hash_[:8]}")

    def get_torrent_info(self, hash_: str) -> Optional[dict]:
        """Return the info dict for one torrent, or None if missing or on error."""
        try:
            results = self._get("/api/v2/torrents/info", hashes=hash_)
            return results[0] if results else None
        except Exception as e:
            # Still best effort, but leave a trace instead of failing silently.
            log.warning(f"qBit: failed to get info for {hash_[:8]}: {e}")
            return None

    def wait_for_move(self, hash_: str, expected_save_path: str,
                      timeout: int = 120, poll: int = 3) -> bool:
        """Block until qBit has moved the torrent files to the expected save path."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            info = self.get_torrent_info(hash_)
            if info and info.get("save_path", "").rstrip("/") == expected_save_path.rstrip("/"):
                return True
            time.sleep(poll)
        return False
# =============================================================================
# Whisparr Client
# =============================================================================

class WhisparrClient:
    """Client for Whisparr's v3 manual-import endpoints."""

    def __init__(self, cfg: "Config"):
        w = cfg.whisparr
        self.base = f"http://{w['host']}:{w['port']}/api/v3"
        self.headers = {"X-Api-Key": w["api_key"], "Content-Type": "application/json"}
        self.timeout = w.get("import_timeout", 120)

    @staticmethod
    def _is_matched(item: dict) -> bool:
        """Return True when a scan item carries a scene match."""
        return bool(item.get("scene") or item.get("sceneId"))

    def scan_import_folder(self, folder: str) -> list:
        """Ask Whisparr to scan a folder and return what it found + matched."""
        r = requests.get(
            f"{self.base}/manualimport",
            params={"folder": folder, "filterExistingFiles": "false"},
            headers=self.headers,
            # Use the configured import_timeout; it was previously read but unused.
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def execute_import(self, import_items: list) -> bool:
        """Execute a manual import for the given pre-matched items."""
        if not import_items:
            log.warning("Whisparr: no items to import")
            return False
        # Only import items that have a valid scene match
        valid = [i for i in import_items if self._is_matched(i)]
        if not valid:
            log.warning("Whisparr: no matched scenes in import list")
            return False
        r = requests.post(
            f"{self.base}/manualimport",
            json=valid,
            headers=self.headers,
            timeout=self.timeout,
        )
        r.raise_for_status()
        log.info(f"Whisparr: imported {len(valid)} file(s)")
        return True

    def import_folder(self, folder: str) -> tuple[int, int]:
        """Full scan + import cycle. Returns (matched, unmatched) counts."""
        items = self.scan_import_folder(folder)
        if not items:
            return (0, 0)
        matched = [i for i in items if self._is_matched(i)]
        unmatched_count = len(items) - len(matched)
        if matched:
            self.execute_import(matched)
        return (len(matched), unmatched_count)


# =============================================================================
# Stash GraphQL Client
# =============================================================================

class StashClient:
    """Client for Stash's GraphQL API (scan + identify jobs)."""

    def __init__(self, cfg: "Config"):
        s = cfg.stash
        self.endpoint = f"http://{s['host']}:{s['port']}/graphql"
        self.headers = {"ApiKey": s["api_key"], "Content-Type": "application/json"}
        self.stashdb_url = s.get("stashdb_endpoint", "https://stashdb.org/graphql")
        self.job_timeout = s.get("job_timeout", 600)
        self.job_poll = s.get("job_poll_interval", 5)

    def _gql(self, query: str, variables: Optional[dict] = None) -> dict:
        """Run a GraphQL request; raise RuntimeError if the reply has errors."""
        payload = {"query": query}
        if variables:
            payload["variables"] = variables
        r = requests.post(self.endpoint, json=payload, headers=self.headers, timeout=30)
        r.raise_for_status()
        data = r.json()
        if "errors" in data:
            raise RuntimeError(f"Stash GraphQL error: {data['errors']}")
        return data.get("data", {})

    def _wait_for_job(self, job_id: str) -> bool:
        """Poll until a job finishes. Returns True on success, False on failure/timeout."""
        deadline = time.time() + self.job_timeout
        while time.time() < deadline:
            time.sleep(self.job_poll)
            try:
                data = self._gql(
                    "query FindJob($id: ID!) { findJob(id: $id) { id status } }",
                    {"id": job_id},
                )
                # findJob may come back as null; treat that as "no status yet"
                # rather than raising AttributeError on every poll.
                job = data.get("findJob") or {}
                status = job.get("status", "")
                if status == "FINISHED":
                    return True
                if status in ("FAILED", "CANCELLED"):
                    log.error(f"Stash job {job_id} ended with status: {status}")
                    return False
            except Exception as e:
                log.warning(f"Stash: error polling job {job_id}: {e}")
        log.error(f"Stash: job {job_id} timed out after {self.job_timeout}s")
        return False

    def scan_path(self, path: str) -> bool:
        """Trigger a metadata scan on a specific path and wait for completion."""
        data = self._gql(
            """
            mutation Scan($input: ScanMetadataInput!) {
                metadataScan(input: $input)
            }
            """,
            {"input": {"paths": [path]}},
        )
        job_id = data.get("metadataScan")
        if not job_id:
            raise RuntimeError("Stash scan did not return a job ID")
        log.info(f"Stash: scan job {job_id} started for {path}")
        return self._wait_for_job(job_id)

    def identify_path(self, path: str) -> bool:
        """Trigger metadata identify on a path using StashDB and wait for completion."""
        data = self._gql(
            """
            mutation Identify($input: IdentifyMetadataInput!) {
                metadataIdentify(input: $input)
            }
            """,
            {
                "input": {
                    "paths": [path],
                    "sources": [
                        {"source": {"stash_box_endpoint": self.stashdb_url}}
                    ],
                    "options": {
                        "fieldOptions": [
                            {"field": "title", "strategy": "OVERWRITE"},
                            {"field": "date", "strategy": "OVERWRITE"},
                            {"field": "studio", "strategy": "OVERWRITE", "createMissing": True},
                            {"field": "performers", "strategy": "OVERWRITE", "createMissing": True},
                            {"field": "tags", "strategy": "MERGE", "createMissing": True},
                            {"field": "stash_ids", "strategy": "MERGE"},
                        ],
                        "setOrganized": False,
                    },
                }
            },
        )
        job_id = data.get("metadataIdentify")
        if not job_id:
            raise RuntimeError("Stash identify did not return a job ID")
        log.info(f"Stash: identify job {job_id} started for {path}")
        return self._wait_for_job(job_id)


# =============================================================================
# Disk Space Helpers
# =============================================================================

def free_gb(path: str) -> float:
    """Return free space in GB for the filesystem containing path."""
    usage = psutil.disk_usage(path)
    return usage.free / (1024 ** 3)


def file_size_gb(path: str) -> float:
    """Return size in GB of a file or directory."""
    p = Path(path)
    if p.is_file():
        return p.stat().st_size / (1024 ** 3)
    total = sum(f.stat().st_size for f in p.rglob("*") if f.is_file())
    return total / (1024 ** 3)


# =============================================================================
# File Utilities
# =============================================================================

VIDEO_EXTS_DEFAULT = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".m4v",
                      ".ts", ".m2ts", ".webm", ".divx", ".mpg", ".mpeg"}


def find_video_files(directory: str, extensions: set) -> list:
    """Return all video files in a directory tree (flattened list)."""
    return [
        os.path.join(root, f)
        for root, _, files in os.walk(directory)
        for f in files
        if Path(f).suffix.lower() in extensions
    ]


def copy_file_flat(src: str, dest_dir: str) -> str:
    """Copy src file into dest_dir, using only the filename (no subfolders).
    If a file with the same name exists, appends a counter."""
    src_path = Path(src)
    stem, ext = src_path.stem, src_path.suffix
    dest = os.path.join(dest_dir, src_path.name)
    counter = 1
    while os.path.exists(dest):
        dest = os.path.join(dest_dir, f"{stem}_{counter}{ext}")
        counter += 1
    shutil.copy2(src, dest)
    log.info(f"Copied: {src_path.name} → {dest_dir}")
    return dest


def wait_for_file_stable(path: str, stable_seconds: int = 30, poll: int = 5) -> bool:
    """Wait until a file hasn't changed size for stable_seconds. Returns True when stable."""
    deadline = time.time() + stable_seconds * 10  # max 10x settle time
    last_size = -1
    stable_since = None
    while time.time() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:
            # File missing or unreadable right now; keep polling until the deadline.
            time.sleep(poll)
            continue
        if size == last_size:
            if stable_since is None:
                stable_since = time.time()
            elif time.time() - stable_since >= stable_seconds:
                return True
        else:
            last_size = size
            stable_since = None
        time.sleep(poll)
    return False


# =============================================================================
# stash-temp Watcher (runs in its own thread)
# =============================================================================

class StashTempWatcher(threading.Thread):
    """Watches stash-temp for new video files, waits for them to settle,
    then moves them to the Whisparr import dir and triggers a Whisparr import."""

    def __init__(self, cfg: "Config", whisparr: WhisparrClient,
                 discord: "Discord", state: "State", video_exts: set):
        super().__init__(daemon=True, name="stash-temp-watcher")
        self.watch_dir = cfg.get("paths", "stash_temp")
        self.import_dir = cfg.get("paths", "whisparr_import")
        self.settle_secs = cfg.get("timing", "stash_temp_settle_seconds", default=30)
        self.whisparr = whisparr
        self.discord = discord
        self.state = state
        self.video_exts = video_exts
        self._seen: set = set()

    def run(self):
        log.info(f"StashTempWatcher: watching {self.watch_dir}")
        while True:
            try:
                self._scan()
            except Exception as e:
                log.error(f"StashTempWatcher error: {e}")
            time.sleep(10)

    def _scan(self):
        """Start one worker thread per video file not already being handled."""
        for f in Path(self.watch_dir).iterdir():
            if not f.is_file():
                continue
            if f.suffix.lower() not in self.video_exts:
                continue
            fpath = str(f)
            if fpath in self._seen:
                continue
            self._seen.add(fpath)
            # Process in a new thread so we don't block the watcher
            threading.Thread(
                target=self._process,
                args=(fpath,),
                daemon=True,
                name=f"stash-temp-{f.name[:20]}",
            ).start()

    def _process(self, src_path: str):
        """Wait for the file to settle, move it, then trigger a Whisparr import."""
        log.info(f"StashTempWatcher: new file detected: {Path(src_path).name}")

        # Wait for file to finish being written/renamed by the plugin
        stable = wait_for_file_stable(src_path, self.settle_secs)

        if not os.path.exists(src_path):
            log.info(f"StashTempWatcher: file gone before we could move it: {src_path}")
            self._seen.discard(src_path)
            return

        if not stable:
            # The size is still changing. Copying and then deleting the source
            # now could leave a truncated copy, so forget the file and let a
            # later scan pick it up again.
            log.warning(f"StashTempWatcher: file never stabilised, deferring: {src_path}")
            self._seen.discard(src_path)
            return

        try:
            dest = copy_file_flat(src_path, self.import_dir)
            os.remove(src_path)
            self._seen.discard(src_path)
            log.info(f"StashTempWatcher: moved to Whisparr import: {Path(dest).name}")

            matched, unmatched = self.whisparr.import_folder(self.import_dir)
            if matched:
                self.state.stats["whisparr_imported"] += matched
                self.discord.success(
                    "✅ Whisparr Import (via Stash)",
                    f"**{Path(dest).name}**\nMatched and imported {matched} scene(s).",
                )
            if unmatched:
                self.discord.warn(
                    "⚠️ Whisparr: Unmatched Files",
                    f"{unmatched} file(s) in import folder could not be matched.\n"
                    f"Check Whisparr's manual import UI.",
                )
        except Exception as e:
            log.error(f"StashTempWatcher: error processing {src_path}: {e}")
            self.discord.error("❌ stash-temp Processing Error", str(e))


# =============================================================================
# namer Output Watcher (runs in its own thread)
# =============================================================================

class NamerOutputWatcher(threading.Thread):
    """Watches namer's renamed/ and failed/ folders.
    - renamed → move to Whisparr import dir, trigger import
    - failed  → move to Stash library dir, trigger scan+identify
    """

    def __init__(self, cfg: "Config", whisparr: WhisparrClient, stash: StashClient,
                 discord: "Discord", state: "State", video_exts: set):
        super().__init__(daemon=True, name="namer-watcher")
        self.renamed_dir = cfg.get("paths", "namer_renamed")
        self.failed_dir = cfg.get("paths", "namer_failed")
        self.import_dir = cfg.get("paths", "whisparr_import")
        self.stash_lib = cfg.get("paths", "stash_library")
        self.whisparr = whisparr
        self.stash = stash
        self.discord = discord
        self.state = state
        self.video_exts = video_exts
        self._seen: set = set()

    def run(self):
        log.info(f"NamerOutputWatcher: watching {self.renamed_dir} and {self.failed_dir}")
        while True:
            try:
                self._scan_dir(self.renamed_dir, self._handle_renamed)
                self._scan_dir(self.failed_dir, self._handle_failed)
            except Exception as e:
                log.error(f"NamerOutputWatcher error: {e}")
            time.sleep(10)

    def _scan_dir(self, directory: str, handler):
        """Start `handler` in a worker thread for each new video file in `directory`."""
        try:
            for f in Path(directory).iterdir():
                if not f.is_file():
                    continue
                if f.suffix.lower() not in self.video_exts:
                    continue
                fpath = str(f)
                key = f"{directory}:{fpath}"
                if key in self._seen:
                    continue
                self._seen.add(key)
                threading.Thread(
                    target=self._run_handler, args=(key, handler, fpath), daemon=True
                ).start()
        except FileNotFoundError:
            pass  # directory may not exist yet

    def _run_handler(self, key: str, handler, fpath: str):
        """Run `handler` and forget `key` once the source file is gone.

        Without this, `_seen` grows without bound and a later file that
        reuses the same path would be ignored forever. A file still on disk
        (the handler failed before removing it) stays marked, so it is not
        retried, and not re-alerted, on every scan.
        """
        try:
            handler(fpath)
        finally:
            if not os.path.exists(fpath):
                self._seen.discard(key)

    def _handle_renamed(self, src_path: str):
        """namer successfully renamed → send to Whisparr."""
        name = Path(src_path).name
        log.info(f"NamerWatcher: renamed file ready: {name}")
        try:
            copy_file_flat(src_path, self.import_dir)
            os.remove(src_path)

            matched, unmatched = self.whisparr.import_folder(self.import_dir)
            if matched:
                self.state.stats["whisparr_imported"] += matched
                self.discord.success(
                    "✅ Whisparr Import",
                    f"**{name}** → imported {matched} scene(s) into Whisparr.",
                )
            else:
                self.discord.warn(
                    "⚠️ Whisparr: No Match",
                    f"**{name}** was renamed by namer but Whisparr couldn't match it.\n"
                    f"Check Whisparr's manual import UI at `{self.import_dir}`.",
                )
        except Exception as e:
            log.error(f"NamerWatcher: error handling renamed file {src_path}: {e}")
            self.discord.error("❌ namer→Whisparr Error", f"**{name}**\n{e}")

    def _handle_failed(self, src_path: str):
        """namer could not identify → send to Stash for phash identification."""
        name = Path(src_path).name
        log.info(f"NamerWatcher: failed file, sending to Stash: {name}")
        try:
            dest = copy_file_flat(src_path, self.stash_lib)
            os.remove(src_path)

            self.discord.info(
                "🔍 Sending to Stash (namer failed)",
                f"**{name}** → `{self.stash_lib}`\nRunning scan + identify via StashDB...",
            )

            # Scan so Stash knows about the file
            scan_ok = self.stash.scan_path(dest)
            if not scan_ok:
                raise RuntimeError("Stash scan job failed or timed out")

            # Identify using StashDB
            identify_ok = self.stash.identify_path(dest)
            if identify_ok:
                self.state.stats["stash_identified"] += 1
                self.discord.success(
                    "✅ Stash Identify Complete",
                    f"**{name}** identified via StashDB.\n"
                    f"rename-file-on-update plugin should fire shortly.",
                )
            else:
                self.discord.warn(
                    "⚠️ Stash Identify Failed/Timed Out",
                    f"**{name}** — identify job didn't complete cleanly.\n"
                    f"You may need to run identify manually in Stash.",
                )
        except Exception as e:
            log.error(f"NamerWatcher: error handling failed file {src_path}: {e}")
            self.discord.error("❌ namer→Stash Error", f"**{name}**\n{e}")


# =============================================================================
# Main Pipeline (runs in its own thread, polls qBit)
# =============================================================================

class Pipeline:
    """Polls qBittorrent and feeds completed torrents' videos to namer."""

    def __init__(self, cfg: "Config", qbit: "QBitClient", discord: "Discord", state: "State"):
        self.cfg = cfg
        self.qbit = qbit
        self.discord = discord
        self.state = state
        self.video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))
        self.namer_watch = cfg.get("paths", "namer_watch")
        self.transient = cfg.get("paths", "transient_dir")
        self.namer_ssd_min = cfg.get("disk_space", "namer_ssd_min_free_gb", default=10)
        self.move_timeout = cfg.get("timing", "qbit_move_timeout", default=120)
        self.move_poll = cfg.get("timing", "qbit_move_poll_interval", default=3)
        self._active_hashes: set = set()

    def run(self):
        poll_interval = self.cfg.get("timing", "qbit_poll_interval", default=15)
        log.info("Pipeline: starting main loop")
        while True:
            try:
                if not self.state.paused:
                    self._tick()
            except Exception as e:
                log.error(f"Pipeline error: {e}\n{traceback.format_exc()}")
                self.discord.error("❌ Pipeline Error", str(e))
            time.sleep(poll_interval)

    def _tick(self):
        """Start a worker thread for each completed torrent not already in flight."""
        completed = self.qbit.get_completed_torrents()
        for torrent in completed:
            hash_ = torrent["hash"]
            if hash_ in self._active_hashes:
                continue
            self._active_hashes.add(hash_)
            threading.Thread(
                target=self._process_torrent,
                args=(torrent,),
                daemon=True,
                name=f"torrent-{hash_[:8]}",
            ).start()

    def _process_torrent(self, torrent: dict):
        """Run the pipeline for one torrent and record the outcome in State."""
        hash_ = torrent["hash"]
        name = torrent["name"]
        log.info(f"Pipeline: processing torrent '{name}' [{hash_[:8]}]")
        self.state.add_job(hash_, name, "checking space")

        try:
            self._run_pipeline(torrent)
            self.state.complete_job(hash_, "success")
        except Exception as e:
            log.error(f"Pipeline: error on '{name}': {e}\n{traceback.format_exc()}")
            self.discord.error(f"❌ Pipeline Failed: {name}", str(e))
            self.state.complete_job(hash_, f"error: {e}")
        finally:
            self._active_hashes.discard(hash_)

    @staticmethod
    def _videos_from_file_list(base_dir: str, file_entries: list, extensions: set) -> list:
        """Map qBit file entries to existing video paths under `base_dir`.

        Each entry is a dict whose "name" is a '/'-separated path relative to
        the torrent's save path. Entries that are not videos, or that do not
        exist on disk, are skipped.
        """
        found = []
        for entry in file_entries:
            rel = entry.get("name", "")
            if not rel or Path(rel).suffix.lower() not in extensions:
                continue
            candidate = os.path.join(base_dir, *rel.split("/"))
            if os.path.isfile(candidate):
                found.append(candidate)
        return found

    def _find_torrent_videos(self, torrent: dict) -> list:
        """Locate this torrent's video files inside the transient dir.

        Tried in order:
        1. qBit's own file list for the torrent, relative to the transient dir.
        2. The torrent's content root (folder or single file), by name.
        3. Last resort, as before: every video in the transient dir. This can
           pick up other torrents' files, so it is logged as a warning.
        """
        hash_ = torrent["hash"]
        name = torrent["name"]

        videos = self._videos_from_file_list(
            self.transient, self.qbit.get_torrent_files(hash_), self.video_exts
        )
        if videos:
            return videos

        # content_path from the poll predates the move, so ask qBit again.
        # Only its last path component is used.
        info = self.qbit.get_torrent_info(hash_) or torrent
        content = (info.get("content_path") or "").replace("\\", "/").rstrip("/")
        root_name = os.path.basename(content)
        if root_name:
            root = os.path.join(self.transient, root_name)
            if os.path.isdir(root):
                videos = find_video_files(root, self.video_exts)
            elif os.path.isfile(root) and Path(root).suffix.lower() in self.video_exts:
                videos = [root]
            if videos:
                return videos

        log.warning(
            f"Pipeline: could not pin down files for '{name}'; "
            f"scanning whole transient dir {self.transient}"
        )
        return find_video_files(self.transient, self.video_exts)

    def _run_pipeline(self, torrent: dict):
        hash_ = torrent["hash"]
        name = torrent["name"]

        # ── 1. Check free space on namer SSD ──────────────────────────────────
        torrent_gb = torrent.get("size", 0) / (1024 ** 3)
        self._check_space(self.namer_watch, self.namer_ssd_min, torrent_gb, name)
        self.state.update_job(hash_, "setting category: copying")

        # ── 2. Set category → 2 - copying (qBit moves files to transient) ────
        self.qbit.set_category(hash_, self.qbit.cat_copying)

        # ── 3. Wait for qBit to finish moving files to transient dir ──────────
        self.state.update_job(hash_, "waiting for qBit file move")
        moved = self.qbit.wait_for_move(
            hash_, self.transient, self.move_timeout, self.move_poll
        )
        if not moved:
            # Not fatal — qBit might report save_path differently; try to proceed
            log.warning(f"Pipeline: qBit save_path didn't update for {name}, proceeding anyway")

        # ── 4. Find video files in transient dir for this torrent ─────────────
        self.state.update_job(hash_, "copying to namer")
        video_files = self._find_torrent_videos(torrent)
        if not video_files:
            raise RuntimeError(f"No video files found in transient dir for '{name}'")

        log.info(f"Pipeline: found {len(video_files)} video file(s) for '{name}'")
        self.discord.info(
            f"📋 Processing: {name}",
            f"Found {len(video_files)} video file(s). Copying to namer...",
        )

        # ── 5. Copy video files (flattened) to namer watch dir ────────────────
        copied = [copy_file_flat(vf, self.namer_watch) for vf in video_files]
        log.info(f"Pipeline: copied {len(copied)} file(s) to namer watch dir")

        # ── 6. Set category → 3 - seeding (qBit moves torrent to spinning array)
        self.state.update_job(hash_, "setting category: seeding")
        self.qbit.set_category(hash_, self.qbit.cat_seeding)

        log.info(f"Pipeline: torrent '{name}' set to seeding")
        self.discord.success(
            f"✅ Copied & Seeding: {name}",
            f"Copied {len(copied)} video file(s) to namer.\n"
            f"Torrent moved to seeding. Waiting for namer to process...",
        )

    def _check_space(self, path: str, min_free_gb: float, needed_gb: float, name: str):
        """Check free space; pause and wait + alert if insufficient."""
        check_interval = self.cfg.get("disk_space", "space_check_interval", default=60)
        while True:
            available = free_gb(path)
            if available >= (min_free_gb + needed_gb):
                self.state.set_pause()  # clear any previous pause
                return
            reason = (
                f"Low disk space on namer SSD: {available:.1f} GB free, "
                f"need {min_free_gb + needed_gb:.1f} GB for '{name}'"
            )
            # Alert only on the transition into the paused state.
            if not self.state.paused:
                log.warning(f"Pipeline: PAUSED — {reason}")
                self.state.set_pause(reason)
                self.discord.warn("⏸️ Pipeline Paused — Low Disk Space", reason)
            time.sleep(check_interval)


# =============================================================================
# Web UI (Flask, runs in its own thread)
# =============================================================================

def create_web_app(state: "State"):
    """Build the Flask app serving the status page and the JSON state API."""
    from flask import Flask, jsonify, render_template_string

    app = Flask(__name__)

    # NOTE(review): the template's HTML markup was missing from this file,
    # which left the row expressions outside their {% for %} loops (job is
    # undefined there). The markup below is a minimal reconstruction around
    # the surviving text and expressions. The pause banner content is an
    # assumption — confirm against the intended design.
    HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="refresh" content="15">
<title>Media Automator</title>
</head>
<body>
<h1>📡 Media Automator</h1>
<p>Auto-refreshes every 15s &nbsp;·&nbsp; {{ now }}</p>
{% if snap.paused %}
<p><strong>⏸️ Paused:</strong> {{ snap.pause_reason }}</p>
{% endif %}
<table>
<tr><th>Processed</th><td>{{ snap.stats.total_processed }}</td></tr>
<tr><th>Whisparr Imports</th><td>{{ snap.stats.whisparr_imported }}</td></tr>
<tr><th>Stash Identified</th><td>{{ snap.stats.stash_identified }}</td></tr>
<tr><th>Errors</th><td>{{ snap.stats.errors }}</td></tr>
<tr><th>Active Jobs</th><td>{{ snap.active_jobs|length }}</td></tr>
</table>

<h2>Active Jobs</h2>
{% if snap.active_jobs %}
<table>
<tr><th>Torrent</th><th>Stage</th><th>Started</th></tr>
{% for hash, job in snap.active_jobs.items() %}
<tr><td>{{ job.name }}</td><td>{{ job.stage }}</td><td>{{ job.started }}</td></tr>
{% endfor %}
</table>
{% else %}
<p>No active jobs.</p>
{% endif %}

<h2>Recent History</h2>
{% if snap.history %}
<table>
<tr><th>Torrent</th><th>Outcome</th><th>Completed</th></tr>
{% for job in snap.history %}
<tr>
<td>{{ job.name }}</td>
<td>{% if job.outcome == 'success' %}✓ success{% else %}✗ {{ job.outcome[:40] }}{% endif %}</td>
<td>{{ job.get('completed', '—') }}</td>
</tr>
{% endfor %}
</table>
{% else %}
<p>No completed jobs yet.</p>
{% endif %}
</body>
</html>
"""

    @app.route("/")
    def index():
        snap = state.snapshot()
        now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
        return render_template_string(HTML, snap=snap, now=now)

    @app.route("/api/state")
    def api_state():
        return jsonify(state.snapshot())

    return app


# =============================================================================
# Entry Point
# =============================================================================

def main():
    """Load config, start the watcher and pipeline threads, then serve the web UI."""
    config_path = os.environ.get("CONFIG_PATH", "/config/config.yml")
    log.info(f"Loading config from {config_path}")
    cfg = Config(config_path)

    video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))

    state = State(history_size=cfg.get("webui", "history_size", default=100))
    discord = Discord(cfg)
    qbit = QBitClient(cfg)
    whisparr = WhisparrClient(cfg)
    stash = StashClient(cfg)

    # Ensure output directories exist
    for p in [
        cfg.get("paths", "namer_watch"),
        cfg.get("paths", "whisparr_import"),
        cfg.get("paths", "stash_library"),
        cfg.get("paths", "stash_temp"),
    ]:
        if p:
            Path(p).mkdir(parents=True, exist_ok=True)

    discord.info("🚀 Media Automator Started", "All systems initialised. Pipeline is running.")

    # ── Start background watchers ──────────────────────────────────────────────
    namer_watcher = NamerOutputWatcher(cfg, whisparr, stash, discord, state, video_exts)
    namer_watcher.start()

    stash_watcher = StashTempWatcher(cfg, whisparr, discord, state, video_exts)
    stash_watcher.start()

    # ── Start main pipeline in its own thread ─────────────────────────────────
    pipeline = Pipeline(cfg, qbit, discord, state)
    pipeline_thread = threading.Thread(target=pipeline.run, daemon=True, name="pipeline")
    pipeline_thread.start()

    # ── Start web UI (blocks main thread) ─────────────────────────────────────
    port = cfg.get("webui", "port", default=8888)
    log.info(f"Web UI starting on port {port}")
    app = create_web_app(state)
    app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
    main()