diff options
Diffstat (limited to 'pyaggr3g470r/lib')
-rw-r--r-- | pyaggr3g470r/lib/__init__.py | 0 | ||||
-rw-r--r-- | pyaggr3g470r/lib/crawler.py | 257 | ||||
-rw-r--r-- | pyaggr3g470r/lib/utils.py | 14 |
3 files changed, 271 insertions, 0 deletions
diff --git a/pyaggr3g470r/lib/__init__.py b/pyaggr3g470r/lib/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/pyaggr3g470r/lib/__init__.py diff --git a/pyaggr3g470r/lib/crawler.py b/pyaggr3g470r/lib/crawler.py new file mode 100644 index 00000000..64ef8b6d --- /dev/null +++ b/pyaggr3g470r/lib/crawler.py @@ -0,0 +1,257 @@ +""" +Here's a sum up on how it works : + +CrawlerScheduler.run + will retreive a list of feeds to be refreshed and pass result to +CrawlerScheduler.callback + which will retreive each feed and treat result with +FeedCrawler.callback + which will interprete the result (status_code, etag) collect ids + and match them agaisnt pyagg which will cause +PyAggUpdater.callback + to create the missing entries +""" + +import time +import conf +import json +import logging +import requests +import feedparser +import dateutil.parser +from functools import wraps +from datetime import datetime +from concurrent.futures import ThreadPoolExecutor +from requests_futures.sessions import FuturesSession +from pyaggr3g470r.lib.utils import default_handler + +logger = logging.getLogger(__name__) +API_ROOT = "api/v2.0/" + + +def extract_id(entry, keys=[('link', 'link'), + ('published', 'retrieved_date'), + ('updated', 'retrieved_date')], force_id=False): + """For a given entry will return a dict that allows to identify it. The + dict will be constructed on the uid of the entry. if that identifier is + absent, the dict will be constructed upon the values of "keys". + """ + entry_id = entry.get('entry_id') or entry.get('id') + if entry_id: + return {'entry_id': entry_id} + if not entry_id and force_id: + entry_id = hash("".join(entry[entry_key] for _, entry_key in keys + if entry_key in entry)) + else: + ids = {} + for entry_key, pyagg_key in keys: + if entry_key in entry and pyagg_key not in ids: + ids[pyagg_key] = entry[entry_key] + if 'date' in pyagg_key: + ids[pyagg_key] = dateutil.parser.parse(ids[pyagg_key])\ + .isoformat() + return ids + + +class AbstractCrawler: + __session__ = None + __counter__ = 0 + + def __init__(self, auth): + self.auth = auth + self.session = self.get_session() + self.url = conf.PLATFORM_URL + + @classmethod + def get_session(cls): + """methods that allows us to treat session as a singleton""" + if cls.__session__ is None: + cls.__session__ = FuturesSession( + executor=ThreadPoolExecutor(max_workers=conf.NB_WORKER)) + cls.__session__.verify = False + return cls.__session__ + + @classmethod + def count_on_me(cls, func): + """A basic decorator which will count +1 at the begining of a call + and -1 at the end. It kinda allows us to wait for the __counter__ value + to be 0, meaning nothing is done anymore.""" + @wraps(func) + def wrapper(*args, **kwargs): + cls.__counter__ += 1 + result = func(*args, **kwargs) + cls.__counter__ -= 1 + return result + return wrapper + + def query_pyagg(self, method, urn, data=None): + """A wrapper for internal call, method should be ones you can find + on requests (header, post, get, options, ...), urn the distant + resources you want to access on pyagg, and data, the data you wanna + transmit.""" + if data is None: + data = {} + method = getattr(self.session, method) + return method("%s%s%s" % (self.url, API_ROOT, urn), + auth=self.auth, data=json.dumps(data, + default=default_handler), + headers={'Content-Type': 'application/json'}) + + @classmethod + def wait(cls): + "See count_on_me, that method will just wait for the counter to be 0" + time.sleep(1) + while cls.__counter__: + time.sleep(1) + + +class PyAggUpdater(AbstractCrawler): + + def __init__(self, feed, entries, headers, auth): + self.feed = feed + self.entries = entries + self.headers = headers + super(PyAggUpdater, self).__init__(auth) + + def to_article(self, entry): + "Safe method to transorm a feedparser entry into an article" + date = datetime.now() + + for date_key in ('published', 'updated'): + if entry.get(date_key): + try: + date = dateutil.parser.parse(entry[date_key]) + except Exception: + pass + else: + break + content = '' + if entry.get('content'): + content = entry['content'][0]['value'] + elif entry.get('summary'): + content = entry['summary'] + + return {'feed_id': self.feed['id'], + 'entry_id': extract_id(entry).get('entry_id', None), + 'link': entry.get('link', self.feed['site_link']), + 'title': entry.get('title', 'No title'), + 'readed': False, 'like': False, + 'content': content, + 'retrieved_date': date.isoformat(), + 'date': date.isoformat()} + + @AbstractCrawler.count_on_me + def callback(self, response): + """Will process the result from the challenge, creating missing article + and updating the feed""" + results = response.result().json() + logger.debug('%r %r - %d entries were not matched and will be created', + self.feed['id'], self.feed['title'], len(results)) + for id_to_create in results: + entry = self.to_article( + self.entries[tuple(sorted(id_to_create.items()))]) + logger.info('creating %r - %r', entry['title'], id_to_create) + self.query_pyagg('post', 'article', entry) + + now = datetime.now() + logger.debug('%r %r - updating feed etag %r last_mod %r', + self.feed['id'], self.feed['title'], + self.headers.get('etag'), now) + + self.query_pyagg('put', 'feed/%d' % self.feed['id'], {'error_count': 0, + 'etag': self.headers.get('etag', ''), + 'last_error': '', + 'last_modified': self.headers.get('last-modified', '')}) + + +class FeedCrawler(AbstractCrawler): + + def __init__(self, feed, auth): + self.feed = feed + super(FeedCrawler, self).__init__(auth) + + def clean_feed(self): + """Will reset the errors counters on a feed that have known errors""" + if self.feed.get('error_count') or self.feed.get('last_error'): + self.query_pyagg('put', 'feed/%d' % self.feed['id'], + {'error_count': 0, 'last_error': ''}) + + @AbstractCrawler.count_on_me + def callback(self, response): + """will fetch the feed and interprete results (304, etag) or will + challenge pyagg to compare gotten entries with existing ones""" + try: + response = response.result() + response.raise_for_status() + except Exception as error: + error_count = self.feed['error_count'] + 1 + logger.warn('%r %r - an error occured while fetching feed; bumping' + ' error count to %r', self.feed['id'], + self.feed['title'], error_count) + self.query_pyagg('put', 'feed/%d' % self.feed['id'], + {'error_count': error_count, + 'last_error': str(error)}) + return + + if response.status_code == 304: + logger.info("%r %r - feed responded with 304", + self.feed['id'], self.feed['title']) + self.clean_feed() + return + if self.feed['etag'] and response.headers.get('etag') \ + and response.headers.get('etag') == self.feed['etag']: + logger.info("%r %r - feed responded with same etag (%d)", + self.feed['id'], self.feed['title'], + response.status_code) + self.clean_feed() + return + ids, entries = [], {} + parsed_response = feedparser.parse(response.text) + for entry in parsed_response['entries']: + entries[tuple(sorted(extract_id(entry).items()))] = entry + ids.append(extract_id(entry)) + logger.debug('%r %r - found %d entries %r', + self.feed['id'], self.feed['title'], len(ids), ids) + future = self.query_pyagg('get', 'articles/challenge', {'ids': ids}) + updater = PyAggUpdater(self.feed, entries, response.headers, self.auth) + future.add_done_callback(updater.callback) + + +class CrawlerScheduler(AbstractCrawler): + + def __init__(self, username, password): + self.auth = (username, password) + super(CrawlerScheduler, self).__init__(self.auth) + + def prepare_headers(self, feed): + """For a known feed, will construct some header dictionnary""" + headers = {} + if feed.get('etag', None): + headers['If-None-Match'] = feed['etag'] + if feed.get('last_modified'): + headers['If-Modified-Since'] = feed['last_modified'] + logger.debug('%r %r - calculated headers %r', + feed['id'], feed['title'], headers) + return headers + + @AbstractCrawler.count_on_me + def callback(self, response): + """processes feeds that need to be fetched""" + response = response.result() + response.raise_for_status() + feeds = response.json() + logger.debug('%d to fetch %r', len(feeds), feeds) + for feed in feeds: + logger.info('%r %r - fetching resources', + feed['id'], feed['title']) + future = self.session.get(feed['link'], + headers=self.prepare_headers(feed)) + future.add_done_callback(FeedCrawler(feed, self.auth).callback) + + @AbstractCrawler.count_on_me + def run(self, **kwargs): + """entry point, will retreive feeds to be fetch + and launch the whole thing""" + logger.debug('retreving fetchable feed') + future = self.query_pyagg('get', 'feeds/fetchable', kwargs) + future.add_done_callback(self.callback) diff --git a/pyaggr3g470r/lib/utils.py b/pyaggr3g470r/lib/utils.py new file mode 100644 index 00000000..a4f4b3ec --- /dev/null +++ b/pyaggr3g470r/lib/utils.py @@ -0,0 +1,14 @@ +import types + +def default_handler(obj): + """JSON handler for default query formatting""" + if hasattr(obj, 'isoformat'): + return obj.isoformat() + if hasattr(obj, 'dump'): + return obj.dump() + if isinstance(obj, (set, frozenset, types.GeneratorType)): + return list(obj) + if isinstance(obj, BaseException): + return str(obj) + raise TypeError("Object of type %s with value of %r " + "is not JSON serializable" % (type(obj), obj)) |