Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add activity loop retry
  • Loading branch information
NullOsama authored Feb 17, 2022
commit 6e3112ab42ce5e1df41ae05a92b45eccb7b231f3
3 changes: 2 additions & 1 deletion temporal/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete
from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata
from temporal.converter import get_fn_args_type_hints
from temporal.retry import retry
from temporal.retry import RetryException, retry
from temporal.service_helpers import get_identity
from temporal.worker import Worker, StopRequestedException
from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \
Expand Down Expand Up @@ -93,3 +93,4 @@ async def activity_task_loop_func(worker: Worker):
finally:
worker.notify_thread_stopped()
logger.info("Activity loop ended")
raise RetryException('sleep')
5 changes: 5 additions & 0 deletions temporal/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
MAX_DELAY_SECONDS = 5 * 60
RESET_DELAY_AFTER_SECONDS = 10 * 60

class RetryException(Exception):
pass

def retry(logger=None):
def wrapper(fp):
Expand All @@ -17,6 +19,9 @@ async def retry_loop(*args, **kwargs):
await fp(*args, **kwargs)
logger.debug("@retry decorated function %s exited, ending retry loop", fp.__name__)
break
except RetryException:
logger.info('sleeping...')
await asyncio.sleep(INITIAL_DELAY_SECONDS)
except Exception as ex:
now = calendar.timegm(time.gmtime())
if last_failed_time == -1 or (now - last_failed_time) > RESET_DELAY_AFTER_SECONDS:
Expand Down