Skip to content

Commit

Permalink
Merge pull request #25 from lewler/dev
Browse files Browse the repository at this point in the history
v0.6.1
  • Loading branch information
lewler authored Nov 19, 2024
2 parents 9738400 + ccdf195 commit 04c6a7c
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 62 deletions.
19 changes: 13 additions & 6 deletions bulldozer
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,33 @@ from classes.report import Report

config = load_config()

def main(input, censor_rss, report_only=False, search_term=None, download_only=False, refresh=False):
def main(input, censor_rss, report_only=False, search_term=None, download_only=False, refresh=False, name=None, match_titles=None):
"""
Main function to run the bulldozer script
:param input: The input to the script, can be a directory path, RSS feed URL, or local RSS file path
:param censor_rss: Whether to censor the RSS feed or not
"""
global config
print("· • —– ++ ---| Bulldozer v0.6.0 |--- ++ —– • ·")
print("· • —– ++ ---| Bulldozer v0.6.1 |--- ++ —– • ·")
database_active = config.get("database", {}).get("active", True)

if os.path.isdir(input):
folder_path = Path(input)
name = folder_path.name
if not name:
name = folder_path.name
check_duplicates = not report_only
podcast = Podcast(name, folder_path, config, censor_rss=censor_rss, check_duplicates=check_duplicates, search_term=search_term)
else:
source_rss_file = input
output_dir = config.get("output_dir", ".")
temp_dir = Path(output_dir) / 'download_temp'
if not name:
name = "unknown podcast"
temp_dir = Path(output_dir) / 'download_temp'
else:
temp_dir = Path(output_dir) / f"{name}"
temp_dir.mkdir(parents=True, exist_ok=True)
podcast = Podcast("unknown podcast", temp_dir, config, source_rss_file, censor_rss, True, search_term)
podcast = Podcast(name, temp_dir, config, source_rss_file, censor_rss, True, search_term, match_titles)
podcast.download_episodes()
name = podcast.name
folder_path = temp_dir
Expand Down Expand Up @@ -162,6 +167,8 @@ if __name__ == "__main__":
parser.add_argument("--check-config", action="store_true", help="Check that user config is valid")
parser.add_argument("--log-level", type=str, help="Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)")
parser.add_argument("--search-term", type=str, help="Search term to use when querying APIs")
parser.add_argument("--name", type=str, help="Override the name to use for the podcast")
parser.add_argument("--match-titles", type=str, help="Only includes the episodes with titles matching the given string")
args = parser.parse_args()

if not config:
Expand All @@ -181,4 +188,4 @@ if __name__ == "__main__":
elif args.check_config:
check_config()
else:
main(args.input, args.censor_rss, args.report_only, args.search_term, args.download_only, args.refresh)
main(args.input, args.censor_rss, args.report_only, args.search_term, args.download_only, args.refresh, args.name, args.match_titles)
19 changes: 10 additions & 9 deletions classes/file_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,17 @@ def get_date_range(self):
if self.earliest_year is None or (year and year < self.earliest_year):
self.earliest_year = year
if self.first_episode_date is None or date_str < self.first_episode_date:
self.first_episode_date = date_str
self.real_first_episode_date = self.first_episode_date = date_str
if self.last_episode_date is None or date_str > self.last_episode_date:
self.last_episode_date = date_str

for date_str in self.original_files.keys():
year = int(str(date_str)[:4])
if self.real_first_episode_date is None or (date_str and date_str < self.real_first_episode_date):
self.real_first_episode_date = date_str
if self.real_last_episode_date is None or (date_str and date_str > self.real_last_episode_date):
self.real_last_episode_date = date_str
self.real_last_episode_date = self.last_episode_date = date_str

if self.original_files:
for date_str in self.original_files.keys():
year = int(str(date_str)[:4])
if self.real_first_episode_date is None or (date_str and date_str < self.real_first_episode_date):
self.real_first_episode_date = date_str
if self.real_last_episode_date is None or (date_str and date_str > self.real_last_episode_date):
self.real_last_episode_date = date_str

def process_metadata(self, metadata, file_path):
"""
Expand Down
2 changes: 1 addition & 1 deletion classes/file_organizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def rename_folder(self):
date_format_short = self.config.get('date_format_short', '%Y-%m-%d')
date_format_long = self.config.get('date_format_long', '%B %d %Y')
start_year_str = str(self.podcast.analyzer.earliest_year) if self.podcast.analyzer.earliest_year else "Unknown"
real_start_year_str = str(self.podcast.analyzer.real_first_episode_date) if self.podcast.analyzer.real_first_episode_date else "Unknown"
real_start_year_str = str(self.podcast.analyzer.real_first_episode_date)[:4] if self.podcast.analyzer.real_first_episode_date else "Unknown"
first_episode_date_str = format_last_date(self.podcast.analyzer.first_episode_date, date_format_long) if self.podcast.analyzer.first_episode_date else "Unknown"
last_episode_date_str = format_last_date(self.podcast.analyzer.last_episode_date, date_format_long) if self.podcast.analyzer.last_episode_date else "Unknown"
last_episode_date_dt = datetime.strptime(self.podcast.analyzer.last_episode_date, date_format_short) if self.podcast.analyzer.last_episode_date != "Unknown" else None
Expand Down
12 changes: 6 additions & 6 deletions classes/podcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .database import Database

class Podcast:
def __init__(self, name, folder_path, config, source_rss_file=None, censor_rss=False, check_duplicates=True, search_term=None):
def __init__(self, name, folder_path, config, source_rss_file=None, censor_rss=False, check_duplicates=True, search_term=None, match_titles=None):
"""
Initialize the Podcast with the name, folder path, configuration, and source RSS file.
Expand All @@ -23,6 +23,7 @@ def __init__(self, name, folder_path, config, source_rss_file=None, censor_rss=F
:param censor_rss: If True, the RSS feed will be censored.
:param check_duplicates: If True, check for duplicate episodes.
:param search_term: The search term used to find the podcast.
:param match_titles: Only episodes whose titles contain this string are kept when downloading.
The Podcast class is responsible for handling the podcast.
"""
Expand All @@ -33,9 +34,10 @@ def __init__(self, name, folder_path, config, source_rss_file=None, censor_rss=F
self.config = config
self.completed = False
self.downloaded = False
if self.name != 'unknown podcast':
if not source_rss_file:
self.downloaded = True
self.search_term = search_term
self.match_titles = match_titles
self.rss = Rss(self, source_rss_file, self.config, censor_rss)
self.image = PodcastImage(self, self.config)
self.metadata = PodcastMetadata(self, self.config)
Expand All @@ -58,14 +60,12 @@ def get_metadata(self, critical=True):
elif not metadata:
return metadata

self.name = self.rss.metadata['name']
if self.name == 'unknown podcast':
self.name = self.rss.metadata['name']

def download_episodes(self):
"""
Download the podcast episodes using podcast-dl.
:param episode_template: The template for the episode file names.
:param threads: The number of threads to use for downloading.
"""
self.get_metadata()
self.check_for_duplicates()
Expand Down
51 changes: 48 additions & 3 deletions classes/podcast_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pillow_avif
from PIL import Image
from .utils import spinner, get_metadata_directory, log, archive_metadata, find_case_insensitive_files
from .utils import copy_file
from .utils import copy_file, download_file

class PodcastImage:
def __init__(self, podcast, config):
Expand Down Expand Up @@ -91,8 +91,14 @@ def archive_file(self):
file_path = self.get_file_path()

if not file_path:
log(f"Image {file_path} does not exist.", "debug")
return
log(f"Image does not exist, trying to get it from the RSS", "debug")
if not self.get_image_from_rss():
log("Failed to get image from RSS", "debug")
return
file_path = self.get_file_path()
if not file_path:
log("Something went wrong getting the image from the RSS", "debug")
return

if self.archive:
log(f"Archiving image {file_path.name}", "debug")
Expand Down Expand Up @@ -138,3 +144,42 @@ def duplicate(self, new_folder):
new_file_path = new_folder / file_path.name
copy_file(file_path, new_file_path)
log(f"Duplicating image {file_path.name} to {new_file_path}", "debug")

def get_image_from_rss(self):
    """
    Download the podcast cover image referenced by the RSS feed.

    :return: True if the image was downloaded, False otherwise
             (already present, no URL in the feed, or download failure).
    """
    if self.get_file_path():
        log("Image already exists in the podcast folder", "debug")
        return False

    image_url = self.podcast.rss.get_image_url()
    if not image_url:
        log("No image found in the RSS feed", "debug")
        # Return False (not a bare `return` / None) so every exit path
        # yields a consistent boolean for callers that test the result.
        return False

    with spinner("Downloading image from RSS") as spin:
        status = self.download_image(image_url, self.podcast.folder_path / f'{self.podcast.name}.image.jpg')
        if status:
            spin.ok("✔")
        else:
            spin.fail("✖")
            return False

    return True

def download_image(self, image_url, target_path):
    """
    Download an image from a URL to the given path.

    :param image_url: The URL of the image to download.
    :param target_path: Destination path for the downloaded file.
    :return: True on success, False on failure.
    """
    # Guard clause: bail out immediately when the download fails.
    if not download_file(image_url, target_path):
        return False
    log(f"Downloaded image to {target_path}", "debug")
    return True
2 changes: 1 addition & 1 deletion classes/podcast_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def load(self, search_term=None):
log(f"Invalid JSON in file '{file_path.name}'.", "error")
log(json.JSONDecodeError.msg, "debug")
status = False
self.fetch_additional_data()
self.fetch_additional_data(search_term)
return status

def check_if_podcast_is_complete(self):
Expand Down
113 changes: 77 additions & 36 deletions classes/rss.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# rss.py
import requests
import re
import os
import shutil
import xml.etree.ElementTree as ET
from pathlib import Path
from titlecase import titlecase
from .utils import spinner, get_metadata_directory, log, find_case_insensitive_files, copy_file
from .utils import spinner, get_metadata_directory, log, find_case_insensitive_files, copy_file, download_file
from .utils import special_capitalization, archive_metadata, ask_yes_no, announce, perform_replacements

class Rss:
Expand Down Expand Up @@ -89,6 +88,9 @@ def rename(self):
Rename the RSS file to the podcast name.
"""
old_file_path = get_metadata_directory(self.podcast.folder_path, self.config) / f'podcast.rss'
if not old_file_path.exists():
log(f"RSS file {old_file_path} does not exist, can't rename", "error")
return
new_file_path = get_metadata_directory(self.podcast.folder_path, self.config) / f'{self.podcast.name}.rss'
log(f"Renaming RSS file from {old_file_path} to {new_file_path}", "debug")
old_file_path.rename(new_file_path)
Expand All @@ -113,21 +115,23 @@ def get_metadata_rename_folder(self):
log("Failed to extract name from RSS feed", "critical")
exit(1)

new_folder_path = self.podcast.folder_path.parent / f'{self.metadata['name']}'
if new_folder_path.exists():
spin.fail("✖")
log(f"Folder {new_folder_path} already exists", "critical")
if not ask_yes_no("Folder already exists, do you want to overwrite it?"):
announce("Exiting, cya later!", "info")
exit(1)

shutil.rmtree(new_folder_path)

self.podcast.folder_path.rename(new_folder_path)
log(f"Folder renamed to {new_folder_path}", "debug")
self.podcast.folder_path = new_folder_path
self.podcast.name = self.metadata['name']
self.rename()
if self.podcast.name == 'unknown podcast':
new_folder_path = self.podcast.folder_path.parent / f'{self.metadata['name']}'
if new_folder_path.exists():
spin.fail("✖")
log(f"Folder {new_folder_path} already exists", "critical")
if not ask_yes_no("Folder already exists, do you want to overwrite it?"):
announce("Exiting, cya later!", "info")
exit(1)

shutil.rmtree(new_folder_path)

self.podcast.folder_path.rename(new_folder_path)
log(f"Folder renamed to {new_folder_path}", "debug")
self.podcast.folder_path = new_folder_path
self.podcast.name = self.metadata['name']
self.rename()

self.metadata['total_episodes'] = self.get_episode_count_from()
self.check_for_premium_show()
spin.ok("✔")
Expand All @@ -139,34 +143,48 @@ def download_file(self):
Download the RSS feed file.
"""
with spinner("Downloading RSS feed") as spin:
try:
# Add headers to mimic a browser
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive"
}
response = requests.get(self.source_rss_file, headers=headers)
response.raise_for_status()
with self.default_file_path().open('wb') as rss_file:
rss_file.write(response.content)
result = download_file(self.source_rss_file, self.default_file_path())
if result:
log(f"RSS feed downloaded to {self.default_file_path()}", "debug")
spin.ok("✔")
except requests.RequestException as e:
else:
spin.fail("✘")
log(f"Failed to download RSS feed", "critical")
log(e, "debug")
raise

def check_titles(self):
    """
    Filter the RSS feed down to episodes whose titles contain
    self.podcast.match_titles; every other <item> is removed and the
    feed file is rewritten in place.

    Does nothing when the RSS file is missing or no match string is set.
    """
    if not self.get_file_path():
        log("RSS file does not exist, can't check for episode titles", "error")
        return

    if not self.podcast.match_titles:
        log("No string to match provided, not removing any episodes", "debug")
        return

    log(f"Removing episodes that don't match: {self.podcast.match_titles}", "debug")
    tree = ET.parse(self.get_file_path())
    root = tree.getroot()
    channel = root.find('channel')
    if channel is not None:
        # findall() returns a snapshot list, so removing items from
        # `channel` while looping over it is safe.
        for item in channel.findall('item'):
            title_element = item.find('title')
            if title_element is not None:
                # An empty <title/> has .text == None; substitute "" so
                # the `in` check doesn't raise TypeError — such episodes
                # can never match and are removed.
                title = title_element.text or ""
                if self.podcast.match_titles not in title:
                    channel.remove(item)
        # Write back with an explicit UTF-8 encoding so non-ASCII titles
        # survive on platforms whose default text encoding is not UTF-8.
        # NOTE(review): ET.tostring re-serializes namespace prefixes
        # (e.g. itunes -> ns0) and drops the XML declaration — downstream
        # readers appear to rely on this (see get_image_url's ns0 lookup).
        with self.get_file_path().open('w', encoding='utf-8') as rss_file:
            rss_file.write(ET.tostring(root, encoding='utf-8').decode('utf-8'))

def load_local_file(self):
"""
Load the local RSS feed file.
"""
if self.keep_source_rss:
shutil.copy(self.source_rss_file, self.get_file_path())
shutil.copy(self.source_rss_file, self.default_file_path())
else:
self.source_rss_file.rename(self.get_file_path())
self.source_rss_file.rename(self.default_file_path())
self.source_rss_file = None

def get_file(self):
Expand All @@ -183,6 +201,8 @@ def get_file(self):
self.load_local_file()
else:
self.download_file()

self.check_titles()

def edit_rss_feed(self):
"""
Expand Down Expand Up @@ -260,7 +280,7 @@ def check_for_premium_show(self):
log(f"Invalid premium network configuration: {network}", "debug")
continue
tag = channel.find(network['tag'])
if tag is not None:
if tag is not None and tag.text:
if network['text'] in tag.text:
log(f"Identified premium network {network['name']} from RSS feed", "debug")
self.censor_rss = True
Expand Down Expand Up @@ -315,3 +335,24 @@ def duplicate(self, new_folder):
new_file_path.parent.mkdir(parents=True, exist_ok=True)
copy_file(file_path, new_file_path)
log(f"Duplicating RSS feed {file_path} to {new_file_path}", "debug")

def get_image_url(self):
    """
    Extract the podcast cover image URL from the RSS feed.

    :return: The href attribute of the channel's namespaced image
             element (e.g. <itunes:image href="...">), or None.
    """
    if not self.get_file_path():
        log("RSS file does not exist, can't get image url", "warning")
        return None

    try:
        # Collect only the namespace prefixes actually declared in the
        # document; feeds re-serialized by ElementTree use auto-generated
        # prefixes (ns0, ns1, ...).
        namespaces = {node[0]: node[1] for _, node in ET.iterparse(self.get_file_path(), events=['start-ns'])}
        tree = ET.parse(self.get_file_path())
        root = tree.getroot()

        channel = root.find('channel')
        if channel is None:
            return None

        image = None
        if 'ns0' in namespaces:
            image = channel.find('ns0:image', namespaces)
        if image is None:
            # Unconditionally looking up the hard-coded 'ns0' prefix raises
            # an uncaught SyntaxError ("prefix 'ns0' not found in prefix
            # map") on feeds that never declare it; fall back to the first
            # namespaced <image> child that carries an href attribute.
            for element in channel:
                if element.tag.endswith('}image') and element.attrib.get('href'):
                    image = element
                    break

        if image is not None:
            return image.attrib.get('href')

        return None
    except ET.ParseError as e:
        log("Error parsing RSS feed", "error")
        log(e, "debug")
        return None
Loading

0 comments on commit 04c6a7c

Please sign in to comment.