diff options
author | François Schmidts <francois.schmidts@gmail.com> | 2016-02-02 23:30:57 +0100 |
---|---|---|
committer | François Schmidts <francois.schmidts@gmail.com> | 2016-02-02 23:30:57 +0100 |
commit | 4a8438d7f2b7b16941240b91f39a9402c431ffc2 (patch) | |
tree | 89380f41b802256d8fdbf724e7d9e63b48209b4a /src/crawler | |
parent | Merge branch 'feature/categories' (diff) | |
download | newspipe-4a8438d7f2b7b16941240b91f39a9402c431ffc2.tar.gz newspipe-4a8438d7f2b7b16941240b91f39a9402c431ffc2.tar.bz2 newspipe-4a8438d7f2b7b16941240b91f39a9402c431ffc2.zip |
writing a bit of doc, moving crawler together
Diffstat (limited to 'src/crawler')
-rw-r--r-- | src/crawler/classic_crawler.py | 168 | ||||
-rw-r--r-- | src/crawler/http_crawler.py | 251 |
2 files changed, 419 insertions, 0 deletions
diff --git a/src/crawler/classic_crawler.py b/src/crawler/classic_crawler.py new file mode 100644 index 00000000..0598c418 --- /dev/null +++ b/src/crawler/classic_crawler.py @@ -0,0 +1,168 @@ +#! /usr/bin/env python +# -*- coding: utf-8 - + +# jarr - A Web based news aggregator. +# Copyright (C) 2010-2015 Cédric Bonhomme - https://www.JARR-aggregator.org +# +# For more information : https://github.com/JARR-aggregator/JARR/ +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +__author__ = "Cedric Bonhomme" +__version__ = "$Revision: 3.3 $" +__date__ = "$Date: 2010/09/02 $" +__revision__ = "$Date: 2015/12/07 $" +__copyright__ = "Copyright (c) Cedric Bonhomme" +__license__ = "AGPLv3" + +import asyncio +import logging +import feedparser +import dateutil.parser +from datetime import datetime +from sqlalchemy import or_ + +import conf +from bootstrap import db +from web.models import User +from web.controllers import FeedController, ArticleController +from web.lib.feed_utils import construct_feed_from, is_parsing_ok +from web.lib.article_utils import construct_article, extract_id + +logger = logging.getLogger(__name__) + +sem = asyncio.Semaphore(5) + +import ssl +try: + _create_unverified_https_context = ssl._create_unverified_context +except AttributeError: + # Legacy Python that doesn't verify HTTPS certificates by default + pass +else: + # Handle target environment that doesn't support HTTPS verification + ssl._create_default_https_context = _create_unverified_https_context + + +async def get(*args, **kwargs): + #kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False) + try: + data = feedparser.parse(args[0]) + return data + except Exception as e: + raise e + + +async def parse_feed(user, feed): + """ + Fetch a feed. + Update the feed and return the articles. + """ + parsed_feed = None + up_feed = {} + articles = [] + with (await sem): + try: + parsed_feed = await get(feed.link) + except Exception as e: + up_feed['last_error'] = str(e) + up_feed['error_count'] = feed.error_count + 1 + finally: + up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal()) + if parsed_feed is None: + FeedController().update({'id': feed.id}, up_feed) + return + + if not is_parsing_ok(parsed_feed): + up_feed['last_error'] = str(parsed_feed['bozo_exception']) + up_feed['error_count'] = feed.error_count + 1 + FeedController().update({'id': feed.id}, up_feed) + return + if parsed_feed['entries'] != []: + articles = parsed_feed['entries'] + + up_feed['error_count'] = 0 + up_feed['last_error'] = "" + + # Feed informations + construct_feed_from(feed.link, parsed_feed).update(up_feed) + if feed.title and 'title' in up_feed: + # do not override the title set by the user + del up_feed['title'] + FeedController().update({'id': feed.id}, up_feed) + + return articles + + +async def insert_database(user, feed): + + articles = await parse_feed(user, feed) + if None is articles: + return [] + + logger.debug('inserting articles for {}'.format(feed.title)) + + logger.info("Database insertion...") + new_articles = [] + art_contr = ArticleController(user.id) + for article in articles: + exist = art_contr.read(feed_id=feed.id, + **extract_id(article)).count() != 0 + if exist: + logger.debug("Article %r (%r) already in the database.", + article['title'], article['link']) + continue + article = construct_article(article, feed) + try: + new_articles.append(art_contr.create(**article)) + logger.info("New article % (%r) added.", + article['title'], article['link']) + except Exception: + logger.exception("Error when inserting article in database:") + continue + return new_articles + + +async def init_process(user, feed): + # Fetch the feed and insert new articles in the database + articles = await insert_database(user, feed) + logger.debug('inserted articles for %s', feed.title) + return articles + + +def retrieve_feed(loop, user, feed_id=None): + """ + Launch the processus. + """ + logger.info("Starting to retrieve feeds.") + + # Get the list of feeds to fetch + user = User.query.filter(User.email == user.email).first() + feeds = [feed for feed in user.feeds if + feed.error_count <= conf.DEFAULT_MAX_ERROR and feed.enabled] + if feed_id is not None: + feeds = [feed for feed in feeds if feed.id == feed_id] + + if feeds == []: + return + + # Launch the process for all the feeds + tasks = [asyncio.ensure_future(init_process(user, feed)) for feed in feeds] + + try: + loop.run_until_complete(asyncio.wait(tasks)) + except Exception: + logger.exception('an error occured') + + logger.info("All articles retrieved. End of the processus.") diff --git a/src/crawler/http_crawler.py b/src/crawler/http_crawler.py new file mode 100644 index 00000000..f480fe96 --- /dev/null +++ b/src/crawler/http_crawler.py @@ -0,0 +1,251 @@ +""" +Here's a sum up on how it works : + +CrawlerScheduler.run + will retreive a list of feeds to be refreshed and pass result to +CrawlerScheduler.callback + which will retreive each feed and treat result with +FeedCrawler.callback + which will interprete the result (status_code, etag) collect ids + and match them agaisnt pyagg which will cause +PyAggUpdater.callback + to create the missing entries +""" + +import time +import conf +import json +import logging +import feedparser +from datetime import datetime, timedelta +from time import strftime, gmtime +from concurrent.futures import ThreadPoolExecutor +from requests_futures.sessions import FuturesSession +from web.lib.utils import default_handler, to_hash +from web.lib.feed_utils import construct_feed_from +from web.lib.article_utils import extract_id, construct_article + +logger = logging.getLogger(__name__) +logging.captureWarnings(True) +API_ROOT = "api/v2.0/" + + +class AbstractCrawler: + + def __init__(self, auth, pool=None, session=None): + self.auth = auth + self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER) + self.session = session or FuturesSession(executor=self.pool) + self.session.verify = False + self.url = conf.PLATFORM_URL + + def query_pyagg(self, method, urn, data=None): + """A wrapper for internal call, method should be ones you can find + on requests (header, post, get, options, ...), urn the distant + resources you want to access on pyagg, and data, the data you wanna + transmit.""" + if data is None: + data = {} + method = getattr(self.session, method) + return method("%s%s%s" % (self.url, API_ROOT, urn), + auth=self.auth, data=json.dumps(data, + default=default_handler), + headers={'Content-Type': 'application/json', + 'User-Agent': conf.USER_AGENT}) + + def wait(self, max_wait=300, checks=5, wait_for=2): + checked, second_waited = 0, 0 + while True: + time.sleep(wait_for) + second_waited += wait_for + if second_waited > max_wait: + logger.warn('Exiting after %d seconds', second_waited) + break + if self.pool._work_queue.qsize(): + checked = 0 + continue + checked += 1 + if checked == checks: + break + + +class PyAggUpdater(AbstractCrawler): + + def __init__(self, feed, entries, headers, parsed_feed, + auth, pool=None, session=None): + self.feed = feed + self.entries = entries + self.headers = headers + self.parsed_feed = parsed_feed + super().__init__(auth, pool, session) + + def callback(self, response): + """Will process the result from the challenge, creating missing article + and updating the feed""" + article_created = False + if response.result().status_code != 204: + results = response.result().json() + logger.debug('%r %r - %d entries were not matched ' + 'and will be created', + self.feed['id'], self.feed['title'], len(results)) + for id_to_create in results: + article_created = True + entry = construct_article( + self.entries[tuple(sorted(id_to_create.items()))], + self.feed) + logger.info('%r %r - creating %r for %r - %r', self.feed['id'], + self.feed['title'], entry['title'], + entry['user_id'], id_to_create) + self.query_pyagg('post', 'article', entry) + + logger.debug('%r %r - updating feed etag %r last_mod %r', + self.feed['id'], self.feed['title'], + self.headers.get('etag', ''), + self.headers.get('last-modified', '')) + + up_feed = {'error_count': 0, 'last_error': None, + 'etag': self.headers.get('etag', ''), + 'last_modified': self.headers.get('last-modified', + strftime('%a, %d %b %Y %X %Z', gmtime()))} + fresh_feed = construct_feed_from(url=self.feed['link'], + fp_parsed=self.parsed_feed) + for key in ('description', 'site_link', 'icon_url'): + if fresh_feed.get(key) and fresh_feed[key] != self.feed.get(key): + up_feed[key] = fresh_feed[key] + if not self.feed.get('title'): + up_feed['title'] = fresh_feed.get('title', '') + up_feed['user_id'] = self.feed['user_id'] + # re-getting that feed earlier since new entries appeared + if article_created: + up_feed['last_retrieved'] \ + = (datetime.now() - timedelta(minutes=45)).isoformat() + + diff_keys = {key for key in up_feed + if up_feed[key] != self.feed.get(key)} + if not diff_keys: + return # no change in the feed, no update + if not article_created and diff_keys == {'last_modified', 'etag'}: + return # meaningless if no new article has been published + logger.info('%r %r - pushing feed attrs %r', + self.feed['id'], self.feed['title'], + {key: "%s -> %s" % (up_feed[key], self.feed.get(key)) + for key in up_feed if up_feed[key] != self.feed.get(key)}) + + self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed) + + +class FeedCrawler(AbstractCrawler): + + def __init__(self, feed, auth, pool=None, session=None): + self.feed = feed + super().__init__(auth, pool, session) + + def clean_feed(self): + """Will reset the errors counters on a feed that have known errors""" + if self.feed.get('error_count') or self.feed.get('last_error'): + self.query_pyagg('put', 'feed/%d' % self.feed['id'], + {'error_count': 0, 'last_error': ''}) + + def callback(self, response): + """will fetch the feed and interprete results (304, etag) or will + challenge pyagg to compare gotten entries with existing ones""" + try: + response = response.result() + response.raise_for_status() + except Exception as error: + error_count = self.feed['error_count'] + 1 + logger.exception('%r %r - an error occured while fetching ' + 'feed; bumping error count to %r', + self.feed['id'], self.feed['title'], error_count) + future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], + {'error_count': error_count, + 'last_error': str(error), + 'user_id': self.feed['user_id']}) + return + + if response.status_code == 304: + logger.info("%r %r - feed responded with 304", + self.feed['id'], self.feed['title']) + self.clean_feed() + return + if 'etag' not in response.headers: + logger.debug('%r %r - manually generating etag', + self.feed['id'], self.feed['title']) + response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text) + if response.headers['etag'] and self.feed['etag'] \ + and response.headers['etag'] == self.feed['etag']: + if 'pyagg' in self.feed['etag']: + logger.info("%r %r - calculated hash matches (%d)", + self.feed['id'], self.feed['title'], + response.status_code) + else: + logger.info("%r %r - feed responded with same etag (%d)", + self.feed['id'], self.feed['title'], + response.status_code) + self.clean_feed() + return + else: + logger.debug('%r %r - etag mismatch %r != %r', + self.feed['id'], self.feed['title'], + response.headers['etag'], self.feed['etag']) + logger.info('%r %r - cache validation failed, challenging entries', + self.feed['id'], self.feed['title']) + + ids, entries = [], {} + parsed_response = feedparser.parse(response.content) + for entry in parsed_response['entries']: + entry_ids = extract_id(entry) + entry_ids['feed_id'] = self.feed['id'] + entry_ids['user_id'] = self.feed['user_id'] + entries[tuple(sorted(entry_ids.items()))] = entry + ids.append(entry_ids) + logger.debug('%r %r - found %d entries %r', + self.feed['id'], self.feed['title'], len(ids), ids) + future = self.query_pyagg('get', 'articles/challenge', {'ids': ids}) + updater = PyAggUpdater(self.feed, entries, response.headers, + parsed_response, + self.auth, self.pool, self.session) + future.add_done_callback(updater.callback) + + +class CrawlerScheduler(AbstractCrawler): + + def __init__(self, username, password, pool=None, session=None): + self.auth = (username, password) + super(CrawlerScheduler, self).__init__(self.auth, pool, session) + + def prepare_headers(self, feed): + """For a known feed, will construct some header dictionnary""" + headers = {'User-Agent': conf.USER_AGENT} + if feed.get('last_modified'): + headers['If-Modified-Since'] = feed['last_modified'] + if feed.get('etag') and 'pyagg' not in feed['etag']: + headers['If-None-Match'] = feed['etag'] + logger.debug('%r %r - calculated headers %r', + feed['id'], feed['title'], headers) + return headers + + def callback(self, response): + """processes feeds that need to be fetched""" + response = response.result() + response.raise_for_status() + if response.status_code == 204: + logger.debug("No feed to fetch") + return + feeds = response.json() + logger.debug('%d to fetch %r', len(feeds), feeds) + for feed in feeds: + logger.debug('%r %r - fetching resources', + feed['id'], feed['title']) + future = self.session.get(feed['link'], + headers=self.prepare_headers(feed)) + + feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session) + future.add_done_callback(feed_crwlr.callback) + + def run(self, **kwargs): + """entry point, will retreive feeds to be fetch + and launch the whole thing""" + logger.debug('retreving fetchable feed') + future = self.query_pyagg('get', 'feeds/fetchable', kwargs) + future.add_done_callback(self.callback) |