Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions temporal/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete
from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata
from temporal.converter import get_fn_args_type_hints
from temporal.retry import RetryException, retry
from temporal.retry import retry
from temporal.service_helpers import get_identity
from temporal.worker import Worker, StopRequestedException
from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \
Expand Down Expand Up @@ -93,4 +93,3 @@ async def activity_task_loop_func(worker: Worker):
finally:
worker.notify_thread_stopped()
logger.info("Activity loop ended")
raise RetryException('sleep')
7 changes: 3 additions & 4 deletions temporal/retry.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import asyncio
import calendar
from socket import gaierror
import time

INITIAL_DELAY_SECONDS = 3
BACK_OFF_MULTIPLIER = 2
MAX_DELAY_SECONDS = 5 * 60
RESET_DELAY_AFTER_SECONDS = 10 * 60

class RetryException(Exception):
pass

def retry(logger=None):
def wrapper(fp):
Expand All @@ -19,8 +18,8 @@ async def retry_loop(*args, **kwargs):
await fp(*args, **kwargs)
logger.debug("@retry decorated function %s exited, ending retry loop", fp.__name__)
break
except RetryException:
logger.info('sleeping...')
except (asyncio.CancelledError, gaierror) as err:
logger.info(f"{fp.__name__} raised {err}, retrying...")
await asyncio.sleep(INITIAL_DELAY_SECONDS)
except Exception as ex:
now = calendar.timegm(time.gmtime())
Expand Down