diff options
author | B. Stack <bgstack15@gmail.com> | 2022-02-13 22:11:38 -0500 |
---|---|---|
committer | B. Stack <bgstack15@gmail.com> | 2022-02-13 22:11:38 -0500 |
commit | 39eb97cbceeb332c21eaeeb4843b58b34667cfb2 (patch) | |
tree | eb11eee877bc42163ce3e17c8fcd99cf72eb1d69 /stackbin.py | |
parent | add expiry, and wsgi usage to support that (diff) | |
download | stackbin-39eb97cbceeb332c21eaeeb4843b58b34667cfb2.tar.gz stackbin-39eb97cbceeb332c21eaeeb4843b58b34667cfb2.tar.bz2 stackbin-39eb97cbceeb332c21eaeeb4843b58b34667cfb2.zip |
refactor filenames and improve docs
Diffstat (limited to 'stackbin.py')
-rwxr-xr-x | stackbin.py | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/stackbin.py b/stackbin.py new file mode 100755 index 0000000..aac3f49 --- /dev/null +++ b/stackbin.py @@ -0,0 +1,281 @@ +# File: stackbin.py +# SPDX-License-Identifier: GPL-3.0 +# Authors: mitsuhiko, su27, bgstack15 +from datetime import datetime, timedelta +from itsdangerous import Signer +from flask import (Flask, request, url_for, redirect, g, render_template, session, abort) +from flask_sqlalchemy import SQLAlchemy +from pytimeparse.timeparse import timeparse # python3-pytimeparse +# uwsgidecorators load will fail when using initdb.py but is also not necessary +try: + from uwsgidecorators import timer # python3-uwsgidecorators +except: + pass +import time + +## ripped from https://stackoverflow.com/questions/183042/how-can-i-use-uuids-in-sqlalchemy/812363#812363 +from sqlalchemy import types +from sqlalchemy.dialects.mysql.base import MSBinary +from sqlalchemy.schema import Column +import uuid +class UUID(types.TypeDecorator): + impl = MSBinary + def __init__(self): + self.impl.length = 16 + self.cache_ok = False # to shut up some useless warning + types.TypeDecorator.__init__(self,length=self.impl.length) + def process_bind_param(self,value,dialect=None): + if value and isinstance(value,uuid.UUID): + return value.bytes + elif value and not isinstance(value,uuid.UUID): + raise ValueError('value %s is not a valid uuid.UUID' % value) + else: + return None + def process_result_value(self,value,dialect=None): + if value: + return uuid.UUID(bytes=value) + else: + return None + def is_mutable(self): + return False +id_column_name = "id" +def id_column(): + return Column(id_column_name,UUID(),primary_key=True,default=uuid.uuid4) + +def get_signed(string, salt="blank"): + return Signer(app.secret_key, salt=salt).sign(str(string)) + +def get_unsigned(string, salt="blank"): + return Signer(app.secret_key, salt=salt).unsign(str(string)).decode("utf-8") + +app = Flask(__name__) +app.config.from_pyfile('config.cfg') +db = SQLAlchemy(app) + +def url_for_other_page(page): + args = request.view_args.copy() + args['page'] = page + return url_for(request.endpoint, **args) +app.jinja_env.globals['url_for_other_page'] = url_for_other_page +app.jinja_env.globals['appname'] = app.config['APPNAME'] + +def refresh_string(delay,url): + """ + Returns a string for html content for redirecting the user back after the + requested delay, to the requested url. + """ + return f'<meta http-equiv="Refresh" content="{delay}; url={url}">' + +@app.before_request +def check_user_status(): + g.user = None + if 'user_id' in session: + g.user = User.query.get(session['user_id']) + +class Paste(db.Model): + id = id_column() + code = db.Column(db.Text) + title = db.Column(db.Text) + pub_date = db.Column(db.DateTime) + exp_date = db.Column(db.DateTime) + user_id = db.Column(db.Integer, db.ForeignKey('user.id')) + is_private = db.Column(db.Boolean) + parent_id = db.Column(UUID(), db.ForeignKey('paste.id')) + parent = db.relationship('Paste', lazy=True, backref='children', uselist=False, remote_side=[id]) + + def __init__(self, user, code, title, relative_expiration_seconds, parent=None, is_private=False): + self.user = user + self.code = code + self.title = title + self.is_private = is_private + u = datetime.utcnow() + try: + # this will fail on POSTed value of "never" for exp + b = timedelta(seconds=relative_expiration_seconds) + except: + # so timedelta() will return 0 seconds, which makes the exp_date = pub_date, which is treated + # by the cleanup logic and jinja2 templates as never-expires. + b = timedelta() + self.pub_date = u + self.exp_date = u + b + self.parent = parent + +class User(db.Model): + id = db.Column(db.Integer, primary_key=True) + display_name = db.Column(db.String(120)) + fb_id = db.Column(db.String(30), unique=True) + pastes = db.relationship(Paste, lazy='dynamic', backref='user') + +@app.route('/', methods=['GET', 'POST']) +def new_paste(): + parent = None + reply_to = request.args.get('reply_to') + if reply_to is not None: + try: + parent = Paste.query.get(uuid.UUID(reply_to)) + except: + parent = Paste.query.get(reply_to) + if request.method == 'POST' and request.form['code']: + is_private = bool(request.form.get('is_private')) + title = "Untitled paste" + if request.form['pastetitle'] and request.form['pastetitle'] != "Enter title here": + title = request.form['pastetitle'] + relative_expiration_seconds = 0 + exp = 0 # start with an empty value + if 'exp' in request.form and request.form['exp']: + exp_opt = request.form['exp'] + if exp_opt not in app.config['EXPIRATION_OPTIONS']: + try: + exp = timeparse(f"+ {app.config['EXPIRATION_OPTIONS'][0]}") + except: + exp = 60 * 60 # failsafe, 1 hour + print(f"WARNING: requested expiration \"{exp_opt}\" is not in the list of options {app.config['EXPIRATION_OPTIONS']}, so will use {exp}") + else: + try: + exp = timeparse(f"+ {exp_opt}") + except: + exp = 0 + paste = Paste( + g.user, + request.form['code'], + title, + exp, + parent=parent, + is_private=is_private + ) + db.session.add(paste) + db.session.commit() + sign = get_signed(paste.id, salt=app.config['SALT']) \ + if is_private else None + return redirect(url_for('show_paste', paste_id=paste.id, s=sign)) + try: + exp_opts = app.config['EXPIRATION_OPTIONS'] + except: + exp_opts = None + return render_template('new_paste.html', parent=parent, exp_opts = exp_opts) + +# This @timer is from the uwsgidecorators +try: + @timer(app.config['LOOP_DELAY']) + def cleanup_expired_pastes(num): + # num is useless. + """ + Every LOOP_DELAY seconds, find any entries that have expired and then delete them. + """ + all1 = Paste.query.all() + need_commit = False + for p in all1: + # the exp_date != pub_date is very important, because anything with "never" expires + # is stored in the db as exp_date = pub_date + if p.exp_date and p.exp_date != p.pub_date and p.exp_date <= datetime.utcnow(): + print(f"INFO: deleting paste \"{p.title}\" with expiration {p.exp_date}.") + Paste.query.filter(Paste.id == p.id).delete() + need_commit = True + # only run the commit once! + if need_commit: + db.session.commit() +except: + pass + +@app.route('/<paste_id>/') +@app.route('/<paste_id>') +def show_paste(paste_id): + try: + paste = Paste.query.options(db.eagerload('children')).get_or_404(paste_id) + except: + paste = Paste.query.options(db.eagerload('children')).get_or_404(uuid.UUID(paste_id)) + if paste.is_private: + try: + sign = request.args.get('s', '') + assert str(paste.id) == \ + get_unsigned(sign, salt=app.config['SALT']) + except: + abort(403) + parent = None + if paste.parent_id: + try: + parent = Paste.query.get(uuid.UUID(paste.parent_id)) + except: + parent = Paste.query.get(paste.parent_id) + children = [] + if paste.children: + for i in paste.children: + j = None + try: + j = Paste.query.get(uuid.UUID(i.id)) + except: + j = Paste.query.get(i.id) + if j: + k = j.id, j.title + children.append(k) + return render_template('show_paste.html', paste=paste, parent=parent, children=children) + +@app.route('/<paste_id>/delete/', methods=['POST']) +@app.route('/<paste_id>/delete', methods=['POST']) +def delete_paste(paste_id): + try: + paste = Paste.query.options(db.eagerload('children')).get_or_404(paste_id) + except: + paste = Paste.query.options(db.eagerload('children')).get_or_404(uuid.UUID(paste_id)) + sign = str(request.form['s']) + try: + assert str(paste.id) == get_unsigned(sign, salt=app.config['DELETESALT']) + except: + abort(403) + try: + Paste.query.filter(Paste.id == paste.id).delete() + db.session.commit() + message = refresh_string(1, url_for("admin")) + "OK" + return message,200 + except: + return f"failure to delete object. Select <a href='{url_for('admin')}'>here</a> to return to the admin panel.",500 + +def get_all_pastes(): + """ + Get custom arrangement of pastes for Admin view + """ + all1 = Paste.query.all() + all2 = [] + for p1 in all1: + parent_id = None + parent_title = None + children = [] + if p1.parent_id: + parent_id = p1.parent_id + try: + parent_title = Paste.query.get(p1.parent_id).title + except: + parent_title = "" # works better than None for the parent column of the generated html + if p1.children: + for c1 in p1.children: + child = Paste.query.get(c1.id) + child_title = child.title + c2 = c1.id, child_title + children.append(c2) + private = None + if p1.is_private: + private = get_signed(p1.id, salt=app.config['SALT']) + p2 = { + "id": p1.id, + "title": p1.title, + "pub_date": p1.pub_date, + "exp_date": p1.exp_date, + "private": private, + "user_id": p1.user_id, + "is_private": p1.is_private, + "parent": (parent_id, parent_title), + "children": children, + "delete": get_signed(p1.id, salt=app.config['DELETESALT']).decode("utf-8") + } + all2.append(p2) + return all2 + +@app.route('/admin/') +@app.route('/admin') +def admin(): + all_pastes = get_all_pastes() + return render_template('admin.html', pastes = all_pastes) + +if __name__ == "__main__": + manager.add_command('runserver', Server(host=app.config["APP_HOST"], port=app.config["APP_PORT"])) + app.run() |