aboutsummaryrefslogtreecommitdiff
path: root/newspipe/lib/feed_utils.py
blob: 9f1e23543adf2c42a1c520b25a5f52e14ae72a01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import html
import urllib
import logging
import requests
import feedparser
from conf import CRAWLER_USER_AGENT
from bs4 import BeautifulSoup, SoupStrainer

from lib.utils import try_keys, try_get_icon_url, rebuild_url

logger = logging.getLogger(__name__)
logging.captureWarnings(True)
ACCEPTED_MIMETYPES = (
    "application/rss+xml",
    "application/rdf+xml",
    "application/atom+xml",
    "application/xml",
    "text/xml",
)


def is_parsing_ok(parsed_feed):
    return parsed_feed["entries"] or not parsed_feed["bozo"]


def escape_keys(*keys):
    def wrapper(func):
        def metawrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            for key in keys:
                if key in result:
                    result[key] = html.unescape(result[key] or "")
            return result

        return metawrapper

    return wrapper


@escape_keys("title", "description")
def construct_feed_from(url=None, fp_parsed=None, feed=None, query_site=True):
    requests_kwargs = {"headers": {"User-Agent": CRAWLER_USER_AGENT}, "verify": False}
    if url is None and fp_parsed is not None:
        url = fp_parsed.get("url")
    if url is not None and fp_parsed is None:
        try:
            response = requests.get(url, **requests_kwargs)
            fp_parsed = feedparser.parse(
                response.content, request_headers=response.headers
            )
        except Exception:
            logger.exception("failed to retrieve that url")
            fp_parsed = {"bozo": True}
    assert url is not None and fp_parsed is not None
    feed = feed or {}
    feed_split = urllib.parse.urlsplit(url)
    site_split = None
    if is_parsing_ok(fp_parsed):
        feed["link"] = url
        feed["site_link"] = try_keys(fp_parsed["feed"], "href", "link")
        feed["title"] = fp_parsed["feed"].get("title")
        feed["description"] = try_keys(fp_parsed["feed"], "subtitle", "title")
        feed["icon_url"] = try_keys(fp_parsed["feed"], "icon")
    else:
        feed["site_link"] = url

    if feed.get("site_link"):
        feed["site_link"] = rebuild_url(feed["site_link"], feed_split)
        site_split = urllib.parse.urlsplit(feed["site_link"])

    if feed.get("icon_url"):
        feed["icon_url"] = try_get_icon_url(feed["icon_url"], site_split, feed_split)
        if feed["icon_url"] is None:
            del feed["icon_url"]

    if (
        not feed.get("site_link")
        or not query_site
        or all(bool(feed.get(k)) for k in ("link", "title", "icon_url"))
    ):
        return feed

    try:
        response = requests.get(feed["site_link"], **requests_kwargs)
    except requests.exceptions.InvalidSchema as e:
        return feed
    except:
        logger.exception("failed to retrieve %r", feed["site_link"])
        return feed
    bs_parsed = BeautifulSoup(
        response.content, "html.parser", parse_only=SoupStrainer("head")
    )

    if not feed.get("title"):
        try:
            feed["title"] = bs_parsed.find_all("title")[0].text
        except Exception:
            pass

    def check_keys(**kwargs):
        def wrapper(elem):
            for key, vals in kwargs.items():
                if not elem.has_attr(key):
                    return False
                if not all(val in elem.attrs[key] for val in vals):
                    return False
            return True

        return wrapper

    if not feed.get("icon_url"):
        icons = bs_parsed.find_all(check_keys(rel=["icon", "shortcut"]))
        if not len(icons):
            icons = bs_parsed.find_all(check_keys(rel=["icon"]))
        if len(icons) >= 1:
            for icon in icons:
                feed["icon_url"] = try_get_icon_url(
                    icon.attrs["href"], site_split, feed_split
                )
                if feed["icon_url"] is not None:
                    break

        if feed.get("icon_url") is None:
            feed["icon_url"] = try_get_icon_url("/favicon.ico", site_split, feed_split)
        if "icon_url" in feed and feed["icon_url"] is None:
            del feed["icon_url"]

    if not feed.get("link"):
        for type_ in ACCEPTED_MIMETYPES:
            alternates = bs_parsed.find_all(check_keys(rel=["alternate"], type=[type_]))
            if len(alternates) >= 1:
                feed["link"] = rebuild_url(alternates[0].attrs["href"], feed_split)
                break
    return feed
bgstack15