Python Eventhubs EventProcessor partition manager blob storage #7053
Conversation
- [AutoPR datafactory/resource-manager] [Datafactory] ADLS Gen 2 support for HDI BYOC and vNet support for HDI on demand (Azure#5663)
- Generated from e4bd3471cedb625a2d65c1045f8d13f532f3f945 ADLS Gen 2 support for HDI BYOC and vNet support for HDI on demand
- Packaging update of azure-mgmt-datafactory
- [AutoPR datafactory/resource-manager] Add Dataset and CopySource for SAP HANA (Azure#5835)
- Generated from 5f85e81e98e9fea4da62b1d4eed0a9bfc4b2bf5e Update Pipeline.json
- Generated from 5f85e81e98e9fea4da62b1d4eed0a9bfc4b2bf5e Update Pipeline.json
- [AutoPR datafactory/resource-manager] (Public swagger update) Add TeradataSource, TeradataPartitionSettings, TeradataTableDataset, TeradataTableDatasetTypeProperties (Azure#5865)
- Generated from d2b6a0a231eeeef8cd8f82383d786706289b8b75 add TerdateTableDataset, TeradataSource
- Generated from 0fb95a04203b7d79f6f007221e2c34535b0c3baf modify specified
- [AutoPR datafactory/resource-manager] fix public swagger issues (Azure#5985)
- Generated from b0ddfd5a2aefefdca6d220fd03714b3fdfc779a6 modify swagger
- Generated from 76032c5b6d424dceb3a9b03b7df79e009eb5c183 Change XxxSetting to XxxSettings in private swagger
- [AutoPR datafactory/resource-manager] [Datafactory] Add three new connectors (Azure#6281)
- Generated from 0ee2888c7118dfe04f56d37b3bdb491b88981fff [Datafactory] Add Azure SQL Database Managed Instance, Dynamics CRM and Common Data Service for Apps
- Generated from e164e4233491e47b7335ed6a797b03d18445f705 Change enum type to string
- Packaging update of azure-mgmt-datafactory
- [AutoPR datafactory/resource-manager] [Datafactory] Add three new connectors (Azure#6328)
- Generated from 034a934c3d28b814e488fc8134b330a33f1c0c57 [Datafactory] Add three new connectors
- Generated from 55361517217e7bef074e143a836e9a823256ade3 Add Informix into custom-words.txt
- [AutoPR datafactory/resource-manager] SSIS File System Support (Azure#6216)
- Generated from 29f3be5668f9d26352c4711117630ff4a4fd431b SSIS File System Support
- Generated from 29f3be5668f9d26352c4711117630ff4a4fd431b SSIS File System Support
- [AutoPR datafactory/resource-manager] Introduce ADX Command (Azure#6404)
- Generated from 0ae079d21b3b37fb36dfa54e0d0ad46c81329e48 Introduce ADX Command
- Generated from 37671c3194eee7f29e4d05851515a094ad8cca91 Use full ADX name
- [AutoPR datafactory/resource-manager] fix: datafactory character encoding (Azure#6423)
- Generated from 1f768e0b1251c521df6386353c805af1f1980b87 fix: datafactory character encoding
- Generated from 1f768e0b1251c521df6386353c805af1f1980b87 fix: datafactory character encoding
- Generated from 6daaa9ba96f917b57001720be038e62850d1ccbc (Azure#6471) Change type name and add timeout property
- Generated from 04df2c4ad1350ec47a500e1a1d1a609d43398aee (Azure#6505) support dataset v2 split name
- [AutoPR datafactory/resource-manager] [DataFactory] SapBwCube and Sybase Dataset (Azure#6518)
- Generated from b88af2e2b065a6ff559d879d690d65096d1bb56f [DataFactory] SapBwCube and Sybase Dataset
- Generated from b88af2e2b065a6ff559d879d690d65096d1bb56f [DataFactory] SapBwCube and Sybase Dataset
- [AutoPR datafactory/resource-manager] Enable Avro Dataset in public swagger (Azure#6567)
- Generated from ec112148bf30430557ff3fac0c74f0706b1042de Enable Avro Dataset in public swagger
- Generated from e41431428e45beaa5bbb12344d3332479c095e31 UPDATE
- Generated from ccc8c92e96ab27329cf637c7214ebb35da8dce23 (Azure#6625) Fix model validation
- updated release notes
- fixed duplicate row
- breaking changes
- Generated from 65a2679abd2e6a4aa56f0d4e5ef459407f105ae6 (Azure#6774) [DataFactory] Fix typo for binary sink
- Generated from d22072afd73683450b42a2d626e10013330ab31b (Azure#6795) event triggers subscription apis
- Generated from 6ca38e062bb3184e7207e058d4aa05656e9a755f (Azure#6800) chore: jsonfmt datafactory
- Generated from 3c745e4716094361aaa9e683d3e6ec582af89f9d (Azure#6815) refactor table option
- Generated from 2658bfcd4e5ede36535616ef4e44125701d14366 (Azure#6832) remove redundant property
- Generated from 5e1bb35d5c3314d8f4fead76c3d69a2522be026b (Azure#7005) Update review comments
- using old version of autorest
- v2
- v3.0.52
- v3.0.52
- manually updated history and readded tabular translator and copy translator
- changed date to 08-30
Fix pylint errors
…r-python into eventhubs_blobstorage
…zure#6894)
- Fix bug that iothub hub can't receive
- Support direct mgmt ops of iothub
- Improve mgmt ops and update livetest
- Small fix
- Improvement of iothub mgmt
- Retry refactor
- Refactor retry, delay and handle exception
- Remove unused module
- Small fix
- Small fix
```python
logger.exception("An exception occurred during list_ownership for eventhub %r consumer group %r."
                 " An empty ownership list is returned",
                 eventhub_name, consumer_group_name, exc_info=azure_err)
return []
```
Won't the partition manager try to take ownership of all partitions if you return an empty list?
EventProcessor only claims ownership of partitions in the returned list. An empty list means no ownership.
The previous version of the code tries to take ownership of all partitions not currently owned.
The previous version didn't support multiple EventProcessors, so that worked.
EventProcessor now has load balancing, so the code has changed a lot.
I mixed up list_ownership and claim_ownership by mistake. This should raise the exception instead of returning [].
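A minimal sketch of that fix, with hypothetical names (`fetch_blobs` stands in for the blob-listing call, and the broad `except Exception` stands in for the storage SDK's error types): log the failure, then re-raise instead of returning `[]`, so the caller can retry rather than conclude that no partitions are owned.

```python
import logging

logger = logging.getLogger(__name__)

async def list_ownership(eventhub_name, consumer_group_name, fetch_blobs):
    """List ownership records, re-raising storage errors.

    Returning [] on error would tell the load balancer that no partitions
    are owned, so failures must propagate instead.
    """
    try:
        return [blob async for blob in fetch_blobs()]
    except Exception:  # stand-in for the storage SDK's error types
        logger.exception(
            "An exception occurred during list_ownership for eventhub %r "
            "consumer group %r.", eventhub_name, consumer_group_name)
        raise  # let the caller retry, rather than assume an empty ownership list
```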
```python
    "etag": b.etag,
    "last_modified_time": b.last_modified.timestamp() if b.last_modified else None
}
ownership.update(metadata)
```
Do we really want metadata to override any properties previously set in case of name conflicts?
Yes, the metadata (owner_id, offset, sequence_number) should override.
Just to be sure: if the blob has metadata "partition_id": "...", you want "..." as the value for "partition_id", rather than the name of the blob?
Our code only puts and updates what belongs in the metadata. If that situation ever arose, there would already be a bug corrupting the data.
"partition_id" is not part of the metadata; it's used as the blob name.
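To make that concrete, here is a sketch (a hypothetical helper, not the PR's code) of how an ownership record can be assembled: `partition_id` comes from the blob name, while the blob metadata supplies `owner_id`, `offset`, and `sequence_number`, so the key sets never collide.

```python
def build_ownership(blob_name, etag, last_modified_time, metadata):
    """Assemble an ownership record from one blob listing entry."""
    ownership = {
        "partition_id": blob_name,          # the blob name *is* the partition id
        "etag": etag,
        "last_modified_time": last_modified_time,
    }
    # metadata holds owner_id / offset / sequence_number; it never contains
    # partition_id, so update() cannot clobber the base fields.
    ownership.update(metadata)
    return ownership
```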
```python
                 " An empty ownership list is returned",
                 eventhub_name, consumer_group_name, exc_info=azure_err)
return []
async for b in blobs:  # TODO: running them concurrently
```
I don't understand the TODO... what is supposed to run concurrently?
Every partition (blob) has an async lock. I was thinking that executing them concurrently might be faster, but I'm not sure, since there is no I/O-bound operation. This is just a reminder for future exploration.
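If that exploration ever happens, it might look like the sketch below (hypothetical names; the per-partition lock mirrors the one in the PR). Whether this wins anything is exactly the open question, since there is little real I/O inside the lock.

```python
import asyncio
from collections import defaultdict

locks = defaultdict(asyncio.Lock)    # one asyncio.Lock per partition (blob)

async def process_blob(name, results):
    async with locks[name]:           # guard this partition's cached state
        results[name] = name.upper()  # placeholder for the real bookkeeping

async def process_all(names):
    results = {}
    # Handle every blob concurrently instead of `async for ... await` in sequence.
    await asyncio.gather(*(process_blob(n, results) for n in names))
    return results
```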
Let's not put TODOs in the code in this fashion. If you feel we should add an issue, please do so instead.
Removed the "TODO".
```python
try:
    blob_client = await self._container_client.upload_blob(
        name=partition_id, data=UPLOAD_DATA, metadata=metadata, overwrite=True)
    uploaded_blob_properties = await blob_client.get_blob_properties()
```
There is a race condition here. Please use the blob client to upload the blob since that gives you access to the etag directly.
container_client.upload_blob gets the blob client and then calls blob_client.upload_blob, so it already uses the blob client. The following is the implementation of container_client.upload_blob():
```python
blob = self.get_blob_client(name)
await blob.upload_blob(
    data,
    blob_type=blob_type,
    overwrite=overwrite,
    length=length,
    metadata=metadata,
    content_settings=content_settings,
    validate_content=validate_content,
    lease=lease,
    timeout=timeout,
    max_connections=max_connections,
    encoding=encoding,
    **kwargs
)
return blob
```
Correcting an earlier claim: BlobClient is indeed faster than ContainerClient. I didn't dig into the details of the ContainerClient source code, but my test results show that BlobClient.upload_blob is faster, and ContainerClient.get_blob_client is not a trivial operation.
So I changed the code to cache a BlobClient for every partition to improve performance.
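The caching change can be sketched like this; `factory` stands in for `ContainerClient.get_blob_client`, which the tests above showed to be non-trivial:

```python
class CachedClientStore:
    """Cache one blob client per partition so the factory runs only once."""

    def __init__(self, factory):
        self._factory = factory   # e.g. container_client.get_blob_client
        self._cached = {}
        self.factory_calls = 0    # exposed only to make the saving visible

    def client_for(self, partition_id):
        # Each partition id maps to exactly one long-lived client.
        if partition_id not in self._cached:
            self.factory_calls += 1
            self._cached[partition_id] = self._factory(partition_id)
        return self._cached[partition_id]
```

Since the set of partitions is small and fixed for an Event Hub, the cache never needs eviction.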
```python
}
async with self._cached_ownership_locks[partition_id]:
    try:
        blob_client = await self._container_client.upload_blob(
```
Do we really want to unconditionally overwrite the blob?
Yes, this upload is there to get a renewed etag and last_modified; the uploaded content doesn't change.
The upload doesn't get the metadata, however. You are overwriting the metadata at this point. You are getting the properties in the get_blob_properties call later.
I find it very curious that we are not using etags for optimistic concurrency in the code to avoid clients stomping on each other.
It uses the etag to avoid stomping. update_checkpoint() had a bug where it didn't use if-match; updated.
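The if-match mechanics can be illustrated with a toy in-memory store; `ResourceModifiedError` here is a stand-in for `azure.core.exceptions.ResourceModifiedError`, and the counter-based etag is purely illustrative.

```python
class ResourceModifiedError(Exception):
    """Stand-in for azure.core.exceptions.ResourceModifiedError."""

class InMemoryBlobStore:
    """Toy blob store showing etag-based optimistic concurrency."""

    def __init__(self):
        self._blobs = {}     # name -> (etag, metadata)
        self._counter = 0

    def upload(self, name, metadata, if_match=None):
        current = self._blobs.get(name)
        if if_match is not None and (current is None or current[0] != if_match):
            # Another writer changed the blob since we read it: reject.
            raise ResourceModifiedError(name)
        self._counter += 1
        etag = str(self._counter)
        self._blobs[name] = (etag, dict(metadata))
        return etag          # caller must remember this for the next if_match
```

A writer that passes its last-seen etag as `if_match` fails fast when another EventProcessor has updated the blob in between, instead of silently overwriting.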
```python
        cached_ownership = self._cached_ownership_dict[partition_id]
        cached_ownership["etag"] = uploaded_blob_properties.etag
        cached_ownership["last_modified_time"] = uploaded_blob_properties.last_modified.timestamp()
    except (ResourceModifiedError, ResourceExistsError):
```
Under what circumstances would you expect to get either of these exceptions?
When multiple EventProcessors are running. The etag could be changed by another EventProcessor while this one tries to upload the blob.
```python
        logger.exception("An exception occurred when EventProcessor instance %r claim_ownership for "
                         "eventhub %r consumer group %r partition %r. The ownership is now lost",
                         owner_id, eventhub_name, consumer_group_name, partition_id, exc_info=err)
    else:
```
I'd avoid try/except/else since it is a somewhat confusing construct.
Put that statement at the end of the try block and removed the else.
I had thought else in Python was a cool thing, since other languages don't have it, but I've no problem removing it.
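For illustration, the two shapes are equivalent here (a toy example, not the PR's code). One caveat of dropping `else`: anything moved into the `try` block is now covered by the `except`, so this only works when the moved statement cannot raise the caught exception.

```python
def parse_with_else(text):
    try:
        value = int(text)
    except ValueError:
        return None
    else:
        return value * 2       # runs only when no exception was raised

def parse_without_else(text):
    try:
        value = int(text)
        return value * 2       # same effect: last statement of the try block
    except ValueError:
        return None            # doubling an int cannot raise ValueError,
                               # so widening the try block is safe here
```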
```python
else:
    etag_match = {"if_none_match": '"*"'}
    try:
        blob_client = await self._container_client.upload_blob(
```
There is a race condition here. Please use the blob client instead of the container client.
Same as the comment in the other place: container_client.upload_blob calls blob_client.upload_blob.
```python
    result.append(ownership)
return result

async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
```
There is code duplication/very similar code between claim_ownership and update_checkpoint. Refactor/share?
Refactored the code so the upload_blob logic is shared.
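One possible shape for that refactor (all names hypothetical, with a fake store so the sketch is self-contained): both entry points funnel through a single upload helper.

```python
class FakeStore:
    """Minimal async stand-in for the blob-backed store."""

    def __init__(self):
        self._etag = 0

    async def upload(self, name, metadata, if_match=None):
        self._etag += 1
        return str(self._etag)

async def _upload_ownership(store, partition_id, metadata, if_match=None):
    """Shared by claim_ownership and update_checkpoint in this sketch."""
    etag = await store.upload(partition_id, metadata, if_match)
    return {"partition_id": partition_id, "etag": etag, **metadata}

async def claim_ownership(store, partition_id, owner_id):
    return await _upload_ownership(store, partition_id, {"owner_id": owner_id})

async def update_checkpoint(store, partition_id, owner_id, offset, etag):
    # Checkpoints pass the last-seen etag so the store can reject stale writes.
    return await _upload_ownership(
        store, partition_id,
        {"owner_id": owner_id, "offset": offset}, if_match=etag)
```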
```python
exclude_packages = [
    'tests',
    'examples',
    # Exclude packages that will be covered by PEP420 or nspkg
```
We need to exclude packages that will be covered by azure-eventhubs as well. Or things are unlikely to go well.
```python
classifiers=[
    'Development Status :: 3 - Alpha',
    'Programming Language :: Python',
    'Programming Language :: Python :: 2',
```
This is async only yet we claim support for Python 2. Which seems incorrect.
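A corrected classifier list for an async-only package might look like the following; the specific 3.x versions listed are an assumption for illustration, not taken from this PR:

```python
classifiers=[
    'Development Status :: 3 - Alpha',
    'Programming Language :: Python',
    # async-only: drop every Python 2 classifier
    'Programming Language :: Python :: 3',
    'Programming Language :: Python :: 3.5',
    'Programming Language :: Python :: 3.6',
    'Programming Language :: Python :: 3.7',
]
```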
```python
    'azure',
]

if sys.version_info < (3, 5, 3):
```
This distribution package requires Python 3.5.3+, right? If so, this is incorrect.
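A guard consistent with that requirement could look like this (a sketch; the message text is made up):

```python
import sys

# The async APIs require Python 3.5.3+, so the guard checks exactly that bound.
if sys.version_info < (3, 5, 3):
    raise RuntimeError("This package requires Python 3.5.3 or later.")
```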
```
@@ -0,0 +1,12 @@
# --------------------------------------------------------------------------------------------
```
I would consider removing this file/folder since there is only a single file in the package.
```python
                 eventhub_name, consumer_group_name, exc_info=azure_err)
return []
async for b in blobs:  # TODO: running them concurrently
    async with self._cached_ownership_locks[b.name]:
```
I have concerns about the scope of this lock. What is it trying to guard against?
For instance, both claim_ownership and update_checkpoint, used by one EventProcessor, change the etag/last_modified concurrently. Without the lock, they would cause trouble for each other.
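A toy demonstration of what the lock buys (the structure is hypothetical but mirrors `_cached_ownership_locks`): even when two coroutines renew the same partition concurrently, `etag` and `last_modified_time` are always updated as a matching pair.

```python
import asyncio
from collections import defaultdict

cached_locks = defaultdict(asyncio.Lock)
cached_ownership = {"0": {"etag": "a", "last_modified_time": 0.0}}

async def renew(partition_id, etag, ts):
    # Both claim_ownership and update_checkpoint funnel through a lock like
    # this, so the etag/last_modified_time pair is never half-updated.
    async with cached_locks[partition_id]:
        record = cached_ownership[partition_id]
        record["etag"] = etag
        await asyncio.sleep(0)  # yield point: the lock keeps the pair consistent
        record["last_modified_time"] = ts

async def main():
    await asyncio.gather(renew("0", "b", 1.0), renew("0", "c", 2.0))
    return cached_ownership["0"]
```

Whichever writer runs second wins, but the final record never holds a mixed etag/timestamp pair.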
- Fix pylint
- Update accessibility of class
- Small fix in livetest
- Wait longer in iothub livetest
- Small updates in livetest
- Update samples and code according to the review
- Small update
- Draft EventProcessor Loadbalancing
- EventProcessor Load balancing
- small changes from bryan's review
- remove checkpoint manager from initialize
- small changes
- Draft EventProcessor Loadbalancing
- EventProcessor Load balancing
- small changes from bryan's review
- remove checkpoint manager from initialize
- small changes
- Fix code review feedback
- Packaging update of azure-mgmt-datalake-analytics
- Packaging update of azure-loganalytics
- Packaging update of azure-mgmt-storage
- code review fixes and pylint error
- reduce dictionary access
- Revert "Packaging update of azure-mgmt-storage" (reverts commit cf22c7c)
- Revert "Packaging update of azure-loganalytics" (reverts commit 40c7f03)
- Revert "Packaging update of azure-mgmt-datalake-analytics" (reverts commit c126bea)
- Trivial code change
- Refine exception handling for eventprocessor
- Enable pylint for eventprocessor
- Expose OwnershipLostError
- Move eventprocessor to aio; rename Sqlite3PartitionManager to SamplePartitionManager
- change checkpoint_manager to partition context
- fix pylint error
- fix a small issue
- Catch list_ownership/claim_ownership exceptions and retry
- Fix code review issues
- fix event processor long running test
- Remove utils.py
- Remove close() method
- Updated docstrings
- add pytest
- small fixes
- Revert "Remove utils.py" (reverts commit a9446de)
- change asyncio.create_task to 3.5-friendly code
- Remove Callable
- raise CancelledError instead of break
Merged to feature branch eventhubs_previews directly after this pull request's code review.
Please focus on:
- The package structure still needs to be reviewed.
- setup.py will need to be changed accordingly.