1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
import re
import logging
import sqlalchemy
from sqlalchemy import func
from collections import Counter
from bootstrap import db
from .abstract import AbstractController
from web.controllers import FeedController
from web.models import Article
logger = logging.getLogger(__name__)
class ArticleController(AbstractController):
_db_cls = Article
def get(self, **filters):
article = super(ArticleController, self).get(**filters)
if not article.readed:
self.update({'id': article.id}, {'readed': True})
return article
def challenge(self, ids):
"""Will return each id that wasn't found in the database."""
for id_ in ids:
if self.read(**id_).first():
continue
yield id_
def count_by_feed(self, **filters):
if self.user_id:
filters['user_id'] = self.user_id
return dict(db.session.query(Article.feed_id, func.count(Article.id))
.filter(*self._to_filters(**filters))
.group_by(Article.feed_id).all())
def count_by_user_id(self, **filters):
return dict(db.session.query(Article.user_id, func.count(Article.id))
.filter(*self._to_filters(**filters))
.group_by(Article.user_id).all())
def create(self, **attrs):
# handling special denorm for article rights
assert 'feed_id' in attrs
feed = FeedController(
attrs.get('user_id', self.user_id)).get(id=attrs['feed_id'])
if 'user_id' in attrs:
assert feed.user_id == attrs['user_id'] or self.user_id is None
attrs['user_id'], attrs['category_id'] = feed.user_id, feed.category_id
# handling feed's filters
for filter_ in feed.filters or []:
match = False
if filter_.get('type') == 'regex':
match = re.match(filter_['pattern'], attrs.get('title', ''))
elif filter_.get('type') == 'simple match':
match = filter_['pattern'] in attrs.get('title', '')
take_action = match and filter_.get('action on') == 'match' \
or not match and filter_.get('action on') == 'no match'
if not take_action:
continue
if filter_.get('action') == 'mark as read':
attrs['readed'] = True
logger.warn("article %s will be created as read",
attrs['link'])
elif filter_.get('action') == 'mark as favorite':
attrs['like'] = True
logger.warn("article %s will be created as liked",
attrs['link'])
return super().create(**attrs)
def get_history(self, year=None, month=None):
"""
Sort articles by year and month.
"""
articles_counter = Counter()
articles = self.read()
if year is not None:
articles = articles.filter(
sqlalchemy.extract('year', Article.date) == year)
if month is not None:
articles = articles.filter(
sqlalchemy.extract('month', Article.date) == month)
for article in articles.all():
if year is not None:
articles_counter[article.date.month] += 1
else:
articles_counter[article.date.year] += 1
return articles_counter, articles
|