From 2b5cedd394f9219b605a0e9cd7f89335b338b62e Mon Sep 17 00:00:00 2001 From: Cédric Bonhomme Date: Wed, 21 Jan 2015 08:05:39 +0100 Subject: First implementation with asyncio (not really async for the moment). --- pyaggr3g470r/crawler.py | 402 ++++++++++++++++++------------------------------ pyaggr3g470r/utils.py | 3 +- 2 files changed, 150 insertions(+), 255 deletions(-) (limited to 'pyaggr3g470r') diff --git a/pyaggr3g470r/crawler.py b/pyaggr3g470r/crawler.py index 902edcd5..7815a631 100644 --- a/pyaggr3g470r/crawler.py +++ b/pyaggr3g470r/crawler.py @@ -20,123 +20,184 @@ # along with this program. If not, see . __author__ = "Cedric Bonhomme" -__version__ = "$Revision: 2.2 $" +__version__ = "$Revision: 3.0 $" __date__ = "$Date: 2010/09/02 $" -__revision__ = "$Date: 2014/07/13 $" +__revision__ = "$Date: 2015/01/21 $" __copyright__ = "Copyright (c) Cedric Bonhomme" __license__ = "AGPLv3" -import re -import socket +import asyncio import logging -import feedparser -try: - # Python 3 - from urllib.request import ProxyHandler -except: - # Python 2 - from urllib2 import ProxyHandler import requests +import feedparser import dateutil.parser from bs4 import BeautifulSoup -from datetime import datetime -from sqlalchemy.exc import IntegrityError -from requests.exceptions import * - -import gevent.monkey -gevent.monkey.patch_all() -from gevent import Timeout -from gevent.pool import Pool from pyaggr3g470r import utils from pyaggr3g470r import conf -from pyaggr3g470r import notifications from pyaggr3g470r import db from pyaggr3g470r.models import User, Article -if not conf.ON_HEROKU: - import pyaggr3g470r.search as fastsearch - logger = logging.getLogger(__name__) -socket.setdefaulttimeout(5.0) - - -# Hack: Re-add sslwrap to Python 2.7.9 -import inspect -__ssl__ = __import__('ssl') -try: - _ssl = __ssl__._ssl -except AttributeError: - _ssl = __ssl__._ssl2 -if not hasattr(_ssl, 'sslwrap'): - def new_sslwrap(sock, server_side=False, keyfile=None, certfile=None, cert_reqs=__ssl__.CERT_NONE, ssl_version=__ssl__.PROTOCOL_SSLv23, ca_certs=None, ciphers=None): - context = __ssl__.SSLContext(ssl_version) - context.verify_mode = cert_reqs or __ssl__.CERT_NONE - if ca_certs: - context.load_verify_locations(ca_certs) - if certfile: - context.load_cert_chain(certfile, keyfile) - if ciphers: - context.set_ciphers(ciphers) - - caller_self = inspect.currentframe().f_back.f_locals['self'] - return context._wrap_socket(sock, server_side=server_side, ssl_sock=caller_self) - _ssl.sslwrap = new_sslwrap -# End hack - - -class TooLong(Exception): - def __init__(self): - """ - Log a when greenlet took to long to fetch a resource. - """ - logger.warning("Greenlet took to long") - - -class FeedGetter(object): +@asyncio.coroutine +def fetch(user, feed): """ - This class is in charge of retrieving the feeds. + Fetch a feed. """ - def __init__(self, email): - """ - Initialization. - """ - feedparser.USER_AGENT = conf.USER_AGENT - if conf.HTTP_PROXY == "": - self.proxy = ProxyHandler({}) - self.proxies = {} - else: - self.proxy = ProxyHandler({"http": conf.HTTP_PROXY, - "https": conf.HTTP_PROXY}) - self.proxies = { - "http": "http://" + conf.HTTP_PROXY, - "https": "http://" + conf.HTTP_PROXY - } - feedparser.USER_AGENT = conf.USER_AGENT - self.user = User.query.filter(User.email == email).first() + logger.info("Fetching the feed: " + feed.title) + print("Fetching the feed: " + feed.title) + a_feed = feedparser.parse(feed.link) + if a_feed['bozo'] == 1: + logger.error(a_feed['bozo_exception']) + if a_feed['entries'] == []: + return + + # Feed informations + if feed.title == "": + try: + feed.title = a_feed.feed.title + except: + feed.title = "No title" + if feed.link == "": + try: + feed.link = a_feed.feed.link + except: + feed.link = "" + if feed.description == "": + try: + feed.description = a_feed.feed.subtitle + except: + feed.description = "" + + articles = [] + for article in a_feed['entries']: + + try: + nice_url = article.link.encode("utf-8") + except: + # if not able to get the link of the article, continue + continue + if conf.RESOLVE_ARTICLE_URL: + try: + # resolves URL behind proxies + # (like feedproxy.google.com) + r = requests.get(article.link, timeout=5.0) + nice_url = r.url.encode("utf-8") + except Exception as error: + logger.warning( + "Unable to get the real URL of %s. Error: %s", + article.link, error) + continue + # remove utm_* parameters + nice_url = utils.clean_url(nice_url) + + description = "" + article_title = article.get('title', '') + try: + # article content + description = article.content[0].value + except AttributeError: + # article description + description = article.get('description', '') + + try: + description = BeautifulSoup(description, "lxml").decode() + except: + logger.error("Problem when sanitizing the content of the article %s (%s)", + article_title, nice_url) + + post_date = None + for date_key in ('published_parsed', 'published', + 'updated_parsed', 'updated'): + if not date_key in article: + continue - def retrieve_feed(self, feed_id=None): + try: + post_date = dateutil.parser.parse(article[date_key], + dayfirst=True) + break + except: + try: # trying to clean date field from letters + post_date = dateutil.parser.parse( + re.sub('[A-z]', '', article[date_key]), + dayfirst=True) + break + except: + pass + + # create the models.Article object and append it to the list of articles + article = Article(link=nice_url, title=article_title, + content=description, readed=False, like=False, + date=post_date, user_id=user.id, + feed_id=feed.id) + articles.append(article) + return articles + +@asyncio.coroutine +def insert_database(user, feed): + + articles = yield from asyncio.async(fetch(user, feed)) + + print('inserting articles for {}'.format(feed.title)) + + logger.info("Database insertion...") + new_articles = [] + query1 = Article.query.filter(Article.user_id == user.id) + for article in articles: + query2 = query1.filter(Article.feed_id == feed.id) + for article in articles: + exist = False#query2.filter(Article.link == article.link).count() != 0 + if exist: + logger.debug("Article %r (%r) already in the database.", + article.title, article.link) + continue + if article.date is None: + article.date = datetime.now(dateutil.tz.tzlocal()) + new_articles.append(article) + try: + feed.articles.append(article) + #db.session.merge(article) + db.session.commit() + logger.info("New article %r (%r) added.", article.title, + article.link) + except Exception as e: + logger.error("Error when inserting article in database: " + str(e)) + continue + #db.session.close() + return new_articles + + + +@asyncio.coroutine +def done(feed): + print('done {}'.format(feed.title)) + +@asyncio.coroutine +def process_data(user, feed): + data = yield from asyncio.async(insert_database(user, feed)) + print('inserted articles for {}'.format(feed.title)) + + + +def retrieve_feed(user, feed_id=None): """ Launch the processus. """ logger.info("Starting to retrieve feeds.") # 1 - Get the list of feeds to fetch - user = User.query.filter(User.email == self.user.email).first() + user = User.query.filter(User.email == user.email).first() feeds = [feed for feed in user.feeds if feed.enabled] if feed_id is not None: feeds = [feed for feed in feeds if feed.id == feed_id] # 2 - Fetch the feeds. - # 'responses' contains all the jobs returned by - # the function retrieve_async() - responses = self.retrieve_async(feeds) - elements = [item.value for item in responses if item.value is not None] - - # 3 - Insert articles in the database - new_articles = self.insert_database(elements) + loop = asyncio.get_event_loop() + f = asyncio.wait([process_data(user, feed) for feed in feeds[0:15]]) + loop.run_until_complete(f) + """ # 4 - Indexation if not conf.ON_HEROKU: self.index(new_articles) @@ -144,173 +205,6 @@ class FeedGetter(object): # 5 - Mail notification if not conf.ON_HEROKU and conf.NOTIFICATION_ENABLED: self.mail_notification(new_articles) - - logger.info("All articles retrieved. End of the processus.") - - def retrieve_async(self, feeds): """ - Spawn different jobs in order to retrieve a list of feeds. - Returns a list of jobs. - """ - def fetch(feed): - """ - Fetch a feed. - """ - logger.info("Fetching the feed: " + feed.title) - a_feed = feedparser.parse(feed.link, handlers=[self.proxy]) - if a_feed['bozo'] == 1: - logger.error(a_feed['bozo_exception']) - if a_feed['entries'] == []: - return - # Feed informations - if feed.title == "": - try: - feed.title = a_feed.feed.title - except: - feed.title = "No title" - if feed.link == "": - try: - feed.link = a_feed.feed.link - except: - feed.link = "" - if feed.description == "": - try: - feed.description = a_feed.feed.subtitle - except: - feed.description = "" - - articles = [] - for article in a_feed['entries']: - - try: - nice_url = article.link.encode("utf-8") - except: - # if not able to get the link of the article, continue - continue - if conf.RESOLVE_ARTICLE_URL: - try: - # resolves URL behind proxies - # (like feedproxy.google.com) - r = requests.get(article.link, timeout=5.0, - proxies=self.proxies) - nice_url = r.url.encode("utf-8") - except Timeout: - logger.warning( - "Timeout when getting the real URL of %s.", - article.link) - continue - except Exception as error: - logger.warning( - "Unable to get the real URL of %s. Error: %s", - article.link, error) - continue - # remove utm_* parameters - nice_url = utils.clean_url(nice_url) - - description = "" - article_title = article.get('title', '') - try: - # article content - description = article.content[0].value - except AttributeError: - # article description - description = article.get('description', '') - - try: - description = BeautifulSoup(description, "lxml").decode() - except: - logger.error("Problem when sanitizing the content of the article %s (%s)", - article_title, nice_url) - - post_date = None - for date_key in ('published_parsed', 'published', - 'updated_parsed', 'updated'): - if not date_key in article: - continue - - try: - post_date = dateutil.parser.parse(article[date_key], - dayfirst=True) - break - except: - try: # trying to clean date field from letters - post_date = dateutil.parser.parse( - re.sub('[A-z]', '', article[date_key]), - dayfirst=True) - break - except: - pass - - # create the models.Article object and append it to the list of articles - article = Article(link=nice_url, title=article_title, - content=description, readed=False, like=False, - date=post_date, user_id=self.user.id, - feed_id=feed.id) - articles.append(article) - - # return the feed with the list of retrieved articles - return feed, articles - - pool = Pool(20) - jobs = [pool.spawn(fetch, feed) for feed in feeds] - pool.join() - - return jobs - - def insert_database(self, elements): - """ - Insert articles in the database. - """ - logger.info("Database insertion...") - new_articles = [] - query1 = Article.query.filter(Article.user_id == self.user.id) - for feed, articles in elements: - query2 = query1.filter(Article.feed_id == feed.id) - for article in articles: - exist = query2.filter(Article.link == article.link).count() != 0 - if exist: - logger.debug("Article %r (%r) already in the database.", - article.title, article.link) - continue - if article.date is None: - article.date = datetime.now(dateutil.tz.tzlocal()) - - new_articles.append(article) - - try: - feed.articles.append(article) - #db.session.merge(article) - db.session.commit() - logger.info("New article %r (%r) added.", article.title, - article.link) - except Exception as e: - logger.error("Error when inserting article in database: " + str(e)) - continue - #db.session.close() - return new_articles - - def index(self, new_articles): - """ - Index new articles. - """ - logger.info("Indexing new articles.") - for element in new_articles: - article = Article.query.filter(Article.user_id == self.user.id, - Article.link == element.link).first() - try: - fastsearch.add_to_index(self.user.id, [article], - article.source) - except: - logger.exception("Problem during indexation:") - return True - - def mail_notification(self, new_articles): - """ - Mail notification. - """ - logger.info("Starting mail notification.") - for element in new_articles: - if element.source.email_notification: - notifications.new_article_notification(self.user, element.source, element) - return True + logger.info("All articles retrieved. End of the processus.") \ No newline at end of file diff --git a/pyaggr3g470r/utils.py b/pyaggr3g470r/utils.py index ed86a04c..d815d02b 100755 --- a/pyaggr3g470r/utils.py +++ b/pyaggr3g470r/utils.py @@ -188,9 +188,10 @@ def clean_url(url): """ Remove utm_* parameters """ + return url parsed_url = urlparse(url) qd = parse_qs(parsed_url.query, keep_blank_values=True) - filtered = dict((k, v) for k, v in qd.iteritems() + filtered = dict((k, v) for k, v in qd.items() if not k.startswith('utm_')) return urlunparse([ parsed_url.scheme, -- cgit