aboutsummaryrefslogtreecommitdiff
path: root/rrc_lib.py
diff options
context:
space:
mode:
Diffstat (limited to 'rrc_lib.py')
-rw-r--r--rrc_lib.py205
1 files changed, 205 insertions, 0 deletions
diff --git a/rrc_lib.py b/rrc_lib.py
new file mode 100644
index 0000000..82ecb27
--- /dev/null
+++ b/rrc_lib.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python3
+# File: rrc_lib.py
+# Location: https://gitlab.com/bgstack15/read-rdp-cert
+# Author: bgstack15
+# Startdate: 2021-07-28 14:02
+# Title: Library for Reading RDP Certificates
+# Purpose: Functions for reading packet captures of RDP negotiations to observe what cert the server is using.
+# History:
+# Usage:
+# See read_rdp_cert.py
+# References:
+# packet capture filter with https://stackoverflow.com/questions/39624745/capture-only-ssl-handshake-with-tcpdump
+# https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py
+# Alternatives:
+# uses different library https://github.com/thy09/isolation/blob/master/load_cert.py
+# another library, not packaged by my distro https://security.stackexchange.com/questions/123851/how-can-i-extract-the-certificate-from-this-pcap-file
+# wireshark, with above capture filter and display filter "tls.handshake.type == 11"
+# Improve:
+# Ensure that this handles a cert chain correctly?
+# Operate on streams?
+# Wild idea: initiate the network traffic that triggers the RDP and TLS certificate directly?
+# Dependencies:
+# python3-dpkt, python3-openssl
+# python 3.9.2
+import dpkt, os, ssl
+from OpenSSL.crypto import load_certificate, FILETYPE_PEM
+
+def read_pcap_file(infile):
+ inf = open(infile,"rb")
+ reader = dpkt.pcap.Reader(inf)
+ array = []
+ count = 0
+ help_shown = False
+ for ts, buf in reader:
+ count += 1;
+ print("-- Packet number:",count)
+ data = None
+ #array.append({"ts": ts, "buf": buf})
+ tcp = dpkt.tcp.TCP()
+ ip = iplayer_from_raw(buf,reader.datalink())
+ if ip.p == dpkt.ip.IP_PROTO_TCP:
+ tcp = ip.data
+ if not tcp.data:
+ print("Not tcp data.")
+ # THE USEFUL STUFF IS tcp.data
+ #print("tcp data")
+ continue
+ data = tcp.data
+ # now we can process data
+ if data:
+ a = extract_cert2(data)
+ if a != -1:
+ for i in a:
+ array.append(i)
+ return array
+
+#class SamplePacket(dpkt.dpkt.Packet):
+# __hdr__ = ()
+
+def old_attempt1():
+ # try:
+ # record = dpkt.ssl.TLSRecord(data)
+ # except dpkt.NeedData:
+ # print(f"Need data, packet {count}")
+ # continue
+ # # this all works if we have already limited the capture filter
+ # record = dpkt.ssl.RECORD_TYPES[record.type](record.data)
+ # if not isinstance(record, dpkt.ssl.TLSHandshake):
+ # print(f"Packet {count} is not tls handshake")
+ # if isinstance(record, dpkt.ssl.TLSCertificate):
+ # print(f"Packet {count} is tls certificate")
+ # try:
+ # record = dpkt.ssl.TLSCertificate(data)
+ # except:
+ # print(f"Packet {count} could not be converted to tlscertificate")
+ # #a = dpkt.ssl.TLSCertificate() #.unpack(buf=record)
+ # #a.unpack(buf=record)
+ # #print(a)
+ # #if not help_shown:
+ # # help_shown = True
+ # # help(record.data)
+ # packet = SamplePacket(buf=buf)
+ # #extract_cert(packet)
+ return 0
+
+# iplayer_from_raw ripped from https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py
+def iplayer_from_raw(raw, linktype=1):
+ """Converts a raw packet to a dpkt packet regarding of link type.
+ @param raw: raw packet
+ @param linktype: integer describing link type as expected by dpkt
+ """
+ if linktype == 1: # ethernet
+ pkt = dpkt.ethernet.Ethernet(raw)
+ ip = pkt.data
+ elif linktype == 101: # raw
+ ip = dpkt.ip.IP(raw)
+ else:
+ return -1
+ return ip
+
+def extract_cert2(data):
+ array = []
+ #print(f"Working with {data}")
+ # hard-coded validation against TLSv1 record because library dpkt sucks.
+ content_type = data[0]
+ tls_version = conv(data[1:3])
+ length = conv(data[3:5])
+ print(f"content_type {content_type}")
+ print(f"tls_version {tls_version}")
+ print(f"length {length}")
+ # so the next [length] contents need to be interpreted
+ if content_type != 22:
+ print("This is not a tlsv1 handshake")
+ return -1
+ final_byte = 5+1+length
+ subdata = data[5:final_byte]
+ tls_object_count = 0
+ print(f"length of subdata: {len(subdata)}")
+ current_byte = 0
+ protocol = 0
+ p_length = 0
+ MAX_LOOPS = 15
+ loop_count = 0
+ while(current_byte < len(subdata)):
+ loop_count += 1
+ if loop_count >= MAX_LOOPS:
+ print("Safety valve; too many loops through TLSv1 handshake inspection.")
+ break
+ protocol = subdata[current_byte]
+ p_length = conv(subdata[current_byte+1:current_byte+4])
+ if protocol == 1:
+ print(f"Client hello, length {p_length}")
+ elif protocol == 2:
+ print(f"Server hello, length {p_length}")
+ elif protocol == 11:
+ print(f"Certificate, the holy grail, length {p_length}")
+ # there is a separate field here for certs length, which should be 3 bytes shorter than p_length.
+ certs_length = conv(subdata[current_byte+4:current_byte+4+3])
+ print(f"For all certs here, length is {certs_length}")
+ # this will need to be written probably to loop when there is a chain of certificates
+ cert_length = conv(subdata[current_byte+4+3:current_byte+4+3+3])
+ print(f"Cert length is {cert_length}")
+ cert_body = subdata[current_byte+4+3+3:current_byte+4+3+3+cert_length]
+ array.append(cert_body)
+ print(f"This cert length is {len(cert_body)}")
+ #with open("/home/bgstack15/cert.out","wb") as o:
+ # o.write(cert_body)
+ elif protocol == 14:
+ print(f"Server hello done, length {p_length}")
+ elif protocol == 16:
+ print(f"Client key exchange, length {p_length}")
+ elif protocol == 20:
+ print(f"Change cipher spec, length {p_length}")
+ else:
+ print(f"Unknown protocol {protocol}, length {p_length}")
+
+ current_byte += (4 + p_length)
+ return array
+
+def conv(hexinput):
+ return int.from_bytes(hexinput,'big')
+
+def extract_cert(packet):
+ #if not isinstance(packet,dpkt.dpkg.Packet)
+ print("packet['buf'] is",packet['buf'])
+ try:
+ record = dpkt.ssl.TLSRecord(packet['buf'])
+ except dpkt.NeedData:
+ print("need data")
+ return -1
+ try:
+ record = dpkt.ssl.TLSRecord(packet['buf'])
+ print("Found it with packet['buf']")
+ except:
+ try:
+ record = dpkt.ssl.TLSRecord(packet) # maybe just the bytes?
+ print("Found it with packet")
+ except:
+ print("Error: extract_cert needs the packet passed to it.")
+ return -1
+ # so now we have a record object
+ if isinstance(record, dpkt.ssl.TLSHandshake):
+ print("found a handshake!")
+
+def save_cert(data, directory = os.path.curdir):
+ # the other functions tend to output DER certs
+ cert = data
+ # so let's try to convert to PEM
+ try:
+ cert = ssl.DER_cert_to_PEM_cert(data)
+ except:
+ # but don't try very hard
+ pass
+ certificate = load_certificate(FILETYPE_PEM, cert)
+ subject = certificate.get_subject()
+ # We only need a simple subject name for my use case.
+ # from https://stackoverflow.com/questions/57877935/how-to-convert-an-x509name-object-to-a-string-in-python/57878347#57878347
+ #subject = "".join("/{0:s}={1:s}".format(name.decode(), value.decode()) for name, value in subject.get_components())
+ _, subject = subject.get_components()[0]
+ subject = subject.decode('utf-8')
+ print(subject)
+ outfile = os.path.join(directory,subject+".pem")
+ print(outfile)
+ with open(outfile,"w") as o:
+ o.write(cert)
bgstack15