aboutsummaryrefslogtreecommitdiff
path: root/pyaggr3g470r/lib/crawler.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyaggr3g470r/lib/crawler.py')
-rw-r--r--pyaggr3g470r/lib/crawler.py123
1 files changed, 91 insertions, 32 deletions
diff --git a/pyaggr3g470r/lib/crawler.py b/pyaggr3g470r/lib/crawler.py
index 68a7efd0..1cb61973 100644
--- a/pyaggr3g470r/lib/crawler.py
+++ b/pyaggr3g470r/lib/crawler.py
@@ -19,6 +19,7 @@ import logging
import requests
import feedparser
import dateutil.parser
+from hashlib import md5
from functools import wraps
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
@@ -30,6 +31,10 @@ logging.captureWarnings(True)
API_ROOT = "api/v2.0/"
+def to_hash(text):
+ return md5(text.encode('utf8')).hexdigest()
+
+
def extract_id(entry, keys=[('link', 'link'),
('published', 'retrieved_date'),
('updated', 'retrieved_date')], force_id=False):
@@ -41,8 +46,8 @@ def extract_id(entry, keys=[('link', 'link'),
if entry_id:
return {'entry_id': entry_id}
if not entry_id and force_id:
- entry_id = hash("".join(entry[entry_key] for _, entry_key in keys
- if entry_key in entry))
+ entry_id = to_hash("".join(entry[entry_key] for _, entry_key in keys
+ if entry_key in entry).encode('utf8'))
else:
ids = {}
for entry_key, pyagg_key in keys:
@@ -59,6 +64,7 @@ class AbstractCrawler:
__counter__ = 0
def __init__(self, auth):
+ AbstractCrawler.__counter__ += 1
self.auth = auth
self.session = self.get_session()
self.url = conf.PLATFORM_URL
@@ -80,11 +86,21 @@ class AbstractCrawler:
@wraps(func)
def wrapper(*args, **kwargs):
cls.__counter__ += 1
- result = func(*args, **kwargs)
- cls.__counter__ -= 1
- return result
+ try:
+ return func(*args, **kwargs)
+ except:
+ logger.exception('an error occured while %r', func)
+ finally:
+ cls.__counter__ -= 1
return wrapper
+ @classmethod
+ def get_counter_callback(cls):
+ cls.__counter__ += 1
+ def debump(*args, **kwargs):
+ cls.__counter__ -= 1
+ return debump
+
def query_pyagg(self, method, urn, data=None):
"""A wrapper for internal call, method should be ones you can find
on requests (header, post, get, options, ...), urn the distant
@@ -96,22 +112,30 @@ class AbstractCrawler:
return method("%s%s%s" % (self.url, API_ROOT, urn),
auth=self.auth, data=json.dumps(data,
default=default_handler),
- headers={'Content-Type': 'application/json'})
+ headers={'Content-Type': 'application/json',
+ 'User-Agent': 'pyaggr3g470r'})
@classmethod
- def wait(cls):
+ def wait(cls, max_wait=600):
"See count_on_me, that method will just wait for the counter to be 0"
time.sleep(1)
+ second_waited = 1
while cls.__counter__:
+ if second_waited > max_wait:
+ logger.warn('Exiting after %d seconds, counter at %d',
+ max_wait, cls.__counter__)
+ break
time.sleep(1)
+ second_waited += 1
class PyAggUpdater(AbstractCrawler):
- def __init__(self, feed, entries, headers, auth):
+ def __init__(self, feed, entries, headers, parsed_feed, auth):
self.feed = feed
self.entries = entries
self.headers = headers
+ self.parsed_feed = parsed_feed.get('feed', {})
super(PyAggUpdater, self).__init__(auth)
def to_article(self, entry):
@@ -145,24 +169,36 @@ class PyAggUpdater(AbstractCrawler):
def callback(self, response):
"""Will process the result from the challenge, creating missing article
and updating the feed"""
+ AbstractCrawler.__counter__ -= 1
results = response.result().json()
logger.debug('%r %r - %d entries were not matched and will be created',
self.feed['id'], self.feed['title'], len(results))
for id_to_create in results:
entry = self.to_article(
self.entries[tuple(sorted(id_to_create.items()))])
- logger.info('creating %r - %r', entry['title'], id_to_create)
+ logger.warn('%r %r - creating %r - %r', self.feed['id'],
+ self.feed['title'], entry['title'], id_to_create)
self.query_pyagg('post', 'article', entry)
now = datetime.now()
logger.debug('%r %r - updating feed etag %r last_mod %r',
self.feed['id'], self.feed['title'],
- self.headers.get('etag'), now)
-
- self.query_pyagg('put', 'feed/%d' % self.feed['id'], {'error_count': 0,
- 'etag': self.headers.get('etag', ''),
- 'last_error': '',
- 'last_modified': self.headers.get('last-modified', '')})
+ self.headers.get('etag', ''),
+ self.headers.get('last-modified', ''))
+
+ dico = {'error_count': 0, 'last_error': None,
+ 'etag': self.headers.get('etag', ''),
+ 'last_modified': self.headers.get('last-modified', ''),
+ 'site_link': self.parsed_feed.get('link')}
+ if not self.feed.get('title'):
+ dico['title'] = self.parsed_feed.get('title', '')
+ logger.info('%r %r - pushing feed attrs %r',
+ self.feed['id'], self.feed['title'],
+ {key: "%s -> %s" % (dico[key], self.feed.get(key))
+ for key in dico if dico[key] != self.feed.get(key)})
+ if any([dico[key] != self.feed.get(key) for key in dico]):
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], dico)
+ future.add_done_callback(self.get_counter_callback())
class FeedCrawler(AbstractCrawler):
@@ -174,13 +210,15 @@ class FeedCrawler(AbstractCrawler):
def clean_feed(self):
"""Will reset the errors counters on a feed that have known errors"""
if self.feed.get('error_count') or self.feed.get('last_error'):
- self.query_pyagg('put', 'feed/%d' % self.feed['id'],
- {'error_count': 0, 'last_error': ''})
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': 0, 'last_error': ''})
+ future.add_done_callback(self.get_counter_callback())
@AbstractCrawler.count_on_me
def callback(self, response):
"""will fetch the feed and interprete results (304, etag) or will
challenge pyagg to compare gotten entries with existing ones"""
+ AbstractCrawler.__counter__ -= 1
try:
response = response.result()
response.raise_for_status()
@@ -189,23 +227,40 @@ class FeedCrawler(AbstractCrawler):
logger.warn('%r %r - an error occured while fetching feed; bumping'
' error count to %r', self.feed['id'],
self.feed['title'], error_count)
- self.query_pyagg('put', 'feed/%d' % self.feed['id'],
- {'error_count': error_count,
- 'last_error': str(error)})
+ future = self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': error_count,
+ 'last_error': str(error)})
+ future.add_done_callback(self.get_counter_callback())
return
if response.status_code == 304:
logger.info("%r %r - feed responded with 304",
- self.feed['id'], self.feed['title'])
+ self.feed['id'], self.feed['title'])
self.clean_feed()
return
- if self.feed['etag'] and response.headers.get('etag') \
- and response.headers.get('etag') == self.feed['etag']:
- logger.info("%r %r - feed responded with same etag (%d)",
- self.feed['id'], self.feed['title'],
- response.status_code)
+ if 'etag' not in response.headers:
+ logger.debug('%r %r - manually generating etag',
+ self.feed['id'], self.feed['title'])
+ response.headers['etag'] = 'pyagg/"%s"' % to_hash(response.text)
+ if response.headers['etag'] and self.feed['etag'] \
+ and response.headers['etag'] == self.feed['etag']:
+ if 'pyagg' in self.feed['etag']:
+ logger.info("%r %r - calculated hash matches (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
+ else:
+ logger.info("%r %r - feed responded with same etag (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
self.clean_feed()
return
+ else:
+ logger.debug('%r %r - etag mismatch %r != %r',
+ self.feed['id'], self.feed['title'],
+ response.headers['etag'], self.feed['etag'])
+ logger.info('%r %r - cache validation failed, challenging entries',
+ self.feed['id'], self.feed['title'])
+
ids, entries = [], {}
parsed_response = feedparser.parse(response.text)
for entry in parsed_response['entries']:
@@ -214,7 +269,8 @@ class FeedCrawler(AbstractCrawler):
logger.debug('%r %r - found %d entries %r',
self.feed['id'], self.feed['title'], len(ids), ids)
future = self.query_pyagg('get', 'articles/challenge', {'ids': ids})
- updater = PyAggUpdater(self.feed, entries, response.headers, self.auth)
+ updater = PyAggUpdater(self.feed, entries, response.headers,
+ parsed_response, self.auth)
future.add_done_callback(updater.callback)
@@ -223,14 +279,15 @@ class CrawlerScheduler(AbstractCrawler):
def __init__(self, username, password):
self.auth = (username, password)
super(CrawlerScheduler, self).__init__(self.auth)
+ AbstractCrawler.__counter__ = 0
def prepare_headers(self, feed):
"""For a known feed, will construct some header dictionnary"""
- headers = {}
- if feed.get('etag', None):
- headers['If-None-Match'] = feed['etag']
+ headers = {'User-Agent': 'pyaggr3g470r/crawler'}
if feed.get('last_modified'):
headers['If-Modified-Since'] = feed['last_modified']
+ if feed.get('etag') and 'pyagg' not in feed['etag']:
+ headers['If-None-Match'] = feed['etag']
logger.debug('%r %r - calculated headers %r',
feed['id'], feed['title'], headers)
return headers
@@ -238,13 +295,14 @@ class CrawlerScheduler(AbstractCrawler):
@AbstractCrawler.count_on_me
def callback(self, response):
"""processes feeds that need to be fetched"""
+ AbstractCrawler.__counter__ -= 1
response = response.result()
response.raise_for_status()
feeds = response.json()
logger.debug('%d to fetch %r', len(feeds), feeds)
for feed in feeds:
- logger.info('%r %r - fetching resources',
- feed['id'], feed['title'])
+ logger.debug('%r %r - fetching resources',
+ feed['id'], feed['title'])
future = self.session.get(feed['link'],
headers=self.prepare_headers(feed))
future.add_done_callback(FeedCrawler(feed, self.auth).callback)
@@ -255,4 +313,5 @@ class CrawlerScheduler(AbstractCrawler):
and launch the whole thing"""
logger.debug('retreving fetchable feed')
future = self.query_pyagg('get', 'feeds/fetchable', kwargs)
+ AbstractCrawler.__counter__ += 1
future.add_done_callback(self.callback)
bgstack15