#!/usr/bin/env python # Startdate: 2021-06-17 # goals: # accept kerberos or ldap "authorization: basic gowinablz;nuiowekj==" auth, to create a cookie for a session that lasts for 15 minutes. use the cookie to get to protected URLs # References: # https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101 # https://www.techlifediary.com/python-web-development-tutorial-using-flask-session-cookies/ # delete cookie https://stackoverflow.com/a/14386413/3569534 # timeout sessions https://stackoverflow.com/a/11785722/3569534 # future: https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101 # better timeout session: https://stackoverflow.com/a/49891626/3569534 # store "formdata" in session for changing the basic auth to form data for the ldap login https://stackoverflow.com/a/56904875/3569534 # requires_session similar to requires_auth (kerberos) return to the passed function, from https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py # modify url from urlparse https://stackoverflow.com/a/21629125/3569534 # _unauthorized_kerberos meta redirect from https://billstclair.com/html-redirect2.html # preserve POST with code 307 https://stackoverflow.com/a/15480983/3569534 # Improve: # Run: # FLASK_APP=session_app.py FLASK_DEBUG=1 flask run --host 0.0.0.0 # Dependencies: # apt-get install python3-flask python3-kerberos # pip3 install Flask-kerberos from flask import Flask, Response, redirect, url_for, render_template, request, _request_ctx_stack as stack, make_response, session from flask_kerberos import init_kerberos, requires_authentication, _unauthorized, _forbidden, _gssapi_authenticate from functools import wraps from urllib.parse import urlparse import kerberos, binascii, datetime, os import session_ldap app = Flask(__name__) if 'SESSION_CONFIG' in os.environ: conf_file=os.environ['SESSION_CONFIG'] else: conf_file="dev.cfg" app.config.from_pyfile(conf_file, silent=False) #app.config.from_object(__name__) if 'DEBUG' in app.config and app.config['DEBUG']: app.debug=True secret_key_value = os.urandom(24) secret_key_value_hex_encoded = binascii.hexlify(secret_key_value) app.config['SECRET_KEY'] = secret_key_value_hex_encoded app.permanent_session_lifetime=datetime.timedelta(minutes=app.config['SESSION_DURATION_MINUTES']) def requires_session(function): ''' Requires a valid session, provided by cookie! ''' @wraps(function) def decorated(*args, **kwargs): if not session: return Response("requires session",401) else: if 'user' not in session: return Response("User is not in this session.",401) s_user = session['user'] s_groups = session['groups'] c_user = request.cookies.get('user') print(f"session user: {s_user}") print(f"cookie user: {c_user}") if session['user'] != c_user: return Response("Wrong user for this session!.",401) # otherwise, everything is good! return function(s_user, s_groups, *args,**kwargs) # catch-all return Response("requires session",401) return decorated # imported from flask_kerberos and modified, because I want custom 401 message def requires_authn_kerberos(function): ''' Require that the wrapped view function only be called by users authenticated with Kerberos. The view function will have the authenticated users principal passed to it as its first argument. :param function: flask view function :type function: function :returns: decorated function :rtype: function ''' @wraps(function) def decorated(*args, **kwargs): header = request.headers.get("Authorization") if header: ctx = stack.top token = ''.join(header.split()[1:]) rc = _gssapi_authenticate(token) if rc == kerberos.AUTH_GSS_COMPLETE: # if ldap config options are set, then do kerberos -> short username resolution user = ctx.kerberos_user if 'LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB' in app.config: conn = session_ldap.get_ldap_connection( server_uri=get_next_ldap_server(app), bind_dn=app.config['LDAP_BIND_DN'], bind_pw=app.config['LDAP_BIND_PASSWORD'], ) this_user = session_ldap.get_ldap_attrib_from_krbPrincipalName( connection = conn, search_base=app.config['LDAP_USER_BASE'], user_attrib="dn", user_krbPrincipalName=user, krbPrincipalName_attrib=app.config['LDAP_USER_KERBEROS_PRINCIPAL_ATTRIB'] ) #print(f"DEBUG: krb user {user} is ldap dn {this_user}") shortuser = session_ldap.get_ldap_username_attrib_from_dn( authenticated_user=conn, user_dn=this_user, user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB'] ) #print(f"DEBUG: shortuser {shortuser}") groups = session_ldap.get_ldap_user_groups( connection=conn, user_dn=this_user, user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'], group_name_attrib=app.config['LDAP_GROUP_DISPLAY_ATTRIB'], group_base=app.config['LDAP_GROUP_BASE'] ) #print(f"DEBUG: groups {groups}") response = function(shortuser, groups, *args, **kwargs) response = make_response(response) if ctx.kerberos_token is not None: response.headers['WWW-Authenticate'] = ' '.join(['negotiate', ctx.kerberos_token]) return response elif rc != kerberos.AUTH_GSS_CONTINUE: return _forbidden() return _unauthorized_kerberos() return decorated def requires_authn_ldap(function): ''' Require that the wrapped view function only be called by users authenticated with ldap. The view function will have the authenticated users principal passed to it as its first argument. :param function: flask view function :type function: function :returns: decorated function :rtype: function ''' @wraps(function) def decorated(*args, **kwargs): # formdata is in session if we are coming from login_basic() form = session.get('formdata', None) if form: #print(f"DEBUG: requires_authn_ldap form={form}") session.pop('formdata') if 'username' in form: username = form['username'] if 'password' in form: pw = form['password'] else: # then we are coming from the form with POST data if 'username' not in request.form or 'password' not in request.form: return _unauthorized_ldap() username = request.form['username'] pw = request.form['password'] #print(f"DEBUG: requires_authn_ldap with username={username} and pw={pw}") # learn dn of user this_uri = get_next_ldap_server(app) this_user = session_ldap.list_matching_users( server_uri=this_uri, bind_dn=app.config['LDAP_BIND_DN'], bind_pw=app.config['LDAP_BIND_PASSWORD'], user_base=app.config['LDAP_USER_BASE'], username=username, user_match_attrib=app.config['LDAP_USER_MATCH_ATTRIB'] ) # list_matching_users always returns list, so if it contains <> 1 we are in trouble if len(this_user) != 1: print(f"WARNING: cannot determine unique user for {app.config['LDAP_USER_MATCH_ATTRIB']}={username} which returned {this_user}") return _unauthorized_ldap() this_user = this_user[0] print(f"DEBUG: requires_authn_ldap: found in ldap the username {this_user}") ll = ldap_login(this_user,pw) if ll: shortuser = session_ldap.get_ldap_username_attrib_from_dn( authenticated_user=ll, user_match_attrib=app.config['LDAP_USER_DISPLAY_ATTRIB'] ) groups = session_ldap.get_ldap_user_groups( connection=ll, user_dn=this_user, user_attrib_memberof=app.config['LDAP_USER_ATTRIB_MEMBEROF'], group_name_attrib=app.config['LDAP_GROUP_DISPLAY_ATTRIB'], group_base=app.config['LDAP_GROUP_BASE'] ) print(f"DEBUG: user {shortuser} has groups {groups}") return function(shortuser, groups ,*args, **kwargs) else: return _unauthorized_ldap() return decorated def _unauthorized_kerberos(): ''' Indicate that authentication is required ''' return Response(f'Unauthorized! No kerberos auth provided. Trying ldap automatically in a moment.', 401, {'WWW-Authenticate': 'Negotiate'}) def _unauthorized_ldap(): return Response(f'Unauthorized! Invalid ldap credentials... returning to login form', 401) @app.route("/") def index(): return render_template('index.html') @app.route("/protected/") @requires_session def protected_page(user=None,groups=None): return protected_page_real() def protected_page_real(): s_user = session['user'] c_user = request.cookies.get('user') groups = session['groups'] cookie=request.cookies print(cookie) return render_template('view.html', c_user = c_user, s_user=s_user, cookie=cookie, groups=groups) @app.route("/login/new") @app.route("/login/new/") def login_new(): return redirect(url_for("login", new="")) @app.route("/login/", methods=['POST','GET']) def login(user="None"): if request.method == "GET": if 'user' in session and request.cookies.get('user') == session['user'] and (not 'new' in request.args): return redirect(url_for("protected_page")) auth_header = request.headers.get("Authorization") if auth_header: if "negotiate" in auth_header: # assume we are already trying to log in with kerberos return redirect(url_for("login_kerberos")) # default, show login form return redirect(url_for("login_form")) elif request.method == "POST": if request.authorization: return redirect(url_for("login_basic"),code=307) return handle_login_ldap_from_non_ldap(request) def get_next_ldap_server(app): # on first ldap_login attempt, cache this lookup result: if 'LDAP_HOSTS' not in app.config: this_domain = urlparse(app.config['LDAP_URI']).hostname app.config['LDAP_HOSTS'] = session_ldap.list_ldap_servers_for_domain(this_domain) else: # rotate them! So every ldap_login attempt will use the next ldap server in the list. this_list = app.config['LDAP_HOSTS'] a = this_list[0] this_list.append(a) this_list.pop(0) app.config['LDAP_HOSTS'] = this_list # construct a new, full uri. this_netloc = app.config['LDAP_HOSTS'][0] up = urlparse(app.config['LDAP_URI']) if up.port: this_netloc += f":{up.port}" this_uri = up._replace(netloc=this_netloc).geturl() return this_uri def ldap_login(username,password): #print(f"DEBUG: Trying user {username} with pw '{password}'") this_uri = get_next_ldap_server(app) # Perform the ldap interactions user = session_ldap.authenticated_user( server_uri=this_uri, user_dn=username, password=password ) if user: return user else: return False return False @app.route("/login/kerberos") @app.route("/login/kerberos/") @requires_authn_kerberos def login_kerberos(user,groups=[]): resp = Response(f'success with kerberos') #resp.headers['login'] = "from-kerberos" resp.set_cookie('type',"kerberos") resp = login_generic(session,resp,user,groups) return resp @app.route("/login/ldap", methods=['POST','GET']) @app.route("/login/ldap/", methods=['POST','GET']) @requires_authn_ldap def login_ldap(user,groups=[]): resp = Response(f'success with ldap') resp.set_cookie('type',"ldap") resp = login_generic(session,resp,user,groups) return resp def login_generic(session,resp,user,groups=[]): resp.set_cookie('user',user) end_time = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime end_time_str = datetime.datetime.strftime(end_time,"%FT%TZ") resp.set_cookie('timestamp',end_time_str) session.permanent = True session['user']=user session['end_time'] = end_time_str session['groups'] = groups return resp @app.route("/login/form", methods=['POST','GET']) @app.route("/login/form/", methods=['POST','GET']) def login_form(): if request.method == "GET": options = { "ldap": "ldap", "other": "other" } return render_template("login_form.html", login_url = url_for("login"), options=options, kerberos_url = url_for("login_kerberos") ) else: # assume it is a POST return redirect(url_for("login_ldap"), code=307) def handle_login_ldap_from_non_ldap(request): # set default logintype for user logintype = "ldap" # redirect to whichever option was chosen in the drop-down if 'logintype' in request.form: logintype = request.form['logintype'] if "ldap" == logintype: return redirect(url_for("login_ldap"), code=307) else: return f"Authentication method {logintype} not supported yet.",400 @app.route("/logout") @app.route("/logout/") def logout(): resp = Response(f'logged out') # not documented but is found on the Internet in a few random places: session.clear() resp.set_cookie('user','',expires=0) resp.set_cookie('type','',expires=0) resp.set_cookie('session','',expires=0) resp.set_cookie('timestamp','',expires=0) return resp @app.route("/login/basic",methods=['POST','GET']) @app.route("/login/basic/",methods=['POST','GET']) def login_basic(): if not request.authorization: return Response(f"Please provide username and password.",401,{'WWW-Authenticate': 'Basic'}) if 'username' not in request.authorization: return Response(f"No username provided.",401) if 'password' not in request.authorization: return Response(f"No password provided.",401) username = request.authorization.username pw = request.authorization.password form={'username':username,'password':pw} session['formdata'] = form return redirect(url_for("login_ldap"),code=307) @app.route("/protected/settings/", methods=['GET','POST']) @requires_session def protected_settings(user,groups): print(f"DEBUG: visit settings page as user {user}") print(f"DEBUG: with groups {groups}") if "admins" not in groups: #return Response(f'

Not Found

What you were looking for is just not there.

Start over', 404) #return Response(f'

Not Found

The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.

', 404) return Response(f'

Not Authorized

You are not authorized to access this page.

', 403) else: if request.method == "GET": return render_template( 'settings.html', ldap_uri=app.config['LDAP_URI'] ) elif request.method == "POST": form = request.form print(f"Form: {form}") message = "" if 'ldap_uri' not in form: return Response("Invalid input.", 400) else: new_ldap_uri = form['ldap_uri'] if new_ldap_uri != app.config['LDAP_URI']: app.config['LDAP_URI'] = new_ldap_uri # removing LDAP_HOSTS causes get_new_ldap_server to reidentify the ldap servers for this uri. if 'LDAP_HOSTS' in app.config: app.config.pop('LDAP_HOSTS') message += "
  • LDAP_URI
  • " if "" != message: message = "Settings updated:" message += f"
    " return Response(message, 200) ## This bumps the session lifetime to two minutes farther out from each web request with this session. #@app.before_request #def make_session_permanent(): # session.permanent = True # session['end_time'] = datetime.datetime.now()+app.permanent_session_lifetime os.environ['KRB5_KTNAME'] = app.config['KRB5_KTNAME'] if 'KRB5_TRACE' in app.config: os.environ['KRB5_TRACE'] = app.config['KRB5_TRACE'] init_kerberos(app, hostname=app.config['HOSTNAME'], service=app.config['KRB5_SERVICE']) if __name__ == '__main__': #init_kerberos(app, hostname=app.config['HOSTNAME'], service=app.config['KRB5_SERVICE']) print("should listen to ",app.config['LISTEN_HOST']) app.run( host=app.config['LISTEN_HOST'], port=app.config['LISTEN_PORT'], debug=app.config['DEBUG'] )