10 changes: 8 additions & 2 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
@@ -13,6 +13,7 @@
from ._blob_service_client import BlobServiceClient
from ._lease import BlobLeaseClient
from ._download import StorageStreamDownloader
from ._quick_query_helper import QuickQueryReader
from ._shared_access_signature import generate_account_sas, generate_container_sas, generate_blob_sas
from ._shared.policies import ExponentialRetry, LinearRetry
from ._shared.response_handlers import PartialBatchErrorException
@@ -49,7 +50,9 @@
ContainerSasPermissions,
BlobSasPermissions,
CustomerProvidedEncryptionKey,
ContainerEncryptionScope
ContainerEncryptionScope,
QuickQueryError,
Member:
Can we rename both QuickQueryError and QuickQueryReader?

Contributor Author:
what would you suggest?

DelimitedTextConfiguration
)

__version__ = VERSION
@@ -206,5 +209,8 @@ def download_blob_from_url(
'generate_container_sas',
'generate_blob_sas',
'PartialBatchErrorException',
'ContainerEncryptionScope'
'ContainerEncryptionScope',
'QuickQueryError',
'DelimitedTextConfiguration',
'QuickQueryReader'
]
107 changes: 102 additions & 5 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py
@@ -26,7 +26,7 @@
from ._shared.request_handlers import (
add_metadata_headers, get_length, read_length,
validate_and_format_range_headers)
from ._shared.response_handlers import return_response_headers, process_storage_error
from ._shared.response_handlers import return_response_headers, process_storage_error, return_headers_and_deserialized
from ._generated import AzureBlobStorage, VERSION
from ._generated.models import ( # pylint: disable=unused-import
DeleteSnapshotsOptionType,
@@ -35,10 +35,12 @@
AppendPositionAccessConditions,
SequenceNumberAccessConditions,
StorageErrorException,
UserDelegationKey,
QueryRequest,
CpkInfo)
from ._serialize import get_modify_conditions, get_source_conditions, get_cpk_scope_info, get_api_version
from ._serialize import get_modify_conditions, get_source_conditions, get_cpk_scope_info, get_api_version, \
get_quick_query_serialization_info
from ._deserialize import get_page_ranges_result, deserialize_blob_properties, deserialize_blob_stream
from ._quick_query_helper import QuickQueryReader
from ._upload_helpers import (
upload_block_blob,
upload_append_blob,
@@ -51,9 +53,7 @@
from datetime import datetime
from ._generated.models import BlockList
from ._models import ( # pylint: disable=unused-import
ContainerProperties,
BlobProperties,
BlobSasPermissions,
ContentSettings,
PremiumPageBlobTier,
StandardBlobTier,
@@ -616,6 +616,103 @@ def download_blob(self, offset=None, length=None, **kwargs):
**kwargs)
return StorageStreamDownloader(**options)

def _quick_query_options(self, query_expression,
**kwargs):
# type: (str, **Any) -> Dict[str, Any]
input_serialization = kwargs.pop('input_serialization', None)
output_serialization = kwargs.pop('output_serialization', None)
query_request = QueryRequest(expression=query_expression,
input_serialization=get_quick_query_serialization_info(input_serialization),
output_serialization=get_quick_query_serialization_info(output_serialization))
access_conditions = get_access_conditions(kwargs.pop('lease', None))
mod_conditions = get_modify_conditions(kwargs)

cpk = kwargs.pop('cpk', None)
cpk_info = None
if cpk:
if self.scheme.lower() != 'https':
raise ValueError("Customer provided encryption key must be used over HTTPS.")
cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
encryption_algorithm=cpk.algorithm)
options = {
'query_request': query_request,
'lease_access_conditions': access_conditions,
'modified_access_conditions': mod_conditions,
'cpk_info': cpk_info,
'progress_callback': kwargs.pop('progress_callback', None),
'snapshot': self.snapshot,
'timeout': kwargs.pop('timeout', None),
'cls': return_headers_and_deserialized,
'client': self._client,
'name': self.blob_name,
'container': self.container_name}
options.update(kwargs)
return options

@distributed_trace
def query(self, query_expression, # type: str
Member (@annatisch, May 29, 2020):
I think this should be
def query_data(self, query, **kwargs)
or
def query_blob_data(self, query, **kwargs)
Thoughts?

Contributor Author:
The returned items are not blobs; it's the partial content of a single blob (e.g. the first column of a CSV file).

Contributor Author:
All the platforms are calling it query, so do we want all platforms to rename it?

**kwargs):
# type: (str, **Any) -> QuickQueryReader
"""Enables users to select/project on blob/or blob snapshot data by providing simple query expressions.
This operations returns a QuickQueryReader, users need to use readall() or readinto() to get query data.

:param str query_expression:
Required. a query statement.
:keyword func(~azure.storage.blob.QuickQueryError, int, int) progress_callback:
Callback where the caller can track progress of the operation as well as any quick query failures.
:keyword input_serialization:
Optional. Defines the input serialization for a blob quick query request.
This keyword arg could be set for delimited (CSV) serialization or JSON serialization.
When the input_serialization is set for JSON records, only a record separator in str format is needed.
:paramtype input_serialization: ~azure.storage.blob.DelimitedTextConfiguration or str
:keyword output_serialization:
Optional. Defines the output serialization for a blob quick query request.
This keyword arg could be set for delimited (CSV) serialization or JSON serialization.
When the output_serialization is set for JSON records, only a record separator in str format is needed.
:paramtype output_serialization: ~azure.storage.blob.DelimitedTextConfiguration or str
:keyword lease:
Required if the blob has an active lease. Value can be a BlobLeaseClient object
or the lease ID as a string.
:paramtype lease: ~azure.storage.blob.BlobLeaseClient or str
:keyword ~datetime.datetime if_modified_since:
A DateTime value. Azure expects the date value passed in to be UTC.
If timezone is included, any non-UTC datetimes will be converted to UTC.
If a date is passed in without timezone info, it is assumed to be UTC.
Specify this header to perform the operation only
if the resource has been modified since the specified time.
:keyword ~datetime.datetime if_unmodified_since:
A DateTime value. Azure expects the date value passed in to be UTC.
If timezone is included, any non-UTC datetimes will be converted to UTC.
If a date is passed in without timezone info, it is assumed to be UTC.
Specify this header to perform the operation only if
the resource has not been modified since the specified date/time.
:keyword str etag:
An ETag value, or the wildcard character (*). Used to check if the resource has changed,
and act according to the condition specified by the `match_condition` parameter.
:keyword ~azure.core.MatchConditions match_condition:
The match condition to use upon the etag.
:keyword ~azure.storage.blob.CustomerProvidedEncryptionKey cpk:
Encrypts the data on the service-side with the given key.
Use of customer-provided keys must be done over HTTPS.
As the encryption key itself is provided in the request,
a secure connection must be established to transfer the key.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: A streaming object (QuickQueryReader)
:rtype: ~azure.storage.blob.QuickQueryReader

.. admonition:: Example:

.. literalinclude:: ../samples/blob_samples_query.py
:start-after: [START query]
:end-before: [END query]
:language: python
:dedent: 4
:caption: select/project on blob or blob snapshot data by providing simple query expressions.
"""
options = self._quick_query_options(query_expression, **kwargs)
return QuickQueryReader(**options)
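
As a quick illustration of the API above, here is a minimal sketch (not the referenced sample); the connection string, container, blob name, and query expression are all hypothetical:

```python
from azure.storage.blob import BlobClient, DelimitedTextConfiguration

# Hypothetical client; substitute real connection details.
blob_client = BlobClient.from_connection_string(
    "my_connection_string", container_name="mycontainer", blob_name="data.csv")

# Describe the CSV layout of the source blob and ask for semicolon-separated output.
input_config = DelimitedTextConfiguration(headers_present=True)
output_config = DelimitedTextConfiguration(column_separator=';')

reader = blob_client.query(
    "SELECT _1 from BlobStorage",  # assumed expression selecting the first column
    input_serialization=input_config,
    output_serialization=output_config)
print(reader.readall())  # blocks until the full query result is downloaded; returns bytes
```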

@staticmethod
def _generic_delete_blob_options(delete_snapshots=False, **kwargs):
# type: (bool, **Any) -> Dict[str, Any]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

from typing import ( # pylint: disable=unused-import
Tuple, Dict, List,
TYPE_CHECKING
Expand Down
46 changes: 46 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ._generated.models import StorageErrorException
from ._generated.models import BlobPrefix as GenBlobPrefix
from ._generated.models import BlobItemInternal
from ._generated.models import DelimitedTextConfiguration as GenDelimitedTextConfiguration


class BlobType(str, Enum):
Expand Down Expand Up @@ -1092,3 +1093,48 @@ def _from_generated(cls, generated):
)
return scope
return None


class DelimitedTextConfiguration(GenDelimitedTextConfiguration):
"""Defines the input or output delimited (CSV) serialization for a blob quick query request.

:keyword str column_separator: column separator, defaults to ','
:keyword str field_quote: field quote, defaults to '"'
:keyword str record_separator: record separator, defaults to '\n'
:keyword str escape_char: escape char, defaults to empty
:keyword bool headers_present: has headers, defaults to False
"""
def __init__(self, **kwargs):
column_separator = kwargs.pop('column_separator', ',')
field_quote = kwargs.pop('field_quote', '"')
record_separator = kwargs.pop('record_separator', '\n')
escape_char = kwargs.pop('escape_char', "")
headers_present = kwargs.pop('headers_present', False)

super(DelimitedTextConfiguration, self).__init__(
column_separator=column_separator,
field_quote=field_quote,
record_separator=record_separator,
escape_char=escape_char,
headers_present=headers_present)
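
For illustration, a short sketch constructing these configurations; the separator values are arbitrary:

```python
# Input side: a headered, semicolon-separated file.
input_config = DelimitedTextConfiguration(column_separator=';', headers_present=True)

# Output side: the documented defaults (comma-separated, newline-terminated records).
output_config = DelimitedTextConfiguration()
```

Per the docstring on query, a JSON-serialized blob instead takes a plain record-separator string (e.g. '\n') in place of a DelimitedTextConfiguration.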


class QuickQueryError(object):
"""The error happened during quick query operation.

:ivar str name:
The name of the error.
:ivar bool is_fatal:
If true, this error prevents further query processing. More result data may be returned,
but there is no guarantee that all of the original data will be processed.
If false, this error does not prevent further query processing.
:ivar str description:
A description of the error.
:ivar int position:
The blob offset at which the error occurred.
"""
def __init__(self, name=None, is_fatal=False, description=None, position=None):
self.name = name
self.is_fatal = is_fatal
self.description = description
self.position = position
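
Given the func(~azure.storage.blob.QuickQueryError, int, int) signature documented on query, a progress callback might look like the following sketch (the client wiring in the trailing comment is hypothetical):

```python
def on_progress(error, bytes_processed, total_bytes):
    # `error` is a QuickQueryError, or None when the batch parsed cleanly.
    if error:
        print("query error %r at offset %s (fatal=%s): %s" % (
            error.name, error.position, error.is_fatal, error.description))
    if total_bytes:
        print("scanned %s of %s bytes" % (bytes_processed, total_bytes))

# reader = blob_client.query("SELECT * from BlobStorage", progress_callback=on_progress)
```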
159 changes: 159 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_quick_query_helper.py
@@ -0,0 +1,159 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from io import BytesIO

from ._shared.avro.datafile import DataFileReader
from ._shared.avro.avro_io import DatumReader

from ._models import QuickQueryError


class QuickQueryReader(object): # pylint: disable=too-many-instance-attributes
"""A streaming object to read quick query result.

:ivar str name:
The name of the blob for the quick query request.
:ivar str container:
The name of the container where the blob is.
:ivar dict response_headers:
The response_headers of the quick query request.
:ivar int total_bytes:
The size of the total data in the stream.
"""

def __init__(
self,
client=None,
name=None,
container=None,
progress_callback=None,
encoding=None,
**kwargs
):
self.name = name
self.container = container
self.response_headers = None
self.total_bytes = None
self.bytes_processed = 0
self._progress_callback = progress_callback
self._client = client
self._request_options = kwargs
self._encoding = encoding

def __len__(self):
return self.total_bytes

def readall(self):
"""Return all quick query results.

This operation is blocking until all data is downloaded.

:rtype: bytes
"""
stream = BytesIO()
self.readinto(stream)
data = stream.getvalue()
if self._encoding:
return data.decode(self._encoding)
return data

def readinto(self, stream):
"""Download the quick query result to a stream.

:param stream:
The stream to download to. This can be an open file-handle,
or any writable stream.
:returns: None
"""
headers, raw_response_body = self._client.blob.quick_query(**self._request_options)
self.response_headers = headers
self._parse_quick_query_result(raw_response_body, stream, progress_callback=self._progress_callback)
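
A sketch of streaming a query result straight into a local file via readinto; the client and path are hypothetical:

```python
# The service request happens inside readinto; nothing is fetched before this call.
reader = blob_client.query("SELECT * from BlobStorage")
with open("query_result.csv", "wb") as handle:
    reader.readinto(handle)
print(reader.response_headers)  # populated once the request completes
```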

def _parse_quick_query_result(self, raw_response_body, stream, progress_callback=None):
parsed_results = DataFileReader(QuickQueryStreamer(raw_response_body), DatumReader())
for parsed_result in parsed_results:
if parsed_result.get('data'):
stream.write(parsed_result.get('data'))
continue

self.bytes_processed = parsed_result.get('bytesScanned')
self.total_bytes = parsed_result.get('totalBytes')
fatal = QuickQueryError(parsed_result['name'],
parsed_result['fatal'],
parsed_result['description'],
parsed_result['position']) if parsed_result.get('fatal') else None
if progress_callback:
progress_callback(fatal, self.bytes_processed, self.total_bytes)
Member:
We should remove this progress callback - instead we can make this object iterable, so that if a user needs this information they can iterate over the data in the result.

Contributor Author:
fatal, bytes_processed and total_bytes are service-defined schema; they are mixed with the data in the query response body. That means when we use Avro to parse the query response body, Avro could give us some data (not guaranteed to be a full record), fatal, bytes_processed, or total_bytes. If we parse the records for users, bytes_processed will not make sense, because we can only give the user full records, even when more bytes for the next record have already been returned by the Avro parser.

Besides, the record could be JSON or CSV type, and the record separator, column separator, etc. are based on what the user specified, which makes it more difficult to extract the result.

Also, query currently doesn't allow us to do a range download. Once we trigger query, it will keep returning data until there's no more result, so we cannot use ItemPaged.

We had a discussion across all platforms and think it should have a similar interface to download; we will have further discussion if you feel strongly about making it iterable.

Contributor Author:
Thoughts @annatisch ?



class QuickQueryStreamer(object):
"""
File-like streaming iterator.
"""

def __init__(self, generator):
self.generator = generator
self.iterator = iter(generator)
self._buf = b""
self._point = 0
self._download_offset = 0
self._buf_start = 0
self.file_length = None

def __len__(self):
return self.file_length

def __iter__(self):
return self.iterator

@staticmethod
def seekable():
return True

def next(self):
next_part = next(self.iterator)
self._download_offset += len(next_part)
return next_part

def tell(self):
return self._point

def seek(self, offset, whence=0):
if whence == 0:
self._point = offset
elif whence == 1:
self._point += offset
else:
raise ValueError("whence must be 0 or 1")
if self._point < 0:
self._point = 0 # XXX is this right?

def read(self, size):
try:
# keep reading from the generator until the buffer of this stream has enough data to read
while self._point + size > self._download_offset:
self._buf += self.next()
except StopIteration:
self.file_length = self._download_offset

start_point = self._point

# EOF
self._point = min(self._point + size, self._download_offset)

relative_start = start_point - self._buf_start
if relative_start < 0:
raise ValueError("Buffer has dumped too much data")
relative_end = relative_start + size
data = self._buf[relative_start: relative_end]

# dump the extra data in buffer
# buffer start--------------------16bytes----current read position
dumped_size = max(relative_end - 16 - relative_start, 0)
self._buf_start += dumped_size
self._buf = self._buf[dumped_size:]

return data
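
To make the buffering behaviour concrete, here is a self-contained sketch that drives QuickQueryStreamer with a plain byte-chunk generator standing in for the service response body (no network involved):

```python
def chunks():
    # Stand-in for the raw Avro response body the service streams back.
    yield b"hello "
    yield b"quick "
    yield b"query"

streamer = QuickQueryStreamer(chunks())
print(streamer.read(6))   # b"hello " - pulls the first chunk from the generator
print(streamer.read(11))  # b"quick query" - buffers across chunk boundaries
```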