aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCédric Bonhomme <cedric@cedricbonhomme.org>2015-02-08 00:08:58 +0100
committerCédric Bonhomme <cedric@cedricbonhomme.org>2015-02-08 00:08:58 +0100
commit0152456fe8bc0549e581865c8925006d014ecd64 (patch)
treea87f82a9f64b04ecfc957e558cd8652bef10ff43
parentAdded aiohttp to the list of requirements. (diff)
downloadnewspipe-0152456fe8bc0549e581865c8925006d014ecd64.tar.gz
newspipe-0152456fe8bc0549e581865c8925006d014ecd64.tar.bz2
newspipe-0152456fe8bc0549e581865c8925006d014ecd64.zip
Misc improvements for the crawler. A semaphore is used to limit the number of simultaneous connection.
-rw-r--r--pyaggr3g470r/crawler.py104
1 files changed, 51 insertions, 53 deletions
diff --git a/pyaggr3g470r/crawler.py b/pyaggr3g470r/crawler.py
index dea06b64..cdbbab8d 100644
--- a/pyaggr3g470r/crawler.py
+++ b/pyaggr3g470r/crawler.py
@@ -20,9 +20,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Cedric Bonhomme"
-__version__ = "$Revision: 3.0 $"
+__version__ = "$Revision: 3.1 $"
__date__ = "$Date: 2010/09/02 $"
-__revision__ = "$Date: 2015/01/21 $"
+__revision__ = "$Date: 2015/02/08 $"
__copyright__ = "Copyright (c) Cedric Bonhomme"
__license__ = "AGPLv3"
@@ -41,26 +41,32 @@ from pyaggr3g470r.models import User, Article
logger = logging.getLogger(__name__)
-#
-# asyncio examples:
-# -http://compiletoi.net/fast-scraping-in-python-with-asyncio.html
-# - https://gist.github.com/kunev/f83146d407c81a2d64a6
-#
+sem = asyncio.Semaphore(5)
@asyncio.coroutine
def get(*args, **kwargs):
kwargs["connector"] = aiohttp.TCPConnector(verify_ssl=False)
- response = yield from aiohttp.request('GET', *args, **kwargs)
- return (yield from response.read_and_close(decode=False))
+ try:
+ #logger.info("Fetching the feed: " + args[0])
+ response = yield from aiohttp.request('GET', *args, **kwargs)
+ return (yield from response.read_and_close(decode=False))
+ except Exception as e:
+ print(e)
+ return None
@asyncio.coroutine
-def fetch(user, feed):
+def parse_feed(user, feed):
"""
Fetch a feed.
"""
- logger.info("Fetching the feed: " + feed.title)
- print("Fetching the feed: " + feed.title)
- data = yield from get(feed.link)
+ data = None
+
+ with (yield from sem):
+ data = yield from get(feed.link)
+
+ if data is None:
+ return
+
a_feed = feedparser.parse(data)
if a_feed['bozo'] == 1:
logger.error(a_feed['bozo_exception'])
@@ -151,11 +157,11 @@ def fetch(user, feed):
@asyncio.coroutine
def insert_database(user, feed):
- articles = yield from asyncio.async(fetch(user, feed))
+ articles = yield from asyncio.async(parse_feed(user, feed))
if None is articles:
return []
- print('inserting articles for {}'.format(feed.title))
+ #print('inserting articles for {}'.format(feed.title))
logger.info("Database insertion...")
new_articles = []
@@ -174,7 +180,6 @@ def insert_database(user, feed):
#db.session.merge(article)
db.session.commit()
#logger.info("New article % (%r) added.", article.title, article.link)
- print("New article added: " + article.title)
except Exception as e:
logger.error("Error when inserting article in database: " + str(e))
continue
@@ -182,42 +187,35 @@ def insert_database(user, feed):
return new_articles
@asyncio.coroutine
-def done(feed):
- print('done {}'.format(feed.title))
-
-sem = asyncio.Semaphore(5)
-
-@asyncio.coroutine
-def process_data(user, feed):
- with (yield from sem):
- data = yield from asyncio.async(insert_database(user, feed))
- print('inserted articles for {}'.format(feed.title))
+def init_process(user, feed):
+ data = yield from asyncio.async(insert_database(user, feed))
+ #print('inserted articles for {}'.format(feed.title))
def retrieve_feed(user, feed_id=None):
- """
- Launch the processus.
- """
- logger.info("Starting to retrieve feeds.")
-
- # 1 - Get the list of feeds to fetch
- user = User.query.filter(User.email == user.email).first()
- feeds = [feed for feed in user.feeds if feed.enabled]
- if feed_id is not None:
- feeds = [feed for feed in feeds if feed.id == feed_id]
-
- # 2 - Fetch the feeds.
- loop = asyncio.get_event_loop()
- f = asyncio.wait([process_data(user, feed) for feed in feeds])
- loop.run_until_complete(f)
-
- """
- # 4 - Indexation
- if not conf.ON_HEROKU:
- self.index(new_articles)
-
- # 5 - Mail notification
- if not conf.ON_HEROKU and conf.NOTIFICATION_ENABLED:
- self.mail_notification(new_articles)
- """
-
- logger.info("All articles retrieved. End of the processus.") \ No newline at end of file
+ """
+ Launch the processus.
+ """
+ logger.info("Starting to retrieve feeds.")
+
+ # 1 - Get the list of feeds to fetch
+ user = User.query.filter(User.email == user.email).first()
+ feeds = [feed for feed in user.feeds if feed.enabled]
+ if feed_id is not None:
+ feeds = [feed for feed in feeds if feed.id == feed_id]
+
+ # 2 - Fetch the feeds.
+ loop = asyncio.get_event_loop()
+ f = asyncio.wait([init_process(user, feed) for feed in feeds])
+ loop.run_until_complete(f)
+
+ """
+ # 4 - Indexation
+ if not conf.ON_HEROKU:
+ self.index(new_articles)
+
+ # 5 - Mail notification
+ if not conf.ON_HEROKU and conf.NOTIFICATION_ENABLED:
+ self.mail_notification(new_articles)
+ """
+
+ logger.info("All articles retrieved. End of the processus.") \ No newline at end of file
bgstack15