Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions src/cosmosdb-preview/azext_cosmosdb_preview/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,109 @@
az cosmosdb dts cancel --account-name "ddb1" --job-name "j1" -g "rg1"
"""

helps['cosmosdb copy'] = """
type: group
short-summary: Manage copy job
"""

helps['cosmosdb copy create'] = """
type: command
short-summary: "Creates a Copy Job."
parameters:
- name: --src-cassandra
short-summary: "Source Cassandra table"
long-summary: |
Usage: --src-cassandra keyspace=XX table=XX'
keyspace: Keyspace name of CosmosDB Cassandra.
table: Table name of CosmosDB Cassandra.
- name: --dest-cassandra
short-summary: "Destination Cassandra table"
long-summary: |
Usage: --dest-cassandra keyspace=XX table=XX'
keyspace: Keyspace name of CosmosDB Cassandra.
table: Table name of CosmosDB Cassandra.
- name: --src-nosql
short-summary: "Source NoSql container"
long-summary: |
Usage: --src-nosql database=XX container=XX'
database: Database name of CosmosDB Sql.
container: Container name of CosmosDB Sql.
- name: --dest-nosql
short-summary: "Destination NoSql container"
long-summary: |
Usage: --dest-nosql database=XX container=XX'
database: Database name of CosmosDB NoSql.
container: Container name of CosmosDB NoSql.
- name: --src-mongo
short-summary: "Source Mongo collection"
long-summary: |
Usage: --src-mongo database=XX collection=XX'
database: Database name of CosmosDB Mongo.
collection: Collection name of CosmosDB Mongo.
- name: --dest-mongo
short-summary: "Destination Mongo collection"
long-summary: |
Usage: --dest-mongo database=XX collection=XX'
database: Database name of CosmosDB Mongo.
collection: Collection name of CosmosDB Mongo.

examples:
- name: Copy NoSql container
text: |-
az cosmosdb copy create -g "rg1" --job-name "j1" --src-account "db1" --dest-account "db1" --src-nosql database=db1 container=c1 --dest-nosql database=db2 container=c2
- name: Copy Cassandra table
text: |-
az cosmosdb copy create -g "rg1" --job-name "j1" --src-account "db1" --dest-account "db1" --src-cassandra keyspace=k1 table=t1 --dest-cassandra keyspace=k2 table=t2
- name: Copy Mongo collection
text: |-
az cosmosdb copy create -g "rg1" --job-name "j1" --src-account "db1" --dest-account "db1" --src-mongo database=d1 collection=c1 --dest-mongo database=d2 collection=c2
"""

helps['cosmosdb copy list'] = """
type: command
short-summary: "Get a list of Copy Jobs."
examples:
- name: List all jobs
text: |-
az cosmosdb dts list --dest-account "db1" -g "rg1"
"""

helps['cosmosdb copy show'] = """
type: command
short-summary: "Get a Copy Job."
examples:
- name: Show details of job j1
text: |-
az cosmosdb dts show --dest-account "db1" --job-name "j1" -g "rg1"
"""

helps['cosmosdb copy pause'] = """
type: command
short-summary: "Pause a Copy Job."
examples:
- name: Pause job j1
text: |-
az cosmosdb dts pause --dest-account "db1" --job-name "j1" -g "rg1"
"""

helps['cosmosdb copy resume'] = """
type: command
short-summary: "Resume a Copy Job."
examples:
- name: Resume job j1
text: |-
az cosmosdb dts resume --dest-account "db1" --job-name "j1" -g "rg1"
"""

helps['cosmosdb copy cancel'] = """
type: command
short-summary: "Cancels a Copy Job."
examples:
- name: Cancel job j1
text: |-
az cosmosdb dts cancel --dest-account "db1" --job-name "j1" -g "rg1"
"""

helps['cosmosdb sql container merge'] = """
type: command
short-summary: "Merges the partitions of a sql container."
Expand Down
37 changes: 31 additions & 6 deletions src/cosmosdb-preview/azext_cosmosdb_preview/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------
# pylint: disable=line-too-long, too-many-statements

import argparse
from argcomplete.completers import FilesCompleter

from azext_cosmosdb_preview._validators import (
Expand Down Expand Up @@ -425,7 +426,7 @@ def load_arguments(self, _):
with self.argument_context('cosmosdb dts') as c:
c.argument('account_name', account_name_type, id_part=None, help='Name of the CosmosDB database account.')

job_name_type = CLIArgumentType(options_list=['--job-name', '-n'], help='Name of the Data Transfer Job. A random job name will be generated if not passed.')
job_name_type = CLIArgumentType(options_list=['--job-name', '-n'], help='Name of the copy job. A random job name will be generated if not passed.')
with self.argument_context('cosmosdb dts copy') as c:
c.argument('job_name', job_name_type)
c.argument('source_cassandra_table', nargs='+', action=AddCassandraTableAction, help='Source cassandra table')
Expand All @@ -436,13 +437,37 @@ def load_arguments(self, _):
c.argument('dest_sql_container', nargs='+', action=AddSqlContainerAction, help='Destination sql container')
c.argument('worker_count', type=int, help='Worker count')

with self.argument_context('cosmosdb copy') as c:
c.argument('src_account', help='Name of the CosmosDB source database account.', completer=get_resource_name_completion_list('Microsoft.DocumentDb/databaseAccounts'), id_part='name')
c.argument('dest_account', help='Name of the CosmosDB destination database account.', completer=get_resource_name_completion_list('Microsoft.DocumentDb/databaseAccounts'), id_part='name')

with self.argument_context('cosmosdb copy create') as c:
c.argument('job_name', job_name_type)
c.argument('src_cassandra', nargs='+', arg_group='CosmosDB for Cassandra Table Copy', action=AddCassandraTableAction, help='Source Cassandra table details')
c.argument('src_mongo', nargs='+', arg_group='CosmosDB for MongoDB Collection Copy', action=AddMongoCollectionAction, help='Source Mongo collection details')
c.argument('src_nosql', nargs='+', arg_group='Cosmos DB for NoSQL Container Copy', action=AddSqlContainerAction, help='Source NoSql container details')
c.argument('dest_cassandra', nargs='+', arg_group='CosmosDB for Cassandra Table Copy', action=AddCassandraTableAction, help='Destination Cassandra table details')
c.argument('dest_mongo', nargs='+', arg_group='CosmosDB for MongoDB Collection Copy', action=AddMongoCollectionAction, help='Destination Mongo collection details')
c.argument('dest_nosql', nargs='+', arg_group='Cosmos DB for NoSQL Container Copy', action=AddSqlContainerAction, help='Destination NoSql container details')
c.argument('host_copy_on_src', help=argparse.SUPPRESS)
c.argument('worker_count', type=int, help=argparse.SUPPRESS)

for scope in [
'cosmosdb copy list',
'cosmosdb copy show',
'cosmosdb copy pause',
'cosmosdb copy resume',
'cosmosdb copy cancel']:
with self.argument_context(scope) as c:
c.argument('account_name', options_list=["--account-name", "--dest-account", "--src-account"], required=True, help='CosmosDB account name where the job is created.')

for scope in [
'cosmosdb dts show',
'cosmosdb dts pause',
'cosmosdb dts resume',
'cosmosdb dts cancel']:
'cosmosdb copy show',
'cosmosdb copy pause',
'cosmosdb copy resume',
'cosmosdb copy cancel']:
with self.argument_context(scope) as c:
c.argument('job_name', options_list=['--job-name', '-n'], help='Name of the Data Transfer Job.')
c.argument('job_name', help='Name of the Copy Job.', id_part='child_name_1', required=True)

max_throughput_type = CLIArgumentType(options_list=['--max-throughput'], help='The maximum throughput resource can scale to (RU/s). Provided when the resource is autoscale enabled. The minimum value can be 4000 (RU/s)')

Expand Down
16 changes: 13 additions & 3 deletions src/cosmosdb-preview/azext_cosmosdb_preview/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ def __call__(self, parser, namespace, values, option_string=None):
namespace.source_cassandra_table = cassandra_table
elif option_string == "--dest-cassandra-table":
namespace.dest_cassandra_table = cassandra_table
elif option_string == "--src-cassandra":
namespace.src_cassandra = cassandra_table
elif option_string == "--dest-cassandra":
namespace.dest_cassandra = cassandra_table
else:
namespace.cassandra_table = cassandra_table

Expand Down Expand Up @@ -174,6 +178,8 @@ def __call__(self, parser, namespace, values, option_string=None):
namespace.source_mongo = mongo_collection
elif option_string == "--dest-mongo":
namespace.dest_mongo = mongo_collection
elif option_string == "--src-mongo":
namespace.src_mongo = mongo_collection
else:
namespace.mongo_collection = mongo_collection

Expand Down Expand Up @@ -207,12 +213,16 @@ def __call__(self, parser, namespace, values, option_string=None):
if container_name is None:
raise CLIError(f'usage error: missing key container in {option_string} component')

sql_container = CosmosSqlDataTransferDataSourceSink(database_name=database_name, container_name=container_name)
nosql_container = CosmosSqlDataTransferDataSourceSink(database_name=database_name, container_name=container_name)

if option_string == "--source-sql-container":
namespace.source_sql_container = sql_container
namespace.source_sql_container = nosql_container
elif option_string == "--dest-sql-container":
namespace.dest_sql_container = sql_container
namespace.dest_sql_container = nosql_container
elif option_string == "--src-nosql":
namespace.src_nosql = nosql_container
elif option_string == "--dest-nosql":
namespace.dest_nosql = nosql_container
else:
namespace.sql_container = sql_container

Expand Down
14 changes: 14 additions & 0 deletions src/cosmosdb-preview/azext_cosmosdb_preview/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,20 @@ def load_command_table(self, _):
g.command('resume', 'resume')
g.command('cancel', 'cancel')

# Data Transfer Service
cosmosdb_copy_job = CliCommandType(
operations_tmpl='azext_cosmosdb_preview.vendored_sdks.azure_mgmt_cosmosdb.operations._data_transfer_jobs_operations#DataTransferJobsOperations.{}',
client_factory=cf_data_transfer_job
)

with self.command_group('cosmosdb copy', cosmosdb_copy_job, client_factory=cf_data_transfer_job, is_preview=True) as g:
g.custom_command('create', 'cosmosdb_copy_job')
g.command('list', 'list_by_database_account')
g.show_command('show', 'get')
g.command('pause', 'pause')
g.command('resume', 'resume')
g.command('cancel', 'cancel')

# Merge partitions for Sql containers
cosmosdb_sql_sdk = CliCommandType(
operations_tmpl='azure.mgmt.cosmosdb.operations#SqlResourcesOperations.{}',
Expand Down
99 changes: 98 additions & 1 deletion src/cosmosdb-preview/azext_cosmosdb_preview/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@
MongoCluster,
NodeGroupSpec,
NodeKind,
FirewallRule
FirewallRule,
CosmosCassandraDataTransferDataSourceSink,
CosmosSqlDataTransferDataSourceSink,
CosmosMongoDataTransferDataSourceSink
)

from azext_cosmosdb_preview._client_factory import (
Expand Down Expand Up @@ -1564,6 +1567,100 @@ def cosmosdb_data_transfer_copy_job(client,
job_create_parameters=job_create_parameters)


def cosmosdb_copy_job(client,
resource_group_name,
dest_account,
src_account,
src_cassandra=None,
dest_cassandra=None,
src_nosql=None,
dest_nosql=None,
src_mongo=None,
dest_mongo=None,
job_name=None,
worker_count=0,
host_copy_on_src="false"):
host_copy_on_src = host_copy_on_src.lower() == "true"
job_create_properties = {}
is_cross_account = src_account != dest_account
remote_account_name = dest_account if host_copy_on_src else src_account

source = None
if src_cassandra is not None:
if source is not None:
raise CLIError('Invalid input: multiple source components')
if is_cross_account and not host_copy_on_src:
source = CosmosCassandraDataTransferDataSourceSink(keyspace_name=src_cassandra.keyspace_name, table_name=src_cassandra.table_name, remote_account_name=remote_account_name)
else:
source = src_cassandra

if src_nosql is not None:
if source is not None:
raise CLIError('Invalid input: multiple source components')
if is_cross_account and not host_copy_on_src:
source = CosmosSqlDataTransferDataSourceSink(database_name=src_nosql.database_name, container_name=src_nosql.container_name, remote_account_name=remote_account_name)
else:
source = src_nosql

if src_mongo is not None:
if source is not None:
raise CLIError('Invalid input: multiple source components')
if is_cross_account and not host_copy_on_src:
source = CosmosMongoDataTransferDataSourceSink(database_name=src_mongo.database_name, collection_name=src_mongo.collection_name, remote_account_name=remote_account_name)
else:
source = src_mongo

if source is None:
raise CLIError('source component is missing')
job_create_properties['source'] = source

destination = None
if dest_cassandra is not None:
if destination is not None:
raise CLIError('Invalid input: multiple destination components')
destination = dest_cassandra
if is_cross_account and host_copy_on_src:
destination = CosmosCassandraDataTransferDataSourceSink(keyspace_name=dest_cassandra.keyspace_name, table_name=dest_cassandra.table_name, remote_account_name=remote_account_name)
else:
destination = dest_cassandra

if dest_nosql is not None:
if destination is not None:
raise CLIError('Invalid input: multiple destination components')
if is_cross_account and host_copy_on_src:
destination = CosmosSqlDataTransferDataSourceSink(database_name=dest_nosql.database_name, container_name=dest_nosql.container_name, remote_account_name=remote_account_name)
else:
destination = dest_nosql

if dest_mongo is not None:
if destination is not None:
raise CLIError('Invalid input: multiple destination components')
if is_cross_account and host_copy_on_src:
destination = CosmosMongoDataTransferDataSourceSink(database_name=dest_mongo.database_name, collection_name=dest_mongo.collection_name, remote_account_name=remote_account_name)
else:
destination = dest_mongo

if destination is None:
raise CLIError('destination component is missing')
job_create_properties['destination'] = destination

if worker_count > 0:
job_create_properties['worker_count'] = worker_count

job_create_parameters = {}
job_create_parameters['properties'] = job_create_properties

if job_name is None:
job_name = _gen_guid()

host_account_name = src_account if host_copy_on_src else dest_account

return client.create(resource_group_name=resource_group_name,
account_name=host_account_name,
job_name=job_name,
job_create_parameters=job_create_parameters)


def cli_begin_list_sql_container_partition_merge(client,
resource_group_name,
account_name,
Expand Down
Loading