# File: stackbin.py
# Location: http://gitlab.com/bgstack15/stackbin/
# Authors: mitsuhiko, ofshellohicy, su27, bgstack15
# SPDX-License-Identifier: GPL-3.0
# Startdate: 2011 by mitsuhiko
# Title: Stackbin
# Purpose: Flask-based pastebin
# History:
# 2014 ofshellohicy removed some features
# 2016 su27 added some features
# 2022 bgstack15 hard forked
# Reference:
# fuss.py
# Improve:
# Dependencies:
# req-INCOMPLETE-devuan: python3-pytimeparse, python3-uwsgidecorators
# req-fedora: uwsgi, uwsgi-logger-file, python36-flask uwsgi-plugin-python36 python36-sqlalchemy, python36-uwsgidecorators, python3-pytimeparse, python3-uwsgidecorators
# req-centos7: uwsgi, uwsgi-logger-file, python36-flask uwsgi-plugin-python36 python36-sqlalchemy, python36-uwsgidecorators
# pip-centos7: flask-sqlalchemy, pytimeparse
# Documentation: see README.md
from datetime import datetime, timedelta
from itsdangerous import Signer
from flask import (Flask, Response, request, url_for, redirect, g, render_template, session, abort)
from flask_sqlalchemy import SQLAlchemy
from werkzeug.middleware.proxy_fix import ProxyFix
from pytimeparse.timeparse import timeparse # python3-pytimeparse
import os
from functools import wraps
# uwsgidecorators load will fail when using initdb.py but is also not necessary
try:
    from uwsgidecorators import timer # python3-uwsgidecorators
except Exception:
    # Deliberately broad: the timer is optional, so no failure of this import
    # (module missing, not running under uwsgi, ...) may stop the application.
    # "except Exception" rather than a bare except, so that KeyboardInterrupt
    # and SystemExit still propagate.
    print("Warning! Without uwsgidecorators, the cleanup timer cannot run.")
import time
from sqlalchemy import types
from sqlalchemy.dialects.mysql.base import MSBinary
from sqlalchemy.schema import Column
import uuid
class UUID(types.TypeDecorator):
    """
    Column type that keeps a uuid.UUID in the database as 16 raw bytes
    (MSBinary) and hands uuid.UUID objects back to Python code.
    """
    impl = MSBinary

    def __init__(self):
        # NOTE(review): at this point self.impl presumably still resolves to the
        # class-level MSBinary, so the length is set there before the base
        # constructor is called with the same value -- confirm before changing.
        self.impl.length = 16
        self.cache_ok = False # to shut up some useless warning
        types.TypeDecorator.__init__(self, length=self.impl.length)

    def process_bind_param(self, value, dialect=None):
        """Return the bytes of a uuid.UUID, None for a falsy value, else raise ValueError."""
        if not value:
            return None
        if not isinstance(value, uuid.UUID):
            raise ValueError('value %s is not a valid uuid.UUID' % value)
        return value.bytes

    def process_result_value(self, value, dialect=None):
        """Rebuild a uuid.UUID from the stored bytes; falsy values become None."""
        return uuid.UUID(bytes=value) if value else None

    def is_mutable(self):
        return False
# Name of the primary-key column produced by id_column().
id_column_name = "id"


def id_column():
    """Return a new primary-key Column of the UUID type whose default is uuid.uuid4."""
    pk_column = Column(
        id_column_name,
        UUID(),
        primary_key=True,
        default=uuid.uuid4,
    )
    return pk_column
def get_signed(string, salt="blank"):
    """
    Sign str(string) with the application's secret key, using the given salt,
    and return whatever Signer.sign() produces.
    """
    signer = Signer(app.secret_key, salt=salt)
    return signer.sign(str(string))
def get_unsigned(string, salt="blank"):
    """
    Verify the signature on str(string) with the application's secret key and
    the given salt, and return the original value decoded as UTF-8 text.
    Signer.unsign() raises when the signature does not verify.
    """
    signer = Signer(app.secret_key, salt=salt)
    payload = signer.unsign(str(string))
    return payload.decode("utf-8")
# Build the application. Configuration comes from the file named by the
# STACKBIN_CONF environment variable; if that fails for any reason, fall back
# to stackbin.conf.
app = Flask(__name__)
try:
    app.config.from_pyfile(os.environ['STACKBIN_CONF'])
except:
    app.config.from_pyfile('stackbin.conf')
db = SQLAlchemy(app)
# Optional overrides of the static and template directories.
for _conf_key, _app_attr in (
    ("STATIC_FOLDER", "static_folder"),
    ("TEMPLATE_FOLDER", "template_folder"),
):
    if _conf_key in app.config:
        setattr(app, _app_attr, app.config[_conf_key])
def requires_session(function):
    '''
    Decorator for views that need a logged-in session.

    With an empty session the visitor is redirected to the login view. The
    session must hold a 'user' entry and it must equal the 'user' cookie,
    otherwise a 401 is returned. On success the wrapped view is called with
    the session user and an empty group list ahead of its own arguments.
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        if not session:
            #return Response("requires session",401)
            return redirect(url_for('login'))
        if 'user' not in session:
            return Response("User is not in this session.",401)
        s_user = session['user']
        c_user = request.cookies.get('user')
        print(f"session user: {s_user}")
        print(f"cookie user: {c_user}")
        if c_user != s_user:
            return Response("Wrong user for this session!.",401)
        # session and cookie agree: let the request through
        return function(s_user, [], *args, **kwargs)
    return decorated
def _credentials_match(supplied, expected):
    """Compare two credential values; strings are compared in constant time."""
    import hmac
    if isinstance(supplied, str) and isinstance(expected, str):
        # encode first: compare_digest() rejects non-ASCII str arguments
        return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
    return supplied == expected


def requires_admin_credential(function):
    """
    Requires the user pass the correct admin credential configured
    in the conf file.

    The username and password are read from session['formdata'] when a
    previous login step stored them there (the entry is removed from the
    session as soon as it is read), otherwise from the POSTed form. When they
    match ADMIN_USERNAME and ADMIN_PASSWORD from the configuration, the wrapped
    view is called with the username and an empty group list ahead of its own
    arguments. In every other case the 401 from _unauthorized_admin() is
    returned.
    """
    @wraps(function)
    def decorated(*args, **kwargs):
        # formdata is in session if we are coming from login_basic() or login_form();
        # pop it so the credential does not linger in the session
        form = session.pop('formdata', None)
        if not form:
            # then we are coming from the form with POST data
            form = request.form
        username = form.get('username')
        pw = form.get('password')
        if username is None or pw is None:
            # a missing field can never be a valid credential
            return _unauthorized_admin()
        if 'ADMIN_USERNAME' not in app.config or 'ADMIN_PASSWORD' not in app.config:
            return _unauthorized_admin()
        # evaluate both comparisons so a wrong username is not rejected faster
        user_ok = _credentials_match(username, app.config['ADMIN_USERNAME'])
        pw_ok = _credentials_match(pw, app.config['ADMIN_PASSWORD'])
        if user_ok and pw_ok:
            return function(username, [], *args, **kwargs)
        return _unauthorized_admin()
    return decorated
def _unauthorized_admin():
    """Build the 401 response used whenever the admin credential check fails."""
    body = 'Unauthorized! Invalid admin credential... returning to login form'
    return Response(body, 401)
def url_for_other_page(page):
    """
    Return the URL of the endpoint serving the current request, keeping its
    view arguments but with 'page' set to the given value.
    """
    params = request.view_args.copy()
    params.update(page=page)
    return url_for(request.endpoint, **params)
# Names made available to every template.
app.jinja_env.globals.update(url_for_other_page=url_for_other_page)
app.jinja_env.globals.update(appname=app.config['APPNAME'])
def refresh_string(delay,url):
    """
    Returns a string for html content for redirecting the user back after the
    requested delay, to the requested url.

    delay: number of seconds the browser should wait before redirecting.
    url: destination of the redirect.
    Both values are HTML-escaped before being placed in the attribute.
    """
    from html import escape # local import keeps this function self-contained
    safe_delay = escape(str(delay), quote=True)
    safe_url = escape(str(url), quote=True)
    return f'<meta http-equiv="Refresh" content="{safe_delay}; url={safe_url}">'
@app.before_request
def check_user_status():
    """Before each request, copy the session's 'user_id' (None when absent) to g.user."""
    # the raw session value is used; no User row is looked up
    g.user = session.get('user_id')
class Paste(db.Model):
    """
    A stored paste. A paste can reply to one parent paste; the pastes that
    reply to it are reachable through the 'children' backref.
    """
    id = id_column()
    code = db.Column(db.Text)
    title = db.Column(db.Text)
    pub_date = db.Column(db.DateTime)
    # exp_date equal to pub_date means the paste never expires
    exp_date = db.Column(db.DateTime)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    is_private = db.Column(db.Boolean)
    parent_id = db.Column(UUID(), db.ForeignKey('paste.id'))
    parent = db.relationship('Paste', lazy=True, backref='children', uselist=False, remote_side=[id])

    def __init__(self, user, code, title, relative_expiration_seconds, parent=None, is_private=False):
        """
        user: owner of the paste, assigned to the 'user' backref.
        code: text of the paste.
        title: title of the paste.
        relative_expiration_seconds: lifetime in seconds counted from now; a
            value timedelta() cannot use (None, "never", NaN, out of range)
            or 0 makes the paste never expire.
        parent: the Paste this one replies to, if any.
        is_private: whether viewing the paste requires a signed link.
        """
        # NOTE(review): new_paste() passes g.user here, which check_user_status()
        # fills with the raw session value -- confirm that value is acceptable
        # for the 'user' relationship.
        self.user = user
        self.code = code
        self.title = title
        self.is_private = is_private
        u = datetime.utcnow()
        try:
            # this will fail on POSTed value of "never" for exp
            b = timedelta(seconds=relative_expiration_seconds)
        except (TypeError, ValueError, OverflowError):
            # so timedelta() will return 0 seconds, which makes the exp_date = pub_date, which is treated
            # by the cleanup logic and jinja2 templates as never-expires.
            b = timedelta()
        self.pub_date = u
        self.exp_date = u + b
        self.parent = parent
class User(db.Model):
    """A user record; the pastes it owns are reachable as a dynamic query through 'pastes'."""
    id = db.Column(db.Integer, primary_key=True)
    display_name = db.Column(db.String(length=120))
    fb_id = db.Column(db.String(length=30), unique=True)
    # gives every Paste a 'user' attribute through the backref
    pastes = db.relationship(Paste, backref='user', lazy='dynamic')
def _resolve_expiration_seconds(exp_opt):
    """Turn the POSTed expiration choice into the value handed to Paste (0 or None: never expires)."""
    options = app.config.get('EXPIRATION_OPTIONS') or []
    if exp_opt in options:
        try:
            # may be None for words such as "never"; Paste treats that as never-expires
            return timeparse(f"+ {exp_opt}")
        except Exception:
            return 0
    # not an offered choice: behave as if the first configured option was picked
    try:
        fallback = timeparse(f"+ {options[0]}")
    except Exception:
        fallback = 60 * 60 # failsafe, 1 hour
    print(f"WARNING: requested expiration \"{exp_opt}\" is not in the list of options {options}, so will use {fallback}")
    return fallback


@app.route('/', methods=['GET', 'POST'])
def new_paste():
    """
    Show the new-paste form (GET, or POST with empty code) or store a paste
    (POST with code) and redirect to it.

    Query string: reply_to, the id of the paste being replied to; a value
    that is not a well-formed UUID is ignored.
    Form fields: code (required on POST), pastetitle, exp, is_private.
    Private pastes are redirected to with a signed 's' parameter.
    """
    parent = None
    reply_to = request.args.get('reply_to')
    if reply_to is not None:
        try:
            parent_key = uuid.UUID(reply_to)
        except ValueError:
            # the id column only accepts uuid.UUID values, so a malformed id cannot match anything
            parent_key = None
        if parent_key is not None:
            parent = Paste.query.get(parent_key)
    if request.method == 'POST' and request.form['code']:
        is_private = bool(request.form.get('is_private'))
        title = request.form.get('pastetitle')
        if not title or title == "Enter title here":
            title = "Untitled paste"
        exp = 0 # start with an empty value
        exp_opt = request.form.get('exp')
        if exp_opt:
            exp = _resolve_expiration_seconds(exp_opt)
        paste = Paste(
            g.user,
            request.form['code'],
            title,
            exp,
            parent=parent,
            is_private=is_private
        )
        db.session.add(paste)
        db.session.commit()
        sign = get_signed(paste.id, salt=app.config['SALT']) \
            if is_private else None
        return redirect(url_for('show_paste', paste_id=paste.id, s=sign))
    # None when the option list is not configured
    exp_opts = app.config.get('EXPIRATION_OPTIONS')
    return render_template('new_paste.html', parent=parent, exp_opts = exp_opts)
# This @timer is from the uwsgidecorators
try:
    @timer(app.config['LOOP_DELAY'])
    def cleanup_expired_pastes(num):
        # num is useless.
        """
        Every LOOP_DELAY seconds, find any entries that have expired and then delete them.
        """
        now = datetime.utcnow()
        need_commit = False
        for p in Paste.query.all():
            # the exp_date != pub_date is very important, because anything with "never" expires
            # is stored in the db as exp_date = pub_date
            if p.exp_date and p.exp_date != p.pub_date and p.exp_date <= now:
                print(f"INFO: deleting paste \"{p.title}\" with expiration {p.exp_date}.")
                Paste.query.filter(Paste.id == p.id).delete()
                need_commit = True
        # only run the commit once!
        if need_commit:
            db.session.commit()
except Exception as err:
    # Best effort: without the timer decorator (NameError) or without LOOP_DELAY
    # (KeyError) the application still works, it just never purges expired pastes.
    print(f"Warning! The cleanup timer was not registered: {err!r}")
@app.route('/<paste_id>/')
@app.route('/<paste_id>')
def show_paste(paste_id):
    """
    Render one paste together with its parent and a list of (id, title) for
    the pastes that reply to it.

    paste_id: textual UUID taken from the URL. A malformed or unknown id
    gives a 404. A private paste additionally needs a valid signature of its
    id in the 's' query parameter, otherwise a 403 is returned.
    """
    try:
        key = uuid.UUID(paste_id)
    except ValueError:
        # the id column only accepts uuid.UUID values, so this id cannot exist
        abort(404)
    # NOTE(review): eagerload looks like the legacy name of joinedload -- confirm
    # it is still provided by the installed SQLAlchemy/Flask-SQLAlchemy.
    paste = Paste.query.options(db.eagerload('children')).get_or_404(key)
    if paste.is_private:
        sign = request.args.get('s', '')
        # explicit comparison instead of assert: asserts vanish under "python -O"
        try:
            authorized = str(paste.id) == get_unsigned(sign, salt=app.config['SALT'])
        except Exception:
            # fail closed on any signature problem
            authorized = False
        if not authorized:
            abort(403)
    parent = None
    if paste.parent_id:
        parent = Paste.query.get(paste.parent_id)
    # the children are already loaded Paste objects; no need to fetch them again
    children = [(child.id, child.title) for child in (paste.children or [])]
    return render_template('show_paste.html', paste=paste, parent=parent, children=children)
@app.route('/<paste_id>/delete/', methods=['POST'])
@app.route('/<paste_id>/delete', methods=['POST'])
def delete_paste(paste_id):
    """
    Delete one paste.

    paste_id: textual UUID taken from the URL; malformed or unknown ids give
    a 404. The form field 's' must hold the paste id signed with DELETESALT,
    otherwise a 403 is returned. On success an "OK" page that refreshes to
    the admin view is returned; if the delete fails the transaction is rolled
    back and a 500 is returned.
    """
    try:
        key = uuid.UUID(paste_id)
    except ValueError:
        # the id column only accepts uuid.UUID values, so this id cannot exist
        abort(404)
    paste = Paste.query.options(db.eagerload('children')).get_or_404(key)
    sign = str(request.form.get('s', ''))
    # explicit comparison instead of assert: asserts vanish under "python -O"
    try:
        authorized = str(paste.id) == get_unsigned(sign, salt=app.config['DELETESALT'])
    except Exception:
        # fail closed on any signature problem
        authorized = False
    if not authorized:
        abort(403)
    try:
        Paste.query.filter(Paste.id == paste.id).delete()
        db.session.commit()
    except Exception:
        # leave the session usable for the next request
        db.session.rollback()
        return "failure to delete object. Select here to return to the admin panel.",500
    message = refresh_string(1, url_for("admin")) + "OK"
    return message,200
def get_all_pastes():
    """
    Get custom arrangement of pastes for Admin view

    Returns a list with one dict per paste: its id, title, dates, owner id,
    privacy flag, a (parent id, parent title) pair, a list of
    (child id, child title) pairs, the signed id for viewing when the paste
    is private (else None), and the signed id that authorizes deletion.
    """
    all2 = []
    for p1 in Paste.query.all():
        parent_id = None
        parent_title = None
        if p1.parent_id:
            parent_id = p1.parent_id
            parent = Paste.query.get(p1.parent_id)
            # "" works better than None for the parent column of the generated html
            parent_title = parent.title if parent is not None else ""
        # the children are already Paste objects, so their titles can be read directly
        children = [(c1.id, c1.title) for c1 in (p1.children or [])]
        private = None
        if p1.is_private:
            private = get_signed(p1.id, salt=app.config['SALT'])
        all2.append({
            "id": p1.id,
            "title": p1.title,
            "pub_date": p1.pub_date,
            "exp_date": p1.exp_date,
            "private": private,
            "user_id": p1.user_id,
            "is_private": p1.is_private,
            "parent": (parent_id, parent_title),
            "children": children,
            "delete": get_signed(p1.id, salt=app.config['DELETESALT']).decode("utf-8")
        })
    return all2
@app.route('/admin/')
@app.route('/admin')
@requires_session
def admin(username, groups = []):
    """Render the admin panel with the list built by get_all_pastes(); needs a valid session."""
    return render_template('admin.html', pastes=get_all_pastes())
@app.route('/favicon.ico')
def favicon():
    """Redirect to the favicon in the static folder; any failure is answered with a 404."""
    try:
        icon_url = url_for('static', filename='favicon.ico')
        return redirect(icon_url)
    except:
        abort(404)
@app.route('/set')
def get_proxied_path():
    """
    Install werkzeug's ProxyFix around the application when the request
    arrived with an X-Forwarded-Prefix header, then redirect to the new-paste
    page.

    The number of trusted proxies is taken from the number of entries in the
    X-Forwarded-Host header (1 when the header is absent). The wrapper is
    installed only once: while app.wsgi_app is already a ProxyFix, later
    requests leave it untouched instead of nesting another layer.
    """
    # NOTE(review): the trust level is derived from request headers, which a
    # client can set freely when the app is reachable without the proxy -- confirm
    # this endpoint is only reachable through the reverse proxy.
    if 'HTTP_X_FORWARDED_PREFIX' in request.environ and not isinstance(app.wsgi_app, ProxyFix):
        forwarded_hosts = request.headers.get("X-Forwarded-Host", "")
        pl = len(forwarded_hosts.split(", "))
        #prefix = request.environ['HTTP_X_FORWARDED_PREFIX']
        app.wsgi_app = ProxyFix(app.wsgi_app,x_for=pl,x_host=pl,x_port=pl,x_prefix=pl,x_proto=pl)
    return redirect(url_for('new_paste'))
@app.route("/logout")
@app.route("/logout/")
def logout():
    """Empty the session and answer with a plain 'logged out' response."""
    response = Response('logged out')
    # session.clear() is not documented but is found on the Internet in a few random places
    session.clear()
    #response.set_cookie('user','',expires=0)
    return response
@app.route("/login/new")
@app.route("/login/new/")
def login_new():
    """Redirect to the login view with an empty 'new' query parameter."""
    fresh_login_url = url_for("login", new="")
    return redirect(fresh_login_url)
@app.route("/login/", methods=['POST','GET'])
def login(user="None"):
    """
    Entry point of the login flow; always answers with a redirect.

    POST: forwarded to login_basic when an Authorization header is present,
    otherwise to login_generic. Both use a 307 so the method and body
    survive the redirect.
    Any other method (GET, and the HEAD that Flask accepts on GET routes):
    redirect to the admin page when the session user matches the 'user'
    cookie and no 'new' query parameter was given, otherwise to the login
    form.

    user: unused; kept so the signature stays unchanged.
    """
    if request.method == "POST":
        if request.authorization:
            return redirect(url_for("login_basic"),code=307)
        # 307 like the sibling redirects, so the posted credentials are not dropped
        return redirect(url_for("login_generic"),code=307)
    already_logged_in = 'user' in session and request.cookies.get('user') == session['user']
    if already_logged_in and 'new' not in request.args:
        return redirect(url_for("admin"))
    # default, show login form
    return redirect(url_for("login_form"))
@app.route("/login/basic",methods=['POST','GET'])
@app.route("/login/basic/",methods=['POST','GET'])
def login_basic():
    """
    Accept credentials from the Authorization header.

    Without the header a 401 carrying a Basic challenge is returned; a header
    lacking a username or password gives a plain 401. Otherwise the
    credentials are stored in session['formdata'] and the request is
    forwarded to login_generic with a 307.
    """
    auth = request.authorization
    if not auth:
        return Response("Please provide username and password.",401,{'WWW-Authenticate': 'Basic'})
    required = (
        ('username', "No username provided."),
        ('password', "No password provided."),
    )
    for field, complaint in required:
        if field not in auth:
            return Response(complaint,401)
    session['formdata'] = {'username': auth.username, 'password': auth.password}
    return redirect(url_for("login_generic"),code=307)
@app.route("/login/form", methods=['POST','GET'])
@app.route("/login/form/", methods=['POST','GET'])
def login_form():
    """
    GET: render the login form, which posts back to this same view.
    Otherwise: store the submitted username and password (empty strings when
    absent) in session['formdata'] and forward to login_generic with a 307.
    """
    if request.method == "GET":
        return render_template("login_form.html", login_url=url_for("login_form"))
    # anything that is not a GET is assumed to be a POST
    session['formdata'] = {
        'username': request.form.get('username', ""),
        'password': request.form.get('password', ""),
    }
    return redirect(url_for("login_generic"), code=307)
@app.route("/login/generic", methods=['POST','GET'])
@app.route("/login/generic/", methods=['POST','GET'])
@requires_admin_credential
def login_generic(user,groups=[]):
    """
    Final step of the login flow; only reached once requires_admin_credential
    accepted the credential. Marks the session as belonging to "admin" and
    returns the 'success' response prepared by login_success().
    """
    response = Response('success')
    session['user_id'] = "admin"
    return login_success(session, response, user, groups)
def login_success(session,resp,user,groups=[]):
    """
    Record a successful login: set the 'user' cookie on resp, make the session
    permanent, store the user in it, and return resp.

    groups is accepted but not used.
    """
    resp.set_cookie('user', user)
    # the session expiry timestamp cookie is currently disabled
    session.permanent = True
    session['user'] = user
    print(f"DEBUG: got valid user {user}")
    return resp
# Initialize the database if it does not already exist.
# This runs at import time, so it also happens when the module is loaded by a
# WSGI server rather than executed directly.
db.create_all()
if __name__ == "__main__":
    # direct execution: start the app with app.run()
    app.run()