#!/usr/bin/env python
# Startdate: 2021-06-17
# goals:
#    accept kerberos or ldap "authorization: basic gowinablz;nuiowekj==" auth,
#    to create a cookie for a session that lasts for a limited number of
#    minutes (see app.config['minutes'] below). use the cookie to get to
#    protected URLs
# References:
#    https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
#    https://www.techlifediary.com/python-web-development-tutorial-using-flask-session-cookies/
#    delete cookie https://stackoverflow.com/a/14386413/3569534
#    timeout sessions https://stackoverflow.com/a/11785722/3569534
#    future: https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
#    better timeout session: https://stackoverflow.com/a/49891626/3569534
#    store "formdata" in session for changing the basic auth to form data for the ldap login https://stackoverflow.com/a/56904875/3569534
#    modify url from urlparse https://stackoverflow.com/a/21629125/3569534
# Improve:
#    move all configs to config file
#    move all references to references section
# Run:
#    FLASK_APP=session_app.py FLASK_DEBUG=1 flask run --host 0.0.0.0
# Dependencies:
#    apt-get install python3-flask
#    pip3 install Flask-kerberos kerberos

import binascii
import datetime
import os
from functools import wraps
from urllib.parse import urlparse

from flask import Flask, Response, redirect, url_for, render_template, request, _request_ctx_stack as stack, make_response, session
from flask_kerberos import init_kerberos, requires_authentication, _unauthorized, _forbidden, _gssapi_authenticate
import kerberos

import session_ldap

DEBUG = True
app = Flask(__name__)
# from_object(__name__) loads the upper-case names of this module (DEBUG) as config.
app.config.from_object(__name__)
app.debug = True

# A fresh random key is generated on every start, so session cookies signed by
# a previous process are no longer accepted after a restart.
secret_key_value = os.urandom(24)
secret_key_value_hex_encoded = binascii.hexlify(secret_key_value)
app.config['SECRET_KEY'] = secret_key_value_hex_encoded

app.config['LDAP_URI'] = "ldaps://ipa.internal.com:636"
app.config['LDAP_USER_BASE'] = "cn=users,cn=accounts,dc=ipa,dc=internal,dc=com"
app.config['LDAP_GROUP_BASE'] = "cn=groups,cn=accounts,dc=ipa,dc=internal,dc=com"
app.config['LDAP_USER_MATCH_ATTRIB'] = "uid"
app.config['LDAP_USER_DISPLAY_ATTRIB'] = "uid"
app.config['LDAP_USER_ATTRIB_MEMBEROF'] = "memberof"
app.config['LDAP_GROUP_NAME_ATTRIB'] = "cn"
app.config['LDAP_BIND_DN'] = "uid=domainjoin,cn=users,cn=accounts,dc=ipa,dc=internal,dc=com"
# NOTE(review): hard-coded bind credential; see "Improve: move all configs to config file".
app.config['LDAP_BIND_PASSWORD'] = "bulkpassword"
app.config['LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB'] = "krbPrincipalName"
app.config['minutes'] = 2
app.permanent_session_lifetime = datetime.timedelta(minutes=app.config['minutes'])


def requires_session(function):
    '''
    Requires a valid session, provided by cookie!

    The wrapped view is only called when the session holds a 'user' entry
    that equals the value of the request's 'user' cookie; otherwise a 401
    response is returned.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        if not session:
            return Response("requires session", 401)
        if 'user' not in session:
            return Response("User is not in this session.", 401)
        s_user = session['user']
        c_user = request.cookies.get('user')
        print(f"session user: {s_user}")
        print(f"cookie user: {c_user}")
        if s_user != c_user:
            return Response("Wrong user for this session!.", 401)
        # otherwise, everything is good!
        # return to the passed function, from https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py
        return function(*args, **kwargs)
    return decorated


# imported from flask_kerberos and modified, because I want custom 401 message
def requires_authn_kerberos(function):
    '''
    Require that the wrapped view function only be called by users
    authenticated with Kerberos. The view function is called with the
    resolved short username as its first argument and the list of group
    names as its second.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization")
        if header:
            ctx = stack.top
            # everything after the auth scheme is the GSSAPI token
            token = ''.join(header.split()[1:])
            rc = _gssapi_authenticate(token)
            if rc == kerberos.AUTH_GSS_COMPLETE:
                user = ctx.kerberos_user
                # Defaults used when no LDAP resolution is configured, so the
                # view is still called with two well-defined arguments.
                shortuser = user
                groups = []
                # if ldap config options are set, then do kerberos -> short username resolution
                if 'LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB' in app.config:
                    conn = session_ldap.get_ldap_connection(
                        server_uri=get_next_ldap_server(app),
                        bind_dn=app.config['LDAP_BIND_DN'],
                        bind_pw=app.config['LDAP_BIND_PASSWORD'],
                    )
                    this_user = session_ldap.get_ldap_attrib_from_krbPrincipalName(
                        connection=conn,
                        search_base=app.config['LDAP_USER_BASE'],
                        user_attrib="dn",
                        user_krbPrincipalName=user,
                        krbPrincipalName_attrib=app.config['LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB']
                    )
                    shortuser = session_ldap.get_ldap_username_attrib_from_dn(
                        authenticated_user=conn,
                        user_dn=this_user,
                        user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB']
                    )
                    groups = session_ldap.get_ldap_user_groups(
                        connection=conn,
                        user_dn=this_user,
                        user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'],
                        group_name_attrib=app.config['LDAP_GROUP_NAME_ATTRIB'],
                        group_base=app.config['LDAP_GROUP_BASE']
                    )
                response = function(shortuser, groups, *args, **kwargs)
                response = make_response(response)
                if ctx.kerberos_token is not None:
                    response.headers['WWW-Authenticate'] = ' '.join(['negotiate', ctx.kerberos_token])
                return response
            elif rc != kerberos.AUTH_GSS_CONTINUE:
                return _forbidden()
        return _unauthorized_kerberos()
    return decorated


def requires_authn_ldap(function):
    '''
    Require that the wrapped view function only be called by users
    authenticated with ldap. The view function is called with the short
    username as its first argument and the list of group names as its second.

    Credentials are read from session['formdata'] (set by login_basic) when
    present, otherwise from the POSTed form fields 'username' and 'password'.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        # formdata is in session if we are coming from login_basic()
        form = session.get('formdata', None)
        if form:
            session.pop('formdata')
            username = form.get('username')
            pw = form.get('password')
        else:
            # then we are coming from the form with POST data
            username = request.form.get('username')
            pw = request.form.get('password')
        if username is None or pw is None:
            return _unauthorized_ldap()
        # An empty password can turn an LDAP simple bind into an
        # unauthenticated bind (RFC 4513 section 5.1.2), so reject empty
        # credentials before talking to the directory.
        if not username or not pw:
            return _unauthorized_ldap()

        # learn dn of user
        this_uri = get_next_ldap_server(app)
        this_user = session_ldap.list_matching_users(
            server_uri=this_uri,
            bind_dn=app.config['LDAP_BIND_DN'],
            bind_pw=app.config['LDAP_BIND_PASSWORD'],
            user_base=app.config['LDAP_USER_BASE'],
            username=username,
            user_match_attrib=app.config['LDAP_USER_MATCH_ATTRIB']
        )
        # list_matching_users always returns list, so if it contains <> 1 we are in trouble
        if len(this_user) != 1:
            print(f"WARNING: cannot determine unique user for {app.config['LDAP_USER_MATCH_ATTRIB']}={username} which returned {this_user}")
            return _unauthorized_ldap()
        this_user = this_user[0]
        print(f"DEBUG: requires_authn_ldap: found in ldap the username {this_user}")

        ll = ldap_login(this_user, pw)
        if not ll:
            return _unauthorized_ldap()
        shortuser = session_ldap.get_ldap_username_attrib_from_dn(
            authenticated_user=ll,
            user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB']
        )
        groups = session_ldap.get_ldap_user_groups(
            connection=ll,
            user_dn=this_user,
            user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'],
            group_name_attrib=app.config['LDAP_GROUP_NAME_ATTRIB'],
            group_base=app.config['LDAP_GROUP_BASE']
        )
        print(f"DEBUG: user {shortuser} has groups {groups}")
        return function(shortuser, groups, *args, **kwargs)
    return decorated


def _unauthorized_kerberos():
    '''
    Indicate that authentication is required
    '''
    # from https://billstclair.com/html-redirect2.html
    return Response('Unauthorized! No kerberos auth provided. Trying ldap automatically in a moment.', 401, {'WWW-Authenticate': 'Negotiate'})


def _unauthorized_ldap():
    '''
    Return the 401 response used for every failed ldap login attempt.
    '''
    return Response('Unauthorized! Invalid ldap credentials... returning to login form', 401)


@app.route("/")
def index():
    '''
    Render the landing page.
    '''
    return render_template('index.html')


@app.route("/protected/")
@requires_session
def protected_page():
    '''
    Session-protected view; delegates to protected_page_real().
    '''
    return protected_page_real()


def protected_page_real():
    '''
    Render view.html with the session user, the cookie user, the session's
    groups and all request cookies.
    '''
    s_user = session['user']
    c_user = request.cookies.get('user')
    groups = session['groups']
    cookie = request.cookies
    print(cookie)
    return render_template('view.html', c_user=c_user, s_user=s_user, cookie=cookie, groups=groups)


@app.route("/login/new")
@app.route("/login/new/")
def login_new():
    '''
    Redirect to the login view with the 'new' query argument set, which
    makes login() skip its "already logged in" shortcut.
    '''
    return redirect(url_for("login", new=""))


@app.route("/login/", methods=['POST', 'GET'])
def login(user="None"):
    '''
    Entry point that dispatches to the matching login mechanism.

    GET: redirect to the protected page when a matching session already
    exists (unless 'new' is in the query string), to the kerberos login when
    a Negotiate Authorization header is present, otherwise to the login form.
    POST: redirect (307, preserving the body) to the basic-auth login when
    the request carries an Authorization header, otherwise dispatch on the
    posted 'logintype'.
    '''
    if request.method == "GET":
        already_logged_in = 'user' in session and request.cookies.get('user') == session['user']
        if already_logged_in and 'new' not in request.args:
            return redirect(url_for("protected_page"))
        auth_header = request.headers.get("Authorization")
        # HTTP auth scheme names are case-insensitive and clients normally
        # send "Negotiate", so compare in lower case.
        if auth_header and "negotiate" in auth_header.lower():
            # assume we are already trying to log in with kerberos
            return redirect(url_for("login_kerberos"))
        # default, show login form
        return redirect(url_for("login_form"))
    elif request.method == "POST":
        if request.authorization:
            return redirect(url_for("login_basic"), code=307)
        return handle_login_ldap_from_non_ldap(request)


def get_next_ldap_server(app):
    '''
    Return the full ldap URI of the next server to use.

    The host list is looked up once from the hostname of LDAP_URI and cached
    in app.config['LDAP_HOSTS']; every later call rotates the list by one so
    successive calls use successive servers. Scheme and port are taken from
    LDAP_URI. If the host list is empty, LDAP_URI itself is returned.

    :param app: flask application whose config is read and updated
    :returns: ldap URI string
    :rtype: str
    '''
    configured = urlparse(app.config['LDAP_URI'])
    if 'LDAP_HOSTS' not in app.config:
        # on first ldap_login attempt, cache this lookup result:
        app.config['LDAP_HOSTS'] = session_ldap.list_ldap_servers_for_domain(configured.hostname)
    else:
        # rotate them! So every ldap_login attempt will use the next ldap server in the list.
        hosts = app.config['LDAP_HOSTS']
        if hosts:
            hosts.append(hosts.pop(0))
    hosts = app.config['LDAP_HOSTS']
    if not hosts:
        # nothing discovered: fall back to the configured URI unchanged
        return app.config['LDAP_URI']
    # construct a new, full uri.
    this_netloc = hosts[0]
    if configured.port:
        this_netloc += f":{configured.port}"
    return configured._replace(netloc=this_netloc).geturl()


def ldap_login(username, password):
    '''
    Try to authenticate against the next ldap server.

    :param username: dn of the user to bind as
    :param password: password for that dn
    :returns: the truthy result of session_ldap.authenticated_user, or False
    '''
    this_uri = get_next_ldap_server(app)
    # Perform the ldap interactions
    user = session_ldap.authenticated_user(
        server_uri=this_uri,
        user_dn=username,
        password=password
    )
    return user if user else False


@app.route("/login/kerberos")
@app.route("/login/kerberos/")
@requires_authn_kerberos
def login_kerberos(user, groups=None):
    '''
    Start a session for a kerberos-authenticated user.
    '''
    resp = Response('success with kerberos')
    resp.set_cookie('type', "kerberos")
    return login_generic(session, resp, user, groups)


@app.route("/login/ldap", methods=['POST', 'GET'])
@app.route("/login/ldap/", methods=['POST', 'GET'])
@requires_authn_ldap
def login_ldap(user, groups=None):
    '''
    Start a session for an ldap-authenticated user.
    '''
    resp = Response('success with ldap')
    resp.set_cookie('type', "ldap")
    return login_generic(session, resp, user, groups)


def login_generic(session, resp, user, groups=None):
    '''
    Record the login in the session and on the response cookies.

    Sets the 'user' and 'timestamp' cookies and stores 'user', 'end_time'
    and 'groups' in the (permanent) session.

    :param session: flask session to populate
    :param resp: response object to set cookies on
    :param user: short username
    :param groups: list of group names; defaults to an empty list
    :returns: the same response object
    '''
    groups = [] if groups is None else groups
    resp.set_cookie('user', user)
    end_time = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime
    # Portable spelling of "%FT%TZ"; %F and %T are not available on every platform.
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    resp.set_cookie('timestamp', end_time_str)
    session.permanent = True
    session['user'] = user
    session['end_time'] = end_time_str
    session['groups'] = groups
    return resp


@app.route("/login/form", methods=['POST', 'GET'])
@app.route("/login/form/", methods=['POST', 'GET'])
def login_form():
    '''
    Show the login form on GET; on POST forward the posted data (307) to the
    ldap login.
    '''
    if request.method == "GET":
        options = {
            "ldap": "ldap",
            "other": "other"
        }
        return render_template(
            "login_form.html",
            login_url=url_for("login"),
            options=options,
            kerberos_url=url_for("login_kerberos")
        )
    # assume it is a POST
    return redirect(url_for("login_ldap"), code=307)


def handle_login_ldap_from_non_ldap(request):
    '''
    Dispatch a POSTed login on its 'logintype' form field (default "ldap").

    :param request: the current flask request
    :returns: a 307 redirect to the ldap login, or a (message, 400) tuple for
        an unsupported login type
    '''
    # redirect to whichever option was chosen in the drop-down
    logintype = request.form.get('logintype', "ldap")
    if logintype == "ldap":
        # preserve POST with code 307 https://stackoverflow.com/a/15480983/3569534
        return redirect(url_for("login_ldap"), code=307)
    return f"Authentication method {logintype} not supported yet.", 400


@app.route("/logout")
@app.route("/logout/")
def logout():
    '''
    Expire the user, type, session and timestamp cookies.
    '''
    resp = Response("logged out")
    # Doing anything with session here leaves a cookie.
    for cookie_name in ('user', 'type', 'session', 'timestamp'):
        resp.set_cookie(cookie_name, '', expires=0)
    return resp


@app.route("/login/basic", methods=['POST', 'GET'])
@app.route("/login/basic/", methods=['POST', 'GET'])
def login_basic():
    '''
    Accept HTTP basic auth and hand the credentials to the ldap login.

    Responds 401 when no Authorization header, no username or no password
    was provided; otherwise stores the credentials in session['formdata']
    and redirects (307) to the ldap login, which consumes them.
    '''
    auth = request.authorization
    if not auth:
        return Response("Please provide username and password.", 401, {'WWW-Authenticate': 'Basic'})
    if auth.username is None:
        return Response("No username provided.", 401)
    if auth.password is None:
        return Response("No password provided.", 401)
    # NOTE(review): Flask's default session is a signed but not encrypted
    # cookie, so the password stored here is readable by whoever holds the
    # cookie until requires_authn_ldap pops it on the next request.
    session['formdata'] = {'username': auth.username, 'password': auth.password}
    return redirect(url_for("login_ldap"), code=307)


## This bumps the session lifetime to two minutes farther out from each web request with this session.
#@app.before_request
#def make_session_permanent():
#    session.permanent = True
#    session['end_time'] = datetime.datetime.now()+app.permanent_session_lifetime

# keytab from `/usr/sbin/ipa-getkeytab -p HTTP/d2-03a.ipa.example.com -k session.keytab`
os.environ['KRB5_KTNAME'] = "./session.keytab"
os.environ['KRB5_TRACE'] = "./kerberos.log"
init_kerberos(app, hostname="d2-03a.ipa.internal.com", service="HTTP")

if __name__ == '__main__':
    # NOTE(review): repeats the module-level init_kerberos call above with the
    # same arguments; presumably redundant -- confirm before removing.
    init_kerberos(app, hostname="d2-03a.ipa.internal.com", service="HTTP")
    app.run(host='0.0.0.0', debug=True)