author     Cédric Bonhomme <cedric@cedricbonhomme.org>   2019-05-21 23:49:03 +0200
committer  Cédric Bonhomme <cedric@cedricbonhomme.org>   2019-05-21 23:49:03 +0200
commit     7d9bc6621eea035034cd56d20de9333f09387940 (patch)
tree       d9b985163e990012987b64c7d563c5110a9ff0ca
parent     Merge branch 'master' of gitlab.com:newspipe/newspipe (diff)
Improved crawler.
-rw-r--r--  src/crawler/default_crawler.py  224
-rwxr-xr-x  src/manager.py                   15
2 files changed, 117 insertions, 122 deletions
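What the patch does: the old per-feed coroutines (init_process() scheduled with asyncio.ensure_future and awaited via asyncio.wait) are replaced by a producer/consumer pipeline. retrieve_feed() becomes the producer: it fetches and parses feeds synchronously and puts (user, feed, articles) tuples on a bounded asyncio.Queue, followed by a None sentinel. insert_articles() becomes the consumer: it drains the queue, writes articles to the database, and stops once it has seen one sentinel per producer. The sketch below is not newspipe code; it uses placeholder names and work items, only to illustrate the queue-plus-sentinel pattern the commit adopts.

import asyncio

async def producer(queue, items):
    """Push work items, then a None sentinel to announce completion."""
    for item in items:
        await queue.put(item)   # suspends while the queue is full (back-pressure)
    await queue.put(None)

async def consumer(queue, nb_producers=1):
    """Process items until one sentinel per producer has been received."""
    producers_done = 0
    while True:
        item = await queue.get()
        if item is None:
            producers_done += 1
            if producers_done == nb_producers:
                break
            continue
        print('processing', item)   # placeholder for the real database insert

async def main():
    queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(producer(queue, range(5)), consumer(queue))

asyncio.run(main())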
diff --git a/src/crawler/default_crawler.py b/src/crawler/default_crawler.py
index 38810fc0..bba8431a 100644
--- a/src/crawler/default_crawler.py
+++ b/src/crawler/default_crawler.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# newspipe - A Web based news aggregator.
-# Copyright (C) 2010-2018 Cédric Bonhomme - https://www.cedricbonhomme.org
+# Copyright (C) 2010-2019 Cédric Bonhomme - https://www.cedricbonhomme.org
#
# For more information : https://gitlab.com/newspipe/newspipe
#
@@ -20,9 +20,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Cedric Bonhomme"
-__version__ = "$Revision: 3.3 $"
+__version__ = "$Revision: 4.0 $"
__date__ = "$Date: 2010/09/02 $"
-__revision__ = "$Date: 2015/12/07 $"
+__revision__ = "$Date: 2010/05/21 $"
__copyright__ = "Copyright (c) Cedric Bonhomme"
__license__ = "AGPLv3"
@@ -45,18 +45,8 @@ logger = logging.getLogger(__name__)
sem = asyncio.Semaphore(5)
-import ssl
-try:
- _create_unverified_https_context = ssl._create_unverified_context
-except AttributeError:
- # Legacy Python that doesn't verify HTTPS certificates by default
- pass
-else:
- # Handle target environment that doesn't support HTTPS verification
- ssl._create_default_https_context = _create_unverified_https_context
-
-async def get(*args, **kwargs):
+def get(*args, **kwargs):
#kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False)
try:
logger.info('Retrieving feed {}'.format(args[0]))
@@ -66,7 +56,7 @@ async def get(*args, **kwargs):
raise e
-async def parse_feed(user, feed):
+def parse_feed(user, feed):
"""
Fetch a feed.
Update the feed and return the articles.
@@ -74,21 +64,21 @@ async def parse_feed(user, feed):
parsed_feed = None
up_feed = {}
articles = []
- with (await sem):
- try:
- parsed_feed = await get(feed.link)
- except Exception as e:
- up_feed['last_error'] = str(e)
- up_feed['error_count'] = feed.error_count + 1
- logger.exception("error when parsing feed: " + str(e))
- finally:
- up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal())
- if parsed_feed is None:
- try:
- FeedController().update({'id': feed.id}, up_feed)
- except Exception as e:
- logger.exception('something bad here: ' + str(e))
- return
+ #with (await sem):
+ try:
+ parsed_feed = get(feed.link)
+ except Exception as e:
+ up_feed['last_error'] = str(e)
+ up_feed['error_count'] = feed.error_count + 1
+ logger.exception("error when parsing feed: " + str(e))
+ finally:
+ up_feed['last_retrieved'] = datetime.now(dateutil.tz.tzlocal())
+ if parsed_feed is None:
+ try:
+ FeedController().update({'id': feed.id}, up_feed)
+ except Exception as e:
+ logger.exception('something bad here: ' + str(e))
+ return
if not is_parsing_ok(parsed_feed):
up_feed['last_error'] = str(parsed_feed['bozo_exception'])
@@ -114,95 +104,97 @@ async def parse_feed(user, feed):
return articles
-async def insert_database(user, feed):
- articles = await parse_feed(user, feed)
- if None is articles:
- return []
-
- logger.info('Inserting articles for {}'.format(feed.title))
-
- new_articles = []
- art_contr = ArticleController(user.id)
- for article in articles:
- new_article = await construct_article(article, feed)
-
- try:
- existing_article_req = art_contr.read(feed_id=feed.id,
- entry_id=extract_id(article))
- except Exception as e:
- logger.exception("existing_article_req: " + str(e))
- continue
- exist = existing_article_req.count() != 0
- if exist:
- continue
- # if the article has been already retrieved, we only update
- # the content or the title
- logger.info('Article already in the database: {}'. \
- format(article['link']))
- existing_article = existing_article_req.first()
-
- if new_article['date'].replace(tzinfo=None) != \
- existing_article.date:
- existing_article.date = new_article['date']
- existing_article.updated_date = new_article['date']
- if existing_article.title != new_article['title']:
- existing_article.title = new_article['title']
- content = get_article_content(article)
- if existing_article.content != content:
- existing_article.content = content
- existing_article.readed = False
- art_contr.update({'entry_id': existing_article.entry_id},
- existing_article.dump())
- logger.info('Article updated: {}'.format(article['link']))
- continue
-
- # insertion of the new article
- try:
- new_articles.append(art_contr.create(**new_article))
- logger.info('New article added: {}'.format(new_article['link']))
- except Exception:
- logger.exception('Error when inserting article in database:')
+async def insert_articles(queue, nb_producers=1):
+ """Consumer coroutines.
+ """
+ nb_producers_done = 0
+ while True:
+ item = await queue.get()
+ if item is None:
+ nb_producers_done += 1
+ if nb_producers_done == nb_producers:
+ print('All producers done.')
+ print('Process finished.')
+ break
continue
- return new_articles
-
-
-async def init_process(user, feed):
- # Fetch the feed and insert new articles in the database
- try:
- articles = await insert_database(user, feed)
- logger.debug('inserted articles for %s', feed.title)
- return articles
- except Exception as e:
- logger.exception('init_process: ' + str(e))
-
-def retrieve_feed(loop, user, feed_id=None):
+ user, feed, articles = item
+
+
+ if None is articles:
+ logger.info('None')
+ articles = []
+
+ logger.info('Inserting articles for {}'.format(feed.title))
+
+ new_articles = []
+ art_contr = ArticleController(user.id)
+ for article in articles:
+ new_article = await construct_article(article, feed)
+
+ try:
+ existing_article_req = art_contr.read(feed_id=feed.id,
+ entry_id=extract_id(article))
+ except Exception as e:
+ logger.exception("existing_article_req: " + str(e))
+ continue
+ exist = existing_article_req.count() != 0
+ if exist:
+ continue
+ # if the article has been already retrieved, we only update
+ # the content or the title
+ logger.info('Article already in the database: {}'. \
+ format(article['link']))
+ existing_article = existing_article_req.first()
+
+ if new_article['date'].replace(tzinfo=None) != \
+ existing_article.date:
+ existing_article.date = new_article['date']
+ existing_article.updated_date = new_article['date']
+ if existing_article.title != new_article['title']:
+ existing_article.title = new_article['title']
+ content = get_article_content(article)
+ if existing_article.content != content:
+ existing_article.content = content
+ existing_article.readed = False
+ art_contr.update({'entry_id': existing_article.entry_id},
+ existing_article.dump())
+ logger.info('Article updated: {}'.format(article['link']))
+ continue
+
+ # insertion of the new article
+ try:
+ new_articles.append(art_contr.create(**new_article))
+ logger.info('New article added: {}'.format(new_article['link']))
+ except Exception:
+ logger.exception('Error when inserting article in database:')
+ continue
+
+
+async def retrieve_feed(queue, users, feed_id=None):
"""
Launch the process.
"""
- logger.info('Starting to retrieve feeds for {}'.format(user.nickname))
-
- # Get the list of feeds to fetch
- filters = {}
- filters['user_id'] = user.id
- if feed_id is not None:
- filters['id'] = feed_id
- filters['enabled'] = True
- filters['error_count__lt'] = conf.DEFAULT_MAX_ERROR
- filters['last_retrieved__lt'] = datetime.now() - \
- timedelta(minutes=conf.FEED_REFRESH_INTERVAL)
- feeds = FeedController().read(**filters).all()
-
- if feeds == []:
- logger.info('No feed to retrieve for {}'.format(user.nickname))
- return
-
- # Launch the process for all the feeds
- tasks = [asyncio.ensure_future(init_process(user, feed)) for feed in feeds]
+ for user in users:
+ logger.info('Starting to retrieve feeds for {}'.format(user.nickname))
+ filters = {}
+ filters['user_id'] = user.id
+ if feed_id is not None:
+ filters['id'] = feed_id
+ filters['enabled'] = True
+ filters['error_count__lt'] = conf.DEFAULT_MAX_ERROR
+ filters['last_retrieved__lt'] = datetime.now() - \
+ timedelta(minutes=conf.FEED_REFRESH_INTERVAL)
+ feeds = FeedController().read(**filters).all()
+
+
+ if feeds == []:
+ logger.info('No feed to retrieve for {}'.format(user.nickname))
+
+
+ for feed in feeds:
+ articles = parse_feed(user, feed)
+ await queue.put((user, feed, articles))
+
+ await queue.put(None)
- try:
- loop.run_until_complete(asyncio.wait(tasks))
- except:
- logger.exception('an error occured')
- finally:
- logger.info('Articles retrieved for {}'.format(user.nickname))
diff --git a/src/manager.py b/src/manager.py
index 795b3974..244ab77f 100755
--- a/src/manager.py
+++ b/src/manager.py
@@ -65,16 +65,19 @@ def fetch_asyncio(user_id=None, feed_id=None):
feed_id = int(feed_id)
except:
feed_id = None
+
+
+ loop = asyncio.get_event_loop()
+ queue = asyncio.Queue(maxsize=2, loop=loop)
+
+ producer_coro = default_crawler.retrieve_feed(queue, users, feed_id)
+ consumer_coro = default_crawler.insert_articles(queue, 1)
logger.info('Starting crawler.')
-
start = datetime.now()
- loop = asyncio.get_event_loop()
- for user in users:
- default_crawler.retrieve_feed(loop, user, feed_id)
- loop.close()
+ loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
end = datetime.now()
-
+ loop.close()
logger.info('Crawler finished in {} seconds.' \
.format((end - start).seconds))
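For completeness, here is a rough sketch of the manager.py-style wiring with an explicit event loop; crawl() and store() are placeholder coroutines standing in for retrieve_feed() and insert_articles(), not the real ones. The bounded queue (maxsize=2) is what keeps memory in check: the producer is suspended on queue.put() whenever two parsed feeds are already waiting, until the consumer catches up. The loop= argument passed to asyncio.Queue in the diff was still accepted on the Python releases current in 2019, but it has since been deprecated and removed, so it is omitted here.

import asyncio

async def crawl(queue):
    for feed in ('feed-a', 'feed-b', 'feed-c'):
        await queue.put(feed)   # pauses while two items are already queued
    await queue.put(None)       # one sentinel for this single producer

async def store(queue):
    while True:
        feed = await queue.get()
        if feed is None:
            break
        print('storing articles from', feed)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)    # make the loop current so the queue binds to it on older Pythons
queue = asyncio.Queue(maxsize=2)
loop.run_until_complete(asyncio.gather(crawl(queue), store(queue)))
loop.close()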