diff options
author | Cédric Bonhomme <cedric@cedricbonhomme.org> | 2019-05-21 23:49:03 +0200 |
---|---|---|
committer | Cédric Bonhomme <cedric@cedricbonhomme.org> | 2019-05-21 23:49:03 +0200 |
commit | 7d9bc6621eea035034cd56d20de9333f09387940 (patch) | |
tree | d9b985163e990012987b64c7d563c5110a9ff0ca | |
parent | Merge branch 'master' of gitlab.com:newspipe/newspipe (diff) | |
download | newspipe-7d9bc6621eea035034cd56d20de9333f09387940.tar.gz newspipe-7d9bc6621eea035034cd56d20de9333f09387940.tar.bz2 newspipe-7d9bc6621eea035034cd56d20de9333f09387940.zip |
Improved crawler.
-rw-r--r-- | src/crawler/default_crawler.py | 224 | ||||
-rwxr-xr-x | src/manager.py | 15 |
2 files changed, 117 insertions, 122 deletions
diff --git a/src/crawler/default_crawler.py b/src/crawler/default_crawler.py index 38810fc0..bba8431a 100644 --- a/src/crawler/default_crawler.py +++ b/src/crawler/default_crawler.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 - # newspipe - A Web based news aggregator. -# Copyright (C) 2010-2018 Cédric Bonhomme - https://www.cedricbonhomme.org +# Copyright (C) 2010-2019 Cédric Bonhomme - https://www.cedricbonhomme.org # # For more information : https://gitlab.com/newspipe/newspipe # @@ -20,9 +20,9 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. __author__ = "Cedric Bonhomme" -__version__ = "$Revision: 3.3 $" +__version__ = "$Revision: 4.0 $" __date__ = "$Date: 2010/09/02 $" -__revision__ = "$Date: 2015/12/07 $" +__revision__ = "$Date: 2010/05/21 $" __copyright__ = "Copyright (c) Cedric Bonhomme" __license__ = "AGPLv3" @@ -45,18 +45,8 @@ logger = logging.getLogger(__name__) sem = asyncio.Semaphore(5) -import ssl -try: - _create_unverified_https_context = ssl._create_unverified_context -except AttributeError: - # Legacy Python that doesn't verify HTTPS certificates by default - pass -else: - # Handle target environment that doesn't support HTTPS verification - ssl._create_default_https_context = _create_unverified_https_context - -async def get(*args, **kwargs): +def get(*args, **kwargs): #kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False) try: logger.info('Retrieving feed {}'.format(args[0])) @@ -66,7 +56,7 @@ async def get(*args, **kwargs): raise e -async def parse_feed(user, feed): +def parse_feed(user, feed): """ Fetch a feed. Update the feed and return the articles. @@ -74,21 +64,21 @@ async def parse_feed(user, feed): parsed_feed = None up_feed = {} articles = [] - with (await sem): - try: - parsed_feed = await get(feed.link) - except Exception as e: - up_feed['last_error'] = str(e) - up_feed['error_count'] = feed.error_count + 1 - logger.exception("error when parsing feed: " + str(e)) - finally: - up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal()) - if parsed_feed is None: - try: - FeedController().update({'id': feed.id}, up_feed) - except Exception as e: - logger.exception('something bad here: ' + str(e)) - return + #with (await sem): + try: + parsed_feed = get(feed.link) + except Exception as e: + up_feed['last_error'] = str(e) + up_feed['error_count'] = feed.error_count + 1 + logger.exception("error when parsing feed: " + str(e)) + finally: + up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal()) + if parsed_feed is None: + try: + FeedController().update({'id': feed.id}, up_feed) + except Exception as e: + logger.exception('something bad here: ' + str(e)) + return if not is_parsing_ok(parsed_feed): up_feed['last_error'] = str(parsed_feed['bozo_exception']) @@ -114,95 +104,97 @@ async def parse_feed(user, feed): return articles -async def insert_database(user, feed): - articles = await parse_feed(user, feed) - if None is articles: - return [] - - logger.info('Inserting articles for {}'.format(feed.title)) - - new_articles = [] - art_contr = ArticleController(user.id) - for article in articles: - new_article = await construct_article(article, feed) - - try: - existing_article_req = art_contr.read(feed_id=feed.id, - entry_id=extract_id(article)) - except Exception as e: - logger.exception("existing_article_req: " + str(e)) - continue - exist = existing_article_req.count() != 0 - if exist: - continue - # if the article has been already retrieved, we only update - # the content or the title - logger.info('Article already in the database: {}'. \ - format(article['link'])) - existing_article = existing_article_req.first() - - if new_article['date'].replace(tzinfo=None) != \ - existing_article.date: - existing_article.date = new_article['date'] - existing_article.updated_date = new_article['date'] - if existing_article.title != new_article['title']: - existing_article.title = new_article['title'] - content = get_article_content(article) - if existing_article.content != content: - existing_article.content = content - existing_article.readed = False - art_contr.update({'entry_id': existing_article.entry_id}, - existing_article.dump()) - logger.info('Article updated: {}'.format(article['link'])) - continue - - # insertion of the new article - try: - new_articles.append(art_contr.create(**new_article)) - logger.info('New article added: {}'.format(new_article['link'])) - except Exception: - logger.exception('Error when inserting article in database:') +async def insert_articles(queue, nḅ_producers=1): + """Consumer coroutines. + """ + nb_producers_done = 0 + while True: + item = await queue.get() + if item is None: + nb_producers_done += 1 + if nb_producers_done == nḅ_producers: + print('All producers done.') + print('Process finished.') + break continue - return new_articles - - -async def init_process(user, feed): - # Fetch the feed and insert new articles in the database - try: - articles = await insert_database(user, feed) - logger.debug('inserted articles for %s', feed.title) - return articles - except Exception as e: - logger.exception('init_process: ' + str(e)) - -def retrieve_feed(loop, user, feed_id=None): + user, feed, articles = item + + + if None is articles: + logger.info('None') + articles = [] + + logger.info('Inserting articles for {}'.format(feed.title)) + + new_articles = [] + art_contr = ArticleController(user.id) + for article in articles: + new_article = await construct_article(article, feed) + + try: + existing_article_req = art_contr.read(feed_id=feed.id, + entry_id=extract_id(article)) + except Exception as e: + logger.exception("existing_article_req: " + str(e)) + continue + exist = existing_article_req.count() != 0 + if exist: + continue + # if the article has been already retrieved, we only update + # the content or the title + logger.info('Article already in the database: {}'. \ + format(article['link'])) + existing_article = existing_article_req.first() + + if new_article['date'].replace(tzinfo=None) != \ + existing_article.date: + existing_article.date = new_article['date'] + existing_article.updated_date = new_article['date'] + if existing_article.title != new_article['title']: + existing_article.title = new_article['title'] + content = get_article_content(article) + if existing_article.content != content: + existing_article.content = content + existing_article.readed = False + art_contr.update({'entry_id': existing_article.entry_id}, + existing_article.dump()) + logger.info('Article updated: {}'.format(article['link'])) + continue + + # insertion of the new article + try: + new_articles.append(art_contr.create(**new_article)) + logger.info('New article added: {}'.format(new_article['link'])) + except Exception: + logger.exception('Error when inserting article in database:') + continue + + +async def retrieve_feed(queue, users, feed_id=None): """ Launch the processus. """ - logger.info('Starting to retrieve feeds for {}'.format(user.nickname)) - - # Get the list of feeds to fetch - filters = {} - filters['user_id'] = user.id - if feed_id is not None: - filters['id'] = feed_id - filters['enabled'] = True - filters['error_count__lt'] = conf.DEFAULT_MAX_ERROR - filters['last_retrieved__lt'] = datetime.now() - \ - timedelta(minutes=conf.FEED_REFRESH_INTERVAL) - feeds = FeedController().read(**filters).all() - - if feeds == []: - logger.info('No feed to retrieve for {}'.format(user.nickname)) - return - - # Launch the process for all the feeds - tasks = [asyncio.ensure_future(init_process(user, feed)) for feed in feeds] + for user in users: + logger.info('Starting to retrieve feeds for {}'.format(user.nickname)) + filters = {} + filters['user_id'] = user.id + if feed_id is not None: + filters['id'] = feed_id + filters['enabled'] = True + filters['error_count__lt'] = conf.DEFAULT_MAX_ERROR + filters['last_retrieved__lt'] = datetime.now() - \ + timedelta(minutes=conf.FEED_REFRESH_INTERVAL) + feeds = FeedController().read(**filters).all() + + + if feeds == []: + logger.info('No feed to retrieve for {}'.format(user.nickname)) + + + for feed in feeds: + articles = parse_feed(user, feed) + await queue.put((user, feed, articles)) + + await queue.put(None) - try: - loop.run_until_complete(asyncio.wait(tasks)) - except: - logger.exception('an error occured') - finally: - logger.info('Articles retrieved for {}'.format(user.nickname)) diff --git a/src/manager.py b/src/manager.py index 795b3974..244ab77f 100755 --- a/src/manager.py +++ b/src/manager.py @@ -65,16 +65,19 @@ def fetch_asyncio(user_id=None, feed_id=None): feed_id = int(feed_id) except: feed_id = None + + + loop = asyncio.get_event_loop() + queue = asyncio.Queue(maxsize=2, loop=loop) + + producer_coro = default_crawler.retrieve_feed(queue, users, feed_id) + consumer_coro = default_crawler.insert_articles(queue, 1) logger.info('Starting crawler.') - start = datetime.now() - loop = asyncio.get_event_loop() - for user in users: - default_crawler.retrieve_feed(loop, user, feed_id) - loop.close() + loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro)) end = datetime.now() - + loop.close() logger.info('Crawler finished in {} seconds.' \ .format((end - start).seconds)) |