porntumate add

This commit is contained in:
admin_jk 2026-03-01 15:15:42 -07:00
parent cc128172bf
commit e6ba4970cf
6 changed files with 1309 additions and 0 deletions


@@ -0,0 +1,23 @@
FROM python:3.11-slim
WORKDIR /app
# Install system deps
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ .
# Config is mounted at runtime
VOLUME ["/config"]
EXPOSE 8888
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
CMD curl -f http://localhost:8888/ || exit 1
CMD ["python", "main.py"]

lxc3/porntumate/README.md Normal file

@@ -0,0 +1,136 @@
# Media Automator
Automates the pipeline:
```
qBittorrent (Mint VM)
  ↓ [completed torrent detected]
Category: 2 - copying → files move to transient dir
  ↓ [copy video files, flattened]
namer watch/
  ├── renamed/ → Whisparr import dir → Whisparr manual import API
  └── failed/  → Stash library dir → Stash scan + identify (StashDB)
                   ↓ [rename plugin fires]
                 stash-temp/ → Whisparr import dir → import
Category: 3 - seeding → files move to spinning array
```
---
## Setup
### 1. Prerequisites
- Docker + Docker Compose on your LXC
- The Mint VM's NFS shares mounted on the LXC (see below)
- qBittorrent Web UI enabled on the Mint VM
- Whisparr and Stash running and accessible from the LXC
### 2. NFS Mounts (LXC side)
The automator needs to see the Mint VM's SSD paths. On your LXC, add to `/etc/fstab`:
```
# Mint VM transient dir
<MINT_VM_IP>:/transfer /mnt/ssd1/transfer nfs defaults,_netdev 0 0
```
Then: `mount -a`
Verify the path `/mnt/ssd1/transfer/2 - ready for copy/` is accessible from the LXC.
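A quick pre-flight check that the mount is actually live, not just an empty mountpoint directory, can be sketched like this (a minimal sketch; the path follows the fstab example above):

```python
import os

def nfs_path_ok(path: str) -> bool:
    """True if the directory exists and can be listed (i.e. the mount responds)."""
    try:
        os.listdir(path)
        return True
    except OSError:
        return False

# e.g. nfs_path_ok("/mnt/ssd1/transfer/2 - ready for copy")
```

A stale NFS mount typically raises `OSError` on `listdir`, so this catches both "not mounted" and "server unreachable" in one check.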
### 3. qBittorrent Categories
In qBittorrent on the Mint VM, configure these categories with their save paths:
| Category | Save Path |
|------------------|----------------------------------------|
| `1 - downloading`| `/mnt/nvme/home/jkilloran/torrenting/downloads/` |
| `2 - copying` | `/transfer/2 - ready for copy/` |
| `3 - seeding` | `/qbt/taz-transfer/seeding/` |
Make sure "Automatic Torrent Management" is enabled for torrents in each category.
### 4. Configure
Copy `config.yml` to your LXC and fill in all `< >` placeholders:
```bash
# On the LXC, in your docker-compose directory:
nano config.yml
```
Key values to fill in:
- `qbittorrent.host` — Mint VM IP address
- `qbittorrent.username` / `password` — qBit Web UI credentials
- `whisparr.host` / `api_key`
- `stash.host` / `api_key`
- `discord.webhook_url` — from Server Settings → Integrations → Webhooks
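Before starting the stack, it's worth confirming no `< >` placeholders are left in the loaded config. A small sketch (assumes the config parses to a plain mapping, as `yaml.safe_load` produces):

```python
def find_placeholders(node, path=""):
    """Recursively collect config keys whose string values still look like <PLACEHOLDER>."""
    hits = []
    if isinstance(node, dict):
        for key, value in node.items():
            hits += find_placeholders(value, f"{path}.{key}" if path else str(key))
    elif isinstance(node, list):
        for i, value in enumerate(node):
            hits += find_placeholders(value, f"{path}[{i}]")
    elif isinstance(node, str) and node.startswith("<") and node.endswith(">"):
        hits.append(path)
    return hits

# e.g. after cfg = yaml.safe_load(open("config.yml")):
sample = {"qbittorrent": {"host": "<MINT_VM_IP>", "port": 8080},
          "discord": {"webhook_url": "https://discord.com/api/webhooks/abc"}}
print(find_placeholders(sample))   # ['qbittorrent.host']
```

Running this against the real `config.yml` and refusing to start on a non-empty result saves a round of confusing connection errors later.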
### 5. Build and Run
```bash
docker compose up -d --build
docker compose logs -f # watch the logs
```
Web UI is at: `http://<LXC_IP>:8888`
---
## Flow Details
### Space Management
Before processing each torrent the automator checks:
- Free space on the namer SSD ≥ `namer_ssd_min_free_gb` + torrent size
- If not enough space: sends a Discord alert, pauses, and retries every `space_check_interval` seconds
- Resumes automatically once space is available (no intervention needed)
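The pause/retry behaviour above amounts to a simple blocking loop. A minimal sketch (the threshold and interval names mirror the config keys; the Discord call is stubbed with a plain callable):

```python
import shutil
import time

def free_gb(path: str) -> float:
    """Free space in GB on the filesystem containing path."""
    return shutil.disk_usage(path).free / (1024 ** 3)

def wait_for_space(path: str, min_free_gb: float, torrent_gb: float,
                   check_interval: float = 60,
                   free_fn=free_gb, sleep_fn=time.sleep, alert_fn=print) -> None:
    """Block until the filesystem has room for the torrent plus the reserve."""
    needed = min_free_gb + torrent_gb
    alerted = False
    while free_fn(path) < needed:
        if not alerted:
            # Alert once on pause, not on every retry
            alert_fn(f"Paused: need {needed:.1f} GB free on {path}")
            alerted = True
        sleep_fn(check_interval)
    if alerted:
        alert_fn(f"Resumed: space available on {path}")
```

Injecting `free_fn`/`sleep_fn`/`alert_fn` keeps the loop testable without touching a real disk; the production path just uses the defaults.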
### Disk Space Alerts
You'll get Discord alerts for:
- Pipeline paused due to low space (and when it resumes)
- Each torrent starting to process
- Successful Whisparr import
- Files sent to Stash + identify result
- Any errors
### namer failed files
When namer can't identify a file it goes to `failed/`. The automator:
1. Moves the file to your Stash library path
2. Triggers `metadataScan` → waits for completion
3. Triggers `metadataIdentify` using StashDB → waits for completion
4. The `rename-file-on-update` plugin fires automatically → file appears in `stash-temp/`
5. Automator detects the file in `stash-temp/`, waits 30s for it to stabilise, moves to Whisparr import dir
6. Triggers Whisparr manual import
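The settle wait in step 5 boils down to polling the file size until it stops changing, so a half-written or mid-rename file is never moved. A simplified sketch of that check:

```python
import os
import time

def wait_until_stable(path: str, settle: float = 30, poll: float = 5,
                      max_wait: float = 300) -> bool:
    """Return True once the file size has not changed for `settle` seconds."""
    deadline = time.time() + max_wait
    last_size, stable_since = -1, None
    while time.time() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:            # file missing or mid-rename
            time.sleep(poll)
            continue
        if size == last_size:
            stable_since = stable_since or time.time()
            if time.time() - stable_since >= settle:
                return True
        else:                      # still growing; restart the settle timer
            last_size, stable_since = size, None
        time.sleep(poll)
    return False
```

The `max_wait` cap means a file that never stabilises (e.g. a stalled copy) eventually returns `False` instead of blocking the watcher forever.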
---
## Troubleshooting
### qBit category change isn't moving files
- Verify "Automatic torrent management" is enabled for each category in qBit settings
- Check the save paths match exactly what's in the qBit category config
### Whisparr import returns 0 matches
- The file naming matters — namer should produce `Studio.YYYY-MM-DD.Title.ext` format
- Check Whisparr's manual import UI for the file to see why it's not matching
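A quick way to sanity-check a filename against that shape before blaming Whisparr (a rough sketch; namer's exact output rules and Whisparr's matching are both looser than this regex):

```python
import re

# Studio.YYYY-MM-DD.Title.ext — studio is the first dot-free token,
# the title may itself contain dots
NAMER_PATTERN = re.compile(
    r"^(?P<studio>[^.]+)"
    r"\.(?P<date>\d{4}-\d{2}-\d{2})"
    r"\.(?P<title>.+)"
    r"\.(?P<ext>mp4|mkv|avi|mov|wmv|m4v|webm)$",
    re.IGNORECASE,
)

def looks_importable(filename: str) -> bool:
    """True if the name matches the Studio.YYYY-MM-DD.Title.ext shape."""
    return NAMER_PATTERN.match(filename) is not None
```

If a file fails this check, the problem is upstream in namer's rename, not in Whisparr's matcher.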
### Stash identify never fires the rename plugin
- Confirm `rename-file-on-update` is enabled in Stash Settings → Plugins
- The plugin fires on scene update, not scan — identify must successfully update the scene
### Permission errors
- Uncomment `user: "1000:1000"` in `docker-compose.yml` and set to your LXC user's UID:GID
- Check NFS mount options on the LXC
---
## Adding the Docker qBittorrent Instance Later
When you're ready to add the LXC's qBit container to the same pipeline, the config
will gain a `qbittorrent_docker:` section with its own host/port/credentials, and the
automator will poll both instances in the same main loop. Paths will be LXC-native
(no NFS needed for that instance).

lxc3/porntumate/config.yml Normal file

@@ -0,0 +1,121 @@
# =============================================================================
# Media Automator - Configuration
# Fill in all values marked with < > before running
# =============================================================================
# -----------------------------------------------------------------------------
# qBittorrent (Mint VM instance - private trackers)
# -----------------------------------------------------------------------------
qbittorrent:
host: "<MINT_VM_IP>" # e.g. 192.168.1.50
port: 8080
username: "<QBIT_USERNAME>"
password: "<QBIT_PASSWORD>"
# Category names - must match exactly what's configured in qBittorrent
category_downloading: "1 - downloading"
category_copying: "2 - copying"
category_seeding: "3 - seeding"
# -----------------------------------------------------------------------------
# Whisparr
# -----------------------------------------------------------------------------
whisparr:
host: "<LXC_OR_DOCKER_IP>" # e.g. 192.168.1.100
port: 6969
api_key: "<WHISPARR_API_KEY>"
# How long to wait (seconds) for Whisparr to match files before giving up
import_timeout: 120
# -----------------------------------------------------------------------------
# Stash
# -----------------------------------------------------------------------------
stash:
host: "<LXC_OR_DOCKER_IP>" # e.g. 192.168.1.100
port: 9999
api_key: "<STASH_API_KEY>"
stashdb_endpoint: "https://stashdb.org/graphql"
# How long to poll (seconds) for scan/identify jobs to complete
job_timeout: 600
job_poll_interval: 5
# -----------------------------------------------------------------------------
# Discord Notifications
# -----------------------------------------------------------------------------
discord:
webhook_url: "<DISCORD_WEBHOOK_URL>"
# Minimum level to notify: DEBUG, INFO, WARNING, ERROR
notify_level: "INFO"
# -----------------------------------------------------------------------------
# Paths (as seen from the automator container / LXC)
# -----------------------------------------------------------------------------
paths:
# Where the transient "ready for copy" folder is (LXC/NFS view of Mint SSD)
transient_dir: "/mnt/ssd1/transfer/2 - ready for copy"
# namer subfolders (base dir + subdirs namer uses)
namer_base: "/mnt/ssd3/namer"
namer_watch: "/mnt/ssd3/namer/watch"
namer_work: "/mnt/ssd3/namer/work"
namer_renamed: "/mnt/ssd3/namer/renamed"
namer_failed: "/mnt/ssd3/namer/failed"
# Where Whisparr picks up files for import
whisparr_import: "/pool/other/root/import"
# Where Stash stores its library (failed namer files land here)
stash_library: "/pool/other/root/stash"
# Where the rename-file-on-update plugin outputs renamed files
stash_temp: "/pool/other/root/stash-temp"
# -----------------------------------------------------------------------------
# Disk Space Management
# -----------------------------------------------------------------------------
disk_space:
# Minimum free space (GB) required on the namer SSD before copying a torrent
namer_ssd_min_free_gb: 10
# Minimum free space (GB) required on the whisparr import SSD
import_min_free_gb: 5
# How often (seconds) to re-check space when paused waiting for room
space_check_interval: 60
# -----------------------------------------------------------------------------
# Timing / Polling
# -----------------------------------------------------------------------------
timing:
# How often (seconds) the main loop polls qBittorrent for completed torrents
qbit_poll_interval: 15
# How long (seconds) a file in stash-temp must be stable before we move it
stash_temp_settle_seconds: 30
# How long (seconds) to wait for category change + file move in qBit
qbit_move_timeout: 120
# How often (seconds) to check if qBit has finished moving files
qbit_move_poll_interval: 3
# -----------------------------------------------------------------------------
# Web UI
# -----------------------------------------------------------------------------
webui:
port: 8888
# How many completed items to keep in the history log
history_size: 100
# -----------------------------------------------------------------------------
# File Filtering
# Extensions we treat as video files (everything else is ignored)
# -----------------------------------------------------------------------------
video_extensions:
- .mp4
- .mkv
- .avi
- .mov
- .wmv
- .flv
- .m4v
- .ts
- .m2ts
- .webm
- .divx
- .mpg
- .mpeg


@@ -0,0 +1,58 @@
---
# Media Automator - Docker Compose
# Place this file alongside config.yml on your LXC
# Run: docker compose up -d
services:
media-automator:
build: .
# Or use a pre-built image if you push one:
# image: your-registry/media-automator:latest
container_name: media-automator
restart: unless-stopped
environment:
- CONFIG_PATH=/config/config.yml
- TZ=America/New_York # <-- set your timezone
volumes:
# Config file
- ./config.yml:/config/config.yml:ro
# namer folders (LXC paths)
- /mnt/ssd3/namer:/mnt/ssd3/namer
# Transient / ready-for-copy (NFS mount from Mint VM, LXC path)
- /mnt/ssd1/transfer:/mnt/ssd1/transfer
# Whisparr import dir
- /pool/other/root/import:/pool/other/root/import
# Stash library dir
- /pool/other/root/stash:/pool/other/root/stash
# Stash temp dir (plugin output)
- /pool/other/root/stash-temp:/pool/other/root/stash-temp
# Seeding dir (optional - automator doesn't write here, but useful for visibility)
# - /mnt/hdd15/taz-transfer/seeding:/mnt/hdd15/taz-transfer/seeding:ro
ports:
- "8888:8888" # Web UI - change left side if port is taken
# Make sure this starts after your other containers
depends_on: []
# Uncomment and fill in if you use container names instead of IPs:
# depends_on:
# - whisparr
# - stash
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# Give the container read/write on mounted paths
# If you run into permission issues, set this to your LXC user's UID:GID
# user: "1000:1000"

lxc3/porntumate/main.py Normal file

@@ -0,0 +1,965 @@
#!/usr/bin/env python3
"""
Media Automator
Orchestrates the pipeline: qBittorrent → namer → Whisparr / Stash → Whisparr
"""
import os
import shutil
import time
import threading
import logging
import json
import traceback
from datetime import datetime
from pathlib import Path
from collections import deque
from typing import Optional
import requests
import yaml
import psutil
# =============================================================================
# Logging Setup
# =============================================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("automator")
# =============================================================================
# Config Loader
# =============================================================================
class Config:
def __init__(self, path: str = "/config/config.yml"):
with open(path) as f:
self._data = yaml.safe_load(f)
def __getattr__(self, name):
return self._data.get(name, {})
def get(self, *keys, default=None):
d = self._data
for k in keys:
if not isinstance(d, dict):
return default
d = d.get(k, default)
return d
# =============================================================================
# Shared State (thread-safe with a lock)
# =============================================================================
class State:
"""Central state store for the automator. Accessed by both the pipeline
threads and the web UI."""
def __init__(self, history_size: int = 100):
self._lock = threading.Lock()
self.history: deque = deque(maxlen=history_size)
self.active_jobs: dict = {} # hash → {stage, name, started}
self.paused: bool = False
self.pause_reason: str = ""
self.stats = {
"total_processed": 0,
"whisparr_imported": 0,
"stash_identified": 0,
"errors": 0,
"started_at": datetime.utcnow().isoformat(),
}
def add_job(self, hash_: str, name: str, stage: str):
with self._lock:
self.active_jobs[hash_] = {
"name": name,
"stage": stage,
"started": datetime.utcnow().isoformat(),
}
def update_job(self, hash_: str, stage: str):
with self._lock:
if hash_ in self.active_jobs:
self.active_jobs[hash_]["stage"] = stage
def complete_job(self, hash_: str, outcome: str):
with self._lock:
job = self.active_jobs.pop(hash_, None)
if job:
job["completed"] = datetime.utcnow().isoformat()
job["outcome"] = outcome
self.history.appendleft(job)
self.stats["total_processed"] += 1
def set_pause(self, reason: str = ""):
with self._lock:
self.paused = bool(reason)
self.pause_reason = reason
def snapshot(self) -> dict:
with self._lock:
return {
"paused": self.paused,
"pause_reason": self.pause_reason,
"active_jobs": dict(self.active_jobs),
"history": list(self.history),
"stats": dict(self.stats),
}
# =============================================================================
# Discord Notifier
# =============================================================================
class Discord:
COLORS = {"INFO": 0x5865F2, "WARNING": 0xFEE75C, "ERROR": 0xED4245, "SUCCESS": 0x57F287}
LEVELS = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
def __init__(self, cfg: Config):
self.webhook = cfg.get("discord", "webhook_url", default="")
self.min_level = self.LEVELS.get(cfg.get("discord", "notify_level", default="INFO"), 1)
def _send(self, title: str, message: str, level: str = "INFO"):
level_val = self.LEVELS.get(level, 1)
if level_val < self.min_level or not self.webhook:
return
color = self.COLORS.get(level, 0x5865F2)
payload = {
"embeds": [{
"title": title,
"description": message,
"color": color,
"timestamp": datetime.utcnow().isoformat(),
"footer": {"text": "Media Automator"},
}]
}
try:
requests.post(self.webhook, json=payload, timeout=10)
except Exception as e:
log.warning(f"Discord notification failed: {e}")
def info(self, title: str, msg: str): self._send(title, msg, "INFO")
def warn(self, title: str, msg: str): self._send(title, msg, "WARNING")
def error(self, title: str, msg: str): self._send(title, msg, "ERROR")
def success(self, title: str, msg: str): self._send(title, msg, "SUCCESS")
# =============================================================================
# qBittorrent Client
# =============================================================================
class QBitClient:
def __init__(self, cfg: Config):
qb = cfg.qbittorrent
self.base = f"http://{qb['host']}:{qb['port']}"
self.user = qb["username"]
self.pwd = qb["password"]
self.cat_downloading = qb["category_downloading"]
self.cat_copying = qb["category_copying"]
self.cat_seeding = qb["category_seeding"]
self._session = requests.Session()
self._logged_in = False
def _login(self):
r = self._session.post(
f"{self.base}/api/v2/auth/login",
data={"username": self.user, "password": self.pwd},
timeout=10,
)
self._logged_in = r.text == "Ok."
if not self._logged_in:
raise RuntimeError(f"qBittorrent login failed: {r.text}")
def _get(self, path: str, **params):
if not self._logged_in:
self._login()
r = self._session.get(f"{self.base}{path}", params=params, timeout=15)
r.raise_for_status()
return r.json()
def _post(self, path: str, data: dict):
if not self._logged_in:
self._login()
r = self._session.post(f"{self.base}{path}", data=data, timeout=15)
r.raise_for_status()
return r
def get_completed_torrents(self) -> list:
"""Return all torrents in the 'downloading' category that are 100% done."""
try:
torrents = self._get(
"/api/v2/torrents/info",
category=self.cat_downloading,
filter="completed",
)
return [t for t in torrents if t.get("progress", 0) >= 1.0]
except Exception as e:
log.error(f"qBit: failed to get torrents: {e}")
return []
def get_torrent_files(self, hash_: str) -> list:
"""Return list of file paths within a torrent."""
try:
return self._get("/api/v2/torrents/files", hash=hash_)
except Exception as e:
log.error(f"qBit: failed to get files for {hash_}: {e}")
return []
def set_category(self, hash_: str, category: str):
self._post("/api/v2/torrents/setCategory", {"hashes": hash_, "category": category})
log.info(f"qBit: set category '{category}' on {hash_[:8]}")
def get_torrent_info(self, hash_: str) -> Optional[dict]:
try:
results = self._get("/api/v2/torrents/info", hashes=hash_)
return results[0] if results else None
except Exception:
return None
def wait_for_move(self, hash_: str, expected_save_path: str,
timeout: int = 120, poll: int = 3) -> bool:
"""Block until qBit has moved the torrent files to the expected save path."""
deadline = time.time() + timeout
while time.time() < deadline:
info = self.get_torrent_info(hash_)
if info and info.get("save_path", "").rstrip("/") == expected_save_path.rstrip("/"):
return True
time.sleep(poll)
return False
# =============================================================================
# Whisparr Client
# =============================================================================
class WhisparrClient:
def __init__(self, cfg: Config):
w = cfg.whisparr
self.base = f"http://{w['host']}:{w['port']}/api/v3"
self.headers = {"X-Api-Key": w["api_key"], "Content-Type": "application/json"}
self.timeout = w.get("import_timeout", 120)
def scan_import_folder(self, folder: str) -> list:
"""Ask Whisparr to scan a folder and return what it found + matched."""
r = requests.get(
f"{self.base}/manualimport",
params={"folder": folder, "filterExistingFiles": "false"},
headers=self.headers,
timeout=30,
)
r.raise_for_status()
return r.json()
def execute_import(self, import_items: list) -> bool:
"""Execute a manual import for the given pre-matched items."""
if not import_items:
log.warning("Whisparr: no items to import")
return False
# Only import items that have a valid scene match
valid = [i for i in import_items if i.get("scene") or i.get("sceneId")]
if not valid:
log.warning("Whisparr: no matched scenes in import list")
return False
r = requests.post(
f"{self.base}/manualimport",
json=valid,
headers=self.headers,
timeout=30,
)
r.raise_for_status()
log.info(f"Whisparr: imported {len(valid)} file(s)")
return True
def import_folder(self, folder: str) -> tuple[int, int]:
"""Full scan + import cycle. Returns (matched, unmatched) counts."""
items = self.scan_import_folder(folder)
if not items:
return (0, 0)
matched = [i for i in items if i.get("scene") or i.get("sceneId")]
unmatched = [i for i in items if not (i.get("scene") or i.get("sceneId"))]
if matched:
self.execute_import(matched)
return (len(matched), len(unmatched))
# =============================================================================
# Stash GraphQL Client
# =============================================================================
class StashClient:
def __init__(self, cfg: Config):
s = cfg.stash
self.endpoint = f"http://{s['host']}:{s['port']}/graphql"
self.headers = {"ApiKey": s["api_key"], "Content-Type": "application/json"}
self.stashdb_url = s.get("stashdb_endpoint", "https://stashdb.org/graphql")
self.job_timeout = s.get("job_timeout", 600)
self.job_poll = s.get("job_poll_interval", 5)
def _gql(self, query: str, variables: dict = None) -> dict:
payload = {"query": query}
if variables:
payload["variables"] = variables
r = requests.post(self.endpoint, json=payload, headers=self.headers, timeout=30)
r.raise_for_status()
data = r.json()
if "errors" in data:
raise RuntimeError(f"Stash GraphQL error: {data['errors']}")
return data.get("data", {})
def _wait_for_job(self, job_id: str) -> bool:
"""Poll until a job finishes. Returns True on success, False on failure/timeout."""
deadline = time.time() + self.job_timeout
while time.time() < deadline:
time.sleep(self.job_poll)
try:
data = self._gql(
"query FindJob($id: ID!) { findJob(id: $id) { id status } }",
{"id": job_id},
)
status = data.get("findJob", {}).get("status", "")
if status == "FINISHED":
return True
if status in ("FAILED", "CANCELLED"):
log.error(f"Stash job {job_id} ended with status: {status}")
return False
except Exception as e:
log.warning(f"Stash: error polling job {job_id}: {e}")
log.error(f"Stash: job {job_id} timed out after {self.job_timeout}s")
return False
def scan_path(self, path: str) -> bool:
"""Trigger a metadata scan on a specific path and wait for completion."""
data = self._gql(
"""
mutation Scan($input: ScanMetadataInput!) {
metadataScan(input: $input)
}
""",
{"input": {"paths": [path]}},
)
job_id = data.get("metadataScan")
if not job_id:
raise RuntimeError("Stash scan did not return a job ID")
log.info(f"Stash: scan job {job_id} started for {path}")
return self._wait_for_job(job_id)
def identify_path(self, path: str) -> bool:
"""Trigger metadata identify on a path using StashDB and wait for completion."""
data = self._gql(
"""
mutation Identify($input: IdentifyMetadataInput!) {
metadataIdentify(input: $input)
}
""",
{
"input": {
"paths": [path],
"sources": [
{"source": {"stash_box_endpoint": self.stashdb_url}}
],
"options": {
"fieldOptions": [
{"field": "title", "strategy": "OVERWRITE"},
{"field": "date", "strategy": "OVERWRITE"},
{"field": "studio", "strategy": "OVERWRITE", "createMissing": True},
{"field": "performers", "strategy": "OVERWRITE", "createMissing": True},
{"field": "tags", "strategy": "MERGE", "createMissing": True},
{"field": "stash_ids", "strategy": "MERGE"},
],
"setOrganized": False,
},
}
},
)
job_id = data.get("metadataIdentify")
if not job_id:
raise RuntimeError("Stash identify did not return a job ID")
log.info(f"Stash: identify job {job_id} started for {path}")
return self._wait_for_job(job_id)
# =============================================================================
# Disk Space Helpers
# =============================================================================
def free_gb(path: str) -> float:
"""Return free space in GB for the filesystem containing path."""
usage = psutil.disk_usage(path)
return usage.free / (1024 ** 3)
def file_size_gb(path: str) -> float:
"""Return size in GB of a file or directory."""
p = Path(path)
if p.is_file():
return p.stat().st_size / (1024 ** 3)
total = sum(f.stat().st_size for f in p.rglob("*") if f.is_file())
return total / (1024 ** 3)
# =============================================================================
# File Utilities
# =============================================================================
VIDEO_EXTS_DEFAULT = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv",
".m4v", ".ts", ".m2ts", ".webm", ".divx", ".mpg", ".mpeg"}
def find_video_files(directory: str, extensions: set) -> list:
"""Return all video files in a directory tree (flattened list)."""
found = []
for root, _, files in os.walk(directory):
for f in files:
if Path(f).suffix.lower() in extensions:
found.append(os.path.join(root, f))
return found
def copy_file_flat(src: str, dest_dir: str) -> str:
"""Copy src file into dest_dir, using only the filename (no subfolders).
If a file with the same name exists, appends a counter."""
name = Path(src).name
dest = os.path.join(dest_dir, name)
counter = 1
while os.path.exists(dest):
stem = Path(src).stem
ext = Path(src).suffix
dest = os.path.join(dest_dir, f"{stem}_{counter}{ext}")
counter += 1
shutil.copy2(src, dest)
log.info(f"Copied: {Path(src).name} → {dest_dir}")
return dest
def wait_for_file_stable(path: str, stable_seconds: int = 30, poll: int = 5) -> bool:
"""Wait until a file hasn't changed size for stable_seconds. Returns True when stable."""
deadline = time.time() + stable_seconds * 10 # max 10x settle time
last_size = -1
stable_since = None
while time.time() < deadline:
try:
size = os.path.getsize(path)
except OSError:
time.sleep(poll)
continue
if size == last_size:
if stable_since is None:
stable_since = time.time()
elif time.time() - stable_since >= stable_seconds:
return True
else:
last_size = size
stable_since = None
time.sleep(poll)
return False
# =============================================================================
# stash-temp Watcher (runs in its own thread)
# =============================================================================
class StashTempWatcher(threading.Thread):
"""Watches stash-temp for new video files, waits for them to settle,
then moves them to the Whisparr import dir and triggers a Whisparr import."""
def __init__(self, cfg: Config, whisparr: WhisparrClient,
discord: Discord, state: State, video_exts: set):
super().__init__(daemon=True, name="stash-temp-watcher")
self.watch_dir = cfg.get("paths", "stash_temp")
self.import_dir = cfg.get("paths", "whisparr_import")
self.settle_secs = cfg.get("timing", "stash_temp_settle_seconds", default=30)
self.whisparr = whisparr
self.discord = discord
self.state = state
self.video_exts = video_exts
self._seen: set = set()
def run(self):
log.info(f"StashTempWatcher: watching {self.watch_dir}")
while True:
try:
self._scan()
except Exception as e:
log.error(f"StashTempWatcher error: {e}")
time.sleep(10)
def _scan(self):
for f in Path(self.watch_dir).iterdir():
if not f.is_file():
continue
if f.suffix.lower() not in self.video_exts:
continue
fpath = str(f)
if fpath in self._seen:
continue
self._seen.add(fpath)
# Process in a new thread so we don't block the watcher
threading.Thread(
target=self._process,
args=(fpath,),
daemon=True,
name=f"stash-temp-{f.name[:20]}",
).start()
def _process(self, src_path: str):
log.info(f"StashTempWatcher: new file detected: {Path(src_path).name}")
# Wait for file to finish being written/renamed by the plugin
stable = wait_for_file_stable(src_path, self.settle_secs)
if not stable:
log.warning(f"StashTempWatcher: file never stabilised: {src_path}")
if not os.path.exists(src_path):
log.info(f"StashTempWatcher: file gone before we could move it: {src_path}")
self._seen.discard(src_path)
return
try:
dest = copy_file_flat(src_path, self.import_dir)
os.remove(src_path)
self._seen.discard(src_path)
log.info(f"StashTempWatcher: moved to Whisparr import: {Path(dest).name}")
matched, unmatched = self.whisparr.import_folder(self.import_dir)
if matched:
self.state.stats["whisparr_imported"] += matched
self.discord.success(
"✅ Whisparr Import (via Stash)",
f"**{Path(dest).name}**\nMatched and imported {matched} scene(s).",
)
if unmatched:
self.discord.warn(
"⚠️ Whisparr: Unmatched Files",
f"{unmatched} file(s) in import folder could not be matched.\n"
f"Check Whisparr's manual import UI.",
)
except Exception as e:
log.error(f"StashTempWatcher: error processing {src_path}: {e}")
self.discord.error("❌ stash-temp Processing Error", str(e))
# =============================================================================
# namer Output Watcher (runs in its own thread)
# =============================================================================
class NamerOutputWatcher(threading.Thread):
"""Watches namer's renamed/ and failed/ folders.
- renamed/ → move to Whisparr import dir, trigger import
- failed/ → move to Stash library dir, trigger scan+identify
"""
def __init__(self, cfg: Config, whisparr: WhisparrClient,
stash: StashClient, discord: Discord, state: State, video_exts: set):
super().__init__(daemon=True, name="namer-watcher")
self.renamed_dir = cfg.get("paths", "namer_renamed")
self.failed_dir = cfg.get("paths", "namer_failed")
self.import_dir = cfg.get("paths", "whisparr_import")
self.stash_lib = cfg.get("paths", "stash_library")
self.whisparr = whisparr
self.stash = stash
self.discord = discord
self.state = state
self.video_exts = video_exts
self._seen: set = set()
def run(self):
log.info(f"NamerOutputWatcher: watching {self.renamed_dir} and {self.failed_dir}")
while True:
try:
self._scan_dir(self.renamed_dir, self._handle_renamed)
self._scan_dir(self.failed_dir, self._handle_failed)
except Exception as e:
log.error(f"NamerOutputWatcher error: {e}")
time.sleep(10)
def _scan_dir(self, directory: str, handler):
try:
for f in Path(directory).iterdir():
if not f.is_file():
continue
if f.suffix.lower() not in self.video_exts:
continue
fpath = str(f)
key = f"{directory}:{fpath}"
if key in self._seen:
continue
self._seen.add(key)
threading.Thread(
target=handler, args=(fpath,), daemon=True
).start()
except FileNotFoundError:
pass # directory may not exist yet
def _handle_renamed(self, src_path: str):
"""namer successfully renamed → send to Whisparr."""
name = Path(src_path).name
log.info(f"NamerWatcher: renamed file ready: {name}")
try:
dest = copy_file_flat(src_path, self.import_dir)
os.remove(src_path)
matched, unmatched = self.whisparr.import_folder(self.import_dir)
if matched:
self.state.stats["whisparr_imported"] += matched
self.discord.success(
"✅ Whisparr Import",
f"**{name}** → imported {matched} scene(s) into Whisparr.",
)
else:
self.discord.warn(
"⚠️ Whisparr: No Match",
f"**{name}** was renamed by namer but Whisparr couldn't match it.\n"
f"Check Whisparr's manual import UI at `{self.import_dir}`.",
)
except Exception as e:
log.error(f"NamerWatcher: error handling renamed file {src_path}: {e}")
self.discord.error("❌ namer→Whisparr Error", f"**{name}**\n{e}")
def _handle_failed(self, src_path: str):
"""namer could not identify → send to Stash for phash identification."""
name = Path(src_path).name
log.info(f"NamerWatcher: failed file, sending to Stash: {name}")
try:
dest = copy_file_flat(src_path, self.stash_lib)
os.remove(src_path)
self.discord.info(
"🔍 Sending to Stash (namer failed)",
f"**{name}** → `{self.stash_lib}`\nRunning scan + identify via StashDB...",
)
# Scan so Stash knows about the file
scan_ok = self.stash.scan_path(dest)
if not scan_ok:
raise RuntimeError("Stash scan job failed or timed out")
# Identify using StashDB
identify_ok = self.stash.identify_path(dest)
if identify_ok:
self.state.stats["stash_identified"] += 1
self.discord.success(
"✅ Stash Identify Complete",
f"**{name}** identified via StashDB.\n"
f"rename-file-on-update plugin should fire shortly.",
)
else:
self.discord.warn(
"⚠️ Stash Identify Failed/Timed Out",
f"**{name}** — identify job didn't complete cleanly.\n"
f"You may need to run identify manually in Stash.",
)
except Exception as e:
log.error(f"NamerWatcher: error handling failed file {src_path}: {e}")
self.discord.error("❌ namer→Stash Error", f"**{name}**\n{e}")
# =============================================================================
# Main Pipeline (runs in its own thread, polls qBit)
# =============================================================================
class Pipeline:
def __init__(self, cfg: Config, qbit: QBitClient, discord: Discord, state: State):
self.cfg = cfg
self.qbit = qbit
self.discord = discord
self.state = state
self.video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))
self.namer_watch = cfg.get("paths", "namer_watch")
self.transient = cfg.get("paths", "transient_dir")
self.namer_ssd_min = cfg.get("disk_space", "namer_ssd_min_free_gb", default=10)
self.move_timeout = cfg.get("timing", "qbit_move_timeout", default=120)
self.move_poll = cfg.get("timing", "qbit_move_poll_interval", default=3)
self._active_hashes: set = set()
def run(self):
poll_interval = self.cfg.get("timing", "qbit_poll_interval", default=15)
log.info("Pipeline: starting main loop")
while True:
try:
if not self.state.paused:
self._tick()
except Exception as e:
log.error(f"Pipeline error: {e}\n{traceback.format_exc()}")
self.discord.error("❌ Pipeline Error", str(e))
            time.sleep(poll_interval)

    def _tick(self):
completed = self.qbit.get_completed_torrents()
for torrent in completed:
hash_ = torrent["hash"]
if hash_ in self._active_hashes:
continue
self._active_hashes.add(hash_)
threading.Thread(
target=self._process_torrent,
args=(torrent,),
daemon=True,
name=f"torrent-{hash_[:8]}",
            ).start()

    def _process_torrent(self, torrent: dict):
hash_ = torrent["hash"]
name = torrent["name"]
log.info(f"Pipeline: processing torrent '{name}' [{hash_[:8]}]")
self.state.add_job(hash_, name, "checking space")
try:
self._run_pipeline(torrent)
self.state.complete_job(hash_, "success")
except Exception as e:
log.error(f"Pipeline: error on '{name}': {e}\n{traceback.format_exc()}")
self.discord.error(f"❌ Pipeline Failed: {name}", str(e))
self.state.complete_job(hash_, f"error: {e}")
finally:
            self._active_hashes.discard(hash_)

    def _run_pipeline(self, torrent: dict):
hash_ = torrent["hash"]
name = torrent["name"]
# ── 1. Check free space on namer SSD ──────────────────────────────────
torrent_gb = torrent.get("size", 0) / (1024 ** 3)
self._check_space(self.namer_watch, self.namer_ssd_min, torrent_gb, name)
self.state.update_job(hash_, "setting category: copying")
# ── 2. Set category → 2 - copying (qBit moves files to transient) ────
self.qbit.set_category(hash_, self.qbit.cat_copying)
# ── 3. Wait for qBit to finish moving files to transient dir ──────────
self.state.update_job(hash_, "waiting for qBit file move")
moved = self.qbit.wait_for_move(
hash_, self.transient, self.move_timeout, self.move_poll
)
if not moved:
# Not fatal — qBit might report save_path differently; try to proceed
log.warning(f"Pipeline: qBit save_path didn't update for {name}, proceeding anyway")
# ── 4. Find video files in transient dir for this torrent ─────────────
self.state.update_job(hash_, "copying to namer")
        # qBit's content_path is a path on the qBit host, so only its basename
        # (the torrent's folder or file name) is meaningful under the transient dir
        torrent_dir = os.path.join(self.transient, os.path.basename(torrent.get("content_path", "").rstrip("/")))
        if not os.path.exists(torrent_dir):
            torrent_dir = self.transient  # flat download, files directly in transient
video_files = find_video_files(torrent_dir, self.video_exts)
if not video_files:
            # Fallback: no per-torrent subfolder found; scan the whole transient dir
video_files = find_video_files(self.transient, self.video_exts)
if not video_files:
raise RuntimeError(f"No video files found in transient dir for '{name}'")
log.info(f"Pipeline: found {len(video_files)} video file(s) for '{name}'")
self.discord.info(
f"📋 Processing: {name}",
f"Found {len(video_files)} video file(s). Copying to namer...",
)
# ── 5. Copy video files (flattened) to namer watch dir ────────────────
copied = []
for vf in video_files:
dest = copy_file_flat(vf, self.namer_watch)
copied.append(dest)
log.info(f"Pipeline: copied {len(copied)} file(s) to namer watch dir")
# ── 6. Set category → 3 - seeding (qBit moves torrent to spinning array)
self.state.update_job(hash_, "setting category: seeding")
self.qbit.set_category(hash_, self.qbit.cat_seeding)
log.info(f"Pipeline: torrent '{name}' set to seeding")
self.discord.success(
f"✅ Copied & Seeding: {name}",
f"Copied {len(copied)} video file(s) to namer.\n"
f"Torrent moved to seeding. Waiting for namer to process...",
        )

    def _check_space(self, path: str, min_free_gb: float, needed_gb: float, name: str):
"""Check free space; pause and wait + alert if insufficient."""
check_interval = self.cfg.get("disk_space", "space_check_interval", default=60)
while True:
available = free_gb(path)
if available >= (min_free_gb + needed_gb):
self.state.set_pause() # clear any previous pause
return
reason = (
f"Low disk space on namer SSD: {available:.1f} GB free, "
f"need {min_free_gb + needed_gb:.1f} GB for '{name}'"
)
if not self.state.paused:
log.warning(f"Pipeline: PAUSED — {reason}")
self.state.set_pause(reason)
self.discord.warn("⏸️ Pipeline Paused — Low Disk Space", reason)
            time.sleep(check_interval)


# =============================================================================
# Web UI (Flask, runs in its own thread)
# =============================================================================
def create_web_app(state: State):
from flask import Flask, jsonify, render_template_string
app = Flask(__name__)
HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Media Automator</title>
<meta http-equiv="refresh" content="15">
<style>
:root {
--bg: #0f1117; --surface: #1a1d27; --surface2: #22263a;
--accent: #5865f2; --green: #57f287; --yellow: #fee75c;
--red: #ed4245; --text: #e0e0e0; --muted: #888;
--border: #2e3250; --radius: 8px;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: var(--bg); color: var(--text); font-family: 'Segoe UI', sans-serif;
font-size: 14px; line-height: 1.6; padding: 24px; }
h1 { font-size: 22px; color: #fff; margin-bottom: 4px; }
.subtitle { color: var(--muted); font-size: 12px; margin-bottom: 24px; }
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 16px; margin-bottom: 24px; }
.stat { background: var(--surface); border: 1px solid var(--border);
border-radius: var(--radius); padding: 16px; }
.stat .label { color: var(--muted); font-size: 11px; text-transform: uppercase;
letter-spacing: .05em; margin-bottom: 6px; }
.stat .value { font-size: 28px; font-weight: 700; color: var(--accent); }
.card { background: var(--surface); border: 1px solid var(--border);
border-radius: var(--radius); padding: 16px; margin-bottom: 16px; }
.card h2 { font-size: 14px; color: var(--muted); text-transform: uppercase;
letter-spacing: .05em; margin-bottom: 12px; }
table { width: 100%; border-collapse: collapse; }
th { text-align: left; color: var(--muted); font-size: 11px; text-transform: uppercase;
padding: 6px 8px; border-bottom: 1px solid var(--border); }
td { padding: 8px; border-bottom: 1px solid var(--border); font-size: 13px; }
tr:last-child td { border-bottom: none; }
.badge { display: inline-block; padding: 2px 8px; border-radius: 20px;
font-size: 11px; font-weight: 600; }
.badge-blue { background: rgba(88,101,242,.2); color: var(--accent); }
.badge-green { background: rgba(87,242,135,.2); color: var(--green); }
.badge-yellow { background: rgba(254,231,92,.2); color: var(--yellow); }
.badge-red { background: rgba(237,66,69,.2); color: var(--red); }
.banner { padding: 12px 16px; border-radius: var(--radius); margin-bottom: 16px;
font-weight: 600; background: rgba(254,231,92,.15);
border: 1px solid rgba(254,231,92,.3); color: var(--yellow); }
.empty { color: var(--muted); font-style: italic; padding: 8px; }
.ts { color: var(--muted); font-size: 11px; }
</style>
</head>
<body>
<h1>📡 Media Automator</h1>
<div class="subtitle">Auto-refreshes every 15s &nbsp;·&nbsp; {{ now }}</div>
{% if snap.paused %}
<div class="banner">⏸️ PAUSED — {{ snap.pause_reason }}</div>
{% endif %}
<div class="grid">
<div class="stat"><div class="label">Processed</div>
<div class="value">{{ snap.stats.total_processed }}</div></div>
<div class="stat"><div class="label">Whisparr Imports</div>
<div class="value" style="color:var(--green)">{{ snap.stats.whisparr_imported }}</div></div>
<div class="stat"><div class="label">Stash Identified</div>
<div class="value" style="color:var(--accent)">{{ snap.stats.stash_identified }}</div></div>
<div class="stat"><div class="label">Errors</div>
<div class="value" style="color:var(--red)">{{ snap.stats.errors }}</div></div>
<div class="stat"><div class="label">Active Jobs</div>
<div class="value" style="color:var(--yellow)">{{ snap.active_jobs|length }}</div></div>
</div>
<div class="card">
<h2>Active Jobs</h2>
{% if snap.active_jobs %}
<table>
<tr><th>Torrent</th><th>Stage</th><th>Started</th></tr>
{% for hash, job in snap.active_jobs.items() %}
<tr>
<td>{{ job.name }}</td>
<td><span class="badge badge-blue">{{ job.stage }}</span></td>
<td class="ts">{{ job.started }}</td>
</tr>
{% endfor %}
</table>
{% else %}<div class="empty">No active jobs.</div>{% endif %}
</div>
<div class="card">
<h2>Recent History</h2>
{% if snap.history %}
<table>
<tr><th>Torrent</th><th>Outcome</th><th>Completed</th></tr>
{% for job in snap.history %}
<tr>
<td>{{ job.name }}</td>
<td>
{% if job.outcome == 'success' %}
<span class="badge badge-green">✅ success</span>
{% else %}
<span class="badge badge-red">❌ {{ job.outcome[:40] }}</span>
{% endif %}
</td>
<td class="ts">{{ job.get('completed', '') }}</td>
</tr>
{% endfor %}
</table>
{% else %}<div class="empty">No completed jobs yet.</div>{% endif %}
</div>
</body>
    </html>"""

    @app.route("/")
    def index():
snap = state.snapshot()
now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
        return render_template_string(HTML, snap=snap, now=now)

    @app.route("/api/state")
def api_state():
        return jsonify(state.snapshot())

    return app


# =============================================================================
# Entry Point
# =============================================================================
def main():
config_path = os.environ.get("CONFIG_PATH", "/config/config.yml")
log.info(f"Loading config from {config_path}")
cfg = Config(config_path)
video_exts = set(cfg.get("video_extensions", default=list(VIDEO_EXTS_DEFAULT)))
state = State(history_size=cfg.get("webui", "history_size", default=100))
discord = Discord(cfg)
qbit = QBitClient(cfg)
whisparr = WhisparrClient(cfg)
stash = StashClient(cfg)
# Ensure output directories exist
for p in [
cfg.get("paths", "namer_watch"),
cfg.get("paths", "whisparr_import"),
cfg.get("paths", "stash_library"),
cfg.get("paths", "stash_temp"),
]:
if p:
Path(p).mkdir(parents=True, exist_ok=True)
discord.info("🚀 Media Automator Started", "All systems initialised. Pipeline is running.")
# ── Start background watchers ──────────────────────────────────────────────
namer_watcher = NamerOutputWatcher(cfg, whisparr, stash, discord, state, video_exts)
namer_watcher.start()
stash_watcher = StashTempWatcher(cfg, whisparr, discord, state, video_exts)
stash_watcher.start()
# ── Start main pipeline in its own thread ─────────────────────────────────
pipeline = Pipeline(cfg, qbit, discord, state)
pipeline_thread = threading.Thread(target=pipeline.run, daemon=True, name="pipeline")
pipeline_thread.start()
# ── Start web UI (blocks main thread) ─────────────────────────────────────
port = cfg.get("webui", "port", default=8888)
log.info(f"Web UI starting on port {port}")
app = create_web_app(state)
    app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
main()
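`main()` reads everything from `/config/config.yml` (overridable via `CONFIG_PATH`). A minimal sketch of that file, with section and key names taken from the `cfg.get(...)` calls above — all hosts, paths, and values below are illustrative placeholders, not a tested configuration:

```yaml
paths:
  transient_dir: "/mnt/ssd1/transfer/2 - ready for copy"
  namer_watch: /mnt/namer/watch          # placeholder path
  whisparr_import: /mnt/whisparr/import  # placeholder path
  stash_library: /mnt/stash/library      # placeholder path
  stash_temp: /mnt/stash/stash-temp      # placeholder path
timing:
  qbit_poll_interval: 15        # seconds between qBit completed-torrent polls
  qbit_move_timeout: 120        # max seconds to wait for qBit's file move
  qbit_move_poll_interval: 3
disk_space:
  namer_ssd_min_free_gb: 10
  space_check_interval: 60      # seconds between retries while paused
webui:
  port: 8888
  history_size: 100
# Extension format is an assumption; match whatever VIDEO_EXTS_DEFAULT uses
video_extensions: [".mp4", ".mkv", ".avi", ".wmv", ".mov"]
```

The `discord`, `qbit`, `whisparr`, and `stash` clients read their own config sections (URLs, API keys) that are defined earlier in the file and not shown here.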


@@ -0,0 +1,6 @@
requests==2.31.0
PyYAML==6.0.1
Flask==3.0.0
watchdog==3.0.0
psutil==5.9.6
python-dateutil==2.8.2
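With the image built from the Dockerfile above, a minimal compose sketch for running the service on the LXC — service name, host mount paths, and port mapping are illustrative assumptions, not part of this commit:

```yaml
services:
  media-automator:
    build: .
    restart: unless-stopped
    ports:
      - "8888:8888"                 # web UI (EXPOSE 8888 in the Dockerfile)
    environment:
      - CONFIG_PATH=/config/config.yml
    volumes:
      - ./config:/config            # config.yml lives here (VOLUME /config)
      - /mnt/ssd1/transfer:/mnt/ssd1/transfer  # NFS mount from the Mint VM
```

Any other paths referenced in `config.yml` (namer watch, Whisparr import, Stash library/temp) must be bind-mounted into the container the same way so the automator sees them at the configured locations.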