aboutsummaryrefslogtreecommitdiff
path: root/newspipe/web/controllers/abstract.py
diff options
context:
space:
mode:
Diffstat (limited to 'newspipe/web/controllers/abstract.py')
-rw-r--r--newspipe/web/controllers/abstract.py90
1 files changed, 53 insertions, 37 deletions
diff --git a/newspipe/web/controllers/abstract.py b/newspipe/web/controllers/abstract.py
index 764ff305..9d9e84f2 100644
--- a/newspipe/web/controllers/abstract.py
+++ b/newspipe/web/controllers/abstract.py
@@ -11,7 +11,7 @@ logger = logging.getLogger(__name__)
class AbstractController:
_db_cls = None # reference to the database class
- _user_id_key = 'user_id'
+ _user_id_key = "user_id"
def __init__(self, user_id=None, ignore_context=False):
"""User id is a right management mechanism that should be used to
@@ -36,25 +36,25 @@ class AbstractController:
"""
db_filters = set()
for key, value in filters.items():
- if key == '__or__':
+ if key == "__or__":
db_filters.add(or_(*self._to_filters(**value)))
- elif key.endswith('__gt'):
+ elif key.endswith("__gt"):
db_filters.add(getattr(self._db_cls, key[:-4]) > value)
- elif key.endswith('__lt'):
+ elif key.endswith("__lt"):
db_filters.add(getattr(self._db_cls, key[:-4]) < value)
- elif key.endswith('__ge'):
+ elif key.endswith("__ge"):
db_filters.add(getattr(self._db_cls, key[:-4]) >= value)
- elif key.endswith('__le'):
+ elif key.endswith("__le"):
db_filters.add(getattr(self._db_cls, key[:-4]) <= value)
- elif key.endswith('__ne'):
+ elif key.endswith("__ne"):
db_filters.add(getattr(self._db_cls, key[:-4]) != value)
- elif key.endswith('__in'):
+ elif key.endswith("__in"):
db_filters.add(getattr(self._db_cls, key[:-4]).in_(value))
- elif key.endswith('__contains'):
+ elif key.endswith("__contains"):
db_filters.add(getattr(self._db_cls, key[:-10]).contains(value))
- elif key.endswith('__like'):
+ elif key.endswith("__like"):
db_filters.add(getattr(self._db_cls, key[:-6]).like(value))
- elif key.endswith('__ilike'):
+ elif key.endswith("__ilike"):
db_filters.add(getattr(self._db_cls, key[:-7]).ilike(value))
else:
db_filters.add(getattr(self._db_cls, key) == value)
@@ -66,8 +66,11 @@ class AbstractController:
dependent) and the user is not an admin and the filters doesn't already
contains a filter for that user.
"""
- if self._user_id_key is not None and self.user_id \
- and filters.get(self._user_id_key) != self.user_id:
+ if (
+ self._user_id_key is not None
+ and self.user_id
+ and filters.get(self._user_id_key) != self.user_id
+ ):
filters[self._user_id_key] = self.user_id
return self._db_cls.query.filter(*self._to_filters(**filters))
@@ -76,20 +79,27 @@ class AbstractController:
obj = self._get(**filters).first()
if obj and not self._has_right_on(obj):
- raise Forbidden({'message': 'No authorized to access %r (%r)'
- % (self._db_cls.__class__.__name__, filters)})
+ raise Forbidden(
+ {
+ "message": "No authorized to access %r (%r)"
+ % (self._db_cls.__class__.__name__, filters)
+ }
+ )
if not obj:
- raise NotFound({'message': 'No %r (%r)'
- % (self._db_cls.__class__.__name__, filters)})
+ raise NotFound(
+ {"message": "No %r (%r)" % (self._db_cls.__class__.__name__, filters)}
+ )
return obj
def create(self, **attrs):
assert attrs, "attributes to update must not be empty"
if self._user_id_key is not None and self._user_id_key not in attrs:
attrs[self._user_id_key] = self.user_id
- assert self._user_id_key is None or self._user_id_key in attrs \
- or self.user_id is None, \
- "You must provide user_id one way or another"
+ assert (
+ self._user_id_key is None
+ or self._user_id_key in attrs
+ or self.user_id is None
+ ), "You must provide user_id one way or another"
obj = self._db_cls(**attrs)
db.session.add(obj)
@@ -123,39 +133,45 @@ class AbstractController:
# user_id == None is like being admin
if self._user_id_key is None:
return True
- return self.user_id is None \
- or getattr(obj, self._user_id_key, None) == self.user_id
+ return (
+ self.user_id is None
+ or getattr(obj, self._user_id_key, None) == self.user_id
+ )
def _count_by(self, elem_to_group_by, filters):
if self.user_id:
- filters['user_id'] = self.user_id
- return dict(db.session.query(elem_to_group_by, func.count('id'))
- .filter(*self._to_filters(**filters))
- .group_by(elem_to_group_by).all())
+ filters["user_id"] = self.user_id
+ return dict(
+ db.session.query(elem_to_group_by, func.count("id"))
+ .filter(*self._to_filters(**filters))
+ .group_by(elem_to_group_by)
+ .all()
+ )
@classmethod
def _get_attrs_desc(cls, role, right=None):
result = defaultdict(dict)
- if role == 'admin':
+ if role == "admin":
columns = cls._db_cls.__table__.columns.keys()
else:
- assert role in {'base', 'api'}, 'unknown role %r' % role
- assert right in {'read', 'write'}, \
- "right must be 'read' or 'write' with role %r" % role
- columns = getattr(cls._db_cls, 'fields_%s_%s' % (role, right))()
+ assert role in {"base", "api"}, "unknown role %r" % role
+ assert right in {"read", "write"}, (
+ "right must be 'read' or 'write' with role %r" % role
+ )
+ columns = getattr(cls._db_cls, "fields_%s_%s" % (role, right))()
for column in columns:
result[column] = {}
db_col = getattr(cls._db_cls, column).property.columns[0]
try:
- result[column]['type'] = db_col.type.python_type
+ result[column]["type"] = db_col.type.python_type
except NotImplementedError:
if db_col.default:
- result[column]['type'] = db_col.default.arg.__class__
+ result[column]["type"] = db_col.default.arg.__class__
if column not in result:
continue
- if issubclass(result[column]['type'], datetime):
- result[column]['default'] = datetime.utcnow()
- result[column]['type'] = lambda x: dateutil.parser.parse(x)
+ if issubclass(result[column]["type"], datetime):
+ result[column]["default"] = datetime.utcnow()
+ result[column]["type"] = lambda x: dateutil.parser.parse(x)
elif db_col.default:
- result[column]['default'] = db_col.default.arg
+ result[column]["default"] = db_col.default.arg
return result
bgstack15