Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/azure-cli/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ Release History

* Remove support for Python 3.4

**HDInsight**

* Support for creating a Kafka cluster with Kafka Rest Proxy
* Upgrade azure-mgmt-hdinsight to 1.3.0

**Install**

* Install script support python 3.8
Expand Down
9 changes: 9 additions & 0 deletions src/azure-cli/azure/cli/command_modules/hdinsight/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@
--encryption-key-version 00000000000000000000000000000000 \\
--encryption-vault-uri https://MyKeyVault.vault.azure.net \\
--assign-identity MyMSI
- name: Create a Kafka cluster with Kafka Rest Proxy.
text: |-
az hdinsight create -t kafka -g MyResourceGroup -n MyCluster \\
-p "HttpPassword1234!" --workernode-data-disks-per-node 2 \\
--storage-account MyStorageAccount \\
--kafka-management-node-size "Standard_D4_v2" \\
--kafka-client-group-id MySecurityGroupId \\
--kafka-client-group-name MySecurityGroupName \\
--component-version kafka=2.1
- name: Create a cluster with Azure Data Lake Storage Gen2
text: |-
az hdinsight create -t spark -g MyResourceGroup -n MyCluster \\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ def load_arguments(self, _):
help='The size of the data disk in GB, e.g. 1023.')
c.argument('zookeepernode_size', arg_type=node_size_type)
c.argument('edgenode_size', arg_type=node_size_type)
c.argument('kafka_management_node_size', arg_type=node_size_type)
c.argument('workernode_count', options_list=['--workernode-count', '-c'], arg_group='Node',
help='The number of worker nodes in the cluster.')
c.argument('kafka_management_node_count', arg_group='Node',
help='The number of kafka management nodes in the cluster.')

# Storage
c.argument('storage_account', arg_group='Storage', validator=validate_storage_account,
Expand Down Expand Up @@ -161,6 +164,12 @@ def load_arguments(self, _):
c.argument('encryption_algorithm', arg_type=get_enum_type(JsonWebKeyEncryptionAlgorithm),
arg_group='Customer Managed Key', help='Algorithm identifier for encryption.')

# Kafka Rest Proxy
c.argument('kafka_client_group_id', arg_group='Kafka Rest Proxy',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to provide short name for your argument here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to provide short name for your argument here?

Hi @Juliehzl , Thanks. This is the shortest name that can explain the meaning. And there are already some properties' name are longer than this one. For example : workernode_data_disk_storage_account_type=None, workernode_data_disk_size=None,
We should consider the meaning firstly, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can provide options_list=['--kafka-client-group-id', '-k'] to give both descriptive name and short name. (Feel free to find a vaild short name here)
The same for the following one.

help='The client AAD security group id for Kafka Rest Proxy.')
c.argument('kafka_client_group_name', arg_group='Kafka Rest Proxy',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same

The same with above

help='The client AAD security group name for Kafka Rest Proxy.')

# Managed Service Identity
c.argument('assign_identity', arg_group='Managed Service Identity', validator=validate_msi,
completer=get_resource_name_completion_list_under_subscription(
Expand Down
32 changes: 30 additions & 2 deletions src/azure-cli/azure/cli/command_modules/hdinsight/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
location=None, tags=None, no_wait=False, cluster_version='default', cluster_tier=None,
cluster_configurations=None, component_version=None,
headnode_size='large', workernode_size='large', zookeepernode_size=None, edgenode_size=None,
kafka_management_node_size=None, kafka_management_node_count=2,
kafka_client_group_id=None, kafka_client_group_name=None,
workernode_count=3, workernode_data_disks_per_node=None,
workernode_data_disk_storage_account_type=None, workernode_data_disk_size=None,
http_username=None, http_password=None,
Expand All @@ -35,7 +37,8 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
from azure.mgmt.hdinsight.models import ClusterCreateParametersExtended, ClusterCreateProperties, OSType, \
ClusterDefinition, ComputeProfile, HardwareProfile, Role, OsProfile, LinuxOperatingSystemProfile, \
StorageProfile, StorageAccount, DataDisksGroups, SecurityProfile, \
DirectoryType, DiskEncryptionProperties, Tier, SshProfile, SshPublicKey
DirectoryType, DiskEncryptionProperties, Tier, SshProfile, SshPublicKey, \
KafkaRestProperties, ClientGroupInfo

validate_esp_cluster_create_params(esp, cluster_name, resource_group_name, cluster_type,
subnet, domain, cluster_admin_account, assign_identity,
Expand Down Expand Up @@ -128,6 +131,11 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
raise CLIError('Either the encryption vault URI, key name and key version should be specified, '
'or none of them should be.')

# Validate kafka rest proxy parameters
if not _all_or_none(kafka_client_group_id, kafka_client_group_name):
raise CLIError('Either the kafka client group id and kafka client group name should be specified, '
'or none of them should be.')

# Specify virtual network profile only when network arguments are provided
virtual_network_profile = subnet and build_virtual_network_profile(subnet)

Expand Down Expand Up @@ -175,6 +183,7 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
data_disks_groups=workernode_data_disk_groups
)
]

if zookeepernode_size:
roles.append(
Role(
Expand All @@ -193,6 +202,17 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
os_profile=os_profile,
virtual_network_profile=virtual_network_profile
))
if kafka_management_node_size:
# generate kafkaRestProperties
roles.append(
Role(
name="kafkamanagementnode",
target_instance_count=kafka_management_node_count,
hardware_profile=HardwareProfile(vm_size=kafka_management_node_size),
os_profile=os_profile,
virtual_network_profile=virtual_network_profile
)
)

storage_accounts = []
if storage_account:
Expand Down Expand Up @@ -253,6 +273,13 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
msi_resource_id=assign_identity
)

kafka_rest_properties = (kafka_client_group_id and kafka_client_group_name) and KafkaRestProperties(
client_group_info=ClientGroupInfo(
group_id=kafka_client_group_id,
group_name=kafka_client_group_name
)
)

create_params = ClusterCreateParametersExtended(
location=location,
tags=tags,
Expand All @@ -272,7 +299,8 @@ def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
storageaccounts=storage_accounts
),
security_profile=security_profile,
disk_encryption_properties=disk_encryption_properties
disk_encryption_properties=disk_encryption_properties,
kafka_rest_properties=kafka_rest_properties
),
identity=cluster_identity
)
Expand Down
Loading