Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: Adapt to changes in Python 3.14 for scheduler and type hints
  • Loading branch information
chalmerlowe committed Oct 22, 2025
commit 5b653d8191ebb299259effb61877d7598c180937
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
return self._dispatcher

@property
def leaser(self) -> Optional[leaser.Leaser]:
def leaser(self) -> Optional["leaser.Leaser"]:
"""The leaser helper."""
return self._leaser

Expand Down
19 changes: 17 additions & 2 deletions google/cloud/pubsub_v1/subscriber/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import abc
import concurrent.futures
import queue
import sys
import typing
from typing import Callable, List, Optional
import warnings
Expand All @@ -37,7 +38,7 @@ class Scheduler(metaclass=abc.ABCMeta):

@property
@abc.abstractmethod
def queue(self) -> queue.Queue: # pragma: NO COVER
def queue(self) -> "queue.Queue": # pragma: NO COVER
"""Queue: A concurrency-safe queue specific to the underlying
concurrency implementation.

Expand Down Expand Up @@ -162,7 +163,21 @@ def shutdown(
work_item = self._executor._work_queue.get(block=False)
if work_item is None: # Exceutor in shutdown mode.
continue
dropped_messages.append(work_item.args[0]) # type: ignore[index]

dropped_message = None
if sys.version_info < (3, 14):
# For Python < 3.14, work_item.args is a tuple of positional arguments.
# The message is expected to be the first argument.
if hasattr(work_item, 'args') and work_item.args:
dropped_message = work_item.args[0] # type: ignore[index]
else:
# For Python >= 3.14, work_item.task is (fn, args, kwargs).
# The message is expected to be the first item in the args tuple (task[1]).
if hasattr(work_item, 'task') and len(work_item.task) == 3 and work_item.task[1]:
dropped_message = work_item.task[1][0]

if dropped_message is not None:
dropped_messages.append(dropped_message)
except queue.Empty:
pass

Expand Down
Loading