From 4a8438d7f2b7b16941240b91f39a9402c431ffc2 Mon Sep 17 00:00:00 2001 From: François Schmidts Date: Tue, 2 Feb 2016 23:30:57 +0100 Subject: writing a bit of doc, moving crawler together --- src/crawler.py | 168 --------------------------- src/crawler/classic_crawler.py | 168 +++++++++++++++++++++++++++ src/crawler/http_crawler.py | 251 +++++++++++++++++++++++++++++++++++++++++ src/manager.py | 6 +- src/web/lib/crawler.py | 251 ----------------------------------------- src/web/utils.py | 4 +- 6 files changed, 424 insertions(+), 424 deletions(-) delete mode 100644 src/crawler.py create mode 100644 src/crawler/classic_crawler.py create mode 100644 src/crawler/http_crawler.py delete mode 100644 src/web/lib/crawler.py (limited to 'src') diff --git a/src/crawler.py b/src/crawler.py deleted file mode 100644 index 0598c418..00000000 --- a/src/crawler.py +++ /dev/null @@ -1,168 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 - - -# jarr - A Web based news aggregator. -# Copyright (C) 2010-2015 Cédric Bonhomme - https://www.JARR-aggregator.org -# -# For more information : https://github.com/JARR-aggregator/JARR/ -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -__author__ = "Cedric Bonhomme" -__version__ = "$Revision: 3.3 $" -__date__ = "$Date: 2010/09/02 $" -__revision__ = "$Date: 2015/12/07 $" -__copyright__ = "Copyright (c) Cedric Bonhomme" -__license__ = "AGPLv3" - -import asyncio -import logging -import feedparser -import dateutil.parser -from datetime import datetime -from sqlalchemy import or_ - -import conf -from bootstrap import db -from web.models import User -from web.controllers import FeedController, ArticleController -from web.lib.feed_utils import construct_feed_from, is_parsing_ok -from web.lib.article_utils import construct_article, extract_id - -logger = logging.getLogger(__name__) - -sem = asyncio.Semaphore(5) - -import ssl -try: - _create_unverified_https_context = ssl._create_unverified_context -except AttributeError: - # Legacy Python that doesn't verify HTTPS certificates by default - pass -else: - # Handle target environment that doesn't support HTTPS verification - ssl._create_default_https_context = _create_unverified_https_context - - -async def get(*args, **kwargs): - #kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False) - try: - data = feedparser.parse(args[0]) - return data - except Exception as e: - raise e - - -async def parse_feed(user, feed): - """ - Fetch a feed. - Update the feed and return the articles. - """ - parsed_feed = None - up_feed = {} - articles = [] - with (await sem): - try: - parsed_feed = await get(feed.link) - except Exception as e: - up_feed['last_error'] = str(e) - up_feed['error_count'] = feed.error_count + 1 - finally: - up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal()) - if parsed_feed is None: - FeedController().update({'id': feed.id}, up_feed) - return - - if not is_parsing_ok(parsed_feed): - up_feed['last_error'] = str(parsed_feed['bozo_exception']) - up_feed['error_count'] = feed.error_count + 1 - FeedController().update({'id': feed.id}, up_feed) - return - if parsed_feed['entries'] != []: - articles = parsed_feed['entries'] - - up_feed['error_count'] = 0 - up_feed['last_error'] = "" - - # Feed informations - construct_feed_from(feed.link, parsed_feed).update(up_feed) - if feed.title and 'title' in up_feed: - # do not override the title set by the user - del up_feed['title'] - FeedController().update({'id': feed.id}, up_feed) - - return articles - - -async def insert_database(user, feed): - - articles = await parse_feed(user, feed) - if None is articles: - return [] - - logger.debug('inserting articles for {}'.format(feed.title)) - - logger.info("Database insertion...") - new_articles = [] - art_contr = ArticleController(user.id) - for article in articles: - exist = art_contr.read(feed_id=feed.id, - **extract_id(article)).count() != 0 - if exist: - logger.debug("Article %r (%r) already in the database.", - article['title'], article['link']) - continue - article = construct_article(article, feed) - try: - new_articles.append(art_contr.create(**article)) - logger.info("New article % (%r) added.", - article['title'], article['link']) - except Exception: - logger.exception("Error when inserting article in database:") - continue - return new_articles - - -async def init_process(user, feed): - # Fetch the feed and insert new articles in the database - articles = await insert_database(user, feed) - logger.debug('inserted articles for %s', feed.title) - return articles - - -def retrieve_feed(loop, user, feed_id=None): - """ - Launch the processus. - """ - logger.info("Starting to retrieve feeds.") - - # Get the list of feeds to fetch - user = User.query.filter(User.email == user.email).first() - feeds = [feed for feed in user.feeds if - feed.error_count <= conf.DEFAULT_MAX_ERROR and feed.enabled] - if feed_id is not None: - feeds = [feed for feed in feeds if feed.id == feed_id] - - if feeds == []: - return - - # Launch the process for all the feeds - tasks = [asyncio.ensure_future(init_process(user, feed)) for feed in feeds] - - try: - loop.run_until_complete(asyncio.wait(tasks)) - except Exception: - logger.exception('an error occured') - - logger.info("All articles retrieved. End of the processus.") diff --git a/src/crawler/classic_crawler.py b/src/crawler/classic_crawler.py new file mode 100644 index 00000000..0598c418 --- /dev/null +++ b/src/crawler/classic_crawler.py @@ -0,0 +1,168 @@ +#! /usr/bin/env python +# -*- coding: utf-8 - + +# jarr - A Web based news aggregator. +# Copyright (C) 2010-2015 Cédric Bonhomme - https://www.JARR-aggregator.org +# +# For more information : https://github.com/JARR-aggregator/JARR/ +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +__author__ = "Cedric Bonhomme" +__version__ = "$Revision: 3.3 $" +__date__ = "$Date: 2010/09/02 $" +__revision__ = "$Date: 2015/12/07 $" +__copyright__ = "Copyright (c) Cedric Bonhomme" +__license__ = "AGPLv3" + +import asyncio +import logging +import feedparser +import dateutil.parser +from datetime import datetime +from sqlalchemy import or_ + +import conf +from bootstrap import db +from web.models import User +from web.controllers import FeedController, ArticleController +from web.lib.feed_utils import construct_feed_from, is_parsing_ok +from web.lib.article_utils import construct_article, extract_id + +logger = logging.getLogger(__name__) + +sem = asyncio.Semaphore(5) + +import ssl +try: + _create_unverified_https_context = ssl._create_unverified_context +except AttributeError: + # Legacy Python that doesn't verify HTTPS certificates by default + pass +else: + # Handle target environment that doesn't support HTTPS verification + ssl._create_default_https_context = _create_unverified_https_context + + +async def get(*args, **kwargs): + #kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False) + try: + data = feedparser.parse(args[0]) + return data + except Exception as e: + raise e + + +async def parse_feed(user, feed): + """ + Fetch a feed. + Update the feed and return the articles. + """ + parsed_feed = None + up_feed = {} + articles = [] + with (await sem): + try: + parsed_feed = await get(feed.link) + except Exception as e: + up_feed['last_error'] = str(e) + up_feed['error_count'] = feed.error_count + 1 + finally: + up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal()) + if parsed_feed is None: + FeedController().update({'id': feed.id}, up_feed) + return + + if not is_parsing_ok(parsed_feed): + up_feed['last_error'] = str(parsed_feed['bozo_exception']) + up_feed['error_count'] = feed.error_count + 1 + FeedController().update({'id': feed.id}, up_feed) + return + if parsed_feed['entries'] != []: + articles = parsed_feed['entries'] + + up_feed['error_count'] = 0 + up_feed['last_error'] = "" + + # Feed informations + construct_feed_from(feed.link, parsed_feed).update(up_feed) + if feed.title and 'title' in up_feed: + # do not override the title set by the user + del up_feed['title'] + FeedController().update({'id': feed.id}, up_feed) + + return articles + + +async def insert_database(user, feed): + + articles = await parse_feed(user, feed) + if None is articles: + return [] + + logger.debug('inserting articles for {}'.format(feed.title)) + + logger.info("Database insertion...") + new_articles = [] + art_contr = ArticleController(user.id) + for article in articles: + exist = art_contr.read(feed_id=feed.id, + **extract_id(article)).count() != 0 + if exist: + logger.debug("Article %r (%r) already in the database.", + article['title'], article['link']) + continue + article = construct_article(article, feed) + try: + new_articles.append(art_contr.create(**article)) + logger.info("New article % (%r) added.", + article['title'], article['link']) + except Exception: + logger.exception("Error when inserting article in database:") + continue + return new_articles + + +async def init_process(user, feed): + # Fetch the feed and insert new articles in the database + articles = await insert_database(user, feed) + logger.debug('inserted articles for %s', feed.title) + return articles + + +def retrieve_feed(loop, user, feed_id=None): + """ + Launch the processus. + """ + logger.info("Starting to retrieve feeds.") + + # Get the list of feeds to fetch + user = User.query.filter(User.email == user.email).first() + feeds = [feed for feed in user.feeds if + feed.error_count <= conf.DEFAULT_MAX_ERROR and feed.enabled] + if feed_id is not None: + feeds = [feed for feed in feeds if feed.id == feed_id] + + if feeds == []: + return + + # Launch the process for all the feeds + tasks = [asyncio.ensure_future(init_process(user, feed)) for feed in feeds] + + try: + loop.run_until_complete(asyncio.wait(tasks)) + except Exception: + logger.exception('an error occured') + + logger.info("All articles retrieved. End of the processus.") diff --git a/src/crawler/http_crawler.py b/src/crawler/http_crawler.py new file mode 100644 index 00000000..f480fe96 --- /dev/null +++ b/src/crawler/http_crawler.py @@ -0,0 +1,251 @@ +""" +Here's a sum up on how it works : + +CrawlerScheduler.run + will retreive a list of feeds to be refreshed and pass result to +CrawlerScheduler.callback + which will retreive each feed and treat result with +FeedCrawler.callback + which will interprete the result (status_code, etag) collect ids + and match them agaisnt pyagg which will cause +PyAggUpdater.callback + to create the missing entries +""" + +import time +import conf +import json +import logging +import feedparser +from datetime import datetime, timedelta +from time import strftime, gmtime +from concurrent.futures import ThreadPoolExecutor +from requests_futures.sessions import FuturesSession +from web.lib.utils import default_handler, to_hash +from web.lib.feed_utils import construct_feed_from +from web.lib.article_utils import extract_id, construct_article + +logger = logging.getLogger(__name__) +logging.captureWarnings(True) +API_ROOT = "api/v2.0/" + + +class AbstractCrawler: + + def __init__(self, auth, pool=None, session=None): + self.auth = auth + self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER) + self.session = session or FuturesSession(executor=self.pool) + self.session.verify = False + self.url = conf.PLATFORM_URL + + def query_pyagg(self, method, urn, data=None): + """A wrapper for internal call, method should be ones you can find + on requests (header, post, get, options, ...), urn the distant + resources you want to access on pyagg, and data, the data you wanna + transmit.""" + if data is None: + data = {} + method = getattr(self.session, method) + return method("%s%s%s" % (self.url, API_ROOT, urn), + auth=self.auth, data=json.dumps(data, + default=default_handler), + headers={'Content-Type': 'application/json', + 'User-Agent': conf.USER_AGENT}) + + def wait(self, max_wait=300, checks=5, wait_for=2): + checked, second_waited = 0, 0 + while True: + time.sleep(wait_for) + second_waited += wait_for + if second_waited > max_wait: + logger.warn('Exiting after %d seconds', second_waited) + break + if self.pool._work_queue.qsize(): + checked = 0 + continue + checked += 1 + if checked == checks: + break + + +class PyAggUpdater(AbstractCrawler): + + def __init__(self, feed, entries, headers, parsed_feed, + auth, pool=None, session=None): + self.feed = feed + self.entries = entries + self.headers = headers + self.parsed_feed = parsed_feed + super().__init__(auth, pool, session) + + def callback(self, response): + """Will process the result from the challenge, creating missing article + and updating the feed""" + article_created = False + if response.result().status_code != 204: + results = response.result().json() + logger.debug('%r %r - %d entries were not matched ' + 'and will be created', + self.feed['id'], self.feed['title'], len(results)) + for id_to_create in results: + article_created = True + entry = construct_article( + self.entries[tuple(sorted(id_to_create.items()))], + self.feed) + logger.info('%r %r - creating %r for %r - %r', self.feed['id'], + self.feed['title'], entry['title'], + entry['user_id'], id_to_create) + self.query_pyagg('post', 'article', entry) + + logger.debug('%r %r - updating feed etag %r last_mod %r', + self.feed['id'], self.feed['title'], + self.headers.get('etag', ''), + self.headers.get('last-modified', '')) + + up_feed = {'error_count': 0, 'last_error': None, + 'etag': self.headers.get('etag', ''), + 'last_modified': self.headers.get('last-modified', + strftime('%a, %d %b %Y %X %Z', gmtime()))} + fresh_feed = construct_feed_from(url=self.feed['link'], + fp_parsed=self.parsed_feed) + for key in ('description', 'site_link', 'icon_url'): + if fresh_feed.get(key) and fresh_feed[key] != self.feed.get(key): + up_feed[key] = fresh_feed[key] + if not self.feed.get('title'): + up_feed['title'] = fresh_feed.get('title', '') + up_feed['user_id'] = self.feed['user_id'] + # re-getting that feed earlier since new entries appeared + if article_created: + up_feed['last_retrieved'] \ + = (datetime.now() - timedelta(minutes=45)).isoformat() + + diff_keys = {key for key in up_feed + if up_feed[key] != self.feed.get(key)} + if not diff_keys: + return # no change in the feed, no update + if not article_created and diff_keys == {'last_modified', 'etag'}: + return # meaningless if no new article has been published + logger.info('%r %r - pushing feed attrs %r', + self.feed['id'], self.feed['title'], + {key: "%s -> %s" % (up_feed[key], self.feed.get(key)) + for key in up_feed if up_feed[key] != self.feed.get(key)}) + + self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed) + + +class FeedCrawler(AbstractCrawler): + + def __init__(self, feed, auth, pool=None, session=None): + self.feed = feed + super().__init__(auth, pool, session) + + def clean_feed(self): + """Will reset the errors counters on a feed that have known errors""" + if self.feed.get('error_count') or self.feed.get('last_error'): + self.query_pyagg('put', 'feed/%d' % self.feed['id'], + {'error_count': 0, 'last_error': ''}) + + def callback(self, response): + """will fetch the feed and interprete results (304, etag) or will + challenge pyagg to compare gotten entries with existing ones""" + try: + response = response.result() + response.raise_for_status() + except Exception as error: + error_count = self.feed['error_count'] + 1 + logger.exception('%r %r - an error occured while fetching ' + 'feed; bumping error count to %r', + self.feed['id'], self.feed['title'], error_count) + future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], + {'error_count': error_count, + 'last_error': str(error), + 'user_id': self.feed['user_id']}) + return + + if response.status_code == 304: + logger.info("%r %r - feed responded with 304", + self.feed['id'], self.feed['title']) + self.clean_feed() + return + if 'etag' not in response.headers: + logger.debug('%r %r - manually generating etag', + self.feed['id'], self.feed['title']) + response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text) + if response.headers['etag'] and self.feed['etag'] \ + and response.headers['etag'] == self.feed['etag']: + if 'pyagg' in self.feed['etag']: + logger.info("%r %r - calculated hash matches (%d)", + self.feed['id'], self.feed['title'], + response.status_code) + else: + logger.info("%r %r - feed responded with same etag (%d)", + self.feed['id'], self.feed['title'], + response.status_code) + self.clean_feed() + return + else: + logger.debug('%r %r - etag mismatch %r != %r', + self.feed['id'], self.feed['title'], + response.headers['etag'], self.feed['etag']) + logger.info('%r %r - cache validation failed, challenging entries', + self.feed['id'], self.feed['title']) + + ids, entries = [], {} + parsed_response = feedparser.parse(response.content) + for entry in parsed_response['entries']: + entry_ids = extract_id(entry) + entry_ids['feed_id'] = self.feed['id'] + entry_ids['user_id'] = self.feed['user_id'] + entries[tuple(sorted(entry_ids.items()))] = entry + ids.append(entry_ids) + logger.debug('%r %r - found %d entries %r', + self.feed['id'], self.feed['title'], len(ids), ids) + future = self.query_pyagg('get', 'articles/challenge', {'ids': ids}) + updater = PyAggUpdater(self.feed, entries, response.headers, + parsed_response, + self.auth, self.pool, self.session) + future.add_done_callback(updater.callback) + + +class CrawlerScheduler(AbstractCrawler): + + def __init__(self, username, password, pool=None, session=None): + self.auth = (username, password) + super(CrawlerScheduler, self).__init__(self.auth, pool, session) + + def prepare_headers(self, feed): + """For a known feed, will construct some header dictionnary""" + headers = {'User-Agent': conf.USER_AGENT} + if feed.get('last_modified'): + headers['If-Modified-Since'] = feed['last_modified'] + if feed.get('etag') and 'pyagg' not in feed['etag']: + headers['If-None-Match'] = feed['etag'] + logger.debug('%r %r - calculated headers %r', + feed['id'], feed['title'], headers) + return headers + + def callback(self, response): + """processes feeds that need to be fetched""" + response = response.result() + response.raise_for_status() + if response.status_code == 204: + logger.debug("No feed to fetch") + return + feeds = response.json() + logger.debug('%d to fetch %r', len(feeds), feeds) + for feed in feeds: + logger.debug('%r %r - fetching resources', + feed['id'], feed['title']) + future = self.session.get(feed['link'], + headers=self.prepare_headers(feed)) + + feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session) + future.add_done_callback(feed_crwlr.callback) + + def run(self, **kwargs): + """entry point, will retreive feeds to be fetch + and launch the whole thing""" + logger.debug('retreving fetchable feed') + future = self.query_pyagg('get', 'feeds/fetchable', kwargs) + future.add_done_callback(self.callback) diff --git a/src/manager.py b/src/manager.py index f7240670..781d742b 100755 --- a/src/manager.py +++ b/src/manager.py @@ -32,7 +32,7 @@ def db_create(): @manager.command def fetch(limit=100, retreive_all=False): "Crawl the feeds with the client crawler." - from web.lib.crawler import CrawlerScheduler + from crawler.http_crawler import CrawlerScheduler scheduler = CrawlerScheduler(conf.API_LOGIN, conf.API_PASSWD) scheduler.run(limit=limit, retreive_all=retreive_all) scheduler.wait() @@ -47,7 +47,7 @@ def fetch_asyncio(user_id, feed_id): populate_g() from flask import g from web.models import User - import crawler + from crawler import classic_crawler users = [] try: users = User.query.filter(User.id == int(user_id)).all() @@ -67,7 +67,7 @@ def fetch_asyncio(user_id, feed_id): if user.activation_key == "": print("Fetching articles for " + user.nickname) g.user = user - crawler.retrieve_feed(loop, g.user, feed_id) + classic_crawler.retrieve_feed(loop, g.user, feed_id) loop.close() from scripts.probes import ArticleProbe, FeedProbe diff --git a/src/web/lib/crawler.py b/src/web/lib/crawler.py deleted file mode 100644 index f480fe96..00000000 --- a/src/web/lib/crawler.py +++ /dev/null @@ -1,251 +0,0 @@ -""" -Here's a sum up on how it works : - -CrawlerScheduler.run - will retreive a list of feeds to be refreshed and pass result to -CrawlerScheduler.callback - which will retreive each feed and treat result with -FeedCrawler.callback - which will interprete the result (status_code, etag) collect ids - and match them agaisnt pyagg which will cause -PyAggUpdater.callback - to create the missing entries -""" - -import time -import conf -import json -import logging -import feedparser -from datetime import datetime, timedelta -from time import strftime, gmtime -from concurrent.futures import ThreadPoolExecutor -from requests_futures.sessions import FuturesSession -from web.lib.utils import default_handler, to_hash -from web.lib.feed_utils import construct_feed_from -from web.lib.article_utils import extract_id, construct_article - -logger = logging.getLogger(__name__) -logging.captureWarnings(True) -API_ROOT = "api/v2.0/" - - -class AbstractCrawler: - - def __init__(self, auth, pool=None, session=None): - self.auth = auth - self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER) - self.session = session or FuturesSession(executor=self.pool) - self.session.verify = False - self.url = conf.PLATFORM_URL - - def query_pyagg(self, method, urn, data=None): - """A wrapper for internal call, method should be ones you can find - on requests (header, post, get, options, ...), urn the distant - resources you want to access on pyagg, and data, the data you wanna - transmit.""" - if data is None: - data = {} - method = getattr(self.session, method) - return method("%s%s%s" % (self.url, API_ROOT, urn), - auth=self.auth, data=json.dumps(data, - default=default_handler), - headers={'Content-Type': 'application/json', - 'User-Agent': conf.USER_AGENT}) - - def wait(self, max_wait=300, checks=5, wait_for=2): - checked, second_waited = 0, 0 - while True: - time.sleep(wait_for) - second_waited += wait_for - if second_waited > max_wait: - logger.warn('Exiting after %d seconds', second_waited) - break - if self.pool._work_queue.qsize(): - checked = 0 - continue - checked += 1 - if checked == checks: - break - - -class PyAggUpdater(AbstractCrawler): - - def __init__(self, feed, entries, headers, parsed_feed, - auth, pool=None, session=None): - self.feed = feed - self.entries = entries - self.headers = headers - self.parsed_feed = parsed_feed - super().__init__(auth, pool, session) - - def callback(self, response): - """Will process the result from the challenge, creating missing article - and updating the feed""" - article_created = False - if response.result().status_code != 204: - results = response.result().json() - logger.debug('%r %r - %d entries were not matched ' - 'and will be created', - self.feed['id'], self.feed['title'], len(results)) - for id_to_create in results: - article_created = True - entry = construct_article( - self.entries[tuple(sorted(id_to_create.items()))], - self.feed) - logger.info('%r %r - creating %r for %r - %r', self.feed['id'], - self.feed['title'], entry['title'], - entry['user_id'], id_to_create) - self.query_pyagg('post', 'article', entry) - - logger.debug('%r %r - updating feed etag %r last_mod %r', - self.feed['id'], self.feed['title'], - self.headers.get('etag', ''), - self.headers.get('last-modified', '')) - - up_feed = {'error_count': 0, 'last_error': None, - 'etag': self.headers.get('etag', ''), - 'last_modified': self.headers.get('last-modified', - strftime('%a, %d %b %Y %X %Z', gmtime()))} - fresh_feed = construct_feed_from(url=self.feed['link'], - fp_parsed=self.parsed_feed) - for key in ('description', 'site_link', 'icon_url'): - if fresh_feed.get(key) and fresh_feed[key] != self.feed.get(key): - up_feed[key] = fresh_feed[key] - if not self.feed.get('title'): - up_feed['title'] = fresh_feed.get('title', '') - up_feed['user_id'] = self.feed['user_id'] - # re-getting that feed earlier since new entries appeared - if article_created: - up_feed['last_retrieved'] \ - = (datetime.now() - timedelta(minutes=45)).isoformat() - - diff_keys = {key for key in up_feed - if up_feed[key] != self.feed.get(key)} - if not diff_keys: - return # no change in the feed, no update - if not article_created and diff_keys == {'last_modified', 'etag'}: - return # meaningless if no new article has been published - logger.info('%r %r - pushing feed attrs %r', - self.feed['id'], self.feed['title'], - {key: "%s -> %s" % (up_feed[key], self.feed.get(key)) - for key in up_feed if up_feed[key] != self.feed.get(key)}) - - self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed) - - -class FeedCrawler(AbstractCrawler): - - def __init__(self, feed, auth, pool=None, session=None): - self.feed = feed - super().__init__(auth, pool, session) - - def clean_feed(self): - """Will reset the errors counters on a feed that have known errors""" - if self.feed.get('error_count') or self.feed.get('last_error'): - self.query_pyagg('put', 'feed/%d' % self.feed['id'], - {'error_count': 0, 'last_error': ''}) - - def callback(self, response): - """will fetch the feed and interprete results (304, etag) or will - challenge pyagg to compare gotten entries with existing ones""" - try: - response = response.result() - response.raise_for_status() - except Exception as error: - error_count = self.feed['error_count'] + 1 - logger.exception('%r %r - an error occured while fetching ' - 'feed; bumping error count to %r', - self.feed['id'], self.feed['title'], error_count) - future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], - {'error_count': error_count, - 'last_error': str(error), - 'user_id': self.feed['user_id']}) - return - - if response.status_code == 304: - logger.info("%r %r - feed responded with 304", - self.feed['id'], self.feed['title']) - self.clean_feed() - return - if 'etag' not in response.headers: - logger.debug('%r %r - manually generating etag', - self.feed['id'], self.feed['title']) - response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text) - if response.headers['etag'] and self.feed['etag'] \ - and response.headers['etag'] == self.feed['etag']: - if 'pyagg' in self.feed['etag']: - logger.info("%r %r - calculated hash matches (%d)", - self.feed['id'], self.feed['title'], - response.status_code) - else: - logger.info("%r %r - feed responded with same etag (%d)", - self.feed['id'], self.feed['title'], - response.status_code) - self.clean_feed() - return - else: - logger.debug('%r %r - etag mismatch %r != %r', - self.feed['id'], self.feed['title'], - response.headers['etag'], self.feed['etag']) - logger.info('%r %r - cache validation failed, challenging entries', - self.feed['id'], self.feed['title']) - - ids, entries = [], {} - parsed_response = feedparser.parse(response.content) - for entry in parsed_response['entries']: - entry_ids = extract_id(entry) - entry_ids['feed_id'] = self.feed['id'] - entry_ids['user_id'] = self.feed['user_id'] - entries[tuple(sorted(entry_ids.items()))] = entry - ids.append(entry_ids) - logger.debug('%r %r - found %d entries %r', - self.feed['id'], self.feed['title'], len(ids), ids) - future = self.query_pyagg('get', 'articles/challenge', {'ids': ids}) - updater = PyAggUpdater(self.feed, entries, response.headers, - parsed_response, - self.auth, self.pool, self.session) - future.add_done_callback(updater.callback) - - -class CrawlerScheduler(AbstractCrawler): - - def __init__(self, username, password, pool=None, session=None): - self.auth = (username, password) - super(CrawlerScheduler, self).__init__(self.auth, pool, session) - - def prepare_headers(self, feed): - """For a known feed, will construct some header dictionnary""" - headers = {'User-Agent': conf.USER_AGENT} - if feed.get('last_modified'): - headers['If-Modified-Since'] = feed['last_modified'] - if feed.get('etag') and 'pyagg' not in feed['etag']: - headers['If-None-Match'] = feed['etag'] - logger.debug('%r %r - calculated headers %r', - feed['id'], feed['title'], headers) - return headers - - def callback(self, response): - """processes feeds that need to be fetched""" - response = response.result() - response.raise_for_status() - if response.status_code == 204: - logger.debug("No feed to fetch") - return - feeds = response.json() - logger.debug('%d to fetch %r', len(feeds), feeds) - for feed in feeds: - logger.debug('%r %r - fetching resources', - feed['id'], feed['title']) - future = self.session.get(feed['link'], - headers=self.prepare_headers(feed)) - - feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session) - future.add_done_callback(feed_crwlr.callback) - - def run(self, **kwargs): - """entry point, will retreive feeds to be fetch - and launch the whole thing""" - logger.debug('retreving fetchable feed') - future = self.query_pyagg('get', 'feeds/fetchable', kwargs) - future.add_done_callback(self.callback) diff --git a/src/web/utils.py b/src/web/utils.py index fcd791e8..1d4b30ab 100755 --- a/src/web/utils.py +++ b/src/web/utils.py @@ -109,8 +109,8 @@ def fetch(id, feed_id=None): Fetch the feeds in a new processus. The "asyncio" crawler is launched with the manager. """ - cmd = [sys.executable, conf.BASE_DIR+'/manager.py', 'fetch_asyncio', str(id), - str(feed_id)] + cmd = [sys.executable, conf.BASE_DIR + '/manager.py', 'fetch_asyncio', + str(id), str(feed_id)] return subprocess.Popen(cmd, stdout=subprocess.PIPE) def history(user_id, year=None, month=None): -- cgit