From 0152456fe8bc0549e581865c8925006d014ecd64 Mon Sep 17 00:00:00 2001
From: Cédric Bonhomme
Date: Sun, 8 Feb 2015 00:08:58 +0100
Subject: Misc improvements for the crawler. A semaphore is used to limit the
 number of simultaneous connections.

---
 pyaggr3g470r/crawler.py | 104 ++++++++++++++++++++++++------------------------
 1 file changed, 51 insertions(+), 53 deletions(-)

diff --git a/pyaggr3g470r/crawler.py b/pyaggr3g470r/crawler.py
index dea06b64..cdbbab8d 100644
--- a/pyaggr3g470r/crawler.py
+++ b/pyaggr3g470r/crawler.py
@@ -20,9 +20,9 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 __author__ = "Cedric Bonhomme"
-__version__ = "$Revision: 3.0 $"
+__version__ = "$Revision: 3.1 $"
 __date__ = "$Date: 2010/09/02 $"
-__revision__ = "$Date: 2015/01/21 $"
+__revision__ = "$Date: 2015/02/08 $"
 __copyright__ = "Copyright (c) Cedric Bonhomme"
 __license__ = "AGPLv3"
 
@@ -41,26 +41,32 @@ from pyaggr3g470r.models import User, Article
 
 logger = logging.getLogger(__name__)
 
-#
-# asyncio examples:
-# - http://compiletoi.net/fast-scraping-in-python-with-asyncio.html
-# - https://gist.github.com/kunev/f83146d407c81a2d64a6
-#
+sem = asyncio.Semaphore(5)
 
 @asyncio.coroutine
 def get(*args, **kwargs):
     kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False)
-    response = yield from aiohttp.request('GET', *args, **kwargs)
-    return (yield from response.read_and_close(decode=False))
+    try:
+        #logger.info("Fetching the feed: " + args[0])
+        response = yield from aiohttp.request('GET', *args, **kwargs)
+        return (yield from response.read_and_close(decode=False))
+    except Exception as e:
+        print(e)
+        return None
 
 @asyncio.coroutine
-def fetch(user, feed):
+def parse_feed(user, feed):
     """
     Fetch a feed.
     """
-    logger.info("Fetching the feed: " + feed.title)
-    print("Fetching the feed: " + feed.title)
-    data = yield from get(feed.link)
+    data = None
+
+    with (yield from sem):
+        data = yield from get(feed.link)
+
+    if data is None:
+        return
+
     a_feed = feedparser.parse(data)
     if a_feed['bozo'] == 1:
         logger.error(a_feed['bozo_exception'])
@@ -151,11 +157,11 @@
 
 @asyncio.coroutine
 def insert_database(user, feed):
-    articles = yield from asyncio.async(fetch(user, feed))
+    articles = yield from asyncio.async(parse_feed(user, feed))
     if None is articles:
         return []
 
-    print('inserting articles for {}'.format(feed.title))
+    #print('inserting articles for {}'.format(feed.title))
 
     logger.info("Database insertion...")
     new_articles = []
@@ -174,7 +180,6 @@
                 #db.session.merge(article)
                 db.session.commit()
                 #logger.info("New article % (%r) added.", article.title, article.link)
-                print("New article added: " + article.title)
             except Exception as e:
                 logger.error("Error when inserting article in database: " + str(e))
                 continue
@@ -182,42 +187,35 @@
     return new_articles
 
 @asyncio.coroutine
-def done(feed):
-    print('done {}'.format(feed.title))
-
-sem = asyncio.Semaphore(5)
-
-@asyncio.coroutine
-def process_data(user, feed):
-    with (yield from sem):
-        data = yield from asyncio.async(insert_database(user, feed))
-        print('inserted articles for {}'.format(feed.title))
+def init_process(user, feed):
+    data = yield from asyncio.async(insert_database(user, feed))
+    #print('inserted articles for {}'.format(feed.title))
 
 def retrieve_feed(user, feed_id=None):
-    """
-    Launch the processus.
- """ - logger.info("Starting to retrieve feeds.") - - # 1 - Get the list of feeds to fetch - user = User.query.filter(User.email == user.email).first() - feeds = [feed for feed in user.feeds if feed.enabled] - if feed_id is not None: - feeds = [feed for feed in feeds if feed.id == feed_id] - - # 2 - Fetch the feeds. - loop = asyncio.get_event_loop() - f = asyncio.wait([process_data(user, feed) for feed in feeds]) - loop.run_until_complete(f) - - """ - # 4 - Indexation - if not conf.ON_HEROKU: - self.index(new_articles) - - # 5 - Mail notification - if not conf.ON_HEROKU and conf.NOTIFICATION_ENABLED: - self.mail_notification(new_articles) - """ - - logger.info("All articles retrieved. End of the processus.") \ No newline at end of file + """ + Launch the processus. + """ + logger.info("Starting to retrieve feeds.") + + # 1 - Get the list of feeds to fetch + user = User.query.filter(User.email == user.email).first() + feeds = [feed for feed in user.feeds if feed.enabled] + if feed_id is not None: + feeds = [feed for feed in feeds if feed.id == feed_id] + + # 2 - Fetch the feeds. + loop = asyncio.get_event_loop() + f = asyncio.wait([init_process(user, feed) for feed in feeds]) + loop.run_until_complete(f) + + """ + # 4 - Indexation + if not conf.ON_HEROKU: + self.index(new_articles) + + # 5 - Mail notification + if not conf.ON_HEROKU and conf.NOTIFICATION_ENABLED: + self.mail_notification(new_articles) + """ + + logger.info("All articles retrieved. End of the processus.") \ No newline at end of file -- cgit