From eb1f031b33d3df3c0d3351b0cb1b2b7078e30245 Mon Sep 17 00:00:00 2001 From: Erazor2 Date: Sat, 15 Jan 2022 12:17:12 +0000 Subject: Queue persistence for download and completed --- Dockerfile | 1 + app/main.py | 1 + app/ytdl.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 79 insertions(+), 17 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3571da0..64c3cea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,5 +26,6 @@ COPY --from=builder /metube/dist/metube ./ui/dist/metube ENV DOWNLOAD_DIR /downloads VOLUME /downloads +VOLUME /queue EXPOSE 8081 CMD ["python3", "app/main.py"] diff --git a/app/main.py b/app/main.py index 8b2a57f..f901afc 100644 --- a/app/main.py +++ b/app/main.py @@ -92,6 +92,7 @@ async def delete(request): @sio.event async def connect(sid, environ): + await dqueue.importQueue() await sio.emit('all', serializer.encode(dqueue.get()), to=sid) @routes.get(config.URL_PREFIX) diff --git a/app/ytdl.py b/app/ytdl.py index a5e60b7..c91e4fc 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -1,6 +1,7 @@ import os import yt_dlp from collections import OrderedDict +import shelve import asyncio import multiprocessing import logging @@ -131,13 +132,71 @@ class Download: self.info.eta = status.get('eta') await self.notifier.updated(self.info) +class PersistentQueue: + def __init__(self, filePath, load = False): + self.dict = OrderedDict() + self.shelvePath = filePath + self.__createShelve() + if load: + self.__loadShelve() + + def __createShelve(self): + shelf = shelve.open(self.shelvePath, 'c') + shelf.close() + + def __loadShelve(self): + with shelve.open(self.shelvePath, 'r') as shelf: + for key in shelf.keys(): + self.dict[key] = shelf[key] + + def exists(self, key): + return key in self.dict + + def get(self, key): + return self.dict[key] + + def items(self): + return self.dict.items() + + def savedItems(self): + with shelve.open(self.shelvePath, 'r') as shelf: + return dict(shelf).items() + + def put(self, key, value): + self.dict[key] = value + with shelve.open(self.shelvePath, 'w') as shelf: + shelf[key] = value.info + + def delete(self, key): + del self.dict[key] + with shelve.open(self.shelvePath, 'w') as shelf: + shelf.pop(key) + + def next(self): + k, v = next(iter(self.dict.items())) + return k, v + + def empty(self): + return not bool(self.dict) + + class DownloadQueue: def __init__(self, config, notifier): self.config = config self.notifier = notifier - self.queue = OrderedDict() - self.done = OrderedDict() + self.queue = PersistentQueue('/queue/queue') + self.done = PersistentQueue('/queue/done', True) self.initialized = False + self.imported = False + + async def importQueue(self): + if not self.imported: + for item in self.queue.savedItems(): + await self.add( + item[1].url, + item[1].quality, + item[1].format) + self.imported = True def __initialize(self): if not self.initialized: @@ -165,10 +224,10 @@ class DownloadQueue: return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)} return {'status': 'ok'} elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry: - if entry['id'] not in self.queue: + if not self.queue.exists(entry['id']): dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format) dldirectory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format != 'mp3') else self.config.AUDIO_DOWNLOAD_DIR - self.queue[entry['id']] = Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl) + self.queue.put(entry['id'], Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)) self.event.set() await self.notifier.added(dl) return {'status': 'ok'} @@ -193,36 +252,37 @@ class DownloadQueue: async def cancel(self, ids): for id in ids: - if id not in self.queue: + if not self.queue.exists(id): log.warn(f'requested cancel for non-existent download {id}') continue - if self.queue[id].started(): - self.queue[id].cancel() + if self.queue.get(id).started(): + self.queue.get(id).cancel() else: - del self.queue[id] + self.queue.delete(id) await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): for id in ids: - if id not in self.done: + if not self.done.exists(id): log.warn(f'requested delete for non-existent download {id}') continue - del self.done[id] + self.done.delete(id) await self.notifier.cleared(id) return {'status': 'ok'} def get(self): - return(list((k, v.info) for k, v in self.queue.items()), - list((k, v.info) for k, v in self.done.items())) + item = (list((k, v) for k, v in self.queue.savedItems()), + list((k, v) for k, v in self.done.savedItems())) + return item async def __download(self): while True: - while not self.queue: + while self.queue.empty(): log.info('waiting for item to download') await self.event.wait() self.event.clear() - id, entry = next(iter(self.queue.items())) + id, entry = self.queue.next() log.info(f'downloading {entry.info.title}') await entry.start(self.notifier) if entry.info.status != 'finished': @@ -233,10 +293,10 @@ class DownloadQueue: pass entry.info.status = 'error' entry.close() - if id in self.queue: - del self.queue[id] + if self.queue.exists(id): + self.queue.delete(id) if entry.canceled: await self.notifier.canceled(id) else: - self.done[id] = entry + self.done.put(id, entry) await self.notifier.completed(entry.info) -- cgit