Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Py310 fix (#6)
* Revert "add activity loop retry"

This reverts commit 6e3112a.

* added asyncio.CancelledError to retry exception handling

* sepereated CancelledError from normal Exception in retry decorator

* minor syntax update

* catch gaierror exception as well

Co-authored-by: Osama Maharmeh <[email protected]>
  • Loading branch information
malkaed and NullOsama authored May 31, 2022
commit aab27d8f10435e006c016c361e8874ed5a22301f
3 changes: 1 addition & 2 deletions temporal/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete
from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata
from temporal.converter import get_fn_args_type_hints
from temporal.retry import RetryException, retry
from temporal.retry import retry
from temporal.service_helpers import get_identity
from temporal.worker import Worker, StopRequestedException
from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \
Expand Down Expand Up @@ -93,4 +93,3 @@ async def activity_task_loop_func(worker: Worker):
finally:
worker.notify_thread_stopped()
logger.info("Activity loop ended")
raise RetryException('sleep')
7 changes: 3 additions & 4 deletions temporal/retry.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import asyncio
import calendar
from socket import gaierror
import time

INITIAL_DELAY_SECONDS = 3
BACK_OFF_MULTIPLIER = 2
MAX_DELAY_SECONDS = 5 * 60
RESET_DELAY_AFTER_SECONDS = 10 * 60

class RetryException(Exception):
pass

def retry(logger=None):
def wrapper(fp):
Expand All @@ -19,8 +18,8 @@ async def retry_loop(*args, **kwargs):
await fp(*args, **kwargs)
logger.debug("@retry decorated function %s exited, ending retry loop", fp.__name__)
break
except RetryException:
logger.info('sleeping...')
except (asyncio.CancelledError, gaierror) as err:
logger.info(f"{fp.__name__} raised {err}, retrying...")
await asyncio.sleep(INITIAL_DELAY_SECONDS)
except Exception as ex:
now = calendar.timegm(time.gmtime())
Expand Down