#!/usr/bin/env python
# Startdate: 2021-06-17
# goals:
#    accept kerberos or ldap "authorization: basic gowinablz;nuiowekj==" auth, to create a cookie for a session that lasts for 15 minutes. use the cookie to get to protected URLs
# References:
#    https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
#    https://www.techlifediary.com/python-web-development-tutorial-using-flask-session-cookies/
#    delete cookie https://stackoverflow.com/a/14386413/3569534
#    timeout sessions https://stackoverflow.com/a/11785722/3569534
#    future: https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
#    better timeout session: https://stackoverflow.com/a/49891626/3569534
#    store "formdata" in session for changing the basic auth to form data for the ldap login https://stackoverflow.com/a/56904875/3569534
# Improve:
#    move all configs to config file
#    move all references to references section
#    make /login/basic actually request http auth if user/pw not provided, or is a GET
#    try inspecting flask-httpauth auth.login_required section?
#    accept a bind credential so we can perform lookups of users who match "uid=%s" under a basedn.
#    accept a ldap dns domain name, and a SRV lookup for _ldap._tcp
# Run:
#    FLASK_APP=session_app.py FLASK_DEBUG=1 flask run --host 0.0.0.0
# Dependencies:
#    apt-get install python3-flask
#    pip3 install Flask-kerberos kerberos

from flask import (
    Flask,
    Response,
    redirect,
    url_for,
    render_template,
    request,
    _request_ctx_stack as stack,
    make_response,
    session,
)
from flask_kerberos import (
    init_kerberos,
    requires_authentication,
    _unauthorized,
    _forbidden,
    _gssapi_authenticate,
)
import kerberos
from functools import wraps
import binascii
import datetime
import html
import os
import session_ldap

DEBUG = True
app = Flask(__name__)
app.config.from_object(__name__)
app.debug = True

# A new random key is generated on every process start, so session cookies
# signed by a previous run will no longer validate after a restart.
secret_key_value = os.urandom(24)
secret_key_value_hex_encoded = binascii.hexlify(secret_key_value)
app.config['SECRET_KEY'] = secret_key_value_hex_encoded

app.config['LDAP_URI'] = "ldaps://dns1.ipa.internal.com:636"
app.config['LDAP_USER_BASEDN'] = "cn=users,cn=accounts,dc=ipa,dc=internal,dc=com"
app.config['LDAP_GROUP_BASEDN'] = "cn=groups,cn=accounts,dc=ipa,dc=internal,dc=com"
app.config['LDAP_USER_FORMAT'] = "uid=%s,cn=users,cn=accounts,dc=ipa,dc=internal,dc=com"

# Session lifetime in minutes.
# NOTE(review): the goals in the file header mention 15 minutes; this is 2.
app.config['minutes'] = 2
app.permanent_session_lifetime = datetime.timedelta(minutes=app.config['minutes'])


def requires_session(function):
    '''
    Requires a valid session, provided by cookie!

    The wrapped view is only called when the session is non-empty, contains
    a 'user' entry, and that entry equals the value of the 'user' cookie.
    Otherwise a 401 response is returned.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        if not session:
            return Response("requires session", 401)
        if 'user' not in session:
            return Response("User is not in this session.", 401)
        s_user = session['user']
        c_user = request.cookies.get('user')
        print(f"session user: {s_user}")
        print(f"cookie user: {c_user}")
        if s_user != c_user:
            return Response("Wrong user for this session!.", 401)
        # otherwise, everything is good!
        # return to the passed function, from https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py
        return function(*args, **kwargs)
    return decorated


# imported from flask_kerberos and modified, because I want custom 401 message
def requires_authn_kerberos(function):
    '''
    Require that the wrapped view function only be called by users
    authenticated with Kerberos. The view function will have the authenticated
    users principal passed to it as its first argument.

    When no Authorization header is present, or the GSSAPI exchange is not
    complete yet, the custom 401 from _unauthorized_kerberos() is returned.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization")
        if header:
            ctx = stack.top
            # everything after the auth scheme word is the token
            token = ''.join(header.split()[1:])
            rc = _gssapi_authenticate(token)
            if rc == kerberos.AUTH_GSS_COMPLETE:
                response = function(ctx.kerberos_user, *args, **kwargs)
                response = make_response(response)
                if ctx.kerberos_token is not None:
                    response.headers['WWW-Authenticate'] = ' '.join(['negotiate', ctx.kerberos_token])
                return response
            elif rc != kerberos.AUTH_GSS_CONTINUE:
                return _forbidden()
        return _unauthorized_kerberos()
    return decorated


def requires_authn_ldap(function):
    '''
    Require that the wrapped view function only be called by users
    authenticated with ldap. The view function will have the authenticated
    users principal passed to it as its first argument.

    Credentials are taken from session['formdata'] when present (set by
    login_basic()), otherwise from the POSTed form fields 'username' and
    'password'. If either value is missing, or ldap_login() fails, the
    response from _unauthorized_ldap() is returned.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        # formdata is in session if we are coming from login_basic()
        form = session.pop('formdata', None)
        if form:
            username = form.get('username')
            pw = form.get('password')
            # never print the password, only the username
            print(f"DEBUG: requires_authn_ldap formdata username={username}")
        else:
            # then we are coming from the form with POST data
            username = request.form.get('username')
            pw = request.form.get('password')
        # a missing value in either source must not reach the ldap bind
        if username is None or pw is None:
            return _unauthorized_ldap()
        ll = ldap_login(username, pw)
        if ll:
            return function(ll.user, *args, **kwargs)
        return _unauthorized_ldap()
    return decorated


def _unauthorized_kerberos():
    '''
    Indicate that authentication is required
    '''
    # from https://billstclair.com/html-redirect2.html
    return Response(
        'Unauthorized! No kerberos auth provided. Trying ldap automatically in a moment.',
        401,
        {'WWW-Authenticate': 'Negotiate'},
    )


def _unauthorized_ldap():
    '''
    Build the 401 response used when ldap credentials are missing or invalid.
    '''
    return Response('Unauthorized! Invalid ldap credentials... returning to login form', 401)


@app.route("/")
def index():
    ''' Render the landing page template. '''
    return render_template('index.html')


@app.route("/protected/")
@requires_session
def protected_page():
    ''' Session-protected page; delegates to protected_page_real(). '''
    return protected_page_real()


def protected_page_real():
    '''
    Render view.html with the session user, the 'user' cookie value and all
    request cookies. The cookies are also printed for debugging.
    '''
    s_user = session['user']
    c_user = request.cookies.get('user')
    cookie = request.cookies
    print(cookie)
    return render_template('view.html', c_user=c_user, s_user=s_user, cookie=cookie)


@app.route("/login/new")
@app.route("/login/new/")
def login_new():
    ''' Redirect to /login/ with the 'new' query argument set. '''
    return redirect(url_for("login", new=""))


@app.route("/login/", methods=['POST', 'GET'])
def login(user="None"):
    '''
    Entry point for logging in.

    GET: redirect to the protected page when the session user matches the
    'user' cookie and 'new' was not requested; redirect to the kerberos login
    when a Negotiate Authorization header is present; otherwise redirect to
    the login form.
    POST: with HTTP authorization, redirect (307, preserving the POST) to the
    basic login; otherwise dispatch on the submitted form.

    :param user: unused; kept so the signature stays unchanged
    '''
    if request.method == "GET":
        session_matches_cookie = (
            'user' in session
            and request.cookies.get('user') == session['user']
        )
        if session_matches_cookie and 'new' not in request.args:
            return redirect(url_for("protected_page"))
        auth_header = request.headers.get("Authorization")
        # HTTP auth scheme names are case-insensitive and clients normally
        # send "Negotiate", so compare on a lowercased copy.
        if auth_header and auth_header.lower().startswith("negotiate"):
            # assume we are already trying to log in with kerberos
            return redirect(url_for("login_kerberos"))
        # default, show login form
        return redirect(url_for("login_form"))
    elif request.method == "POST":
        if request.authorization:
            return redirect(url_for("login_basic"), code=307)
        return handle_login_ldap_from_non_ldap(request)


def ldap_login(username, password):
    '''
    Try to authenticate against the configured ldap server.

    :param username: value substituted into app.config['LDAP_USER_FORMAT']
                     by session_ldap.authenticated_user
    :param password: password for that user
    :returns: the object returned by session_ldap.authenticated_user when it
              is truthy, otherwise False
    '''
    user = session_ldap.authenticated_user(
        app.config['LDAP_URI'],
        app.config['LDAP_USER_FORMAT'],
        username,
        password
    )
    return user if user else False


@app.route("/login/kerberos")
@app.route("/login/kerberos/")
@requires_authn_kerberos
def login_kerberos(user):
    '''
    Complete a kerberos login for the principal passed in by the decorator.
    '''
    resp = Response('success with kerberos')
    #resp.headers['login'] = "from-kerberos"
    resp.set_cookie('type', "kerberos")
    resp = login_generic(session, resp, user, None)
    return resp


def login_generic(session, resp, user, groups=None):
    '''
    Shared tail of every login method: set the 'user' and 'timestamp' cookies
    on the response and record the user and expiry time in the session.

    :param session: the flask session to populate
    :param resp: response object that receives the cookies
    :param user: authenticated user name
    :param groups: currently unused; None instead of a shared mutable default
    :returns: the same response object
    '''
    resp.set_cookie('user', user)
    end_time = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime
    # Spelled-out codes give the same text as the glibc-only "%FT%TZ" and
    # work on every platform.
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    resp.set_cookie('timestamp', end_time_str)
    session.permanent = True
    session['user'] = user
    session['end_time'] = end_time_str
    return resp


@app.route("/login/ldap", methods=['POST', 'GET'])
@app.route("/login/ldap/", methods=['POST', 'GET'])
@requires_authn_ldap
def login_ldap(user, groups=None):
    '''
    Complete an ldap login for the user passed in by the decorator.

    :param user: authenticated user, supplied by requires_authn_ldap
    :param groups: forwarded to login_generic; currently unused there
    '''
    resp = Response('success with ldap')
    resp.set_cookie('type', "ldap")
    resp = login_generic(session, resp, user, groups)
    return resp


@app.route("/login/form", methods=['POST', 'GET'])
@app.route("/login/form/", methods=['POST', 'GET'])
def login_form():
    '''
    GET: render the login form. Any other method: redirect to the ldap login
    with 307 so the request body is preserved.
    '''
    if request.method == "GET":
        options = {
            "ldap": "ldap",
            "other": "other"
        }
        return render_template(
            "login_form.html",
            login_url=url_for("login"),
            options=options,
            kerberos_url=url_for("login_kerberos")
        )
    # assume it is a POST
    return redirect(url_for("login_ldap"), code=307)


def handle_login_ldap_from_non_ldap(request):
    '''
    Dispatch a form POST to the login method chosen in the drop-down.

    :param request: the current flask request
    :returns: a 307 redirect to the ldap login, or a (message, 400) tuple for
              any other login type
    '''
    # default logintype is ldap when the form does not say otherwise
    logintype = request.form.get('logintype', "ldap")
    # redirect to whichever option was chosen in the drop-down
    if "ldap" == logintype:
        # preserve POST with code 307 https://stackoverflow.com/a/15480983/3569534
        return redirect(url_for("login_ldap"), code=307)
    # logintype is user input and this string is served as HTML: escape it
    return f"Authentication method {html.escape(logintype)} not supported yet.", 400


@app.route("/logout")
@app.route("/logout/")
def logout():
    ''' Expire the user, type, session and timestamp cookies. '''
    resp = Response("logged out")
    # Doing anything with session here leaves a cookie.
    #session['user']=""
    resp.set_cookie('user', '', expires=0)
    resp.set_cookie('type', '', expires=0)
    resp.set_cookie('session', '', expires=0)
    resp.set_cookie('timestamp', '', expires=0)
    return resp


@app.route("/login/basic", methods=['POST'])
@app.route("/login/basic/", methods=['POST'])
def login_basic():
    '''
    Turn HTTP basic credentials into session 'formdata' and redirect (307) to
    the ldap login, which consumes and removes that entry.
    '''
    auth = request.authorization
    if not auth:
        return Response("No username and password provided.", 401)
    if auth.username is None:
        return Response("No username provided.", 401)
    if auth.password is None:
        return Response("No password provided.", 401)
    # NOTE(review): this puts the password into the session, which flask by
    # default stores in a signed but unencrypted client-side cookie. Consider
    # authenticating here directly instead of round-tripping the password.
    session['formdata'] = {'username': auth.username, 'password': auth.password}
    return redirect(url_for("login_ldap"), code=307)


## This bumps the session lifetime to two minutes farther out from each web request with this session.
#@app.before_request
#def make_session_permanent():
#   session.permanent = True
#   session['end_time'] = datetime.datetime.now()+app.permanent_session_lifetime

# keytab from `/usr/sbin/ipa-getkeytab -p HTTP/d2-03a.ipa.example.com -k session.keytab`
os.environ['KRB5_KTNAME'] = "./session.keytab"
os.environ['KRB5_TRACE'] = "./kerberos.log"
init_kerberos(app, hostname="d2-03a.ipa.internal.com", service="HTTP")

if __name__ == '__main__':
    # init_kerberos() already ran above with the same arguments.
    # NOTE(review): debug=True on 0.0.0.0 exposes the debugger to the
    # network; only acceptable on an isolated development host.
    app.run(host='0.0.0.0', debug=True)