Diffstat (limited to 'src/crawler')
-rw-r--r--  src/crawler/classic_crawler.py  168
-rw-r--r--  src/crawler/http_crawler.py     251
2 files changed, 419 insertions, 0 deletions
diff --git a/src/crawler/classic_crawler.py b/src/crawler/classic_crawler.py
new file mode 100644
index 00000000..0598c418
--- /dev/null
+++ b/src/crawler/classic_crawler.py
@@ -0,0 +1,168 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# jarr - A Web based news aggregator.
+# Copyright (C) 2010-2015 Cédric Bonhomme - https://www.JARR-aggregator.org
+#
+# For more information : https://github.com/JARR-aggregator/JARR/
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+__author__ = "Cedric Bonhomme"
+__version__ = "$Revision: 3.3 $"
+__date__ = "$Date: 2010/09/02 $"
+__revision__ = "$Date: 2015/12/07 $"
+__copyright__ = "Copyright (c) Cedric Bonhomme"
+__license__ = "AGPLv3"
+
+import asyncio
+import logging
+import feedparser
+import dateutil.tz
+from datetime import datetime
+from sqlalchemy import or_
+
+import conf
+from bootstrap import db
+from web.models import User
+from web.controllers import FeedController, ArticleController
+from web.lib.feed_utils import construct_feed_from, is_parsing_ok
+from web.lib.article_utils import construct_article, extract_id
+
+logger = logging.getLogger(__name__)
+
+sem = asyncio.Semaphore(5)
+
+import ssl
+try:
+ _create_unverified_https_context = ssl._create_unverified_context
+except AttributeError:
+ # Legacy Python that doesn't verify HTTPS certificates by default
+ pass
+else:
+ # Handle target environment that doesn't support HTTPS verification
+ ssl._create_default_https_context = _create_unverified_https_context
+
+
+async def get(*args, **kwargs):
+ #kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False)
+ try:
+ data = feedparser.parse(args[0])
+ return data
+ except Exception as e:
+ raise e
+
+
+async def parse_feed(user, feed):
+ """
+ Fetch a feed.
+ Update the feed and return the articles.
+ """
+ parsed_feed = None
+ up_feed = {}
+ articles = []
+ with (await sem):
+ try:
+ parsed_feed = await get(feed.link)
+ except Exception as e:
+ up_feed['last_error'] = str(e)
+ up_feed['error_count'] = feed.error_count + 1
+ finally:
+ up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal())
+ if parsed_feed is None:
+ FeedController().update({'id': feed.id}, up_feed)
+ return
+
+ if not is_parsing_ok(parsed_feed):
+ up_feed['last_error'] = str(parsed_feed['bozo_exception'])
+ up_feed['error_count'] = feed.error_count + 1
+ FeedController().update({'id': feed.id}, up_feed)
+ return
+ if parsed_feed['entries'] != []:
+ articles = parsed_feed['entries']
+
+ up_feed['error_count'] = 0
+ up_feed['last_error'] = ""
+
+ # Feed information (merge the parsed data, keeping the values already set)
+ up_feed = dict(construct_feed_from(feed.link, parsed_feed), **up_feed)
+ if feed.title and 'title' in up_feed:
+ # do not override the title set by the user
+ del up_feed['title']
+ FeedController().update({'id': feed.id}, up_feed)
+
+ return articles
+
+
+async def insert_database(user, feed):
+
+ articles = await parse_feed(user, feed)
+ if articles is None:
+ return []
+
+ logger.debug('inserting articles for {}'.format(feed.title))
+
+ logger.info("Database insertion...")
+ new_articles = []
+ art_contr = ArticleController(user.id)
+ for article in articles:
+ exist = art_contr.read(feed_id=feed.id,
+ **extract_id(article)).count() != 0
+ if exist:
+ logger.debug("Article %r (%r) already in the database.",
+ article['title'], article['link'])
+ continue
+ article = construct_article(article, feed)
+ try:
+ new_articles.append(art_contr.create(**article))
+ logger.info("New article % (%r) added.",
+ article['title'], article['link'])
+ except Exception:
+ logger.exception("Error when inserting article in database:")
+ continue
+ return new_articles
+
+
+async def init_process(user, feed):
+ # Fetch the feed and insert new articles in the database
+ articles = await insert_database(user, feed)
+ logger.debug('inserted articles for %s', feed.title)
+ return articles
+
+
+def retrieve_feed(loop, user, feed_id=None):
+ """
+ Launch the process.
+ """
+ logger.info("Starting to retrieve feeds.")
+
+ # Get the list of feeds to fetch
+ user = User.query.filter(User.email == user.email).first()
+ feeds = [feed for feed in user.feeds if
+ feed.error_count <= conf.DEFAULT_MAX_ERROR and feed.enabled]
+ if feed_id is not None:
+ feeds = [feed for feed in feeds if feed.id == feed_id]
+
+ if feeds == []:
+ return
+
+ # Launch the process for all the feeds
+ tasks = [asyncio.ensure_future(init_process(user, feed)) for feed in feeds]
+
+ try:
+ loop.run_until_complete(asyncio.wait(tasks))
+ except Exception:
+ logger.exception('an error occurred')
+
+ logger.info("All articles retrieved. End of the processus.")
diff --git a/src/crawler/http_crawler.py b/src/crawler/http_crawler.py
new file mode 100644
index 00000000..f480fe96
--- /dev/null
+++ b/src/crawler/http_crawler.py
@@ -0,0 +1,251 @@
+"""
+Here is a summary of how it works:
+
+CrawlerScheduler.run
+ will retrieve the list of feeds to be refreshed and pass the result to
+CrawlerScheduler.callback
+ which will retrieve each feed and hand the result over to
+FeedCrawler.callback
+ which will interpret the result (status_code, etag), collect the ids
+ and match them against pyagg, which will cause
+PyAggUpdater.callback
+ to create the missing entries
+"""
+
+import time
+import conf
+import json
+import logging
+import feedparser
+from datetime import datetime, timedelta
+from time import strftime, gmtime
+from concurrent.futures import ThreadPoolExecutor
+from requests_futures.sessions import FuturesSession
+from web.lib.utils import default_handler, to_hash
+from web.lib.feed_utils import construct_feed_from
+from web.lib.article_utils import extract_id, construct_article
+
+logger = logging.getLogger(__name__)
+logging.captureWarnings(True)
+API_ROOT = "api/v2.0/"
+
+
+class AbstractCrawler:
+
+ def __init__(self, auth, pool=None, session=None):
+ self.auth = auth
+ self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER)
+ self.session = session or FuturesSession(executor=self.pool)
+ self.session.verify = False
+ self.url = conf.PLATFORM_URL
+
+ def query_pyagg(self, method, urn, data=None):
+ """A wrapper for internal call, method should be ones you can find
+ on requests (header, post, get, options, ...), urn the distant
+ resources you want to access on pyagg, and data, the data you wanna
+ transmit."""
+ if data is None:
+ data = {}
+ method = getattr(self.session, method)
+ return method("%s%s%s" % (self.url, API_ROOT, urn),
+ auth=self.auth, data=json.dumps(data,
+ default=default_handler),
+ headers={'Content-Type': 'application/json',
+ 'User-Agent': conf.USER_AGENT})
+
+ def wait(self, max_wait=300, checks=5, wait_for=2):
+ checked, second_waited = 0, 0
+ while True:
+ time.sleep(wait_for)
+ second_waited += wait_for
+ if second_waited > max_wait:
+ logger.warning('Exiting after %d seconds', second_waited)
+ break
+ if self.pool._work_queue.qsize():
+ checked = 0
+ continue
+ checked += 1
+ if checked == checks:
+ break
+
+
+class PyAggUpdater(AbstractCrawler):
+
+ def __init__(self, feed, entries, headers, parsed_feed,
+ auth, pool=None, session=None):
+ self.feed = feed
+ self.entries = entries
+ self.headers = headers
+ self.parsed_feed = parsed_feed
+ super().__init__(auth, pool, session)
+
+ def callback(self, response):
+ """Will process the result from the challenge, creating missing article
+ and updating the feed"""
+ article_created = False
+ if response.result().status_code != 204:
+ results = response.result().json()
+ logger.debug('%r %r - %d entries were not matched '
+ 'and will be created',
+ self.feed['id'], self.feed['title'], len(results))
+ for id_to_create in results:
+ article_created = True
+ entry = construct_article(
+ self.entries[tuple(sorted(id_to_create.items()))],
+ self.feed)
+ logger.info('%r %r - creating %r for %r - %r', self.feed['id'],
+ self.feed['title'], entry['title'],
+ entry['user_id'], id_to_create)
+ self.query_pyagg('post', 'article', entry)
+
+ logger.debug('%r %r - updating feed etag %r last_mod %r',
+ self.feed['id'], self.feed['title'],
+ self.headers.get('etag', ''),
+ self.headers.get('last-modified', ''))
+
+ up_feed = {'error_count': 0, 'last_error': None,
+ 'etag': self.headers.get('etag', ''),
+ 'last_modified': self.headers.get('last-modified',
+ strftime('%a, %d %b %Y %X %Z', gmtime()))}
+ fresh_feed = construct_feed_from(url=self.feed['link'],
+ fp_parsed=self.parsed_feed)
+ for key in ('description', 'site_link', 'icon_url'):
+ if fresh_feed.get(key) and fresh_feed[key] != self.feed.get(key):
+ up_feed[key] = fresh_feed[key]
+ if not self.feed.get('title'):
+ up_feed['title'] = fresh_feed.get('title', '')
+ up_feed['user_id'] = self.feed['user_id']
+ # re-getting that feed earlier since new entries appeared
+ if article_created:
+ up_feed['last_retrieved'] \
+ = (datetime.now() - timedelta(minutes=45)).isoformat()
+
+ diff_keys = {key for key in up_feed
+ if up_feed[key] != self.feed.get(key)}
+ if not diff_keys:
+ return # no change in the feed, no update
+ if not article_created and diff_keys == {'last_modified', 'etag'}:
+ return # meaningless if no new article has been published
+ logger.info('%r %r - pushing feed attrs %r',
+ self.feed['id'], self.feed['title'],
+ {key: "%s -> %s" % (up_feed[key], self.feed.get(key))
+ for key in up_feed if up_feed[key] != self.feed.get(key)})
+
+ self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed)
+
+
+class FeedCrawler(AbstractCrawler):
+
+ def __init__(self, feed, auth, pool=None, session=None):
+ self.feed = feed
+ super().__init__(auth, pool, session)
+
+ def clean_feed(self):
+ """Will reset the errors counters on a feed that have known errors"""
+ if self.feed.get('error_count') or self.feed.get('last_error'):
+ self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': 0, 'last_error': ''})
+
+ def callback(self, response):
+ """will fetch the feed and interprete results (304, etag) or will
+ challenge pyagg to compare gotten entries with existing ones"""
+ try:
+ response = response.result()
+ response.raise_for_status()
+ except Exception as error:
+ error_count = self.feed['error_count'] + 1
+ logger.exception('%r %r - an error occurred while fetching '
+ 'feed; bumping error count to %r',
+ self.feed['id'], self.feed['title'], error_count)
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': error_count,
+ 'last_error': str(error),
+ 'user_id': self.feed['user_id']})
+ return
+
+ if response.status_code == 304:
+ logger.info("%r %r - feed responded with 304",
+ self.feed['id'], self.feed['title'])
+ self.clean_feed()
+ return
+ if 'etag' not in response.headers:
+ logger.debug('%r %r - manually generating etag',
+ self.feed['id'], self.feed['title'])
+ response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text)
+ if response.headers['etag'] and self.feed['etag'] \
+ and response.headers['etag'] == self.feed['etag']:
+ if 'pyagg' in self.feed['etag']:
+ logger.info("%r %r - calculated hash matches (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
+ else:
+ logger.info("%r %r - feed responded with same etag (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
+ self.clean_feed()
+ return
+ else:
+ logger.debug('%r %r - etag mismatch %r != %r',
+ self.feed['id'], self.feed['title'],
+ response.headers['etag'], self.feed['etag'])
+ logger.info('%r %r - cache validation failed, challenging entries',
+ self.feed['id'], self.feed['title'])
+
+ ids, entries = [], {}
+ parsed_response = feedparser.parse(response.content)
+ for entry in parsed_response['entries']:
+ entry_ids = extract_id(entry)
+ entry_ids['feed_id'] = self.feed['id']
+ entry_ids['user_id'] = self.feed['user_id']
+ entries[tuple(sorted(entry_ids.items()))] = entry
+ ids.append(entry_ids)
+ logger.debug('%r %r - found %d entries %r',
+ self.feed['id'], self.feed['title'], len(ids), ids)
+ future = self.query_pyagg('get', 'articles/challenge', {'ids': ids})
+ updater = PyAggUpdater(self.feed, entries, response.headers,
+ parsed_response,
+ self.auth, self.pool, self.session)
+ future.add_done_callback(updater.callback)
+
+
+class CrawlerScheduler(AbstractCrawler):
+
+ def __init__(self, username, password, pool=None, session=None):
+ self.auth = (username, password)
+ super().__init__(self.auth, pool, session)
+
+ def prepare_headers(self, feed):
+ """For a known feed, will construct some header dictionnary"""
+ headers = {'User-Agent': conf.USER_AGENT}
+ if feed.get('last_modified'):
+ headers['If-Modified-Since'] = feed['last_modified']
+ if feed.get('etag') and 'pyagg' not in feed['etag']:
+ headers['If-None-Match'] = feed['etag']
+ logger.debug('%r %r - calculated headers %r',
+ feed['id'], feed['title'], headers)
+ return headers
+
+ def callback(self, response):
+ """processes feeds that need to be fetched"""
+ response = response.result()
+ response.raise_for_status()
+ if response.status_code == 204:
+ logger.debug("No feed to fetch")
+ return
+ feeds = response.json()
+ logger.debug('%d to fetch %r', len(feeds), feeds)
+ for feed in feeds:
+ logger.debug('%r %r - fetching resources',
+ feed['id'], feed['title'])
+ future = self.session.get(feed['link'],
+ headers=self.prepare_headers(feed))
+
+ feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session)
+ future.add_done_callback(feed_crwlr.callback)
+
+ def run(self, **kwargs):
+ """entry point, will retreive feeds to be fetch
+ and launch the whole thing"""
+ logger.debug('retreving fetchable feed')
+ future = self.query_pyagg('get', 'feeds/fetchable', kwargs)
+ future.add_done_callback(self.callback)
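A minimal usage sketch for the HTTP crawler (not part of this commit; the module path and credentials are placeholders, and conf.NB_WORKER / conf.PLATFORM_URL are expected to be configured):

    from crawler.http_crawler import CrawlerScheduler

    scheduler = CrawlerScheduler('admin', 'password')  # placeholder API credentials
    scheduler.run()    # requests feeds/fetchable; the callback chain does the rest
    scheduler.wait()   # block until the thread pool's work queue has drained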