# python3 library # Startdate: 2021-06-21 # Dependencies: # req-devuan: python3-ldap3 # reference: https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py import ldap3 from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError def get_ldap_connection(server_uri, bind_dn, bind_pw): server = ldap3.Server(server_uri) conn = ldap3.Connection(server, auto_bind=True,user=bind_dn, password=bind_pw) return conn def list_matching_users(server_uri= "", bind_dn = "", bind_pw = "", connection = None, user_base = "", username = "", user_match_attrib = ""): search_filter=f"({user_match_attrib}={username})" if connection and isinstance(connection, ldap3.core.connection.Connection): conn = connection else: conn = get_ldap_connection(server_uri, bind_dn, bind_pw) conn.search( search_base=user_base, search_filter=search_filter, search_scope="SUBTREE" ) print(f"DEBUG: search_base {user_base}") print(f"DEBUG: search_filter {search_filter}") result = [] for i in conn.entries: result.append(i.entry_dn) print(f"DEBUG: result {result}") return result def authenticated_user(server_uri, user_dn, password): print(f"server_uri: {server_uri}") print(f"user_dn: {user_dn}") try: conn = get_ldap_connection(server_uri, user_dn, password) return conn except LDAPBindError as e: if 'invalidCredentials' in str(e): print("Invalid credentials.") return False else: raise e #except (LDAPPasswordIsMandatoryError, LDAPBindError): # print("Either an ldap password is required, or we had another bind error.") # return False return False def list_ldap_servers_for_domain(domain): # return list of hostnames from the _ldap._tcp.{domain} SRV lookup try: import dns import dns.resolver except: print("Need python3-dns installed for dns lookups.") return [domain] namelist = [] try: query = dns.resolver.query(f"_ldap._tcp.{domain}","SRV") except dns.resolver.NXDOMAIN: # no records exist that match the request, so we were probably given a specific hostname, and an empty query will trigger the logic below that will add the original domain to the list. query = [] for i in query: namelist.append(i.target.to_text().rstrip(".")) if not len(namelist): namelist.append(domain) return namelist def get_ldap_user_groups(server_uri = "", bind_dn = "", bind_pw = "", connection = None, user_dn = "", user_attrib_memberof = "memberof", group_name_attrib = "uid", group_base = ""): if connection and isinstance(connection, ldap3.core.connection.Connection): conn = connection else: conn = get_ldap_connection(server_uri, bind_dn, bind_pw) # so now we have a connection conn.search( search_base=user_dn, search_filter="(cn=*)", # this has the potential to not work in a directory where CN is not a part of any dn? search_scope="BASE", attributes=[user_attrib_memberof] ) these_groups = conn.entries[0].entry_attributes_as_dict[user_attrib_memberof] #print(f"DEBUG: these_groups={these_groups}") result = [] for group in these_groups: #print(f"DEBUG: will check for value {group_base} in {group}") if group_base in group: if group_name_attrib == "dn": #print(f"DEBUG: just add group via dn {group}") result.append(group) else: # we need to lookup this group and pick the attribute of it the admin wants. #print(f"DEBUG: need to lookup group {group} and extract attrib {group_name_attrib}") conn.search( search_base=group, search_filter="(objectClass=*)", search_scope="BASE", attributes=[group_name_attrib] ) this_group=conn.entries[0].entry_attributes_as_dict[group_name_attrib][0] #print(f"DEBUG: Group {group} identified as attrib {group_name_attrib}={this_group}") result.append(this_group) return result def get_ldap_attrib_from_krbPrincipalName(server_uri = None, bind_dn = "", bind_pw = "", connection = None, search_base = "", user_attrib = "uid", user_krbPrincipalName = "", krbPrincipalName_attrib = "krbPrincipalName"): if connection and isinstance(connection, ldap3.core.connection.Connection): conn = connection else: conn = get_ldap_connection(server_uri, bind_dn, bind_pw) conn.search( search_base=search_base, search_scope="SUBTREE", search_filter=f"({krbPrincipalName_attrib}={user_krbPrincipalName})", attributes=[user_attrib] ) entry = conn.entries[0] if user_attrib == "dn": return entry.entry_dn else: return entry.entry_attributes_as_dict[entry.entry_attributes[0]][0] def get_ldap_username_attrib_from_dn(server_uri = None, bind_dn = "", bind_pw = "", authenticated_user = None, user_match_attrib = "dn", user_dn = ""): # Needs (server_uri, bind_dn, bind_pw, user_dn) or (authenticated_user) if authenticated_user and isinstance(authenticated_user, ldap3.core.connection.Connection): conn = authenticated_user search_base=authenticated_user.extend.standard.who_am_i().replace("dn: ","") else: # then we have to use a new connection conn = get_ldap_connection(server_uri, bind_dn, bind_pw) if user_dn: search_base=user_dn, # so now conn is the connection regardless of how we got there, and search_base #print(f"DEBUG: search_base {search_base} attributes {user_match_attrib}") conn.search( search_base=search_base, search_scope="BASE", search_filter="(cn=*)", attributes=[user_match_attrib] ) entry = conn.entries[0] if user_match_attrib == "dn": return entry.entry_dn else: return entry.entry_attributes_as_dict[entry.entry_attributes[0]][0]