aboutsummaryrefslogtreecommitdiff
path: root/newspipe/web/controllers/article.py
blob: 03342a1fc3732f6a4dac0f9d04e1c070ef0b4438 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import re
import logging
import sqlalchemy
from sqlalchemy import func
from collections import Counter

from bootstrap import db
from .abstract import AbstractController
from lib.article_utils import process_filters
from web.controllers import CategoryController, FeedController
from web.models import Article

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class ArticleController(AbstractController):
    """Controller binding the generic controller machinery to ``Article``.

    The helpers used below but not defined here (``read``, ``create``,
    ``update``, ``_count_by``, ``_to_filters`` and the ``user_id`` attribute)
    are expected to be provided by ``AbstractController``.
    """

    _db_cls = Article

    def challenge(self, ids):
        """Yield each entry of ``ids`` for which no article is found.

        Every entry is expanded as keyword filters to ``read``; entries whose
        query yields a first row are skipped, the others are yielded as-is.
        """
        for id_ in ids:
            if not self.read(**id_).first():
                yield id_

    def count_by_category(self, **filters):
        """Delegate to ``_count_by``, grouping on ``Article.category_id``."""
        return self._count_by(Article.category_id, filters)

    def count_by_feed(self, **filters):
        """Delegate to ``_count_by``, grouping on ``Article.feed_id``."""
        return self._count_by(Article.feed_id, filters)

    def count_by_user_id(self, **filters):
        """Return a dict mapping each user id to its number of articles.

        Only articles matching ``filters`` (translated by ``_to_filters``)
        are counted.
        """
        return dict(
            db.session.query(Article.user_id, func.count(Article.id))
            .filter(*self._to_filters(**filters))
            .group_by(Article.user_id)
            .all()
        )

    def create(self, **attrs):
        """Create an article attached to the feed ``attrs["feed_id"]``.

        ``user_id`` and ``category_id`` in ``attrs`` are always overwritten
        with the values carried by the feed (denormalisation used for
        article rights).

        Returns:
            The created article, or ``None`` when the feed's filters mark
            the article as skipped (nothing is created in that case).

        Raises:
            AssertionError: if ``feed_id`` is missing, or if a ``user_id``
                is supplied that differs from the feed's owner while this
                controller is bound to a user (``self.user_id`` not None).
        """
        # The checks below are raised explicitly instead of using ``assert``:
        # assert statements are removed under ``python -O``, which would
        # silently disable the ownership verification.
        if "feed_id" not in attrs:
            raise AssertionError("must provide feed_id when creating article")
        feed = FeedController(attrs.get("user_id", self.user_id)).get(
            id=attrs["feed_id"]
        )
        if "user_id" in attrs:
            allowed = feed.user_id == attrs["user_id"] or self.user_id is None
            if not allowed:
                raise AssertionError("no right on feed %r" % feed.id)
        # handling special denorm for article rights
        attrs["user_id"], attrs["category_id"] = feed.user_id, feed.category_id

        # Only the "skipped" verdict is acted upon here.
        # NOTE(review): the two other returned values (read / liked) are
        # discarded, as in the original code - confirm this is intended.
        skipped, _read, _liked = process_filters(feed.filters, attrs)
        if skipped:
            return None
        return super().create(**attrs)

    def update(self, filters, attrs):
        """Update the articles matching ``filters`` with ``attrs``.

        When ``feed_id`` is part of ``attrs``, the feed must belong to the
        effective user (``attrs["user_id"]`` if present, else
        ``self.user_id``) and ``attrs["category_id"]`` is overwritten in
        place with the feed's category. A truthy ``category_id`` must belong
        to the effective user unless this controller has no bound user.

        Raises:
            AssertionError: when one of the ownership checks fails.
        """
        user_id = attrs.get("user_id", self.user_id)
        if "feed_id" in attrs:
            feed = FeedController().get(id=attrs["feed_id"])
            # Explicit raise (not ``assert``) so the check survives ``-O``.
            if feed.user_id != user_id:
                raise AssertionError("no right on feed %r" % feed.id)
            attrs["category_id"] = feed.category_id
        if attrs.get("category_id"):
            cat = CategoryController().get(id=attrs["category_id"])
            if self.user_id is not None and cat.user_id != user_id:
                raise AssertionError("no right on cat %r" % cat.id)
        return super().update(filters, attrs)

    def get_history(self, year=None, month=None):
        """Count articles per period and return the matching query.

        Without ``year`` the counter is keyed by year. With ``year`` the
        query is restricted to that year and the counter is keyed by month;
        ``month`` further restricts the query and is only honoured when
        ``year`` is given.

        Returns:
            A ``(Counter, query)`` tuple; the query selects full articles.
        """
        articles = self.read()
        if year is not None:
            articles = articles.filter(sqlalchemy.extract("year", Article.date) == year)
            if month is not None:
                articles = articles.filter(
                    sqlalchemy.extract("month", Article.date) == month
                )
        # Counting only needs the date, so fetch that single column instead
        # of hydrating every full article; ``articles`` itself is untouched.
        dates = [date for (date,) in articles.with_entities(Article.date).all()]
        if year is not None:
            articles_counter = Counter(date.month for date in dates)
        else:
            articles_counter = Counter(date.year for date in dates)
        return articles_counter, articles

    def read_light(self, **filters):
        """Return a query over a reduced set of article columns.

        Selects id, title, readed, like, feed_id, date and category_id for
        the articles matching ``filters``, ordered by date, newest first.
        """
        return (
            super()
            .read(**filters)
            .with_entities(
                Article.id,
                Article.title,
                Article.readed,
                Article.like,
                Article.feed_id,
                Article.date,
                Article.category_id,
            )
            .order_by(Article.date.desc())
        )
bgstack15