#!/usr/bin/env python3
# File: rrc_lib.py
# Location: https://gitlab.com/bgstack15/read-rdp-cert
# Author: bgstack15
# Startdate: 2021-07-28 14:02
# Title: Library for Reading RDP Certificates
# Purpose: Functions for reading packet captures of RDP negotiations to observe what cert the server is using.
# History:
# Usage:
#    See read_rdp_cert.py
# References:
#    packet capture filter with https://stackoverflow.com/questions/39624745/capture-only-ssl-handshake-with-tcpdump
#    https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py
# Alternatives:
#    uses different library https://github.com/thy09/isolation/blob/master/load_cert.py
#    another library, not packaged by my distro https://security.stackexchange.com/questions/123851/how-can-i-extract-the-certificate-from-this-pcap-file
#    wireshark, with above capture filter and display filter "tls.handshake.type == 11"
# Improve:
#    Ensure that this handles a cert chain correctly?
#    Operate on streams?
#    Wild idea: initiate the network traffic that triggers the RDP and TLS certificate directly?
# Dependencies:
#    python3-dpkt, python3-openssl
#    python 3.9.2
import os
import ssl

import dpkt
from OpenSSL.crypto import load_certificate, FILETYPE_PEM

# Layout constants for the hand-rolled TLS parsing below (RFC 5246).
_TLS_RECORD_HEADER_LEN = 5        # content type (1) + version (2) + length (2)
_TLS_HANDSHAKE_HEADER_LEN = 4     # handshake type (1) + length (3)
_TLS_LENGTH_FIELD_LEN = 3         # certificate list / certificate length prefixes
_TLS_CONTENT_TYPE_HANDSHAKE = 22
_TLS_HANDSHAKE_CERTIFICATE = 11
# Labels printed for the non-certificate handshake message types we recognize.
# Type 20 is "Finished"; "Change cipher spec" is a record content type, not a
# handshake message type.
_TLS_HANDSHAKE_NAMES = {
    1: "Client hello",
    2: "Server hello",
    14: "Server hello done",
    16: "Client key exchange",
    20: "Finished",
}
# Safety valve: stop walking handshake messages on the 15th pass.
_MAX_LOOPS = 15
_PEM_MARKER = "-----BEGIN CERTIFICATE-----"


def read_pcap_file(infile):
    """Read a pcap file and collect every certificate found in its TCP payloads.

    @param infile: path of the pcap file to read
    @return: list of certificate bodies (bytes) as returned by extract_cert2,
             in the order they were encountered
    """
    array = []
    # Context manager so the capture file is closed even if parsing fails.
    with open(infile, "rb") as inf:
        reader = dpkt.pcap.Reader(inf)
        linktype = reader.datalink()
        for count, (_ts, buf) in enumerate(reader, start=1):
            print("-- Packet number:", count)
            try:
                ip = iplayer_from_raw(buf, linktype)
            except dpkt.dpkt.UnpackError:
                # Truncated or malformed frame: skip it, keep reading.
                print("Could not decode packet.")
                continue
            # iplayer_from_raw may hand back -1 (unknown link type) or a
            # non-IP payload; getattr keeps those from raising AttributeError.
            tcp = None
            if getattr(ip, "p", None) == dpkt.ip.IP_PROTO_TCP:
                tcp = ip.data
            data = getattr(tcp, "data", None)
            if not data:
                print("Not tcp data.")
                continue
            # THE USEFUL STUFF IS tcp.data
            found = extract_cert2(data)
            if found != -1:
                array.extend(found)
    return array


#class SamplePacket(dpkt.dpkt.Packet):
#   __hdr__ = ()

def old_attempt1():
    """Placeholder for an abandoned dpkt.ssl-based approach; always returns 0.

    The commented-out notes below are kept for reference only.
    """
    #      try:
    #         record = dpkt.ssl.TLSRecord(data)
    #      except dpkt.NeedData:
    #         print(f"Need data, packet {count}")
    #         continue
    #      # this all works if we have already limited the capture filter
    #      record = dpkt.ssl.RECORD_TYPES[record.type](record.data)
    #      if not isinstance(record, dpkt.ssl.TLSHandshake):
    #         print(f"Packet {count} is not tls handshake")
    #      if isinstance(record, dpkt.ssl.TLSCertificate):
    #         print(f"Packet {count} is tls certificate")
    #      try:
    #         record = dpkt.ssl.TLSCertificate(data)
    #      except:
    #         print(f"Packet {count} could not be converted to tlscertificate")
    #      #a = dpkt.ssl.TLSCertificate() #.unpack(buf=record)
    #      #a.unpack(buf=record)
    #      #print(a)
    #      #if not help_shown:
    #      #   help_shown = True
    #      #   help(record.data)
    #      packet = SamplePacket(buf=buf)
    #      #extract_cert(packet)
    return 0


# iplayer_from_raw ripped from https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py
def iplayer_from_raw(raw, linktype=1):
    """Converts a raw packet to a dpkt packet regardless of link type.

    @param raw: raw packet
    @param linktype: integer describing link type as expected by dpkt
    @return: the payload of the Ethernet frame (linktype 1), a dpkt IP packet
             (linktype 101), or -1 for any other link type
    """
    if linktype == 1:  # ethernet
        return dpkt.ethernet.Ethernet(raw).data
    if linktype == 101:  # raw
        return dpkt.ip.IP(raw)
    return -1


def _parse_certificate_chain(body):
    """Return every complete certificate in a TLS Certificate message body.

    @param body: bytes following the 4-byte handshake header: a 3-byte total
                 length, then repeated (3-byte length, certificate) entries
    @return: list of certificate bodies (bytes); entries that are empty or cut
             short by the end of the available data are left out
    """
    certs_length = conv(body[:_TLS_LENGTH_FIELD_LEN])
    print(f"For all certs here, length is {certs_length}")
    # Never read past the declared list, nor past the bytes we actually have.
    entries = body[_TLS_LENGTH_FIELD_LEN:_TLS_LENGTH_FIELD_LEN + certs_length]
    chain = []
    offset = 0
    while offset + _TLS_LENGTH_FIELD_LEN <= len(entries):
        cert_start = offset + _TLS_LENGTH_FIELD_LEN
        cert_length = conv(entries[offset:cert_start])
        print(f"Cert length is {cert_length}")
        cert_body = entries[cert_start:cert_start + cert_length]
        print(f"This cert length is {len(cert_body)}")
        if len(cert_body) < cert_length:
            # The rest of the certificate is not in this buffer; a partial
            # certificate cannot be loaded later, so do not return it.
            print("Certificate is truncated in this packet; skipping it.")
            break
        if cert_body:
            chain.append(cert_body)
        offset = cert_start + cert_length
    return chain


def extract_cert2(data):
    """Pull the certificates out of a TLS handshake record.

    Only the first TLS record in data is inspected. Its handshake messages are
    walked one by one and every Certificate message (type 11) contributes all
    complete certificates of its chain.

    @param data: bytes of a TCP payload
    @return: list of certificate bodies (bytes, possibly empty), or -1 when
             data is empty or is not a handshake record
    """
    if not data:
        print("This is not a tlsv1 handshake")
        return -1
    # hard-coded validation against TLSv1 record because library dpkt sucks.
    content_type = data[0]
    tls_version = conv(data[1:3])
    length = conv(data[3:5])
    print(f"content_type {content_type}")
    print(f"tls_version {tls_version}")
    print(f"length {length}")
    if content_type != _TLS_CONTENT_TYPE_HANDSHAKE:
        print("This is not a tlsv1 handshake")
        return -1
    # The record payload is exactly [length] bytes after the 5-byte header;
    # reading one byte more would pull in the start of the next record.
    subdata = data[_TLS_RECORD_HEADER_LEN:_TLS_RECORD_HEADER_LEN + length]
    print(f"length of subdata: {len(subdata)}")
    array = []
    current_byte = 0
    loop_count = 0
    while current_byte < len(subdata):
        loop_count += 1
        if loop_count >= _MAX_LOOPS:
            print("Safety valve; too many loops through TLSv1 handshake inspection.")
            break
        protocol = subdata[current_byte]
        body_start = current_byte + _TLS_HANDSHAKE_HEADER_LEN
        p_length = conv(subdata[current_byte + 1:body_start])
        if protocol == _TLS_HANDSHAKE_CERTIFICATE:
            print(f"Certificate, the holy grail, length {p_length}")
            array.extend(
                _parse_certificate_chain(subdata[body_start:body_start + p_length])
            )
        elif protocol in _TLS_HANDSHAKE_NAMES:
            print(f"{_TLS_HANDSHAKE_NAMES[protocol]}, length {p_length}")
        else:
            print(f"Unknown protocol {protocol}, length {p_length}")
        current_byte = body_start + p_length
    return array


def conv(hexinput):
    """Interpret a bytes-like value as a big-endian unsigned integer.

    @param hexinput: bytes-like object (an empty one yields 0)
    @return: the integer value
    """
    return int.from_bytes(hexinput, 'big')


def extract_cert(packet):
    """Earlier dpkt.ssl-based experiment; read_pcap_file uses extract_cert2 instead.

    Parses the buffer as a dpkt TLSRecord and reports whether it is a
    handshake record. Nothing is extracted.

    @param packet: either a mapping holding the raw bytes under key 'buf', or
                   the raw bytes themselves
    @return: -1 when the buffer cannot be parsed as a TLS record, otherwise None
    """
    if isinstance(packet, (bytes, bytearray, memoryview)):
        buf, source = packet, "packet"  # maybe just the bytes?
    else:
        buf, source = packet['buf'], "packet['buf']"
    print("packet['buf'] is", buf)
    try:
        record = dpkt.ssl.TLSRecord(buf)
    except dpkt.NeedData:
        print("need data")
        return -1
    except Exception:
        # Best effort by design: any other parse failure is reported and
        # signalled with -1 instead of propagating.
        print("Error: extract_cert needs the packet passed to it.")
        return -1
    print(f"Found it with {source}")
    # so now we have a record object
    if record.type == _TLS_CONTENT_TYPE_HANDSHAKE:
        print("found a handshake!")


def _to_pem_text(data):
    """Return data as PEM text, converting from DER only when needed.

    @param data: certificate as DER bytes, PEM bytes or PEM str
    @return: PEM-encoded certificate as str
    """
    if isinstance(data, str):
        return data
    raw = bytes(data)
    if raw.lstrip().startswith(_PEM_MARKER.encode("ascii")):
        # Already PEM; wrapping it again as if it were DER would corrupt it.
        return raw.decode("ascii")
    return ssl.DER_cert_to_PEM_cert(raw)


def _safe_filename(name):
    """Make a certificate-supplied name usable as a single path component.

    @param name: text taken from the certificate subject
    @return: name without NUL characters and with path separators replaced by
             underscores; "unknown" if nothing is left
    """
    cleaned = name.replace("\x00", "")
    for separator in ("/", "\\"):
        cleaned = cleaned.replace(separator, "_")
    return cleaned or "unknown"


def save_cert(data, directory = os.path.curdir):
    """Write a certificate to <directory>/<first subject component>.pem.

    @param data: certificate as DER bytes (what the other functions output),
                 or already PEM-encoded as bytes or str
    @param directory: directory to write into; defaults to the current directory
    """
    # the other functions tend to output DER certs, so convert to PEM
    cert = _to_pem_text(data)
    certificate = load_certificate(FILETYPE_PEM, cert)
    # We only need a simple subject name for my use case.
    # from https://stackoverflow.com/questions/57877935/how-to-convert-an-x509name-object-to-a-string-in-python/57878347#57878347
    #subject = "".join("/{0:s}={1:s}".format(name.decode(), value.decode()) for name, value in subject.get_components())
    components = certificate.get_subject().get_components()
    if components:
        _, value = components[0]
        subject = value.decode('utf-8', errors='replace')
    else:
        subject = "unknown"
    print(subject)
    # The subject comes from captured network traffic, so it must not be able
    # to steer the output path outside of directory.
    outfile = os.path.join(directory, _safe_filename(subject) + ".pem")
    print(outfile)
    with open(outfile, "w") as o:
        o.write(cert)