From d52971b79245328b4c0764bf0269d443a485f249 Mon Sep 17 00:00:00 2001 From: "B. Stack" Date: Sun, 27 Jun 2021 17:26:06 -0400 Subject: move config to separate file also move references to top of file --- session_app.py | 379 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100755 session_app.py (limited to 'session_app.py') diff --git a/session_app.py b/session_app.py new file mode 100755 index 0000000..847df19 --- /dev/null +++ b/session_app.py @@ -0,0 +1,379 @@ +#!/usr/bin/env python +# Startdate: 2021-06-17 +# goals: +# accept kerberos or ldap "authorization: basic gowinablz;nuiowekj==" auth, to create a cookie for a session that lasts for 15 minutes. use the cookie to get to protected URLs +# References: +# https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101 +# https://www.techlifediary.com/python-web-development-tutorial-using-flask-session-cookies/ +# delete cookie https://stackoverflow.com/a/14386413/3569534 +# timeout sessions https://stackoverflow.com/a/11785722/3569534 +# future: https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101 +# better timeout session: https://stackoverflow.com/a/49891626/3569534 +# store "formdata" in session for changing the basic auth to form data for the ldap login https://stackoverflow.com/a/56904875/3569534 +# requires_session similar to requires_auth (kerberos) return to the passed function, from https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py +# modify url from urlparse https://stackoverflow.com/a/21629125/3569534 +# _unauthorized_kerberos meta redirect from https://billstclair.com/html-redirect2.html +# preserve POST with code 307 https://stackoverflow.com/a/15480983/3569534 +# Improve: +# remove session info, when logging out? +# provide web page for adjusting settings like ldap uri +# Run: +# FLASK_APP=session_app.py FLASK_DEBUG=1 flask run --host 0.0.0.0 +# Dependencies: +# apt-get install python3-flask +# pip3 install Flask-kerberos kerberos + +from flask import Flask, Response, redirect, url_for, render_template, request, _request_ctx_stack as stack, make_response, session +from flask_kerberos import init_kerberos, requires_authentication, _unauthorized, _forbidden, _gssapi_authenticate +from functools import wraps +from urllib.parse import urlparse +import kerberos, binascii, datetime, os +import session_ldap + +app = Flask(__name__) +if 'SESSION_CONFIG' in os.environ: + conf_file=os.environ['SESSION_CONFIG'] +else: + conf_file="dev.cfg" +app.config.from_pyfile(conf_file, silent=False) +#app.config.from_object(__name__) +if 'DEBUG' in app.config and app.config['DEBUG']: + app.debug=True +secret_key_value = os.urandom(24) +secret_key_value_hex_encoded = binascii.hexlify(secret_key_value) +app.config['SECRET_KEY'] = secret_key_value_hex_encoded +app.permanent_session_lifetime=datetime.timedelta(minutes=app.config['SESSION_DURATION_MINUTES']) + +def requires_session(function): + ''' + Requires a valid session, provided by cookie! + ''' + @wraps(function) + def decorated(*args, **kwargs): + if not session: + return Response("requires session",401) + else: + if 'user' not in session: + return Response("User is not in this session.",401) + s_user = session['user'] + c_user = request.cookies.get('user') + print(f"session user: {s_user}") + print(f"cookie user: {c_user}") + if session['user'] != c_user: + return Response("Wrong user for this session!.",401) + # otherwise, everything is good! + return function(*args,**kwargs) + # catch-all + return Response("requires session",401) + return decorated + +# imported from flask_kerberos and modified, because I want custom 401 message +def requires_authn_kerberos(function): + ''' + Require that the wrapped view function only be called by users + authenticated with Kerberos. The view function will have the authenticated + users principal passed to it as its first argument. + + :param function: flask view function + :type function: function + :returns: decorated function + :rtype: function + ''' + @wraps(function) + def decorated(*args, **kwargs): + header = request.headers.get("Authorization") + if header: + ctx = stack.top + token = ''.join(header.split()[1:]) + rc = _gssapi_authenticate(token) + if rc == kerberos.AUTH_GSS_COMPLETE: + # if ldap config options are set, then do kerberos -> short username resolution + user = ctx.kerberos_user + if 'LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB' in app.config: + conn = session_ldap.get_ldap_connection( + server_uri=get_next_ldap_server(app), + bind_dn=app.config['LDAP_BIND_DN'], + bind_pw=app.config['LDAP_BIND_PASSWORD'], + ) + this_user = session_ldap.get_ldap_attrib_from_krbPrincipalName( + connection = conn, + search_base=app.config['LDAP_USER_BASE'], + user_attrib="dn", + user_krbPrincipalName=user, + krbPrincipalName_attrib=app.config['LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB'] + ) + #print(f"DEBUG: krb user {user} is ldap dn {this_user}") + shortuser = session_ldap.get_ldap_username_attrib_from_dn( + authenticated_user=conn, + user_dn=this_user, + user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB'] + ) + #print(f"DEBUG: shortuser {shortuser}") + groups = session_ldap.get_ldap_user_groups( + connection=conn, + user_dn=this_user, + user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'], + group_name_attrib=app.config['LDAP_GROUP_DISPLAY_ATTRIB'], + group_base=app.config['LDAP_GROUP_BASE'] + ) + #print(f"DEBUG: groups {groups}") + response = function(shortuser, groups, *args, **kwargs) + response = make_response(response) + if ctx.kerberos_token is not None: + response.headers['WWW-Authenticate'] = ' '.join(['negotiate', ctx.kerberos_token]) + return response + elif rc != kerberos.AUTH_GSS_CONTINUE: + return _forbidden() + return _unauthorized_kerberos() + return decorated + +def requires_authn_ldap(function): + ''' + Require that the wrapped view function only be called by users + authenticated with ldap. The view function will have the authenticated + users principal passed to it as its first argument. + :param function: flask view function + :type function: function + :returns: decorated function + :rtype: function + ''' + @wraps(function) + def decorated(*args, **kwargs): + # formdata is in session if we are coming from login_basic() + form = session.get('formdata', None) + if form: + #print(f"DEBUG: requires_authn_ldap form={form}") + session.pop('formdata') + if 'username' in form: + username = form['username'] + if 'password' in form: + pw = form['password'] + else: + # then we are coming from the form with POST data + if 'username' not in request.form or 'password' not in request.form: + return _unauthorized_ldap() + username = request.form['username'] + pw = request.form['password'] + #print(f"DEBUG: requires_authn_ldap with username={username} and pw={pw}") + # learn dn of user + this_uri = get_next_ldap_server(app) + this_user = session_ldap.list_matching_users( + server_uri=this_uri, + bind_dn=app.config['LDAP_BIND_DN'], + bind_pw=app.config['LDAP_BIND_PASSWORD'], + user_base=app.config['LDAP_USER_BASE'], + username=username, + user_match_attrib=app.config['LDAP_USER_MATCH_ATTRIB'] + ) + # list_matching_users always returns list, so if it contains <> 1 we are in trouble + if len(this_user) != 1: + print(f"WARNING: cannot determine unique user for {app.config['LDAP_USER_MATCH_ATTRIB']}={username} which returned {this_user}") + return _unauthorized_ldap() + this_user = this_user[0] + print(f"DEBUG: requires_authn_ldap: found in ldap the username {this_user}") + ll = ldap_login(this_user,pw) + if ll: + shortuser = session_ldap.get_ldap_username_attrib_from_dn( + authenticated_user=ll, + user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB'] + ) + groups = session_ldap.get_ldap_user_groups( + connection=ll, + user_dn=this_user, + user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'], + group_name_attrib=app.config['LDAP_GROUP_DISPLAY_ATTRIB'], + group_base=app.config['LDAP_GROUP_BASE'] + ) + print(f"DEBUG: user {shortuser} has groups {groups}") + return function(shortuser, groups ,*args, **kwargs) + else: + return _unauthorized_ldap() + return decorated + +def _unauthorized_kerberos(): + ''' + Indicate that authentication is required + ''' + return Response(f'Unauthorized! No kerberos auth provided. Trying ldap automatically in a moment.', 401, {'WWW-Authenticate': 'Negotiate'}) + +def _unauthorized_ldap(): + return Response(f'Unauthorized! Invalid ldap credentials... returning to login form', 401) + +@app.route("/") +def index(): + return render_template('index.html') + +@app.route("/protected/") +@requires_session +def protected_page(): + return protected_page_real() + +def protected_page_real(): + s_user = session['user'] + c_user = request.cookies.get('user') + groups = session['groups'] + cookie=request.cookies + print(cookie) + return render_template('view.html', c_user = c_user, s_user=s_user, cookie=cookie, groups=groups) + +@app.route("/login/new") +@app.route("/login/new/") +def login_new(): + return redirect(url_for("login", new="")) + +@app.route("/login/", methods=['POST','GET']) +def login(user="None"): + if request.method == "GET": + if 'user' in session and request.cookies.get('user') == session['user'] and (not 'new' in request.args): + return redirect(url_for("protected_page")) + auth_header = request.headers.get("Authorization") + if auth_header: + if "negotiate" in auth_header: + # assume we are already trying to log in with kerberos + return redirect(url_for("login_kerberos")) + # default, show login form + return redirect(url_for("login_form")) + elif request.method == "POST": + if request.authorization: + return redirect(url_for("login_basic"),code=307) + return handle_login_ldap_from_non_ldap(request) + +def get_next_ldap_server(app): + # on first ldap_login attempt, cache this lookup result: + if 'LDAP_HOSTS' not in app.config: + this_domain = urlparse(app.config['LDAP_URI']).hostname + app.config['LDAP_HOSTS'] = session_ldap.list_ldap_servers_for_domain(this_domain) + else: + # rotate them! So every ldap_login attempt will use the next ldap server in the list. + this_list = app.config['LDAP_HOSTS'] + a = this_list[0] + this_list.append(a) + this_list.pop(0) + app.config['LDAP_HOSTS'] = this_list + # construct a new, full uri. + this_netloc = app.config['LDAP_HOSTS'][0] + up = urlparse(app.config['LDAP_URI']) + if up.port: + this_netloc += f":{up.port}" + this_uri = up._replace(netloc=this_netloc).geturl() + return this_uri + +def ldap_login(username,password): + #print(f"DEBUG: Trying user {username} with pw '{password}'") + this_uri = get_next_ldap_server(app) + # Perform the ldap interactions + user = session_ldap.authenticated_user( + server_uri=this_uri, + user_dn=username, + password=password + ) + if user: + return user + else: + return False + return False + +@app.route("/login/kerberos") +@app.route("/login/kerberos/") +@requires_authn_kerberos +def login_kerberos(user,groups=[]): + resp = Response(f'success with kerberos') + #resp.headers['login'] = "from-kerberos" + resp.set_cookie('type',"kerberos") + resp = login_generic(session,resp,user,groups) + return resp + +@app.route("/login/ldap", methods=['POST','GET']) +@app.route("/login/ldap/", methods=['POST','GET']) +@requires_authn_ldap +def login_ldap(user,groups=[]): + resp = Response(f'success with ldap') + resp.set_cookie('type',"ldap") + resp = login_generic(session,resp,user,groups) + return resp + +def login_generic(session,resp,user,groups=[]): + resp.set_cookie('user',user) + end_time = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime + end_time_str = datetime.datetime.strftime(end_time,"%FT%TZ") + resp.set_cookie('timestamp',end_time_str) + session.permanent = True + session['user']=user + session['end_time'] = end_time_str + session['groups'] = groups + return resp + +@app.route("/login/form", methods=['POST','GET']) +@app.route("/login/form/", methods=['POST','GET']) +def login_form(): + if request.method == "GET": + options = { + "ldap": "ldap", + "other": "other" + } + return render_template("login_form.html", + login_url = url_for("login"), + options=options, + kerberos_url = url_for("login_kerberos") + ) + else: + # assume it is a POST + return redirect(url_for("login_ldap"), code=307) + +def handle_login_ldap_from_non_ldap(request): + # set default logintype for user + logintype = "ldap" + # redirect to whichever option was chosen in the drop-down + if 'logintype' in request.form: + logintype = request.form['logintype'] + if "ldap" == logintype: + return redirect(url_for("login_ldap"), code=307) + else: + return f"Authentication method {logintype} not supported yet.",400 + +@app.route("/logout") +@app.route("/logout/") +def logout(): + resp = Response(f'logged out') + # Doing anything with session here leaves a cookie. + #session['user']="" + resp.set_cookie('user','',expires=0) + resp.set_cookie('type','',expires=0) + resp.set_cookie('session','',expires=0) + resp.set_cookie('timestamp','',expires=0) + return resp + +@app.route("/login/basic",methods=['POST','GET']) +@app.route("/login/basic/",methods=['POST','GET']) +def login_basic(): + if not request.authorization: + return Response(f"Please provide username and password.",401,{'WWW-Authenticate': 'Basic'}) + if 'username' not in request.authorization: + return Response(f"No username provided.",401) + if 'password' not in request.authorization: + return Response(f"No password provided.",401) + username = request.authorization.username + pw = request.authorization.password + form={'username':username,'password':pw} + session['formdata'] = form + return redirect(url_for("login_ldap"),code=307) + +## This bumps the session lifetime to two minutes farther out from each web request with this session. +#@app.before_request +#def make_session_permanent(): +# session.permanent = True +# session['end_time'] = datetime.datetime.now()+app.permanent_session_lifetime + +os.environ['KRB5_KTNAME'] = app.config['KRB5_KTNAME'] +if 'KRB5_TRACE' in app.config: + os.environ['KRB5_TRACE'] = app.config['KRB5_TRACE'] + +init_kerberos(app, hostname=app.config['HOSTNAME'], service=app.config['KRB5_SERVICE']) +if __name__ == '__main__': + #init_kerberos(app, hostname=app.config['HOSTNAME'], service=app.config['KRB5_SERVICE']) + print("should listen to ",app.config['LISTEN_HOST']) + app.run( + host=app.config['LISTEN_HOST'], + port=app.config['LISTEN_PORT'], + debug=app.config['DEBUG'] + ) -- cgit