Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.keyvault.keys.models import DeletedKey, JsonWebKey, Key, KeyBase, KeyOperationResult
from azure.keyvault.keys.models import DeletedKey, JsonWebKey, Key, KeyBase
from azure.keyvault.keys._shared import AsyncKeyVaultClientBase

from ..crypto.aio import CryptographyClient
Expand Down Expand Up @@ -517,75 +517,3 @@ async def import_key(
self.vault_url, name, key=key, hsm=hsm, key_attributes=attributes, tags=tags, **kwargs
)
return Key._from_key_bundle(bundle)

@distributed_trace_async
async def wrap_key(
self, name: str, algorithm: str, value: bytes, version: Optional[str] = None, **kwargs: "**Any"
) -> KeyOperationResult:
"""Wraps a symmetric key using a specified key.

The WRAP operation supports encryption of a symmetric key using a key
encryption key that has previously been stored in an Azure Key Vault.
The WRAP operation is only strictly necessary for symmetric keys stored
in Azure Key Vault since protection with an asymmetric key can be
performed using the public portion of the key. This operation is
supported for asymmetric keys as a convenience for callers that have a
key-reference but do not have access to the public key material. This
operation requires the keys/wrapKey permission.

:param name: The name of the key.
:type name: str
:param version: The version of the key.
:type version: str
:param algorithm: algorithm identifier. Possible values include:
'RSA-OAEP', 'RSA-OAEP-256', 'RSA1_5'
:type algorithm: str or
~azure.keyvault.keys.models.JsonWebKeyEncryptionAlgorithm
:param value:
:type value: bytes
:returns: The wrapped symmetric key.
:rtype: ~azure.keyvault.keys._models.KeyOperationResult

"""
if version is None:
version = ""

bundle = await self._client.wrap_key(
self.vault_url, name, key_version=version, algorithm=algorithm, value=value, **kwargs
)
return KeyOperationResult(id=bundle.kid, value=bundle.result)

@distributed_trace_async
async def unwrap_key(
self, name: str, algorithm: str, value: bytes, version: Optional[str] = None, **kwargs: "**Any"
) -> KeyOperationResult:
"""Unwraps a symmetric key using the specified key that was initially used
for wrapping that key.

The UNWRAP operation supports decryption of a symmetric key using the
target key encryption key. This operation is the reverse of the WRAP
operation. The UNWRAP operation applies to asymmetric and symmetric
keys stored in Azure Key Vault since it uses the private portion of the
key. This operation requires the keys/unwrapKey permission.

:param name: The name of the key.
:type name: str
:param version: The version of the key.
:type version: str
:param algorithm: algorithm identifier. Possible values include:
'RSA-OAEP', 'RSA-OAEP-256', 'RSA1_5'
:type algorithm: str or
~azure.keyvault.keys.models.JsonWebKeyEncryptionAlgorithm
:param value:
:type value: bytes
:returns: The unwrapped symmetric key.
:rtype: ~azure.keyvault.keys._models.KeyOperationResult

"""
if version is None:
version = ""

bundle = await self._client.unwrap_key(
self.vault_url, name, key_version=version, algorithm=algorithm, value=value, **kwargs
)
return KeyOperationResult(id=bundle.kid, value=bundle.result)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from .algorithm import (
Algorithm,
AsymmetricEncryptionAlgorithm,
SymmetricEncryptionAlgorithm,
AuthenticatedSymmetricEncryptionAlgorithm,
SignatureAlgorithm,
)
from .ec_key import EllipticCurveKey
from .key import Key
from .rsa_key import RsaKey
from .transform import CryptoTransform, BlockCryptoTransform, AuthenticatedCryptoTransform, SignatureTransform

__all__ = {
"Key",
"EllipticCurveKey",
"RsaKey",
"Algorithm",
"AsymmetricEncryptionAlgorithm",
"SymmetricEncryptionAlgorithm",
"AuthenticatedCryptoTransform",
"SignatureAlgorithm",
"CryptoTransform",
"BlockCryptoTransform",
"AuthenticatedSymmetricEncryptionAlgorithm",
"SignatureTransform",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------

import codecs
from base64 import b64encode, b64decode

from cryptography.hazmat.primitives.asymmetric import utils


def _bytes_to_int(b):
if not b or not isinstance(b, bytes):
raise ValueError("b must be non-empty byte string")

return int(codecs.encode(b, "hex"), 16)


def _int_to_bytes(i):
h = hex(i)
if len(h) > 1 and h[0:2] == "0x":
h = h[2:]

# need to strip L in python 2.x
h = h.strip("L")

if len(h) % 2:
h = "0" + h
return codecs.decode(h, "hex")


def _bstr_to_b64url(bstr):
"""Serialize bytes into base-64 string.
:param str: Object to be serialized.
:rtype: str
"""
encoded = b64encode(bstr).decode()
return encoded.strip("=").replace("+", "-").replace("/", "_")


def _str_to_b64url(s):
"""Serialize str into base-64 string.
:param str: Object to be serialized.
:rtype: str
"""
return _bstr_to_b64url(s.encode(encoding="utf8"))


def _b64_to_bstr(b64str):
"""Deserialize base64 encoded string into string.
:param str b64str: response string to be deserialized.
:rtype: bytearray
:raises: TypeError if string format invalid.
"""
padding = "=" * (3 - (len(b64str) + 3) % 4)
b64str = b64str + padding
encoded = b64str.replace("-", "+").replace("_", "/")
return b64decode(encoded)


def _b64_to_str(b64str):
"""Deserialize base64 encoded string into string.
:param str b64str: response string to be deserialized.
:rtype: str
:raises: TypeError if string format invalid.
"""
return _b64_to_bstr(b64str).decode("utf8")


def _int_to_bigendian_8_bytes(i):
b = _int_to_bytes(i)

if len(b) > 8:
raise ValueError("the specified integer is to large to be represented by 8 bytes")

if len(b) < 8:
b = (b"\0" * (8 - len(b))) + b

return b


def encode_key_vault_ecdsa_signature(signature):
"""
ASN.1 DER encode a Key Vault ECDSA signature.

Key Vault returns ECDSA signatures as the concatenated bytes of two equal-size integers. ``cryptography`` expects
ECDSA signatures be ASN.1 DER encoded.

:param bytes signature: ECDSA signature returned by Key Vault
:return: signature encoded for use by ``cryptography``
"""
mid = len(signature) // 2
r = _bytes_to_int(signature[:mid])
s = _bytes_to_int(signature[mid:])
return utils.encode_dss_signature(r, s)
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from abc import abstractmethod

_alg_registry = {}


class Algorithm(object):
_name = None

@classmethod
def name(cls):
return cls._name

@classmethod
def register(cls):
_alg_registry[cls._name] = cls

@staticmethod
def resolve(name):
if name not in _alg_registry:
return None
return _alg_registry[name]()


class AsymmetricEncryptionAlgorithm(Algorithm):
@abstractmethod
def create_encryptor(self, key):
raise NotImplementedError()

@abstractmethod
def create_decryptor(self, key):
raise NotImplementedError()


class SymmetricEncryptionAlgorithm(Algorithm):
@abstractmethod
def create_encryptor(self, key, iv):
raise NotImplementedError()

@abstractmethod
def create_decryptor(self, key, iv):
raise NotImplementedError()


class AuthenticatedSymmetricEncryptionAlgorithm(Algorithm):
@abstractmethod
def create_encryptor(self, key, iv, auth_data, auth_tag):
raise NotImplementedError()

@abstractmethod
def create_decryptor(self, key, iv, auth_data, auth_tag):
raise NotImplementedError()


class SignatureAlgorithm(Algorithm):
_default_hash_algorithm = None

@property
def default_hash_algorithm(self):
return self._default_hash_algorithm

@abstractmethod
def create_signature_transform(self, key):
raise NotImplementedError()


class HashAlgorithm(Algorithm):
@abstractmethod
def create_digest(self):
raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from .aes_cbc import Aes128Cbc, Aes192Cbc, Aes256Cbc
from .aes_cbc_hmac import Aes128CbcHmacSha256, Aes192CbcHmacSha384, Aes256CbcHmacSha512
from .aes_kw import AesKw128, AesKw192, AesKw256
from .ecdsa import Ecdsa256, Es256, Es384, Es512
from .rsa_encryption import Rsa1_5, RsaOaep, RsaOaep256
from .rsa_signing import Ps256, Ps384, Ps512, Rs256, Rs384, Rs512

__all__ = [
"Aes128Cbc",
"Aes192Cbc",
"Aes256Cbc",
"Aes128CbcHmacSha256",
"Aes192CbcHmacSha384",
"Aes256CbcHmacSha512",
"AesKw128",
"AesKw192",
"AesKw256",
"Ecdsa256",
"Es256",
"Es384",
"Es512",
"Ps256",
"Ps384",
"Ps512",
"Rsa1_5",
"Rs256",
"Rs384",
"Rs512",
"RsaOaep",
"RsaOaep256",
]
Loading