summaryrefslogtreecommitdiff
path: root/session_app.py
diff options
context:
space:
mode:
Diffstat (limited to 'session_app.py')
-rwxr-xr-xsession_app.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/session_app.py b/session_app.py
new file mode 100755
index 0000000..847df19
--- /dev/null
+++ b/session_app.py
@@ -0,0 +1,379 @@
+#!/usr/bin/env python
+# Startdate: 2021-06-17
+# goals:
+# accept kerberos or ldap "authorization: basic gowinablz;nuiowekj==" auth, to create a cookie for a session that lasts for 15 minutes. use the cookie to get to protected URLs
+# References:
+# https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
+# https://www.techlifediary.com/python-web-development-tutorial-using-flask-session-cookies/
+# delete cookie https://stackoverflow.com/a/14386413/3569534
+# timeout sessions https://stackoverflow.com/a/11785722/3569534
+# future: https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
+# better timeout session: https://stackoverflow.com/a/49891626/3569534
+# store "formdata" in session for changing the basic auth to form data for the ldap login https://stackoverflow.com/a/56904875/3569534
+# requires_session similar to requires_auth (kerberos) return to the passed function, from https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py
+# modify url from urlparse https://stackoverflow.com/a/21629125/3569534
+# _unauthorized_kerberos meta redirect from https://billstclair.com/html-redirect2.html
+# preserve POST with code 307 https://stackoverflow.com/a/15480983/3569534
+# Improve:
+# remove session info, when logging out?
+# provide web page for adjusting settings like ldap uri
+# Run:
+# FLASK_APP=session_app.py FLASK_DEBUG=1 flask run --host 0.0.0.0
+# Dependencies:
+# apt-get install python3-flask
+# pip3 install Flask-kerberos kerberos
+
+from flask import Flask, Response, redirect, url_for, render_template, request, _request_ctx_stack as stack, make_response, session
+from flask_kerberos import init_kerberos, requires_authentication, _unauthorized, _forbidden, _gssapi_authenticate
+from functools import wraps
+from urllib.parse import urlparse
+import kerberos, binascii, datetime, os
+import session_ldap
+
+app = Flask(__name__)
+if 'SESSION_CONFIG' in os.environ:
+ conf_file=os.environ['SESSION_CONFIG']
+else:
+ conf_file="dev.cfg"
+app.config.from_pyfile(conf_file, silent=False)
+#app.config.from_object(__name__)
+if 'DEBUG' in app.config and app.config['DEBUG']:
+ app.debug=True
+secret_key_value = os.urandom(24)
+secret_key_value_hex_encoded = binascii.hexlify(secret_key_value)
+app.config['SECRET_KEY'] = secret_key_value_hex_encoded
+app.permanent_session_lifetime=datetime.timedelta(minutes=app.config['SESSION_DURATION_MINUTES'])
+
+def requires_session(function):
+ '''
+ Requires a valid session, provided by cookie!
+ '''
+ @wraps(function)
+ def decorated(*args, **kwargs):
+ if not session:
+ return Response("requires session",401)
+ else:
+ if 'user' not in session:
+ return Response("User is not in this session.",401)
+ s_user = session['user']
+ c_user = request.cookies.get('user')
+ print(f"session user: {s_user}")
+ print(f"cookie user: {c_user}")
+ if session['user'] != c_user:
+ return Response("Wrong user for this session!.",401)
+ # otherwise, everything is good!
+ return function(*args,**kwargs)
+ # catch-all
+ return Response("requires session",401)
+ return decorated
+
+# imported from flask_kerberos and modified, because I want custom 401 message
+def requires_authn_kerberos(function):
+ '''
+ Require that the wrapped view function only be called by users
+ authenticated with Kerberos. The view function will have the authenticated
+ users principal passed to it as its first argument.
+
+ :param function: flask view function
+ :type function: function
+ :returns: decorated function
+ :rtype: function
+ '''
+ @wraps(function)
+ def decorated(*args, **kwargs):
+ header = request.headers.get("Authorization")
+ if header:
+ ctx = stack.top
+ token = ''.join(header.split()[1:])
+ rc = _gssapi_authenticate(token)
+ if rc == kerberos.AUTH_GSS_COMPLETE:
+ # if ldap config options are set, then do kerberos -> short username resolution
+ user = ctx.kerberos_user
+ if 'LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB' in app.config:
+ conn = session_ldap.get_ldap_connection(
+ server_uri=get_next_ldap_server(app),
+ bind_dn=app.config['LDAP_BIND_DN'],
+ bind_pw=app.config['LDAP_BIND_PASSWORD'],
+ )
+ this_user = session_ldap.get_ldap_attrib_from_krbPrincipalName(
+ connection = conn,
+ search_base=app.config['LDAP_USER_BASE'],
+ user_attrib="dn",
+ user_krbPrincipalName=user,
+ krbPrincipalName_attrib=app.config['LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB']
+ )
+ #print(f"DEBUG: krb user {user} is ldap dn {this_user}")
+ shortuser = session_ldap.get_ldap_username_attrib_from_dn(
+ authenticated_user=conn,
+ user_dn=this_user,
+ user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB']
+ )
+ #print(f"DEBUG: shortuser {shortuser}")
+ groups = session_ldap.get_ldap_user_groups(
+ connection=conn,
+ user_dn=this_user,
+ user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'],
+ group_name_attrib=app.config['LDAP_GROUP_DISPLAY_ATTRIB'],
+ group_base=app.config['LDAP_GROUP_BASE']
+ )
+ #print(f"DEBUG: groups {groups}")
+ response = function(shortuser, groups, *args, **kwargs)
+ response = make_response(response)
+ if ctx.kerberos_token is not None:
+ response.headers['WWW-Authenticate'] = ' '.join(['negotiate', ctx.kerberos_token])
+ return response
+ elif rc != kerberos.AUTH_GSS_CONTINUE:
+ return _forbidden()
+ return _unauthorized_kerberos()
+ return decorated
+
+def requires_authn_ldap(function):
+ '''
+ Require that the wrapped view function only be called by users
+ authenticated with ldap. The view function will have the authenticated
+ users principal passed to it as its first argument.
+ :param function: flask view function
+ :type function: function
+ :returns: decorated function
+ :rtype: function
+ '''
+ @wraps(function)
+ def decorated(*args, **kwargs):
+ # formdata is in session if we are coming from login_basic()
+ form = session.get('formdata', None)
+ if form:
+ #print(f"DEBUG: requires_authn_ldap form={form}")
+ session.pop('formdata')
+ if 'username' in form:
+ username = form['username']
+ if 'password' in form:
+ pw = form['password']
+ else:
+ # then we are coming from the form with POST data
+ if 'username' not in request.form or 'password' not in request.form:
+ return _unauthorized_ldap()
+ username = request.form['username']
+ pw = request.form['password']
+ #print(f"DEBUG: requires_authn_ldap with username={username} and pw={pw}")
+ # learn dn of user
+ this_uri = get_next_ldap_server(app)
+ this_user = session_ldap.list_matching_users(
+ server_uri=this_uri,
+ bind_dn=app.config['LDAP_BIND_DN'],
+ bind_pw=app.config['LDAP_BIND_PASSWORD'],
+ user_base=app.config['LDAP_USER_BASE'],
+ username=username,
+ user_match_attrib=app.config['LDAP_USER_MATCH_ATTRIB']
+ )
+ # list_matching_users always returns list, so if it contains <> 1 we are in trouble
+ if len(this_user) != 1:
+ print(f"WARNING: cannot determine unique user for {app.config['LDAP_USER_MATCH_ATTRIB']}={username} which returned {this_user}")
+ return _unauthorized_ldap()
+ this_user = this_user[0]
+ print(f"DEBUG: requires_authn_ldap: found in ldap the username {this_user}")
+ ll = ldap_login(this_user,pw)
+ if ll:
+ shortuser = session_ldap.get_ldap_username_attrib_from_dn(
+ authenticated_user=ll,
+ user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB']
+ )
+ groups = session_ldap.get_ldap_user_groups(
+ connection=ll,
+ user_dn=this_user,
+ user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'],
+ group_name_attrib=app.config['LDAP_GROUP_DISPLAY_ATTRIB'],
+ group_base=app.config['LDAP_GROUP_BASE']
+ )
+ print(f"DEBUG: user {shortuser} has groups {groups}")
+ return function(shortuser, groups ,*args, **kwargs)
+ else:
+ return _unauthorized_ldap()
+ return decorated
+
+def _unauthorized_kerberos():
+ '''
+ Indicate that authentication is required
+ '''
+ return Response(f'<meta http-equiv="Refresh" content="4; url={url_for("login_ldap")}">Unauthorized! No kerberos auth provided. Trying <a href="{url_for("login_ldap")}">ldap</a> automatically in a moment.', 401, {'WWW-Authenticate': 'Negotiate'})
+
+def _unauthorized_ldap():
+ return Response(f'<meta http-equiv="Refresh" content="4; url={url_for("login")}">Unauthorized! Invalid ldap credentials... returning to login form', 401)
+
+@app.route("/")
+def index():
+ return render_template('index.html')
+
+@app.route("/protected/")
+@requires_session
+def protected_page():
+ return protected_page_real()
+
+def protected_page_real():
+ s_user = session['user']
+ c_user = request.cookies.get('user')
+ groups = session['groups']
+ cookie=request.cookies
+ print(cookie)
+ return render_template('view.html', c_user = c_user, s_user=s_user, cookie=cookie, groups=groups)
+
+@app.route("/login/new")
+@app.route("/login/new/")
+def login_new():
+ return redirect(url_for("login", new=""))
+
+@app.route("/login/", methods=['POST','GET'])
+def login(user="None"):
+ if request.method == "GET":
+ if 'user' in session and request.cookies.get('user') == session['user'] and (not 'new' in request.args):
+ return redirect(url_for("protected_page"))
+ auth_header = request.headers.get("Authorization")
+ if auth_header:
+ if "negotiate" in auth_header:
+ # assume we are already trying to log in with kerberos
+ return redirect(url_for("login_kerberos"))
+ # default, show login form
+ return redirect(url_for("login_form"))
+ elif request.method == "POST":
+ if request.authorization:
+ return redirect(url_for("login_basic"),code=307)
+ return handle_login_ldap_from_non_ldap(request)
+
+def get_next_ldap_server(app):
+ # on first ldap_login attempt, cache this lookup result:
+ if 'LDAP_HOSTS' not in app.config:
+ this_domain = urlparse(app.config['LDAP_URI']).hostname
+ app.config['LDAP_HOSTS'] = session_ldap.list_ldap_servers_for_domain(this_domain)
+ else:
+ # rotate them! So every ldap_login attempt will use the next ldap server in the list.
+ this_list = app.config['LDAP_HOSTS']
+ a = this_list[0]
+ this_list.append(a)
+ this_list.pop(0)
+ app.config['LDAP_HOSTS'] = this_list
+ # construct a new, full uri.
+ this_netloc = app.config['LDAP_HOSTS'][0]
+ up = urlparse(app.config['LDAP_URI'])
+ if up.port:
+ this_netloc += f":{up.port}"
+ this_uri = up._replace(netloc=this_netloc).geturl()
+ return this_uri
+
+def ldap_login(username,password):
+ #print(f"DEBUG: Trying user {username} with pw '{password}'")
+ this_uri = get_next_ldap_server(app)
+ # Perform the ldap interactions
+ user = session_ldap.authenticated_user(
+ server_uri=this_uri,
+ user_dn=username,
+ password=password
+ )
+ if user:
+ return user
+ else:
+ return False
+ return False
+
+@app.route("/login/kerberos")
+@app.route("/login/kerberos/")
+@requires_authn_kerberos
+def login_kerberos(user,groups=[]):
+ resp = Response(f'<meta http-equiv="Refresh" content="1; url={url_for("protected_page")}">success with kerberos')
+ #resp.headers['login'] = "from-kerberos"
+ resp.set_cookie('type',"kerberos")
+ resp = login_generic(session,resp,user,groups)
+ return resp
+
+@app.route("/login/ldap", methods=['POST','GET'])
+@app.route("/login/ldap/", methods=['POST','GET'])
+@requires_authn_ldap
+def login_ldap(user,groups=[]):
+ resp = Response(f'<meta http-equiv="Refresh" content="1; url={url_for("protected_page")}">success with ldap')
+ resp.set_cookie('type',"ldap")
+ resp = login_generic(session,resp,user,groups)
+ return resp
+
+def login_generic(session,resp,user,groups=[]):
+ resp.set_cookie('user',user)
+ end_time = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime
+ end_time_str = datetime.datetime.strftime(end_time,"%FT%TZ")
+ resp.set_cookie('timestamp',end_time_str)
+ session.permanent = True
+ session['user']=user
+ session['end_time'] = end_time_str
+ session['groups'] = groups
+ return resp
+
+@app.route("/login/form", methods=['POST','GET'])
+@app.route("/login/form/", methods=['POST','GET'])
+def login_form():
+ if request.method == "GET":
+ options = {
+ "ldap": "ldap",
+ "other": "other"
+ }
+ return render_template("login_form.html",
+ login_url = url_for("login"),
+ options=options,
+ kerberos_url = url_for("login_kerberos")
+ )
+ else:
+ # assume it is a POST
+ return redirect(url_for("login_ldap"), code=307)
+
+def handle_login_ldap_from_non_ldap(request):
+ # set default logintype for user
+ logintype = "ldap"
+ # redirect to whichever option was chosen in the drop-down
+ if 'logintype' in request.form:
+ logintype = request.form['logintype']
+ if "ldap" == logintype:
+ return redirect(url_for("login_ldap"), code=307)
+ else:
+ return f"Authentication method {logintype} not supported yet.",400
+
+@app.route("/logout")
+@app.route("/logout/")
+def logout():
+ resp = Response(f'<meta http-equiv="Refresh" content="1; url={url_for("index")}">logged out')
+ # Doing anything with session here leaves a cookie.
+ #session['user']=""
+ resp.set_cookie('user','',expires=0)
+ resp.set_cookie('type','',expires=0)
+ resp.set_cookie('session','',expires=0)
+ resp.set_cookie('timestamp','',expires=0)
+ return resp
+
+@app.route("/login/basic",methods=['POST','GET'])
+@app.route("/login/basic/",methods=['POST','GET'])
+def login_basic():
+ if not request.authorization:
+ return Response(f"Please provide username and password.",401,{'WWW-Authenticate': 'Basic'})
+ if 'username' not in request.authorization:
+ return Response(f"No username provided.",401)
+ if 'password' not in request.authorization:
+ return Response(f"No password provided.",401)
+ username = request.authorization.username
+ pw = request.authorization.password
+ form={'username':username,'password':pw}
+ session['formdata'] = form
+ return redirect(url_for("login_ldap"),code=307)
+
+## This bumps the session lifetime to two minutes farther out from each web request with this session.
+#@app.before_request
+#def make_session_permanent():
+# session.permanent = True
+# session['end_time'] = datetime.datetime.now()+app.permanent_session_lifetime
+
+os.environ['KRB5_KTNAME'] = app.config['KRB5_KTNAME']
+if 'KRB5_TRACE' in app.config:
+ os.environ['KRB5_TRACE'] = app.config['KRB5_TRACE']
+
+init_kerberos(app, hostname=app.config['HOSTNAME'], service=app.config['KRB5_SERVICE'])
+if __name__ == '__main__':
+ #init_kerberos(app, hostname=app.config['HOSTNAME'], service=app.config['KRB5_SERVICE'])
+ print("should listen to ",app.config['LISTEN_HOST'])
+ app.run(
+ host=app.config['LISTEN_HOST'],
+ port=app.config['LISTEN_PORT'],
+ debug=app.config['DEBUG']
+ )
bgstack15