PlexWasher/plexwasher.py

239 lines
9.1 KiB
Python
Executable File

#!/usr/bin/env python3.10
import configparser
import logging
import traceback
import argparse
import json
import os
import ast
import sys
import requests
from datetime import datetime
from dateutil.relativedelta import relativedelta
class RequestError(Exception):
pass
class ApplicationError(Exception):
pass
SERVER_URL = ""
API_KEY = ""
FILM_SECTION_IDS = []
DEADLINE_NEVER_WATCHED = ""
DEADLINE_LAST_WATCHED = ""
FILES_TO_KEEP = []
# Retrieve the list of records that does not follow the deadline policies from tautulli
def get_unwatched_rating_keys(sectionId):
# Init variables
length = 100
start = 0
unwatched = []
deadlineNeverWacthed = datetime.now() - relativedelta(months=DEADLINE_NEVER_WATCHED)
deadlineLastWacthed = datetime.now() - relativedelta(months=DEADLINE_LAST_WATCHED)
# Send http request to refresh the list and get the number of media to safeguard from infinite loop
payload = {'apikey': API_KEY, 'cmd': 'get_library_media_info', 'section_id': sectionId, 'order_column': 'last_played', 'order_dir': 'asc','length': 1, 'start': 0, 'refresh ': 'true'}
request = requests.get(SERVER_URL+"api/v2", params=payload)
response = json.loads(request.content)
if response["response"]['result'] == 'error':
raise RequestError("Could not request records list to tautulli: " + response["response"]['message'])
totalRecords = response["response"]["data"]["recordsTotal"]
while True:
# Used to break from loop. See below
recentlyWatched = False
# Send http request to retrieve the next n=length items
payload = {'apikey': API_KEY, 'cmd': 'get_library_media_info', 'section_id': sectionId, 'order_column': 'last_played', 'order_dir': 'asc', 'length': length, 'start': start}
request = requests.get(SERVER_URL+"api/v2", params=payload)
response = json.loads(request.content)
if response["response"]['result'] == 'error':
raise RequestError("Could not request records list to tautulli: " + response["response"]['message'])
# Select the items that has not been watched based on deadline policies
for media in response["response"]["data"]["data"]:
if media["last_played"] == None:
addedAt = datetime.fromtimestamp(int(media["added_at"]))
logmsg = "media: {0:50} has never been watched. Added the {1:20}".format(media["title"], addedAt.strftime("%d/%m/%Y %H:%M:%S"))
if addedAt > deadlineNeverWacthed:
logger.debug("{} -----> Ignoring".format(logmsg))
continue
else:
lastPlayed = datetime.fromtimestamp(int(media["last_played"]))
logmsg = "media: {0:50} has been last played the: {1:20}".format(media["title"], lastPlayed.strftime("%d/%m/%Y %H:%M:%S"))
if lastPlayed > deadlineLastWacthed:
logger.debug("{} -----> Ignoring".format(logmsg))
recentlyWatched = True
continue
# Item is selected for deletion
logger.debug("{} -----> To be deleted".format(logmsg))
unwatched.append(media["rating_key"])
# The media are listed in asc order of last_played (never played records comes first), so if a record has been watched there is no need to continue
if recentlyWatched == True or start > totalRecords:
break
start += length
return unwatched
# Returns the path list of all the files composing the media
def get_media_paths(rating_key):
# Get the metadata of the media
payload = {'apikey': API_KEY, 'cmd': 'get_metadata', 'rating_key': rating_key}
request = requests.get(SERVER_URL+"api/v2", params=payload)
response = json.loads(request.content)
if response["response"]['result'] == 'error':
raise RequestError("Could not request media path for {} : {}".format(rating_key, response["response"]['message']))
match response["response"]["data"]["media_type"]:
case "movie" | "episode":
# If the media is a video file we return it as a list of single element
return [response["response"]["data"]["media_info"][0]["parts"][0]["file"]]
case "show" | "season":
# If the media is a group of media we get the path of all its components
# Getting the list of child
payload = {'apikey': API_KEY, 'cmd': 'get_children_metadata', 'rating_key': rating_key}
request = requests.get(SERVER_URL+"api/v2", params=payload)
response = json.loads(request.content)
if response["response"]['result'] == 'error':
raise RequestError("Could not request media path for {} : {}".format(rating_key, response["response"]['message']))
# Get the path of all childs recursively
paths = []
for media in response["response"]["data"]["children_list"]:
if media["rating_key"] == "":
continue
paths.extend(get_media_paths(media["rating_key"]))
return paths
case _:
raise ApplicationError("Unkown media type for rating key {}: {} ".format(rating_key, response["response"]["data"]["media_type"]))
# Get the list of files path to be removed
def get_files_to_remove(unwatched):
logger.info("Getting path of unwatched medias")
pathToRemove = []
for media in unwatched:
paths = get_media_paths(media)
for path in paths:
if path not in FILES_TO_KEEP:
pathToRemove.append(path)
return pathToRemove
# writes the list of files selected for deletion in a file
def get_and_store_files_to_remove():
logger.info("Getting list of rating id for unwatched medias")
unwatched = []
for sectionId in FILM_SECTION_IDS:
logger.debug("Getting list for sectionID={}".format(sectionId))
unwatched.extend(get_unwatched_rating_keys(sectionId))
logger.debug("list of rating_key of unwatched medias: {}".format(unwatched))
pathToRemove = get_files_to_remove(unwatched)
logger.info("{} medias to be removed".format(len(pathToRemove)))
logger.info("List stored in {}".format(outputfile))
with open(outputfile, 'w') as f:
logger.debug("writing to '{}'".format(outputfile))
for path in pathToRemove:
f.write(f"{path}\n")
# Delete all files and empty parent folder listed in an input file
def delete_files(inputFile):
logger.info("Deleting all unwanted media files listed in \"{}\"".format(inputFile))
with open(inputFile, 'r') as f:
for path in f:
path = path.strip()
logger.debug("Deleting file: " + path)
os.remove(path)
parent = os.path.split(path)[0]
dir = os.listdir(parent)
if len(dir) == 0:
logger.debug("Deleting empty folder: " + parent)
os.rmdir(parent)
###### ----- main ----- ######
### --- Create and read the cmd line --- ###
# -- Define cmd --
parser = argparse.ArgumentParser()
parser.add_argument(
'-v', '--verbose',
help="Print lots of debugging statements",
action="store_const", dest="loglevel", const=logging.DEBUG,
default=logging.INFO,
)
parser.add_argument(
'-g', '--get',
help="Get the list of files to delete and stores the list in a file",
action="store_true"
)
parser.add_argument(
'-d', '--delete',
help="Read a file listing unwanted files, delete the unwanted files and its parent folders if empty.",
action="store_true"
)
parser.add_argument(
'-o', '--output',
help="Change the default output file.",
action="store",
default="to_delete.txt",
)
# -- Read args --
args = parser.parse_args()
logging.basicConfig(level=args.loglevel)
logger = logging.getLogger('plexwasher')
outputfile = args.output
### --- Read config.ini file --- ###
# -- Open --
logger.info("Initializing...")
config_obj = configparser.ConfigParser()
config_obj.read("config.ini")
# -- Read TAUTULLI section --
serverParam = config_obj["TAUTULLI"]
SERVER_URL = serverParam["server_url"]
API_KEY = serverParam["api_key"]
# -- Read MEDIA_FILTER section --
mediaParam = config_obj["MEDIA_FILTER"]
FILM_SECTION_IDS = ast.literal_eval(mediaParam["film_section_ids"])
DEADLINE_NEVER_WATCHED = int(mediaParam["deadline_never_watched"])
DEADLINE_LAST_WATCHED = int(mediaParam["deadline_last_watched"])
FILES_TO_KEEP = ast.literal_eval(mediaParam["files_to_ignore"])
logger.debug("Loaded config values: \n SERVER_URL: {}\n API_KEY: hidden\n FILM_SECTION_IDS: {}\n DEADLINE_NEVER_WATCHED: {}\n DEADLINE_LAST_WATCHED: {}\n FILES_TO_KEEP: {}".format(SERVER_URL, FILM_SECTION_IDS, DEADLINE_NEVER_WATCHED, DEADLINE_LAST_WATCHED, FILES_TO_KEEP))
### --- Start logic --- ###
if not args.get and not args.delete:
logger.info("Nothing to do. Please give provide arguments (ex: python3 plexwasher.py --get --delete).")
exit(0)
try:
if args.get:
get_and_store_files_to_remove()
if args.delete:
delete_files(outputfile)
except (RequestError, ApplicationError) as err:
logger.error(err)