Diffstat (limited to 'pyaggr3g470r/crawler.py')
-rw-r--r--  pyaggr3g470r/crawler.py  402
1 file changed, 148 insertions, 254 deletions
diff --git a/pyaggr3g470r/crawler.py b/pyaggr3g470r/crawler.py
index 902edcd5..7815a631 100644
--- a/pyaggr3g470r/crawler.py
+++ b/pyaggr3g470r/crawler.py
@@ -20,123 +20,184 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Cedric Bonhomme"
-__version__ = "$Revision: 2.2 $"
+__version__ = "$Revision: 3.0 $"
__date__ = "$Date: 2010/09/02 $"
-__revision__ = "$Date: 2014/07/13 $"
+__revision__ = "$Date: 2015/01/21 $"
__copyright__ = "Copyright (c) Cedric Bonhomme"
__license__ = "AGPLv3"
import re
-import socket
+import asyncio
import logging
-import feedparser
-try:
- # Python 3
- from urllib.request import ProxyHandler
-except:
- # Python 2
- from urllib2 import ProxyHandler
import requests
+import feedparser
import dateutil.parser
from bs4 import BeautifulSoup
from datetime import datetime
-from sqlalchemy.exc import IntegrityError
-from requests.exceptions import *
-
-import gevent.monkey
-gevent.monkey.patch_all()
-from gevent import Timeout
-from gevent.pool import Pool
from pyaggr3g470r import utils
from pyaggr3g470r import conf
-from pyaggr3g470r import notifications
from pyaggr3g470r import db
from pyaggr3g470r.models import User, Article
-if not conf.ON_HEROKU:
- import pyaggr3g470r.search as fastsearch
-
logger = logging.getLogger(__name__)
-socket.setdefaulttimeout(5.0)
-
-
-# Hack: Re-add sslwrap to Python 2.7.9
-import inspect
-__ssl__ = __import__('ssl')
-try:
- _ssl = __ssl__._ssl
-except AttributeError:
- _ssl = __ssl__._ssl2
-if not hasattr(_ssl, 'sslwrap'):
- def new_sslwrap(sock, server_side=False, keyfile=None, certfile=None, cert_reqs=__ssl__.CERT_NONE, ssl_version=__ssl__.PROTOCOL_SSLv23, ca_certs=None, ciphers=None):
- context = __ssl__.SSLContext(ssl_version)
- context.verify_mode = cert_reqs or __ssl__.CERT_NONE
- if ca_certs:
- context.load_verify_locations(ca_certs)
- if certfile:
- context.load_cert_chain(certfile, keyfile)
- if ciphers:
- context.set_ciphers(ciphers)
-
- caller_self = inspect.currentframe().f_back.f_locals['self']
- return context._wrap_socket(sock, server_side=server_side, ssl_sock=caller_self)
- _ssl.sslwrap = new_sslwrap
-# End hack
-
-
-class TooLong(Exception):
- def __init__(self):
- """
- Log a when greenlet took to long to fetch a resource.
- """
- logger.warning("Greenlet took to long")
-
-
-class FeedGetter(object):
+@asyncio.coroutine
+def fetch(user, feed):
"""
- This class is in charge of retrieving the feeds.
+ Fetch a feed.
"""
- def __init__(self, email):
- """
- Initialization.
- """
- feedparser.USER_AGENT = conf.USER_AGENT
- if conf.HTTP_PROXY == "":
- self.proxy = ProxyHandler({})
- self.proxies = {}
- else:
- self.proxy = ProxyHandler({"http": conf.HTTP_PROXY,
- "https": conf.HTTP_PROXY})
- self.proxies = {
- "http": "http://" + conf.HTTP_PROXY,
- "https": "http://" + conf.HTTP_PROXY
- }
- feedparser.USER_AGENT = conf.USER_AGENT
- self.user = User.query.filter(User.email == email).first()
+ logger.info("Fetching the feed: " + feed.title)
+ print("Fetching the feed: " + feed.title)
+ a_feed = feedparser.parse(feed.link)
+ if a_feed['bozo'] == 1:
+ logger.error(a_feed['bozo_exception'])
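+ # A feed flagged as bozo may still contain usable entries; give up only when none were parsed.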
+ if a_feed['entries'] == []:
+ return []  # nothing parsed; return an empty list so callers can iterate safely
+
+ # Feed information
+ if feed.title == "":
+ try:
+ feed.title = a_feed.feed.title
+ except:
+ feed.title = "No title"
+ if feed.link == "":
+ try:
+ feed.link = a_feed.feed.link
+ except:
+ feed.link = ""
+ if feed.description == "":
+ try:
+ feed.description = a_feed.feed.subtitle
+ except:
+ feed.description = ""
+
+ articles = []
+ for article in a_feed['entries']:
+
+ try:
+ nice_url = article.link.encode("utf-8")
+ except:
+ # if not able to get the link of the article, continue
+ continue
+ if conf.RESOLVE_ARTICLE_URL:
+ try:
+ # resolves URL behind proxies
+ # (like feedproxy.google.com)
+ r = requests.get(article.link, timeout=5.0)
+ nice_url = r.url.encode("utf-8")
+ except Exception as error:
+ logger.warning(
+ "Unable to get the real URL of %s. Error: %s",
+ article.link, error)
+ continue
+ # remove utm_* parameters
+ nice_url = utils.clean_url(nice_url)
+
+ description = ""
+ article_title = article.get('title', '')
+ try:
+ # article content
+ description = article.content[0].value
+ except AttributeError:
+ # article description
+ description = article.get('description', '')
+
+ try:
+ description = BeautifulSoup(description, "lxml").decode()
+ except:
+ logger.error("Problem when sanitizing the content of the article %s (%s)",
+ article_title, nice_url)
+
+ post_date = None
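+ # Try each feedparser date field in turn until one of them parses.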
+ for date_key in ('published_parsed', 'published',
+ 'updated_parsed', 'updated'):
+ if date_key not in article:
+ continue
- def retrieve_feed(self, feed_id=None):
+ try:
+ post_date = dateutil.parser.parse(article[date_key],
+ dayfirst=True)
+ break
+ except:
+ try: # trying to clean date field from letters
+ post_date = dateutil.parser.parse(
+ re.sub('[A-z]', '', article[date_key]),
+ dayfirst=True)
+ break
+ except:
+ pass
+
+ # create the models.Article object and append it to the list of articles
+ article = Article(link=nice_url, title=article_title,
+ content=description, readed=False, like=False,
+ date=post_date, user_id=user.id,
+ feed_id=feed.id)
+ articles.append(article)
+ return articles
+
+@asyncio.coroutine
+def insert_database(user, feed):
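+ """Fetch the feed and insert the retrieved articles into the database."""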
+
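+ # asyncio.async() (later renamed asyncio.ensure_future) schedules fetch() as a Task; 'yield from' waits for its result.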
+ articles = yield from asyncio.async(fetch(user, feed))
+
+ print('inserting articles for {}'.format(feed.title))
+
+ logger.info("Database insertion...")
+ new_articles = []
+ query1 = Article.query.filter(Article.user_id == user.id)
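+ # Narrow the user's articles to this feed; the duplicate check below is currently disabled.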
+ query2 = query1.filter(Article.feed_id == feed.id)
+ for article in articles:
+ exist = False  # query2.filter(Article.link == article.link).count() != 0
+ if exist:
+ logger.debug("Article %r (%r) already in the database.",
+ article.title, article.link)
+ continue
+ if article.date is None:
+ article.date = datetime.now(dateutil.tz.tzlocal())
+ new_articles.append(article)
+ try:
+ feed.articles.append(article)
+ #db.session.merge(article)
+ db.session.commit()
+ logger.info("New article %r (%r) added.", article.title,
+ article.link)
+ except Exception as e:
+ logger.error("Error when inserting article in database: " + str(e))
+ continue
+ #db.session.close()
+ return new_articles
+
+
+
+@asyncio.coroutine
+def done(feed):
+ print('done {}'.format(feed.title))
+
+@asyncio.coroutine
+def process_data(user, feed):
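+ # process_data simply waits on insert_database(), which in turn waits on fetch().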
+ data = yield from asyncio.async(insert_database(user, feed))
+ print('inserted articles for {}'.format(feed.title))
+
+
+
+def retrieve_feed(user, feed_id=None):
"""
Launch the process.
"""
logger.info("Starting to retrieve feeds.")
# 1 - Get the list of feeds to fetch
- user = User.query.filter(User.email == self.user.email).first()
+ user = User.query.filter(User.email == user.email).first()
feeds = [feed for feed in user.feeds if feed.enabled]
if feed_id is not None:
feeds = [feed for feed in feeds if feed.id == feed_id]
# 2 - Fetch the feeds.
- # 'responses' contains all the jobs returned by
- # the function retrieve_async()
- responses = self.retrieve_async(feeds)
- elements = [item.value for item in responses if item.value is not None]
-
- # 3 - Insert articles in the database
- new_articles = self.insert_database(elements)
+ loop = asyncio.get_event_loop()
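+ # Only the first 15 enabled feeds are processed here; asyncio.wait gathers the
+ # process_data() coroutines and run_until_complete() blocks until they all finish.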
+ f = asyncio.wait([process_data(user, feed) for feed in feeds[0:15]])
+ loop.run_until_complete(f)
+ """
# 4 - Indexation
if not conf.ON_HEROKU:
self.index(new_articles)
@@ -144,173 +205,6 @@ class FeedGetter(object):
# 5 - Mail notification
if not conf.ON_HEROKU and conf.NOTIFICATION_ENABLED:
self.mail_notification(new_articles)
-
- logger.info("All articles retrieved. End of the processus.")
-
- def retrieve_async(self, feeds):
"""
- Spawn different jobs in order to retrieve a list of feeds.
- Returns a list of jobs.
- """
- def fetch(feed):
- """
- Fetch a feed.
- """
- logger.info("Fetching the feed: " + feed.title)
- a_feed = feedparser.parse(feed.link, handlers=[self.proxy])
- if a_feed['bozo'] == 1:
- logger.error(a_feed['bozo_exception'])
- if a_feed['entries'] == []:
- return
- # Feed informations
- if feed.title == "":
- try:
- feed.title = a_feed.feed.title
- except:
- feed.title = "No title"
- if feed.link == "":
- try:
- feed.link = a_feed.feed.link
- except:
- feed.link = ""
- if feed.description == "":
- try:
- feed.description = a_feed.feed.subtitle
- except:
- feed.description = ""
-
- articles = []
- for article in a_feed['entries']:
-
- try:
- nice_url = article.link.encode("utf-8")
- except:
- # if not able to get the link of the article, continue
- continue
- if conf.RESOLVE_ARTICLE_URL:
- try:
- # resolves URL behind proxies
- # (like feedproxy.google.com)
- r = requests.get(article.link, timeout=5.0,
- proxies=self.proxies)
- nice_url = r.url.encode("utf-8")
- except Timeout:
- logger.warning(
- "Timeout when getting the real URL of %s.",
- article.link)
- continue
- except Exception as error:
- logger.warning(
- "Unable to get the real URL of %s. Error: %s",
- article.link, error)
- continue
- # remove utm_* parameters
- nice_url = utils.clean_url(nice_url)
-
- description = ""
- article_title = article.get('title', '')
- try:
- # article content
- description = article.content[0].value
- except AttributeError:
- # article description
- description = article.get('description', '')
-
- try:
- description = BeautifulSoup(description, "lxml").decode()
- except:
- logger.error("Problem when sanitizing the content of the article %s (%s)",
- article_title, nice_url)
-
- post_date = None
- for date_key in ('published_parsed', 'published',
- 'updated_parsed', 'updated'):
- if not date_key in article:
- continue
-
- try:
- post_date = dateutil.parser.parse(article[date_key],
- dayfirst=True)
- break
- except:
- try: # trying to clean date field from letters
- post_date = dateutil.parser.parse(
- re.sub('[A-z]', '', article[date_key]),
- dayfirst=True)
- break
- except:
- pass
-
- # create the models.Article object and append it to the list of articles
- article = Article(link=nice_url, title=article_title,
- content=description, readed=False, like=False,
- date=post_date, user_id=self.user.id,
- feed_id=feed.id)
- articles.append(article)
-
- # return the feed with the list of retrieved articles
- return feed, articles
-
- pool = Pool(20)
- jobs = [pool.spawn(fetch, feed) for feed in feeds]
- pool.join()
-
- return jobs
-
- def insert_database(self, elements):
- """
- Insert articles in the database.
- """
- logger.info("Database insertion...")
- new_articles = []
- query1 = Article.query.filter(Article.user_id == self.user.id)
- for feed, articles in elements:
- query2 = query1.filter(Article.feed_id == feed.id)
- for article in articles:
- exist = query2.filter(Article.link == article.link).count() != 0
- if exist:
- logger.debug("Article %r (%r) already in the database.",
- article.title, article.link)
- continue
- if article.date is None:
- article.date = datetime.now(dateutil.tz.tzlocal())
-
- new_articles.append(article)
-
- try:
- feed.articles.append(article)
- #db.session.merge(article)
- db.session.commit()
- logger.info("New article %r (%r) added.", article.title,
- article.link)
- except Exception as e:
- logger.error("Error when inserting article in database: " + str(e))
- continue
- #db.session.close()
- return new_articles
-
- def index(self, new_articles):
- """
- Index new articles.
- """
- logger.info("Indexing new articles.")
- for element in new_articles:
- article = Article.query.filter(Article.user_id == self.user.id,
- Article.link == element.link).first()
- try:
- fastsearch.add_to_index(self.user.id, [article],
- article.source)
- except:
- logger.exception("Problem during indexation:")
- return True
-
- def mail_notification(self, new_articles):
- """
- Mail notification.
- """
- logger.info("Starting mail notification.")
- for element in new_articles:
- if element.source.email_notification:
- notifications.new_article_notification(self.user, element.source, element)
- return True
+ logger.info("All articles retrieved. End of the process.")
\ No newline at end of file