aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorErazor2 <jeromewhweb@gmail.com>2022-01-15 12:17:12 +0000
committerErazor2 <jeromewhweb@gmail.com>2022-01-15 12:17:12 +0000
commiteb1f031b33d3df3c0d3351b0cb1b2b7078e30245 (patch)
tree7b53d7397220a19e4d0d682d1b1ec54929e5de81
parentencode download links (closes #104) (diff)
downloadmetube-eb1f031b33d3df3c0d3351b0cb1b2b7078e30245.tar.gz
metube-eb1f031b33d3df3c0d3351b0cb1b2b7078e30245.tar.bz2
metube-eb1f031b33d3df3c0d3351b0cb1b2b7078e30245.zip
Queue persistence for download and completed
-rw-r--r--Dockerfile1
-rw-r--r--app/main.py1
-rw-r--r--app/ytdl.py94
3 files changed, 79 insertions, 17 deletions
diff --git a/Dockerfile b/Dockerfile
index 3571da0..64c3cea 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -26,5 +26,6 @@ COPY --from=builder /metube/dist/metube ./ui/dist/metube
ENV DOWNLOAD_DIR /downloads
VOLUME /downloads
+VOLUME /queue
EXPOSE 8081
CMD ["python3", "app/main.py"]
diff --git a/app/main.py b/app/main.py
index 8b2a57f..f901afc 100644
--- a/app/main.py
+++ b/app/main.py
@@ -92,6 +92,7 @@ async def delete(request):
@sio.event
async def connect(sid, environ):
+ await dqueue.importQueue()
await sio.emit('all', serializer.encode(dqueue.get()), to=sid)
@routes.get(config.URL_PREFIX)
diff --git a/app/ytdl.py b/app/ytdl.py
index a5e60b7..c91e4fc 100644
--- a/app/ytdl.py
+++ b/app/ytdl.py
@@ -1,6 +1,7 @@
import os
import yt_dlp
from collections import OrderedDict
+import shelve
import asyncio
import multiprocessing
import logging
@@ -131,13 +132,71 @@ class Download:
self.info.eta = status.get('eta')
await self.notifier.updated(self.info)
+class PersistentQueue:
+ def __init__(self, filePath, load = False):
+ self.dict = OrderedDict()
+ self.shelvePath = filePath
+ self.__createShelve()
+ if load:
+ self.__loadShelve()
+
+ def __createShelve(self):
+ shelf = shelve.open(self.shelvePath, 'c')
+ shelf.close()
+
+ def __loadShelve(self):
+ with shelve.open(self.shelvePath, 'r') as shelf:
+ for key in shelf.keys():
+ self.dict[key] = shelf[key]
+
+ def exists(self, key):
+ return key in self.dict
+
+ def get(self, key):
+ return self.dict[key]
+
+ def items(self):
+ return self.dict.items()
+
+ def savedItems(self):
+ with shelve.open(self.shelvePath, 'r') as shelf:
+ return dict(shelf).items()
+
+ def put(self, key, value):
+ self.dict[key] = value
+ with shelve.open(self.shelvePath, 'w') as shelf:
+ shelf[key] = value.info
+
+ def delete(self, key):
+ del self.dict[key]
+ with shelve.open(self.shelvePath, 'w') as shelf:
+ shelf.pop(key)
+
+ def next(self):
+ k, v = next(iter(self.dict.items()))
+ return k, v
+
+ def empty(self):
+ return not bool(self.dict)
+
+
class DownloadQueue:
def __init__(self, config, notifier):
self.config = config
self.notifier = notifier
- self.queue = OrderedDict()
- self.done = OrderedDict()
+ self.queue = PersistentQueue('/queue/queue')
+ self.done = PersistentQueue('/queue/done', True)
self.initialized = False
+ self.imported = False
+
+ async def importQueue(self):
+ if not self.imported:
+ for item in self.queue.savedItems():
+ await self.add(
+ item[1].url,
+ item[1].quality,
+ item[1].format)
+ self.imported = True
def __initialize(self):
if not self.initialized:
@@ -165,10 +224,10 @@ class DownloadQueue:
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
return {'status': 'ok'}
elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry:
- if entry['id'] not in self.queue:
+ if not self.queue.exists(entry['id']):
dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format)
dldirectory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format != 'mp3') else self.config.AUDIO_DOWNLOAD_DIR
- self.queue[entry['id']] = Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)
+ self.queue.put(entry['id'], Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl))
self.event.set()
await self.notifier.added(dl)
return {'status': 'ok'}
@@ -193,36 +252,37 @@ class DownloadQueue:
async def cancel(self, ids):
for id in ids:
- if id not in self.queue:
+ if not self.queue.exists(id):
log.warn(f'requested cancel for non-existent download {id}')
continue
- if self.queue[id].started():
- self.queue[id].cancel()
+ if self.queue.get(id).started():
+ self.queue.get(id).cancel()
else:
- del self.queue[id]
+ self.queue.delete(id)
await self.notifier.canceled(id)
return {'status': 'ok'}
async def clear(self, ids):
for id in ids:
- if id not in self.done:
+ if not self.done.exists(id):
log.warn(f'requested delete for non-existent download {id}')
continue
- del self.done[id]
+ self.done.delete(id)
await self.notifier.cleared(id)
return {'status': 'ok'}
def get(self):
- return(list((k, v.info) for k, v in self.queue.items()),
- list((k, v.info) for k, v in self.done.items()))
+ item = (list((k, v) for k, v in self.queue.savedItems()),
+ list((k, v) for k, v in self.done.savedItems()))
+ return item
async def __download(self):
while True:
- while not self.queue:
+ while self.queue.empty():
log.info('waiting for item to download')
await self.event.wait()
self.event.clear()
- id, entry = next(iter(self.queue.items()))
+ id, entry = self.queue.next()
log.info(f'downloading {entry.info.title}')
await entry.start(self.notifier)
if entry.info.status != 'finished':
@@ -233,10 +293,10 @@ class DownloadQueue:
pass
entry.info.status = 'error'
entry.close()
- if id in self.queue:
- del self.queue[id]
+ if self.queue.exists(id):
+ self.queue.delete(id)
if entry.canceled:
await self.notifier.canceled(id)
else:
- self.done[id] = entry
+ self.done.put(id, entry)
await self.notifier.completed(entry.info)
bgstack15