Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Introduced the ECDH Keystore
- Refactored the TLSSessionCtx to use the new ECDH keystore
  • Loading branch information
Alex Moneger committed Oct 6, 2016
commit 6e77c6224caf59d8b76091842f1349b71f41c275
112 changes: 53 additions & 59 deletions scapy_ssl_tls/ssl_tls_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ def str_to_int(str_):
return int(binascii.hexlify(str_), 16)


def str_to_ec_point(ansi_str, ec_curve):
if not ansi_str.startswith("\x04"):
def ansi_str_to_point(str_):
if not str_.startswith("\x04"):
raise ValueError("ANSI octet string missing point prefix (0x04)")
ansi_str = ansi_str[1:]
if len(ansi_str) % 2 != 0:
str_ = str_[1:]
if len(str_) % 2 != 0:
raise ValueError("Can't parse curve point. Odd ANSI string length")
str_to_int = lambda x: int(binascii.hexlify(x), 16)
x, y = str_to_int(ansi_str[:len(ansi_str) // 2]), str_to_int(ansi_str[len(ansi_str) // 2:])
return ec.Point(ec_curve, x, y)
half = len(str_) // 2
return str_to_int(str_[:half]), str_to_int(str_[half:])


def point_to_ansi_str(point):
return "\x04%s%s" % (int_to_str(point.x), int_to_str(point.y))


class TLSSessionCtx(object):
Expand Down Expand Up @@ -110,21 +113,13 @@ def __init__(self, client=True):
self.crypto.client.asym_keystore = tlsk.EmptyAsymKeystore()
self.crypto.client.kex_keystore = tlsk.EmptyKexKeystore()

self.crypto.client.ecdh = namedtuple("ecdh", ["curve_name", "priv", "pub"])
self.crypto.client.ecdh.curve_name = None
self.crypto.client.ecdh.priv = None
self.crypto.client.ecdh.pub = None
self.crypto.server = namedtuple('server', ['enc','dec','rsa', "hmac", "asym_keystore", "kex_keystore"])
self.crypto.server.enc = None
self.crypto.server.dec = None
self.crypto.server.hmac = None
self.crypto.server.asym_keystore = tlsk.EmptyAsymKeystore()
self.crypto.server.kex_keystore = tlsk.EmptyKexKeystore()

self.crypto.server.ecdh = namedtuple("ecdh", ["curve_name", "priv", "pub"])
self.crypto.server.ecdh.curve_name = None
self.crypto.server.ecdh.priv = None
self.crypto.server.ecdh.pub = None
self.crypto.session = namedtuple('session', ["encrypted_premaster_secret",
'premaster_secret',
'master_secret',
Expand Down Expand Up @@ -177,13 +172,6 @@ def __repr__(self):
"crypto-server-asym_keystore": self.crypto.server.asym_keystore,
"crypto-server-kex_keystore": self.crypto.server.kex_keystore,

"crypto-client-ecdh-curve_name": repr(self.crypto.client.ecdh.curve_name),
"crypto-client-ecdh-priv": repr(self.crypto.client.ecdh.priv),
"crypto-client-ecdh-pub": repr(self.crypto.client.ecdh.pub),
"crypto-server-ecdh-curve_name": repr(self.crypto.server.ecdh.curve_name),
"crypto-server-ecdh-priv": repr(self.crypto.server.ecdh.priv),
"crypto-server-ecdh-pub": repr(self.crypto.server.ecdh.pub),

'crypto-session-encrypted_premaster_secret':repr(self.crypto.session.encrypted_premaster_secret),
'crypto-session-premaster_secret':repr(self.crypto.session.premaster_secret),
'crypto-session-master_secret':repr(self.crypto.session.master_secret),
Expand Down Expand Up @@ -226,13 +214,6 @@ def __repr__(self):
str_ += "\n\t crypto.server.asym_keystore=%(crypto-server-asym_keystore)s"
str_ += "\n\t crypto.server.kex_keystore=%(crypto-server-kex_keystore)s"

str_ += "\n\t crypto.client.ecdh.curve_name=%(crypto-client-ecdh-curve_name)s"
str_ += "\n\t crypto.client.ecdh.priv=%(crypto-client-ecdh-priv)s"
str_ += "\n\t crypto.client.ecdh.pub=%(crypto-client-ecdh-pub)s"
str_ += "\n\t crypto.server.ecdh.curve_name=%(crypto-server-ecdh-curve_name)s"
str_ += "\n\t crypto.server.ecdh.priv=%(crypto-server-ecdh-priv)s"
str_ += "\n\t crypto.server.ecdh.pub=%(crypto-server-ecdh-pub)s"

str_ +="\n\t crypto.session.encrypted_premaster_secret=%(crypto-session-encrypted_premaster_secret)s"
str_ +="\n\t crypto.session.premaster_secret=%(crypto-session-premaster_secret)s"
str_ +="\n\t crypto.session.master_secret=%(crypto-session-master_secret)s"
Expand Down Expand Up @@ -345,23 +326,26 @@ def _process(self, pkt):
public = str_to_int(pkt[tls.TLSServerDHParams].y_s)
self.crypto.server.kex_keystore = tlsk.DHKeyStore(g, p, public)
if pkt.haslayer(tls.TLSServerECDHParams):
try:
self.crypto.server.ecdh.curve_name = tls.TLS_ELLIPTIC_CURVES[pkt[tls.TLSServerECDHParams].curve_name]
# Unknown cuve case. Just record raw values, but do nothing with them
except KeyError:
self.crypto.server.ecdh.curve_name = pkt[tls.TLSServerECDHParams].curve_name
self.crypto.server.ecdh.pub = pkt[tls.TLSServerECDHParams].p
warnings.warn("Unknown elliptic curve. Client KEX calculation is up to you")
# We are on a known curve
else:
# TODO: DO not assume uncompressed EC points!
# Uncompressed EC points are recorded in ANSI format => \x04 + x_point + y_point
ansi_ec_point_str = pkt[tls.TLSServerECDHParams].p
if isinstance(self.crypto.server.kex_keystore, tlsk.EmptyKexKeystore):
try:
ec_curve = ec_reg.get_curve(self.crypto.server.ecdh.curve_name)
self.crypto.server.ecdh.pub = str_to_ec_point(ansi_ec_point_str, ec_curve)
except ValueError:
warnings.warn("Unsupported elliptic curve: %s" % self.crypto.server.ecdh.curve_name)
curve_id = pkt[tls.TLSServerECDHParams].curve_name
# TODO: DO not assume uncompressed EC points!
# Uncompressed EC points are recorded in ANSI format => \x04 + x_point + y_point
point = ansi_str_to_point(pkt[tls.TLSServerECDHParams].p)
curve_name = tls.TLS_ELLIPTIC_CURVES[curve_id]
# Unknown curve case. Just record raw values, but do nothing with them
except KeyError:
self.crypto.server.kex_keystore = tlsk.ECDHKeyStore(None, point)
warnings.warn("Unknown elliptic curve id: %d. Client KEX calculation is up to you" % curve_id)
# We are on a known curve
else:
try:
curve = ec_reg.get_curve(curve_name)
self.crypto.server.kex_keystore = tlsk.ECDHKeyStore(curve, ec.Point(curve, *point))
except ValueError:
self.crypto.server.kex_keystore = tlsk.ECDHKeyStore(None, point)
warnings.warn("Unsupported elliptic curve: %s" % curve_name)


# calculate key material
if pkt.haslayer(tls.TLSClientKeyExchange):
Expand All @@ -384,8 +368,14 @@ def _process(self, pkt):
else:
raise RuntimeError("Server keystore is not a DH keystore")
elif pkt.haslayer(tls.TLSClientECDHParams):
ec_curve = ec_reg.get_curve(self.crypto.server.ecdh.curve_name)
self.crypto.client.ecdh.pub = str_to_ec_point(pkt[tls.TLSClientECDHParams].data, ec_curve)
# Check if we have an unitialized keystore, and if so build a new one
if isinstance(self.crypto.client.kex_keystore, tlsk.EmptyKexKeystore):
server_kex_keystore = self.crypto.server.kex_keystore
# Check if server side is a ECDH keystore. Something is messed up otherwise
if isinstance(server_kex_keystore, tlsk.ECDHKeyStore):
curve = server_kex_keystore.curve
point = ansi_str_to_point(pkt[tls.TLSClientECDHParams].data)
self.crypto.client.kex_keystore = tlsk.ECDHKeyStore(curve, ec.Point(curve, *point))

explicit_iv = True if self.params.negotiated.version > tls.TLSVersion.TLS_1_0 else False
self.sec_params = TLSSecurityParameters(self.crypto.session.prf,
Expand Down Expand Up @@ -453,8 +443,8 @@ def get_encrypted_pms(self, pms=None):
return self.crypto.session.encrypted_premaster_secret

def get_client_dh_pubkey(self, private=None):
if isinstance(self.crypto.server.kex_keystore, tlsk.EmptyKexKeystore):
raise RuntimeError("Unitialized DH server keystore")
if not isinstance(self.crypto.server.kex_keystore, tlsk.DHKeyStore):
raise RuntimeError("Server keystore is not DH")
g = self.crypto.server.kex_keystore.g
p = self.crypto.server.kex_keystore.p
public = self.crypto.server.kex_keystore.public
Expand All @@ -466,20 +456,24 @@ def get_client_dh_pubkey(self, private=None):
self.crypto.session.premaster_secret = int_to_str(pms).lstrip("\x00")
return int_to_str(self.crypto.client.kex_keystore.public)

def get_client_ecdh_pubkey(self, priv_key=None):
# Will raise ValueError for unknown curves
ec_curve = ec_reg.get_curve(self.crypto.server.ecdh.curve_name)
server_keypair = ec.Keypair(ec_curve, pub=self.crypto.server.ecdh.pub)
if priv_key is None:
client_keypair = ec.make_keypair(ec_curve)
def get_client_ecdh_pubkey(self, private=None):
if not isinstance(self.crypto.server.kex_keystore, tlsk.ECDHKeyStore):
raise RuntimeError("Server keystore is not ECDH")
if self.crypto.server.kex_keystore.unknown_curve:
raise RuntimeError("Unknown EC. KEX calculation is up to you")

curve = self.crypto.server.kex_keystore.curve
server_keypair = self.crypto.server.kex_keystore.keys
if private is None:
client_keypair = ec.make_keypair(curve)
else:
client_keypair = ec.Keypair(ec_curve, priv_key)
self.crypto.client.ecdh.priv = int_to_str(client_keypair.priv)
self.crypto.client.ecdh.pub = client_keypair.pub
client_keypair = ec.Keypair(curve, private)
self.crypto.client.kex_keystore = tlsk.ECDHKeyStore.from_keypair(curve, client_keypair)

secret_point = ec.ECDH(client_keypair).get_secret(server_keypair)
# PMS is x coordinate of secret
self.crypto.session.premaster_secret = int_to_str(secret_point.x)
return "\x04%s%s" % (int_to_str(client_keypair.pub.x), int_to_str(client_keypair.pub.y))
return point_to_ansi_str(client_keypair.pub)

def get_client_kex_data(self, val=None):
if self.params.negotiated.key_exchange == tls.TLSKexNames.RSA:
Expand Down
31 changes: 31 additions & 0 deletions scapy_ssl_tls/ssl_tls_keystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
from scapy.asn1.asn1 import ASN1_SEQUENCE
import tinyec.ec as ec


def rsa_public_from_der_certificate(certificate):
Expand Down Expand Up @@ -174,3 +175,33 @@ def __str__(self):
private: {private}"""
return template.format(name=self.name, g=self.g, p=self.p, size=self.size, public=self.public,
private=self.private)


class ECDHKeyStore(KexKeyStore):
def __init__(self, curve, public, private=None):
self.curve = curve
self.public = public
self.private = private
if self.curve is None:
self.unknown_curve = True
self.size = 0
self.keys = (self.private, self.public)
else:
self.unknown_curve = False
self.size = nb_bits(self.curve.field.p)
self.keys = ec.Keypair(curve, self.private, self.public)
super(ECDHKeyStore, self).__init__("ECDH Keystore", public, private)

@classmethod
def from_keypair(cls, curve, keypair):
return cls(curve, keypair.pub, keypair.priv)

def __str__(self):
template = """
{name}:
curve: {curve}
size: {size}
public: {public}
private: {private}"""
curve_name = "Unknown" if self.unknown_curve else self.curve.name
return template.format(name=self.name, curve=curve_name, size=self.size, public=self.public, private=self.private)
12 changes: 5 additions & 7 deletions tests/test_ssl_tls_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,17 @@ def test_client_dh_parameters_generation_matches_fixed_data(self):

def test_client_ecdh_parameters_generation_matches_fixed_data(self):
tls_ctx = tlsc.TLSSessionCtx()
tls_ctx.crypto.server.ecdh.curve_name = "secp256r1"
secp256r1 = reg.get_curve(tls_ctx.crypto.server.ecdh.curve_name)
tls_ctx.crypto.server.ecdh.pub = ec.Point(
secp256r1,
71312736565121892539464098105317518227531978702333415386264829982789952731614L,
108064706642599821618918248475955325719985341096102200103424860263181813987462L)
secp256r1 = reg.get_curve("secp256r1")
public = ec.Point(secp256r1, 71312736565121892539464098105317518227531978702333415386264829982789952731614L,
108064706642599821618918248475955325719985341096102200103424860263181813987462L)
tls_ctx.crypto.server.kex_keystore = tlsk.ECDHKeyStore(secp256r1, public)
client_privkey = 15320484772785058360598040144348894600917526501829289880527760633524785596585L
client_keys = ec.Keypair(secp256r1, client_privkey)
client_pubkey = tls_ctx.get_client_ecdh_pubkey(client_privkey)
self.assertTrue(client_pubkey.startswith("\x04"))
self.assertEqual("\x04%s%s" % (tlsc.int_to_str(client_keys.pub.x), tlsc.int_to_str(client_keys.pub.y)),
client_pubkey)
self.assertEqual(client_keys.pub, tls_ctx.crypto.client.ecdh.pub)
self.assertEqual(client_keys.pub, tls_ctx.crypto.client.kex_keystore.public)
self.assertEqual("'(\x17\x94l\xd7AO\x03\xd4Fi\x05}mP\x1aX5C7\xf0_\xa9\xb0\xac\xba{r\x1f\x12\x8f",
tls_ctx.crypto.session.premaster_secret)

Expand Down