Skip to content
Next Next commit
feat: add cloud.region and transaction_tag in span attributes
  • Loading branch information
sinhasubham committed Oct 27, 2025
commit e49afa6931414609bd0e95210e24598f03465dfb
30 changes: 21 additions & 9 deletions examples/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

# Setup common variables that'll be used between Spanner and traces.
project_id = os.environ.get('SPANNER_PROJECT_ID', 'test-project')
project_id = os.environ.get('SPANNER_PROJECT_ID', 'span-cloud-testing')

def spanner_with_cloud_trace():
# [START spanner_opentelemetry_traces_cloudtrace_usage]
Expand Down Expand Up @@ -62,16 +62,13 @@ def spanner_with_otlp():


def main():
# Setup OpenTelemetry, trace and Cloud Trace exporter.
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = CloudTraceSpanExporter(project_id=project_id)
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))

# Setup the Cloud Spanner Client.
# Change to "spanner_client = spanner_with_otlp" to use OTLP exporter
spanner_client = spanner_with_cloud_trace()
instance = spanner_client.instance('test-instance')
database = instance.database('test-db')
instance = spanner_client.instance('suvham-testing')
instance.reload()
database = instance.database('gildb')
tracer_provider = spanner_client.observability_options["tracer_provider"]

# Set W3C Trace Context as the global propagator for end to end tracing.
set_global_textmap(TraceContextTextMapPropagator())
Expand All @@ -93,12 +90,27 @@ def main():
# Purposefully issue a bad SQL statement to examine exceptions
# that get recorded and a ERROR span status.
try:
data = snapshot.execute_sql('SELECT CURRENT_TIMESTAMPx()')
data = snapshot.execute_sql('SELECT CURRENT_TIMESTAMP()')
for row in data:
print(row)
except Exception as e:
print(e)

# Example of a read-write transaction with a transaction tag
with tracer.start_as_current_span('TaggedTransaction'):
def update_singer_name(transaction):
transaction.execute_update(
"UPDATE Singers SET FirstName = 'Timothy' WHERE SingerId = 1",
request_options={
"request_tag": "app=concert,env=dev,action=update"
},
)
print("Updated singer's name.")

database.run_in_transaction(
update_singer_name, transaction_tag="app=concert,env=dev"
)


if __name__ == '__main__':
main()
40 changes: 36 additions & 4 deletions google/cloud/spanner_v1/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import time
import base64
import threading
import logging

from google.protobuf.struct_pb2 import ListValue
from google.protobuf.struct_pb2 import Value
Expand All @@ -29,11 +30,16 @@
from google.api_core import datetime_helpers
from google.api_core.exceptions import Aborted
from google.cloud._helpers import _date_from_iso8601_date
from google.cloud.spanner_v1 import TypeCode
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import JsonObject, Interval
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1.types import ExecuteSqlRequest
from google.cloud.spanner_v1.types import TransactionOptions
from google.cloud.spanner_v1.data_types import JsonObject, Interval
from google.cloud.spanner_v1.request_id_header import with_request_id
from google.cloud.spanner_v1.types import TypeCode
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.resourcedetector.gcp_resource_detector import (
GoogleCloudResourceDetector,
)

from google.rpc.error_details_pb2 import RetryInfo

try:
Expand All @@ -55,6 +61,10 @@
+ "numeric has a whole component with precision {}"
)

GOOGLE_CLOUD_REGION_GLOBAL = "global"

log = logging.getLogger(__name__)


if HAS_OPENTELEMETRY_INSTALLED:

Expand All @@ -79,6 +89,28 @@ def set(self, carrier: List[Tuple[str, str]], key: str, value: str) -> None:
carrier.append((key, value))


def _get_cloud_region() -> str:
"""Get the location of the resource.

Returns:
str: The location of the resource. If OpenTelemetry is not installed, returns a global region.
"""
if not HAS_OPENTELEMETRY_INSTALLED:
return GOOGLE_CLOUD_REGION_GLOBAL
try:
detector = GoogleCloudResourceDetector()
resources = detector.detect()

if ResourceAttributes.CLOUD_REGION in resources.attributes:
return resources.attributes[ResourceAttributes.CLOUD_REGION]
except Exception as e:
log.warning(
"Failed to detect GCP resource location for Spanner metrics, defaulting to 'global'. Error: %s",
e,
)
return GOOGLE_CLOUD_REGION_GLOBAL


def _try_to_coerce_bytes(bytestring):
"""Try to coerce a byte string into the right thing based on Python
version and whether or not it is base64 encoded.
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.cloud.spanner_v1 import SpannerClient
from google.cloud.spanner_v1 import gapic_version
from google.cloud.spanner_v1._helpers import (
_get_cloud_region,
_metadata_with_span_context,
)

Expand Down Expand Up @@ -85,6 +86,7 @@ def trace_call(
enable_end_to_end_tracing = False

db_name = ""
cloud_region = None
if session and getattr(session, "_database", None):
db_name = session._database.name

Expand All @@ -97,7 +99,9 @@ def trace_call(
"enable_end_to_end_tracing", enable_end_to_end_tracing
)
db_name = observability_options.get("db_name", db_name)
cloud_region = observability_options.get("cloud_region", cloud_region)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of this ?
IMU, observability_options is something which customers can set and does not have cloud_region property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line is not required as _get_cloud_region is overriding the value in next line. Removed


cloud_region = _get_cloud_region()
tracer = get_tracer(tracer_provider)

# Set base attributes that we know for every trace created
Expand All @@ -107,6 +111,7 @@ def trace_call(
"db.instance": db_name,
"net.host.name": SpannerClient.DEFAULT_ENDPOINT,
OTEL_SCOPE_NAME: TRACER_NAME,
"cloud.region": cloud_region,
OTEL_SCOPE_VERSION: TRACER_VERSION,
# Standard GCP attributes for OTel, attributes are used for internal purpose and are subjected to change
"gcp.client.service": "spanner",
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,8 +1012,14 @@ def run_in_transaction(self, func, *args, **kw):
reraises any non-ABORT exceptions raised by ``func``.
"""
observability_options = getattr(self, "observability_options", None)
transaction_tag = kw.get("transaction_tag")
extra_attributes = {}
if transaction_tag:
extra_attributes["transaction.tag"] = transaction_tag

with trace_call(
"CloudSpanner.Database.run_in_transaction",
extra_attributes=extra_attributes,
observability_options=observability_options,
), MetricsCapture():
# Sanity check: Is there a transaction already running?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,9 @@
from .metrics_tracer_factory import MetricsTracerFactory
import os
import logging
from .constants import (
SPANNER_SERVICE_NAME,
GOOGLE_CLOUD_REGION_KEY,
GOOGLE_CLOUD_REGION_GLOBAL,
)
from .constants import SPANNER_SERVICE_NAME

try:
from opentelemetry.resourcedetector import gcp_resource_detector

# Overwrite the requests timeout for the detector.
# This is necessary as the client will wait the full timeout if the
# code is not run in a GCP environment, with the location endpoints available.
gcp_resource_detector._TIMEOUT_SEC = 0.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check if we need this timeout in the new code after refactoring. Currently get region does not have any timeout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes got missed while moving the logic to helper. Added back


import mmh3

logging.getLogger("opentelemetry.resourcedetector.gcp_resource_detector").setLevel(
Expand All @@ -44,6 +33,7 @@

from .metrics_tracer import MetricsTracer
from google.cloud.spanner_v1 import __version__
from google.cloud.spanner_v1._helpers import _get_cloud_region
from uuid import uuid4

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,7 +76,7 @@ def __new__(
cls._metrics_tracer_factory.set_client_hash(
cls._generate_client_hash(client_uid)
)
cls._metrics_tracer_factory.set_location(cls._get_location())
cls._metrics_tracer_factory.set_location(_get_cloud_region())
cls._metrics_tracer_factory.gfe_enabled = gfe_enabled

if cls._metrics_tracer_factory.enabled != enabled:
Expand Down Expand Up @@ -153,28 +143,3 @@ def _generate_client_hash(client_uid: str) -> str:

# Return as 6 digit zero padded hex string
return f"{sig_figs:06x}"

@staticmethod
def _get_location() -> str:
"""Get the location of the resource.

In case of any error during detection, this method will log a warning
and default to the "global" location.

Returns:
str: The location of the resource. If OpenTelemetry is not installed, returns a global region.
"""
if not HAS_OPENTELEMETRY_INSTALLED:
return GOOGLE_CLOUD_REGION_GLOBAL
try:
detector = gcp_resource_detector.GoogleCloudResourceDetector()
resources = detector.detect()

if GOOGLE_CLOUD_REGION_KEY in resources.attributes:
return resources.attributes[GOOGLE_CLOUD_REGION_KEY]
except Exception as e:
log.warning(
"Failed to detect GCP resource location for Spanner metrics, defaulting to 'global'. Error: %s",
e,
)
return GOOGLE_CLOUD_REGION_GLOBAL
5 changes: 5 additions & 0 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,14 @@ def run_in_transaction(self, func, *args, **kw):
database = self._database
log_commit_stats = database.log_commit_stats

extra_attributes = {}
if transaction_tag:
extra_attributes["transaction.tag"] = transaction_tag

with trace_call(
"CloudSpanner.Session.run_in_transaction",
self,
extra_attributes=extra_attributes,
observability_options=getattr(database, "observability_options", None),
) as span, MetricsCapture():
attempts: int = 0
Expand Down
8 changes: 6 additions & 2 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,10 @@ def execute_update(
request_options = RequestOptions(request_options)
request_options.transaction_tag = self.transaction_tag

trace_attributes = {"db.statement": dml}
trace_attributes = {
"db.statement": dml,
"request_options": request_options,
}

# If this request begins the transaction, we need to lock
# the transaction until the transaction ID is updated.
Expand Down Expand Up @@ -629,7 +632,8 @@ def batch_update(

trace_attributes = {
# Get just the queries from the DML statement batch
"db.statement": ";".join([statement.sql for statement in parsed])
"db.statement": ";".join([statement.sql for statement in parsed]),
"request_options": request_options,
}

# If this request begins the transaction, we need to lock
Expand Down
43 changes: 43 additions & 0 deletions tests/unit/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
import unittest
import mock

from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes


from google.cloud.spanner_v1 import TransactionOptions


Expand Down Expand Up @@ -89,6 +93,45 @@ def test_base_object_merge_dict(self):
self.assertEqual(result, expected)


class Test_get_cloud_region(unittest.TestCase):
def _callFUT(self, *args, **kw):
from google.cloud.spanner_v1._helpers import _get_cloud_region

return _get_cloud_region(*args, **kw)

@mock.patch("google.cloud.spanner_v1._helpers.GoogleCloudResourceDetector.detect")
def test_get_location_with_region(self, mock_detect):
"""Test that _get_cloud_region returns the region when detected."""
mock_resource = Resource.create(
{ResourceAttributes.CLOUD_REGION: "us-central1"}
)
mock_detect.return_value = mock_resource

location = self._callFUT()
self.assertEqual(location, "us-central1")

@mock.patch("google.cloud.spanner_v1._helpers.GoogleCloudResourceDetector.detect")
def test_get_location_without_region(self, mock_detect):
"""Test that _get_cloud_region returns 'global' when no region is detected."""
mock_resource = Resource.create({}) # No region attribute
mock_detect.return_value = mock_resource

location = self._callFUT()
self.assertEqual(location, "global")

@mock.patch("google.cloud.spanner_v1._helpers.GoogleCloudResourceDetector.detect")
def test_get_location_with_exception(self, mock_detect):
"""Test that _get_cloud_region returns 'global' and logs a warning on exception."""
mock_detect.side_effect = Exception("detector failed")

with self.assertLogs(
"google.cloud.spanner_v1._helpers", level="WARNING"
) as log:
location = self._callFUT()
self.assertEqual(location, "global")
self.assertIn("Failed to detect GCP resource location", log.output[0])


class Test_make_value_pb(unittest.TestCase):
def _callFUT(self, *args, **kw):
from google.cloud.spanner_v1._helpers import _make_value_pb
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test__opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def test_trace_call(self):
"db.type": "spanner",
"db.url": "spanner.googleapis.com",
"net.host.name": "spanner.googleapis.com",
"cloud.region": "global",
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
Expand Down Expand Up @@ -95,6 +96,7 @@ def test_trace_error(self):
"db.type": "spanner",
"db.url": "spanner.googleapis.com",
"net.host.name": "spanner.googleapis.com",
"cloud.region": "global",
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
"cloud.region": "global",
}
enrich_with_otel_scope(BASE_ATTRIBUTES)

Expand Down
3 changes: 3 additions & 0 deletions tests/unit/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class TestFixedSizePool(OpenTelemetryBase):
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
"cloud.region": "global",
}
enrich_with_otel_scope(BASE_ATTRIBUTES)

Expand Down Expand Up @@ -496,6 +497,7 @@ class TestBurstyPool(OpenTelemetryBase):
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
"cloud.region": "global",
}
enrich_with_otel_scope(BASE_ATTRIBUTES)

Expand Down Expand Up @@ -737,6 +739,7 @@ class TestPingingPool(OpenTelemetryBase):
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
"cloud.region": "global",
}
enrich_with_otel_scope(BASE_ATTRIBUTES)

Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class TestSession(OpenTelemetryBase):
"gcp.client.service": "spanner",
"gcp.client.version": LIB_VERSION,
"gcp.client.repo": "googleapis/python-spanner",
"cloud.region": "global",
}
enrich_with_otel_scope(BASE_ATTRIBUTES)

Expand Down
Loading