Skip to content

Commit

Permalink
Use threaded downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielOaks committed Oct 4, 2015
2 parents 4d7a0e4 + fa84306 commit cca4087
Show file tree
Hide file tree
Showing 6 changed files with 473 additions and 229 deletions.
66 changes: 55 additions & 11 deletions basc_archiver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# BASC Imageboard Archiver
from __future__ import print_function
from __future__ import absolute_import
from __future__ import print_function
import threading

from .sites import default_archivers

version = '0.8.6'
version = '0.8.7 (threaded)'
_default_base_dir = './archive'


class Options:
"""Holds Archiver options."""

def __init__(self, base_dir, use_ssl=False,
silent=False, verbose=False, delay=2,
silent=False, verbose=False,
delay=2, thread_check_delay=90,
dl_threads_per_site=5, dl_thread_wait=1,
skip_thumbs=False, thumbs_only=False,
follow_child_threads=False, follow_to_other_boards=False):
follow_child_threads=False, follow_to_other_boards=False,
run_once=False,):
self.base_dir = base_dir
self.use_ssl = use_ssl
self.silent = silent
self.verbose = verbose
self.delay = delay # wait 2 seconds by default
self.delay = float(delay) # wait 2 seconds by default
self.thread_check_delay = float(thread_check_delay) # between checks of the same thread
self.dl_threads_per_site = int(dl_threads_per_site)
self.dl_thread_wait = float(dl_thread_wait)
self.skip_thumbs = skip_thumbs
self.thumbs_only = thumbs_only
self.follow_child_threads = follow_child_threads
self.follow_to_other_boards = follow_to_other_boards
self.run_once = run_once


class Archiver:
Expand All @@ -35,12 +43,22 @@ def __init__(self, options=None):
if options is None:
options = Options(_default_base_dir)
self.options = options
self.callbacks_lock = threading.Lock()
self.callbacks = {
'all': []
} # info callbacks

# add our default site-specific archivers
self.archivers = []
for archiver in default_archivers:
self.archivers.append(archiver(self.options))
self.archivers.append(archiver(self.update_status, self.options))

def shutdown(self):
"""Shutdown the archiver."""
for archiver in self.archivers:
archiver.shutdown()

# threads
def add_thread(self, url):
"""Archive the given thread if possible"""
url_archived = False
Expand All @@ -55,15 +73,41 @@ def add_thread(self, url):
print('We could not find a valid archiver for:', url)
return False

def download_threads(self):
"""Download all the threads we currently hold."""
for archiver in self.archivers:
archiver.download_threads()

@property
def existing_threads(self):
"""Return how many threads exist."""
threads = 0
for archiver in self.archivers:
threads += archiver.existing_threads
return threads

# callbacks
def register_callback(self, cb_type, handler):
"""Register a callback."""
with self.callbacks_lock:
if cb_type not in self.callbacks:
self.callbacks[cb_type] = []

if handler not in self.callbacks[cb_type]:
self.callbacks[cb_type].append(handler)

def unregister_callback(self, cb_type, handler):
"""Remove a callback."""
with self.callbacks_lock:
if cb_type in self.callbacks and handler in self.callbacks[cb_type]:
self.callbacks[cb_type].remove(handler)

def update_status(self, cb_type, info):
"""Update thread status, call callbacks where appropriate."""
with self.callbacks_lock:
# to stop us calling same handler twice
called = []

if cb_type in self.callbacks:
for handler in self.callbacks[cb_type]:
handler(cb_type, info)
called.append(handler)

for handler in self.callbacks['all']:
if handler not in called:
handler(cb_type, info)
122 changes: 110 additions & 12 deletions basc_archiver/sites/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,122 @@
from __future__ import absolute_import

import os
import time
import threading

DEFAULT_NOITEM_WAIT = 1
DEFAULT_OK_WAIT = 0.1


class DownloadItem(object):
def __init__(self, dl_type, info):
self.dl_type = dl_type
self.info = info
self.next_dl_timestamp = 0

def can_dl(self):
"""True if you can download this item."""
return time.time() >= self.next_dl_timestamp

def delay_dl_timestamp(self, delay_in_seconds=90):
"""Delay the download of this item for 90 seconds."""
self.next_dl_timestamp = time.time() + delay_in_seconds


class DownloadThread(threading.Thread):
def __init__(self, site,
noitem_wait_seconds=DEFAULT_NOITEM_WAIT,
nextitem_wait_seconds=DEFAULT_OK_WAIT):
threading.Thread.__init__(self)
self.site = site
self.noitem_wait_seconds = noitem_wait_seconds
self.nextitem_wait_seconds = nextitem_wait_seconds
self.daemon = True
self.start()

def run(self):
while True:
# check if shutdown
if self.site.is_shutdown:
break

# get next item to dl
next_item = None
with self.site.to_dl_lock:
# make sure dl timestamp on selected item has passed
for i in range(len(self.site.to_dl)):
next_item = self.site.to_dl[i]

if next_item.can_dl():
self.site.to_dl.pop(i)
break
else:
next_item = None

# download
if next_item is not None:
self.site.download_item(next_item)

# wait
if next_item is None:
time.sleep(self.noitem_wait_seconds)
else:
time.sleep(self.nextitem_wait_seconds)


class BaseSiteArchiver(object):
name = 'base'

def __init__(self, options):
def __init__(self, handler_callback, options):
if self.name == 'base':
raise Exception('BaseSiteArchiver must be subclassed!')
self.threads = {}
self.options = options
self.base_thread_dir = os.path.join(options.base_dir, '{}/{{board}}/{{thread}}/'.format(self.name))
self.base_thread_dir = os.path.join(options.base_dir,
'{}/{{board}}/{{thread}}/'.format(self.name))
self.base_images_dir = os.path.join(self.base_thread_dir, 'images')
self.base_thumbs_dir = os.path.join(self.base_thread_dir, 'thumbs')

self.threads_lock = threading.Lock()
self.threads = {}

# setup thread info
self.is_shutdown = False
self.to_dl_lock = threading.Lock()
self.to_dl = []

self._handler_callback = handler_callback

# start download threads
for i in range(getattr(self, 'dl_threads', options.dl_threads_per_site)):
DownloadThread(self, **{
'noitem_wait_seconds': getattr(self, 'noitem_wait_seconds',
DEFAULT_NOITEM_WAIT),
'nextitem_wait_seconds': getattr(self, 'nextitem_wait_seconds',
options.dl_thread_wait),
})

def shutdown(self):
"""Shutdown this archiver."""
self.is_shutdown = True

def update_status(self, cb_type, info):
"""Update thread status, call callbacks where appropriate."""
# mostly convenience function
info['site'] = self.name
self._handler_callback(cb_type, info)

# download
def add_to_dl(self, dl_type=None, item=None, **kwargs):
"""Add an item to our download list."""
if item is not None:
new_item = item
else:
new_item = DownloadItem(dl_type, kwargs)

with self.to_dl_lock:
self.to_dl.append(new_item)

# adding threads
def url_valid(self, url):
"""Return true if the given URL is for my site."""
raise Exception('you must override this method')
Expand All @@ -25,18 +129,12 @@ def add_thread(self, url):
"""Try to add the given thread to our internal list."""
raise Exception('you must override this method')

def download_threads(self):
"""Download all the threads we currently hold."""
# we iterate over a copy of self.threads because download_thread
# deletes it from there if thread 404's
for thread_id in dict(self.threads):
self._download_thread(self.threads[thread_id])

@property
def existing_threads(self):
"""Return how many threads we have and are downloading."""
return len(self.threads)

def _download_thread(self, thread_info):
"""Download the given thread, from the thread info."""
# downloading specific items
def download_item(self, item):
"""Download the given item"""
raise Exception('you must override this method')
Loading

0 comments on commit cca4087

Please sign in to comment.