aboutsummaryrefslogtreecommitdiff
path: root/session_ldap.py
blob: b60bc8bb4d7ad992fcebf80300d3bb8085b16082 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# python3 library
# Startdate: 2021-06-21
# Dependencies:
#    req-devuan: python3-ldap3

# reference: https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py

import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError

def get_ldap_connection(server_uri, bind_dn, bind_pw):
   server = ldap3.Server(server_uri)
   conn = ldap3.Connection(server, auto_bind=True,user=bind_dn, password=bind_pw)
   return conn

def list_matching_users(server_uri= "", bind_dn = "", bind_pw = "", connection = None, user_base = "", username = "", user_match_attrib = ""):
   search_filter=f"({user_match_attrib}={username})"
   if connection and isinstance(connection, ldap3.core.connection.Connection):
      conn = connection
   else:
      conn = get_ldap_connection(server_uri, bind_dn, bind_pw)
   conn.search(
      search_base=user_base,
      search_filter=search_filter,
      search_scope="SUBTREE"
   )
   print(f"DEBUG: search_base {user_base}")
   print(f"DEBUG: search_filter {search_filter}")
   result = []
   for i in conn.entries:
      result.append(i.entry_dn)
   print(f"DEBUG: result {result}")
   return result

def authenticated_user(server_uri, user_dn, password):
   print(f"server_uri: {server_uri}")
   print(f"user_dn: {user_dn}")
   try:
      conn = get_ldap_connection(server_uri, user_dn, password)
      return conn
   except LDAPBindError as e:
      if 'invalidCredentials' in str(e):
         print("Invalid credentials.")
         return False
      else:
         raise e
   #except (LDAPPasswordIsMandatoryError, LDAPBindError):
   #   print("Either an ldap password is required, or we had another bind error.")
   #   return False
   return False

def list_ldap_servers_for_domain(domain):
   # return list of hostnames from the _ldap._tcp.{domain} SRV lookup
   try:
      import dns
      import dns.resolver
   except:
      print("Need python3-dns installed for dns lookups.")
      return [domain]
   namelist = []
   try:
      query = dns.resolver.query(f"_ldap._tcp.{domain}","SRV")
   except dns.resolver.NXDOMAIN:
      # no records exist that match the request, so we were probably given a specific hostname, and an empty query will trigger the logic below that will add the original domain to the list.
      query = []
   for i in query:
      namelist.append(i.target.to_text().rstrip("."))
   if not len(namelist):
      namelist.append(domain)
   return namelist

def get_ldap_user_groups(server_uri = "", bind_dn = "", bind_pw = "", connection = None, user_dn = "", user_attrib_memberof = "memberof", group_name_attrib = "uid", group_base = ""):
   if connection and isinstance(connection, ldap3.core.connection.Connection):
      conn = connection
   else:
      conn = get_ldap_connection(server_uri, bind_dn, bind_pw)
   # so now we have a connection
   conn.search(
      search_base=user_dn,
      search_filter="(cn=*)", # this has the potential to not work in a directory where CN is not a part of any dn?
      search_scope="BASE",
      attributes=[user_attrib_memberof]
   )
   these_groups = conn.entries[0].entry_attributes_as_dict[user_attrib_memberof]
   #print(f"DEBUG: these_groups={these_groups}")
   result = []
   for group in these_groups:
      #print(f"DEBUG: will check for value {group_base} in {group}")
      if group_base in group:
         if group_name_attrib == "dn":
            #print(f"DEBUG: just add group via dn {group}")
            result.append(group)
         else:
            # we need to lookup this group and pick the attribute of it the admin wants.
            #print(f"DEBUG: need to lookup group {group} and extract attrib {group_name_attrib}")
            conn.search(
               search_base=group,
               search_filter="(objectClass=*)",
               search_scope="BASE",
               attributes=[group_name_attrib]
            )
            this_group=conn.entries[0].entry_attributes_as_dict[group_name_attrib][0]
            #print(f"DEBUG: Group {group} identified as attrib {group_name_attrib}={this_group}")
            result.append(this_group)
   return result

def get_ldap_attrib_from_krbPrincipalName(server_uri = None, bind_dn = "", bind_pw = "", connection = None, search_base = "", user_attrib = "uid", user_krbPrincipalName = "", krbPrincipalName_attrib = "krbPrincipalName"):
   if connection and isinstance(connection, ldap3.core.connection.Connection):
      conn = connection
   else:
      conn = get_ldap_connection(server_uri, bind_dn, bind_pw)
   conn.search(
      search_base=search_base,
      search_scope="SUBTREE",
      search_filter=f"({krbPrincipalName_attrib}={user_krbPrincipalName})",
      attributes=[user_attrib]
   )
   entry = conn.entries[0]
   if user_attrib == "dn":
      return entry.entry_dn
   else:
      return entry.entry_attributes_as_dict[entry.entry_attributes[0]][0]

def get_ldap_username_attrib_from_dn(server_uri = None, bind_dn = "", bind_pw = "", authenticated_user = None, user_match_attrib = "dn", user_dn = ""):
   # Needs (server_uri, bind_dn, bind_pw, user_dn) or (authenticated_user)
   if authenticated_user and isinstance(authenticated_user, ldap3.core.connection.Connection):
      conn = authenticated_user
      search_base=authenticated_user.extend.standard.who_am_i().replace("dn: ","")
   else:
      # then we have to use a new connection
      conn = get_ldap_connection(server_uri, bind_dn, bind_pw)
   if user_dn:
      search_base=user_dn,
   # so now conn is the connection regardless of how we got there, and search_base
   #print(f"DEBUG: search_base {search_base} attributes {user_match_attrib}")
   conn.search(
      search_base=search_base,
      search_scope="BASE",
      search_filter="(cn=*)",
      attributes=[user_match_attrib]
   )
   entry = conn.entries[0]
   if user_match_attrib == "dn":
      return entry.entry_dn
   else:
      return entry.entry_attributes_as_dict[entry.entry_attributes[0]][0]
bgstack15