mapletree/lxc3/porntumate/main.py
admin_jk e6ba4970cf
Some checks are pending
PVE2 Infrastructure Deploy / terraform (push) Waiting to run
porntumate add
2026-03-01 15:15:42 -07:00

965 lines
39 KiB
Python

#!/usr/bin/env python3
"""
Media Automator
Orchestrates the pipeline: qBittorrent → namer → Whisparr / Stash → Whisparr
"""
import os
import shutil
import time
import threading
import logging
import json
import traceback
from datetime import datetime
from pathlib import Path
from collections import deque
from typing import Optional
import requests
import yaml
import psutil
# =============================================================================
# Logging Setup
# =============================================================================
# Root logging: INFO and above, rendered as "<timestamp> [LEVEL] <logger>: <message>".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
# Module-wide logger used by every component in this file.
log = logging.getLogger("automator")
# =============================================================================
# Config Loader
# =============================================================================
class Config:
    """Read-only view over the YAML configuration file.

    Top-level sections are reachable as attributes (``cfg.qbittorrent``) and
    nested values through :meth:`get` (``cfg.get("paths", "stash_temp")``).
    """

    def __init__(self, path: str = "/config/config.yml"):
        """Load and parse the YAML document stored at *path*."""
        with open(path, encoding="utf-8") as f:
            # An empty file parses to None; normalise so lookups keep working.
            self._data = yaml.safe_load(f) or {}

    def __getattr__(self, name):
        """Return the top-level section *name*, or ``{}`` when it is absent.

        Python only calls this for names that normal attribute lookup did
        not find.

        Raises:
            AttributeError: for private/dunder names, which are never
                configuration sections.
        """
        # Without this guard, looking up ``_data`` on an instance whose
        # __init__ has not run re-enters __getattr__ until RecursionError,
        # and protocol probes such as ``__deepcopy__`` get answered with a
        # dict.
        if name.startswith("_"):
            raise AttributeError(name)
        return self._data.get(name, {})

    def get(self, *keys, default=None):
        """Walk nested mappings along *keys* and return the value found.

        Returns *default* as soon as a key is missing or an intermediate
        value is not a mapping. A key that is present with a null value
        yields ``None`` when it is the last key.
        """
        d = self._data
        for k in keys:
            # Checking membership (instead of d.get(k, default)) keeps a
            # dict-valued default from being searched by the next key.
            if not isinstance(d, dict) or k not in d:
                return default
            d = d[k]
        return d
# =============================================================================
# Shared State (thread-safe with a lock)
# =============================================================================
class State:
    """Central state store for the automator.

    Holds the active jobs, a bounded history of finished jobs, the pause
    flag and the counters shown by the web UI. The methods below take an
    internal lock before touching that data.
    """

    def __init__(self, history_size: int = 100):
        """Create an empty store keeping at most *history_size* finished jobs."""
        self._lock = threading.Lock()
        self.history: deque = deque(maxlen=history_size)
        self.active_jobs: dict = {}  # hash → {name, stage, started}
        self.paused: bool = False
        self.pause_reason: str = ""
        self.stats = {
            "total_processed": 0,
            "whisparr_imported": 0,
            "stash_identified": 0,
            "errors": 0,
            "started_at": datetime.utcnow().isoformat(),
        }

    def add_job(self, hash_: str, name: str, stage: str):
        """Register (or replace) the active job stored under *hash_*."""
        with self._lock:
            self.active_jobs[hash_] = {
                "name": name,
                "stage": stage,
                "started": datetime.utcnow().isoformat(),
            }

    def update_job(self, hash_: str, stage: str):
        """Set the stage of an active job; unknown hashes are ignored."""
        with self._lock:
            if hash_ in self.active_jobs:
                self.active_jobs[hash_]["stage"] = stage

    def complete_job(self, hash_: str, outcome: str):
        """Move a job from the active set to the front of the history.

        *outcome* is ``"success"`` or a free-form error description. Any
        outcome other than ``"success"`` also increments the ``errors``
        counter. Unknown hashes are ignored.
        """
        with self._lock:
            job = self.active_jobs.pop(hash_, None)
            if not job:
                return
            job["completed"] = datetime.utcnow().isoformat()
            job["outcome"] = outcome
            self.history.appendleft(job)
            self.stats["total_processed"] += 1
            # The web UI renders every non-"success" outcome as a failure,
            # so count them the same way here.
            if outcome != "success":
                self.stats["errors"] += 1

    def set_pause(self, reason: str = ""):
        """Pause with *reason*; an empty reason clears the pause."""
        with self._lock:
            self.paused = bool(reason)
            self.pause_reason = reason

    def snapshot(self) -> dict:
        """Return a copy of the current state for rendering.

        Job dicts are copied too, so a caller can read or modify the result
        without affecting (or racing with) the live entries.
        """
        with self._lock:
            return {
                "paused": self.paused,
                "pause_reason": self.pause_reason,
                "active_jobs": {h: dict(j) for h, j in self.active_jobs.items()},
                "history": [dict(j) for j in self.history],
                "stats": dict(self.stats),
            }
# =============================================================================
# Discord Notifier
# =============================================================================
class Discord:
    """Posts status embeds to a Discord webhook, filtered by a minimum level.

    Delivery is best-effort: a failed or rejected request is logged as a
    warning and never raised to the caller.
    """

    COLORS = {"INFO": 0x5865F2, "WARNING": 0xFEE75C, "ERROR": 0xED4245, "SUCCESS": 0x57F287}
    LEVELS = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
    # Discord rejects an embed whose title/description exceed these lengths.
    TITLE_LIMIT = 256
    DESCRIPTION_LIMIT = 4096

    def __init__(self, cfg: Config):
        """Read ``discord.webhook_url`` and ``discord.notify_level`` from *cfg*."""
        self.webhook = cfg.get("discord", "webhook_url", default="")
        # Accept "warning" as well as "WARNING"; unknown names fall back to INFO.
        configured = str(cfg.get("discord", "notify_level", default="INFO")).upper()
        self.min_level = self.LEVELS.get(configured, 1)

    @staticmethod
    def _clip(text, limit: int) -> str:
        """Return *text* as a string of at most *limit* characters."""
        text = str(text)
        if len(text) <= limit:
            return text
        return text[: limit - 1] + "…"

    def _send(self, title: str, message: str, level: str = "INFO"):
        """Post one embed unless it is below the threshold or no webhook is set."""
        # "SUCCESS" has no entry in LEVELS, so it is filtered like INFO.
        level_val = self.LEVELS.get(level, 1)
        if level_val < self.min_level or not self.webhook:
            return
        color = self.COLORS.get(level, 0x5865F2)
        payload = {
            "embeds": [{
                "title": self._clip(title, self.TITLE_LIMIT),
                "description": self._clip(message, self.DESCRIPTION_LIMIT),
                "color": color,
                "timestamp": datetime.utcnow().isoformat(),
                "footer": {"text": "Media Automator"},
            }]
        }
        try:
            response = requests.post(self.webhook, json=payload, timeout=10)
            # Surface 4xx/5xx answers instead of silently dropping the message.
            response.raise_for_status()
        except Exception as e:
            log.warning(f"Discord notification failed: {e}")

    def info(self, title: str, msg: str):
        """Send an INFO-level notification."""
        self._send(title, msg, "INFO")

    def warn(self, title: str, msg: str):
        """Send a WARNING-level notification."""
        self._send(title, msg, "WARNING")

    def error(self, title: str, msg: str):
        """Send an ERROR-level notification."""
        self._send(title, msg, "ERROR")

    def success(self, title: str, msg: str):
        """Send a SUCCESS-coloured notification."""
        self._send(title, msg, "SUCCESS")
# =============================================================================
# qBittorrent Client
# =============================================================================
class QBitClient:
    """Small client for the qBittorrent WebUI API (v2).

    Authenticates lazily on first use through a cookie-carrying
    ``requests.Session`` and logs in again when the server answers 403,
    which is what a long-running process sees once its session has expired.
    """

    def __init__(self, cfg: Config):
        """Read connection settings and category names from ``cfg.qbittorrent``."""
        qb = cfg.qbittorrent
        self.base = f"http://{qb['host']}:{qb['port']}"
        self.user = qb["username"]
        self.pwd = qb["password"]
        self.cat_downloading = qb["category_downloading"]
        self.cat_copying = qb["category_copying"]
        self.cat_seeding = qb["category_seeding"]
        self._session = requests.Session()
        self._logged_in = False

    def _login(self):
        """Authenticate the session.

        Raises:
            RuntimeError: when the server does not answer ``Ok.``.
        """
        r = self._session.post(
            f"{self.base}/api/v2/auth/login",
            data={"username": self.user, "password": self.pwd},
            timeout=10,
        )
        self._logged_in = r.text == "Ok."
        if not self._logged_in:
            raise RuntimeError(f"qBittorrent login failed: {r.text}")

    def _request(self, method: str, path: str, **kwargs):
        """Send one API request, re-authenticating once on HTTP 403.

        Returns the response object; raises for any remaining HTTP error.
        """
        if not self._logged_in:
            self._login()
        url = f"{self.base}{path}"
        r = self._session.request(method, url, timeout=15, **kwargs)
        if r.status_code == 403:
            # The cached login is no longer accepted: log in again and
            # retry exactly once so a bad password cannot loop forever.
            self._logged_in = False
            self._login()
            r = self._session.request(method, url, timeout=15, **kwargs)
        r.raise_for_status()
        return r

    def _get(self, path: str, **params):
        """GET *path* with *params* as query string and return the decoded JSON."""
        return self._request("GET", path, params=params).json()

    def _post(self, path: str, data: dict):
        """POST form *data* to *path* and return the response object."""
        return self._request("POST", path, data=data)

    def get_completed_torrents(self) -> list:
        """Return all torrents in the 'downloading' category that are 100% done.

        Any failure is logged and reported as an empty list.
        """
        try:
            torrents = self._get(
                "/api/v2/torrents/info",
                category=self.cat_downloading,
                filter="completed",
            )
            return [t for t in torrents if t.get("progress", 0) >= 1.0]
        except Exception as e:
            log.error(f"qBit: failed to get torrents: {e}")
            return []

    def get_torrent_files(self, hash_: str) -> list:
        """Return the file entries of a torrent, or ``[]`` when the call fails."""
        try:
            return self._get("/api/v2/torrents/files", hash=hash_)
        except Exception as e:
            log.error(f"qBit: failed to get files for {hash_}: {e}")
            return []

    def set_category(self, hash_: str, category: str):
        """Assign *category* to the torrent; HTTP and login errors propagate."""
        self._post("/api/v2/torrents/setCategory", {"hashes": hash_, "category": category})
        log.info(f"qBit: set category '{category}' on {hash_[:8]}")

    def get_torrent_info(self, hash_: str) -> Optional[dict]:
        """Return the info dict for one torrent, or ``None`` if unavailable."""
        try:
            results = self._get("/api/v2/torrents/info", hashes=hash_)
            return results[0] if results else None
        except Exception as e:
            # Still best-effort, but leave a trace of why the lookup failed.
            log.warning(f"qBit: failed to get info for {hash_[:8]}: {e}")
            return None

    def wait_for_move(self, hash_: str, expected_save_path: str,
                      timeout: int = 120, poll: int = 3) -> bool:
        """Block until qBit reports *expected_save_path* as the torrent's save path.

        Trailing slashes are ignored in the comparison. Returns ``False``
        when *timeout* seconds pass without a match.
        """
        wanted = expected_save_path.rstrip("/")
        deadline = time.time() + timeout
        while time.time() < deadline:
            info = self.get_torrent_info(hash_)
            if info and info.get("save_path", "").rstrip("/") == wanted:
                return True
            time.sleep(poll)
        return False
# =============================================================================
# Whisparr Client
# =============================================================================
class WhisparrClient:
    """Client for Whisparr's v3 manual-import endpoint."""

    def __init__(self, cfg: Config):
        """Read host, port, API key and ``import_timeout`` from ``cfg.whisparr``."""
        w = cfg.whisparr
        self.base = f"http://{w['host']}:{w['port']}/api/v3"
        self.headers = {"X-Api-Key": w["api_key"], "Content-Type": "application/json"}
        # Seconds allowed for each manual-import request (scan and import).
        self.timeout = w.get("import_timeout", 120)

    @staticmethod
    def _is_matched(item: dict) -> bool:
        """Tell whether a scan result carries a scene match."""
        return bool(item.get("scene") or item.get("sceneId"))

    def scan_import_folder(self, folder: str) -> list:
        """Ask Whisparr to scan *folder* and return the decoded JSON result.

        Raises for HTTP errors.
        """
        r = requests.get(
            f"{self.base}/manualimport",
            params={"folder": folder, "filterExistingFiles": "false"},
            headers=self.headers,
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def execute_import(self, import_items: list) -> bool:
        """Submit the matched entries of *import_items* for manual import.

        Returns ``False`` (after logging a warning) when the list is empty
        or contains no matched item, ``True`` once the request succeeded.
        Raises for HTTP errors.
        """
        if not import_items:
            log.warning("Whisparr: no items to import")
            return False
        # Only import items that have a valid scene match
        valid = [i for i in import_items if self._is_matched(i)]
        if not valid:
            log.warning("Whisparr: no matched scenes in import list")
            return False
        # NOTE(review): Sonarr/Radarr-style APIs usually run a manual import
        # through POST /command (name=ManualImport); confirm that posting to
        # /manualimport really imports on the Whisparr version in use.
        r = requests.post(
            f"{self.base}/manualimport",
            json=valid,
            headers=self.headers,
            timeout=self.timeout,
        )
        r.raise_for_status()
        log.info(f"Whisparr: imported {len(valid)} file(s)")
        return True

    def import_folder(self, folder: str) -> tuple[int, int]:
        """Full scan + import cycle. Returns (matched, unmatched) counts."""
        items = self.scan_import_folder(folder)
        if not items:
            return (0, 0)
        matched = [i for i in items if self._is_matched(i)]
        if matched:
            self.execute_import(matched)
        return (len(matched), len(items) - len(matched))
# =============================================================================
# Stash GraphQL Client
# =============================================================================
class StashClient:
    """GraphQL client for Stash: starts scan/identify jobs and waits for them."""

    def __init__(self, cfg: Config):
        """Read endpoint, API key and job timing settings from ``cfg.stash``."""
        s = cfg.stash
        self.endpoint = f"http://{s['host']}:{s['port']}/graphql"
        self.headers = {"ApiKey": s["api_key"], "Content-Type": "application/json"}
        self.stashdb_url = s.get("stashdb_endpoint", "https://stashdb.org/graphql")
        self.job_timeout = s.get("job_timeout", 600)
        self.job_poll = s.get("job_poll_interval", 5)

    def _gql(self, query: str, variables: Optional[dict] = None) -> dict:
        """Run one GraphQL operation and return its ``data`` object.

        Raises:
            RuntimeError: when the response carries an ``errors`` entry.
            requests.HTTPError: for non-2xx HTTP answers.
        """
        payload = {"query": query}
        if variables:
            payload["variables"] = variables
        r = requests.post(self.endpoint, json=payload, headers=self.headers, timeout=30)
        r.raise_for_status()
        data = r.json()
        if "errors" in data:
            raise RuntimeError(f"Stash GraphQL error: {data['errors']}")
        # "data" may be present but null; always hand callers a dict.
        return data.get("data") or {}

    def _wait_for_job(self, job_id: str) -> bool:
        """Poll until a job finishes. Returns True on success, False on failure/timeout."""
        deadline = time.time() + self.job_timeout
        while time.time() < deadline:
            time.sleep(self.job_poll)
            try:
                data = self._gql(
                    "query FindJob($id: ID!) { findJob(id: $id) { id status } }",
                    {"id": job_id},
                )
            except Exception as e:
                # A failed poll is not fatal; keep trying until the deadline.
                log.warning(f"Stash: error polling job {job_id}: {e}")
                continue
            job = data.get("findJob")
            if not job:
                # A null result used to surface as an AttributeError in the
                # generic handler; report it for what it is and keep polling.
                log.warning(f"Stash: job {job_id} not reported by findJob, retrying")
                continue
            status = job.get("status", "")
            if status == "FINISHED":
                return True
            if status in ("FAILED", "CANCELLED"):
                log.error(f"Stash job {job_id} ended with status: {status}")
                return False
        log.error(f"Stash: job {job_id} timed out after {self.job_timeout}s")
        return False

    def _start_and_wait(self, label: str, result_key: str, mutation: str,
                        job_input: dict, path: str) -> bool:
        """Run a job-starting *mutation* and wait for the job it returns.

        *result_key* is the field of the response holding the job ID.

        Raises:
            RuntimeError: when the mutation returns no job ID.
        """
        data = self._gql(mutation, {"input": job_input})
        job_id = data.get(result_key)
        if not job_id:
            raise RuntimeError(f"Stash {label} did not return a job ID")
        log.info(f"Stash: {label} job {job_id} started for {path}")
        return self._wait_for_job(job_id)

    def scan_path(self, path: str) -> bool:
        """Trigger a metadata scan on a specific path and wait for completion."""
        return self._start_and_wait(
            "scan",
            "metadataScan",
            """
            mutation Scan($input: ScanMetadataInput!) {
                metadataScan(input: $input)
            }
            """,
            {"paths": [path]},
            path,
        )

    def identify_path(self, path: str) -> bool:
        """Trigger metadata identify on a path using StashDB and wait for completion."""
        job_input = {
            "paths": [path],
            "sources": [
                {"source": {"stash_box_endpoint": self.stashdb_url}}
            ],
            "options": {
                "fieldOptions": [
                    {"field": "title", "strategy": "OVERWRITE"},
                    {"field": "date", "strategy": "OVERWRITE"},
                    {"field": "studio", "strategy": "OVERWRITE", "createMissing": True},
                    {"field": "performers", "strategy": "OVERWRITE", "createMissing": True},
                    {"field": "tags", "strategy": "MERGE", "createMissing": True},
                    {"field": "stash_ids", "strategy": "MERGE"},
                ],
                "setOrganized": False,
            },
        }
        return self._start_and_wait(
            "identify",
            "metadataIdentify",
            """
            mutation Identify($input: IdentifyMetadataInput!) {
                metadataIdentify(input: $input)
            }
            """,
            job_input,
            path,
        )
# =============================================================================
# Disk Space Helpers
# =============================================================================
def free_gb(path: str) -> float:
    """Report the free space, in units of 1024**3 bytes, on the filesystem holding *path*."""
    bytes_per_gb = 1024 ** 3
    return psutil.disk_usage(path).free / bytes_per_gb
def file_size_gb(path: str) -> float:
    """Measure a file, or every file below a directory, in units of 1024**3 bytes.

    A path that is not a regular file is treated as a directory tree and
    walked recursively; a tree without files measures 0.
    """
    target = Path(path)
    if target.is_file():
        size_bytes = target.stat().st_size
    else:
        size_bytes = 0
        for entry in target.rglob("*"):
            if entry.is_file():
                size_bytes += entry.stat().st_size
    return size_bytes / (1024 ** 3)
# =============================================================================
# File Utilities
# =============================================================================
# Lower-case suffixes (leading dot included) treated as video files when the
# configuration does not supply its own "video_extensions" list.
VIDEO_EXTS_DEFAULT = {
    ".avi", ".divx", ".flv", ".m2ts", ".m4v", ".mkv", ".mov",
    ".mp4", ".mpeg", ".mpg", ".ts", ".webm", ".wmv",
}
def find_video_files(directory: str, extensions: set) -> list:
    """Collect the paths of all files under *directory* whose suffix is in *extensions*.

    The tree is walked recursively and suffixes are compared lower-cased.
    The result is one flat list of joined paths.
    """
    return [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(directory)
        for filename in filenames
        if Path(filename).suffix.lower() in extensions
    ]
def copy_file_flat(src: str, dest_dir: str) -> str:
    """Copy *src* into *dest_dir* using only its file name (no subfolders).

    When that name is already taken, ``_1``, ``_2``, ... is appended to the
    stem until a free name is found. Returns the path actually written.
    """
    source = Path(src)
    # Loop-invariant pieces of the fallback name.
    stem, ext = source.stem, source.suffix
    dest = os.path.join(dest_dir, source.name)
    counter = 1
    while os.path.exists(dest):
        dest = os.path.join(dest_dir, f"{stem}_{counter}{ext}")
        counter += 1
    shutil.copy2(src, dest)
    # Log the real destination: it differs from dest_dir/name after a rename,
    # and the previous message ran the file name and directory together.
    log.info(f"Copied: {source.name} → {dest}")
    return dest
def wait_for_file_stable(path: str, stable_seconds: int = 30, poll: int = 5) -> bool:
    """Block until the size of *path* has stayed the same for *stable_seconds*.

    The size is sampled every *poll* seconds; a sample that fails with
    OSError is skipped. Gives up and returns False after ten times
    *stable_seconds*.
    """
    give_up_at = time.time() + stable_seconds * 10  # max 10x settle time
    previous_size = -1
    unchanged_since = None
    while time.time() < give_up_at:
        try:
            current_size = os.path.getsize(path)
        except OSError:
            time.sleep(poll)
            continue
        if current_size != previous_size:
            # Size moved: remember it and restart the quiet period.
            previous_size = current_size
            unchanged_since = None
        elif unchanged_since is None:
            unchanged_since = time.time()
        elif time.time() - unchanged_since >= stable_seconds:
            return True
        time.sleep(poll)
    return False
# =============================================================================
# stash-temp Watcher (runs in its own thread)
# =============================================================================
class StashTempWatcher(threading.Thread):
    """Daemon thread that polls the stash-temp directory for new video files.

    Each new file is handled on its own worker thread: wait for the file to
    settle, move it into the Whisparr import directory, then run a Whisparr
    import over that directory.
    """

    def __init__(self, cfg: Config, whisparr: WhisparrClient,
                 discord: Discord, state: State, video_exts: set):
        super().__init__(daemon=True, name="stash-temp-watcher")
        self.whisparr = whisparr
        self.discord = discord
        self.state = state
        self.video_exts = video_exts
        self.watch_dir = cfg.get("paths", "stash_temp")
        self.import_dir = cfg.get("paths", "whisparr_import")
        self.settle_secs = cfg.get("timing", "stash_temp_settle_seconds", default=30)
        # Paths already handed to a worker, so a file is picked up only once.
        self._seen: set = set()

    def run(self):
        """Scan the watch directory every 10 seconds, logging any scan error."""
        log.info(f"StashTempWatcher: watching {self.watch_dir}")
        while True:
            try:
                self._scan()
            except Exception as e:
                log.error(f"StashTempWatcher error: {e}")
            time.sleep(10)

    def _scan(self):
        """Start a worker for every not-yet-seen video file in the watch directory."""
        for entry in Path(self.watch_dir).iterdir():
            is_video = entry.is_file() and entry.suffix.lower() in self.video_exts
            if not is_video:
                continue
            candidate = str(entry)
            if candidate in self._seen:
                continue
            self._seen.add(candidate)
            # Process in a new thread so we don't block the watcher
            worker = threading.Thread(
                target=self._process,
                args=(candidate,),
                daemon=True,
                name=f"stash-temp-{entry.name[:20]}",
            )
            worker.start()

    def _process(self, src_path: str):
        """Settle, move and import one file; failures are logged and sent to Discord."""
        log.info(f"StashTempWatcher: new file detected: {Path(src_path).name}")
        # Wait for file to finish being written/renamed by the plugin
        if not wait_for_file_stable(src_path, self.settle_secs):
            log.warning(f"StashTempWatcher: file never stabilised: {src_path}")
        if not os.path.exists(src_path):
            log.info(f"StashTempWatcher: file gone before we could move it: {src_path}")
            self._seen.discard(src_path)
            return
        try:
            dest = copy_file_flat(src_path, self.import_dir)
            os.remove(src_path)
            self._seen.discard(src_path)
            moved_name = Path(dest).name
            log.info(f"StashTempWatcher: moved to Whisparr import: {moved_name}")
            matched, unmatched = self.whisparr.import_folder(self.import_dir)
            if matched:
                self.state.stats["whisparr_imported"] += matched
                self.discord.success(
                    "✅ Whisparr Import (via Stash)",
                    f"**{moved_name}**\nMatched and imported {matched} scene(s).",
                )
            if unmatched:
                self.discord.warn(
                    "⚠️ Whisparr: Unmatched Files",
                    f"{unmatched} file(s) in import folder could not be matched.\n"
                    f"Check Whisparr's manual import UI.",
                )
        except Exception as e:
            log.error(f"StashTempWatcher: error processing {src_path}: {e}")
            self.discord.error("❌ stash-temp Processing Error", str(e))
# =============================================================================
# namer Output Watcher (runs in its own thread)
# =============================================================================
class NamerOutputWatcher(threading.Thread):
    """Watches namer's renamed/ and failed/ folders.

    - renamed → move to Whisparr import dir, trigger import
    - failed  → move to Stash library dir, trigger scan+identify

    Each detected file is handled on its own daemon thread.
    """

    def __init__(self, cfg: Config, whisparr: WhisparrClient,
                 stash: StashClient, discord: Discord, state: State, video_exts: set):
        super().__init__(daemon=True, name="namer-watcher")
        self.renamed_dir = cfg.get("paths", "namer_renamed")
        self.failed_dir = cfg.get("paths", "namer_failed")
        self.import_dir = cfg.get("paths", "whisparr_import")
        self.stash_lib = cfg.get("paths", "stash_library")
        self.whisparr = whisparr
        self.stash = stash
        self.discord = discord
        self.state = state
        self.video_exts = video_exts
        # Keys of files currently (or unsuccessfully) handled; see _run_handler.
        self._seen: set = set()

    def run(self):
        """Scan both namer output folders every 10 seconds, logging scan errors."""
        log.info(f"NamerOutputWatcher: watching {self.renamed_dir} and {self.failed_dir}")
        while True:
            try:
                self._scan_dir(self.renamed_dir, self._handle_renamed)
                self._scan_dir(self.failed_dir, self._handle_failed)
            except Exception as e:
                log.error(f"NamerOutputWatcher error: {e}")
            time.sleep(10)

    def _scan_dir(self, directory: str, handler):
        """Start *handler* on a worker thread for each new video file in *directory*."""
        try:
            for f in Path(directory).iterdir():
                if not f.is_file() or f.suffix.lower() not in self.video_exts:
                    continue
                fpath = str(f)
                key = f"{directory}:{fpath}"
                if key in self._seen:
                    continue
                self._seen.add(key)
                threading.Thread(
                    target=self._run_handler,
                    args=(handler, fpath, key),
                    daemon=True,
                    name=f"namer-{f.name[:20]}",
                ).start()
        except FileNotFoundError:
            pass  # directory may not exist yet

    def _run_handler(self, handler, fpath: str, key: str):
        """Run *handler* for *fpath* and release its key once the file is gone.

        Forgetting the key lets a later file with the same name be picked up
        again. A file that is still present (handling failed before it was
        removed) stays marked as seen, so it is not retried on every scan.
        """
        try:
            handler(fpath)
        finally:
            if not os.path.exists(fpath):
                self._seen.discard(key)

    def _handle_renamed(self, src_path: str):
        """namer successfully renamed → send to Whisparr."""
        name = Path(src_path).name
        log.info(f"NamerWatcher: renamed file ready: {name}")
        try:
            copy_file_flat(src_path, self.import_dir)
            os.remove(src_path)
            # The import runs over the whole import folder, so the counts
            # may include files other than this one.
            matched, unmatched = self.whisparr.import_folder(self.import_dir)
            if matched:
                self.state.stats["whisparr_imported"] += matched
                self.discord.success(
                    "✅ Whisparr Import",
                    f"**{name}** → imported {matched} scene(s) into Whisparr.",
                )
            else:
                self.discord.warn(
                    "⚠️ Whisparr: No Match",
                    f"**{name}** was renamed by namer but Whisparr couldn't match it.\n"
                    f"Check Whisparr's manual import UI at `{self.import_dir}`.",
                )
        except Exception as e:
            log.error(f"NamerWatcher: error handling renamed file {src_path}: {e}")
            self.discord.error("❌ namer→Whisparr Error", f"**{name}**\n{e}")

    def _handle_failed(self, src_path: str):
        """namer could not identify → send to Stash for phash identification."""
        name = Path(src_path).name
        log.info(f"NamerWatcher: failed file, sending to Stash: {name}")
        try:
            dest = copy_file_flat(src_path, self.stash_lib)
            os.remove(src_path)
            self.discord.info(
                "🔍 Sending to Stash (namer failed)",
                f"**{name}** → `{self.stash_lib}`\nRunning scan + identify via StashDB...",
            )
            # Scan so Stash knows about the file
            scan_ok = self.stash.scan_path(dest)
            if not scan_ok:
                raise RuntimeError("Stash scan job failed or timed out")
            # Identify using StashDB
            identify_ok = self.stash.identify_path(dest)
            if identify_ok:
                self.state.stats["stash_identified"] += 1
                self.discord.success(
                    "✅ Stash Identify Complete",
                    f"**{name}** identified via StashDB.\n"
                    f"rename-file-on-update plugin should fire shortly.",
                )
            else:
                self.discord.warn(
                    "⚠️ Stash Identify Failed/Timed Out",
                    f"**{name}** — identify job didn't complete cleanly.\n"
                    f"You may need to run identify manually in Stash.",
                )
        except Exception as e:
            log.error(f"NamerWatcher: error handling failed file {src_path}: {e}")
            self.discord.error("❌ namer→Stash Error", f"**{name}**\n{e}")
# =============================================================================
# Main Pipeline (runs in its own thread, polls qBit)
# =============================================================================
class Pipeline:
    """Polls qBittorrent for finished downloads and feeds their videos to namer.

    For each finished torrent a worker thread checks disk space, switches
    the torrent to the "copying" category, waits for qBit to move it, copies
    its video files (flattened) into namer's watch directory and finally
    switches the torrent to the "seeding" category.
    """

    def __init__(self, cfg: Config, qbit: QBitClient, discord: Discord, state: State):
        self.cfg = cfg
        self.qbit = qbit
        self.discord = discord
        self.state = state
        self.video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))
        self.namer_watch = cfg.get("paths", "namer_watch")
        self.transient = cfg.get("paths", "transient_dir")
        self.namer_ssd_min = cfg.get("disk_space", "namer_ssd_min_free_gb", default=10)
        self.move_timeout = cfg.get("timing", "qbit_move_timeout", default=120)
        self.move_poll = cfg.get("timing", "qbit_move_poll_interval", default=3)
        # Hashes with a worker thread in flight, so a torrent is not started twice.
        self._active_hashes: set = set()

    def run(self):
        """Main loop: poll qBit every ``timing.qbit_poll_interval`` seconds unless paused."""
        poll_interval = self.cfg.get("timing", "qbit_poll_interval", default=15)
        log.info("Pipeline: starting main loop")
        while True:
            try:
                if not self.state.paused:
                    self._tick()
            except Exception as e:
                log.error(f"Pipeline error: {e}\n{traceback.format_exc()}")
                self.discord.error("❌ Pipeline Error", str(e))
            time.sleep(poll_interval)

    def _tick(self):
        """Start one worker thread per completed torrent that is not already active."""
        completed = self.qbit.get_completed_torrents()
        for torrent in completed:
            hash_ = torrent["hash"]
            if hash_ in self._active_hashes:
                continue
            self._active_hashes.add(hash_)
            threading.Thread(
                target=self._process_torrent,
                args=(torrent,),
                daemon=True,
                name=f"torrent-{hash_[:8]}",
            ).start()

    def _process_torrent(self, torrent: dict):
        """Run the pipeline for one torrent and record the outcome in the state."""
        hash_ = torrent["hash"]
        name = torrent["name"]
        log.info(f"Pipeline: processing torrent '{name}' [{hash_[:8]}]")
        self.state.add_job(hash_, name, "checking space")
        try:
            self._run_pipeline(torrent)
            self.state.complete_job(hash_, "success")
        except Exception as e:
            log.error(f"Pipeline: error on '{name}': {e}\n{traceback.format_exc()}")
            self.discord.error(f"❌ Pipeline Failed: {name}", str(e))
            self.state.complete_job(hash_, f"error: {e}")
        finally:
            self._active_hashes.discard(hash_)

    def _run_pipeline(self, torrent: dict):
        """Execute the copy-to-namer steps for one torrent.

        Raises:
            RuntimeError: when no video file can be found for the torrent.
        """
        hash_ = torrent["hash"]
        name = torrent["name"]
        # ── 1. Check free space on namer SSD ──────────────────────────────────
        torrent_gb = torrent.get("size", 0) / (1024 ** 3)
        self._check_space(self.namer_watch, self.namer_ssd_min, torrent_gb, name)
        self.state.update_job(hash_, "setting category: copying")
        # ── 2. Set category → 2 - copying (qBit moves files to transient) ────
        self.qbit.set_category(hash_, self.qbit.cat_copying)
        # ── 3. Wait for qBit to finish moving files to transient dir ──────────
        self.state.update_job(hash_, "waiting for qBit file move")
        moved = self.qbit.wait_for_move(
            hash_, self.transient, self.move_timeout, self.move_poll
        )
        if not moved:
            # Not fatal — qBit might report save_path differently; try to proceed
            log.warning(f"Pipeline: qBit save_path didn't update for {name}, proceeding anyway")
        # ── 4. Find video files in transient dir for this torrent ─────────────
        self.state.update_job(hash_, "copying to namer")
        video_files = self._find_torrent_videos(torrent)
        if not video_files:
            raise RuntimeError(f"No video files found in transient dir for '{name}'")
        log.info(f"Pipeline: found {len(video_files)} video file(s) for '{name}'")
        self.discord.info(
            f"📋 Processing: {name}",
            f"Found {len(video_files)} video file(s). Copying to namer...",
        )
        # ── 5. Copy video files (flattened) to namer watch dir ────────────────
        copied = [copy_file_flat(vf, self.namer_watch) for vf in video_files]
        log.info(f"Pipeline: copied {len(copied)} file(s) to namer watch dir")
        # ── 6. Set category → 3 - seeding (qBit moves torrent to spinning array)
        self.state.update_job(hash_, "setting category: seeding")
        self.qbit.set_category(hash_, self.qbit.cat_seeding)
        log.info(f"Pipeline: torrent '{name}' set to seeding")
        self.discord.success(
            f"✅ Copied & Seeding: {name}",
            f"Copied {len(copied)} video file(s) to namer.\n"
            f"Torrent moved to seeding. Waiting for namer to process...",
        )

    def _find_torrent_videos(self, torrent: dict) -> list:
        """Locate the video files that belong to *torrent*.

        Candidates, in order: the ``content_path`` qBit reports now (after
        the move), then the last component of the ``content_path`` captured
        before the move, looked up inside the transient directory. A
        candidate may be a directory or a single video file. Only when no
        candidate yields videos is the whole transient directory scanned.
        """
        candidates = []
        # qBittorrent documents content_path as an absolute path, so it is
        # used as-is rather than joined below the transient directory.
        current = self.qbit.get_torrent_info(torrent["hash"])
        if current and current.get("content_path"):
            candidates.append(current["content_path"])
        # The pre-move path is stale, but its final component (torrent root
        # folder or file name) is what lands in the transient directory.
        leaf = os.path.basename(torrent.get("content_path", "").rstrip("/"))
        if leaf:
            candidates.append(os.path.join(self.transient, leaf))
        for candidate in candidates:
            if os.path.isfile(candidate):
                # Single-file torrent: content_path is the file itself.
                if Path(candidate).suffix.lower() in self.video_exts:
                    return [candidate]
            elif os.path.isdir(candidate):
                found = find_video_files(candidate, self.video_exts)
                if found:
                    return found
        # Last resort, kept from the previous behaviour. It can pick up
        # files of other torrents sharing the directory, hence the warning.
        log.warning(
            f"Pipeline: could not locate content of '{torrent['name']}', "
            f"scanning all of {self.transient}"
        )
        return find_video_files(self.transient, self.video_exts)

    def _check_space(self, path: str, min_free_gb: float, needed_gb: float, name: str):
        """Check free space; pause and wait + alert if insufficient.

        Returns once *path* has at least ``min_free_gb + needed_gb`` free,
        clearing the pause flag. Until then it re-checks every
        ``disk_space.space_check_interval`` seconds.
        """
        check_interval = self.cfg.get("disk_space", "space_check_interval", default=60)
        required = min_free_gb + needed_gb
        while True:
            available = free_gb(path)
            if available >= required:
                self.state.set_pause()  # clear any previous pause
                return
            reason = (
                f"Low disk space on namer SSD: {available:.1f} GB free, "
                f"need {required:.1f} GB for '{name}'"
            )
            # Alert only on the transition into the paused state.
            if not self.state.paused:
                log.warning(f"Pipeline: PAUSED — {reason}")
                self.state.set_pause(reason)
                self.discord.warn("⏸️ Pipeline Paused — Low Disk Space", reason)
            time.sleep(check_interval)
# =============================================================================
# Web UI (Flask, runs in its own thread)
# =============================================================================
def create_web_app(state: State):
    """Build the Flask application that exposes *state*.

    Routes:
        ``/``          — HTML dashboard rendered from a snapshot of the state.
        ``/api/state`` — the same snapshot as JSON.
    """
    # Flask is imported here rather than at module level.
    from flask import Flask
    from flask import jsonify
    from flask import render_template_string

    page_template = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Media Automator</title>
<meta http-equiv="refresh" content="15">
<style>
:root {
--bg: #0f1117; --surface: #1a1d27; --surface2: #22263a;
--accent: #5865f2; --green: #57f287; --yellow: #fee75c;
--red: #ed4245; --text: #e0e0e0; --muted: #888;
--border: #2e3250; --radius: 8px;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: var(--bg); color: var(--text); font-family: 'Segoe UI', sans-serif;
font-size: 14px; line-height: 1.6; padding: 24px; }
h1 { font-size: 22px; color: #fff; margin-bottom: 4px; }
.subtitle { color: var(--muted); font-size: 12px; margin-bottom: 24px; }
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 16px; margin-bottom: 24px; }
.stat { background: var(--surface); border: 1px solid var(--border);
border-radius: var(--radius); padding: 16px; }
.stat .label { color: var(--muted); font-size: 11px; text-transform: uppercase;
letter-spacing: .05em; margin-bottom: 6px; }
.stat .value { font-size: 28px; font-weight: 700; color: var(--accent); }
.card { background: var(--surface); border: 1px solid var(--border);
border-radius: var(--radius); padding: 16px; margin-bottom: 16px; }
.card h2 { font-size: 14px; color: var(--muted); text-transform: uppercase;
letter-spacing: .05em; margin-bottom: 12px; }
table { width: 100%; border-collapse: collapse; }
th { text-align: left; color: var(--muted); font-size: 11px; text-transform: uppercase;
padding: 6px 8px; border-bottom: 1px solid var(--border); }
td { padding: 8px; border-bottom: 1px solid var(--border); font-size: 13px; }
tr:last-child td { border-bottom: none; }
.badge { display: inline-block; padding: 2px 8px; border-radius: 20px;
font-size: 11px; font-weight: 600; }
.badge-blue { background: rgba(88,101,242,.2); color: var(--accent); }
.badge-green { background: rgba(87,242,135,.2); color: var(--green); }
.badge-yellow { background: rgba(254,231,92,.2); color: var(--yellow); }
.badge-red { background: rgba(237,66,69,.2); color: var(--red); }
.banner { padding: 12px 16px; border-radius: var(--radius); margin-bottom: 16px;
font-weight: 600; background: rgba(254,231,92,.15);
border: 1px solid rgba(254,231,92,.3); color: var(--yellow); }
.empty { color: var(--muted); font-style: italic; padding: 8px; }
.ts { color: var(--muted); font-size: 11px; }
</style>
</head>
<body>
<h1>📡 Media Automator</h1>
<div class="subtitle">Auto-refreshes every 15s &nbsp;·&nbsp; {{ now }}</div>
{% if snap.paused %}
<div class="banner">⏸️ PAUSED — {{ snap.pause_reason }}</div>
{% endif %}
<div class="grid">
<div class="stat"><div class="label">Processed</div>
<div class="value">{{ snap.stats.total_processed }}</div></div>
<div class="stat"><div class="label">Whisparr Imports</div>
<div class="value" style="color:var(--green)">{{ snap.stats.whisparr_imported }}</div></div>
<div class="stat"><div class="label">Stash Identified</div>
<div class="value" style="color:var(--accent)">{{ snap.stats.stash_identified }}</div></div>
<div class="stat"><div class="label">Errors</div>
<div class="value" style="color:var(--red)">{{ snap.stats.errors }}</div></div>
<div class="stat"><div class="label">Active Jobs</div>
<div class="value" style="color:var(--yellow)">{{ snap.active_jobs|length }}</div></div>
</div>
<div class="card">
<h2>Active Jobs</h2>
{% if snap.active_jobs %}
<table>
<tr><th>Torrent</th><th>Stage</th><th>Started</th></tr>
{% for hash, job in snap.active_jobs.items() %}
<tr>
<td>{{ job.name }}</td>
<td><span class="badge badge-blue">{{ job.stage }}</span></td>
<td class="ts">{{ job.started }}</td>
</tr>
{% endfor %}
</table>
{% else %}<div class="empty">No active jobs.</div>{% endif %}
</div>
<div class="card">
<h2>Recent History</h2>
{% if snap.history %}
<table>
<tr><th>Torrent</th><th>Outcome</th><th>Completed</th></tr>
{% for job in snap.history %}
<tr>
<td>{{ job.name }}</td>
<td>
{% if job.outcome == 'success' %}
<span class="badge badge-green">✓ success</span>
{% else %}
<span class="badge badge-red">✗ {{ job.outcome[:40] }}</span>
{% endif %}
</td>
<td class="ts">{{ job.get('completed', '') }}</td>
</tr>
{% endfor %}
</table>
{% else %}<div class="empty">No completed jobs yet.</div>{% endif %}
</div>
</body>
</html>"""

    app = Flask(__name__)

    @app.route("/")
    def index():
        # Render the dashboard from one consistent snapshot plus the current UTC time.
        rendered_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
        current = state.snapshot()
        return render_template_string(page_template, snap=current, now=rendered_at)

    @app.route("/api/state")
    def api_state():
        # Machine-readable view of the same snapshot.
        current = state.snapshot()
        return jsonify(current)

    return app
# =============================================================================
# Entry Point
# =============================================================================
def main():
    """Load the configuration, start every background component and serve the web UI.

    The config path comes from the ``CONFIG_PATH`` environment variable and
    defaults to ``/config/config.yml``. The call blocks in the Flask server.
    """
    config_path = os.environ.get("CONFIG_PATH", "/config/config.yml")
    log.info(f"Loading config from {config_path}")
    cfg = Config(config_path)

    configured_exts = cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT))
    video_exts = set(configured_exts)
    history_size = cfg.get("webui", "history_size", default=100)
    state = State(history_size=history_size)
    discord = Discord(cfg)
    qbit = QBitClient(cfg)
    whisparr = WhisparrClient(cfg)
    stash = StashClient(cfg)

    # Ensure output directories exist
    for path_key in ("namer_watch", "whisparr_import", "stash_library", "stash_temp"):
        directory = cfg.get("paths", path_key)
        if directory:
            Path(directory).mkdir(parents=True, exist_ok=True)

    discord.info("🚀 Media Automator Started", "All systems initialised. Pipeline is running.")

    # ── Start background watchers ──────────────────────────────────────────────
    namer_watcher = NamerOutputWatcher(cfg, whisparr, stash, discord, state, video_exts)
    namer_watcher.start()
    stash_watcher = StashTempWatcher(cfg, whisparr, discord, state, video_exts)
    stash_watcher.start()

    # ── Start main pipeline in its own thread ─────────────────────────────────
    pipeline = Pipeline(cfg, qbit, discord, state)
    threading.Thread(target=pipeline.run, daemon=True, name="pipeline").start()

    # ── Start web UI (blocks main thread) ─────────────────────────────────────
    port = cfg.get("webui", "port", default=8888)
    log.info(f"Web UI starting on port {port}")
    web_app = create_web_app(state)
    web_app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)


# Run the automator only when executed as a script, not on import.
if __name__ == "__main__":
    main()