aboutsummaryrefslogtreecommitdiff
path: root/stackbin.py
diff options
context:
space:
mode:
authorB. Stack <bgstack15@gmail.com>2022-02-13 22:11:38 -0500
committerB. Stack <bgstack15@gmail.com>2022-02-13 22:11:38 -0500
commit39eb97cbceeb332c21eaeeb4843b58b34667cfb2 (patch)
treeeb11eee877bc42163ce3e17c8fcd99cf72eb1d69 /stackbin.py
parentadd expiry, and wsgi usage to support that (diff)
downloadstackbin-39eb97cbceeb332c21eaeeb4843b58b34667cfb2.tar.gz
stackbin-39eb97cbceeb332c21eaeeb4843b58b34667cfb2.tar.bz2
stackbin-39eb97cbceeb332c21eaeeb4843b58b34667cfb2.zip
refactor filenames and improve docs
Diffstat (limited to 'stackbin.py')
-rwxr-xr-xstackbin.py281
1 files changed, 281 insertions, 0 deletions
diff --git a/stackbin.py b/stackbin.py
new file mode 100755
index 0000000..aac3f49
--- /dev/null
+++ b/stackbin.py
@@ -0,0 +1,281 @@
+# File: stackbin.py
+# SPDX-License-Identifier: GPL-3.0
+# Authors: mitsuhiko, su27, bgstack15
+from datetime import datetime, timedelta
+from itsdangerous import Signer
+from flask import (Flask, request, url_for, redirect, g, render_template, session, abort)
+from flask_sqlalchemy import SQLAlchemy
+from pytimeparse.timeparse import timeparse # python3-pytimeparse
+# uwsgidecorators load will fail when using initdb.py but is also not necessary
+try:
+ from uwsgidecorators import timer # python3-uwsgidecorators
+except:
+ pass
+import time
+
+## ripped from https://stackoverflow.com/questions/183042/how-can-i-use-uuids-in-sqlalchemy/812363#812363
+from sqlalchemy import types
+from sqlalchemy.dialects.mysql.base import MSBinary
+from sqlalchemy.schema import Column
+import uuid
+class UUID(types.TypeDecorator):
+ impl = MSBinary
+ def __init__(self):
+ self.impl.length = 16
+ self.cache_ok = False # to shut up some useless warning
+ types.TypeDecorator.__init__(self,length=self.impl.length)
+ def process_bind_param(self,value,dialect=None):
+ if value and isinstance(value,uuid.UUID):
+ return value.bytes
+ elif value and not isinstance(value,uuid.UUID):
+ raise ValueError('value %s is not a valid uuid.UUID' % value)
+ else:
+ return None
+ def process_result_value(self,value,dialect=None):
+ if value:
+ return uuid.UUID(bytes=value)
+ else:
+ return None
+ def is_mutable(self):
+ return False
+id_column_name = "id"
+def id_column():
+ return Column(id_column_name,UUID(),primary_key=True,default=uuid.uuid4)
+
+def get_signed(string, salt="blank"):
+ return Signer(app.secret_key, salt=salt).sign(str(string))
+
+def get_unsigned(string, salt="blank"):
+ return Signer(app.secret_key, salt=salt).unsign(str(string)).decode("utf-8")
+
+app = Flask(__name__)
+app.config.from_pyfile('config.cfg')
+db = SQLAlchemy(app)
+
+def url_for_other_page(page):
+ args = request.view_args.copy()
+ args['page'] = page
+ return url_for(request.endpoint, **args)
+app.jinja_env.globals['url_for_other_page'] = url_for_other_page
+app.jinja_env.globals['appname'] = app.config['APPNAME']
+
+def refresh_string(delay,url):
+ """
+ Returns a string for html content for redirecting the user back after the
+ requested delay, to the requested url.
+ """
+ return f'<meta http-equiv="Refresh" content="{delay}; url={url}">'
+
+@app.before_request
+def check_user_status():
+ g.user = None
+ if 'user_id' in session:
+ g.user = User.query.get(session['user_id'])
+
+class Paste(db.Model):
+ id = id_column()
+ code = db.Column(db.Text)
+ title = db.Column(db.Text)
+ pub_date = db.Column(db.DateTime)
+ exp_date = db.Column(db.DateTime)
+ user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
+ is_private = db.Column(db.Boolean)
+ parent_id = db.Column(UUID(), db.ForeignKey('paste.id'))
+ parent = db.relationship('Paste', lazy=True, backref='children', uselist=False, remote_side=[id])
+
+ def __init__(self, user, code, title, relative_expiration_seconds, parent=None, is_private=False):
+ self.user = user
+ self.code = code
+ self.title = title
+ self.is_private = is_private
+ u = datetime.utcnow()
+ try:
+ # this will fail on POSTed value of "never" for exp
+ b = timedelta(seconds=relative_expiration_seconds)
+ except:
+ # so timedelta() will return 0 seconds, which makes the exp_date = pub_date, which is treated
+ # by the cleanup logic and jinja2 templates as never-expires.
+ b = timedelta()
+ self.pub_date = u
+ self.exp_date = u + b
+ self.parent = parent
+
+class User(db.Model):
+ id = db.Column(db.Integer, primary_key=True)
+ display_name = db.Column(db.String(120))
+ fb_id = db.Column(db.String(30), unique=True)
+ pastes = db.relationship(Paste, lazy='dynamic', backref='user')
+
+@app.route('/', methods=['GET', 'POST'])
+def new_paste():
+ parent = None
+ reply_to = request.args.get('reply_to')
+ if reply_to is not None:
+ try:
+ parent = Paste.query.get(uuid.UUID(reply_to))
+ except:
+ parent = Paste.query.get(reply_to)
+ if request.method == 'POST' and request.form['code']:
+ is_private = bool(request.form.get('is_private'))
+ title = "Untitled paste"
+ if request.form['pastetitle'] and request.form['pastetitle'] != "Enter title here":
+ title = request.form['pastetitle']
+ relative_expiration_seconds = 0
+ exp = 0 # start with an empty value
+ if 'exp' in request.form and request.form['exp']:
+ exp_opt = request.form['exp']
+ if exp_opt not in app.config['EXPIRATION_OPTIONS']:
+ try:
+ exp = timeparse(f"+ {app.config['EXPIRATION_OPTIONS'][0]}")
+ except:
+ exp = 60 * 60 # failsafe, 1 hour
+ print(f"WARNING: requested expiration \"{exp_opt}\" is not in the list of options {app.config['EXPIRATION_OPTIONS']}, so will use {exp}")
+ else:
+ try:
+ exp = timeparse(f"+ {exp_opt}")
+ except:
+ exp = 0
+ paste = Paste(
+ g.user,
+ request.form['code'],
+ title,
+ exp,
+ parent=parent,
+ is_private=is_private
+ )
+ db.session.add(paste)
+ db.session.commit()
+ sign = get_signed(paste.id, salt=app.config['SALT']) \
+ if is_private else None
+ return redirect(url_for('show_paste', paste_id=paste.id, s=sign))
+ try:
+ exp_opts = app.config['EXPIRATION_OPTIONS']
+ except:
+ exp_opts = None
+ return render_template('new_paste.html', parent=parent, exp_opts = exp_opts)
+
+# This @timer is from the uwsgidecorators
+try:
+ @timer(app.config['LOOP_DELAY'])
+ def cleanup_expired_pastes(num):
+ # num is useless.
+ """
+ Every LOOP_DELAY seconds, find any entries that have expired and then delete them.
+ """
+ all1 = Paste.query.all()
+ need_commit = False
+ for p in all1:
+ # the exp_date != pub_date is very important, because anything with "never" expires
+ # is stored in the db as exp_date = pub_date
+ if p.exp_date and p.exp_date != p.pub_date and p.exp_date <= datetime.utcnow():
+ print(f"INFO: deleting paste \"{p.title}\" with expiration {p.exp_date}.")
+ Paste.query.filter(Paste.id == p.id).delete()
+ need_commit = True
+ # only run the commit once!
+ if need_commit:
+ db.session.commit()
+except:
+ pass
+
+@app.route('/<paste_id>/')
+@app.route('/<paste_id>')
+def show_paste(paste_id):
+ try:
+ paste = Paste.query.options(db.eagerload('children')).get_or_404(paste_id)
+ except:
+ paste = Paste.query.options(db.eagerload('children')).get_or_404(uuid.UUID(paste_id))
+ if paste.is_private:
+ try:
+ sign = request.args.get('s', '')
+ assert str(paste.id) == \
+ get_unsigned(sign, salt=app.config['SALT'])
+ except:
+ abort(403)
+ parent = None
+ if paste.parent_id:
+ try:
+ parent = Paste.query.get(uuid.UUID(paste.parent_id))
+ except:
+ parent = Paste.query.get(paste.parent_id)
+ children = []
+ if paste.children:
+ for i in paste.children:
+ j = None
+ try:
+ j = Paste.query.get(uuid.UUID(i.id))
+ except:
+ j = Paste.query.get(i.id)
+ if j:
+ k = j.id, j.title
+ children.append(k)
+ return render_template('show_paste.html', paste=paste, parent=parent, children=children)
+
+@app.route('/<paste_id>/delete/', methods=['POST'])
+@app.route('/<paste_id>/delete', methods=['POST'])
+def delete_paste(paste_id):
+ try:
+ paste = Paste.query.options(db.eagerload('children')).get_or_404(paste_id)
+ except:
+ paste = Paste.query.options(db.eagerload('children')).get_or_404(uuid.UUID(paste_id))
+ sign = str(request.form['s'])
+ try:
+ assert str(paste.id) == get_unsigned(sign, salt=app.config['DELETESALT'])
+ except:
+ abort(403)
+ try:
+ Paste.query.filter(Paste.id == paste.id).delete()
+ db.session.commit()
+ message = refresh_string(1, url_for("admin")) + "OK"
+ return message,200
+ except:
+ return f"failure to delete object. Select <a href='{url_for('admin')}'>here</a> to return to the admin panel.",500
+
+def get_all_pastes():
+ """
+ Get custom arrangement of pastes for Admin view
+ """
+ all1 = Paste.query.all()
+ all2 = []
+ for p1 in all1:
+ parent_id = None
+ parent_title = None
+ children = []
+ if p1.parent_id:
+ parent_id = p1.parent_id
+ try:
+ parent_title = Paste.query.get(p1.parent_id).title
+ except:
+ parent_title = "" # works better than None for the parent column of the generated html
+ if p1.children:
+ for c1 in p1.children:
+ child = Paste.query.get(c1.id)
+ child_title = child.title
+ c2 = c1.id, child_title
+ children.append(c2)
+ private = None
+ if p1.is_private:
+ private = get_signed(p1.id, salt=app.config['SALT'])
+ p2 = {
+ "id": p1.id,
+ "title": p1.title,
+ "pub_date": p1.pub_date,
+ "exp_date": p1.exp_date,
+ "private": private,
+ "user_id": p1.user_id,
+ "is_private": p1.is_private,
+ "parent": (parent_id, parent_title),
+ "children": children,
+ "delete": get_signed(p1.id, salt=app.config['DELETESALT']).decode("utf-8")
+ }
+ all2.append(p2)
+ return all2
+
+@app.route('/admin/')
+@app.route('/admin')
+def admin():
+ all_pastes = get_all_pastes()
+ return render_template('admin.html', pastes = all_pastes)
+
+if __name__ == "__main__":
+ manager.add_command('runserver', Server(host=app.config["APP_HOST"], port=app.config["APP_PORT"]))
+ app.run()
bgstack15