#!/usr/bin/env python
# Startdate: 2021-06-17
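# Goals:
# Accept Kerberos (Negotiate) credentials, or LDAP credentials sent either by the login form or in an "authorization: basic gowinablz;nuiowekj==" header, and create a session cookie with a short lifetime (app.config['minutes'], currently 2 minutes). The cookie is then used to reach the protected URLs.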
# References:
# https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
# https://www.techlifediary.com/python-web-development-tutorial-using-flask-session-cookies/
# delete cookie https://stackoverflow.com/a/14386413/3569534
# timeout sessions https://stackoverflow.com/a/11785722/3569534
# future: https://code.tutsplus.com/tutorials/flask-authentication-with-ldap--cms-23101
# better timeout session: https://stackoverflow.com/a/49891626/3569534
# store "formdata" in session for changing the basic auth to form data for the ldap login https://stackoverflow.com/a/56904875/3569534
# modify url from urlparse https://stackoverflow.com/a/21629125/3569534
# Improve:
# move all configs to config file
# move all references to references section
# accept a bind credential so we can perform lookups of users who match "uid=%s" under a basedn.
# Run:
# FLASK_APP=session_app.py FLASK_DEBUG=1 flask run --host 0.0.0.0
# Dependencies:
# apt-get install python3-flask
# pip3 install Flask-kerberos kerberos
from flask import Flask, Response, redirect, url_for, render_template, request, _request_ctx_stack as stack, make_response, session
from flask_kerberos import init_kerberos, requires_authentication, _unauthorized, _forbidden, _gssapi_authenticate
import kerberos
from functools import wraps
import binascii, datetime
import os
import session_ldap
from urllib.parse import urlparse
# ---- Application object and configuration ----
DEBUG = True
app = Flask(__name__)
# Load configuration from this module (Flask reads its upper-case names, such as DEBUG).
app.config.from_object(__name__)
app.debug = True

# A new random key is generated every time this module is loaded.
# NOTE(review): sessions signed with a previous key will presumably stop
# validating after a restart - confirm that this is intended.
secret_key_value = os.urandom(24)
secret_key_value_hex_encoded = binascii.hexlify(secret_key_value)

app.config.update(
    SECRET_KEY=secret_key_value_hex_encoded,
    LDAP_URI="ldaps://ipa.internal.com:636",
    LDAP_USER_BASEDN="cn=users,cn=accounts,dc=ipa,dc=internal,dc=com",
    LDAP_GROUP_BASEDN="cn=groups,cn=accounts,dc=ipa,dc=internal,dc=com",
    LDAP_USER_FORMAT="uid=%s,cn=users,cn=accounts,dc=ipa,dc=internal,dc=com",
    minutes=2,
)

# Lifetime of permanent sessions, driven by the 'minutes' setting above.
app.permanent_session_lifetime = datetime.timedelta(minutes=app.config['minutes'])
def requires_session(function):
    '''
    Decorator: only call the wrapped view when the request has a valid session.

    The request is answered with a 401 response when the session is empty,
    when the session holds no 'user' entry, or when the 'user' cookie sent
    with the request differs from the user stored in the session.
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        if not session:
            return Response("requires session", 401)
        if 'user' not in session:
            return Response("User is not in this session.", 401)

        session_user = session['user']
        cookie_user = request.cookies.get('user')
        print(f"session user: {session_user}")
        print(f"cookie user: {cookie_user}")
        if session_user != cookie_user:
            return Response("Wrong user for this session!.", 401)

        # Every check passed: hand over to the wrapped view. Pattern from
        # https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py
        return function(*args, **kwargs)
    return decorated
# Taken from flask_kerberos and modified so that a custom 401 message can be returned.
def requires_authn_kerberos(function):
    '''
    Require that the wrapped view function only be called by users
    authenticated with Kerberos. The authenticated principal is passed to
    the view as its first positional argument.

    Outcomes:
      * no Authorization header        -> _unauthorized_kerberos()
      * kerberos.AUTH_GSS_COMPLETE     -> the view's response; a
        'WWW-Authenticate' header is added when the request context carries
        a kerberos_token
      * kerberos.AUTH_GSS_CONTINUE     -> _unauthorized_kerberos()
      * any other result code          -> _forbidden()

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return _unauthorized_kerberos()

        ctx = stack.top
        # Drop the first whitespace-separated word (the scheme); the rest is the token.
        token = ''.join(auth_header.split()[1:])
        rc = _gssapi_authenticate(token)

        if rc == kerberos.AUTH_GSS_COMPLETE:
            # presumably _gssapi_authenticate stored kerberos_user and
            # kerberos_token on the request context - verify against flask_kerberos
            response = make_response(function(ctx.kerberos_user, *args, **kwargs))
            if ctx.kerberos_token is not None:
                response.headers['WWW-Authenticate'] = 'negotiate ' + ctx.kerberos_token
            return response

        if rc != kerberos.AUTH_GSS_CONTINUE:
            return _forbidden()

        # AUTH_GSS_CONTINUE: ask the client to authenticate again.
        return _unauthorized_kerberos()
    return decorated
def requires_authn_ldap(function):
    '''
    Require that the wrapped view function only be called by users
    authenticated with ldap. The authenticated user (the `user` attribute of
    the object returned by ldap_login) is passed to the view as its first
    positional argument.

    Credentials are taken from session['formdata'] when it is present (it is
    stored there by login_basic() and removed here), otherwise from the
    POSTed form fields 'username' and 'password'.

    A missing or empty username/password is answered with
    _unauthorized_ldap() without contacting the directory: an LDAP simple
    bind with an empty password is an "unauthenticated bind" that some
    servers accept, so it must never count as a successful login.

    :param function: flask view function
    :type function: function
    :returns: decorated function
    :rtype: function
    '''
    @wraps(function)
    def decorated(*args, **kwargs):
        # formdata is in session if we are coming from login_basic()
        form = session.get('formdata', None)
        if form:
            session.pop('formdata')
            credentials = form
        else:
            # then we are coming from the form with POST data
            credentials = request.form

        # .get() avoids the unbound-variable crash the original had when the
        # stored formdata lacked one of the two keys.
        username = credentials.get('username')
        pw = credentials.get('password')
        if not username or not pw:
            return _unauthorized_ldap()

        if form:
            # Never write the password to the log; the user name is enough.
            print(f"DEBUG: requires_authn_ldap formdata from session for username={username}")

        ll = ldap_login(username, pw)
        if ll:
            return function(ll.user, *args, **kwargs)
        return _unauthorized_ldap()
    return decorated
def _unauthorized_kerberos():
    '''
    Build the 401 response that asks the client for Kerberos (Negotiate)
    authentication.
    '''
    # from https://billstclair.com/html-redirect2.html
    message = 'Unauthorized! No kerberos auth provided. Trying ldap automatically in a moment.'
    challenge = {'WWW-Authenticate': 'Negotiate'}
    return Response(message, 401, challenge)
def _unauthorized_ldap():
    '''
    Build the 401 response returned when the ldap credentials are rejected.
    '''
    message = 'Unauthorized! Invalid ldap credentials... returning to login form'
    return Response(message, 401)
@app.route("/")
def index():
    '''
    Landing page: render the index.html template.
    '''
    template_name = 'index.html'
    return render_template(template_name)
@app.route("/protected/")
@requires_session
def protected_page():
    '''
    Session-protected view; the page itself is produced by protected_page_real().
    '''
    page = protected_page_real()
    return page
def protected_page_real():
    '''
    Render view.html with the session user, the value of the 'user' cookie
    and the complete cookie collection of the current request.

    The cookie collection is also printed to stdout.
    '''
    session_user = session['user']
    cookies = request.cookies
    print(cookies)
    return render_template(
        'view.html',
        c_user=cookies.get('user'),
        s_user=session_user,
        cookie=cookies,
    )
@app.route("/login/new")
@app.route("/login/new/")
def login_new():
    '''
    Redirect to the login view with an (empty) 'new' query argument, which
    login() checks before sending an already logged-in user to the
    protected page.
    '''
    target = url_for("login", new="")
    return redirect(target)
@app.route("/login/", methods=['POST','GET'])
def login(user="None"):
    '''
    Entry point that dispatches a login request to the matching login view.

    POST:
      * request carries parsed Authorization credentials -> 307 redirect to
        login_basic (307 preserves method and body)
      * otherwise -> handle_login_ldap_from_non_ldap(request)

    Any other method (GET, and HEAD which Flask adds automatically to GET
    routes) is handled by the GET path so the view always returns a response:
      * a session user that matches the 'user' cookie, and no 'new' query
        argument -> redirect to protected_page
      * an Authorization header whose scheme is "Negotiate" -> redirect to
        login_kerberos
      * otherwise -> redirect to login_form

    The authentication scheme is compared case-insensitively and only
    against the first word of the header, because HTTP auth scheme names are
    case-insensitive and clients normally send "Negotiate".

    :param user: not used by the body; kept for interface compatibility
    '''
    if request.method == "POST":
        if request.authorization:
            return redirect(url_for("login_basic"), code=307)
        return handle_login_ldap_from_non_ldap(request)

    already_logged_in = (
        'user' in session
        and request.cookies.get('user') == session['user']
    )
    if already_logged_in and 'new' not in request.args:
        return redirect(url_for("protected_page"))

    auth_header = request.headers.get("Authorization") or ""
    scheme = auth_header.strip().partition(" ")[0].lower()
    if scheme == "negotiate":
        # assume we are already trying to log in with kerberos
        return redirect(url_for("login_kerberos"))

    # default, show login form
    return redirect(url_for("login_form"))
def ldap_login(username,password):
    '''
    Try to authenticate username/password against one of the ldap servers.

    On the first call the server list is obtained from
    session_ldap.list_ldap_servers_for_domain() for the host name found in
    app.config['LDAP_URI'] and cached in app.config['LDAP_HOSTS']. On every
    later call the cached list is rotated by one position, so consecutive
    attempts use consecutive servers.

    The URI passed to session_ldap.authenticated_user() is LDAP_URI with its
    host replaced by the first entry of LDAP_HOSTS (the configured port, if
    any, is kept).

    :returns: the object returned by session_ldap.authenticated_user() when
              it is truthy, otherwise False
    '''
    if 'LDAP_HOSTS' in app.config:
        # Rotate in place: the current head moves to the tail.
        hosts = app.config['LDAP_HOSTS']
        hosts.append(hosts.pop(0))
    else:
        # First attempt: look the servers up once and cache the result.
        domain = urlparse(app.config['LDAP_URI']).hostname
        app.config['LDAP_HOSTS'] = session_ldap.list_ldap_servers_for_domain(domain)

    # Rebuild the full URI around the server that is now first in the list.
    configured = urlparse(app.config['LDAP_URI'])
    netloc = app.config['LDAP_HOSTS'][0]
    if configured.port:
        netloc = netloc + ":" + str(configured.port)
    server_uri = configured._replace(netloc=netloc).geturl()

    # Perform the ldap interactions
    user = session_ldap.authenticated_user(
        server_uri,
        app.config['LDAP_USER_FORMAT'],
        username,
        password,
    )
    return user or False
@app.route("/login/kerberos")
@app.route("/login/kerberos/")
@requires_authn_kerberos
def login_kerberos(user):
    '''
    Kerberos login view. requires_authn_kerberos supplies the authenticated
    principal as `user`; the response gets a 'type' cookie of "kerberos" and
    is completed by login_generic().
    '''
    response = Response('success with kerberos')
    response.set_cookie('type', "kerberos")
    return login_generic(session, response, user, None)
def login_generic(session,resp,user,groups=[]):
    '''
    Record a successful login on both the response and the session.

    Sets the 'user' and 'timestamp' cookies on `resp`, marks the session as
    permanent and stores 'user' and 'end_time' in it. The timestamp is the
    current UTC time plus app.permanent_session_lifetime, formatted with
    "%FT%TZ".

    :param session: session object to update
    :param resp: response object that receives the cookies
    :param user: name of the authenticated user
    :param groups: not used by the body
    :returns: the same `resp` object
    '''
    # NOTE(review): %F and %T are C-library extensions of strftime and may not
    # work on every platform; "%Y-%m-%dT%H:%M:%SZ" is the portable spelling.
    # NOTE(review): `groups=[]` is a mutable default; harmless while unused.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + app.permanent_session_lifetime
    expires_at_text = expires_at.strftime("%FT%TZ")

    resp.set_cookie('user', user)
    resp.set_cookie('timestamp', expires_at_text)

    session.permanent = True
    session['user'] = user
    session['end_time'] = expires_at_text
    return resp
@app.route("/login/ldap", methods=['POST','GET'])
@app.route("/login/ldap/", methods=['POST','GET'])
@requires_authn_ldap
def login_ldap(user,groups=[]):
    '''
    Ldap login view. requires_authn_ldap supplies the authenticated user as
    `user`; the response gets a 'type' cookie of "ldap" and is completed by
    login_generic().
    '''
    response = Response('success with ldap')
    response.set_cookie('type', "ldap")
    return login_generic(session, response, user, groups)
@app.route("/login/form", methods=['POST','GET'])
@app.route("/login/form/", methods=['POST','GET'])
def login_form():
    '''
    GET: render login_form.html with the login URL, the kerberos login URL
    and the selectable login types.
    Anything else is assumed to be a POST and is redirected to login_ldap
    with code 307.
    '''
    if request.method != "GET":
        # assume it is a POST
        return redirect(url_for("login_ldap"), code=307)

    options = dict(ldap="ldap", other="other")
    return render_template(
        "login_form.html",
        login_url=url_for("login"),
        options=options,
        kerberos_url=url_for("login_kerberos"),
    )
def handle_login_ldap_from_non_ldap(request):
    '''
    Route a POSTed login to the handler chosen in the form's 'logintype'
    field (the drop-down); "ldap" is used when the field is absent.

    :param request: object whose `form` mapping holds the POSTed fields
    :returns: a 307 redirect to login_ldap for "ldap"; otherwise a
              (message, 400) tuple naming the unsupported method

    The login type comes from the client, so it is HTML-escaped before being
    placed in the response body.
    '''
    # Local import keeps this block self-contained.
    from html import escape

    logintype = request.form.get('logintype', "ldap")
    if logintype == "ldap":
        # preserve POST with code 307 https://stackoverflow.com/a/15480983/3569534
        return redirect(url_for("login_ldap"), code=307)
    return f"Authentication method {escape(logintype)} not supported yet.", 400
@app.route("/logout")
@app.route("/logout/")
def logout():
    '''
    Log out by expiring the 'user', 'type', 'session' and 'timestamp'
    cookies on the response.
    '''
    resp = Response("logged out")
    # Doing anything with session here leaves a cookie, so the cookies are
    # expired directly on the response instead.
    for cookie_name in ('user', 'type', 'session', 'timestamp'):
        resp.set_cookie(cookie_name, '', expires=0)
    return resp
@app.route("/login/basic",methods=['POST','GET'])
@app.route("/login/basic/",methods=['POST','GET'])
def login_basic():
    '''
    Turn "Authorization: Basic" credentials into form-style data for the
    ldap login.

    Answers 401 when no credentials were sent (with a Basic challenge) or
    when the username or the password is missing. Otherwise the credentials
    are stored in session['formdata'] and the client is redirected to
    login_ldap with code 307.
    '''
    auth = request.authorization
    if not auth:
        return Response("Please provide username and password.", 401, {'WWW-Authenticate': 'Basic'})

    required = (
        ('username', "No username provided."),
        ('password', "No password provided."),
    )
    for field, message in required:
        if field not in auth:
            return Response(message, 401)

    # NOTE(review): this puts the password into the session until
    # requires_authn_ldap pops it; with a cookie-based session that means it
    # travels in the cookie - confirm this is acceptable.
    session['formdata'] = {'username': auth.username, 'password': auth.password}
    return redirect(url_for("login_ldap"), code=307)
## This bumps the session lifetime to two minutes farther out from each web request with this session.
#@app.before_request
#def make_session_permanent():
# session.permanent = True
# session['end_time'] = datetime.datetime.now()+app.permanent_session_lifetime
# Keytab obtained with `/usr/sbin/ipa-getkeytab -p HTTP/d2-03a.ipa.example.com -k session.keytab`
# NOTE(review): the command above names ipa.example.com while the hostname
# passed to init_kerberos below is ipa.internal.com - confirm which is right.
os.environ.update({
    'KRB5_KTNAME': "./session.keytab",
    'KRB5_TRACE': "./kerberos.log",
})
init_kerberos(app, hostname="d2-03a.ipa.internal.com", service="HTTP")

if __name__ == '__main__':
    # NOTE(review): init_kerberos has already been called above with the same
    # arguments; this second call looks redundant.
    init_kerberos(app, hostname="d2-03a.ipa.internal.com", service="HTTP")
    # Listens on all interfaces with the debugger enabled.
    app.run(host='0.0.0.0', debug=True)