aboutsummaryrefslogtreecommitdiff
path: root/newspipe/controllers/article.py
blob: f82ca5590d354ea0d81257b743111b08e2ff182b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import logging
from collections import Counter

import sqlalchemy
from sqlalchemy import func

from .abstract import AbstractController
from newspipe.bootstrap import db
from newspipe.controllers import CategoryController
from newspipe.controllers import FeedController
from newspipe.lib.article_utils import process_filters
from newspipe.models import Article

logger = logging.getLogger(__name__)


class ArticleController(AbstractController):
    """Controller for ``Article`` rows: counting, rights-checked writes, listings."""

    _db_cls = Article

    def challenge(self, ids):
        """Yield each entry of *ids* that wasn't found in the database.

        Every entry is a mapping; it is unpacked as keyword filters for
        ``read`` and is yielded back unchanged when no row matches.
        """
        for id_ in ids:
            if not self.read(**id_).first():
                yield id_

    def count_by_category(self, **filters):
        """Count articles per ``Article.category_id`` for the given filters.

        Delegates to ``_count_by``, which is defined outside this class
        (return shape not visible here; presumably a mapping id -> count).
        """
        return self._count_by(Article.category_id, filters)

    def count_by_feed(self, **filters):
        """Count articles per ``Article.feed_id`` for the given filters.

        Delegates to ``_count_by``, which is defined outside this class
        (return shape not visible here; presumably a mapping id -> count).
        """
        return self._count_by(Article.feed_id, filters)

    def count_by_user_id(self, **filters):
        """Return a dict mapping each user id to its number of articles.

        *filters* are converted by ``_to_filters`` (defined outside this
        class) and applied before grouping on ``Article.user_id``.
        """
        return dict(
            db.session.query(Article.user_id, func.count(Article.id))
            .filter(*self._to_filters(**filters))
            .group_by(Article.user_id)
            .all()
        )

    def create(self, **attrs):
        """Create an article attached to the feed named by ``attrs["feed_id"]``.

        ``user_id`` and ``category_id`` in *attrs* are overwritten with the
        values of the feed, so the article always carries its feed's owner
        and category.

        Returns the created article, or ``None`` when the feed's filters
        mark the article as skipped.

        Raises ``AssertionError`` when ``feed_id`` is missing, or when a
        ``user_id`` is supplied that does not own the feed while this
        controller is bound to a user (``self.user_id`` is not ``None``).
        """
        # handling special denorm for article rights
        # Explicit raises (rather than ``assert`` statements) so the checks
        # are still enforced when Python runs with -O.
        if "feed_id" not in attrs:
            raise AssertionError("must provide feed_id when creating article")
        feed = FeedController(attrs.get("user_id", self.user_id)).get(
            id=attrs["feed_id"]
        )
        if "user_id" in attrs:
            allowed = feed.user_id == attrs["user_id"] or self.user_id is None
            if not allowed:
                raise AssertionError("no right on feed %r" % feed.id)
        attrs["user_id"], attrs["category_id"] = feed.user_id, feed.category_id

        # NOTE(review): the read/liked results of the filters are not applied
        # to attrs here -- confirm that this is intended.
        skipped, _read, _liked = process_filters(feed.filters, attrs)
        if skipped:
            return None
        return super().create(**attrs)

    def update(self, filters, attrs):
        """Update the articles matching *filters* with *attrs*.

        When ``feed_id`` is part of *attrs*, the feed must belong to the
        acting user (``attrs["user_id"]`` if given, else ``self.user_id``)
        and ``category_id`` is forced to the feed's category. When a truthy
        ``category_id`` ends up in *attrs*, the category must belong to the
        acting user unless this controller has no bound user.

        Raises ``AssertionError`` when one of those rights checks fails.
        Returns whatever the parent ``update`` returns.
        """
        user_id = attrs.get("user_id", self.user_id)
        if "feed_id" in attrs:
            feed = FeedController().get(id=attrs["feed_id"])
            if not feed.user_id == user_id:
                raise AssertionError("no right on feed %r" % feed.id)
            attrs["category_id"] = feed.category_id
        if attrs.get("category_id"):
            try:
                cat = CategoryController().get(id=attrs["category_id"])
            except Exception:
                # Best effort: a category that cannot be loaded does not
                # block the update, but the failure is no longer silent.
                logger.warning(
                    "could not load category %r, skipping rights check",
                    attrs["category_id"],
                    exc_info=True,
                )
            else:
                # Kept out of the ``try`` so a failed rights check is not
                # swallowed together with lookup errors.
                if not (self.user_id is None or cat.user_id == user_id):
                    raise AssertionError("no right on cat %r" % cat.id)
        return super().update(filters, attrs)

    def get_history(self, year=None, month=None):
        """
        Sort articles by year and month.

        Returns a ``(counter, articles)`` tuple where *articles* is the
        ``read_light`` query, narrowed to *year* (and to *month*) when
        given. The counter is keyed by month when *year* is given and by
        year otherwise. *month* is ignored unless *year* is also given.
        """
        articles = self.read_light()
        if year is not None:
            articles = articles.filter(sqlalchemy.extract("year", Article.date) == year)
            if month is not None:
                articles = articles.filter(
                    sqlalchemy.extract("month", Article.date) == month
                )
        # Inside a single year the breakdown is per month, otherwise per year.
        period = "month" if year is not None else "year"
        articles_counter = Counter(
            getattr(article.date, period) for article in articles.all()
        )
        return articles_counter, articles

    def read_light(self, **filters):
        """Return a query over a reduced set of article columns, newest first.

        Only id, title, readed, like, feed_id, date and category_id are
        selected.
        """
        return (
            super()
            .read(**filters)
            .with_entities(
                Article.id,
                Article.title,
                Article.readed,
                Article.like,
                Article.feed_id,
                Article.date,
                Article.category_id,
            )
            .order_by(Article.date.desc())
        )

    def read_ordered(self, **filters):
        """Return the parent ``read`` query ordered by descending date."""
        return super().read(**filters).order_by(Article.date.desc())
bgstack15