Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,57 +64,57 @@ def initialize_async(self):
udp_collection = self.try_create_document_collection(create_client, database, udp_collection)

lww_sproc = {'id':'resolver',
'body': "function resolver(incomingRecord, existingRecord, isTombstone, conflictingRecords) {\r\n" +
" var collection = getContext().getCollection();\r\n" +
"\r\n" +
" if (!incomingRecord) {\r\n" +
" if (existingRecord) {\r\n" +
"\r\n" +
" collection.deleteDocument(existingRecord._self, {}, function(err, responseOptions) {\r\n" +
" if (err) throw err;\r\n" +
" });\r\n" +
" }\r\n" +
" } else if (isTombstone) {\r\n" +
" // delete always wins.\r\n" +
" } else {\r\n" +
" var documentToUse = incomingRecord;\r\n" +
"\r\n" +
" if (existingRecord) {\r\n" +
" if (documentToUse.regionId < existingRecord.regionId) {\r\n" +
" documentToUse = existingRecord;\r\n" +
" }\r\n" +
" }\r\n" +
"\r\n" +
" var i;\r\n" +
" for (i = 0; i < conflictingRecords.length; i++) {\r\n" +
" if (documentToUse.regionId < conflictingRecords[i].regionId) {\r\n" +
" documentToUse = conflictingRecords[i];\r\n" +
" }\r\n" +
" }\r\n" +
"\r\n" +
" tryDelete(conflictingRecords, incomingRecord, existingRecord, documentToUse);\r\n" +
" }\r\n" +
"\r\n" +
" function tryDelete(documents, incoming, existing, documentToInsert) {\r\n" +
" if (documents.length > 0) {\r\n" +
" collection.deleteDocument(documents[0]._self, {}, function(err, responseOptions) {\r\n" +
" if (err) throw err;\r\n" +
"\r\n" +
" documents.shift();\r\n" +
" tryDelete(documents, incoming, existing, documentToInsert);\r\n" +
" });\r\n" +
" } else if (existing) {\r\n" +
" collection.replaceDocument(existing._self, documentToInsert,\r\n" +
" function(err, documentCreated) {\r\n" +
" if (err) throw err;\r\n" +
" });\r\n" +
" } else {\r\n" +
" collection.createDocument(collection.getSelfLink(), documentToInsert,\r\n" +
" function(err, documentCreated) {\r\n" +
" if (err) throw err;\r\n" +
" });\r\n" +
" }\r\n" +
" }\r\n" +
'body': "function resolver(incomingRecord, existingRecord, isTombstone, conflictingRecords) {\r\n" +
" var collection = getContext().getCollection();\r\n" +
"\r\n" +
" if (!incomingRecord) {\r\n" +
" if (existingRecord) {\r\n" +
"\r\n" +
" collection.deleteDocument(existingRecord._self, {}, function(err, responseOptions) {\r\n" +
" if (err) throw err;\r\n" +
" });\r\n" +
" }\r\n" +
" } else if (isTombstone) {\r\n" +
" // delete always wins.\r\n" +
" } else {\r\n" +
" var documentToUse = incomingRecord;\r\n" +
"\r\n" +
" if (existingRecord) {\r\n" +
" if (documentToUse.regionId < existingRecord.regionId) {\r\n" +
" documentToUse = existingRecord;\r\n" +
" }\r\n" +
" }\r\n" +
"\r\n" +
" var i;\r\n" +
" for (i = 0; i < conflictingRecords.length; i++) {\r\n" +
" if (documentToUse.regionId < conflictingRecords[i].regionId) {\r\n" +
" documentToUse = conflictingRecords[i];\r\n" +
" }\r\n" +
" }\r\n" +
"\r\n" +
" tryDelete(conflictingRecords, incomingRecord, existingRecord, documentToUse);\r\n" +
" }\r\n" +
"\r\n" +
" function tryDelete(documents, incoming, existing, documentToInsert) {\r\n" +
" if (documents.length > 0) {\r\n" +
" collection.deleteDocument(documents[0]._self, {}, function(err, responseOptions) {\r\n" +
" if (err) throw err;\r\n" +
"\r\n" +
" documents.shift();\r\n" +
" tryDelete(documents, incoming, existing, documentToInsert);\r\n" +
" });\r\n" +
" } else if (existing) {\r\n" +
" collection.replaceDocument(existing._self, documentToInsert,\r\n" +
" function(err, documentCreated) {\r\n" +
" if (err) throw err;\r\n" +
" });\r\n" +
" } else {\r\n" +
" collection.createDocument(collection.getSelfLink(), documentToInsert,\r\n" +
" function(err, documentCreated) {\r\n" +
" if (err) throw err;\r\n" +
" });\r\n" +
" }\r\n" +
" }\r\n" +
"}"
}
try:
Expand Down Expand Up @@ -205,7 +205,7 @@ def run_update_conflict_on_manual_async(self):
conflict_document_for_insertion = {'id': id, 'regionId': 0, 'regionEndpoint': self.clients[0].ReadEndpoint}
conflict_document_for_insertion = self.try_insert_document(self.clients[0], self.manual_collection_link, conflict_document_for_insertion)
time.sleep(1) #1 Second for write to sync.

print("1) Performing conflicting update across %d regions on %s" % (len(self.clients), self.manual_collection_link));

i = 0
Expand Down Expand Up @@ -243,7 +243,7 @@ def run_delete_conflict_on_manual_async(self):
conflict_document_for_insertion = {'id': id, 'regionId': 0, 'regionEndpoint': self.clients[0].ReadEndpoint}
conflict_document_for_insertion = self.try_insert_document(self.clients[0], self.manual_collection_link, conflict_document_for_insertion)
time.sleep(1) #1 Second for write to sync.

print("1) Performing conflicting delete across %d regions on %s" % (len(self.clients), self.manual_collection_link));

i = 0
Expand Down Expand Up @@ -375,7 +375,7 @@ def run_delete_conflict_on_LWW_async(self):
def run_insert_conflict_on_UDP_async(self):
while True:
print("1) Performing conflicting insert across 3 regions on %s" % self.udp_collection_link)

id = str(uuid.uuid4())
i = 0
pool = ThreadPool(processes = len(self.clients))
Expand Down Expand Up @@ -587,7 +587,7 @@ def validate_LWW_async_internal(self, client, conflict_document, has_delete_conf
options = {'partitionKey': conflict_document[0]['id']}
client.ReadItem(conflict_document[0]['_self'], options)

self.trace_error("Delete conflict for document %s didnt win @ %s" %
self.trace_error("Delete conflict for document %s didnt win @ %s" %
(conflict_document[0]['id'], client.ReadEndpoint))

time.sleep(0.5)
Expand All @@ -614,7 +614,7 @@ def validate_LWW_async_internal(self, client, conflict_document, has_delete_conf
existing_document = client.ReadItem(winner_document['_self'], options)

if int(existing_document['regionId']) == int(winner_document['regionId']):
print("Winner document from region %d found at %s" %
print("Winner document from region %d found at %s" %
(int(existing_document['regionId']), client.ReadEndpoint))
break
else:
Expand All @@ -623,9 +623,9 @@ def validate_LWW_async_internal(self, client, conflict_document, has_delete_conf

time.sleep(0.5)
except exceptions.AzureError as e:
self.trace_error("Winner document from region %d is not found @ %s, retrying..." %
self.trace_error("Winner document from region %d is not found @ %s, retrying..." %
(int(winner_document["regionId"]), client.WriteEndpoint))

time.sleep(0.5)

def validate_UDP_async(self, clients, conflict_document, has_delete_conflict):
Expand Down Expand Up @@ -677,7 +677,7 @@ def validate_UDP_async_internal(self, client, conflict_document, has_delete_conf
existing_document = client.ReadItem(self.udp_collection_link + "/docs/" + winner_document['id'], options)

if int(existing_document['regionId']) == int(winner_document['regionId']):
print("Winner document from region %d found at %s" %
print("Winner document from region %d found at %s" %
(int(existing_document["regionId"]), client.ReadEndpoint))
break
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def run_basic_async(self):
print("####################################################")

print("1) Starting insert loops across multiple regions ...")

documents_to_insert_per_worker = 100

run_loop_futures = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def run_loop_async(self, documents_to_insert):
start = int(round(time.time() * 1000))
self.client.CreateItem(self.document_collection_link, document)
end = int(round(time.time() * 1000))

latency.append(end - start)

latency = sorted(latency)
Expand Down
10 changes: 5 additions & 5 deletions sdk/cosmos/azure-cosmos/samples/change_feed_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import config

# ----------------------------------------------------------------------------------------------------------
# Prerequistes -
#
# 1. An Azure Cosmos account -
# Prerequistes -
#
# 1. An Azure Cosmos account -
# https:#azure.microsoft.com/en-us/documentation/articles/documentdb-create-account/
#
# 2. Microsoft Azure Cosmos PyPi package -
# 2. Microsoft Azure Cosmos PyPi package -
# https://pypi.python.org/pypi/azure-cosmos/
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates how to consume the Change Feed and iterate on the results.
Expand Down Expand Up @@ -85,7 +85,7 @@ def run_sample():

except exceptions.CosmosHttpResponseError as e:
print('\nrun_sample has caught an error. {0}'.format(e.message))

finally:
print("\nrun_sample done")

Expand Down
Loading