summaryrefslogtreecommitdiff
path: root/session_ldap.py
blob: 423f322ce565d429c756fe8eeaa813099c6ea4dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# python3 library
# Startdate: 2021-06-21
# Dependencies:
#    req-devuan: python3-ldap3

# reference: https://github.com/ArtemAngelchev/flask-basicauth-ldap/blob/master/flask_basicauth_ldap.py

import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError

def list_matching_users(server_uri, bind_dn, bind_pw,user_base, username, user_match_attrib):
   search_filter=f"({user_match_attrib}={username})"
   server = ldap3.Server(server_uri)
   conn = ldap3.Connection(server, auto_bind=True,user=bind_dn, password=bind_pw)
   conn.search(
      search_base=user_base,
      search_filter=search_filter)
   print(f"DEBUG: search_base {user_base}")
   print(f"DEBUG: search_filter {search_filter}")
   result = []
   for i in conn.entries:
      result.append(i.entry_dn)
   print(f"DEBUG: result {result}")
   return result

def authenticated_user(server_uri, user_dn, password):
   print(f"server_uri: {server_uri}")
   print(f"user_dn: {user_dn}")
   try:
      server = ldap3.Server(server_uri)
      conn = ldap3.Connection(server, auto_bind=True, user=user_dn, password=password)
      return conn
   except LDAPBindError as e:
      if 'invalidCredentials' in str(e):
         print("Invalid credentials.")
         return False
      else:
         raise e
   #except (LDAPPasswordIsMandatoryError, LDAPBindError):
   #   print("Either an ldap password is required, or we had another bind error.")
   #   return False
   return False

def list_ldap_servers_for_domain(domain):
   # return list of hostnames from the _ldap._tcp.{domain} SRV lookup
   try:
      import dns
      import dns.resolver
   except:
      print("Need python3-dns installed for dns lookups.")
      return [domain]
   namelist = []
   try:
      query = dns.resolver.query(f"_ldap._tcp.{domain}","SRV")
   except dns.resolver.NXDOMAIN:
      # no records exist that match the request, so we were probably given a specific hostname, and an empty query will trigger the logic below that will add the original domain to the list.
      query = []
   for i in query:
      namelist.append(i.target.to_text().rstrip("."))
   if not len(namelist):
      namelist.append(domain)
   return namelist

def get_ldap_user_groups(server_uri, bind_dn, bind_pw,user_dn,user_attrib_memberof,group_name_attrib,group_base):
   server = ldap3.Server(server_uri)
   conn = ldap3.Connection(server, auto_bind=True,user=bind_dn, password=bind_pw)
   conn.search(
      search_base=user_dn,
      search_filter="(cn=*)", # this has the potential to not work in a directory where CN is not a part of any dn?
      search_scope="BASE",
      attributes=[user_attrib_memberof]
   )
   these_groups = conn.entries[0].entry_attributes_as_dict[user_attrib_memberof]
   #print(f"DEBUG: these_groups={these_groups}")
   result = []
   for group in these_groups:
      #print(f"DEBUG: will check for value {group_base} in {group}")
      if group_base in group:
         if group_name_attrib == "dn":
            #print(f"DEBUG: just add group via dn {group}")
            result.append(group)
         else:
            # we need to lookup this group and pick the attribute of it the admin wants.
            #print(f"DEBUG: need to lookup group {group} and extract attrib {group_name_attrib}")
            conn.search(
               search_base=group,
               search_filter="(objectClass=*)",
               search_scope="BASE",
               attributes=[group_name_attrib]
            )
            this_group=conn.entries[0].entry_attributes_as_dict[group_name_attrib][0]
            #print(f"DEBUG: Group {group} identified as attrib {group_name_attrib}={this_group}")
            result.append(this_group)
   return result

def get_ldap_dn_from_krbPrincipalName(server_uri, bind_dn, bind_pw,user_krbPrincipalName):
   # goal: return as string the dn
   print("stub")
bgstack15