Diffstat (limited to 'pyaggr3g470r/lib/crawler.py')
-rw-r--r-- | pyaggr3g470r/lib/crawler.py | 123
1 file changed, 91 insertions, 32 deletions
diff --git a/pyaggr3g470r/lib/crawler.py b/pyaggr3g470r/lib/crawler.py
index 68a7efd0..1cb61973 100644
--- a/pyaggr3g470r/lib/crawler.py
+++ b/pyaggr3g470r/lib/crawler.py
@@ -19,6 +19,7 @@ import logging
 import requests
 import feedparser
 import dateutil.parser
+from hashlib import md5
 from functools import wraps
 from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor
@@ -30,6 +31,10 @@ logging.captureWarnings(True)
 
 API_ROOT = "api/v2.0/"
 
+def to_hash(text):
+    return md5(text.encode('utf8')).hexdigest()
+
+
 def extract_id(entry, keys=[('link', 'link'),
                             ('published', 'retrieved_date'),
                             ('updated', 'retrieved_date')],
                force_id=False):
@@ -41,8 +46,8 @@
     if entry_id:
         return {'entry_id': entry_id}
     if not entry_id and force_id:
-        entry_id = hash("".join(entry[entry_key] for _, entry_key in keys
-                                if entry_key in entry))
+        entry_id = to_hash("".join(entry[entry_key] for _, entry_key in keys
+                                   if entry_key in entry).encode('utf8'))
     else:
         ids = {}
         for entry_key, pyagg_key in keys:
@@ -59,6 +64,7 @@ class AbstractCrawler:
     __counter__ = 0
 
     def __init__(self, auth):
+        AbstractCrawler.__counter__ += 1
         self.auth = auth
         self.session = self.get_session()
         self.url = conf.PLATFORM_URL
@@ -80,11 +86,21 @@
         @wraps(func)
         def wrapper(*args, **kwargs):
             cls.__counter__ += 1
-            result = func(*args, **kwargs)
-            cls.__counter__ -= 1
-            return result
+            try:
+                return func(*args, **kwargs)
+            except:
+                logger.exception('an error occured while %r', func)
+            finally:
+                cls.__counter__ -= 1
         return wrapper
 
+    @classmethod
+    def get_counter_callback(cls):
+        cls.__counter__ += 1
+        def debump(*args, **kwargs):
+            cls.__counter__ -= 1
+        return debump
+
     def query_pyagg(self, method, urn, data=None):
         """A wrapper for internal call, method should be ones you can find
         on requests (header, post, get, options, ...), urn the distant
@@ -96,22 +112,30 @@
         return method("%s%s%s" % (self.url, API_ROOT, urn),
                       auth=self.auth,
                       data=json.dumps(data, default=default_handler),
-                      headers={'Content-Type': 'application/json'})
+                      headers={'Content-Type': 'application/json',
+                               'User-Agent': 'pyaggr3g470r'})
 
     @classmethod
-    def wait(cls):
+    def wait(cls, max_wait=600):
         "See count_on_me, that method will just wait for the counter to be 0"
         time.sleep(1)
+        second_waited = 1
         while cls.__counter__:
+            if second_waited > max_wait:
+                logger.warn('Exiting after %d seconds, counter at %d',
+                            max_wait, cls.__counter__)
+                break
             time.sleep(1)
+            second_waited += 1
 
 
 class PyAggUpdater(AbstractCrawler):
 
-    def __init__(self, feed, entries, headers, auth):
+    def __init__(self, feed, entries, headers, parsed_feed, auth):
         self.feed = feed
         self.entries = entries
         self.headers = headers
+        self.parsed_feed = parsed_feed.get('feed', {})
         super(PyAggUpdater, self).__init__(auth)
 
     def to_article(self, entry):
@@ -145,24 +169,36 @@
     def callback(self, response):
         """Will process the result from the challenge, creating missing
         article and updating the feed"""
+        AbstractCrawler.__counter__ -= 1
         results = response.result().json()
         logger.debug('%r %r - %d entries were not matched and will be created',
                      self.feed['id'], self.feed['title'], len(results))
         for id_to_create in results:
             entry = self.to_article(
                     self.entries[tuple(sorted(id_to_create.items()))])
-            logger.info('creating %r - %r', entry['title'], id_to_create)
+            logger.warn('%r %r - creating %r - %r', self.feed['id'],
+                        self.feed['title'], entry['title'], id_to_create)
             self.query_pyagg('post', 'article', entry)
 
         now = datetime.now()
         logger.debug('%r %r - updating feed etag %r last_mod %r',
                      self.feed['id'], self.feed['title'],
-                     self.headers.get('etag'), now)
-
-        self.query_pyagg('put', 'feed/%d' % self.feed['id'], {'error_count': 0,
-                'etag': self.headers.get('etag', ''),
-                'last_error': '',
-                'last_modified': self.headers.get('last-modified', '')})
+                     self.headers.get('etag', ''),
+                     self.headers.get('last-modified', ''))
+
+        dico = {'error_count': 0, 'last_error': None,
+                'etag': self.headers.get('etag', ''),
+                'last_modified': self.headers.get('last-modified', ''),
+                'site_link': self.parsed_feed.get('link')}
+        if not self.feed.get('title'):
+            dico['title'] = self.parsed_feed.get('title', '')
+        logger.info('%r %r - pushing feed attrs %r',
+                    self.feed['id'], self.feed['title'],
+                    {key: "%s -> %s" % (dico[key], self.feed.get(key))
+                     for key in dico if dico[key] != self.feed.get(key)})
+        if any([dico[key] != self.feed.get(key) for key in dico]):
+            future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], dico)
+            future.add_done_callback(self.get_counter_callback())
 
 
 class FeedCrawler(AbstractCrawler):
@@ -174,13 +210,15 @@
     def clean_feed(self):
         """Will reset the errors counters on a feed that have known errors"""
         if self.feed.get('error_count') or self.feed.get('last_error'):
-            self.query_pyagg('put', 'feed/%d' % self.feed['id'],
-                             {'error_count': 0, 'last_error': ''})
+            future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+                                      {'error_count': 0, 'last_error': ''})
+            future.add_done_callback(self.get_counter_callback())
 
     @AbstractCrawler.count_on_me
     def callback(self, response):
         """will fetch the feed and interprete results (304, etag) or will
         challenge pyagg to compare gotten entries with existing ones"""
+        AbstractCrawler.__counter__ -= 1
         try:
             response = response.result()
             response.raise_for_status()
@@ -189,23 +227,40 @@
             logger.warn('%r %r - an error occured while fetching feed; bumping'
                         ' error count to %r', self.feed['id'],
                         self.feed['title'], error_count)
-            self.query_pyagg('put', 'feed/%d' % self.feed['id'],
-                             {'error_count': error_count,
-                              'last_error': str(error)})
+            future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+                                      {'error_count': error_count,
+                                       'last_error': str(error)})
+            future.add_done_callback(self.get_counter_callback())
             return
 
         if response.status_code == 304:
             logger.info("%r %r - feed responded with 304",
-                    self.feed['id'], self.feed['title'])
+                        self.feed['id'], self.feed['title'])
             self.clean_feed()
             return
-        if self.feed['etag'] and response.headers.get('etag') \
-                and response.headers.get('etag') == self.feed['etag']:
-            logger.info("%r %r - feed responded with same etag (%d)",
-                        self.feed['id'], self.feed['title'],
-                        response.status_code)
+        if 'etag' not in response.headers:
+            logger.debug('%r %r - manually generating etag',
+                         self.feed['id'], self.feed['title'])
+            response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text)
+        if response.headers['etag'] and self.feed['etag'] \
+                and response.headers['etag'] == self.feed['etag']:
+            if 'pyagg' in self.feed['etag']:
+                logger.info("%r %r - calculated hash matches (%d)",
+                            self.feed['id'], self.feed['title'],
+                            response.status_code)
+            else:
+                logger.info("%r %r - feed responded with same etag (%d)",
+                            self.feed['id'], self.feed['title'],
+                            response.status_code)
             self.clean_feed()
             return
+        else:
+            logger.debug('%r %r - etag mismatch %r != %r',
+                         self.feed['id'], self.feed['title'],
+                         response.headers['etag'], self.feed['etag'])
+        logger.info('%r %r - cache validation failed, challenging entries',
+                    self.feed['id'], self.feed['title'])
+
         ids, entries = [], {}
         parsed_response = feedparser.parse(response.text)
         for entry in parsed_response['entries']:
@@ -214,7 +269,8 @@
         logger.debug('%r %r - found %d entries %r',
                      self.feed['id'], self.feed['title'], len(ids), ids)
         future = self.query_pyagg('get', 'articles/challenge', {'ids': ids})
-        updater = PyAggUpdater(self.feed, entries, response.headers, self.auth)
+        updater = PyAggUpdater(self.feed, entries, response.headers,
+                               parsed_response, self.auth)
         future.add_done_callback(updater.callback)
 
 
@@ -223,14 +279,15 @@ class CrawlerScheduler(AbstractCrawler):
     def __init__(self, username, password):
         self.auth = (username, password)
         super(CrawlerScheduler, self).__init__(self.auth)
+        AbstractCrawler.__counter__ = 0
 
     def prepare_headers(self, feed):
         """For a known feed, will construct some header dictionnary"""
-        headers = {}
-        if feed.get('etag', None):
-            headers['If-None-Match'] = feed['etag']
+        headers = {'User-Agent': 'pyaggr3g470r/crawler'}
         if feed.get('last_modified'):
             headers['If-Modified-Since'] = feed['last_modified']
+        if feed.get('etag') and 'pyagg' not in feed['etag']:
+            headers['If-None-Match'] = feed['etag']
         logger.debug('%r %r - calculated headers %r',
                      feed['id'], feed['title'], headers)
         return headers
@@ -238,13 +295,14 @@
     @AbstractCrawler.count_on_me
     def callback(self, response):
         """processes feeds that need to be fetched"""
+        AbstractCrawler.__counter__ -= 1
         response = response.result()
         response.raise_for_status()
         feeds = response.json()
         logger.debug('%d to fetch %r', len(feeds), feeds)
         for feed in feeds:
-            logger.info('%r %r - fetching resources',
-                        feed['id'], feed['title'])
+            logger.debug('%r %r - fetching resources',
+                         feed['id'], feed['title'])
             future = self.session.get(feed['link'],
                                       headers=self.prepare_headers(feed))
             future.add_done_callback(FeedCrawler(feed, self.auth).callback)
@@ -255,4 +313,5 @@
         and launch the whole thing"""
         logger.debug('retreving fetchable feed')
         future = self.query_pyagg('get', 'feeds/fetchable', kwargs)
+        AbstractCrawler.__counter__ += 1
         future.add_done_callback(self.callback)
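
The central idea of this commit is the synthetic etag: when a feed server sends no ETag header, the crawler hashes the response body into a pyagg/"..." marker and stores it, and on later fetches it compares hashes locally instead of replaying the marker as If-None-Match. A minimal standalone sketch of that round trip, assuming a stored feed dict shaped like the ones above (the body and feed variables are illustrative, not from the codebase):

    from hashlib import md5

    def to_hash(text):
        return md5(text.encode('utf8')).hexdigest()

    def prepare_headers(feed):
        # same rule as CrawlerScheduler.prepare_headers in the diff: a locally
        # computed 'pyagg' etag is never sent back to the server, since it
        # would mean nothing there
        headers = {'User-Agent': 'pyaggr3g470r/crawler'}
        if feed.get('last_modified'):
            headers['If-Modified-Since'] = feed['last_modified']
        if feed.get('etag') and 'pyagg' not in feed['etag']:
            headers['If-None-Match'] = feed['etag']
        return headers

    body = '<rss version="2.0">...</rss>'          # fetched feed text
    feed = {'etag': 'pyagg/"%s"' % to_hash(body)}  # what the crawler persists
    assert 'If-None-Match' not in prepare_headers(feed)
    # next fetch: recompute and compare locally, as FeedCrawler.callback does
    assert feed['etag'] == 'pyagg/"%s"' % to_hash(body)  # unchanged, skip feed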
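The __counter__ changes amount to a hand-rolled completion barrier over requests-futures: work is counted when it is scheduled (the bump in __init__, count_on_me, get_counter_callback), discounted when a callback finishes, and wait() now drains with a timeout instead of potentially spinning forever. A condensed sketch of the same pattern with HTTP replaced by a plain thread pool (the Tracker name is made up for the example):

    import time
    from concurrent.futures import ThreadPoolExecutor

    class Tracker:
        counter = 0

        @classmethod
        def get_counter_callback(cls):
            cls.counter += 1               # bump as soon as work is scheduled
            def debump(*args, **kwargs):
                cls.counter -= 1           # debump when the future completes
            return debump

        @classmethod
        def wait(cls, max_wait=600):
            waited = 0
            while cls.counter and waited < max_wait:
                time.sleep(1)
                waited += 1

    pool = ThreadPoolExecutor(max_workers=2)
    for _ in range(3):
        future = pool.submit(time.sleep, 0.1)   # stand-in for a session.get()
        future.add_done_callback(Tracker.get_counter_callback())
    Tracker.wait(max_wait=5)                    # returns once the counter drains
    assert Tracker.counter == 0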