Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Renamed keystore to asym_keystore
- Made keystore naming consistent with kex_keystore
- Print the size of RSA keys in use
  • Loading branch information
Alex Moneger committed Oct 5, 2016
commit 5a996133070dfc0ffe091adfac6f295dac2babcb
2 changes: 1 addition & 1 deletion scapy_ssl_tls/ssl_tls_automata.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def do_bind(self):

pemo = pem_get_objects(self.pemkey)
for key_pk in (k for k in pemo.keys() if "PRIVATE" in k.upper()):
self.srv_tls_sock.tls_ctx.crypto.server.keystore = tlsk.RSAKeystore.from_private(pemo[key_pk].get("full"))
self.srv_tls_sock.tls_ctx.crypto.server.asym_keystore = tlsk.RSAKeystore.from_private(pemo[key_pk].get("full"))
break

@hookable
Expand Down
38 changes: 19 additions & 19 deletions scapy_ssl_tls/ssl_tls_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,22 @@ def __init__(self, client=True):
self.compression = namedtuple("compression", ["method"])
self.compression.method = None
self.crypto = namedtuple('crypto', ['client','server'])
self.crypto.client = namedtuple('client', ['enc', 'dec', "hmac", "keystore", "kex_keystore"])
self.crypto.client = namedtuple('client', ['enc', 'dec', "hmac", "asym_keystore", "kex_keystore"])
self.crypto.client.enc = None
self.crypto.client.dec = None
self.crypto.client.hmac = None
self.crypto.client.keystore = tlsk.EmptyAsymKeystore()
self.crypto.client.asym_keystore = tlsk.EmptyAsymKeystore()
self.crypto.client.kex_keystore = tlsk.EmptyKexKeystore()

self.crypto.client.ecdh = namedtuple("ecdh", ["curve_name", "priv", "pub"])
self.crypto.client.ecdh.curve_name = None
self.crypto.client.ecdh.priv = None
self.crypto.client.ecdh.pub = None
self.crypto.server = namedtuple('server', ['enc','dec','rsa', "hmac", "keystore", "kex_keystore"])
self.crypto.server = namedtuple('server', ['enc','dec','rsa', "hmac", "asym_keystore", "kex_keystore"])
self.crypto.server.enc = None
self.crypto.server.dec = None
self.crypto.server.hmac = None
self.crypto.server.keystore = tlsk.EmptyAsymKeystore()
self.crypto.server.asym_keystore = tlsk.EmptyAsymKeystore()
self.crypto.server.kex_keystore = tlsk.EmptyKexKeystore()

self.crypto.server.ecdh = namedtuple("ecdh", ["curve_name", "priv", "pub"])
Expand Down Expand Up @@ -172,9 +172,9 @@ def __repr__(self):
'crypto-server-enc':repr(self.crypto.server.enc),
'crypto-server-dec':repr(self.crypto.server.dec),

"crypto-client-keystore": self.crypto.client.keystore,
"crypto-client-asym_keystore": self.crypto.client.asym_keystore,
"crypto-client-kex_keystore": self.crypto.client.kex_keystore,
"crypto-server-keystore": self.crypto.server.keystore,
"crypto-server-asym_keystore": self.crypto.server.asym_keystore,
"crypto-server-kex_keystore": self.crypto.server.kex_keystore,

"crypto-client-ecdh-curve_name": repr(self.crypto.client.ecdh.curve_name),
Expand Down Expand Up @@ -221,9 +221,9 @@ def __repr__(self):
str_ +="\n\t crypto.server.enc=%(crypto-server-enc)s"
str_ +="\n\t crypto.server.dec=%(crypto-server-dec)s"

str_ += "\n\t crypto.client.keystore=%(crypto-client-keystore)s"
str_ += "\n\t crypto.client.asym_keystore=%(crypto-client-asym_keystore)s"
str_ += "\n\t crypto.client.kex_keystore=%(crypto-client-kex_keystore)s"
str_ += "\n\t crypto.server.keystore=%(crypto-server-keystore)s"
str_ += "\n\t crypto.server.asym_keystore=%(crypto-server-asym_keystore)s"
str_ += "\n\t crypto.server.kex_keystore=%(crypto-server-kex_keystore)s"

str_ += "\n\t crypto.client.ecdh.curve_name=%(crypto-client-ecdh-curve_name)s"
Expand Down Expand Up @@ -322,11 +322,11 @@ def _process(self, pkt):
# fetch server pubkey // PKCS1_v1_5
cert = pkt[tls.TLSCertificateList].certificates[0].data
# If we have a default keystore, create an RSA keystore and populate it from data on the wire
if isinstance(self.crypto.server.keystore, tlsk.EmptyAsymKeystore):
self.crypto.server.keystore = tlsk.RSAKeystore.from_der_certificate(str(cert))
if isinstance(self.crypto.server.asym_keystore, tlsk.EmptyAsymKeystore):
self.crypto.server.asym_keystore = tlsk.RSAKeystore.from_der_certificate(str(cert))
# Else keystore was assigned by user. Just add cert from the wire to the store
else:
self.crypto.server.keystore.certificate = str(cert)
self.crypto.server.asym_keystore.certificate = str(cert)
# TODO: In the future also handle kex = DH and extract static DH params from cert
elif self.params.negotiated.key_exchange is not None and self.params.negotiated.sig == DSA:
# TODO: Handle DSA sig key loading here to allow sig checks
Expand Down Expand Up @@ -368,7 +368,7 @@ def _process(self, pkt):
if pkt.haslayer(tls.TLSClientRSAParams):
self.crypto.session.encrypted_premaster_secret = pkt[tls.TLSClientRSAParams].data
# If we have the private key, let's decrypt the PMS
private = self.crypto.server.keystore.private
private = self.crypto.server.asym_keystore.private
if private is not None:
self.crypto.session.premaster_secret = PKCS1_v1_5.new(
private).decrypt(self.crypto.session.encrypted_premaster_secret, None)
Expand Down Expand Up @@ -426,26 +426,26 @@ def rsa_load_keys_from_file(self, priv_key_file, client=False):
for key_pk in (k for k in pemo.keys() if "PRIVATE" in k.upper()):
try:
if client:
self.crypto.client.keystore = tlsk.RSAKeystore.from_private(pemo[key_pk].get("full"))
self.crypto.client.asym_keystore = tlsk.RSAKeystore.from_private(pemo[key_pk].get("full"))
else:
self.crypto.server.keystore = tlsk.RSAKeystore.from_private(pemo[key_pk].get("full"))
self.crypto.server.asym_keystore = tlsk.RSAKeystore.from_private(pemo[key_pk].get("full"))
return
except ValueError:
pass
raise ValueError("Unable to load PRIVATE key from pem file: %s"%priv_key_file)

def rsa_load_keys(self, private, client=False):
if client:
self.crypto.client.keystore = tlsk.RSAKeystore.from_private(private)
self.crypto.client.asym_keystore = tlsk.RSAKeystore.from_private(private)
else:
self.crypto.server.keystore = tlsk.RSAKeystore.from_private(private)
self.crypto.server.asym_keystore = tlsk.RSAKeystore.from_private(private)

def _generate_random_pms(self, version):
return "%s%s" % (struct.pack("!H", version), os.urandom(46))

def get_encrypted_pms(self, pms=None):
cleartext = pms or self.crypto.session.premaster_secret
public = self.crypto.server.keystore.public
public = self.crypto.server.asym_keystore.public
if public is not None:
self.crypto.session.encrypted_premaster_secret = PKCS1_v1_5.new(public).encrypt(cleartext)
else:
Expand Down Expand Up @@ -531,13 +531,13 @@ def get_handshake_hash(self, hash_):
return hash_

def get_client_signed_handshake_hash(self, hash_=SHA256.new(), pre_sign_hook=None):
if self.crypto.client.keystore.private is None:
if self.crypto.client.asym_keystore.private is None:
raise RuntimeError("Missing client private key. Can't sign")
msg_hash = self.get_handshake_hash(hash_)
if pre_sign_hook is not None:
msg_hash = pre_sign_hook(msg_hash)
# Will throw exception if we can't sign or if data is larger the modulus
return Sig_PKCS1_v1_5.new(self.crypto.client.keystore.private).sign(msg_hash)
return Sig_PKCS1_v1_5.new(self.crypto.client.asym_keystore.private).sign(msg_hash)

def set_mode(self, client=None, server=None):
self.client = client if client else not server
Expand Down
17 changes: 11 additions & 6 deletions scapy_ssl_tls/ssl_tls_keystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def __init__(self, name, public, private=None):
self.name = name
self.private = private
self.public = public
if self.public is not None:
self.size = nb_bits(self.public.n)
else:
self.size = 0
self.keys = (self.private, self.public)
self.certificate = None

Expand All @@ -84,17 +88,18 @@ def from_private(cls, private):

def __str__(self):
template = """
{name}:
certificate: {certificate}
public: {public}
private: {private}"""
return template.format(name=self.name, certificate=repr(self.certificate), public=self.public,
{name}:
certificate: {certificate}
size: {size}
public: {public}
private: {private}"""
return template.format(name=self.name, certificate=repr(self.certificate), size=self.size, public=self.public,
private=self.private)


class EmptyAsymKeystore(AsymKeyStore):
def __init__(self):
super(EmptyAsymKeystore, self).__init__("Empty Keystore", None, None)
super(EmptyAsymKeystore, self).__init__("Empty Asymmetrical Keystore", None, None)


class RSAKeystore(AsymKeyStore):
Expand Down
16 changes: 8 additions & 8 deletions tests/test_ssl_tls_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ def test_keys_are_set_in_context_when_loaded(self):
pkt = tls.TLSRecord() / tls.TLSHandshake() / tls.TLSClientHello(version=0x0301)
tls_ctx.insert(pkt)
tls_ctx.rsa_load_keys(self.pem_priv_key)
self.assertIsNotNone(tls_ctx.crypto.server.keystore.private)
self.assertIsNotNone(tls_ctx.crypto.server.keystore.public)
self.assertIsNotNone(tls_ctx.crypto.server.asym_keystore.private)
self.assertIsNotNone(tls_ctx.crypto.server.asym_keystore.public)
# Broken due to pycrypto bug: https://github.com/dlitz/pycrypto/issues/114
# Uncomment when fixed upstream
# self.assertTrue(tls_ctx.crypto.server.keystore.private.can_decrypt())
# self.assertTrue(tls_ctx.crypto.server.keystore.public.can_decrypt())
self.assertTrue(tls_ctx.crypto.server.keystore.private.can_encrypt())
# self.assertTrue(tls_ctx.crypto.server.asym_keystore.private.can_decrypt())
# self.assertTrue(tls_ctx.crypto.server.asym_keystore.public.can_decrypt())
self.assertTrue(tls_ctx.crypto.server.asym_keystore.private.can_encrypt())
# TODO: Invertigate further: broken also in pycrypto. Should return False for public keys.
# self.assertFalse(tls_ctx.crypto.server.keystore.public.can_encrypt())
# self.assertFalse(tls_ctx.crypto.server.asym_keystore.public.can_encrypt())

def test_decrypted_pms_matches_generated_pms(self):
tls_ctx = tlsc.TLSSessionCtx()
Expand Down Expand Up @@ -300,8 +300,8 @@ def test_load_rsa_privkey_from_pem_file(self):
pem_file = env_local_file("openssl_1_0_1_f_server.pem")
tls_ctx = tlsc.TLSSessionCtx()
tls_ctx.rsa_load_keys_from_file(pem_file)
self.assertTrue(tls_ctx.crypto.server.keystore.private)
self.assertTrue(tls_ctx.crypto.server.keystore.public)
self.assertTrue(tls_ctx.crypto.server.asym_keystore.private)
self.assertTrue(tls_ctx.crypto.server.asym_keystore.public)


class TestNullCompression(unittest.TestCase):
Expand Down