aboutsummaryrefslogtreecommitdiff
path: root/pyaggr3g470r/lib
diff options
context:
space:
mode:
Diffstat (limited to 'pyaggr3g470r/lib')
-rw-r--r--pyaggr3g470r/lib/__init__.py0
-rw-r--r--pyaggr3g470r/lib/crawler.py257
-rw-r--r--pyaggr3g470r/lib/utils.py14
3 files changed, 271 insertions, 0 deletions
diff --git a/pyaggr3g470r/lib/__init__.py b/pyaggr3g470r/lib/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/pyaggr3g470r/lib/__init__.py
diff --git a/pyaggr3g470r/lib/crawler.py b/pyaggr3g470r/lib/crawler.py
new file mode 100644
index 00000000..64ef8b6d
--- /dev/null
+++ b/pyaggr3g470r/lib/crawler.py
@@ -0,0 +1,257 @@
+"""
+Here's a sum up on how it works :
+
+CrawlerScheduler.run
+ will retreive a list of feeds to be refreshed and pass result to
+CrawlerScheduler.callback
+ which will retreive each feed and treat result with
+FeedCrawler.callback
+ which will interprete the result (status_code, etag) collect ids
+ and match them agaisnt pyagg which will cause
+PyAggUpdater.callback
+ to create the missing entries
+"""
+
+import time
+import conf
+import json
+import logging
+import requests
+import feedparser
+import dateutil.parser
+from functools import wraps
+from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor
+from requests_futures.sessions import FuturesSession
+from pyaggr3g470r.lib.utils import default_handler
+
+logger = logging.getLogger(__name__)
+API_ROOT = "api/v2.0/"
+
+
+def extract_id(entry, keys=[('link', 'link'),
+ ('published', 'retrieved_date'),
+ ('updated', 'retrieved_date')], force_id=False):
+ """For a given entry will return a dict that allows to identify it. The
+ dict will be constructed on the uid of the entry. if that identifier is
+ absent, the dict will be constructed upon the values of "keys".
+ """
+ entry_id = entry.get('entry_id') or entry.get('id')
+ if entry_id:
+ return {'entry_id': entry_id}
+ if not entry_id and force_id:
+ entry_id = hash("".join(entry[entry_key] for _, entry_key in keys
+ if entry_key in entry))
+ else:
+ ids = {}
+ for entry_key, pyagg_key in keys:
+ if entry_key in entry and pyagg_key not in ids:
+ ids[pyagg_key] = entry[entry_key]
+ if 'date' in pyagg_key:
+ ids[pyagg_key] = dateutil.parser.parse(ids[pyagg_key])\
+ .isoformat()
+ return ids
+
+
+class AbstractCrawler:
+ __session__ = None
+ __counter__ = 0
+
+ def __init__(self, auth):
+ self.auth = auth
+ self.session = self.get_session()
+ self.url = conf.PLATFORM_URL
+
+ @classmethod
+ def get_session(cls):
+ """methods that allows us to treat session as a singleton"""
+ if cls.__session__ is None:
+ cls.__session__ = FuturesSession(
+ executor=ThreadPoolExecutor(max_workers=conf.NB_WORKER))
+ cls.__session__.verify = False
+ return cls.__session__
+
+ @classmethod
+ def count_on_me(cls, func):
+ """A basic decorator which will count +1 at the begining of a call
+ and -1 at the end. It kinda allows us to wait for the __counter__ value
+ to be 0, meaning nothing is done anymore."""
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ cls.__counter__ += 1
+ result = func(*args, **kwargs)
+ cls.__counter__ -= 1
+ return result
+ return wrapper
+
+ def query_pyagg(self, method, urn, data=None):
+ """A wrapper for internal call, method should be ones you can find
+ on requests (header, post, get, options, ...), urn the distant
+ resources you want to access on pyagg, and data, the data you wanna
+ transmit."""
+ if data is None:
+ data = {}
+ method = getattr(self.session, method)
+ return method("%s%s%s" % (self.url, API_ROOT, urn),
+ auth=self.auth, data=json.dumps(data,
+ default=default_handler),
+ headers={'Content-Type': 'application/json'})
+
+ @classmethod
+ def wait(cls):
+ "See count_on_me, that method will just wait for the counter to be 0"
+ time.sleep(1)
+ while cls.__counter__:
+ time.sleep(1)
+
+
+class PyAggUpdater(AbstractCrawler):
+
+ def __init__(self, feed, entries, headers, auth):
+ self.feed = feed
+ self.entries = entries
+ self.headers = headers
+ super(PyAggUpdater, self).__init__(auth)
+
+ def to_article(self, entry):
+ "Safe method to transorm a feedparser entry into an article"
+ date = datetime.now()
+
+ for date_key in ('published', 'updated'):
+ if entry.get(date_key):
+ try:
+ date = dateutil.parser.parse(entry[date_key])
+ except Exception:
+ pass
+ else:
+ break
+ content = ''
+ if entry.get('content'):
+ content = entry['content'][0]['value']
+ elif entry.get('summary'):
+ content = entry['summary']
+
+ return {'feed_id': self.feed['id'],
+ 'entry_id': extract_id(entry).get('entry_id', None),
+ 'link': entry.get('link', self.feed['site_link']),
+ 'title': entry.get('title', 'No title'),
+ 'readed': False, 'like': False,
+ 'content': content,
+ 'retrieved_date': date.isoformat(),
+ 'date': date.isoformat()}
+
+ @AbstractCrawler.count_on_me
+ def callback(self, response):
+ """Will process the result from the challenge, creating missing article
+ and updating the feed"""
+ results = response.result().json()
+ logger.debug('%r %r - %d entries were not matched and will be created',
+ self.feed['id'], self.feed['title'], len(results))
+ for id_to_create in results:
+ entry = self.to_article(
+ self.entries[tuple(sorted(id_to_create.items()))])
+ logger.info('creating %r - %r', entry['title'], id_to_create)
+ self.query_pyagg('post', 'article', entry)
+
+ now = datetime.now()
+ logger.debug('%r %r - updating feed etag %r last_mod %r',
+ self.feed['id'], self.feed['title'],
+ self.headers.get('etag'), now)
+
+ self.query_pyagg('put', 'feed/%d' % self.feed['id'], {'error_count': 0,
+ 'etag': self.headers.get('etag', ''),
+ 'last_error': '',
+ 'last_modified': self.headers.get('last-modified', '')})
+
+
+class FeedCrawler(AbstractCrawler):
+
+ def __init__(self, feed, auth):
+ self.feed = feed
+ super(FeedCrawler, self).__init__(auth)
+
+ def clean_feed(self):
+ """Will reset the errors counters on a feed that have known errors"""
+ if self.feed.get('error_count') or self.feed.get('last_error'):
+ self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': 0, 'last_error': ''})
+
+ @AbstractCrawler.count_on_me
+ def callback(self, response):
+ """will fetch the feed and interprete results (304, etag) or will
+ challenge pyagg to compare gotten entries with existing ones"""
+ try:
+ response = response.result()
+ response.raise_for_status()
+ except Exception as error:
+ error_count = self.feed['error_count'] + 1
+ logger.warn('%r %r - an error occured while fetching feed; bumping'
+ ' error count to %r', self.feed['id'],
+ self.feed['title'], error_count)
+ self.query_pyagg('put', 'feed/%d' % self.feed['id'],
+ {'error_count': error_count,
+ 'last_error': str(error)})
+ return
+
+ if response.status_code == 304:
+ logger.info("%r %r - feed responded with 304",
+ self.feed['id'], self.feed['title'])
+ self.clean_feed()
+ return
+ if self.feed['etag'] and response.headers.get('etag') \
+ and response.headers.get('etag') == self.feed['etag']:
+ logger.info("%r %r - feed responded with same etag (%d)",
+ self.feed['id'], self.feed['title'],
+ response.status_code)
+ self.clean_feed()
+ return
+ ids, entries = [], {}
+ parsed_response = feedparser.parse(response.text)
+ for entry in parsed_response['entries']:
+ entries[tuple(sorted(extract_id(entry).items()))] = entry
+ ids.append(extract_id(entry))
+ logger.debug('%r %r - found %d entries %r',
+ self.feed['id'], self.feed['title'], len(ids), ids)
+ future = self.query_pyagg('get', 'articles/challenge', {'ids': ids})
+ updater = PyAggUpdater(self.feed, entries, response.headers, self.auth)
+ future.add_done_callback(updater.callback)
+
+
+class CrawlerScheduler(AbstractCrawler):
+
+ def __init__(self, username, password):
+ self.auth = (username, password)
+ super(CrawlerScheduler, self).__init__(self.auth)
+
+ def prepare_headers(self, feed):
+ """For a known feed, will construct some header dictionnary"""
+ headers = {}
+ if feed.get('etag', None):
+ headers['If-None-Match'] = feed['etag']
+ if feed.get('last_modified'):
+ headers['If-Modified-Since'] = feed['last_modified']
+ logger.debug('%r %r - calculated headers %r',
+ feed['id'], feed['title'], headers)
+ return headers
+
+ @AbstractCrawler.count_on_me
+ def callback(self, response):
+ """processes feeds that need to be fetched"""
+ response = response.result()
+ response.raise_for_status()
+ feeds = response.json()
+ logger.debug('%d to fetch %r', len(feeds), feeds)
+ for feed in feeds:
+ logger.info('%r %r - fetching resources',
+ feed['id'], feed['title'])
+ future = self.session.get(feed['link'],
+ headers=self.prepare_headers(feed))
+ future.add_done_callback(FeedCrawler(feed, self.auth).callback)
+
+ @AbstractCrawler.count_on_me
+ def run(self, **kwargs):
+ """entry point, will retreive feeds to be fetch
+ and launch the whole thing"""
+ logger.debug('retreving fetchable feed')
+ future = self.query_pyagg('get', 'feeds/fetchable', kwargs)
+ future.add_done_callback(self.callback)
diff --git a/pyaggr3g470r/lib/utils.py b/pyaggr3g470r/lib/utils.py
new file mode 100644
index 00000000..a4f4b3ec
--- /dev/null
+++ b/pyaggr3g470r/lib/utils.py
@@ -0,0 +1,14 @@
+import types
+
+def default_handler(obj):
+ """JSON handler for default query formatting"""
+ if hasattr(obj, 'isoformat'):
+ return obj.isoformat()
+ if hasattr(obj, 'dump'):
+ return obj.dump()
+ if isinstance(obj, (set, frozenset, types.GeneratorType)):
+ return list(obj)
+ if isinstance(obj, BaseException):
+ return str(obj)
+ raise TypeError("Object of type %s with value of %r "
+ "is not JSON serializable" % (type(obj), obj))
bgstack15