diff options
Diffstat (limited to 'pyaggr3g470r')
-rw-r--r-- | pyaggr3g470r/controllers/feed.py | 2 | ||||
-rw-r--r-- | pyaggr3g470r/lib/crawler.py | 68 |
2 files changed, 33 insertions, 37 deletions
diff --git a/pyaggr3g470r/controllers/feed.py b/pyaggr3g470r/controllers/feed.py index a8e96217..90ee6ea5 100644 --- a/pyaggr3g470r/controllers/feed.py +++ b/pyaggr3g470r/controllers/feed.py @@ -41,7 +41,7 @@ class FeedController(AbstractController): return [feed for feed in self.read( error_count__lt=max_error, enabled=True, last_retrieved__lt=max_last) - .order_by('Feed.last_retrieved') + .order_by('last_retrieved') .limit(limit)] def list_fetchable(self, max_error=DEFAULT_MAX_ERROR, limit=DEFAULT_LIMIT, diff --git a/pyaggr3g470r/lib/crawler.py b/pyaggr3g470r/lib/crawler.py index 91942c59..62d41494 100644 --- a/pyaggr3g470r/lib/crawler.py +++ b/pyaggr3g470r/lib/crawler.py @@ -32,22 +32,14 @@ API_ROOT = "api/v2.0/" class AbstractCrawler: - __session__ = None - def __init__(self, auth): + def __init__(self, auth, pool=None, session=None): self.auth = auth - self.session = self.get_session() + self.pool = pool or ThreadPoolExecutor(max_workers=conf.NB_WORKER) + self.session = session or FuturesSession(executor=self.pool) + self.session.verify = False self.url = conf.PLATFORM_URL - @classmethod - def get_session(cls): - """methods that allows us to treat session as a singleton""" - if cls.__session__ is None: - cls.__session__ = FuturesSession( - executor=ThreadPoolExecutor(max_workers=conf.NB_WORKER)) - cls.__session__.verify = False - return cls.__session__ - def query_pyagg(self, method, urn, data=None): """A wrapper for internal call, method should be ones you can find on requests (header, post, get, options, ...), urn the distant @@ -62,34 +54,31 @@ class AbstractCrawler: headers={'Content-Type': 'application/json', 'User-Agent': 'pyaggr3g470r'}) - @classmethod - def wait(cls, max_wait=300, checks=5, wait_for=2): - "See count_on_me, that method will just wait for the counter to be 0" + def wait(self, max_wait=300, checks=5, wait_for=2): checked, second_waited = 0, 0 - checked = 0 while True: time.sleep(wait_for) second_waited += wait_for if second_waited > max_wait: - logger.warn('Exiting after %d seconds, counter at %d', - max_wait, len(cls.__counter__)) + logger.warn('Exiting after %d seconds', second_waited) break - if cls.get_session().executor._work_queue.queue: + if self.pool._work_queue.qsize(): checked = 0 - continue + continue checked += 1 if checked == checks: - break + break class PyAggUpdater(AbstractCrawler): - def __init__(self, feed, entries, headers, parsed_feed, auth): + def __init__(self, feed, entries, headers, parsed_feed, + auth, pool=None, session=None): self.feed = feed self.entries = entries self.headers = headers self.parsed_feed = parsed_feed - super().__init__(auth) + super().__init__(auth, pool, session) def callback(self, response): """Will process the result from the challenge, creating missing article @@ -132,21 +121,25 @@ class PyAggUpdater(AbstractCrawler): up_feed['last_retrieved'] \ = (datetime.now() - timedelta(minutes=45)).isoformat() - if any([up_feed[key] != self.feed.get(key) for key in up_feed]): - logger.warn('%r %r - pushing feed attrs %r', - self.feed['id'], self.feed['title'], - {key: "%s -> %s" % (up_feed[key], self.feed.get(key)) - for key in up_feed if up_feed[key] != self.feed.get(key)}) + diff_keys = {key for key in up_feed + if up_feed[key] != self.feed.get(key)} + if not diff_keys: + return # no change in the feed, no update + if not article_created and diff_keys == {'last_modified', 'etag'}: + return # meaningless if no new article has been published + logger.info('%r %r - pushing feed attrs %r', + self.feed['id'], self.feed['title'], + {key: "%s -> %s" % (up_feed[key], self.feed.get(key)) + for key in up_feed if up_feed[key] != self.feed.get(key)}) - future = self.query_pyagg('put', - 'feed/%d' % self.feed['id'], up_feed) + future = self.query_pyagg('put', 'feed/%d' % self.feed['id'], up_feed) class FeedCrawler(AbstractCrawler): - def __init__(self, feed, auth): + def __init__(self, feed, auth, pool=None, session=None): self.feed = feed - super().__init__(auth) + super().__init__(auth, pool, session) def clean_feed(self): """Will reset the errors counters on a feed that have known errors""" @@ -211,15 +204,16 @@ class FeedCrawler(AbstractCrawler): self.feed['id'], self.feed['title'], len(ids), ids) future = self.query_pyagg('get', 'articles/challenge', {'ids': ids}) updater = PyAggUpdater(self.feed, entries, response.headers, - parsed_response, self.auth) + parsed_response, + self.auth, self.pool, self.session) future.add_done_callback(updater.callback) class CrawlerScheduler(AbstractCrawler): - def __init__(self, username, password): + def __init__(self, username, password, pool=None, session=None): self.auth = (username, password) - super(CrawlerScheduler, self).__init__(self.auth) + super(CrawlerScheduler, self).__init__(self.auth, pool, session) def prepare_headers(self, feed): """For a known feed, will construct some header dictionnary""" @@ -246,7 +240,9 @@ class CrawlerScheduler(AbstractCrawler): feed['id'], feed['title']) future = self.session.get(feed['link'], headers=self.prepare_headers(feed)) - future.add_done_callback(FeedCrawler(feed, self.auth).callback) + + feed_crwlr = FeedCrawler(feed, self.auth, self.pool, self.session) + future.add_done_callback(feed_crwlr.callback) def run(self, **kwargs): """entry point, will retreive feeds to be fetch |