diff options
Diffstat (limited to 'src/web/lib')
-rw-r--r-- | src/web/lib/__init__.py | 0 | ||||
-rw-r--r-- | src/web/lib/article_utils.py | 74 | ||||
-rw-r--r-- | src/web/lib/crawler.py | 252 | ||||
-rw-r--r-- | src/web/lib/feed_utils.py | 96 | ||||
-rw-r--r-- | src/web/lib/utils.py | 57 | ||||
-rw-r--r-- | src/web/lib/view_utils.py | 26 |
6 files changed, 505 insertions, 0 deletions
# ---------------------------------------------------------------------------
# src/web/lib/__init__.py  (new, intentionally empty)
# ---------------------------------------------------------------------------


# ---------------------------------------------------------------------------
# src/web/lib/article_utils.py
# ---------------------------------------------------------------------------
import logging
import requests
import dateutil.parser
from datetime import datetime

import conf
from web.lib.utils import to_hash

logger = logging.getLogger(__name__)


def extract_id(entry, keys=(('link', 'link'), ('published', 'date'),
                            ('updated', 'date')), force_id=False):
    """Return what is needed to identify ``entry``.

    ``keys`` is a sequence of ``(entry_key, pyagg_key)`` pairs.

    * If the entry carries an ``entry_id`` or an ``id``, the result is
      ``{'entry_id': <that id>}``.
    * Otherwise, if ``force_id`` is true, the result is the hash (a string,
      as returned by ``to_hash``) of the concatenated values found in the
      entry under each ``entry_key``.
    * Otherwise the result is a dict mapping each ``pyagg_key`` to the value
      of the first matching ``entry_key``; values stored under a key
      containing ``'date'`` are parsed and normalised to ISO format.
    """
    entry_id = entry.get('entry_id') or entry.get('id')
    if entry_id:
        return {'entry_id': entry_id}
    if force_id:
        # The first element of each pair is the key to read in the entry
        # (same unpacking order as in the loop below).
        return to_hash("".join(entry[entry_key] for entry_key, _ in keys
                               if entry_key in entry).encode('utf8'))
    ids = {}
    for entry_key, pyagg_key in keys:
        if entry_key in entry and pyagg_key not in ids:
            ids[pyagg_key] = entry[entry_key]
            if 'date' in pyagg_key:
                ids[pyagg_key] = dateutil.parser.parse(ids[pyagg_key])\
                        .isoformat()
    return ids


def construct_article(entry, feed):
    """Safe method to transform a feedparser entry into an article.

    ``feed`` may be a dict or any object exposing ``dump()`` (which must
    return a dict with at least ``id``, ``user_id`` and ``site_link``).
    Returns a dict describing the article; dates are ISO formatted strings.
    """
    if hasattr(feed, 'dump'):  # this way can be a sqlalchemy obj or a dict
        feed = feed.dump()
    now = datetime.now()
    date = None
    for date_key in ('published', 'updated'):
        if not entry.get(date_key):
            continue
        try:
            date = dateutil.parser.parse(entry[date_key])
        except Exception:
            # best effort: an unparsable date must not prevent the creation
            # of the article, the next key (or "now") will be used instead
            logger.debug('unable to parse %r as a date', entry[date_key])
            continue
        break

    content = ''
    if entry.get('content'):
        content = entry['content'][0]['value']
    elif entry.get('summary'):
        content = entry['summary']

    article_link = entry.get('link')
    if conf.RESOLVE_ARTICLE_URL and article_link:
        try:
            # resolves URL behind proxies
            # (like feedproxy.google.com)
            # NOTE(review): verify=False disables TLS certificate checks
            response = requests.get(article_link, verify=False, timeout=5.0)
            article_link = response.url
        except Exception as error:
            logger.warning("Unable to get the real URL of %s. Error: %s",
                           article_link, error)

    return {'feed_id': feed['id'],
            'user_id': feed['user_id'],
            'entry_id': extract_id(entry).get('entry_id', None),
            # use the (possibly resolved) link; fall back on the site link
            # only when the entry has no 'link' key at all
            'link': article_link if 'link' in entry else feed['site_link'],
            'title': entry.get('title', 'No title'),
            'readed': False, 'like': False,
            'content': content,
            'retrieved_date': now.isoformat(),
            'date': (date or now).isoformat()}


# ---------------------------------------------------------------------------
# src/web/lib/crawler.py
# ---------------------------------------------------------------------------
"""
Here's a sum up on how it works :

CrawlerScheduler.run
    will retrieve a list of feeds to be refreshed and pass result to
CrawlerScheduler.callback
    which will retrieve each feed and treat result with
FeedCrawler.callback
    which will interpret the result (status_code, etag) collect ids
    and match them against pyagg which will cause
PyAggUpdater.callback
    to create the missing entries
"""

import time
import conf
import json
import logging
import feedparser
from datetime import datetime, timedelta
from email.utils import formatdate
from functools import wraps
from time import strftime, gmtime
from concurrent.futures import ThreadPoolExecutor
from requests_futures.sessions import FuturesSession
from web.lib.utils import default_handler, to_hash
from web.lib.feed_utils import construct_feed_from
from web.lib.article_utils import extract_id, construct_article

logger = logging.getLogger(__name__)
logging.captureWarnings(True)
API_ROOT = "api/v2.0/"


class AbstractCrawler:
    """Base class holding the thread pool, the HTTP session and the
    credentials shared by every step of the crawling process."""

    def __init__(self, auth, pool=None, session=None):
        self.auth = auth
        self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER)
        self.session = session or FuturesSession(executor=self.pool)
        # NOTE(review): TLS certificate verification is disabled for every
        # request made through this session
        self.session.verify = False
        self.url = conf.PLATFORM_URL

    def query_pyagg(self, method, urn, data=None):
        """A wrapper for internal call, method should be ones you can find
        on requests (header, post, get, options, ...), urn the distant
        resources you want to access on pyagg, and data, the data you wanna
        transmit.

        Returns whatever the session method returns (a future when the
        session is a FuturesSession)."""
        if data is None:
            data = {}
        method = getattr(self.session, method)
        return method("%s%s%s" % (self.url, API_ROOT, urn),
                      auth=self.auth, data=json.dumps(data,
                                                      default=default_handler),
                      headers={'Content-Type': 'application/json',
                               'User-Agent': 'jarr'})

    def wait(self, max_wait=300, checks=5, wait_for=2):
        """Block until the pool's work queue has been seen empty ``checks``
        times in a row (polling every ``wait_for`` seconds), or until more
        than ``max_wait`` seconds have been spent waiting."""
        checked, second_waited = 0, 0
        while True:
            time.sleep(wait_for)
            second_waited += wait_for
            if second_waited > max_wait:
                logger.warning('Exiting after %d seconds', second_waited)
                break
            # relies on a private attribute of ThreadPoolExecutor
            if self.pool._work_queue.qsize():
                checked = 0
                continue
            checked += 1
            if checked == checks:
                break


class PyAggUpdater(AbstractCrawler):
    """Creates the articles pyagg doesn't know yet and updates the feed."""

    def __init__(self, feed, entries, headers, parsed_feed,
                 auth, pool=None, session=None):
        self.feed = feed
        self.entries = entries
        self.headers = headers
        self.parsed_feed = parsed_feed
        super().__init__(auth, pool, session)

    def callback(self, response):
        """Will process the result from the challenge, creating missing article
        and updating the feed"""
        article_created = False
        result = response.result()
        if result.status_code != 204:
            results = result.json()
            logger.debug('%r %r - %d entries were not matched '
                         'and will be created',
                         self.feed['id'], self.feed['title'], len(results))
            for id_to_create in results:
                article_created = True
                entry = construct_article(
                        self.entries[tuple(sorted(id_to_create.items()))],
                        self.feed)
                logger.info('%r %r - creating %r for %r - %r', self.feed['id'],
                            self.feed['title'], entry['title'],
                            entry['user_id'], id_to_create)
                self.query_pyagg('post', 'article', entry)

        logger.debug('%r %r - updating feed etag %r last_mod %r',
                     self.feed['id'], self.feed['title'],
                     self.headers.get('etag', ''),
                     self.headers.get('last-modified', ''))

        up_feed = {'error_count': 0, 'last_error': None,
                   'etag': self.headers.get('etag', ''),
                   # formatdate(usegmt=True) yields a locale independent
                   # HTTP date, suitable for a later If-Modified-Since
                   'last_modified': self.headers.get('last-modified',
                                                     formatdate(usegmt=True))}
        fresh_feed = construct_feed_from(url=self.feed['link'],
                                         fp_parsed=self.parsed_feed)
        for key in ('description', 'site_link', 'icon_url'):
            if fresh_feed.get(key) and fresh_feed[key] != self.feed.get(key):
                up_feed[key] = fresh_feed[key]
        if not self.feed.get('title'):
            up_feed['title'] = fresh_feed.get('title', '')
        up_feed['user_id'] = self.feed['user_id']
        # re-getting that feed earlier since new entries appeared
        if article_created:
            up_feed['last_retrieved'] \
                    = (datetime.now() - timedelta(minutes=45)).isoformat()

        diff_keys = {key for key in up_feed
                     if up_feed[key] != self.feed.get(key)}
        if not diff_keys:
            return  # no change in the feed, no update
        if not article_created and diff_keys == {'last_modified', 'etag'}:
            return  # meaningless if no new article has been published
        # logged as "known value -> value about to be pushed"
        logger.info('%r %r - pushing feed attrs %r',
                    self.feed['id'], self.feed['title'],
                    {key: "%s -> %s" % (self.feed.get(key), up_feed[key])
                     for key in up_feed if key in diff_keys})

        self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed)


class FeedCrawler(AbstractCrawler):
    """Interprets the response of a feed and challenges pyagg with the
    entries that were found in it."""

    def __init__(self, feed, auth, pool=None, session=None):
        self.feed = feed
        super().__init__(auth, pool, session)

    def clean_feed(self):
        """Will reset the errors counters on a feed that have known errors"""
        if self.feed.get('error_count') or self.feed.get('last_error'):
            self.query_pyagg('put', 'feed/%d' % self.feed['id'],
                             {'error_count': 0, 'last_error': ''})

    def callback(self, response):
        """will fetch the feed and interpret results (304, etag) or will
        challenge pyagg to compare gotten entries with existing ones"""
        try:
            response = response.result()
            response.raise_for_status()
        except Exception as error:
            # a missing or null counter is treated as zero
            error_count = (self.feed.get('error_count') or 0) + 1
            logger.error('%r %r - an error occured while fetching '
                         'feed; bumping error count to %r', self.feed['id'],
                         self.feed['title'], error_count)
            self.query_pyagg('put', 'feed/%d' % self.feed['id'],
                             {'error_count': error_count,
                              'last_error': str(error),
                              'user_id': self.feed['user_id']})
            return

        if response.status_code == 304:
            logger.info("%r %r - feed responded with 304",
                        self.feed['id'], self.feed['title'])
            self.clean_feed()
            return
        if 'etag' not in response.headers:
            logger.debug('%r %r - manually generating etag',
                         self.feed['id'], self.feed['title'])
            response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text)

        known_etag = self.feed.get('etag')
        if response.headers['etag'] and known_etag \
                and response.headers['etag'] == known_etag:
            if 'pyagg' in known_etag:
                logger.info("%r %r - calculated hash matches (%d)",
                            self.feed['id'], self.feed['title'],
                            response.status_code)
            else:
                logger.info("%r %r - feed responded with same etag (%d)",
                            self.feed['id'], self.feed['title'],
                            response.status_code)
            self.clean_feed()
            return
        logger.debug('%r %r - etag mismatch %r != %r',
                     self.feed['id'], self.feed['title'],
                     response.headers['etag'], known_etag)
        logger.info('%r %r - cache validation failed, challenging entries',
                    self.feed['id'], self.feed['title'])

        ids, entries = [], {}
        parsed_response = feedparser.parse(response.content)
        for entry in parsed_response['entries']:
            entry_ids = extract_id(entry)
            entry_ids['feed_id'] = self.feed['id']
            entry_ids['user_id'] = self.feed['user_id']
            # the sorted items tuple is the key PyAggUpdater.callback uses
            # to find the entry matching an id returned by the challenge
            entries[tuple(sorted(entry_ids.items()))] = entry
            ids.append(entry_ids)
        logger.debug('%r %r - found %d entries %r',
                     self.feed['id'], self.feed['title'], len(ids), ids)
        future = self.query_pyagg('get', 'articles/challenge', {'ids': ids})
        updater = PyAggUpdater(self.feed, entries, response.headers,
                               parsed_response,
                               self.auth, self.pool, self.session)
        future.add_done_callback(updater.callback)


class CrawlerScheduler(AbstractCrawler):
    """Entry point of the crawler: lists the feeds to fetch and fetches
    them."""

    def __init__(self, username, password, pool=None, session=None):
        self.auth = (username, password)
        super().__init__(self.auth, pool, session)

    def prepare_headers(self, feed):
        """For a known feed, will construct some header dictionary"""
        headers = {'User-Agent': 'jarr/crawler'}
        if feed.get('last_modified'):
            headers['If-Modified-Since'] = feed['last_modified']
        # etags containing 'pyagg' were generated locally (see
        # FeedCrawler.callback) and are not sent back to the server
        if feed.get('etag') and 'pyagg' not in feed['etag']:
            headers['If-None-Match'] = feed['etag']
        logger.debug('%r %r - calculated headers %r',
                     feed['id'], feed['title'], headers)
        return headers

    def callback(self, response):
        """processes feeds that need to be fetched"""
        response = response.result()
        response.raise_for_status()
        if response.status_code == 204:
            logger.debug("No feed to fetch")
            return
        feeds = response.json()
        logger.debug('%d to fetch %r', len(feeds), feeds)
        for feed in feeds:
            logger.debug('%r %r - fetching resources',
                         feed['id'], feed['title'])
            future = self.session.get(feed['link'],
                                      headers=self.prepare_headers(feed))

            feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session)
            future.add_done_callback(feed_crwlr.callback)

    def run(self, **kwargs):
        """entry point, will retrieve feeds to be fetch
        and launch the whole thing"""
        logger.debug('retreving fetchable feed')
        future = self.query_pyagg('get', 'feeds/fetchable', kwargs)
        future.add_done_callback(self.callback)


# ---------------------------------------------------------------------------
# src/web/lib/feed_utils.py
# ---------------------------------------------------------------------------
import urllib
import urllib.parse  # "import urllib" alone does not load the submodule
import logging
import requests
import feedparser
from bs4 import BeautifulSoup, SoupStrainer

from web.lib.utils import try_keys, try_get_icon_url, rebuild_url

logger = logging.getLogger(__name__)


def construct_feed_from(url=None, fp_parsed=None, feed=None, query_site=True):
    """Build (or complete) a dict describing a feed.

    At least one of ``url`` and ``fp_parsed`` (a feedparser result) must be
    provided; the missing one is deduced from the other. When ``feed`` is a
    non empty dict it is completed in place and returned. When ``query_site``
    is true and some of ``link``, ``title`` or ``icon_url`` are still unknown
    after reading the feed, the page at ``site_link`` is downloaded to look
    for them.
    """
    if url is None and fp_parsed is not None:
        url = fp_parsed.get('url')
    if url is not None and fp_parsed is None:
        try:
            response = requests.get(url, verify=False, timeout=10)
            # NOTE(review): the *response* headers are handed over as
            # request_headers; feedparser also accepts response_headers,
            # confirm which one is intended before changing it (it may
            # affect the 'bozo' flag tested below)
            fp_parsed = feedparser.parse(response.content,
                                         request_headers=response.headers)
        except Exception:
            logger.exception('failed to retreive that url')
            fp_parsed = {'bozo': True}
    assert url is not None and fp_parsed is not None
    feed = feed or {}
    feed_split = urllib.parse.urlsplit(url)
    site_split = None
    if not fp_parsed['bozo']:
        feed['link'] = url
        feed['site_link'] = try_keys(fp_parsed['feed'], 'href', 'link')
        feed['title'] = fp_parsed['feed'].get('title')
        feed['description'] = try_keys(fp_parsed['feed'], 'subtitle', 'title')
        feed['icon_url'] = try_keys(fp_parsed['feed'], 'icon')
    else:
        # not a well formed feed: the url is treated as the site itself
        feed['site_link'] = url

    if feed.get('site_link'):
        feed['site_link'] = rebuild_url(feed['site_link'], feed_split)
        site_split = urllib.parse.urlsplit(feed['site_link'])

    if feed.get('icon_url'):
        feed['icon_url'] = try_get_icon_url(
                feed['icon_url'], site_split, feed_split)
        if feed['icon_url'] is None:
            del feed['icon_url']

    if not feed.get('site_link') or not query_site \
            or all(bool(feed.get(k)) for k in ('link', 'title', 'icon_url')):
        return feed

    try:
        response = requests.get(feed['site_link'], verify=False, timeout=10)
    except Exception:
        logger.exception('failed to retreive %r', feed['site_link'])
        return feed
    bs_parsed = BeautifulSoup(response.content, 'html.parser',
                              parse_only=SoupStrainer('head'))

    if not feed.get('title'):
        try:
            feed['title'] = bs_parsed.find_all('title')[0].text
        except Exception:
            pass  # no usable <title>, the feed is left without one

    def check_keys(**kwargs):
        """Build a filter matching the tags which have, for each given
        attribute, all the given values."""
        def wrapper(elem):
            for key, vals in kwargs.items():
                if not elem.has_attr(key):
                    return False
                if not all(val in elem.attrs[key] for val in vals):
                    return False
            return True
        return wrapper

    if not feed.get('icon_url'):
        icons = bs_parsed.find_all(check_keys(rel=['icon', 'shortcut']))
        if not icons:
            icons = bs_parsed.find_all(check_keys(rel=['icon']))
        for icon in icons:
            href = icon.attrs.get('href')
            if not href:
                continue  # an icon tag without target can't be used
            feed['icon_url'] = try_get_icon_url(href, site_split, feed_split)
            if feed['icon_url'] is not None:
                break

        if feed.get('icon_url') is None:
            feed['icon_url'] = try_get_icon_url('/favicon.ico',
                                                site_split, feed_split)
        if 'icon_url' in feed and feed['icon_url'] is None:
            del feed['icon_url']

    if not feed.get('link'):
        alternates = bs_parsed.find_all(check_keys(rel=['alternate'],
                                        type=['application/rss+xml']))
        if alternates and alternates[0].attrs.get('href'):
            feed['link'] = rebuild_url(alternates[0].attrs['href'], feed_split)
    return feed


# ---------------------------------------------------------------------------
# src/web/lib/utils.py
# ---------------------------------------------------------------------------
import types
import urllib
import urllib.parse  # "import urllib" alone does not load the submodule
import logging
import requests
from hashlib import md5

logger = logging.getLogger(__name__)


def default_handler(obj):
    """JSON handler for default query formatting"""
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    if hasattr(obj, 'dump'):
        return obj.dump()
    if isinstance(obj, (set, frozenset, types.GeneratorType)):
        return list(obj)
    if isinstance(obj, BaseException):
        return str(obj)
    raise TypeError("Object of type %s with value of %r "
                    "is not JSON serializable" % (type(obj), obj))


def try_keys(dico, *keys):
    """Return the value of the first of ``keys`` present in ``dico``,
    ``None`` when none of them is."""
    for key in keys:
        if key in dico:
            return dico[key]
    return None


def rebuild_url(url, base_split):
    """Complete ``url`` with the scheme and / or the network location of
    ``base_split`` (a ``urllib.parse.SplitResult``) when it lacks them.

    Urls having both a scheme and a network location are returned
    untouched."""
    split = urllib.parse.urlsplit(url)
    if split.scheme and split.netloc:
        return url  # url is fine
    new_split = urllib.parse.SplitResult(
            scheme=split.scheme or base_split.scheme,
            netloc=split.netloc or base_split.netloc,
            # query and fragment are kept, as they are for complete urls
            path=split.path, query=split.query, fragment=split.fragment)
    return urllib.parse.urlunsplit(new_split)


def try_get_icon_url(url, *splits):
    """Try to download ``url``, rebuilt against each of ``splits`` in turn
    (``None`` ones are skipped), and return the final url of the first
    response that looks like an icon, ``None`` if there is no such
    response."""
    for split in splits:
        if split is None:
            continue
        rb_url = rebuild_url(url, split)
        try:
            response = requests.get(rb_url, verify=False, timeout=10)
        except requests.RequestException as error:
            # an unreachable candidate must not prevent trying the others
            logger.debug('unable to fetch icon %r: %s', rb_url, error)
            continue
        # if html in content-type, we assume it's a fancy 404 page
        content_type = response.headers.get('content-type', '')
        if response.ok and 'html' not in content_type and response.content:
            return response.url
    return None


def to_hash(text):
    """Return the hexadecimal md5 digest of ``text``; anything having an
    ``encode`` method is encoded to utf8 first."""
    return md5(text.encode('utf8') if hasattr(text, 'encode') else text)\
            .hexdigest()


# ---------------------------------------------------------------------------
# src/web/lib/view_utils.py
# ---------------------------------------------------------------------------
from functools import wraps
from flask import request, Response, make_response
from web.lib.utils import to_hash


def etag_match(func):
    """Decorate a view so that its response carries an etag (the hash of
    its body), and is replaced by an empty 304 when the request's
    ``if-none-match`` header equals that etag.

    Only ``Response`` and ``str`` results are handled, anything else is
    returned untouched."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        response = func(*args, **kwargs)
        if isinstance(response, Response):
            etag = to_hash(response.data)
            headers = response.headers
        elif isinstance(response, str):
            etag = to_hash(response)
            headers = {}
        else:
            return response
        if request.headers.get('if-none-match') == etag:
            response = Response(status=304)
            response.headers['Cache-Control'] \
                    = headers.get('Cache-Control', 'pragma: no-cache')
        elif not isinstance(response, Response):
            response = make_response(response)
        response.headers['etag'] = etag
        return response
    return wrapper