aboutsummaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/main.py91
-rw-r--r--app/ytdl.py154
2 files changed, 245 insertions, 0 deletions
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..dc021d4
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+
+import os
+from aiohttp import web
+import asyncio
+import socketio
+import time
+import logging
+import json
+
+from ytdl import DownloadQueueNotifier, DownloadQueue
+
+log = logging.getLogger('main')
+
+class Config:
+ _DEFAULTS = {
+ 'DOWNLOAD_DIR': '.',
+ }
+
+ def __init__(self):
+ for k, v in self._DEFAULTS.items():
+ setattr(self, k, os.environ[k] if k in os.environ else v)
+
+config = Config()
+
+class ObjectSerializer(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, object):
+ return obj.__dict__
+ else:
+ return json.JSONEncoder.default(self, obj)
+
+serializer = ObjectSerializer()
+app = web.Application()
+sio = socketio.AsyncServer()
+sio.attach(app)
+routes = web.RouteTableDef()
+
+class Notifier(DownloadQueueNotifier):
+ async def added(self, dl):
+ await sio.emit('added', serializer.encode(dl))
+
+ async def updated(self, dl):
+ await sio.emit('updated', serializer.encode(dl))
+
+ async def deleted(self, id):
+ await sio.emit('deleted', serializer.encode(id))
+
+dqueue = DownloadQueue(config, Notifier())
+
+@routes.post('/add')
+async def add(request):
+ post = await request.json()
+ url = post.get('url')
+ if not url:
+ raise web.HTTPBadRequest()
+ status = await dqueue.add(url)
+ return web.Response(text=serializer.encode(status))
+
+@routes.post('/delete')
+async def delete(request):
+ post = await request.json()
+ ids = post.get('ids')
+ if not ids:
+ raise web.HTTPBadRequest()
+ status = await dqueue.delete(ids)
+ return web.Response(text=serializer.encode(status))
+
+@routes.get('/queue')
+def queue(request):
+ ret = dqueue.get()
+ return web.Response(text=serializer.encode(ret))
+
+@sio.event
+async def connect(sid, environ):
+ ret = dqueue.get()
+ #ret = [["XeNTV0kyHaU", {"id": "XeNTV0kyHaU", "title": "2020 Mercedes ACTROS \u2013 Digital Side Mirrors, Electronic Stability, Auto Braking, Side Guard Safety", "url": "XeNTV0kyHaU", "status": None, "percentage": 0}], ["76wlIusQe9U", {"id": "76wlIusQe9U", "title": "Toyota HIACE 2020 \u2013 Toyota Wagon / Toyota HIACE 2019 and 2020", "url": "76wlIusQe9U", "status": None, "percentage": 0}], ["n_d5LPwflMM", {"id": "n_d5LPwflMM", "title": "2020 Toyota GRANVIA \u2013 Toyota 8 Seater LUXURY VAN / ALL-NEW Toyota GRANVIA 2020", "url": "n_d5LPwflMM", "status": None, "percentage": 0}], ["Dv4ZFhCpF1M", {"id": "Dv4ZFhCpF1M", "title": "Toyota SIENNA 2019 vs Honda ODYSSEY 2019", "url": "Dv4ZFhCpF1M", "status": None, "percentage": 0}], ["GjHJFb3Mgqw", {"id": "GjHJFb3Mgqw", "title": "How It's Made (Buses) \u2013 How Buses are made? SETRA BUS Production", "url": "GjHJFb3Mgqw", "status": None, "percentage": 0}]]
+ await sio.emit('queue', serializer.encode(ret), to=sid)
+
+@routes.get('/')
+def index(request):
+ return web.FileResponse('ui/dist/metube/index.html')
+
+routes.static('/', 'ui/dist/metube')
+
+app.add_routes(routes)
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG)
+ web.run_app(app, port=8081)
diff --git a/app/ytdl.py b/app/ytdl.py
new file mode 100644
index 0000000..d1512ef
--- /dev/null
+++ b/app/ytdl.py
@@ -0,0 +1,154 @@
+import os
+import sys
+import youtube_dl
+from collections import OrderedDict
+import asyncio
+import multiprocessing
+import logging
+
+log = logging.getLogger('ytdl')
+
+class DownloadInfo:
+ def __init__(self, id, title, url):
+ self.id, self.title, self.url = id, title, url
+ self.status = self.percent = self.speed = self.eta = None
+
+class Download:
+ manager = None
+
+ def __init__(self, download_dir, info):
+ self.download_dir = download_dir
+ self.info = info
+ self.tmpfilename = None
+ self.status_queue = None
+ self.proc = None
+ self.loop = None
+
+ def _download(self):
+ youtube_dl.YoutubeDL(params={
+ 'quiet': True,
+ 'no_color': True,
+ #'skip_download': True,
+ 'outtmpl': os.path.join(self.download_dir, '%(title)s.%(ext)s'),
+ 'socket_timeout': 30,
+ 'progress_hooks': [lambda d: self.status_queue.put(d)],
+ }).download([self.info.url])
+
+ async def start(self):
+ if Download.manager is None:
+ Download.manager = multiprocessing.Manager()
+ self.status_queue = Download.manager.Queue()
+ self.proc = multiprocessing.Process(target=self._download)
+ self.proc.start()
+ self.loop = asyncio.get_running_loop()
+ return await self.loop.run_in_executor(None, self.proc.join)
+
+ def cancel(self):
+ if self.running():
+ self.proc.kill()
+
+ def close(self):
+ if self.proc is not None:
+ self.proc.close()
+ self.status_queue.put(None)
+
+ def running(self):
+ return self.proc is not None and self.proc.is_alive()
+
+ async def update_status(self, updated_cb):
+ await updated_cb()
+ while self.running():
+ status = await self.loop.run_in_executor(None, self.status_queue.get)
+ if status is None:
+ return
+ self.tmpfilename = status.get('tmpfilename')
+ self.info.status = status['status']
+ if 'downloaded_bytes' in status:
+ total = status.get('total_bytes') or status.get('total_bytes_estimate')
+ if total:
+ self.info.percent = status['downloaded_bytes'] / total * 100
+ self.info.speed = status.get('speed')
+ self.info.eta = status.get('eta')
+ await updated_cb()
+
+class DownloadQueueNotifier:
+ async def added(self, dl):
+ raise NotImplementedError
+
+ async def updated(self, dl):
+ raise NotImplementedError
+
+ async def deleted(self, id):
+ raise NotImplementedError
+
+class DownloadQueue:
+ def __init__(self, config, notifier):
+ self.config = config
+ self.notifier = notifier
+ self.queue = OrderedDict()
+ self.event = asyncio.Event()
+ asyncio.ensure_future(self.__download())
+
+ def __extract_info(self, url):
+ return youtube_dl.YoutubeDL(params={
+ 'quiet': True,
+ 'no_color': True,
+ 'extract_flat': True,
+ }).extract_info(url, download=False)
+
+ async def add(self, url):
+ log.info(f'adding {url}')
+ try:
+ info = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url)
+ except youtube_dl.utils.YoutubeDLError as exc:
+ return {'status': 'error', 'msg': str(exc)}
+ if info.get('_type') == 'playlist':
+ entries = info['entries']
+ log.info(f'playlist detected with {len(entries)} entries')
+ else:
+ entries = [info]
+ log.info('single video detected')
+ for entry in entries:
+ if entry['id'] not in self.queue:
+ dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'])
+ self.queue[entry['id']] = Download(self.config.DOWNLOAD_DIR, dl)
+ await self.notifier.added(dl)
+ self.event.set()
+ return {'status': 'ok'}
+
+ async def delete(self, ids):
+ for id in ids:
+ if id not in self.queue:
+ log.warn(f'requested delete for non-existent download {id}')
+ continue
+ if self.queue[id].info.status is not None:
+ self.queue[id].cancel()
+ else:
+ del self.queue[id]
+ await self.notifier.deleted(id)
+ return {'status': 'ok'}
+
+ def get(self):
+ return list((k, v.info) for k, v in self.queue.items())
+
+ async def __download(self):
+ while True:
+ while not self.queue:
+ log.info('waiting for item to download')
+ await self.event.wait()
+ self.event.clear()
+ id, entry = next(iter(self.queue.items()))
+ log.info(f'downloading {entry.info.title}')
+ entry.info.status = 'preparing'
+ start_aw = entry.start()
+ async def updated_cb(): await self.notifier.updated(entry.info)
+ asyncio.ensure_future(entry.update_status(updated_cb))
+ await start_aw
+ if entry.info.status != 'finished' and entry.tmpfilename and os.path.isfile(entry.tmpfilename):
+ try:
+ os.remove(entry.tmpfilename)
+ except:
+ pass
+ entry.close()
+ del self.queue[id]
+ await self.notifier.deleted(id)
bgstack15