965 lines
39 KiB
Python
965 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Media Automator
|
|
Orchestrates the pipeline: qBittorrent → namer → Whisparr / Stash → Whisparr
|
|
"""
|
|
|
|
import json
import logging
import os
import shutil
import threading
import time
import traceback
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import psutil
import requests
import yaml
|
|
|
|
# =============================================================================
|
|
# Logging Setup
|
|
# =============================================================================
|
|
# Configure the root logger once at import: INFO and above, with a
# timestamp, level and logger name on every line.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Module-wide logger shared by every class and function in this file.
log = logging.getLogger("automator")
|
|
|
|
|
|
# =============================================================================
|
|
# Config Loader
|
|
# =============================================================================
|
|
class Config:
    """Read-only view over the YAML configuration file.

    Top-level sections are available as attributes (``cfg.qbittorrent``) and
    nested values through :meth:`get` (``cfg.get("paths", "stash_temp")``).
    """

    def __init__(self, path: str = "/config/config.yml"):
        """Load and parse the YAML document at *path*.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the document's top level is not a mapping.
        """
        # Explicit encoding so non-ASCII config values do not depend on the
        # process locale.
        with open(path, encoding="utf-8") as f:
            # safe_load returns None for an empty document; normalise to {}.
            data = yaml.safe_load(f) or {}
        if not isinstance(data, dict):
            raise ValueError(f"Config file {path} must contain a YAML mapping at top level")
        self._data = data

    def __getattr__(self, name):
        """Return the top-level section *name*, or ``{}`` when it is absent."""
        # __getattr__ only runs when normal lookup fails. If "_data" itself is
        # missing (object created without __init__, e.g. by copy/pickle), the
        # lookup below would re-enter __getattr__ and recurse forever, so
        # private/dunder names are reported as genuinely missing instead.
        if name.startswith("_"):
            raise AttributeError(name)
        return self._data.get(name, {})

    def get(self, *keys, default=None):
        """Walk nested mappings along *keys* and return the value found.

        Returns *default* as soon as a key is missing or an intermediate
        value is not a mapping. With no keys, the whole document is returned.
        """
        node = self._data
        for key in keys:
            if not isinstance(node, dict):
                return default
            node = node.get(key, default)
        return node
|
|
|
|
|
|
# =============================================================================
|
|
# Shared State (thread-safe with a lock)
|
|
# =============================================================================
|
|
class State:
    """Central state store for the automator.

    Accessed by both the pipeline threads and the web UI; every method here
    takes an internal lock before touching the shared containers.
    """

    def __init__(self, history_size: int = 100):
        """Create an empty store keeping at most *history_size* finished jobs."""
        self._lock = threading.Lock()
        self.history: deque = deque(maxlen=history_size)
        self.active_jobs: dict = {}  # hash → {name, stage, started}
        self.paused: bool = False
        self.pause_reason: str = ""
        self.stats = {
            "total_processed": 0,
            "whisparr_imported": 0,
            "stash_identified": 0,
            "errors": 0,
            "started_at": self._utc_now_iso(),
        }

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Uses the non-deprecated ``datetime.now(timezone.utc)`` and strips the
        tzinfo so the text has the same shape ``utcnow().isoformat()`` produced.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def add_job(self, hash_: str, name: str, stage: str):
        """Register a new active job under *hash_*, replacing any existing entry."""
        with self._lock:
            self.active_jobs[hash_] = {
                "name": name,
                "stage": stage,
                "started": self._utc_now_iso(),
            }

    def update_job(self, hash_: str, stage: str):
        """Set the stage of an active job; unknown hashes are ignored."""
        with self._lock:
            job = self.active_jobs.get(hash_)
            if job is not None:
                job["stage"] = stage

    def complete_job(self, hash_: str, outcome: str):
        """Move a job from the active set to the history with its *outcome*.

        Any outcome other than the literal ``"success"`` is counted as an
        error. Unknown hashes are ignored.
        """
        with self._lock:
            job = self.active_jobs.pop(hash_, None)
            if not job:
                return
            job["completed"] = self._utc_now_iso()
            job["outcome"] = outcome
            self.history.appendleft(job)
            self.stats["total_processed"] += 1
            # The UI shows an "Errors" tile; without this it would stay at 0.
            if outcome != "success":
                self.stats["errors"] += 1

    def set_pause(self, reason: str = ""):
        """Pause with *reason*, or clear the pause when *reason* is empty."""
        with self._lock:
            self.paused = bool(reason)
            self.pause_reason = reason

    def snapshot(self) -> dict:
        """Return a copy of the current state that is safe to read without the lock."""
        with self._lock:
            return {
                "paused": self.paused,
                "pause_reason": self.pause_reason,
                # Copy each job dict too: update_job mutates them in place,
                # and the caller reads the snapshot after the lock is released.
                "active_jobs": {h: dict(job) for h, job in self.active_jobs.items()},
                "history": [dict(job) for job in self.history],
                "stats": dict(self.stats),
            }
|
|
|
|
|
|
# =============================================================================
|
|
# Discord Notifier
|
|
# =============================================================================
|
|
class Discord:
    """Posts notifications to a Discord webhook as single-embed messages.

    Messages ranked below the configured ``discord.notify_level`` are
    dropped, and nothing is sent when no webhook URL is configured.
    """

    COLORS = {"INFO": 0x5865F2, "WARNING": 0xFEE75C, "ERROR": 0xED4245, "SUCCESS": 0x57F287}
    # "SUCCESS" has no entry here; the fallback of 1 used in __init__/_send
    # ranks it the same as INFO.
    LEVELS = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
    # Discord's documented embed limits; longer text makes the API reject the post.
    MAX_TITLE = 256
    MAX_DESCRIPTION = 4096

    def __init__(self, cfg: "Config"):
        """Read the webhook URL and minimum level from the ``discord`` config section."""
        self.webhook = cfg.get("discord", "webhook_url", default="")
        # Accept "warning" as well as "WARNING".
        level_name = str(cfg.get("discord", "notify_level", default="INFO")).upper()
        self.min_level = self.LEVELS.get(level_name, 1)

    @staticmethod
    def _clip(text, limit: int) -> str:
        """Return *text* as a string no longer than *limit*, ending in '…' if cut."""
        text = str(text)
        return text if len(text) <= limit else text[: limit - 1] + "…"

    def _build_payload(self, title: str, message: str, level: str) -> dict:
        """Build the webhook JSON body for one embed."""
        return {
            "embeds": [{
                "title": self._clip(title, self.MAX_TITLE),
                "description": self._clip(message, self.MAX_DESCRIPTION),
                "color": self.COLORS.get(level, 0x5865F2),
                # Timezone-aware ISO-8601 timestamp (utcnow() is deprecated).
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "footer": {"text": "Media Automator"},
            }]
        }

    def _send(self, title: str, message: str, level: str = "INFO"):
        """Post one notification; failures are logged and never raised."""
        if not self.webhook or self.LEVELS.get(level, 1) < self.min_level:
            return
        try:
            response = requests.post(
                self.webhook,
                json=self._build_payload(title, message, level),
                timeout=10,
            )
            # Without this, a 4xx/5xx answer from Discord would pass unnoticed.
            response.raise_for_status()
        except Exception as e:
            log.warning(f"Discord notification failed: {e}")

    def info(self, title: str, msg: str):
        """Send an INFO-level notification."""
        self._send(title, msg, "INFO")

    def warn(self, title: str, msg: str):
        """Send a WARNING-level notification."""
        self._send(title, msg, "WARNING")

    def error(self, title: str, msg: str):
        """Send an ERROR-level notification."""
        self._send(title, msg, "ERROR")

    def success(self, title: str, msg: str):
        """Send a SUCCESS notification (ranked like INFO for filtering)."""
        self._send(title, msg, "SUCCESS")
|
|
|
|
|
|
# =============================================================================
|
|
# qBittorrent Client
|
|
# =============================================================================
|
|
class QBitClient:
    """Small client for the qBittorrent Web API (v2) calls the pipeline needs.

    Logs in lazily on first use and keeps the authenticated cookie in a
    ``requests.Session``.
    """

    def __init__(self, cfg: "Config"):
        """Read connection details and category names from the ``qbittorrent`` section.

        Raises:
            KeyError: if a required key is missing from that section.
        """
        qb = cfg.qbittorrent
        self.base = f"http://{qb['host']}:{qb['port']}"
        self.user = qb["username"]
        self.pwd = qb["password"]
        self.cat_downloading = qb["category_downloading"]
        self.cat_copying = qb["category_copying"]
        self.cat_seeding = qb["category_seeding"]
        self._session = requests.Session()
        self._logged_in = False

    def _login(self):
        """Authenticate the session.

        Raises:
            RuntimeError: if qBittorrent does not answer with ``Ok.``.
        """
        r = self._session.post(
            f"{self.base}/api/v2/auth/login",
            data={"username": self.user, "password": self.pwd},
            timeout=10,
        )
        self._logged_in = r.text == "Ok."
        if not self._logged_in:
            raise RuntimeError(f"qBittorrent login failed: {r.text}")

    def _request(self, method: str, path: str, **kwargs):
        """Send an authenticated request, logging in again once on HTTP 403.

        The session cookie can expire while this long-running process keeps
        the client alive; qBittorrent then answers 403. Without a re-login
        every later call would fail until the process is restarted.
        """
        if not self._logged_in:
            self._login()
        send = getattr(self._session, method)
        url = f"{self.base}{path}"
        r = send(url, timeout=15, **kwargs)
        if r.status_code == 403:
            self._logged_in = False
            self._login()
            r = send(url, timeout=15, **kwargs)
        r.raise_for_status()
        return r

    def _get(self, path: str, **params):
        """GET *path* with *params* as the query string and return the decoded JSON."""
        return self._request("get", path, params=params).json()

    def _post(self, path: str, data: dict):
        """POST form *data* to *path* and return the response object."""
        return self._request("post", path, data=data)

    def get_completed_torrents(self) -> list:
        """Return all torrents in the 'downloading' category that are 100% done.

        Any error is logged and reported as an empty list.
        """
        try:
            torrents = self._get(
                "/api/v2/torrents/info",
                category=self.cat_downloading,
                filter="completed",
            )
            return [t for t in torrents if t.get("progress", 0) >= 1.0]
        except Exception as e:
            log.error(f"qBit: failed to get torrents: {e}")
            return []

    def get_torrent_files(self, hash_: str) -> list:
        """Return the file entries of a torrent, or ``[]`` on error (logged)."""
        try:
            return self._get("/api/v2/torrents/files", hash=hash_)
        except Exception as e:
            log.error(f"qBit: failed to get files for {hash_}: {e}")
            return []

    def set_category(self, hash_: str, category: str):
        """Assign *category* to the torrent; HTTP and login errors propagate."""
        self._post("/api/v2/torrents/setCategory", {"hashes": hash_, "category": category})
        log.info(f"qBit: set category '{category}' on {hash_[:8]}")

    def get_torrent_info(self, hash_: str) -> Optional[dict]:
        """Return the info dict for one torrent, or ``None`` if unknown or on error."""
        try:
            results = self._get("/api/v2/torrents/info", hashes=hash_)
            return results[0] if results else None
        except Exception as e:
            # Callers poll this, so keep it quiet but not invisible.
            log.debug(f"qBit: failed to get info for {hash_[:8]}: {e}")
            return None

    def wait_for_move(self, hash_: str, expected_save_path: str,
                      timeout: int = 120, poll: int = 3) -> bool:
        """Block until qBit reports *expected_save_path* as the torrent's save path.

        Trailing slashes are ignored in the comparison. Returns ``False`` if
        the path has not matched within *timeout* seconds.
        """
        wanted = expected_save_path.rstrip("/")
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            info = self.get_torrent_info(hash_)
            if info and info.get("save_path", "").rstrip("/") == wanted:
                return True
            time.sleep(poll)
        return False
|
|
|
|
|
|
# =============================================================================
|
|
# Whisparr Client
|
|
# =============================================================================
|
|
class WhisparrClient:
    """Client for the Whisparr ``/api/v3/manualimport`` endpoint."""

    def __init__(self, cfg: "Config"):
        """Read host, port, API key and optional ``import_timeout`` from the ``whisparr`` section.

        Raises:
            KeyError: if host, port or api_key is missing.
        """
        w = cfg.whisparr
        self.base = f"http://{w['host']}:{w['port']}/api/v3"
        self.headers = {"X-Api-Key": w["api_key"], "Content-Type": "application/json"}
        # Seconds to wait for each manual-import HTTP call (default 120).
        self.timeout = w.get("import_timeout", 120)

    @staticmethod
    def _is_matched(item: dict) -> bool:
        """Return True when a manual-import item carries a truthy scene or sceneId."""
        return bool(item.get("scene") or item.get("sceneId"))

    def scan_import_folder(self, folder: str) -> list:
        """Ask Whisparr to scan *folder* and return its list of candidate items.

        Raises:
            requests.HTTPError: on a non-2xx answer.
        """
        r = requests.get(
            f"{self.base}/manualimport",
            params={"folder": folder, "filterExistingFiles": "false"},
            headers=self.headers,
            # Honour the configured timeout; it used to be read but never applied.
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def execute_import(self, import_items: list) -> bool:
        """Submit the matched items among *import_items* to Whisparr.

        Returns ``False`` without calling the API when the list is empty or
        holds no matched item, ``True`` after a successful POST.

        Raises:
            requests.HTTPError: on a non-2xx answer.
        """
        if not import_items:
            log.warning("Whisparr: no items to import")
            return False
        # Only import items that have a valid scene match
        valid = [item for item in import_items if self._is_matched(item)]
        if not valid:
            log.warning("Whisparr: no matched scenes in import list")
            return False
        # NOTE(review): in the *arr v3 APIs the import itself is usually
        # triggered through POST /command (name "ManualImport"); confirm that
        # POSTing to /manualimport really performs the import on this
        # Whisparr version.
        r = requests.post(
            f"{self.base}/manualimport",
            json=valid,
            headers=self.headers,
            timeout=self.timeout,
        )
        r.raise_for_status()
        log.info(f"Whisparr: imported {len(valid)} file(s)")
        return True

    def import_folder(self, folder: str) -> tuple[int, int]:
        """Full scan + import cycle. Returns (matched, unmatched) counts.

        The counts cover every item Whisparr reports for *folder*, not just
        the file that triggered the call.
        """
        items = self.scan_import_folder(folder)
        if not items:
            return (0, 0)
        matched = [item for item in items if self._is_matched(item)]
        if matched:
            self.execute_import(matched)
        return (len(matched), len(items) - len(matched))
|
|
|
|
|
|
# =============================================================================
|
|
# Stash GraphQL Client
|
|
# =============================================================================
|
|
class StashClient:
    """GraphQL client for Stash: start scan/identify jobs and wait for them."""

    _FIND_JOB_QUERY = "query FindJob($id: ID!) { findJob(id: $id) { id status } }"

    _SCAN_MUTATION = """
        mutation Scan($input: ScanMetadataInput!) {
            metadataScan(input: $input)
        }
    """

    _IDENTIFY_MUTATION = """
        mutation Identify($input: IdentifyMetadataInput!) {
            metadataIdentify(input: $input)
        }
    """

    def __init__(self, cfg: "Config"):
        """Read endpoint, API key and job timing from the ``stash`` section.

        Raises:
            KeyError: if host, port or api_key is missing.
        """
        s = cfg.stash
        self.endpoint = f"http://{s['host']}:{s['port']}/graphql"
        self.headers = {"ApiKey": s["api_key"], "Content-Type": "application/json"}
        self.stashdb_url = s.get("stashdb_endpoint", "https://stashdb.org/graphql")
        self.job_timeout = s.get("job_timeout", 600)
        self.job_poll = s.get("job_poll_interval", 5)

    def _gql(self, query: str, variables: Optional[dict] = None) -> dict:
        """Run one GraphQL operation and return its ``data`` object.

        Raises:
            requests.HTTPError: on a non-2xx answer.
            RuntimeError: if the response carries an ``errors`` entry.
        """
        payload = {"query": query}
        if variables:
            payload["variables"] = variables
        r = requests.post(self.endpoint, json=payload, headers=self.headers, timeout=30)
        r.raise_for_status()
        data = r.json()
        if "errors" in data:
            raise RuntimeError(f"Stash GraphQL error: {data['errors']}")
        # "data" may be present but null; callers always expect a dict.
        return data.get("data") or {}

    def _wait_for_job(self, job_id: str) -> bool:
        """Poll until a job finishes. Returns True on success, False on failure/timeout.

        A job Stash no longer reports (``findJob`` is null) is treated as a
        failure right away rather than being polled until the timeout.
        """
        deadline = time.monotonic() + self.job_timeout
        while time.monotonic() < deadline:
            time.sleep(self.job_poll)
            try:
                data = self._gql(self._FIND_JOB_QUERY, {"id": job_id})
            except Exception as e:
                # Transient API errors: keep polling until the deadline.
                log.warning(f"Stash: error polling job {job_id}: {e}")
                continue
            job = data.get("findJob")
            if job is None:
                # Previously this raised AttributeError inside the loop and
                # spun until the timeout.
                # NOTE(review): presumably the job was dropped from Stash's
                # job list; confirm whether that can also happen after a
                # successful run.
                log.error(f"Stash: job {job_id} is no longer known to Stash")
                return False
            status = job.get("status", "")
            if status == "FINISHED":
                return True
            if status in ("FAILED", "CANCELLED"):
                log.error(f"Stash job {job_id} ended with status: {status}")
                return False
        log.error(f"Stash: job {job_id} timed out after {self.job_timeout}s")
        return False

    def _start_job(self, kind: str, mutation: str, result_key: str,
                   variables: dict, path: str) -> bool:
        """Run *mutation*, read the job ID from *result_key* and wait for the job.

        Raises:
            RuntimeError: if the mutation does not return a job ID.
        """
        job_id = self._gql(mutation, variables).get(result_key)
        if not job_id:
            raise RuntimeError(f"Stash {kind} did not return a job ID")
        log.info(f"Stash: {kind} job {job_id} started for {path}")
        return self._wait_for_job(job_id)

    def scan_path(self, path: str) -> bool:
        """Trigger a metadata scan on a specific path and wait for completion."""
        return self._start_job(
            "scan",
            self._SCAN_MUTATION,
            "metadataScan",
            {"input": {"paths": [path]}},
            path,
        )

    def identify_path(self, path: str) -> bool:
        """Trigger metadata identify on a path using StashDB and wait for completion."""
        variables = {
            "input": {
                "paths": [path],
                "sources": [
                    {"source": {"stash_box_endpoint": self.stashdb_url}}
                ],
                "options": {
                    "fieldOptions": [
                        {"field": "title", "strategy": "OVERWRITE"},
                        {"field": "date", "strategy": "OVERWRITE"},
                        {"field": "studio", "strategy": "OVERWRITE", "createMissing": True},
                        {"field": "performers", "strategy": "OVERWRITE", "createMissing": True},
                        {"field": "tags", "strategy": "MERGE", "createMissing": True},
                        {"field": "stash_ids", "strategy": "MERGE"},
                    ],
                    "setOrganized": False,
                },
            }
        }
        return self._start_job(
            "identify",
            self._IDENTIFY_MUTATION,
            "metadataIdentify",
            variables,
            path,
        )
|
|
|
|
|
|
# =============================================================================
|
|
# Disk Space Helpers
|
|
# =============================================================================
|
|
def free_gb(path: str) -> float:
    """Return free space in GB (GiB, 1024**3 bytes) for the filesystem containing path.

    Uses the standard library's ``shutil.disk_usage`` so this helper does not
    need a third-party package.

    Raises:
        OSError: if *path* does not exist or cannot be queried.
    """
    return shutil.disk_usage(path).free / (1024 ** 3)
|
|
|
|
|
|
def file_size_gb(path: str) -> float:
    """Return size in GB (GiB, 1024**3 bytes) of a file or directory.

    For a directory the sizes of all regular files below it are summed. Files
    that disappear or become unreadable while the tree is being walked are
    skipped instead of aborting the whole measurement.
    """
    root = Path(path)
    if root.is_file():
        return root.stat().st_size / (1024 ** 3)
    total_bytes = 0
    for entry in root.rglob("*"):
        try:
            if entry.is_file():
                total_bytes += entry.stat().st_size
        except OSError:
            # Entry was moved or deleted between listing and stat.
            continue
    return total_bytes / (1024 ** 3)
|
|
|
|
|
|
# =============================================================================
|
|
# File Utilities
|
|
# =============================================================================
|
|
# Lower-case, dot-prefixed suffixes treated as video files when the config
# provides no "video_extensions" list.
VIDEO_EXTS_DEFAULT = {
    ".avi",
    ".divx",
    ".flv",
    ".m2ts",
    ".m4v",
    ".mkv",
    ".mov",
    ".mp4",
    ".mpeg",
    ".mpg",
    ".ts",
    ".webm",
    ".wmv",
}
|
|
|
|
|
|
def find_video_files(directory: str, extensions: set) -> list:
    """Collect the paths of every video file found anywhere below *directory*.

    A file counts as a video when its lower-cased suffix is in *extensions*.
    The result is one flat list in ``os.walk`` order.
    """
    return [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(directory)
        for filename in filenames
        if Path(filename).suffix.lower() in extensions
    ]
|
|
|
|
|
|
def copy_file_flat(src: str, dest_dir: str) -> str:
    """Copy src file into dest_dir, using only the filename (no subfolders).

    If a file with the same name exists, a counter is appended
    (``name_1.ext``, ``name_2.ext``, ...). Returns the destination path.

    The destination name is claimed atomically before copying, so two
    threads copying same-named files into *dest_dir* at the same time cannot
    pick the same name and overwrite each other.

    Raises:
        OSError: if the source cannot be read or the destination not written.
    """
    source = Path(src)
    stem, ext = source.stem, source.suffix
    dest = os.path.join(dest_dir, source.name)
    counter = 1
    while True:
        try:
            # O_CREAT | O_EXCL fails if the name already exists, which makes
            # "check it is free" and "take it" one step.
            fd = os.open(dest, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            dest = os.path.join(dest_dir, f"{stem}_{counter}{ext}")
            counter += 1
            continue
        os.close(fd)
        break

    try:
        shutil.copy2(src, dest)
    except BaseException:
        # Do not leave the empty placeholder or a partial copy behind.
        try:
            os.remove(dest)
        except OSError:
            pass
        raise
    log.info(f"Copied: {source.name} → {dest_dir}")
    return dest
|
|
|
|
|
|
def wait_for_file_stable(path: str, stable_seconds: int = 30, poll: int = 5) -> bool:
    """Wait until a file hasn't changed size for stable_seconds.

    Returns True once the size has been unchanged for that long, or False
    after giving up at ten times *stable_seconds*. A missing or unreadable
    file is simply polled again. With ``stable_seconds <= 0`` no waiting is
    done and the result is whether the file currently exists.
    """
    if stable_seconds <= 0:
        # The loop below would never run (deadline == now) and always
        # report failure.
        return os.path.exists(path)

    # Monotonic clock: elapsed-time measurement must not be affected by
    # wall-clock adjustments.
    deadline = time.monotonic() + stable_seconds * 10  # max 10x settle time
    last_size = -1
    stable_since = None
    while time.monotonic() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:
            time.sleep(poll)
            continue
        now = time.monotonic()
        if size != last_size:
            # Size changed (or first reading): restart the stability window.
            last_size = size
            stable_since = None
        elif stable_since is None:
            stable_since = now
        elif now - stable_since >= stable_seconds:
            return True
        time.sleep(poll)
    return False
|
|
|
|
|
|
# =============================================================================
|
|
# stash-temp Watcher (runs in its own thread)
|
|
# =============================================================================
|
|
class StashTempWatcher(threading.Thread):
    """Watches stash-temp for new video files, waits for them to settle,
    then moves them to the Whisparr import dir and triggers a Whisparr import."""

    def __init__(self, cfg: "Config", whisparr: "WhisparrClient",
                 discord: "Discord", state: "State", video_exts: set):
        """Read the watched/import directories and settle time from *cfg*."""
        super().__init__(daemon=True, name="stash-temp-watcher")
        self.watch_dir = cfg.get("paths", "stash_temp")
        self.import_dir = cfg.get("paths", "whisparr_import")
        self.settle_secs = cfg.get("timing", "stash_temp_settle_seconds", default=30)
        self.whisparr = whisparr
        self.discord = discord
        self.state = state
        self.video_exts = video_exts
        # Paths already handed to a worker thread; removed again once the
        # file is gone or should be retried.
        self._seen: set = set()

    def run(self):
        """Scan the watch directory every 10 seconds, forever."""
        log.info(f"StashTempWatcher: watching {self.watch_dir}")
        while True:
            try:
                self._scan()
            except Exception as e:
                log.error(f"StashTempWatcher error: {e}")
            time.sleep(10)

    def _scan(self):
        """Start one worker thread per not-yet-seen video file in the watch dir."""
        for f in Path(self.watch_dir).iterdir():
            if not f.is_file() or f.suffix.lower() not in self.video_exts:
                continue
            fpath = str(f)
            if fpath in self._seen:
                continue
            self._seen.add(fpath)
            # Process in a new thread so we don't block the watcher
            threading.Thread(
                target=self._process,
                args=(fpath,),
                daemon=True,
                name=f"stash-temp-{f.name[:20]}",
            ).start()

    def _process(self, src_path: str):
        """Wait for *src_path* to settle, move it to the import dir and import it."""
        log.info(f"StashTempWatcher: new file detected: {Path(src_path).name}")
        # Wait for file to finish being written/renamed by the plugin
        stable = wait_for_file_stable(src_path, self.settle_secs)

        if not os.path.exists(src_path):
            log.info(f"StashTempWatcher: file gone before we could move it: {src_path}")
            self._seen.discard(src_path)
            return

        if not stable:
            # The file is still changing: copying it now and deleting the
            # source could discard data. Forget it so the next scan picks it
            # up again and waits once more.
            log.warning(f"StashTempWatcher: file never stabilised: {src_path}")
            self._seen.discard(src_path)
            return

        try:
            dest = copy_file_flat(src_path, self.import_dir)
            os.remove(src_path)
            self._seen.discard(src_path)
            log.info(f"StashTempWatcher: moved to Whisparr import: {Path(dest).name}")

            matched, unmatched = self.whisparr.import_folder(self.import_dir)
            if matched:
                # Several worker threads update this counter; "+=" on a dict
                # entry is a read-modify-write, so take the state lock.
                with self.state._lock:
                    self.state.stats["whisparr_imported"] += matched
                self.discord.success(
                    "✅ Whisparr Import (via Stash)",
                    f"**{Path(dest).name}**\nMatched and imported {matched} scene(s).",
                )
            if unmatched:
                self.discord.warn(
                    "⚠️ Whisparr: Unmatched Files",
                    f"{unmatched} file(s) in import folder could not be matched.\n"
                    f"Check Whisparr's manual import UI.",
                )
        except Exception as e:
            log.error(f"StashTempWatcher: error processing {src_path}: {e}")
            self.discord.error("❌ stash-temp Processing Error", str(e))
|
|
|
|
|
|
# =============================================================================
|
|
# namer Output Watcher (runs in its own thread)
|
|
# =============================================================================
|
|
class NamerOutputWatcher(threading.Thread):
    """Watches namer's renamed/ and failed/ folders.

    - renamed → move to Whisparr import dir, trigger import
    - failed  → move to Stash library dir, trigger scan+identify
    """

    def __init__(self, cfg: "Config", whisparr: "WhisparrClient",
                 stash: "StashClient", discord: "Discord", state: "State", video_exts: set):
        """Read the watched and destination directories from the ``paths`` section."""
        super().__init__(daemon=True, name="namer-watcher")
        self.renamed_dir = cfg.get("paths", "namer_renamed")
        self.failed_dir = cfg.get("paths", "namer_failed")
        self.import_dir = cfg.get("paths", "whisparr_import")
        self.stash_lib = cfg.get("paths", "stash_library")
        self.whisparr = whisparr
        self.stash = stash
        self.discord = discord
        self.state = state
        self.video_exts = video_exts
        # "<directory>:<path>" keys of files already handed to a worker.
        self._seen: set = set()

    def run(self):
        """Scan both namer output folders every 10 seconds, forever."""
        log.info(f"NamerOutputWatcher: watching {self.renamed_dir} and {self.failed_dir}")
        watched = (
            (self.renamed_dir, self._handle_renamed),
            (self.failed_dir, self._handle_failed),
        )
        while True:
            for directory, handler in watched:
                # One try per folder so a problem with one does not skip the other.
                try:
                    self._scan_dir(directory, handler)
                except Exception as e:
                    log.error(f"NamerOutputWatcher error: {e}")
            time.sleep(10)

    def _scan_dir(self, directory: str, handler):
        """Start *handler* in a worker thread for each new video file in *directory*."""
        try:
            for f in Path(directory).iterdir():
                if not f.is_file() or f.suffix.lower() not in self.video_exts:
                    continue
                fpath = str(f)
                key = f"{directory}:{fpath}"
                if key in self._seen:
                    continue
                self._seen.add(key)
                threading.Thread(
                    target=self._run_handler, args=(handler, key, fpath), daemon=True
                ).start()
        except FileNotFoundError:
            pass  # directory may not exist yet

    def _run_handler(self, handler, key: str, fpath: str):
        """Run *handler* on *fpath*, then release *key* if the file is gone.

        Releasing the key lets a later file with the same name be processed
        and stops ``_seen`` from growing forever. A file that is still present
        (the handler failed before removing it) keeps its key, so it is not
        retried on every scan.
        """
        try:
            handler(fpath)
        finally:
            if not os.path.exists(fpath):
                self._seen.discard(key)

    def _handle_renamed(self, src_path: str):
        """namer successfully renamed → send to Whisparr."""
        name = Path(src_path).name
        log.info(f"NamerWatcher: renamed file ready: {name}")
        try:
            copy_file_flat(src_path, self.import_dir)
            os.remove(src_path)

            matched, unmatched = self.whisparr.import_folder(self.import_dir)
            if matched:
                # Counter is shared between worker threads; update under the lock.
                with self.state._lock:
                    self.state.stats["whisparr_imported"] += matched
                self.discord.success(
                    "✅ Whisparr Import",
                    f"**{name}** → imported {matched} scene(s) into Whisparr.",
                )
            else:
                self.discord.warn(
                    "⚠️ Whisparr: No Match",
                    f"**{name}** was renamed by namer but Whisparr couldn't match it.\n"
                    f"Check Whisparr's manual import UI at `{self.import_dir}`.",
                )
        except Exception as e:
            log.error(f"NamerWatcher: error handling renamed file {src_path}: {e}")
            self.discord.error("❌ namer→Whisparr Error", f"**{name}**\n{e}")

    def _handle_failed(self, src_path: str):
        """namer could not identify → send to Stash for phash identification."""
        name = Path(src_path).name
        log.info(f"NamerWatcher: failed file, sending to Stash: {name}")
        try:
            dest = copy_file_flat(src_path, self.stash_lib)
            os.remove(src_path)
            self.discord.info(
                "🔍 Sending to Stash (namer failed)",
                f"**{name}** → `{self.stash_lib}`\nRunning scan + identify via StashDB...",
            )

            # Scan so Stash knows about the file
            scan_ok = self.stash.scan_path(dest)
            if not scan_ok:
                raise RuntimeError("Stash scan job failed or timed out")

            # Identify using StashDB
            identify_ok = self.stash.identify_path(dest)
            if identify_ok:
                with self.state._lock:
                    self.state.stats["stash_identified"] += 1
                self.discord.success(
                    "✅ Stash Identify Complete",
                    f"**{name}** identified via StashDB.\n"
                    f"rename-file-on-update plugin should fire shortly.",
                )
            else:
                self.discord.warn(
                    "⚠️ Stash Identify Failed/Timed Out",
                    f"**{name}** — identify job didn't complete cleanly.\n"
                    f"You may need to run identify manually in Stash.",
                )
        except Exception as e:
            log.error(f"NamerWatcher: error handling failed file {src_path}: {e}")
            self.discord.error("❌ namer→Stash Error", f"**{name}**\n{e}")
|
|
|
|
|
|
# =============================================================================
|
|
# Main Pipeline (runs in its own thread, polls qBit)
|
|
# =============================================================================
|
|
class Pipeline:
    """Polls qBittorrent for completed torrents and feeds their videos to namer.

    Each completed torrent is handled in its own thread: check disk space,
    switch the torrent to the "copying" category, copy its video files
    (flattened) into namer's watch dir, then switch it to "seeding".
    """

    def __init__(self, cfg: "Config", qbit: "QBitClient", discord: "Discord", state: "State"):
        """Store collaborators and read paths, timing and disk limits from *cfg*."""
        self.cfg = cfg
        self.qbit = qbit
        self.discord = discord
        self.state = state
        self.video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))
        self.namer_watch = cfg.get("paths", "namer_watch")
        self.transient = cfg.get("paths", "transient_dir")
        self.namer_ssd_min = cfg.get("disk_space", "namer_ssd_min_free_gb", default=10)
        self.move_timeout = cfg.get("timing", "qbit_move_timeout", default=120)
        self.move_poll = cfg.get("timing", "qbit_move_poll_interval", default=3)
        # Hashes that currently have a worker thread.
        self._active_hashes: set = set()

    def run(self):
        """Main loop: poll qBit at the configured interval unless paused."""
        poll_interval = self.cfg.get("timing", "qbit_poll_interval", default=15)
        log.info("Pipeline: starting main loop")
        while True:
            try:
                if not self.state.paused:
                    self._tick()
            except Exception as e:
                log.error(f"Pipeline error: {e}\n{traceback.format_exc()}")
                self.discord.error("❌ Pipeline Error", str(e))
            time.sleep(poll_interval)

    def _tick(self):
        """Start a worker thread for every completed torrent not already in progress."""
        for torrent in self.qbit.get_completed_torrents():
            hash_ = torrent["hash"]
            if hash_ in self._active_hashes:
                continue
            self._active_hashes.add(hash_)
            threading.Thread(
                target=self._process_torrent,
                args=(torrent,),
                daemon=True,
                name=f"torrent-{hash_[:8]}",
            ).start()

    def _process_torrent(self, torrent: dict):
        """Run the pipeline for one torrent and record the outcome in the shared state."""
        hash_ = torrent["hash"]
        name = torrent["name"]
        log.info(f"Pipeline: processing torrent '{name}' [{hash_[:8]}]")
        self.state.add_job(hash_, name, "checking space")

        try:
            self._run_pipeline(torrent)
            self.state.complete_job(hash_, "success")
        except Exception as e:
            log.error(f"Pipeline: error on '{name}': {e}\n{traceback.format_exc()}")
            self.discord.error(f"❌ Pipeline Failed: {name}", str(e))
            self.state.complete_job(hash_, f"error: {e}")
        finally:
            self._active_hashes.discard(hash_)

    def _is_inside_transient(self, path: str) -> bool:
        """Return True if *path* lies strictly below the transient directory."""
        root = os.path.abspath(self.transient)
        target = os.path.abspath(path)
        try:
            return target != root and os.path.commonpath([root, target]) == root
        except ValueError:
            # Raised for paths on different drives or mixed absolute/relative.
            return False

    def _locate_content(self, torrent: dict) -> Optional[str]:
        """Find this torrent's file or folder inside the transient directory.

        Returns the first existing candidate, or ``None``. Candidates, in
        order: the ``content_path`` qBit reports now (if it lies inside the
        transient dir), the last component of each known ``content_path``
        placed under the transient dir, and the torrent name under the
        transient dir.
        """
        # The dict passed in was fetched before the category change, so its
        # content_path describes the old location; ask qBit again.
        fresh = self.qbit.get_torrent_info(torrent["hash"]) or {}
        candidates = []
        for info in (fresh, torrent):
            content_path = (info.get("content_path") or "").rstrip("/")
            if not content_path:
                continue
            if self._is_inside_transient(content_path):
                candidates.append(content_path)
            leaf = os.path.basename(content_path)
            if leaf:
                candidates.append(os.path.join(self.transient, leaf))
        if torrent.get("name"):
            candidates.append(os.path.join(self.transient, torrent["name"]))
        for candidate in candidates:
            if os.path.exists(candidate):
                return candidate
        return None

    def _collect_video_files(self, torrent: dict) -> list:
        """Return the video files that belong to *torrent*."""
        content = self._locate_content(torrent)
        if content is None:
            # Last resort, kept from the previous behaviour: take every video
            # in the transient dir. This can include other torrents' files.
            log.warning(
                f"Pipeline: could not locate content for '{torrent['name']}', "
                f"scanning whole transient dir"
            )
            return find_video_files(self.transient, self.video_exts)
        if os.path.isfile(content):
            # Single-file torrent: content_path is the file itself.
            return [content] if Path(content).suffix.lower() in self.video_exts else []
        return find_video_files(content, self.video_exts)

    def _run_pipeline(self, torrent: dict):
        """Execute all pipeline steps for one torrent.

        Raises:
            RuntimeError: if no video files can be found for the torrent.
        """
        hash_ = torrent["hash"]
        name = torrent["name"]

        # ── 1. Check free space on namer SSD ──────────────────────────────────
        torrent_gb = torrent.get("size", 0) / (1024 ** 3)
        self._check_space(self.namer_watch, self.namer_ssd_min, torrent_gb, name)

        self.state.update_job(hash_, "setting category: copying")

        # ── 2. Set category → 2 - copying (qBit moves files to transient) ────
        self.qbit.set_category(hash_, self.qbit.cat_copying)

        # ── 3. Wait for qBit to finish moving files to transient dir ──────────
        self.state.update_job(hash_, "waiting for qBit file move")
        moved = self.qbit.wait_for_move(
            hash_, self.transient, self.move_timeout, self.move_poll
        )
        if not moved:
            # Not fatal — qBit might report save_path differently; try to proceed
            log.warning(f"Pipeline: qBit save_path didn't update for {name}, proceeding anyway")

        # ── 4. Find video files in transient dir for this torrent ─────────────
        self.state.update_job(hash_, "copying to namer")
        video_files = self._collect_video_files(torrent)
        if not video_files:
            raise RuntimeError(f"No video files found in transient dir for '{name}'")

        log.info(f"Pipeline: found {len(video_files)} video file(s) for '{name}'")
        self.discord.info(
            f"📋 Processing: {name}",
            f"Found {len(video_files)} video file(s). Copying to namer...",
        )

        # ── 5. Copy video files (flattened) to namer watch dir ────────────────
        copied = [copy_file_flat(vf, self.namer_watch) for vf in video_files]
        log.info(f"Pipeline: copied {len(copied)} file(s) to namer watch dir")

        # ── 6. Set category → 3 - seeding (qBit moves torrent to spinning array)
        self.state.update_job(hash_, "setting category: seeding")
        self.qbit.set_category(hash_, self.qbit.cat_seeding)
        log.info(f"Pipeline: torrent '{name}' set to seeding")
        self.discord.success(
            f"✅ Copied & Seeding: {name}",
            f"Copied {len(copied)} video file(s) to namer.\n"
            f"Torrent moved to seeding. Waiting for namer to process...",
        )

    def _check_space(self, path: str, min_free_gb: float, needed_gb: float, name: str):
        """Block until *path* has ``min_free_gb + needed_gb`` GB free.

        While space is short the shared state is paused and one Discord
        warning is sent. Only a pause set by this call is cleared again, so a
        torrent that fits does not lift a pause another torrent is still
        waiting on.
        """
        check_interval = self.cfg.get("disk_space", "space_check_interval", default=60)
        required = min_free_gb + needed_gb
        paused_here = False
        try:
            while True:
                available = free_gb(path)
                if available >= required:
                    return
                reason = (
                    f"Low disk space on namer SSD: {available:.1f} GB free, "
                    f"need {required:.1f} GB for '{name}'"
                )
                if not self.state.paused:
                    log.warning(f"Pipeline: PAUSED — {reason}")
                    self.state.set_pause(reason)
                    self.discord.warn("⏸️ Pipeline Paused — Low Disk Space", reason)
                    paused_here = True
                time.sleep(check_interval)
        finally:
            # Also runs if free_gb raises, so a failing check cannot leave
            # the pipeline paused.
            if paused_here:
                self.state.set_pause()
|
|
|
|
|
|
# =============================================================================
|
|
# Web UI (Flask, runs in its own thread)
|
|
# =============================================================================
|
|
def create_web_app(state: "State"):
    """Build the Flask app serving the status dashboard.

    Routes:
        ``/``          HTML dashboard rendered from a snapshot of *state*.
        ``/api/state`` the same snapshot as JSON.

    Flask is imported here rather than at module level, so it is only needed
    once the web UI is actually created.
    """
    from flask import Flask, jsonify, render_template_string
    app = Flask(__name__)

    HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Media Automator</title>
<meta http-equiv="refresh" content="15">
<style>
:root {
--bg: #0f1117; --surface: #1a1d27; --surface2: #22263a;
--accent: #5865f2; --green: #57f287; --yellow: #fee75c;
--red: #ed4245; --text: #e0e0e0; --muted: #888;
--border: #2e3250; --radius: 8px;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: var(--bg); color: var(--text); font-family: 'Segoe UI', sans-serif;
font-size: 14px; line-height: 1.6; padding: 24px; }
h1 { font-size: 22px; color: #fff; margin-bottom: 4px; }
.subtitle { color: var(--muted); font-size: 12px; margin-bottom: 24px; }
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 16px; margin-bottom: 24px; }
.stat { background: var(--surface); border: 1px solid var(--border);
border-radius: var(--radius); padding: 16px; }
.stat .label { color: var(--muted); font-size: 11px; text-transform: uppercase;
letter-spacing: .05em; margin-bottom: 6px; }
.stat .value { font-size: 28px; font-weight: 700; color: var(--accent); }
.card { background: var(--surface); border: 1px solid var(--border);
border-radius: var(--radius); padding: 16px; margin-bottom: 16px; }
.card h2 { font-size: 14px; color: var(--muted); text-transform: uppercase;
letter-spacing: .05em; margin-bottom: 12px; }
table { width: 100%; border-collapse: collapse; }
th { text-align: left; color: var(--muted); font-size: 11px; text-transform: uppercase;
padding: 6px 8px; border-bottom: 1px solid var(--border); }
td { padding: 8px; border-bottom: 1px solid var(--border); font-size: 13px; }
tr:last-child td { border-bottom: none; }
.badge { display: inline-block; padding: 2px 8px; border-radius: 20px;
font-size: 11px; font-weight: 600; }
.badge-blue { background: rgba(88,101,242,.2); color: var(--accent); }
.badge-green { background: rgba(87,242,135,.2); color: var(--green); }
.badge-yellow { background: rgba(254,231,92,.2); color: var(--yellow); }
.badge-red { background: rgba(237,66,69,.2); color: var(--red); }
.banner { padding: 12px 16px; border-radius: var(--radius); margin-bottom: 16px;
font-weight: 600; background: rgba(254,231,92,.15);
border: 1px solid rgba(254,231,92,.3); color: var(--yellow); }
.empty { color: var(--muted); font-style: italic; padding: 8px; }
.ts { color: var(--muted); font-size: 11px; }
</style>
</head>
<body>
<h1>📡 Media Automator</h1>
<div class="subtitle">Auto-refreshes every 15s · {{ now }}</div>

{% if snap.paused %}
<div class="banner">⏸️ PAUSED — {{ snap.pause_reason }}</div>
{% endif %}

<div class="grid">
<div class="stat"><div class="label">Processed</div>
<div class="value">{{ snap.stats.total_processed }}</div></div>
<div class="stat"><div class="label">Whisparr Imports</div>
<div class="value" style="color:var(--green)">{{ snap.stats.whisparr_imported }}</div></div>
<div class="stat"><div class="label">Stash Identified</div>
<div class="value" style="color:var(--accent)">{{ snap.stats.stash_identified }}</div></div>
<div class="stat"><div class="label">Errors</div>
<div class="value" style="color:var(--red)">{{ snap.stats.errors }}</div></div>
<div class="stat"><div class="label">Active Jobs</div>
<div class="value" style="color:var(--yellow)">{{ snap.active_jobs|length }}</div></div>
</div>

<div class="card">
<h2>Active Jobs</h2>
{% if snap.active_jobs %}
<table>
<tr><th>Torrent</th><th>Stage</th><th>Started</th></tr>
{% for hash, job in snap.active_jobs.items() %}
<tr>
<td>{{ job.name }}</td>
<td><span class="badge badge-blue">{{ job.stage }}</span></td>
<td class="ts">{{ job.started }}</td>
</tr>
{% endfor %}
</table>
{% else %}<div class="empty">No active jobs.</div>{% endif %}
</div>

<div class="card">
<h2>Recent History</h2>
{% if snap.history %}
<table>
<tr><th>Torrent</th><th>Outcome</th><th>Completed</th></tr>
{% for job in snap.history %}
<tr>
<td>{{ job.name }}</td>
<td>
{% if job.outcome == 'success' %}
<span class="badge badge-green">✓ success</span>
{% else %}
<span class="badge badge-red">✗ {{ job.outcome[:40] }}</span>
{% endif %}
</td>
<td class="ts">{{ job.get('completed', '—') }}</td>
</tr>
{% endfor %}
</table>
{% else %}<div class="empty">No completed jobs yet.</div>{% endif %}
</div>
</body>
</html>"""

    @app.route("/")
    def index():
        """Render the dashboard from a fresh state snapshot."""
        snap = state.snapshot()
        # datetime.utcnow() is deprecated; the formatted text is unchanged.
        now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        return render_template_string(HTML, snap=snap, now=now)

    @app.route("/api/state")
    def api_state():
        """Return the current state snapshot as JSON."""
        return jsonify(state.snapshot())

    return app
|
|
|
|
|
|
# =============================================================================
|
|
# Entry Point
|
|
# =============================================================================
|
|
def main():
    """Load the config, wire up all components and run until the web UI exits.

    The config path comes from the ``CONFIG_PATH`` environment variable and
    falls back to ``/config/config.yml``. Watchers and the pipeline run in
    daemon threads; the Flask web UI occupies the main thread.
    """
    config_path = os.environ.get("CONFIG_PATH", "/config/config.yml")
    log.info(f"Loading config from {config_path}")
    cfg = Config(config_path)

    video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))

    # Shared state plus one client per external service.
    state = State(history_size=cfg.get("webui", "history_size", default=100))
    discord = Discord(cfg)
    qbit = QBitClient(cfg)
    whisparr = WhisparrClient(cfg)
    stash = StashClient(cfg)

    # Ensure output directories exist (entries missing from the config are skipped).
    output_keys = ("namer_watch", "whisparr_import", "stash_library", "stash_temp")
    for directory in filter(None, (cfg.get("paths", key) for key in output_keys)):
        Path(directory).mkdir(parents=True, exist_ok=True)

    discord.info("🚀 Media Automator Started", "All systems initialised. Pipeline is running.")

    # ── Start background watchers ──────────────────────────────────────────────
    namer_watcher = NamerOutputWatcher(cfg, whisparr, stash, discord, state, video_exts)
    namer_watcher.start()

    stash_watcher = StashTempWatcher(cfg, whisparr, discord, state, video_exts)
    stash_watcher.start()

    # ── Start main pipeline in its own thread ─────────────────────────────────
    pipeline = Pipeline(cfg, qbit, discord, state)
    threading.Thread(target=pipeline.run, daemon=True, name="pipeline").start()

    # ── Start web UI (blocks main thread) ─────────────────────────────────────
    port = cfg.get("webui", "port", default=8888)
    log.info(f"Web UI starting on port {port}")
    web_app = create_web_app(state)
    web_app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
    main()
|