aboutsummaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/main.py2
-rw-r--r--app/ytdl.py92
2 files changed, 72 insertions, 22 deletions
diff --git a/app/main.py b/app/main.py
index 8b2a57f..36c0e94 100644
--- a/app/main.py
+++ b/app/main.py
@@ -16,6 +16,7 @@ class Config:
_DEFAULTS = {
'DOWNLOAD_DIR': '.',
'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR',
+ 'STATE_DIR': '.',
'URL_PREFIX': '',
'OUTPUT_TEMPLATE': '%(title)s.%(ext)s',
'YTDL_OPTIONS': '{}',
@@ -68,6 +69,7 @@ class Notifier(DownloadQueueNotifier):
await sio.emit('cleared', serializer.encode(id))
dqueue = DownloadQueue(config, Notifier())
+app.on_startup.append(lambda app: dqueue.initialize())
@routes.post(config.URL_PREFIX + 'add')
async def add(request):
diff --git a/app/ytdl.py b/app/ytdl.py
index a5e60b7..deb745c 100644
--- a/app/ytdl.py
+++ b/app/ytdl.py
@@ -1,6 +1,8 @@
import os
import yt_dlp
from collections import OrderedDict
+import shelve
+import time
import asyncio
import multiprocessing
import logging
@@ -31,6 +33,7 @@ class DownloadInfo:
self.format = format
self.status = self.msg = self.percent = self.speed = self.eta = None
self.filename = None
+ self.timestamp = time.time_ns()
class Download:
manager = None
@@ -131,19 +134,65 @@ class Download:
self.info.eta = status.get('eta')
await self.notifier.updated(self.info)
+class PersistentQueue:
+ def __init__(self, path):
+ self.path = path
+ with shelve.open(path, 'c'):
+ pass
+ self.dict = OrderedDict()
+
+ def load(self):
+ for k, v in self.saved_items():
+ self.dict[k] = Download(None, None, None, None, {}, v)
+
+ def exists(self, key):
+ return key in self.dict
+
+ def get(self, key):
+ return self.dict[key]
+
+ def items(self):
+ return self.dict.items()
+
+ def saved_items(self):
+ with shelve.open(self.path, 'r') as shelf:
+ return sorted(shelf.items(), key=lambda item: item[1].timestamp)
+
+ def put(self, value):
+ key = value.info.id
+ self.dict[key] = value
+ with shelve.open(self.path, 'w') as shelf:
+ shelf[key] = value.info
+
+ def delete(self, key):
+ del self.dict[key]
+ with shelve.open(self.path, 'w') as shelf:
+ shelf.pop(key)
+
+ def next(self):
+ k, v = next(iter(self.dict.items()))
+ return k, v
+
+ def empty(self):
+ return not bool(self.dict)
+
+
class DownloadQueue:
def __init__(self, config, notifier):
self.config = config
self.notifier = notifier
- self.queue = OrderedDict()
- self.done = OrderedDict()
- self.initialized = False
+ self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
+ self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
+ self.done.load()
+
+ async def __import_queue(self):
+ for k, v in self.queue.saved_items():
+ await self.add(v.url, v.quality, v.format)
- def __initialize(self):
- if not self.initialized:
- self.initialized = True
- self.event = asyncio.Event()
- asyncio.create_task(self.__download())
+ async def initialize(self):
+ self.event = asyncio.Event()
+ asyncio.create_task(self.__download())
+ asyncio.create_task(self.__import_queue())
def __extract_info(self, url):
return yt_dlp.YoutubeDL(params={
@@ -165,10 +214,10 @@ class DownloadQueue:
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
return {'status': 'ok'}
elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry:
- if entry['id'] not in self.queue:
+ if not self.queue.exists(entry['id']):
dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format)
dldirectory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format != 'mp3') else self.config.AUDIO_DOWNLOAD_DIR
- self.queue[entry['id']] = Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)
+ self.queue.put(Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl))
self.event.set()
await self.notifier.added(dl)
return {'status': 'ok'}
@@ -178,7 +227,6 @@ class DownloadQueue:
async def add(self, url, quality, format, already=None):
log.info(f'adding {url}')
- self.__initialize()
already = set() if already is None else already
if url in already:
log.info('recursion detected, skipping')
@@ -193,22 +241,22 @@ class DownloadQueue:
async def cancel(self, ids):
for id in ids:
- if id not in self.queue:
+ if not self.queue.exists(id):
log.warn(f'requested cancel for non-existent download {id}')
continue
- if self.queue[id].started():
- self.queue[id].cancel()
+ if self.queue.get(id).started():
+ self.queue.get(id).cancel()
else:
- del self.queue[id]
+ self.queue.delete(id)
await self.notifier.canceled(id)
return {'status': 'ok'}
async def clear(self, ids):
for id in ids:
- if id not in self.done:
+ if not self.done.exists(id):
log.warn(f'requested delete for non-existent download {id}')
continue
- del self.done[id]
+ self.done.delete(id)
await self.notifier.cleared(id)
return {'status': 'ok'}
@@ -218,11 +266,11 @@ class DownloadQueue:
async def __download(self):
while True:
- while not self.queue:
+ while self.queue.empty():
log.info('waiting for item to download')
await self.event.wait()
self.event.clear()
- id, entry = next(iter(self.queue.items()))
+ id, entry = self.queue.next()
log.info(f'downloading {entry.info.title}')
await entry.start(self.notifier)
if entry.info.status != 'finished':
@@ -233,10 +281,10 @@ class DownloadQueue:
pass
entry.info.status = 'error'
entry.close()
- if id in self.queue:
- del self.queue[id]
+ if self.queue.exists(id):
+ self.queue.delete(id)
if entry.canceled:
await self.notifier.canceled(id)
else:
- self.done[id] = entry
+ self.done.put(entry)
await self.notifier.completed(entry.info)
bgstack15