From 511404d23f5edd6ccf79db1fc1b0ba622134db24 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 29 Nov 2019 19:31:34 +0200 Subject: initial commit: working version --- app/main.py | 91 +++++++++++++++++++++++++++++++++++ app/ytdl.py | 154 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 245 insertions(+) create mode 100644 app/main.py create mode 100644 app/ytdl.py (limited to 'app') diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..dc021d4 --- /dev/null +++ b/app/main.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +import os +from aiohttp import web +import asyncio +import socketio +import time +import logging +import json + +from ytdl import DownloadQueueNotifier, DownloadQueue + +log = logging.getLogger('main') + +class Config: + _DEFAULTS = { + 'DOWNLOAD_DIR': '.', + } + + def __init__(self): + for k, v in self._DEFAULTS.items(): + setattr(self, k, os.environ[k] if k in os.environ else v) + +config = Config() + +class ObjectSerializer(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, object): + return obj.__dict__ + else: + return json.JSONEncoder.default(self, obj) + +serializer = ObjectSerializer() +app = web.Application() +sio = socketio.AsyncServer() +sio.attach(app) +routes = web.RouteTableDef() + +class Notifier(DownloadQueueNotifier): + async def added(self, dl): + await sio.emit('added', serializer.encode(dl)) + + async def updated(self, dl): + await sio.emit('updated', serializer.encode(dl)) + + async def deleted(self, id): + await sio.emit('deleted', serializer.encode(id)) + +dqueue = DownloadQueue(config, Notifier()) + +@routes.post('/add') +async def add(request): + post = await request.json() + url = post.get('url') + if not url: + raise web.HTTPBadRequest() + status = await dqueue.add(url) + return web.Response(text=serializer.encode(status)) + +@routes.post('/delete') +async def delete(request): + post = await request.json() + ids = post.get('ids') + if not ids: + raise web.HTTPBadRequest() + status = await dqueue.delete(ids) + return web.Response(text=serializer.encode(status)) + +@routes.get('/queue') +def queue(request): + ret = dqueue.get() + return web.Response(text=serializer.encode(ret)) + +@sio.event +async def connect(sid, environ): + ret = dqueue.get() + #ret = [["XeNTV0kyHaU", {"id": "XeNTV0kyHaU", "title": "2020 Mercedes ACTROS \u2013 Digital Side Mirrors, Electronic Stability, Auto Braking, Side Guard Safety", "url": "XeNTV0kyHaU", "status": None, "percentage": 0}], ["76wlIusQe9U", {"id": "76wlIusQe9U", "title": "Toyota HIACE 2020 \u2013 Toyota Wagon / Toyota HIACE 2019 and 2020", "url": "76wlIusQe9U", "status": None, "percentage": 0}], ["n_d5LPwflMM", {"id": "n_d5LPwflMM", "title": "2020 Toyota GRANVIA \u2013 Toyota 8 Seater LUXURY VAN / ALL-NEW Toyota GRANVIA 2020", "url": "n_d5LPwflMM", "status": None, "percentage": 0}], ["Dv4ZFhCpF1M", {"id": "Dv4ZFhCpF1M", "title": "Toyota SIENNA 2019 vs Honda ODYSSEY 2019", "url": "Dv4ZFhCpF1M", "status": None, "percentage": 0}], ["GjHJFb3Mgqw", {"id": "GjHJFb3Mgqw", "title": "How It's Made (Buses) \u2013 How Buses are made? SETRA BUS Production", "url": "GjHJFb3Mgqw", "status": None, "percentage": 0}]] + await sio.emit('queue', serializer.encode(ret), to=sid) + +@routes.get('/') +def index(request): + return web.FileResponse('ui/dist/metube/index.html') + +routes.static('/', 'ui/dist/metube') + +app.add_routes(routes) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + web.run_app(app, port=8081) diff --git a/app/ytdl.py b/app/ytdl.py new file mode 100644 index 0000000..d1512ef --- /dev/null +++ b/app/ytdl.py @@ -0,0 +1,154 @@ +import os +import sys +import youtube_dl +from collections import OrderedDict +import asyncio +import multiprocessing +import logging + +log = logging.getLogger('ytdl') + +class DownloadInfo: + def __init__(self, id, title, url): + self.id, self.title, self.url = id, title, url + self.status = self.percent = self.speed = self.eta = None + +class Download: + manager = None + + def __init__(self, download_dir, info): + self.download_dir = download_dir + self.info = info + self.tmpfilename = None + self.status_queue = None + self.proc = None + self.loop = None + + def _download(self): + youtube_dl.YoutubeDL(params={ + 'quiet': True, + 'no_color': True, + #'skip_download': True, + 'outtmpl': os.path.join(self.download_dir, '%(title)s.%(ext)s'), + 'socket_timeout': 30, + 'progress_hooks': [lambda d: self.status_queue.put(d)], + }).download([self.info.url]) + + async def start(self): + if Download.manager is None: + Download.manager = multiprocessing.Manager() + self.status_queue = Download.manager.Queue() + self.proc = multiprocessing.Process(target=self._download) + self.proc.start() + self.loop = asyncio.get_running_loop() + return await self.loop.run_in_executor(None, self.proc.join) + + def cancel(self): + if self.running(): + self.proc.kill() + + def close(self): + if self.proc is not None: + self.proc.close() + self.status_queue.put(None) + + def running(self): + return self.proc is not None and self.proc.is_alive() + + async def update_status(self, updated_cb): + await updated_cb() + while self.running(): + status = await self.loop.run_in_executor(None, self.status_queue.get) + if status is None: + return + self.tmpfilename = status.get('tmpfilename') + self.info.status = status['status'] + if 'downloaded_bytes' in status: + total = status.get('total_bytes') or status.get('total_bytes_estimate') + if total: + self.info.percent = status['downloaded_bytes'] / total * 100 + self.info.speed = status.get('speed') + self.info.eta = status.get('eta') + await updated_cb() + +class DownloadQueueNotifier: + async def added(self, dl): + raise NotImplementedError + + async def updated(self, dl): + raise NotImplementedError + + async def deleted(self, id): + raise NotImplementedError + +class DownloadQueue: + def __init__(self, config, notifier): + self.config = config + self.notifier = notifier + self.queue = OrderedDict() + self.event = asyncio.Event() + asyncio.ensure_future(self.__download()) + + def __extract_info(self, url): + return youtube_dl.YoutubeDL(params={ + 'quiet': True, + 'no_color': True, + 'extract_flat': True, + }).extract_info(url, download=False) + + async def add(self, url): + log.info(f'adding {url}') + try: + info = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) + except youtube_dl.utils.YoutubeDLError as exc: + return {'status': 'error', 'msg': str(exc)} + if info.get('_type') == 'playlist': + entries = info['entries'] + log.info(f'playlist detected with {len(entries)} entries') + else: + entries = [info] + log.info('single video detected') + for entry in entries: + if entry['id'] not in self.queue: + dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url']) + self.queue[entry['id']] = Download(self.config.DOWNLOAD_DIR, dl) + await self.notifier.added(dl) + self.event.set() + return {'status': 'ok'} + + async def delete(self, ids): + for id in ids: + if id not in self.queue: + log.warn(f'requested delete for non-existent download {id}') + continue + if self.queue[id].info.status is not None: + self.queue[id].cancel() + else: + del self.queue[id] + await self.notifier.deleted(id) + return {'status': 'ok'} + + def get(self): + return list((k, v.info) for k, v in self.queue.items()) + + async def __download(self): + while True: + while not self.queue: + log.info('waiting for item to download') + await self.event.wait() + self.event.clear() + id, entry = next(iter(self.queue.items())) + log.info(f'downloading {entry.info.title}') + entry.info.status = 'preparing' + start_aw = entry.start() + async def updated_cb(): await self.notifier.updated(entry.info) + asyncio.ensure_future(entry.update_status(updated_cb)) + await start_aw + if entry.info.status != 'finished' and entry.tmpfilename and os.path.isfile(entry.tmpfilename): + try: + os.remove(entry.tmpfilename) + except: + pass + entry.close() + del self.queue[id] + await self.notifier.deleted(id) -- cgit