aboutsummaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
authorAlex <alexta69@gmail.com>2019-12-07 21:49:31 +0200
committerAlex <alexta69@gmail.com>2019-12-07 21:49:31 +0200
commitc3550ec17f967a500e5191eb4a15d90c004128cd (patch)
treec35789d02e9f0c3af6af3cfc5d2eef999dd9e76f /app
parentimproved error handling (diff)
downloadmetube-c3550ec17f967a500e5191eb4a15d90c004128cd.tar.gz
metube-c3550ec17f967a500e5191eb4a15d90c004128cd.tar.bz2
metube-c3550ec17f967a500e5191eb4a15d90c004128cd.zip
fixed potential race
Diffstat (limited to 'app')
-rw-r--r--app/ytdl.py74
1 files changed, 42 insertions, 32 deletions
diff --git a/app/ytdl.py b/app/ytdl.py
index 1993300..2cd7eb2 100644
--- a/app/ytdl.py
+++ b/app/ytdl.py
@@ -8,6 +8,22 @@ import logging
log = logging.getLogger('ytdl')
+class DownloadQueueNotifier:
+ async def added(self, dl):
+ raise NotImplementedError
+
+ async def updated(self, dl):
+ raise NotImplementedError
+
+ async def completed(self, dl):
+ raise NotImplementedError
+
+ async def canceled(self, id):
+ raise NotImplementedError
+
+ async def cleared(self, id):
+ raise NotImplementedError
+
class DownloadInfo:
def __init__(self, id, title, url):
self.id, self.title, self.url = id, title, url
@@ -19,10 +35,12 @@ class Download:
def __init__(self, download_dir, info):
self.download_dir = download_dir
self.info = info
+ self.canceled = False
self.tmpfilename = None
self.status_queue = None
self.proc = None
self.loop = None
+ self.notifier = None
def _download(self):
try:
@@ -39,21 +57,26 @@ class Download:
except youtube_dl.utils.YoutubeDLError as exc:
self.status_queue.put({'status': 'error', 'msg': str(exc)})
- async def start(self):
+ async def start(self, notifier):
if Download.manager is None:
Download.manager = multiprocessing.Manager()
self.status_queue = Download.manager.Queue()
self.proc = multiprocessing.Process(target=self._download)
self.proc.start()
self.loop = asyncio.get_running_loop()
+ self.notifier = notifier
+ self.info.status = 'preparing'
+ await self.notifier.updated(self.info)
+ asyncio.ensure_future(self.update_status())
return await self.loop.run_in_executor(None, self.proc.join)
def cancel(self):
if self.running():
self.proc.kill()
+ self.canceled = True
def close(self):
- if self.proc is not None:
+ if self.started():
self.proc.close()
self.status_queue.put(None)
@@ -63,9 +86,11 @@ class Download:
except ValueError:
return False
- async def update_status(self, updated_cb):
- await updated_cb()
- while self.running():
+ def started(self):
+ return self.proc is not None
+
+ async def update_status(self):
+ while True:
status = await self.loop.run_in_executor(None, self.status_queue.get)
if status is None:
return
@@ -78,23 +103,7 @@ class Download:
self.info.percent = status['downloaded_bytes'] / total * 100
self.info.speed = status.get('speed')
self.info.eta = status.get('eta')
- await updated_cb()
-
-class DownloadQueueNotifier:
- async def added(self, dl):
- raise NotImplementedError
-
- async def updated(self, dl):
- raise NotImplementedError
-
- async def completed(self, dl):
- raise NotImplementedError
-
- async def canceled(self, id):
- raise NotImplementedError
-
- async def cleared(self, id):
- raise NotImplementedError
+ await self.notifier.updated(self.info)
class DownloadQueue:
def __init__(self, config, notifier):
@@ -140,9 +149,11 @@ class DownloadQueue:
if id not in self.queue:
log.warn(f'requested cancel for non-existent download {id}')
continue
- self.queue[id].cancel()
- del self.queue[id]
- await self.notifier.canceled(id)
+ if self.queue[id].started():
+ self.queue[id].cancel()
+ else:
+ del self.queue[id]
+ await self.notifier.canceled(id)
return {'status': 'ok'}
async def clear(self, ids):
@@ -166,11 +177,7 @@ class DownloadQueue:
self.event.clear()
id, entry = next(iter(self.queue.items()))
log.info(f'downloading {entry.info.title}')
- entry.info.status = 'preparing'
- start_aw = entry.start()
- async def updated_cb(): await self.notifier.updated(entry.info)
- asyncio.ensure_future(entry.update_status(updated_cb))
- await start_aw
+ await entry.start(self.notifier)
if entry.info.status != 'finished':
if entry.tmpfilename and os.path.isfile(entry.tmpfilename):
try:
@@ -181,5 +188,8 @@ class DownloadQueue:
entry.close()
if id in self.queue:
del self.queue[id]
- self.done[id] = entry
- await self.notifier.completed(entry.info)
+ if entry.canceled:
+ await self.notifier.canceled(id)
+ else:
+ self.done[id] = entry
+ await self.notifier.completed(entry.info)
bgstack15