diff options
author | B. Stack <bgstack15@gmail.com> | 2021-07-29 09:44:57 -0400 |
---|---|---|
committer | B. Stack <bgstack15@gmail.com> | 2021-07-29 09:44:57 -0400 |
commit | 518c1fb1d43e895a4f3070653174c6e3ec481889 (patch) | |
tree | 2551381509c404f27b04f77751f6ca7198215738 /rrc_lib.py | |
download | read-rdp-cert-518c1fb1d43e895a4f3070653174c6e3ec481889.tar.gz read-rdp-cert-518c1fb1d43e895a4f3070653174c6e3ec481889.tar.bz2 read-rdp-cert-518c1fb1d43e895a4f3070653174c6e3ec481889.zip |
initial commit
Diffstat (limited to 'rrc_lib.py')
-rw-r--r-- | rrc_lib.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/rrc_lib.py b/rrc_lib.py new file mode 100644 index 0000000..82ecb27 --- /dev/null +++ b/rrc_lib.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 +# File: rrc_lib.py +# Location: https://gitlab.com/bgstack15/read-rdp-cert +# Author: bgstack15 +# Startdate: 2021-07-28 14:02 +# Title: Library for Reading RDP Certificates +# Purpose: Functions for reading packet captures of RDP negotiations to observe what cert the server is using. +# History: +# Usage: +# See read_rdp_cert.py +# References: +# packet capture filter with https://stackoverflow.com/questions/39624745/capture-only-ssl-handshake-with-tcpdump +# https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py +# Alternatives: +# uses different library https://github.com/thy09/isolation/blob/master/load_cert.py +# another library, not packaged by my distro https://security.stackexchange.com/questions/123851/how-can-i-extract-the-certificate-from-this-pcap-file +# wireshark, with above capture filter and display filter "tls.handshake.type == 11" +# Improve: +# Ensure that this handles a cert chain correctly? +# Operate on streams? +# Wild idea: initiate the network traffic that triggers the RDP and TLS certificate directly? +# Dependencies: +# python3-dpkt, python3-openssl +# python 3.9.2 +import dpkt, os, ssl +from OpenSSL.crypto import load_certificate, FILETYPE_PEM + +def read_pcap_file(infile): + inf = open(infile,"rb") + reader = dpkt.pcap.Reader(inf) + array = [] + count = 0 + help_shown = False + for ts, buf in reader: + count += 1; + print("-- Packet number:",count) + data = None + #array.append({"ts": ts, "buf": buf}) + tcp = dpkt.tcp.TCP() + ip = iplayer_from_raw(buf,reader.datalink()) + if ip.p == dpkt.ip.IP_PROTO_TCP: + tcp = ip.data + if not tcp.data: + print("Not tcp data.") + # THE USEFUL STUFF IS tcp.data + #print("tcp data") + continue + data = tcp.data + # now we can process data + if data: + a = extract_cert2(data) + if a != -1: + for i in a: + array.append(i) + return array + +#class SamplePacket(dpkt.dpkt.Packet): +# __hdr__ = () + +def old_attempt1(): + # try: + # record = dpkt.ssl.TLSRecord(data) + # except dpkt.NeedData: + # print(f"Need data, packet {count}") + # continue + # # this all works if we have already limited the capture filter + # record = dpkt.ssl.RECORD_TYPES[record.type](record.data) + # if not isinstance(record, dpkt.ssl.TLSHandshake): + # print(f"Packet {count} is not tls handshake") + # if isinstance(record, dpkt.ssl.TLSCertificate): + # print(f"Packet {count} is tls certificate") + # try: + # record = dpkt.ssl.TLSCertificate(data) + # except: + # print(f"Packet {count} could not be converted to tlscertificate") + # #a = dpkt.ssl.TLSCertificate() #.unpack(buf=record) + # #a.unpack(buf=record) + # #print(a) + # #if not help_shown: + # # help_shown = True + # # help(record.data) + # packet = SamplePacket(buf=buf) + # #extract_cert(packet) + return 0 + +# iplayer_from_raw ripped from https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py +def iplayer_from_raw(raw, linktype=1): + """Converts a raw packet to a dpkt packet regarding of link type. + @param raw: raw packet + @param linktype: integer describing link type as expected by dpkt + """ + if linktype == 1: # ethernet + pkt = dpkt.ethernet.Ethernet(raw) + ip = pkt.data + elif linktype == 101: # raw + ip = dpkt.ip.IP(raw) + else: + return -1 + return ip + +def extract_cert2(data): + array = [] + #print(f"Working with {data}") + # hard-coded validation against TLSv1 record because library dpkt sucks. + content_type = data[0] + tls_version = conv(data[1:3]) + length = conv(data[3:5]) + print(f"content_type {content_type}") + print(f"tls_version {tls_version}") + print(f"length {length}") + # so the next [length] contents need to be interpreted + if content_type != 22: + print("This is not a tlsv1 handshake") + return -1 + final_byte = 5+1+length + subdata = data[5:final_byte] + tls_object_count = 0 + print(f"length of subdata: {len(subdata)}") + current_byte = 0 + protocol = 0 + p_length = 0 + MAX_LOOPS = 15 + loop_count = 0 + while(current_byte < len(subdata)): + loop_count += 1 + if loop_count >= MAX_LOOPS: + print("Safety valve; too many loops through TLSv1 handshake inspection.") + break + protocol = subdata[current_byte] + p_length = conv(subdata[current_byte+1:current_byte+4]) + if protocol == 1: + print(f"Client hello, length {p_length}") + elif protocol == 2: + print(f"Server hello, length {p_length}") + elif protocol == 11: + print(f"Certificate, the holy grail, length {p_length}") + # there is a separate field here for certs length, which should be 3 bytes shorter than p_length. + certs_length = conv(subdata[current_byte+4:current_byte+4+3]) + print(f"For all certs here, length is {certs_length}") + # this will need to be written probably to loop when there is a chain of certificates + cert_length = conv(subdata[current_byte+4+3:current_byte+4+3+3]) + print(f"Cert length is {cert_length}") + cert_body = subdata[current_byte+4+3+3:current_byte+4+3+3+cert_length] + array.append(cert_body) + print(f"This cert length is {len(cert_body)}") + #with open("/home/bgstack15/cert.out","wb") as o: + # o.write(cert_body) + elif protocol == 14: + print(f"Server hello done, length {p_length}") + elif protocol == 16: + print(f"Client key exchange, length {p_length}") + elif protocol == 20: + print(f"Change cipher spec, length {p_length}") + else: + print(f"Unknown protocol {protocol}, length {p_length}") + + current_byte += (4 + p_length) + return array + +def conv(hexinput): + return int.from_bytes(hexinput,'big') + +def extract_cert(packet): + #if not isinstance(packet,dpkt.dpkg.Packet) + print("packet['buf'] is",packet['buf']) + try: + record = dpkt.ssl.TLSRecord(packet['buf']) + except dpkt.NeedData: + print("need data") + return -1 + try: + record = dpkt.ssl.TLSRecord(packet['buf']) + print("Found it with packet['buf']") + except: + try: + record = dpkt.ssl.TLSRecord(packet) # maybe just the bytes? + print("Found it with packet") + except: + print("Error: extract_cert needs the packet passed to it.") + return -1 + # so now we have a record object + if isinstance(record, dpkt.ssl.TLSHandshake): + print("found a handshake!") + +def save_cert(data, directory = os.path.curdir): + # the other functions tend to output DER certs + cert = data + # so let's try to convert to PEM + try: + cert = ssl.DER_cert_to_PEM_cert(data) + except: + # but don't try very hard + pass + certificate = load_certificate(FILETYPE_PEM, cert) + subject = certificate.get_subject() + # We only need a simple subject name for my use case. + # from https://stackoverflow.com/questions/57877935/how-to-convert-an-x509name-object-to-a-string-in-python/57878347#57878347 + #subject = "".join("/{0:s}={1:s}".format(name.decode(), value.decode()) for name, value in subject.get_components()) + _, subject = subject.get_components()[0] + subject = subject.decode('utf-8') + print(subject) + outfile = os.path.join(directory,subject+".pem") + print(outfile) + with open(outfile,"w") as o: + o.write(cert) |