From 80c2ed3f98b5b3d2269cf58089487b8c7262953d Mon Sep 17 00:00:00 2001 From: Alex Shnitman Date: Tue, 25 Jan 2022 23:56:17 +0200 Subject: reworked persistent queues --- app/main.py | 2 +- app/ytdl.py | 56 +++++++++++++++++++++----------------------------------- 2 files changed, 22 insertions(+), 36 deletions(-) (limited to 'app') diff --git a/app/main.py b/app/main.py index 5d59958..36c0e94 100644 --- a/app/main.py +++ b/app/main.py @@ -69,6 +69,7 @@ class Notifier(DownloadQueueNotifier): await sio.emit('cleared', serializer.encode(id)) dqueue = DownloadQueue(config, Notifier()) +app.on_startup.append(lambda app: dqueue.initialize()) @routes.post(config.URL_PREFIX + 'add') async def add(request): @@ -93,7 +94,6 @@ async def delete(request): @sio.event async def connect(sid, environ): - await dqueue.importQueue() await sio.emit('all', serializer.encode(dqueue.get()), to=sid) @routes.get(config.URL_PREFIX) diff --git a/app/ytdl.py b/app/ytdl.py index 9d04531..deb745c 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -135,19 +135,14 @@ class Download: await self.notifier.updated(self.info) class PersistentQueue: - def __init__(self, filePath, load = False): + def __init__(self, path): + self.path = path + with shelve.open(path, 'c'): + pass self.dict = OrderedDict() - self.shelvePath = filePath - self.__createShelve() - if load: - self.__loadShelve() - def __createShelve(self): - shelf = shelve.open(self.shelvePath, 'c') - shelf.close() - - def __loadShelve(self): - for k, v in self.savedItems(): + def load(self): + for k, v in self.saved_items(): self.dict[k] = Download(None, None, None, None, {}, v) def exists(self, key): @@ -159,19 +154,19 @@ class PersistentQueue: def items(self): return self.dict.items() - def savedItems(self): - with shelve.open(self.shelvePath, 'r') as shelf: + def saved_items(self): + with shelve.open(self.path, 'r') as shelf: return sorted(shelf.items(), key=lambda item: item[1].timestamp) def put(self, value): key = value.info.id self.dict[key] = value - with shelve.open(self.shelvePath, 'w') as shelf: + with shelve.open(self.path, 'w') as shelf: shelf[key] = value.info def delete(self, key): del self.dict[key] - with shelve.open(self.shelvePath, 'w') as shelf: + with shelve.open(self.path, 'w') as shelf: shelf.pop(key) def next(self): @@ -187,24 +182,17 @@ class DownloadQueue: self.config = config self.notifier = notifier self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') - self.done = PersistentQueue(self.config.STATE_DIR + '/completed', True) - self.initialized = False - self.imported = False + self.done = PersistentQueue(self.config.STATE_DIR + '/completed') + self.done.load() - async def importQueue(self): - if not self.imported: - for item in self.queue.savedItems(): - await self.add( - item[1].url, - item[1].quality, - item[1].format) - self.imported = True + async def __import_queue(self): + for k, v in self.queue.saved_items(): + await self.add(v.url, v.quality, v.format) - def __initialize(self): - if not self.initialized: - self.initialized = True - self.event = asyncio.Event() - asyncio.create_task(self.__download()) + async def initialize(self): + self.event = asyncio.Event() + asyncio.create_task(self.__download()) + asyncio.create_task(self.__import_queue()) def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ @@ -239,7 +227,6 @@ class DownloadQueue: async def add(self, url, quality, format, already=None): log.info(f'adding {url}') - self.__initialize() already = set() if already is None else already if url in already: log.info('recursion detected, skipping') @@ -274,9 +261,8 @@ class DownloadQueue: return {'status': 'ok'} def get(self): - item = (list((k, v) for k, v in self.queue.savedItems()), - list((k, v) for k, v in self.done.savedItems())) - return item + return(list((k, v.info) for k, v in self.queue.items()), + list((k, v.info) for k, v in self.done.items())) async def __download(self): while True: -- cgit