# File: stackbin.py # Location: http://gitlab.com/bgstack15/stackbin/ # Authors: mitsuhiko, ofshellohicy, su27, bgstack15 # SPDX-License-Identifier: GPL-3.0 # Startdate: 2011 by mitsuhiko # Title: Stackbin # Purpose: Flask-based pastebin # History: # 2014 ofshellohicy removed some features # 2016 su27 added some features # 2022 bgstack15 hard forked # Reference: # fuss.py # Improve: # Dependencies: # req-INCOMPLETE-devuan: python3-pytimeparse, python3-uwsgidecorators # req-fedora: uwsgi, uwsgi-logger-file, python36-flask uwsgi-plugin-python36 python36-sqlalchemy, python36-uwsgidecorators, python3-pytimeparse, python3-uwsgidecorators # req-centos7: uwsgi, uwsgi-logger-file, python36-flask uwsgi-plugin-python36 python36-sqlalchemy, python36-uwsgidecorators # pip-centos7: flask-sqlalchemy, pytimeparse # Documentation: see README.md from datetime import datetime, timedelta from itsdangerous import Signer from flask import (Flask, Response, request, url_for, redirect, g, render_template, session, abort) from flask_sqlalchemy import SQLAlchemy from werkzeug.middleware.proxy_fix import ProxyFix from pytimeparse.timeparse import timeparse # python3-pytimeparse import os from functools import wraps # uwsgidecorators load will fail when using initdb.py but is also not necessary try: from uwsgidecorators import timer # python3-uwsgidecorators except: print("Warning! Without uwsgidecorators, the cleanup timer cannot run.") import time from sqlalchemy import types from sqlalchemy.dialects.mysql.base import MSBinary from sqlalchemy.schema import Column import uuid class UUID(types.TypeDecorator): impl = MSBinary def __init__(self): self.impl.length = 16 self.cache_ok = False # to shut up some useless warning types.TypeDecorator.__init__(self,length=self.impl.length) def process_bind_param(self,value,dialect=None): if value and isinstance(value,uuid.UUID): return value.bytes elif value and not isinstance(value,uuid.UUID): raise ValueError('value %s is not a valid uuid.UUID' % value) else: return None def process_result_value(self,value,dialect=None): if value: return uuid.UUID(bytes=value) else: return None def is_mutable(self): return False id_column_name = "id" def id_column(): return Column(id_column_name,UUID(),primary_key=True,default=uuid.uuid4) def get_signed(string, salt="blank"): return Signer(app.secret_key, salt=salt).sign(str(string)) def get_unsigned(string, salt="blank"): return Signer(app.secret_key, salt=salt).unsign(str(string)).decode("utf-8") app = Flask(__name__) try: app.config.from_pyfile(os.environ['STACKBIN_CONF']) except: app.config.from_pyfile('stackbin.conf') db = SQLAlchemy(app) if "STATIC_FOLDER" in app.config: app.static_folder=app.config["STATIC_FOLDER"] if "TEMPLATE_FOLDER" in app.config: app.template_folder=app.config["TEMPLATE_FOLDER"] def requires_session(function): ''' Requires a valid session, provided by cookie! ''' @wraps(function) def decorated(*args, **kwargs): if not session: #return Response("requires session",401) return redirect(url_for('login')) else: if 'user' not in session: return Response("User is not in this session.",401) s_user = session['user'] c_user = request.cookies.get('user') print(f"session user: {s_user}") print(f"cookie user: {c_user}") if session['user'] != c_user: return Response("Wrong user for this session!.",401) # otherwise, everything is good! return function(s_user, [], *args,**kwargs) # catch-all #return Response("requires session",401) return redirect(url_for('login')) return decorated def requires_admin_credential(function): """ Requires the user pass the correct admin credential configured in the conf file. """ @wraps(function) def decorated(*args, **kwargs): # formdata is in session if we are coming from login_basic() form = session.get('formdata', None) if form: session.pop('formdata') if 'username' in form: username = form['username'] if 'password' in form: pw = form['password'] else: # then we are coming from the form with POST data if 'username' not in request.form or 'password' not in request.form: return _unauthorized_admin() username = request.form['username'] pw = request.form['password'] if 'ADMIN_USERNAME' in app.config and \ 'ADMIN_PASSWORD' in app.config and \ username == app.config['ADMIN_USERNAME'] and pw == app.config['ADMIN_PASSWORD']: return function(username, [], *args, **kwargs) else: return _unauthorized_admin() return decorated def _unauthorized_admin(): return Response(f'Unauthorized! Invalid admin credential... returning to login form', 401) def url_for_other_page(page): args = request.view_args.copy() args['page'] = page return url_for(request.endpoint, **args) app.jinja_env.globals['url_for_other_page'] = url_for_other_page app.jinja_env.globals['appname'] = app.config['APPNAME'] def refresh_string(delay,url): """ Returns a string for html content for redirecting the user back after the requested delay, to the requested url. """ return f'' @app.before_request def check_user_status(): g.user = None if 'user_id' in session: #g.user = User.query.get(session['user_id']) g.user = session['user_id'] class Paste(db.Model): id = id_column() code = db.Column(db.Text) title = db.Column(db.Text) pub_date = db.Column(db.DateTime) exp_date = db.Column(db.DateTime) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) is_private = db.Column(db.Boolean) parent_id = db.Column(UUID(), db.ForeignKey('paste.id')) parent = db.relationship('Paste', lazy=True, backref='children', uselist=False, remote_side=[id]) def __init__(self, user, code, title, relative_expiration_seconds, parent=None, is_private=False): self.user = user self.code = code self.title = title self.is_private = is_private u = datetime.utcnow() try: # this will fail on POSTed value of "never" for exp b = timedelta(seconds=relative_expiration_seconds) except: # so timedelta() will return 0 seconds, which makes the exp_date = pub_date, which is treated # by the cleanup logic and jinja2 templates as never-expires. b = timedelta() self.pub_date = u self.exp_date = u + b self.parent = parent class User(db.Model): id = db.Column(db.Integer, primary_key=True) display_name = db.Column(db.String(120)) fb_id = db.Column(db.String(30), unique=True) pastes = db.relationship(Paste, lazy='dynamic', backref='user') @app.route('/', methods=['GET', 'POST']) def new_paste(): parent = None reply_to = request.args.get('reply_to') if reply_to is not None: try: parent = Paste.query.get(uuid.UUID(reply_to)) except: parent = Paste.query.get(reply_to) if request.method == 'POST' and request.form['code']: is_private = bool(request.form.get('is_private')) title = "Untitled paste" if request.form['pastetitle'] and request.form['pastetitle'] != "Enter title here": title = request.form['pastetitle'] relative_expiration_seconds = 0 exp = 0 # start with an empty value if 'exp' in request.form and request.form['exp']: exp_opt = request.form['exp'] if exp_opt not in app.config['EXPIRATION_OPTIONS']: try: exp = timeparse(f"+ {app.config['EXPIRATION_OPTIONS'][0]}") except: exp = 60 * 60 # failsafe, 1 hour print(f"WARNING: requested expiration \"{exp_opt}\" is not in the list of options {app.config['EXPIRATION_OPTIONS']}, so will use {exp}") else: try: exp = timeparse(f"+ {exp_opt}") except: exp = 0 paste = Paste( g.user, request.form['code'], title, exp, parent=parent, is_private=is_private ) db.session.add(paste) db.session.commit() sign = get_signed(paste.id, salt=app.config['SALT']) \ if is_private else None return redirect(url_for('show_paste', paste_id=paste.id, s=sign)) try: exp_opts = app.config['EXPIRATION_OPTIONS'] except: exp_opts = None return render_template('new_paste.html', parent=parent, exp_opts = exp_opts) # This @timer is from the uwsgidecorators try: @timer(app.config['LOOP_DELAY']) def cleanup_expired_pastes(num): # num is useless. """ Every LOOP_DELAY seconds, find any entries that have expired and then delete them. """ all1 = Paste.query.all() need_commit = False for p in all1: # the exp_date != pub_date is very important, because anything with "never" expires # is stored in the db as exp_date = pub_date if p.exp_date and p.exp_date != p.pub_date and p.exp_date <= datetime.utcnow(): print(f"INFO: deleting paste \"{p.title}\" with expiration {p.exp_date}.") Paste.query.filter(Paste.id == p.id).delete() need_commit = True # only run the commit once! if need_commit: db.session.commit() except: pass @app.route('//') @app.route('/') def show_paste(paste_id): try: paste = Paste.query.options(db.eagerload('children')).get_or_404(paste_id) except: paste = Paste.query.options(db.eagerload('children')).get_or_404(uuid.UUID(paste_id)) if paste.is_private: try: sign = request.args.get('s', '') assert str(paste.id) == \ get_unsigned(sign, salt=app.config['SALT']) except: abort(403) parent = None if paste.parent_id: try: parent = Paste.query.get(uuid.UUID(paste.parent_id)) except: parent = Paste.query.get(paste.parent_id) children = [] if paste.children: for i in paste.children: j = None try: j = Paste.query.get(uuid.UUID(i.id)) except: j = Paste.query.get(i.id) if j: k = j.id, j.title children.append(k) return render_template('show_paste.html', paste=paste, parent=parent, children=children) @app.route('//delete/', methods=['POST']) @app.route('//delete', methods=['POST']) def delete_paste(paste_id): try: paste = Paste.query.options(db.eagerload('children')).get_or_404(paste_id) except: paste = Paste.query.options(db.eagerload('children')).get_or_404(uuid.UUID(paste_id)) sign = str(request.form['s']) try: assert str(paste.id) == get_unsigned(sign, salt=app.config['DELETESALT']) except: abort(403) try: Paste.query.filter(Paste.id == paste.id).delete() db.session.commit() message = refresh_string(1, url_for("admin")) + "OK" return message,200 except: return f"failure to delete object. Select here to return to the admin panel.",500 def get_all_pastes(): """ Get custom arrangement of pastes for Admin view """ all1 = Paste.query.all() all2 = [] for p1 in all1: parent_id = None parent_title = None children = [] if p1.parent_id: parent_id = p1.parent_id try: parent_title = Paste.query.get(p1.parent_id).title except: parent_title = "" # works better than None for the parent column of the generated html if p1.children: for c1 in p1.children: child = Paste.query.get(c1.id) child_title = child.title c2 = c1.id, child_title children.append(c2) private = None if p1.is_private: private = get_signed(p1.id, salt=app.config['SALT']) p2 = { "id": p1.id, "title": p1.title, "pub_date": p1.pub_date, "exp_date": p1.exp_date, "private": private, "user_id": p1.user_id, "is_private": p1.is_private, "parent": (parent_id, parent_title), "children": children, "delete": get_signed(p1.id, salt=app.config['DELETESALT']).decode("utf-8") } all2.append(p2) return all2 @app.route('/admin/') @app.route('/admin') @requires_session def admin(username, groups = []): all_pastes = get_all_pastes() return render_template('admin.html', pastes = all_pastes) @app.route('/favicon.ico') def favicon(): try: return redirect(url_for('static', filename='favicon.ico')) except: abort(404) @app.route('/set') def get_proxied_path(): if 'HTTP_X_FORWARDED_PREFIX' in request.environ: pl = len(dict(request.headers)["X-Forwarded-Host"].split(", ")) #prefix = request.environ['HTTP_X_FORWARDED_PREFIX'] app.wsgi_app = ProxyFix(app.wsgi_app,x_for=pl,x_host=pl,x_port=pl,x_prefix=pl,x_proto=pl) return redirect(url_for('new_paste')) @app.route("/logout") @app.route("/logout/") def logout(): resp = Response(f'logged out') # not documented but is found on the Internet in a few random places: session.clear() #resp.set_cookie('user','',expires=0) return resp @app.route("/login/new") @app.route("/login/new/") def login_new(): return redirect(url_for("login", new="")) @app.route("/login/", methods=['POST','GET']) def login(user="None"): if request.method == "GET": if 'user' in session and request.cookies.get('user') == session['user'] and (not 'new' in request.args): return redirect(url_for("admin")) auth_header = request.headers.get("Authorization") # default, show login form return redirect(url_for("login_form")) elif request.method == "POST": if request.authorization: return redirect(url_for("login_basic"),code=307) return redirect(url_for("login_generic")) #return f"Authentication method not supported yet.",400 @app.route("/login/basic",methods=['POST','GET']) @app.route("/login/basic/",methods=['POST','GET']) def login_basic(): if not request.authorization: return Response(f"Please provide username and password.",401,{'WWW-Authenticate': 'Basic'}) if 'username' not in request.authorization: return Response(f"No username provided.",401) if 'password' not in request.authorization: return Response(f"No password provided.",401) username = request.authorization.username pw = request.authorization.password form={'username':username,'password':pw} session['formdata'] = form return redirect(url_for("login_generic"),code=307) @app.route("/login/form", methods=['POST','GET']) @app.route("/login/form/", methods=['POST','GET']) def login_form(): if request.method == "GET": return render_template("login_form.html", login_url = url_for("login_form") ) else: # assume it is a POST username="" if 'username' in request.form: username = request.form['username'] password="" if 'password' in request.form: password = request.form['password'] form={'username':username,'password':password} session['formdata'] = form return redirect(url_for("login_generic"), code=307) @app.route("/login/generic", methods=['POST','GET']) @app.route("/login/generic/", methods=['POST','GET']) @requires_admin_credential def login_generic(user,groups=[]): resp = Response(f'success') session['user_id'] = "admin" resp = login_success(session,resp,user,groups) return resp def login_success(session,resp,user,groups=[]): resp.set_cookie('user',user) #end_time = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime #end_time_str = datetime.datetime.strftime(end_time,"%FT%TZ") #resp.set_cookie('timestamp',end_time_str) session.permanent = True session['user']=user #session['end_time'] = end_time_str print(f"DEBUG: got valid user {user}") return resp # Initialize the database if it does not already exist db.create_all() if __name__ == "__main__": app.run()