1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
#!/usr/bin/env python3
# File: rrc_lib.py
# Location: https://gitlab.com/bgstack15/read-rdp-cert
# Author: bgstack15
# Startdate: 2021-07-28 14:02
# Title: Library for Reading RDP Certificates
# Purpose: Functions for reading packet captures of RDP negotiations to observe what cert the server is using.
# History:
# Usage:
# See read_rdp_cert.py
# References:
# packet capture filter with https://stackoverflow.com/questions/39624745/capture-only-ssl-handshake-with-tcpdump
# https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py
# Alternatives:
# uses different library https://github.com/thy09/isolation/blob/master/load_cert.py
# another library, not packaged by my distro https://security.stackexchange.com/questions/123851/how-can-i-extract-the-certificate-from-this-pcap-file
# wireshark, with above capture filter and display filter "tls.handshake.type == 11"
# Improve:
# Ensure that this handles a cert chain correctly?
# Operate on streams?
# Wild idea: initiate the network traffic that triggers the RDP and TLS certificate directly?
# Dependencies:
# python3-dpkt, python3-openssl
# python 3.9.2
import dpkt, os, ssl
from OpenSSL.crypto import load_certificate, FILETYPE_PEM
def read_pcap_file(infile):
inf = open(infile,"rb")
reader = dpkt.pcap.Reader(inf)
array = []
count = 0
help_shown = False
for ts, buf in reader:
count += 1;
print("-- Packet number:",count)
data = None
#array.append({"ts": ts, "buf": buf})
tcp = dpkt.tcp.TCP()
ip = iplayer_from_raw(buf,reader.datalink())
if ip.p == dpkt.ip.IP_PROTO_TCP:
tcp = ip.data
if not tcp.data:
print("Not tcp data.")
# THE USEFUL STUFF IS tcp.data
#print("tcp data")
continue
data = tcp.data
# now we can process data
if data:
a = extract_cert2(data)
if a != -1:
for i in a:
array.append(i)
return array
#class SamplePacket(dpkt.dpkt.Packet):
# __hdr__ = ()
def old_attempt1():
# try:
# record = dpkt.ssl.TLSRecord(data)
# except dpkt.NeedData:
# print(f"Need data, packet {count}")
# continue
# # this all works if we have already limited the capture filter
# record = dpkt.ssl.RECORD_TYPES[record.type](record.data)
# if not isinstance(record, dpkt.ssl.TLSHandshake):
# print(f"Packet {count} is not tls handshake")
# if isinstance(record, dpkt.ssl.TLSCertificate):
# print(f"Packet {count} is tls certificate")
# try:
# record = dpkt.ssl.TLSCertificate(data)
# except:
# print(f"Packet {count} could not be converted to tlscertificate")
# #a = dpkt.ssl.TLSCertificate() #.unpack(buf=record)
# #a.unpack(buf=record)
# #print(a)
# #if not help_shown:
# # help_shown = True
# # help(record.data)
# packet = SamplePacket(buf=buf)
# #extract_cert(packet)
return 0
# iplayer_from_raw ripped from https://github.com/0x71/cuckoo-linux/blob/82263c5df40ebe70dc35976b917293eb54a363af/modules/processing/network.py
def iplayer_from_raw(raw, linktype=1):
"""Converts a raw packet to a dpkt packet regarding of link type.
@param raw: raw packet
@param linktype: integer describing link type as expected by dpkt
"""
if linktype == 1: # ethernet
pkt = dpkt.ethernet.Ethernet(raw)
ip = pkt.data
elif linktype == 101: # raw
ip = dpkt.ip.IP(raw)
else:
return -1
return ip
def extract_cert2(data):
array = []
#print(f"Working with {data}")
# hard-coded validation against TLSv1 record because library dpkt sucks.
content_type = data[0]
tls_version = conv(data[1:3])
length = conv(data[3:5])
print(f"content_type {content_type}")
print(f"tls_version {tls_version}")
print(f"length {length}")
# so the next [length] contents need to be interpreted
if content_type != 22:
print("This is not a tlsv1 handshake")
return -1
final_byte = 5+1+length
subdata = data[5:final_byte]
tls_object_count = 0
print(f"length of subdata: {len(subdata)}")
current_byte = 0
protocol = 0
p_length = 0
MAX_LOOPS = 15
loop_count = 0
while(current_byte < len(subdata)):
loop_count += 1
if loop_count >= MAX_LOOPS:
print("Safety valve; too many loops through TLSv1 handshake inspection.")
break
protocol = subdata[current_byte]
p_length = conv(subdata[current_byte+1:current_byte+4])
if protocol == 1:
print(f"Client hello, length {p_length}")
elif protocol == 2:
print(f"Server hello, length {p_length}")
elif protocol == 11:
print(f"Certificate, the holy grail, length {p_length}")
# there is a separate field here for certs length, which should be 3 bytes shorter than p_length.
certs_length = conv(subdata[current_byte+4:current_byte+4+3])
print(f"For all certs here, length is {certs_length}")
# this will need to be written probably to loop when there is a chain of certificates
cert_length = conv(subdata[current_byte+4+3:current_byte+4+3+3])
print(f"Cert length is {cert_length}")
cert_body = subdata[current_byte+4+3+3:current_byte+4+3+3+cert_length]
array.append(cert_body)
print(f"This cert length is {len(cert_body)}")
#with open("/home/bgstack15/cert.out","wb") as o:
# o.write(cert_body)
elif protocol == 14:
print(f"Server hello done, length {p_length}")
elif protocol == 16:
print(f"Client key exchange, length {p_length}")
elif protocol == 20:
print(f"Change cipher spec, length {p_length}")
else:
print(f"Unknown protocol {protocol}, length {p_length}")
current_byte += (4 + p_length)
return array
def conv(hexinput):
return int.from_bytes(hexinput,'big')
def extract_cert(packet):
#if not isinstance(packet,dpkt.dpkg.Packet)
print("packet['buf'] is",packet['buf'])
try:
record = dpkt.ssl.TLSRecord(packet['buf'])
except dpkt.NeedData:
print("need data")
return -1
try:
record = dpkt.ssl.TLSRecord(packet['buf'])
print("Found it with packet['buf']")
except:
try:
record = dpkt.ssl.TLSRecord(packet) # maybe just the bytes?
print("Found it with packet")
except:
print("Error: extract_cert needs the packet passed to it.")
return -1
# so now we have a record object
if isinstance(record, dpkt.ssl.TLSHandshake):
print("found a handshake!")
def save_cert(data, directory = os.path.curdir):
# the other functions tend to output DER certs
cert = data
# so let's try to convert to PEM
try:
cert = ssl.DER_cert_to_PEM_cert(data)
except:
# but don't try very hard
pass
certificate = load_certificate(FILETYPE_PEM, cert)
subject = certificate.get_subject()
# We only need a simple subject name for my use case.
# from https://stackoverflow.com/questions/57877935/how-to-convert-an-x509name-object-to-a-string-in-python/57878347#57878347
#subject = "".join("/{0:s}={1:s}".format(name.decode(), value.decode()) for name, value in subject.get_components())
_, subject = subject.get_components()[0]
subject = subject.decode('utf-8')
print(subject)
outfile = os.path.join(directory,subject+".pem")
print(outfile)
with open(outfile,"w") as o:
o.write(cert)
|