import re
import requests
import sys
import json
from urllib.parse import urlparse, urlencode, quote
from datetime import timedelta

import py_common.log as log
from py_common.config import get_config
from py_common.util import scraper_args
from py_common.types import ScrapedPerformer, ScrapedScene, ScrapedMovie

# DO NOT EDIT THIS FILE
# run the scraper once and edit the config.ini file instead

# NOTE(review): the line breaks inside this default config text were restored
# from a whitespace-collapsed copy of the file; blank lines between the
# settings may have been lost -- confirm against the upstream file.
config = get_config(
    default="""# Jellyfin endpoint, change this if Jellyfin is not running on the same server as Stash
host = http://localhost:8096/
# Jellyfin API Key: generate one in Jellyfin Dashboard -> Advanced -> API Keys
api_key = xxxxxxxxxxxxxxxxxxxxxxxx
# Jellyfin user-id (extract from Jellyfin->Admin->User from the URL of the User)
# for example: http://localhost:8096/web/index.html#!/myprofile.html?userId=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
userid = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
"""
)

# Refuse to run while the placeholder values are still in place: any value that
# contains the letter "x" is treated as unconfigured.
errors = []
if "x" in config.api_key:
    errors.append("api_key")
if "x" in config.userid:
    errors.append("userid")
if errors:
    log.error(f"Please configure {' and '.join(errors)} in your config.ini")
    exit(1)

# Query parameters sent with every request built from base_url.
base_params = {"api_key": config.api_key, "userid": config.userid}
# Parsed form of the configured host with the query string already set to
# base_params; the functions below derive concrete URLs from it by swapping
# the path (and sometimes the query) with _replace().
base_url = urlparse(config.host)._replace(query=urlencode(base_params))


def to_scraped_performer(item: dict) -> ScrapedPerformer:
    """Convert one Jellyfin person item into a ScrapedPerformer dict.

    "Name" and "Id" are read unconditionally (a missing key raises KeyError).
    The remaining fields are copied only when present and truthy:
    "PremiereDate" -> birthdate (first 10 characters), "OriginalTitle" ->
    aliases, "ProductionLocations" -> country and "Tags" -> tags.
    """
    performer: ScrapedPerformer = {
        "name": item["Name"],
        "url": base_url._replace(path=f"Persons/{quote(item['Name'])}").geturl(),
        "images": [
            base_url._replace(path=f"Items/{item['Id']}/Images/Primary").geturl()
        ],
    }

    if birthdate := item.get("PremiereDate"):
        # Keep only the first 10 characters (presumably the YYYY-MM-DD part of
        # an ISO timestamp -- confirm against the API response).
        performer["birthdate"] = birthdate[:10]

    if aliases := item.get("OriginalTitle"):
        performer["aliases"] = aliases

    if country := item.get("ProductionLocations"):
        # Use the text after the last comma of the first location entry.
        performer["country"] = country[0].split(",")[-1].strip()

    if tags := item.get("Tags"):
        performer["tags"] = [{"name": tag} for tag in tags]

    return performer


def to_scraped_scene(item: dict) -> ScrapedScene:
    """Convert one Jellyfin item into a ScrapedScene dict.

    "Name" and "Id" are required. Date, details, tags (from "Genres"), studio
    (first entry of "Studios") and performers (entries of "People" whose
    "Type" is "Actor") are added when the corresponding key is present and
    truthy. If "ExternalUrls" contains an entry named "IMDb" or "TheMovieDb",
    its URL replaces the Jellyfin item URL.
    """
    scene: ScrapedScene = {
        "title": item["Name"],
        "url": base_url._replace(path=f"Items/{item['Id']}").geturl(),
        "image": base_url._replace(path=f"Items/{item['Id']}/Images/Primary").geturl(),
    }

    if date := item.get("PremiereDate"):
        scene["date"] = date[:10]

    if details := item.get("Overview"):
        scene["details"] = details

    if tags := item.get("Genres"):
        scene["tags"] = [{"name": tag} for tag in tags]

    if studios := item.get("Studios"):
        scene["studio"] = {"name": studios[0]["Name"]}

    if performers := item.get("People"):
        scene["performers"] = [
            {"name": performer["Name"]}
            for performer in performers
            if performer["Type"] == "Actor"
        ]

    if urls := item.get("ExternalUrls"):
        # The loop does not stop at the first hit, so when several entries
        # match, the last one in the list wins.
        for url in urls:
            if url["Name"] in ("IMDb", "TheMovieDb"):
                scene["url"] = url["Url"]

    return scene


def to_scraped_movie(item: dict) -> ScrapedMovie:
    """Convert one Jellyfin item into a ScrapedMovie dict.

    "Name" and "Id" are required. Duration, date, synopsis, director, studio
    and rating are added when the source keys are present and truthy.
    """
    movie: ScrapedMovie = {
        "name": item["Name"],
        "front_image": base_url._replace(
            path=f"Items/{item['Id']}/Images/Primary"
        ).geturl(),
    }

    if duration := item.get("RunTimeTicks"):
        # The tick count is divided by 10,000,000 and the truncated result is
        # used as seconds (presumably ticks are 100 ns units -- confirm).
        runtime = timedelta(seconds=(int(duration / 10000000)))
        movie["duration"] = str(runtime)

    # Prefer the premiere date; fall back to the bare production year.
    if date := item.get("PremiereDate"):
        movie["date"] = date[:10]
    elif date := item.get("ProductionYear"):
        movie["date"] = str(date)

    if details := item.get("Overview"):
        movie["synopsis"] = details

    if people := item.get("People"):
        # Only the first person of type "Director" is used.
        for person in people:
            if person["Type"] == "Director":
                movie["director"] = person["Name"]
                break

    if studios := item.get("Studios"):
        movie["studio"] = {
            "name": studios[0]["Name"],
        }

    # Prefer the critic rating; fall back to the community rating.
    if rating := item.get("CriticRating"):
        movie["rating"] = str(rating)
    elif rating := item.get("CommunityRating"):
        movie["rating"] = str(rating)

    return movie


def scene_from_url(url: str) -> ScrapedScene | None:
    # If users paste their Jellyfin URL we can extract the ID from it
    # NOTE(review): this copy of the file is damaged at this point. The regex
    # literal on the next line is unterminated, the remainder of this function
    # is missing, and so is the `def` line of the search function that follows
    # (presumably `scene_search(title)`, which scene_from_fragment calls --
    # confirm). Only the tail of its return annotation survives. Restore the
    # missing text from the upstream file; it is deliberately not guessed here.
    if match := re.search(r"(? list[ScrapedScene]:
    # Strip the extension if we get a file name
    if "." in title:
        scenename = title.rsplit(".", 1)[0]
    else:
        scenename = title

    search_params = {
        "searchTerm": scenename,
        "Limit": 10,
        "IncludeItemTypes": "Movie",
        "Fields": "Overview,Genres,Studios,People,ExternalUrls,PremiereDate",
        "Recursive": True,
        **base_params,
    }
    search_url = base_url._replace(
        path="Items", query=urlencode(search_params)
    ).geturl()
    log.debug(f"Querying URL: {search_url}")
    res = requests.get(search_url)
    result = res.json()

    scenes = [to_scraped_scene(item) for item in result["Items"]]
    # In order for scene-by-query-fragment to scrape these scenes, we need to
    # add their Jellyfin URLs to the scene object: users can rescrape to get a TMDb URL
    scrapable_urls = [
        base_url._replace(path=f"Users/{config.userid}/Items/{item['Id']}").geturl()
        for item in result["Items"]
    ]
    # Overwrite each scene's "url" (possibly an external one set by
    # to_scraped_scene) with the matching Jellyfin item URL.
    scrapable_scenes = [
        scene | {"url": url} for scene, url in zip(scenes, scrapable_urls)
    ]
    log.debug(f"Found {len(scenes)} scenes")
    return scrapable_scenes


def scene_from_fragment(title: str) -> ScrapedScene | None:
    """Return the first result of scene_search(title), or None if there is none."""
    log.debug(f"Getting scene through fragment title: '{title}'")
    scenes = scene_search(title)
    if not scenes:
        log.debug(f"Didn't find {title} in fragment mode")
        return None

    found = scenes[0]
    log.debug(f"Found '{found['title']}'")  # type: ignore (title will be set)
    return found


def performer_from_url(url: str) -> ScrapedPerformer | None:
    """Fetch a performer from the given URL and convert the JSON response.

    The query string of ``url`` is replaced with the configured api_key/userid
    parameters before the request is sent. Returns None (after logging a
    warning) when the response has no "Name" key.
    """
    log.debug(f"Getting performer from URL: '{url}'")
    performer_url = urlparse(url)
    res = requests.get(performer_url._replace(query=urlencode(base_params)).geturl())
    person = res.json()

    if "Name" not in person:
        log.warning(f"Didn't find {url} in URL mode")
        return None

    log.debug(f"Found performer '{person['Name']}'")
    performer = to_scraped_performer(person)
    return performer


def performer_search(query: str) -> list[ScrapedPerformer]:
    """Query the "Persons" endpoint for ``query`` and convert every returned item."""
    log.debug(f"Searching for performer '{query}'")
    search_params = {
        "searchTerm": query,
        "fields": "OriginalTitle,ProductionLocations,PremiereDate,Tags,ExternalUrls",
        **base_params,
    }
    get_url = base_url._replace(path="Persons", query=urlencode(search_params))
    # get_url is still the parsed URL object here, so this logs its repr rather
    # than the final URL string that is requested below.
    log.debug(f"Querying URL: {get_url}")
    res = requests.get(get_url.geturl())
    search_result = res.json()

    performers = [to_scraped_performer(item) for item in search_result["Items"]]
    log.debug(f"Found {len(performers)} performers")
    return performers


def movie_from_url(url):
    # If users paste their Jellyfin URL we can extract the ID from it
    # NOTE(review): this copy of the file is cut off inside the regex literal on
    # the next line; the rest of this function and anything that followed it is
    # not visible here. Restore it from the upstream file.
    if match := re.search(r"(?