From eb1f031b33d3df3c0d3351b0cb1b2b7078e30245 Mon Sep 17 00:00:00 2001 From: Erazor2 Date: Sat, 15 Jan 2022 12:17:12 +0000 Subject: Queue persistence for download and completed --- app/main.py | 1 + app/ytdl.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 78 insertions(+), 17 deletions(-) (limited to 'app') diff --git a/app/main.py b/app/main.py index 8b2a57f..f901afc 100644 --- a/app/main.py +++ b/app/main.py @@ -92,6 +92,7 @@ async def delete(request): @sio.event async def connect(sid, environ): + await dqueue.importQueue() await sio.emit('all', serializer.encode(dqueue.get()), to=sid) @routes.get(config.URL_PREFIX) diff --git a/app/ytdl.py b/app/ytdl.py index a5e60b7..c91e4fc 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -1,6 +1,7 @@ import os import yt_dlp from collections import OrderedDict +import shelve import asyncio import multiprocessing import logging @@ -131,13 +132,71 @@ class Download: self.info.eta = status.get('eta') await self.notifier.updated(self.info) +class PersistentQueue: + def __init__(self, filePath, load = False): + self.dict = OrderedDict() + self.shelvePath = filePath + self.__createShelve() + if load: + self.__loadShelve() + + def __createShelve(self): + shelf = shelve.open(self.shelvePath, 'c') + shelf.close() + + def __loadShelve(self): + with shelve.open(self.shelvePath, 'r') as shelf: + for key in shelf.keys(): + self.dict[key] = shelf[key] + + def exists(self, key): + return key in self.dict + + def get(self, key): + return self.dict[key] + + def items(self): + return self.dict.items() + + def savedItems(self): + with shelve.open(self.shelvePath, 'r') as shelf: + return dict(shelf).items() + + def put(self, key, value): + self.dict[key] = value + with shelve.open(self.shelvePath, 'w') as shelf: + shelf[key] = value.info + + def delete(self, key): + del self.dict[key] + with shelve.open(self.shelvePath, 'w') as shelf: + shelf.pop(key) + + def next(self): + k, v = next(iter(self.dict.items())) + return k, v + + def empty(self): + return not bool(self.dict) + + class DownloadQueue: def __init__(self, config, notifier): self.config = config self.notifier = notifier - self.queue = OrderedDict() - self.done = OrderedDict() + self.queue = PersistentQueue('/queue/queue') + self.done = PersistentQueue('/queue/done', True) self.initialized = False + self.imported = False + + async def importQueue(self): + if not self.imported: + for item in self.queue.savedItems(): + await self.add( + item[1].url, + item[1].quality, + item[1].format) + self.imported = True def __initialize(self): if not self.initialized: @@ -165,10 +224,10 @@ class DownloadQueue: return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)} return {'status': 'ok'} elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry: - if entry['id'] not in self.queue: + if not self.queue.exists(entry['id']): dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format) dldirectory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format != 'mp3') else self.config.AUDIO_DOWNLOAD_DIR - self.queue[entry['id']] = Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl) + self.queue.put(entry['id'], Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)) self.event.set() await self.notifier.added(dl) return {'status': 'ok'} @@ -193,36 +252,37 @@ class DownloadQueue: async def cancel(self, ids): for id in ids: - if id not in self.queue: + if not self.queue.exists(id): log.warn(f'requested cancel for non-existent download {id}') continue - if self.queue[id].started(): - self.queue[id].cancel() + if self.queue.get(id).started(): + self.queue.get(id).cancel() else: - del self.queue[id] + self.queue.delete(id) await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): for id in ids: - if id not in self.done: + if not self.done.exists(id): log.warn(f'requested delete for non-existent download {id}') continue - del self.done[id] + self.done.delete(id) await self.notifier.cleared(id) return {'status': 'ok'} def get(self): - return(list((k, v.info) for k, v in self.queue.items()), - list((k, v.info) for k, v in self.done.items())) + item = (list((k, v) for k, v in self.queue.savedItems()), + list((k, v) for k, v in self.done.savedItems())) + return item async def __download(self): while True: - while not self.queue: + while self.queue.empty(): log.info('waiting for item to download') await self.event.wait() self.event.clear() - id, entry = next(iter(self.queue.items())) + id, entry = self.queue.next() log.info(f'downloading {entry.info.title}') await entry.start(self.notifier) if entry.info.status != 'finished': @@ -233,10 +293,10 @@ class DownloadQueue: pass entry.info.status = 'error' entry.close() - if id in self.queue: - del self.queue[id] + if self.queue.exists(id): + self.queue.delete(id) if entry.canceled: await self.notifier.canceled(id) else: - self.done[id] = entry + self.done.put(id, entry) await self.notifier.completed(entry.info) -- cgit From 1ebf1da076a993be60734d7019ae9a8d98b8c97a Mon Sep 17 00:00:00 2001 From: Erazor2 Date: Mon, 17 Jan 2022 18:47:32 +0000 Subject: Added Env-Var for State-Directory --- app/main.py | 1 + app/ytdl.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'app') diff --git a/app/main.py b/app/main.py index f901afc..5d59958 100644 --- a/app/main.py +++ b/app/main.py @@ -16,6 +16,7 @@ class Config: _DEFAULTS = { 'DOWNLOAD_DIR': '.', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', + 'STATE_DIR': '.', 'URL_PREFIX': '', 'OUTPUT_TEMPLATE': '%(title)s.%(ext)s', 'YTDL_OPTIONS': '{}', diff --git a/app/ytdl.py b/app/ytdl.py index c91e4fc..c77347e 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -184,8 +184,8 @@ class DownloadQueue: def __init__(self, config, notifier): self.config = config self.notifier = notifier - self.queue = PersistentQueue('/queue/queue') - self.done = PersistentQueue('/queue/done', True) + self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') + self.done = PersistentQueue(self.config.STATE_DIR + '/completed', True) self.initialized = False self.imported = False -- cgit From 99947779747cb25c40161947e982b5b7c8b1cd6a Mon Sep 17 00:00:00 2001 From: Erazor2 Date: Fri, 21 Jan 2022 21:23:59 +0000 Subject: Added sorting by Timestamp --- app/ytdl.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'app') diff --git a/app/ytdl.py b/app/ytdl.py index c77347e..9d04531 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -2,6 +2,7 @@ import os import yt_dlp from collections import OrderedDict import shelve +import time import asyncio import multiprocessing import logging @@ -32,6 +33,7 @@ class DownloadInfo: self.format = format self.status = self.msg = self.percent = self.speed = self.eta = None self.filename = None + self.timestamp = time.time_ns() class Download: manager = None @@ -145,9 +147,8 @@ class PersistentQueue: shelf.close() def __loadShelve(self): - with shelve.open(self.shelvePath, 'r') as shelf: - for key in shelf.keys(): - self.dict[key] = shelf[key] + for k, v in self.savedItems(): + self.dict[k] = Download(None, None, None, None, {}, v) def exists(self, key): return key in self.dict @@ -160,9 +161,10 @@ class PersistentQueue: def savedItems(self): with shelve.open(self.shelvePath, 'r') as shelf: - return dict(shelf).items() + return sorted(shelf.items(), key=lambda item: item[1].timestamp) - def put(self, key, value): + def put(self, value): + key = value.info.id self.dict[key] = value with shelve.open(self.shelvePath, 'w') as shelf: shelf[key] = value.info @@ -227,7 +229,7 @@ class DownloadQueue: if not self.queue.exists(entry['id']): dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format) dldirectory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format != 'mp3') else self.config.AUDIO_DOWNLOAD_DIR - self.queue.put(entry['id'], Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)) + self.queue.put(Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)) self.event.set() await self.notifier.added(dl) return {'status': 'ok'} @@ -298,5 +300,5 @@ class DownloadQueue: if entry.canceled: await self.notifier.canceled(id) else: - self.done.put(id, entry) + self.done.put(entry) await self.notifier.completed(entry.info) -- cgit From 80c2ed3f98b5b3d2269cf58089487b8c7262953d Mon Sep 17 00:00:00 2001 From: Alex Shnitman Date: Tue, 25 Jan 2022 23:56:17 +0200 Subject: reworked persistent queues --- app/main.py | 2 +- app/ytdl.py | 56 +++++++++++++++++++++----------------------------------- 2 files changed, 22 insertions(+), 36 deletions(-) (limited to 'app') diff --git a/app/main.py b/app/main.py index 5d59958..36c0e94 100644 --- a/app/main.py +++ b/app/main.py @@ -69,6 +69,7 @@ class Notifier(DownloadQueueNotifier): await sio.emit('cleared', serializer.encode(id)) dqueue = DownloadQueue(config, Notifier()) +app.on_startup.append(lambda app: dqueue.initialize()) @routes.post(config.URL_PREFIX + 'add') async def add(request): @@ -93,7 +94,6 @@ async def delete(request): @sio.event async def connect(sid, environ): - await dqueue.importQueue() await sio.emit('all', serializer.encode(dqueue.get()), to=sid) @routes.get(config.URL_PREFIX) diff --git a/app/ytdl.py b/app/ytdl.py index 9d04531..deb745c 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -135,19 +135,14 @@ class Download: await self.notifier.updated(self.info) class PersistentQueue: - def __init__(self, filePath, load = False): + def __init__(self, path): + self.path = path + with shelve.open(path, 'c'): + pass self.dict = OrderedDict() - self.shelvePath = filePath - self.__createShelve() - if load: - self.__loadShelve() - def __createShelve(self): - shelf = shelve.open(self.shelvePath, 'c') - shelf.close() - - def __loadShelve(self): - for k, v in self.savedItems(): + def load(self): + for k, v in self.saved_items(): self.dict[k] = Download(None, None, None, None, {}, v) def exists(self, key): @@ -159,19 +154,19 @@ class PersistentQueue: def items(self): return self.dict.items() - def savedItems(self): - with shelve.open(self.shelvePath, 'r') as shelf: + def saved_items(self): + with shelve.open(self.path, 'r') as shelf: return sorted(shelf.items(), key=lambda item: item[1].timestamp) def put(self, value): key = value.info.id self.dict[key] = value - with shelve.open(self.shelvePath, 'w') as shelf: + with shelve.open(self.path, 'w') as shelf: shelf[key] = value.info def delete(self, key): del self.dict[key] - with shelve.open(self.shelvePath, 'w') as shelf: + with shelve.open(self.path, 'w') as shelf: shelf.pop(key) def next(self): @@ -187,24 +182,17 @@ class DownloadQueue: self.config = config self.notifier = notifier self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') - self.done = PersistentQueue(self.config.STATE_DIR + '/completed', True) - self.initialized = False - self.imported = False + self.done = PersistentQueue(self.config.STATE_DIR + '/completed') + self.done.load() - async def importQueue(self): - if not self.imported: - for item in self.queue.savedItems(): - await self.add( - item[1].url, - item[1].quality, - item[1].format) - self.imported = True + async def __import_queue(self): + for k, v in self.queue.saved_items(): + await self.add(v.url, v.quality, v.format) - def __initialize(self): - if not self.initialized: - self.initialized = True - self.event = asyncio.Event() - asyncio.create_task(self.__download()) + async def initialize(self): + self.event = asyncio.Event() + asyncio.create_task(self.__download()) + asyncio.create_task(self.__import_queue()) def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ @@ -239,7 +227,6 @@ class DownloadQueue: async def add(self, url, quality, format, already=None): log.info(f'adding {url}') - self.__initialize() already = set() if already is None else already if url in already: log.info('recursion detected, skipping') @@ -274,9 +261,8 @@ class DownloadQueue: return {'status': 'ok'} def get(self): - item = (list((k, v) for k, v in self.queue.savedItems()), - list((k, v) for k, v in self.done.savedItems())) - return item + return(list((k, v.info) for k, v in self.queue.items()), + list((k, v.info) for k, v in self.done.items())) async def __download(self): while True: -- cgit