1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# python3 library
# Startdate: 2021-06-21
# Dependencies:
# req-devuan: python3-ldap3
# reference: https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError
def get_ldap_connection(server_uri, bind_dn, bind_pw):
    """Open an LDAP connection to ``server_uri`` bound as ``bind_dn``.

    The connection is created with ``auto_bind=True``, so the bind is
    attempted while the ``ldap3.Connection`` object is being constructed
    and the caller receives the connection object that results.

    Args:
        server_uri: Host name or URI handed to ``ldap3.Server``.
        bind_dn: Identity to bind as.
        bind_pw: Password for ``bind_dn``.

    Returns:
        The ``ldap3.Connection`` that was created.
    """
    return ldap3.Connection(
        ldap3.Server(server_uri),
        user=bind_dn,
        password=bind_pw,
        auto_bind=True,
    )
def list_matching_users(server_uri="", bind_dn="", bind_pw="", connection=None, user_base="", username="", user_match_attrib=""):
    """Return the DNs of all entries under ``user_base`` matching a username.

    The search filter is ``(<user_match_attrib>=<username>)`` with subtree
    scope. Either pass an existing ldap3 ``connection`` or supply
    ``server_uri``/``bind_dn``/``bind_pw`` so a connection can be opened
    for the duration of the call.

    Args:
        server_uri: Server to connect to when ``connection`` is not usable.
        bind_dn: Bind identity for a newly opened connection.
        bind_pw: Password for ``bind_dn``.
        connection: Optional already-bound ``ldap3`` connection to reuse.
        user_base: Search base DN.
        username: Value to match; treated as literal text, not filter syntax.
        user_match_attrib: Attribute name to compare ``username`` against.

    Returns:
        A list of DN strings, empty when nothing matched.
    """
    # Local import keeps this function self-contained with respect to the
    # module's import block; ldap3 itself is already a file dependency.
    from ldap3.utils.conv import escape_filter_chars

    # The username normally comes from a login form. Escaping it prevents
    # characters such as "*", "(" and ")" from altering the filter.
    search_filter = f"({user_match_attrib}={escape_filter_chars(username)})"

    owns_connection = not (
        connection and isinstance(connection, ldap3.core.connection.Connection)
    )
    conn = get_ldap_connection(server_uri, bind_dn, bind_pw) if owns_connection else connection
    try:
        conn.search(
            search_base=user_base,
            search_filter=search_filter,
            search_scope="SUBTREE",
        )
        print(f"DEBUG: search_base {user_base}")
        print(f"DEBUG: search_filter {search_filter}")
        # Collect the DNs before the connection is released below.
        result = [entry.entry_dn for entry in conn.entries]
    finally:
        # Only close what this function opened; a caller-supplied
        # connection stays under the caller's control.
        if owns_connection:
            conn.unbind()
    print(f"DEBUG: result {result}")
    return result
def authenticated_user(server_uri, user_dn, password):
    """Try to bind to ``server_uri`` as ``user_dn`` and report the outcome.

    Args:
        server_uri: LDAP server to bind against.
        user_dn: DN of the user who is logging in.
        password: That user's password.

    Returns:
        The bound connection object on success, or ``False`` when the
        credentials are rejected or no ``user_dn`` was supplied.

    Raises:
        LDAPBindError: For bind failures other than invalid credentials.
        Any other exception raised while connecting propagates unchanged.
    """
    print(f"server_uri: {server_uri}")
    print(f"user_dn: {user_dn}")
    if not user_dn:
        # Without a user, ldap3 selects anonymous authentication, which many
        # servers accept. That would hand back a "successful" connection
        # for someone who never proved an identity, so refuse up front.
        print("Invalid credentials.")
        return False
    try:
        return get_ldap_connection(server_uri, user_dn, password)
    except LDAPBindError as e:
        if 'invalidCredentials' in str(e):
            print("Invalid credentials.")
            return False
        # Anything else (server unreachable, policy errors, ...) is not a
        # plain wrong-password case; re-raise with the original traceback.
        raise
def list_ldap_servers_for_domain(domain):
    """Return LDAP server host names for ``domain`` from DNS SRV records.

    Looks up ``_ldap._tcp.<domain>`` and returns each record's target with
    the trailing dot removed. When the dns package is not installed, or the
    lookup yields no SRV records, ``[domain]`` is returned so that a plain
    host name can be passed in and used as-is.

    Args:
        domain: DNS domain (or a single host name) to look up.

    Returns:
        A non-empty list of host name strings.
    """
    try:
        import dns.resolver
    except ImportError:
        print("Need python3-dns installed for dns lookups.")
        return [domain]

    qname = f"_ldap._tcp.{domain}"
    try:
        if hasattr(dns.resolver, "resolve"):
            # dnspython 2.x deprecates query() in favour of resolve().
            # search=True keeps query()'s behaviour of applying the
            # resolver's search list.
            answers = dns.resolver.resolve(qname, "SRV", search=True)
        else:
            answers = dns.resolver.query(qname, "SRV")
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        # No SRV records match, so we were probably given a specific host
        # name; the empty answer falls through to the [domain] default.
        answers = []

    namelist = [record.target.to_text().rstrip(".") for record in answers]
    return namelist or [domain]
def get_ldap_user_groups(server_uri="", bind_dn="", bind_pw="", connection=None, user_dn="", user_attrib_memberof="memberof", group_name_attrib="uid", group_base=""):
    """Return the groups a user belongs to, as recorded on the user entry.

    Reads ``user_attrib_memberof`` from the entry at ``user_dn``. Every
    listed group DN that contains ``group_base`` is reported, either as the
    DN itself (``group_name_attrib == "dn"``) or as the first value of
    ``group_name_attrib`` read from the group entry.

    Args:
        server_uri: Server to connect to when ``connection`` is not usable.
        bind_dn: Bind identity for a newly opened connection.
        bind_pw: Password for ``bind_dn``.
        connection: Optional already-bound ``ldap3`` connection to reuse.
        user_dn: DN of the user entry to inspect.
        user_attrib_memberof: Attribute on the user holding group DNs.
        group_name_attrib: ``"dn"`` or the group attribute to report.
        group_base: Substring a group DN must contain to be included.

    Returns:
        A list of group identifiers. It is empty when the user entry is not
        found or lists no groups; groups whose entry or naming attribute
        cannot be read are skipped.
    """
    owns_connection = not (
        connection and isinstance(connection, ldap3.core.connection.Connection)
    )
    conn = get_ldap_connection(server_uri, bind_dn, bind_pw) if owns_connection else connection
    try:
        conn.search(
            search_base=user_dn,
            # Every entry has an objectClass, so this presence filter matches
            # the base entry even in directories where user DNs carry no cn.
            search_filter="(objectClass=*)",
            search_scope="BASE",
            attributes=[user_attrib_memberof],
        )
        user_entries = conn.entries
        if not user_entries:
            return []
        # Capture the membership list now: each group lookup below runs a
        # new search on the same connection.
        member_of = _ldap_entry_values(user_entries[0], user_attrib_memberof)

        result = []
        for group in member_of:
            if group_base not in group:
                continue
            if group_name_attrib == "dn":
                result.append(group)
                continue
            # Look the group up and pick the attribute the admin asked for.
            conn.search(
                search_base=group,
                search_filter="(objectClass=*)",
                search_scope="BASE",
                attributes=[group_name_attrib],
            )
            group_entries = conn.entries
            names = _ldap_entry_values(group_entries[0], group_name_attrib) if group_entries else []
            if names:
                result.append(names[0])
        return result
    finally:
        # Only close what this function opened.
        if owns_connection:
            conn.unbind()


def _ldap_entry_values(entry, attrib_name):
    """Return the values of ``attrib_name`` on an ldap3 entry, or ``[]``.

    Attribute names are compared case-insensitively, so a configured
    "memberof" still matches an entry key spelled "memberOf".
    """
    wanted = attrib_name.lower()
    for key, values in entry.entry_attributes_as_dict.items():
        if key.lower() == wanted:
            return values
    return []
def get_ldap_attrib_from_krbPrincipalName(server_uri=None, bind_dn="", bind_pw="", connection=None, search_base="", user_attrib="uid", user_krbPrincipalName="", krbPrincipalName_attrib="krbPrincipalName"):
    """Map a Kerberos principal name to an attribute of the matching entry.

    Searches the subtree under ``search_base`` for
    ``(<krbPrincipalName_attrib>=<user_krbPrincipalName>)`` and uses the
    first entry found.

    Args:
        server_uri: Server to connect to when ``connection`` is not usable.
        bind_dn: Bind identity for a newly opened connection.
        bind_pw: Password for ``bind_dn``.
        connection: Optional already-bound ``ldap3`` connection to reuse.
        search_base: Base DN of the subtree search.
        user_attrib: ``"dn"`` to get the entry's DN, otherwise the
            attribute whose first value is returned.
        user_krbPrincipalName: Principal to look for (matched literally).
        krbPrincipalName_attrib: Attribute that stores the principal name.

    Returns:
        The entry DN, or the first value of the requested attribute.

    Raises:
        IndexError: If no entry matches, or the attribute has no value.
    """
    # Local import keeps this function self-contained with respect to the
    # module's import block; ldap3 itself is already a file dependency.
    from ldap3.utils.conv import escape_filter_chars

    wants_dn = user_attrib == "dn"
    # Escape the principal so filter metacharacters in it are matched
    # literally instead of changing the query.
    search_filter = f"({krbPrincipalName_attrib}={escape_filter_chars(user_krbPrincipalName)})"

    owns_connection = not (
        connection and isinstance(connection, ldap3.core.connection.Connection)
    )
    conn = get_ldap_connection(server_uri, bind_dn, bind_pw) if owns_connection else connection
    try:
        conn.search(
            search_base=search_base,
            search_scope="SUBTREE",
            search_filter=search_filter,
            # "dn" is not an attribute type; the DN comes back with every
            # entry, so request no attributes at all in that case.
            attributes=None if wants_dn else [user_attrib],
        )
        entries = conn.entries
        if not entries:
            raise IndexError(
                f"no LDAP entry found with {krbPrincipalName_attrib}={user_krbPrincipalName}"
            )
        entry = entries[0]
        if wants_dn:
            return entry.entry_dn
        # Only one attribute was requested, so the first attribute on the
        # entry is the one to report, whatever case the server spelled it in.
        return entry.entry_attributes_as_dict[entry.entry_attributes[0]][0]
    finally:
        # Only close what this function opened.
        if owns_connection:
            conn.unbind()
def get_ldap_username_attrib_from_dn(server_uri=None, bind_dn="", bind_pw="", authenticated_user=None, user_match_attrib="dn", user_dn=""):
    """Return ``user_match_attrib`` of a user entry identified by its DN.

    Needs either ``authenticated_user`` (a bound ldap3 connection, whose
    own identity is looked up through the "Who am I?" operation) or the
    combination ``server_uri``, ``bind_dn``, ``bind_pw`` and ``user_dn``.

    Args:
        server_uri: Server to connect to when no connection is supplied.
        bind_dn: Bind identity for a newly opened connection.
        bind_pw: Password for ``bind_dn``.
        authenticated_user: Optional bound ``ldap3`` connection of the user.
        user_match_attrib: ``"dn"`` to get the entry's DN, otherwise the
            attribute whose first value is returned.
        user_dn: DN to read when no ``authenticated_user`` is supplied.

    Returns:
        The entry DN, or the first value of the requested attribute.

    Raises:
        ValueError: If no DN to look up can be determined from the inputs.
        IndexError: If the entry is not found, or the attribute is empty.
    """
    has_session = bool(authenticated_user) and isinstance(
        authenticated_user, ldap3.core.connection.Connection
    )
    if has_session:
        conn = authenticated_user
        authz_id = conn.extend.standard.who_am_i() or ""
        # The DN form of an authorization identity is "dn:<DN>"; tolerate
        # optional whitespace after the prefix and any letter case.
        if authz_id[:3].lower() == "dn:":
            search_base = authz_id[3:].strip()
        else:
            search_base = authz_id
        if not search_base:
            raise ValueError("could not determine the DN of authenticated_user")
    else:
        # Validate before connecting so a missing DN fails fast and clearly.
        if not user_dn:
            raise ValueError(
                "user_dn is required when no authenticated_user connection is given"
            )
        search_base = user_dn
        conn = get_ldap_connection(server_uri, bind_dn, bind_pw)

    wants_dn = user_match_attrib == "dn"
    try:
        conn.search(
            search_base=search_base,
            search_scope="BASE",
            # Every entry has an objectClass, so the base entry matches
            # even when it carries no cn attribute.
            search_filter="(objectClass=*)",
            # "dn" is not an attribute type; the DN comes back with every
            # entry, so request no attributes at all in that case.
            attributes=None if wants_dn else [user_match_attrib],
        )
        entries = conn.entries
        if not entries:
            raise IndexError(f"no LDAP entry found at {search_base}")
        entry = entries[0]
        if wants_dn:
            return entry.entry_dn
        return entry.entry_attributes_as_dict[entry.entry_attributes[0]][0]
    finally:
        # Only close a connection this function opened; the caller's
        # authenticated session is left intact.
        if not has_session:
            conn.unbind()
|