aboutsummaryrefslogtreecommitdiff
path: root/web/lib
diff options
context:
space:
mode:
Diffstat (limited to 'web/lib')
-rw-r--r--web/lib/__init__.py0
-rw-r--r--web/lib/article_utils.py74
-rw-r--r--web/lib/crawler.py252
-rw-r--r--web/lib/feed_utils.py92
-rw-r--r--web/lib/utils.py57
-rw-r--r--web/lib/view_utils.py26
6 files changed, 501 insertions, 0 deletions
diff --git a/web/lib/__init__.py b/web/lib/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/web/lib/__init__.py
diff --git a/web/lib/article_utils.py b/web/lib/article_utils.py
new file mode 100644
index 00000000..02ca2cd1
--- /dev/null
+++ b/web/lib/article_utils.py
@@ -0,0 +1,74 @@
+import logging
+import requests
+import dateutil.parser
+from datetime import datetime
+
+import conf
+from web.lib.utils import to_hash
+
+logger = logging.getLogger(__name__)
+
+
+def extract_id(entry, keys=[('link', 'link'), ('published', 'date'),
+ ('updated', 'date')], force_id=False):
+ """For a given entry will return a dict that allows to identify it. The
+ dict will be constructed on the uid of the entry. if that identifier is
+ absent, the dict will be constructed upon the values of "keys".
+ """
+ entry_id = entry.get('entry_id') or entry.get('id')
+ if entry_id:
+ return {'entry_id': entry_id}
+ if not entry_id and force_id:
+ return to_hash("".join(entry[entry_key] for _, entry_key in keys
+ if entry_key in entry).encode('utf8'))
+ else:
+ ids = {}
+ for entry_key, pyagg_key in keys:
+ if entry_key in entry and pyagg_key not in ids:
+ ids[pyagg_key] = entry[entry_key]
+ if 'date' in pyagg_key:
+ ids[pyagg_key] = dateutil.parser.parse(ids[pyagg_key])\
+ .isoformat()
+ return ids
+
+
+def construct_article(entry, feed):
+ if hasattr(feed, 'dump'): # this way can be a sqlalchemy obj or a dict
+ feed = feed.dump()
+ "Safe method to transorm a feedparser entry into an article"
+ now = datetime.now()
+ date = None
+ for date_key in ('published', 'updated'):
+ if entry.get(date_key):
+ try:
+ date = dateutil.parser.parse(entry[date_key])
+ except Exception:
+ pass
+ else:
+ break
+ content = ''
+ if entry.get('content'):
+ content = entry['content'][0]['value']
+ elif entry.get('summary'):
+ content = entry['summary']
+
+ article_link = entry.get('link')
+ if conf.RESOLVE_ARTICLE_URL and article_link:
+ try:
+ # resolves URL behind proxies
+ # (like feedproxy.google.com)
+ response = requests.get(article_link, verify=False, timeout=5.0)
+ article_link = response.url
+ except Exception as error:
+ logger.warning("Unable to get the real URL of %s. Error: %s",
+ article_link, error)
+
+ return {'feed_id': feed['id'],
+ 'user_id': feed['user_id'],
+ 'entry_id': extract_id(entry).get('entry_id', None),
+ 'link': entry.get('link', feed['site_link']),
+ 'title': entry.get('title', 'No title'),
+ 'readed': False, 'like': False,
+ 'content': content,
+ 'retrieved_date': now.isoformat(),
+ 'date': (date or now).isoformat()}
diff --git a/web/lib/crawler.py b/web/lib/crawler.py
new file mode 100644
index 00000000..90a268e8
--- /dev/null
+++ b/web/lib/crawler.py
@@ -0,0 +1,252 @@
+"""
+Here's a sum up on how it works :
+
+CrawlerScheduler.run
+ will retreive a list of feeds to be refreshed and pass result to
+CrawlerScheduler.callback
+ which will retreive each feed and treat result with
+FeedCrawler.callback
+ which will interprete the result (status_code, etag) collect ids
+ and match them agaisnt pyagg which will cause
+PyAggUpdater.callback
+ to create the missing entries
+"""
+
+import time
+import conf
+import json
+import logging
+import feedparser
+from datetime import datetime, timedelta
+from functools import wraps
+from time import strftime, gmtime
+from concurrent.futures import ThreadPoolExecutor
+from requests_futures.sessions import FuturesSession
+from web.lib.utils import default_handler, to_hash
+from web.lib.feed_utils import construct_feed_from
+from web.lib.article_utils import extract_id, construct_article
+
+logger = logging.getLogger(__name__)
+logging.captureWarnings(True)
+API_ROOT = "api/v2.0/"
+
+
+class AbstractCrawler:
+
+ def __init__(self, auth, pool=None, session=None):
+ self.auth = auth
+ self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER)
+ self.session = session or FuturesSession(executor=self.pool)
+ self.session.verify = False
+ self.url = conf.PLATFORM_URL
+
+ def query_pyagg(self, method, urn, data=None):
+ """A wrapper for internal call, method should be ones you can find
+ on requests (header, post, get, options, ...), urn the distant
+ resources you want to access on pyagg, and data, the data you wanna
+ transmit."""
+ if data is None:
+ data = {}
+ method = getattr(self.session, method)
+ return method("%s%s%s" % (self.url, API_ROOT, urn),
+ auth=self.auth, data=json.dumps(data,
+ default=default_handler),
+ headers={'Content-Type': 'application/json',
+ 'User-Agent': 'jarr'})
+
+ def wait(self, max_wait=300, checks=5, wait_for=2):
+ checked, second_waited = 0, 0
+ while True:
+ time.sleep(wait_for)
+ second_waited += wait_for
+ if second_waited > max_wait:
+ logger.warn('Exiting after %d seconds', second_waited)
+ break
+ if self.pool._work_queue.qsize():
+ checked = 0
+ continue
+ checked += 1
+ if checked == checks:
+ break
+
+
+class PyAggUpdater(AbstractCrawler):
+
+ def __init__(self, feed, entries, headers, parsed_feed,
+ auth, pool=None, session=None):
+ self.feed = feed
+ self.entries = entries
+ self.headers = headers
+ self.parsed_feed = parsed_feed
+ super().__init__(auth, pool, session)
+
+ def callback(self, response):
+ """Will process the result from the challenge, creating missing article
+ and updating the feed"""
+ article_created = False
+ if response.result().status_code != 204:
+ results = response.result().json()
+ logger.debug('%r %r - %d entries were not matched '
+ 'and will be created',
+ self.feed['id'], self.feed['title'], len(results))
+ for id_to_create in results:
+ article_created = True
+ entry = construct_article(
+ self.entries[tuple(sorted(id_to_create.items()))],
+ self.feed)
+ logger.info('%r %r - creating %r for %r - %r', self.feed['id'],
+ self.feed['title'], entry['title'],
+ entry['user_id'], id_to_create)
+ self.query_pyagg('post', 'article', entry)
+
+ logger.debug('%r %r - updating feed etag %r last_mod %r',
+ self.feed['id'], self.feed['title'],
+ self.headers.get('etag', ''),
+ self.headers.get('last-modified', ''))
+
+ up_feed = {'error_count': 0, 'last_error': None,
+ 'etag': self.headers.get('etag', ''),
+ 'last_modified': self.headers.get('last-modified',
+ strftime('%a, %d %b %Y %X %Z', gmtime()))}
+ fresh_feed = construct_feed_from(url=self.feed['link'],
+ fp_parsed=self.parsed_feed)
+ for key in ('description', 'site_link', 'icon_url'):
+ if fresh_feed.get(key) and fresh_feed[key] != self.feed.get(key):
+ up_feed[key] = fresh_feed[key]
+ if not self.feed.get('title'):
+ up_feed['title'] = fresh_feed.get('title', '')
+ up_feed['user_id'] = self.feed['user_id']
+ # re-getting that feed earlier since new entries appeared
+ if article_created:
+ up_feed['last_retrieved'] \
+ = (datetime.now() - timedelta(minutes=45)).isoformat()
+
+ diff_keys = {key for key in up_feed
+ if up_feed[key] != self.feed.get(key)}
+ if not diff_keys:
+ return # no change in the feed, no update
+ if not article_created and diff_keys == {'last_modified', 'etag'}:
+ return # meaningless if no new article has been published
+ logger.info('%r %r - pushing feed attrs %r',
+ self.feed['id'], self.feed['title'],
+ {key: "%s -> %s" % (up_feed[key], self.feed.get(key))
+ for key in up_feed if up_feed[key] != self.feed.get(key)})
+
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed)
+
+
+class FeedCrawler(AbstractCrawler):
+
+ def __init__(self, feed, auth, pool=None, session=None):
+ self.feed = feed
+ super().__init__(auth, pool, session)
+
+ def clean_feed(self):
+ """Will reset the errors counters on a feed that have known errors"""
+ if self.feed.get('error_count') or self.feed.get('last_error'):
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': 0, 'last_error': ''})
+
+ def callback(self, response):
+ """will fetch the feed and interprete results (304, etag) or will
+ challenge pyagg to compare gotten entries with existing ones"""
+ try:
+ response = response.result()
+ response.raise_for_status()
+ except Exception as error:
+ error_count = self.feed['error_count'] + 1
+ logger.error('%r %r - an error occured while fetching '
+ 'feed; bumping error count to %r', self.feed['id'],
+ self.feed['title'], error_count)
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': error_count,
+ 'last_error': str(error),
+ 'user_id': self.feed['user_id']})
+ return
+
+ if response.status_code == 304:
+ logger.info("%r %r - feed responded with 304",
+ self.feed['id'], self.feed['title'])
+ self.clean_feed()
+ return
+ if 'etag' not in response.headers:
+ logger.debug('%r %r - manually generating etag',
+ self.feed['id'], self.feed['title'])
+ response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text)
+ if response.headers['etag'] and self.feed['etag'] \
+ and response.headers['etag'] == self.feed['etag']:
+ if 'pyagg' in self.feed['etag']:
+ logger.info("%r %r - calculated hash matches (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
+ else:
+ logger.info("%r %r - feed responded with same etag (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
+ self.clean_feed()
+ return
+ else:
+ logger.debug('%r %r - etag mismatch %r != %r',
+ self.feed['id'], self.feed['title'],
+ response.headers['etag'], self.feed['etag'])
+ logger.info('%r %r - cache validation failed, challenging entries',
+ self.feed['id'], self.feed['title'])
+
+ ids, entries = [], {}
+ parsed_response = feedparser.parse(response.content)
+ for entry in parsed_response['entries']:
+ entry_ids = extract_id(entry)
+ entry_ids['feed_id'] = self.feed['id']
+ entry_ids['user_id'] = self.feed['user_id']
+ entries[tuple(sorted(entry_ids.items()))] = entry
+ ids.append(entry_ids)
+ logger.debug('%r %r - found %d entries %r',
+ self.feed['id'], self.feed['title'], len(ids), ids)
+ future = self.query_pyagg('get', 'articles/challenge', {'ids': ids})
+ updater = PyAggUpdater(self.feed, entries, response.headers,
+ parsed_response,
+ self.auth, self.pool, self.session)
+ future.add_done_callback(updater.callback)
+
+
+class CrawlerScheduler(AbstractCrawler):
+
+ def __init__(self, username, password, pool=None, session=None):
+ self.auth = (username, password)
+ super(CrawlerScheduler, self).__init__(self.auth, pool, session)
+
+ def prepare_headers(self, feed):
+ """For a known feed, will construct some header dictionnary"""
+ headers = {'User-Agent': 'jarr/crawler'}
+ if feed.get('last_modified'):
+ headers['If-Modified-Since'] = feed['last_modified']
+ if feed.get('etag') and 'pyagg' not in feed['etag']:
+ headers['If-None-Match'] = feed['etag']
+ logger.debug('%r %r - calculated headers %r',
+ feed['id'], feed['title'], headers)
+ return headers
+
+ def callback(self, response):
+ """processes feeds that need to be fetched"""
+ response = response.result()
+ response.raise_for_status()
+ if response.status_code == 204:
+ logger.debug("No feed to fetch")
+ return
+ feeds = response.json()
+ logger.debug('%d to fetch %r', len(feeds), feeds)
+ for feed in feeds:
+ logger.debug('%r %r - fetching resources',
+ feed['id'], feed['title'])
+ future = self.session.get(feed['link'],
+ headers=self.prepare_headers(feed))
+
+ feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session)
+ future.add_done_callback(feed_crwlr.callback)
+
+ def run(self, **kwargs):
+ """entry point, will retreive feeds to be fetch
+ and launch the whole thing"""
+ logger.debug('retreving fetchable feed')
+ future = self.query_pyagg('get', 'feeds/fetchable', kwargs)
+ future.add_done_callback(self.callback)
diff --git a/web/lib/feed_utils.py b/web/lib/feed_utils.py
new file mode 100644
index 00000000..f39fe693
--- /dev/null
+++ b/web/lib/feed_utils.py
@@ -0,0 +1,92 @@
+import urllib
+import logging
+import requests
+import feedparser
+from bs4 import BeautifulSoup, SoupStrainer
+
+from web.lib.utils import try_keys, try_get_icon_url, rebuild_url
+
+logger = logging.getLogger(__name__)
+
+
+def construct_feed_from(url=None, fp_parsed=None, feed=None, query_site=True):
+ if url is None and fp_parsed is not None:
+ url = fp_parsed.get('url')
+ if url is not None and fp_parsed is None:
+ try:
+ response = requests.get(url, verify=False)
+ fp_parsed = feedparser.parse(response.content,
+ request_headers=response.headers)
+ except Exception:
+ logger.exception('failed to retreive that url')
+ fp_parsed = {'bozo': True}
+ assert url is not None and fp_parsed is not None
+ feed = feed or {}
+ feed_split = urllib.parse.urlsplit(url)
+ site_split = None
+ if not fp_parsed['bozo']:
+ feed['link'] = url
+ feed['site_link'] = try_keys(fp_parsed['feed'], 'href', 'link')
+ feed['title'] = fp_parsed['feed'].get('title')
+ feed['description'] = try_keys(fp_parsed['feed'], 'subtitle', 'title')
+ feed['icon_url'] = try_keys(fp_parsed['feed'], 'icon')
+ else:
+ feed['site_link'] = url
+
+ if feed.get('site_link'):
+ feed['site_link'] = rebuild_url(feed['site_link'], feed_split)
+ site_split = urllib.parse.urlsplit(feed['site_link'])
+
+ if feed.get('icon_url'):
+ feed['icon_url'] = try_get_icon_url(
+ feed['icon_url'], site_split, feed_split)
+ if feed['icon_url'] is None:
+ del feed['icon_url']
+
+ if not feed.get('site_link') or not query_site \
+ or all(bool(feed.get(k)) for k in ('link', 'title', 'icon_url')):
+ return feed
+
+ response = requests.get(feed['site_link'], verify=False)
+ bs_parsed = BeautifulSoup(response.content, 'html.parser',
+ parse_only=SoupStrainer('head'))
+
+ if not feed.get('title'):
+ try:
+ feed['title'] = bs_parsed.find_all('title')[0].text
+ except Exception:
+ pass
+
+ def check_keys(**kwargs):
+ def wrapper(elem):
+ for key, vals in kwargs.items():
+ if not elem.has_attr(key):
+ return False
+ if not all(val in elem.attrs[key] for val in vals):
+ return False
+ return True
+ return wrapper
+
+ if not feed.get('icon_url'):
+ icons = bs_parsed.find_all(check_keys(rel=['icon', 'shortcut']))
+ if not len(icons):
+ icons = bs_parsed.find_all(check_keys(rel=['icon']))
+ if len(icons) >= 1:
+ for icon in icons:
+ feed['icon_url'] = try_get_icon_url(icon.attrs['href'],
+ site_split, feed_split)
+ if feed['icon_url'] is not None:
+ break
+
+ if feed.get('icon_url') is None:
+ feed['icon_url'] = try_get_icon_url('/favicon.ico',
+ site_split, feed_split)
+ if 'icon_url' in feed and feed['icon_url'] is None:
+ del feed['icon_url']
+
+ if not feed.get('link'):
+ alternates = bs_parsed.find_all(check_keys(rel=['alternate'],
+ type=['application/rss+xml']))
+ if len(alternates) >= 1:
+ feed['link'] = rebuild_url(alternates[0].attrs['href'], feed_split)
+ return feed
diff --git a/web/lib/utils.py b/web/lib/utils.py
new file mode 100644
index 00000000..aa552a12
--- /dev/null
+++ b/web/lib/utils.py
@@ -0,0 +1,57 @@
+import types
+import urllib
+import logging
+import requests
+from hashlib import md5
+
+logger = logging.getLogger(__name__)
+
+
+def default_handler(obj):
+ """JSON handler for default query formatting"""
+ if hasattr(obj, 'isoformat'):
+ return obj.isoformat()
+ if hasattr(obj, 'dump'):
+ return obj.dump()
+ if isinstance(obj, (set, frozenset, types.GeneratorType)):
+ return list(obj)
+ if isinstance(obj, BaseException):
+ return str(obj)
+ raise TypeError("Object of type %s with value of %r "
+ "is not JSON serializable" % (type(obj), obj))
+
+
+def try_keys(dico, *keys):
+ for key in keys:
+ if key in dico:
+ return dico[key]
+ return
+
+
+def rebuild_url(url, base_split):
+ split = urllib.parse.urlsplit(url)
+ if split.scheme and split.netloc:
+ return url # url is fine
+ new_split = urllib.parse.SplitResult(
+ scheme=split.scheme or base_split.scheme,
+ netloc=split.netloc or base_split.netloc,
+ path=split.path, query='', fragment='')
+ return urllib.parse.urlunsplit(new_split)
+
+
+def try_get_icon_url(url, *splits):
+ for split in splits:
+ if split is None:
+ continue
+ rb_url = rebuild_url(url, split)
+ response = requests.get(rb_url, verify=False, timeout=10)
+ # if html in content-type, we assume it's a fancy 404 page
+ content_type = response.headers.get('content-type', '')
+ if response.ok and 'html' not in content_type and response.content:
+ return response.url
+ return None
+
+
+def to_hash(text):
+ return md5(text.encode('utf8') if hasattr(text, 'encode') else text)\
+ .hexdigest()
diff --git a/web/lib/view_utils.py b/web/lib/view_utils.py
new file mode 100644
index 00000000..d4c119da
--- /dev/null
+++ b/web/lib/view_utils.py
@@ -0,0 +1,26 @@
+from functools import wraps
+from flask import request, Response, make_response
+from web.lib.utils import to_hash
+
+
+def etag_match(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ response = func(*args, **kwargs)
+ if isinstance(response, Response):
+ etag = to_hash(response.data)
+ headers = response.headers
+ elif type(response) is str:
+ etag = to_hash(response)
+ headers = {}
+ else:
+ return response
+ if request.headers.get('if-none-match') == etag:
+ response = Response(status=304)
+ response.headers['Cache-Control'] \
+ = headers.get('Cache-Control', 'pragma: no-cache')
+ elif not isinstance(response, Response):
+ response = make_response(response)
+ response.headers['etag'] = etag
+ return response
+ return wrapper
bgstack15