1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
import re
import logging
import sqlalchemy
from sqlalchemy import func
from collections import Counter
from bootstrap import db
from .abstract import AbstractController
from lib.article_utils import process_filters
from web.controllers import CategoryController, FeedController
from web.models import Article
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ArticleController(AbstractController):
    """Controller for ``Article`` rows, built on ``AbstractController``.

    Adds ownership checks on article creation and update, grouped counts,
    a date-based history and a reduced-column read.
    """

    _db_cls = Article

    def challenge(self, ids):
        """Yield each entry of *ids* for which no article is found.

        Every entry is expanded as keyword filters for ``read``; it is
        yielded back unchanged when the resulting query has no first row.
        """
        for id_ in ids:
            if not self.read(**id_).first():
                yield id_

    def count_by_category(self, **filters):
        """Delegate to ``_count_by``, grouping on ``Article.category_id``."""
        return self._count_by(Article.category_id, filters)

    def count_by_feed(self, **filters):
        """Delegate to ``_count_by``, grouping on ``Article.feed_id``."""
        return self._count_by(Article.feed_id, filters)

    def count_by_user_id(self, **filters):
        """Return a ``{user_id: article count}`` dict for matching articles.

        *filters* are translated by ``_to_filters`` before being applied.
        """
        query = (db.session.query(Article.user_id, func.count(Article.id))
                 .filter(*self._to_filters(**filters))
                 .group_by(Article.user_id))
        return dict(query.all())

    def create(self, **attrs):
        """Create an article inside the feed given by ``attrs['feed_id']``.

        The article's ``user_id`` and ``category_id`` are always overwritten
        with the feed's values.

        Returns:
            The created article, or ``None`` when ``process_filters``
            reports the article as skipped.

        Raises:
            AssertionError: if ``feed_id`` is missing, or if a ``user_id``
                is supplied that differs from the feed's owner while this
                controller is bound to a user (``self.user_id`` is set).
        """
        # Explicit raises instead of ``assert``: these are permission checks
        # and must keep running under ``python -O``, which strips asserts.
        if 'feed_id' not in attrs:
            raise AssertionError("must provide feed_id when creating article")
        feed = FeedController(
            attrs.get('user_id', self.user_id)).get(id=attrs['feed_id'])
        if ('user_id' in attrs
                and feed.user_id != attrs['user_id']
                and self.user_id is not None):
            raise AssertionError("no right on feed %r" % feed.id)

        # Denormalise the feed's owner and category onto the article.
        attrs['user_id'], attrs['category_id'] = feed.user_id, feed.category_id

        # NOTE(review): the read/liked results are not applied to attrs
        # here; confirm whether process_filters updates attrs itself.
        skipped, _read, _liked = process_filters(feed.filters, attrs)
        if skipped:
            return None
        return super().create(**attrs)

    def update(self, filters, attrs):
        """Update the articles matching *filters* with *attrs*.

        When *attrs* carries a ``feed_id``, ``category_id`` is overwritten
        with that feed's category. The effective user is ``attrs['user_id']``
        when present, otherwise ``self.user_id``.

        Raises:
            AssertionError: if the target feed does not belong to the
                effective user, or if the target category does not while
                this controller is bound to a user.
        """
        user_id = attrs.get('user_id', self.user_id)
        # Explicit raises instead of ``assert`` so the ownership checks are
        # not stripped under ``python -O``.
        if 'feed_id' in attrs:
            feed = FeedController().get(id=attrs['feed_id'])
            if feed.user_id != user_id:
                raise AssertionError("no right on feed %r" % feed.id)
            attrs['category_id'] = feed.category_id
        if attrs.get('category_id'):
            cat = CategoryController().get(id=attrs['category_id'])
            if self.user_id is not None and cat.user_id != user_id:
                raise AssertionError("no right on cat %r" % cat.id)
        return super().update(filters, attrs)

    def get_history(self, year=None, month=None):
        """Count articles per period, optionally narrowed to a year/month.

        Without *year* the counter is keyed by article year. With *year*
        only that year's articles are kept and the counter is keyed by
        month; *month* then narrows the selection further.

        Returns:
            tuple: ``(counter, query)`` where ``counter`` is a
            ``collections.Counter`` and ``query`` is the filtered article
            query itself (not its results).
        """
        articles = self.read()
        if year is not None:
            articles = articles.filter(
                sqlalchemy.extract('year', Article.date) == year)
            if month is not None:
                articles = articles.filter(
                    sqlalchemy.extract('month', Article.date) == month)

        # Only the date is needed for counting, so fetch just that column
        # rather than loading every full article row.
        dates = [row.date for row in articles.with_entities(Article.date).all()]
        if year is not None:
            articles_counter = Counter(date.month for date in dates)
        else:
            articles_counter = Counter(date.year for date in dates)
        return articles_counter, articles

    def read_light(self, **filters):
        """Return a query over a reduced set of article columns.

        Selects id, title, readed, like, feed_id, date and category_id for
        the articles matching *filters*, ordered by date descending.
        """
        return (super().read(**filters)
                .with_entities(Article.id, Article.title, Article.readed,
                               Article.like, Article.feed_id, Article.date,
                               Article.category_id)
                .order_by(Article.date.desc()))
|