From e6ba4970cf0ea42821baa03c89e0f7daea344b1a Mon Sep 17 00:00:00 2001 From: admin_jk Date: Sun, 1 Mar 2026 15:15:42 -0700 Subject: [PATCH] porntumate add --- lxc3/porntumate/Dockerfile | 23 + lxc3/porntumate/README.md | 136 ++++ lxc3/porntumate/config.yml | 121 ++++ lxc3/porntumate/docker-compose.yml | 58 ++ lxc3/porntumate/main.py | 965 +++++++++++++++++++++++++++++ lxc3/porntumate/requirements.txt | 6 + 6 files changed, 1309 insertions(+) create mode 100644 lxc3/porntumate/Dockerfile create mode 100644 lxc3/porntumate/README.md create mode 100644 lxc3/porntumate/config.yml create mode 100644 lxc3/porntumate/docker-compose.yml create mode 100644 lxc3/porntumate/main.py create mode 100644 lxc3/porntumate/requirements.txt diff --git a/lxc3/porntumate/Dockerfile b/lxc3/porntumate/Dockerfile new file mode 100644 index 0000000..edd6200 --- /dev/null +++ b/lxc3/porntumate/Dockerfile @@ -0,0 +1,23 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system deps +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ . + +# Config is mounted at runtime +VOLUME ["/config"] + +EXPOSE 8888 + +HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \ + CMD curl -f http://localhost:8888/ || exit 1 + +CMD ["python", "main.py"] diff --git a/lxc3/porntumate/README.md b/lxc3/porntumate/README.md new file mode 100644 index 0000000..741cc6d --- /dev/null +++ b/lxc3/porntumate/README.md @@ -0,0 +1,136 @@ +# Media Automator + +Automates the pipeline: + +``` +qBittorrent (Mint VM) + ↓ [completed torrent detected] +Category: 2 - copying → files move to transient dir + ↓ [copy video files, flattened] +namer watch/ + ├── renamed/ → Whisparr import dir → Whisparr manual import API + └── failed/ → Stash library dir → Stash scan + identify (StashDB) + ↓ [rename plugin fires] + stash-temp/ → Whisparr import dir → import +Category: 3 - seeding → files move to spinning array +``` + +--- + +## Setup + +### 1. Prerequisites + +- Docker + Docker Compose on your LXC +- The Mint VM's NFS shares mounted on the LXC (see below) +- qBittorrent Web UI enabled on the Mint VM +- Whisparr and Stash running and accessible from the LXC + +### 2. NFS Mounts (LXC side) + +The automator needs to see the Mint VM's SSD paths. On your LXC, add to `/etc/fstab`: + +``` +# Mint VM transient dir +:/transfer /mnt/ssd1/transfer nfs defaults,_netdev 0 0 +``` + +Then: `mount -a` + +Verify the path `/mnt/ssd1/transfer/2 - ready for copy/` is accessible from the LXC. + +### 3. qBittorrent Categories + +In qBittorrent on the Mint VM, configure these categories with their save paths: + +| Category | Save Path | +|------------------|----------------------------------------| +| `1 - downloading`| `/mnt/nvme/home/jkilloran/torrenting/downloads/` | +| `2 - copying` | `/transfer/2 - ready for copy/` | +| `3 - seeding` | `/qbt/taz-transfer/seeding/` | + +Make sure "Automatically manage torrent location" is enabled per category. + +### 4. Configure + +Copy `config.yml` to your LXC and fill in all `< >` placeholders: + +```bash +# On the LXC, in your docker-compose directory: +nano config.yml +``` + +Key values to fill in: +- `qbittorrent.host` — Mint VM IP address +- `qbittorrent.username` / `password` — qBit Web UI credentials +- `whisparr.host` / `api_key` +- `stash.host` / `api_key` +- `discord.webhook_url` — from Server Settings → Integrations → Webhooks + +### 5. Build and Run + +```bash +docker compose up -d --build +docker compose logs -f # watch the logs +``` + +Web UI is at: `http://:8888` + +--- + +## Flow Details + +### Space Management + +Before processing each torrent the automator checks: +- Free space on the namer SSD ≥ `namer_ssd_min_free_gb` + torrent size +- If not enough space: sends a Discord alert, pauses, and retries every `space_check_interval` seconds +- Resumes automatically once space is available (no intervention needed) + +### Disk Space Alerts + +You'll get Discord alerts for: +- Pipeline paused due to low space (and when it resumes) +- Each torrent starting to process +- Successful Whisparr import +- Files sent to Stash + identify result +- Any errors + +### namer failed files + +When namer can't identify a file it goes to `stash/`. The automator: +1. Moves the file to your Stash library path +2. Triggers `metadataScan` → waits for completion +3. Triggers `metadataIdentify` using StashDB → waits for completion +4. The `rename-file-on-update` plugin fires automatically → file appears in `stash-temp/` +5. Automator detects the file in `stash-temp/`, waits 30s for it to stabilise, moves to Whisparr import dir +6. Triggers Whisparr manual import + +--- + +## Troubleshooting + +### qBit category change isn't moving files +- Verify "Automatic torrent management" is enabled for each category in qBit settings +- Check the save paths match exactly what's in the qBit category config + +### Whisparr import returns 0 matches +- The file naming matters — namer should produce `Studio.YYYY-MM-DD.Title.ext` format +- Check Whisparr's manual import UI for the file to see why it's not matching + +### Stash identify never fires the rename plugin +- Confirm `rename-file-on-update` is enabled in Stash Settings → Plugins +- The plugin fires on scene update, not scan — identify must successfully update the scene + +### Permission errors +- Uncomment `user: "1000:1000"` in `docker-compose.yml` and set to your LXC user's UID:GID +- Check NFS mount options on the LXC + +--- + +## Adding the Docker qBittorrent Instance Later + +When you're ready to add the LXC's qBit container to the same pipeline, the config +will gain a `qbittorrent_docker:` section with its own host/port/credentials, and the +automator will poll both instances in the same main loop. Paths will be LXC-native +(no NFS needed for that instance). diff --git a/lxc3/porntumate/config.yml b/lxc3/porntumate/config.yml new file mode 100644 index 0000000..96fdedd --- /dev/null +++ b/lxc3/porntumate/config.yml @@ -0,0 +1,121 @@ +# ============================================================================= +# Media Automator - Configuration +# Fill in all values marked with < > before running +# ============================================================================= + +# ----------------------------------------------------------------------------- +# qBittorrent (Mint VM instance - private trackers) +# ----------------------------------------------------------------------------- +qbittorrent: + host: "" # e.g. 192.168.1.50 + port: 8080 + username: "" + password: "" + # Category names - must match exactly what's configured in qBittorrent + category_downloading: "1 - downloading" + category_copying: "2 - copying" + category_seeding: "3 - seeding" + +# ----------------------------------------------------------------------------- +# Whisparr +# ----------------------------------------------------------------------------- +whisparr: + host: "" # e.g. 192.168.1.100 + port: 6969 + api_key: "" + # How long to wait (seconds) for Whisparr to match files before giving up + import_timeout: 120 + +# ----------------------------------------------------------------------------- +# Stash +# ----------------------------------------------------------------------------- +stash: + host: "" # e.g. 192.168.1.100 + port: 9999 + api_key: "" + stashdb_endpoint: "https://stashdb.org/graphql" + # How long to poll (seconds) for scan/identify jobs to complete + job_timeout: 600 + job_poll_interval: 5 + +# ----------------------------------------------------------------------------- +# Discord Notifications +# ----------------------------------------------------------------------------- +discord: + webhook_url: "" + # Minimum level to notify: DEBUG, INFO, WARNING, ERROR + notify_level: "INFO" + +# ----------------------------------------------------------------------------- +# Paths (as seen from the automator container / LXC) +# ----------------------------------------------------------------------------- +paths: + # Where the transient "ready for copy" folder is (LXC/NFS view of Mint SSD) + transient_dir: "/mnt/ssd1/transfer/2 - ready for copy" + + # namer subfolders (base dir + subdirs namer uses) + namer_base: "/mnt/ssd3/namer" + namer_watch: "/mnt/ssd3/namer/watch" + namer_work: "/mnt/ssd3/namer/work" + namer_renamed: "/mnt/ssd3/namer/renamed" + namer_failed: "/mnt/ssd3/namer/failed" + + # Where Whisparr picks up files for import + whisparr_import: "/pool/other/root/import" + + # Where Stash stores its library (failed namer files land here) + stash_library: "/pool/other/root/stash" + + # Where the rename-file-on-update plugin outputs renamed files + stash_temp: "/pool/other/root/stash-temp" + +# ----------------------------------------------------------------------------- +# Disk Space Management +# ----------------------------------------------------------------------------- +disk_space: + # Minimum free space (GB) required on the namer SSD before copying a torrent + namer_ssd_min_free_gb: 10 + # Minimum free space (GB) required on the whisparr import SSD + import_min_free_gb: 5 + # How often (seconds) to re-check space when paused waiting for room + space_check_interval: 60 + +# ----------------------------------------------------------------------------- +# Timing / Polling +# ----------------------------------------------------------------------------- +timing: + # How often (seconds) the main loop polls qBittorrent for completed torrents + qbit_poll_interval: 15 + # How long (seconds) a file in stash-temp must be stable before we move it + stash_temp_settle_seconds: 30 + # How long (seconds) to wait for category change + file move in qBit + qbit_move_timeout: 120 + # How often (seconds) to check if qBit has finished moving files + qbit_move_poll_interval: 3 + +# ----------------------------------------------------------------------------- +# Web UI +# ----------------------------------------------------------------------------- +webui: + port: 8888 + # How many completed items to keep in the history log + history_size: 100 + +# ----------------------------------------------------------------------------- +# File Filtering +# Extensions we treat as video files (everything else is ignored) +# ----------------------------------------------------------------------------- +video_extensions: + - .mp4 + - .mkv + - .avi + - .mov + - .wmv + - .flv + - .m4v + - .ts + - .m2ts + - .webm + - .divx + - .mpg + - .mpeg diff --git a/lxc3/porntumate/docker-compose.yml b/lxc3/porntumate/docker-compose.yml new file mode 100644 index 0000000..8639dbf --- /dev/null +++ b/lxc3/porntumate/docker-compose.yml @@ -0,0 +1,58 @@ +--- +# Media Automator - Docker Compose +# Place this file alongside config.yml on your LXC +# Run: docker compose up -d + +services: + media-automator: + build: . + # Or use a pre-built image if you push one: + # image: your-registry/media-automator:latest + container_name: media-automator + restart: unless-stopped + + environment: + - CONFIG_PATH=/config/config.yml + - TZ=America/New_York # <-- set your timezone + + volumes: + # Config file + - ./config.yml:/config/config.yml:ro + + # namer folders (LXC paths) + - /mnt/ssd3/namer:/mnt/ssd3/namer + + # Transient / ready-for-copy (NFS mount from Mint VM, LXC path) + - /mnt/ssd1/transfer:/mnt/ssd1/transfer + + # Whisparr import dir + - /pool/other/root/import:/pool/other/root/import + + # Stash library dir + - /pool/other/root/stash:/pool/other/root/stash + + # Stash temp dir (plugin output) + - /pool/other/root/stash-temp:/pool/other/root/stash-temp + + # Seeding dir (optional - automator doesn't write here, but useful for visibility) + # - /mnt/hdd15/taz-transfer/seeding:/mnt/hdd15/taz-transfer/seeding:ro + + ports: + - "8888:8888" # Web UI - change left side if port is taken + + # Make sure this starts after your other containers + depends_on: [] + # Uncomment and fill in if you use container names instead of IPs: + # depends_on: + # - whisparr + # - stash + + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + + # Give the container read/write on mounted paths + # If you run into permission issues, set this to your LXC user's UID:GID + # user: "1000:1000" diff --git a/lxc3/porntumate/main.py b/lxc3/porntumate/main.py new file mode 100644 index 0000000..52be297 --- /dev/null +++ b/lxc3/porntumate/main.py @@ -0,0 +1,965 @@ +#!/usr/bin/env python3 +""" +Media Automator +Orchestrates the pipeline: qBittorrent → namer → Whisparr / Stash → Whisparr +""" + +import os +import shutil +import time +import threading +import logging +import json +import traceback +from datetime import datetime +from pathlib import Path +from collections import deque +from typing import Optional + +import requests +import yaml +import psutil + +# ============================================================================= +# Logging Setup +# ============================================================================= +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("automator") + + +# ============================================================================= +# Config Loader +# ============================================================================= +class Config: + def __init__(self, path: str = "/config/config.yml"): + with open(path) as f: + self._data = yaml.safe_load(f) + + def __getattr__(self, name): + return self._data.get(name, {}) + + def get(self, *keys, default=None): + d = self._data + for k in keys: + if not isinstance(d, dict): + return default + d = d.get(k, default) + return d + + +# ============================================================================= +# Shared State (thread-safe with a lock) +# ============================================================================= +class State: + """Central state store for the automator. Accessed by both the pipeline + threads and the web UI.""" + + def __init__(self, history_size: int = 100): + self._lock = threading.Lock() + self.history: deque = deque(maxlen=history_size) + self.active_jobs: dict = {} # hash → {stage, name, started} + self.paused: bool = False + self.pause_reason: str = "" + self.stats = { + "total_processed": 0, + "whisparr_imported": 0, + "stash_identified": 0, + "errors": 0, + "started_at": datetime.utcnow().isoformat(), + } + + def add_job(self, hash_: str, name: str, stage: str): + with self._lock: + self.active_jobs[hash_] = { + "name": name, + "stage": stage, + "started": datetime.utcnow().isoformat(), + } + + def update_job(self, hash_: str, stage: str): + with self._lock: + if hash_ in self.active_jobs: + self.active_jobs[hash_]["stage"] = stage + + def complete_job(self, hash_: str, outcome: str): + with self._lock: + job = self.active_jobs.pop(hash_, None) + if job: + job["completed"] = datetime.utcnow().isoformat() + job["outcome"] = outcome + self.history.appendleft(job) + self.stats["total_processed"] += 1 + + def set_pause(self, reason: str = ""): + with self._lock: + self.paused = bool(reason) + self.pause_reason = reason + + def snapshot(self) -> dict: + with self._lock: + return { + "paused": self.paused, + "pause_reason": self.pause_reason, + "active_jobs": dict(self.active_jobs), + "history": list(self.history), + "stats": dict(self.stats), + } + + +# ============================================================================= +# Discord Notifier +# ============================================================================= +class Discord: + COLORS = {"INFO": 0x5865F2, "WARNING": 0xFEE75C, "ERROR": 0xED4245, "SUCCESS": 0x57F287} + LEVELS = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3} + + def __init__(self, cfg: Config): + self.webhook = cfg.get("discord", "webhook_url", default="") + self.min_level = self.LEVELS.get(cfg.get("discord", "notify_level", default="INFO"), 1) + + def _send(self, title: str, message: str, level: str = "INFO"): + level_val = self.LEVELS.get(level, 1) + if level_val < self.min_level or not self.webhook: + return + color = self.COLORS.get(level, 0x5865F2) + payload = { + "embeds": [{ + "title": title, + "description": message, + "color": color, + "timestamp": datetime.utcnow().isoformat(), + "footer": {"text": "Media Automator"}, + }] + } + try: + requests.post(self.webhook, json=payload, timeout=10) + except Exception as e: + log.warning(f"Discord notification failed: {e}") + + def info(self, title: str, msg: str): self._send(title, msg, "INFO") + def warn(self, title: str, msg: str): self._send(title, msg, "WARNING") + def error(self, title: str, msg: str): self._send(title, msg, "ERROR") + def success(self, title: str, msg: str): self._send(title, msg, "SUCCESS") + + +# ============================================================================= +# qBittorrent Client +# ============================================================================= +class QBitClient: + def __init__(self, cfg: Config): + qb = cfg.qbittorrent + self.base = f"http://{qb['host']}:{qb['port']}" + self.user = qb["username"] + self.pwd = qb["password"] + self.cat_downloading = qb["category_downloading"] + self.cat_copying = qb["category_copying"] + self.cat_seeding = qb["category_seeding"] + self._session = requests.Session() + self._logged_in = False + + def _login(self): + r = self._session.post( + f"{self.base}/api/v2/auth/login", + data={"username": self.user, "password": self.pwd}, + timeout=10, + ) + self._logged_in = r.text == "Ok." + if not self._logged_in: + raise RuntimeError(f"qBittorrent login failed: {r.text}") + + def _get(self, path: str, **params): + if not self._logged_in: + self._login() + r = self._session.get(f"{self.base}{path}", params=params, timeout=15) + r.raise_for_status() + return r.json() + + def _post(self, path: str, data: dict): + if not self._logged_in: + self._login() + r = self._session.post(f"{self.base}{path}", data=data, timeout=15) + r.raise_for_status() + return r + + def get_completed_torrents(self) -> list: + """Return all torrents in the 'downloading' category that are 100% done.""" + try: + torrents = self._get( + "/api/v2/torrents/info", + category=self.cat_downloading, + filter="completed", + ) + return [t for t in torrents if t.get("progress", 0) >= 1.0] + except Exception as e: + log.error(f"qBit: failed to get torrents: {e}") + return [] + + def get_torrent_files(self, hash_: str) -> list: + """Return list of file paths within a torrent.""" + try: + return self._get("/api/v2/torrents/files", hash=hash_) + except Exception as e: + log.error(f"qBit: failed to get files for {hash_}: {e}") + return [] + + def set_category(self, hash_: str, category: str): + self._post("/api/v2/torrents/setCategory", {"hashes": hash_, "category": category}) + log.info(f"qBit: set category '{category}' on {hash_[:8]}") + + def get_torrent_info(self, hash_: str) -> Optional[dict]: + try: + results = self._get("/api/v2/torrents/info", hashes=hash_) + return results[0] if results else None + except Exception: + return None + + def wait_for_move(self, hash_: str, expected_save_path: str, + timeout: int = 120, poll: int = 3) -> bool: + """Block until qBit has moved the torrent files to the expected save path.""" + deadline = time.time() + timeout + while time.time() < deadline: + info = self.get_torrent_info(hash_) + if info and info.get("save_path", "").rstrip("/") == expected_save_path.rstrip("/"): + return True + time.sleep(poll) + return False + + +# ============================================================================= +# Whisparr Client +# ============================================================================= +class WhisparrClient: + def __init__(self, cfg: Config): + w = cfg.whisparr + self.base = f"http://{w['host']}:{w['port']}/api/v3" + self.headers = {"X-Api-Key": w["api_key"], "Content-Type": "application/json"} + self.timeout = w.get("import_timeout", 120) + + def scan_import_folder(self, folder: str) -> list: + """Ask Whisparr to scan a folder and return what it found + matched.""" + r = requests.get( + f"{self.base}/manualimport", + params={"folder": folder, "filterExistingFiles": "false"}, + headers=self.headers, + timeout=30, + ) + r.raise_for_status() + return r.json() + + def execute_import(self, import_items: list) -> bool: + """Execute a manual import for the given pre-matched items.""" + if not import_items: + log.warning("Whisparr: no items to import") + return False + # Only import items that have a valid scene match + valid = [i for i in import_items if i.get("scene") or i.get("sceneId")] + if not valid: + log.warning("Whisparr: no matched scenes in import list") + return False + r = requests.post( + f"{self.base}/manualimport", + json=valid, + headers=self.headers, + timeout=30, + ) + r.raise_for_status() + log.info(f"Whisparr: imported {len(valid)} file(s)") + return True + + def import_folder(self, folder: str) -> tuple[int, int]: + """Full scan + import cycle. Returns (matched, unmatched) counts.""" + items = self.scan_import_folder(folder) + if not items: + return (0, 0) + matched = [i for i in items if i.get("scene") or i.get("sceneId")] + unmatched = [i for i in items if not (i.get("scene") or i.get("sceneId"))] + if matched: + self.execute_import(matched) + return (len(matched), len(unmatched)) + + +# ============================================================================= +# Stash GraphQL Client +# ============================================================================= +class StashClient: + def __init__(self, cfg: Config): + s = cfg.stash + self.endpoint = f"http://{s['host']}:{s['port']}/graphql" + self.headers = {"ApiKey": s["api_key"], "Content-Type": "application/json"} + self.stashdb_url = s.get("stashdb_endpoint", "https://stashdb.org/graphql") + self.job_timeout = s.get("job_timeout", 600) + self.job_poll = s.get("job_poll_interval", 5) + + def _gql(self, query: str, variables: dict = None) -> dict: + payload = {"query": query} + if variables: + payload["variables"] = variables + r = requests.post(self.endpoint, json=payload, headers=self.headers, timeout=30) + r.raise_for_status() + data = r.json() + if "errors" in data: + raise RuntimeError(f"Stash GraphQL error: {data['errors']}") + return data.get("data", {}) + + def _wait_for_job(self, job_id: str) -> bool: + """Poll until a job finishes. Returns True on success, False on failure/timeout.""" + deadline = time.time() + self.job_timeout + while time.time() < deadline: + time.sleep(self.job_poll) + try: + data = self._gql( + "query FindJob($id: ID!) { findJob(id: $id) { id status } }", + {"id": job_id}, + ) + status = data.get("findJob", {}).get("status", "") + if status == "FINISHED": + return True + if status in ("FAILED", "CANCELLED"): + log.error(f"Stash job {job_id} ended with status: {status}") + return False + except Exception as e: + log.warning(f"Stash: error polling job {job_id}: {e}") + log.error(f"Stash: job {job_id} timed out after {self.job_timeout}s") + return False + + def scan_path(self, path: str) -> bool: + """Trigger a metadata scan on a specific path and wait for completion.""" + data = self._gql( + """ + mutation Scan($input: ScanMetadataInput!) { + metadataScan(input: $input) + } + """, + {"input": {"paths": [path]}}, + ) + job_id = data.get("metadataScan") + if not job_id: + raise RuntimeError("Stash scan did not return a job ID") + log.info(f"Stash: scan job {job_id} started for {path}") + return self._wait_for_job(job_id) + + def identify_path(self, path: str) -> bool: + """Trigger metadata identify on a path using StashDB and wait for completion.""" + data = self._gql( + """ + mutation Identify($input: IdentifyMetadataInput!) { + metadataIdentify(input: $input) + } + """, + { + "input": { + "paths": [path], + "sources": [ + {"source": {"stash_box_endpoint": self.stashdb_url}} + ], + "options": { + "fieldOptions": [ + {"field": "title", "strategy": "OVERWRITE"}, + {"field": "date", "strategy": "OVERWRITE"}, + {"field": "studio", "strategy": "OVERWRITE", "createMissing": True}, + {"field": "performers", "strategy": "OVERWRITE", "createMissing": True}, + {"field": "tags", "strategy": "MERGE", "createMissing": True}, + {"field": "stash_ids", "strategy": "MERGE"}, + ], + "setOrganized": False, + }, + } + }, + ) + job_id = data.get("metadataIdentify") + if not job_id: + raise RuntimeError("Stash identify did not return a job ID") + log.info(f"Stash: identify job {job_id} started for {path}") + return self._wait_for_job(job_id) + + +# ============================================================================= +# Disk Space Helpers +# ============================================================================= +def free_gb(path: str) -> float: + """Return free space in GB for the filesystem containing path.""" + usage = psutil.disk_usage(path) + return usage.free / (1024 ** 3) + + +def file_size_gb(path: str) -> float: + """Return size in GB of a file or directory.""" + p = Path(path) + if p.is_file(): + return p.stat().st_size / (1024 ** 3) + total = sum(f.stat().st_size for f in p.rglob("*") if f.is_file()) + return total / (1024 ** 3) + + +# ============================================================================= +# File Utilities +# ============================================================================= +VIDEO_EXTS_DEFAULT = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", + ".m4v", ".ts", ".m2ts", ".webm", ".divx", ".mpg", ".mpeg"} + + +def find_video_files(directory: str, extensions: set) -> list: + """Return all video files in a directory tree (flattened list).""" + found = [] + for root, _, files in os.walk(directory): + for f in files: + if Path(f).suffix.lower() in extensions: + found.append(os.path.join(root, f)) + return found + + +def copy_file_flat(src: str, dest_dir: str) -> str: + """Copy src file into dest_dir, using only the filename (no subfolders). + If a file with the same name exists, appends a counter.""" + name = Path(src).name + dest = os.path.join(dest_dir, name) + counter = 1 + while os.path.exists(dest): + stem = Path(src).stem + ext = Path(src).suffix + dest = os.path.join(dest_dir, f"{stem}_{counter}{ext}") + counter += 1 + shutil.copy2(src, dest) + log.info(f"Copied: {Path(src).name} → {dest_dir}") + return dest + + +def wait_for_file_stable(path: str, stable_seconds: int = 30, poll: int = 5) -> bool: + """Wait until a file hasn't changed size for stable_seconds. Returns True when stable.""" + deadline = time.time() + stable_seconds * 10 # max 10x settle time + last_size = -1 + stable_since = None + while time.time() < deadline: + try: + size = os.path.getsize(path) + except OSError: + time.sleep(poll) + continue + if size == last_size: + if stable_since is None: + stable_since = time.time() + elif time.time() - stable_since >= stable_seconds: + return True + else: + last_size = size + stable_since = None + time.sleep(poll) + return False + + +# ============================================================================= +# stash-temp Watcher (runs in its own thread) +# ============================================================================= +class StashTempWatcher(threading.Thread): + """Watches stash-temp for new video files, waits for them to settle, + then moves them to the Whisparr import dir and triggers a Whisparr import.""" + + def __init__(self, cfg: Config, whisparr: WhisparrClient, + discord: Discord, state: State, video_exts: set): + super().__init__(daemon=True, name="stash-temp-watcher") + self.watch_dir = cfg.get("paths", "stash_temp") + self.import_dir = cfg.get("paths", "whisparr_import") + self.settle_secs = cfg.get("timing", "stash_temp_settle_seconds", default=30) + self.whisparr = whisparr + self.discord = discord + self.state = state + self.video_exts = video_exts + self._seen: set = set() + + def run(self): + log.info(f"StashTempWatcher: watching {self.watch_dir}") + while True: + try: + self._scan() + except Exception as e: + log.error(f"StashTempWatcher error: {e}") + time.sleep(10) + + def _scan(self): + for f in Path(self.watch_dir).iterdir(): + if not f.is_file(): + continue + if f.suffix.lower() not in self.video_exts: + continue + fpath = str(f) + if fpath in self._seen: + continue + self._seen.add(fpath) + # Process in a new thread so we don't block the watcher + threading.Thread( + target=self._process, + args=(fpath,), + daemon=True, + name=f"stash-temp-{f.name[:20]}", + ).start() + + def _process(self, src_path: str): + log.info(f"StashTempWatcher: new file detected: {Path(src_path).name}") + # Wait for file to finish being written/renamed by the plugin + stable = wait_for_file_stable(src_path, self.settle_secs) + if not stable: + log.warning(f"StashTempWatcher: file never stabilised: {src_path}") + + if not os.path.exists(src_path): + log.info(f"StashTempWatcher: file gone before we could move it: {src_path}") + self._seen.discard(src_path) + return + + try: + dest = copy_file_flat(src_path, self.import_dir) + os.remove(src_path) + self._seen.discard(src_path) + log.info(f"StashTempWatcher: moved to Whisparr import: {Path(dest).name}") + + matched, unmatched = self.whisparr.import_folder(self.import_dir) + if matched: + self.state.stats["whisparr_imported"] += matched + self.discord.success( + "✅ Whisparr Import (via Stash)", + f"**{Path(dest).name}**\nMatched and imported {matched} scene(s).", + ) + if unmatched: + self.discord.warn( + "⚠️ Whisparr: Unmatched Files", + f"{unmatched} file(s) in import folder could not be matched.\n" + f"Check Whisparr's manual import UI.", + ) + except Exception as e: + log.error(f"StashTempWatcher: error processing {src_path}: {e}") + self.discord.error("❌ stash-temp Processing Error", str(e)) + + +# ============================================================================= +# namer Output Watcher (runs in its own thread) +# ============================================================================= +class NamerOutputWatcher(threading.Thread): + """Watches namer's renamed/ and failed/ folders. + - renamed → move to Whisparr import dir, trigger import + - failed → move to Stash library dir, trigger scan+identify + """ + + def __init__(self, cfg: Config, whisparr: WhisparrClient, + stash: StashClient, discord: Discord, state: State, video_exts: set): + super().__init__(daemon=True, name="namer-watcher") + self.renamed_dir = cfg.get("paths", "namer_renamed") + self.failed_dir = cfg.get("paths", "namer_failed") + self.import_dir = cfg.get("paths", "whisparr_import") + self.stash_lib = cfg.get("paths", "stash_library") + self.whisparr = whisparr + self.stash = stash + self.discord = discord + self.state = state + self.video_exts = video_exts + self._seen: set = set() + + def run(self): + log.info(f"NamerOutputWatcher: watching {self.renamed_dir} and {self.failed_dir}") + while True: + try: + self._scan_dir(self.renamed_dir, self._handle_renamed) + self._scan_dir(self.failed_dir, self._handle_failed) + except Exception as e: + log.error(f"NamerOutputWatcher error: {e}") + time.sleep(10) + + def _scan_dir(self, directory: str, handler): + try: + for f in Path(directory).iterdir(): + if not f.is_file(): + continue + if f.suffix.lower() not in self.video_exts: + continue + fpath = str(f) + key = f"{directory}:{fpath}" + if key in self._seen: + continue + self._seen.add(key) + threading.Thread( + target=handler, args=(fpath,), daemon=True + ).start() + except FileNotFoundError: + pass # directory may not exist yet + + def _handle_renamed(self, src_path: str): + """namer successfully renamed → send to Whisparr.""" + name = Path(src_path).name + log.info(f"NamerWatcher: renamed file ready: {name}") + try: + dest = copy_file_flat(src_path, self.import_dir) + os.remove(src_path) + + matched, unmatched = self.whisparr.import_folder(self.import_dir) + if matched: + self.state.stats["whisparr_imported"] += matched + self.discord.success( + "✅ Whisparr Import", + f"**{name}** → imported {matched} scene(s) into Whisparr.", + ) + else: + self.discord.warn( + "⚠️ Whisparr: No Match", + f"**{name}** was renamed by namer but Whisparr couldn't match it.\n" + f"Check Whisparr's manual import UI at `{self.import_dir}`.", + ) + except Exception as e: + log.error(f"NamerWatcher: error handling renamed file {src_path}: {e}") + self.discord.error("❌ namer→Whisparr Error", f"**{name}**\n{e}") + + def _handle_failed(self, src_path: str): + """namer could not identify → send to Stash for phash identification.""" + name = Path(src_path).name + log.info(f"NamerWatcher: failed file, sending to Stash: {name}") + try: + dest = copy_file_flat(src_path, self.stash_lib) + os.remove(src_path) + self.discord.info( + "🔍 Sending to Stash (namer failed)", + f"**{name}** → `{self.stash_lib}`\nRunning scan + identify via StashDB...", + ) + + # Scan so Stash knows about the file + scan_ok = self.stash.scan_path(dest) + if not scan_ok: + raise RuntimeError("Stash scan job failed or timed out") + + # Identify using StashDB + identify_ok = self.stash.identify_path(dest) + if identify_ok: + self.state.stats["stash_identified"] += 1 + self.discord.success( + "✅ Stash Identify Complete", + f"**{name}** identified via StashDB.\n" + f"rename-file-on-update plugin should fire shortly.", + ) + else: + self.discord.warn( + "⚠️ Stash Identify Failed/Timed Out", + f"**{name}** — identify job didn't complete cleanly.\n" + f"You may need to run identify manually in Stash.", + ) + except Exception as e: + log.error(f"NamerWatcher: error handling failed file {src_path}: {e}") + self.discord.error("❌ namer→Stash Error", f"**{name}**\n{e}") + + +# ============================================================================= +# Main Pipeline (runs in its own thread, polls qBit) +# ============================================================================= +class Pipeline: + def __init__(self, cfg: Config, qbit: QBitClient, discord: Discord, state: State): + self.cfg = cfg + self.qbit = qbit + self.discord = discord + self.state = state + self.video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT))) + self.namer_watch = cfg.get("paths", "namer_watch") + self.transient = cfg.get("paths", "transient_dir") + self.namer_ssd_min = cfg.get("disk_space", "namer_ssd_min_free_gb", default=10) + self.move_timeout = cfg.get("timing", "qbit_move_timeout", default=120) + self.move_poll = cfg.get("timing", "qbit_move_poll_interval", default=3) + self._active_hashes: set = set() + + def run(self): + poll_interval = self.cfg.get("timing", "qbit_poll_interval", default=15) + log.info("Pipeline: starting main loop") + while True: + try: + if not self.state.paused: + self._tick() + except Exception as e: + log.error(f"Pipeline error: {e}\n{traceback.format_exc()}") + self.discord.error("❌ Pipeline Error", str(e)) + time.sleep(poll_interval) + + def _tick(self): + completed = self.qbit.get_completed_torrents() + for torrent in completed: + hash_ = torrent["hash"] + if hash_ in self._active_hashes: + continue + self._active_hashes.add(hash_) + threading.Thread( + target=self._process_torrent, + args=(torrent,), + daemon=True, + name=f"torrent-{hash_[:8]}", + ).start() + + def _process_torrent(self, torrent: dict): + hash_ = torrent["hash"] + name = torrent["name"] + log.info(f"Pipeline: processing torrent '{name}' [{hash_[:8]}]") + self.state.add_job(hash_, name, "checking space") + + try: + self._run_pipeline(torrent) + self.state.complete_job(hash_, "success") + except Exception as e: + log.error(f"Pipeline: error on '{name}': {e}\n{traceback.format_exc()}") + self.discord.error(f"❌ Pipeline Failed: {name}", str(e)) + self.state.complete_job(hash_, f"error: {e}") + finally: + self._active_hashes.discard(hash_) + + def _run_pipeline(self, torrent: dict): + hash_ = torrent["hash"] + name = torrent["name"] + + # ── 1. Check free space on namer SSD ────────────────────────────────── + torrent_gb = torrent.get("size", 0) / (1024 ** 3) + self._check_space(self.namer_watch, self.namer_ssd_min, torrent_gb, name) + + self.state.update_job(hash_, "setting category: copying") + + # ── 2. Set category → 2 - copying (qBit moves files to transient) ──── + self.qbit.set_category(hash_, self.qbit.cat_copying) + + # ── 3. Wait for qBit to finish moving files to transient dir ────────── + self.state.update_job(hash_, "waiting for qBit file move") + moved = self.qbit.wait_for_move( + hash_, self.transient, self.move_timeout, self.move_poll + ) + if not moved: + # Not fatal — qBit might report save_path differently; try to proceed + log.warning(f"Pipeline: qBit save_path didn't update for {name}, proceeding anyway") + + # ── 4. Find video files in transient dir for this torrent ───────────── + self.state.update_job(hash_, "copying to namer") + # qBit may have created a subfolder named after the torrent + torrent_dir = os.path.join(self.transient, torrent.get("content_path", "").lstrip("/")) + if not os.path.exists(torrent_dir): + torrent_dir = self.transient # flat download, files directly in transient + + video_files = find_video_files(torrent_dir, self.video_exts) + if not video_files: + # Fallback: scan whole transient dir for files matching torrent hash + video_files = find_video_files(self.transient, self.video_exts) + + if not video_files: + raise RuntimeError(f"No video files found in transient dir for '{name}'") + + log.info(f"Pipeline: found {len(video_files)} video file(s) for '{name}'") + self.discord.info( + f"📋 Processing: {name}", + f"Found {len(video_files)} video file(s). Copying to namer...", + ) + + # ── 5. Copy video files (flattened) to namer watch dir ──────────────── + copied = [] + for vf in video_files: + dest = copy_file_flat(vf, self.namer_watch) + copied.append(dest) + + log.info(f"Pipeline: copied {len(copied)} file(s) to namer watch dir") + + # ── 6. Set category → 3 - seeding (qBit moves torrent to spinning array) + self.state.update_job(hash_, "setting category: seeding") + self.qbit.set_category(hash_, self.qbit.cat_seeding) + log.info(f"Pipeline: torrent '{name}' set to seeding") + self.discord.success( + f"✅ Copied & Seeding: {name}", + f"Copied {len(copied)} video file(s) to namer.\n" + f"Torrent moved to seeding. Waiting for namer to process...", + ) + + def _check_space(self, path: str, min_free_gb: float, needed_gb: float, name: str): + """Check free space; pause and wait + alert if insufficient.""" + check_interval = self.cfg.get("disk_space", "space_check_interval", default=60) + while True: + available = free_gb(path) + if available >= (min_free_gb + needed_gb): + self.state.set_pause() # clear any previous pause + return + reason = ( + f"Low disk space on namer SSD: {available:.1f} GB free, " + f"need {min_free_gb + needed_gb:.1f} GB for '{name}'" + ) + if not self.state.paused: + log.warning(f"Pipeline: PAUSED — {reason}") + self.state.set_pause(reason) + self.discord.warn("⏸️ Pipeline Paused — Low Disk Space", reason) + time.sleep(check_interval) + + +# ============================================================================= +# Web UI (Flask, runs in its own thread) +# ============================================================================= +def create_web_app(state: State): + from flask import Flask, jsonify, render_template_string + app = Flask(__name__) + + HTML = """ + + + + + Media Automator + + + + +

📡 Media Automator

+
Auto-refreshes every 15s  ·  {{ now }}
+ + {% if snap.paused %} + + {% endif %} + +
+
Processed
+
{{ snap.stats.total_processed }}
+
Whisparr Imports
+
{{ snap.stats.whisparr_imported }}
+
Stash Identified
+
{{ snap.stats.stash_identified }}
+
Errors
+
{{ snap.stats.errors }}
+
Active Jobs
+
{{ snap.active_jobs|length }}
+
+ +
+

Active Jobs

+ {% if snap.active_jobs %} + + + {% for hash, job in snap.active_jobs.items() %} + + + + + + {% endfor %} +
TorrentStageStarted
{{ job.name }}{{ job.stage }}{{ job.started }}
+ {% else %}
No active jobs.
{% endif %} +
+ +
+

Recent History

+ {% if snap.history %} + + + {% for job in snap.history %} + + + + + + {% endfor %} +
TorrentOutcomeCompleted
{{ job.name }} + {% if job.outcome == 'success' %} + ✓ success + {% else %} + ✗ {{ job.outcome[:40] }} + {% endif %} + {{ job.get('completed', '—') }}
+ {% else %}
No completed jobs yet.
{% endif %} +
+ +""" + + @app.route("/") + def index(): + snap = state.snapshot() + now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC") + return render_template_string(HTML, snap=snap, now=now) + + @app.route("/api/state") + def api_state(): + return jsonify(state.snapshot()) + + return app + + +# ============================================================================= +# Entry Point +# ============================================================================= +def main(): + config_path = os.environ.get("CONFIG_PATH", "/config/config.yml") + log.info(f"Loading config from {config_path}") + cfg = Config(config_path) + + video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT))) + + state = State(history_size=cfg.get("webui", "history_size", default=100)) + discord = Discord(cfg) + qbit = QBitClient(cfg) + whisparr = WhisparrClient(cfg) + stash = StashClient(cfg) + + # Ensure output directories exist + for p in [ + cfg.get("paths", "namer_watch"), + cfg.get("paths", "whisparr_import"), + cfg.get("paths", "stash_library"), + cfg.get("paths", "stash_temp"), + ]: + if p: + Path(p).mkdir(parents=True, exist_ok=True) + + discord.info("🚀 Media Automator Started", "All systems initialised. Pipeline is running.") + + # ── Start background watchers ────────────────────────────────────────────── + namer_watcher = NamerOutputWatcher(cfg, whisparr, stash, discord, state, video_exts) + namer_watcher.start() + + stash_watcher = StashTempWatcher(cfg, whisparr, discord, state, video_exts) + stash_watcher.start() + + # ── Start main pipeline in its own thread ───────────────────────────────── + pipeline = Pipeline(cfg, qbit, discord, state) + pipeline_thread = threading.Thread(target=pipeline.run, daemon=True, name="pipeline") + pipeline_thread.start() + + # ── Start web UI (blocks main thread) ───────────────────────────────────── + port = cfg.get("webui", "port", default=8888) + log.info(f"Web UI starting on port {port}") + app = create_web_app(state) + app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False) + + +if __name__ == "__main__": + main() diff --git a/lxc3/porntumate/requirements.txt b/lxc3/porntumate/requirements.txt new file mode 100644 index 0000000..fe4f615 --- /dev/null +++ b/lxc3/porntumate/requirements.txt @@ -0,0 +1,6 @@ +requests==2.31.0 +PyYAML==6.0.1 +Flask==3.0.0 +watchdog==3.0.0 +psutil==5.9.6 +python-dateutil==2.8.2