Refactor retry block structure
xqi-splunk committed Jun 24, 2025
commit 150c2096d45d7156d270642ebb376364df1f8653
207 changes: 100 additions & 107 deletions contentctl/objects/correlation_search.py
@@ -78,11 +78,6 @@ class TimeoutConfig(IntEnum):
# Max amount to wait before timing out during exponential backoff
MAX_SLEEP = 30

# NOTE: Based on testing, 99% of detections will generate risk/notables within 30s, and the remaining 1% of
# detections may need up to 150s to finish; so this is a reasonable total maximum wait time
# Total wait time before giving up on waiting for risk/notables to be generated
TOTAL_MAX_WAIT = 180

# NOTE: Based on testing, about 1% of detections couldn't generate risk/notables within a single dispatch and
#   needed to be retried; 90s is a reasonable wait time before re-dispatching the SavedSearch
# Wait time before retrying dispatch of the SavedSearch
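
For reference, a minimal sketch (not part of this diff) of how constants like BASE_SLEEP and MAX_SLEEP bound an exponential backoff loop; the concrete values and the RETRY_WINDOW name below are assumptions for illustration only.

import time
from enum import IntEnum
from typing import Callable


class BackoffConfig(IntEnum):
    BASE_SLEEP = 2      # first sleep interval, in seconds (assumed value)
    MAX_SLEEP = 30      # cap on any single sleep interval
    RETRY_WINDOW = 90   # total time budget before giving up


def wait_with_backoff(check: Callable[[], bool]) -> bool:
    """Poll check() with exponential backoff until it passes or the time budget runs out."""
    wait_time = int(BackoffConfig.BASE_SLEEP)
    elapsed = 0.0
    while elapsed <= BackoffConfig.RETRY_WINDOW:
        start = time.time()
        if check():
            return True
        time.sleep(wait_time)
        wait_time = min(int(BackoffConfig.MAX_SLEEP), wait_time * 2)  # double, but cap at MAX_SLEEP
        elapsed += time.time() - start
    return False
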
@@ -533,26 +528,6 @@ def update_timeframe(
if refresh:
self.refresh()

def force_run(self, refresh: bool = True) -> None:
"""Forces a detection run

Enables the detection, adjusts the cron schedule to run every 1 minute, and widens the earliest/latest window
to run on test data.
:param refresh: a bool indicating whether to refresh the metadata for the detection (default True)
"""
self.update_timeframe(refresh=False)
if not self.enabled:
self.enable(refresh=False)
job = self.dispatch()
self.logger.info(
f"Finished running detection '{self.name}' with job ID: {job.sid}"
)
else:
self.logger.warning(f"Detection '{self.name}' was already enabled")

if refresh:
self.refresh()
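
With force_run() removed, its work is split up: test() now calls update_timeframe() and enable() directly, and dispatch() moves into the new dispatch_and_validate() helper. A rough sketch (not part of this diff) of the equivalent call sequence, assuming a CorrelationSearch-like object is in scope:

def run_detection_once(search: "CorrelationSearch") -> None:
    # widen earliest/latest so the search runs over the injected test data
    search.update_timeframe(refresh=False)
    # make sure the saved search is enabled before dispatching
    search.enable(refresh=False)
    # kick off a run of the saved search (now done inside dispatch_and_validate)
    search.dispatch()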

def risk_event_exists(self) -> bool:
"""Whether at least one matching risk event exists

@@ -942,59 +917,93 @@ def notable_in_risk_dm(self) -> bool:
return True
return False

def validate_risk_notable_events(self) -> tuple[bool, str | None]:
"""Validates the existence of risk and notable events

Returns a bool indicating whether validating risks and notables is successful or not,
and a message indicating the reason for failure (if any).

:return: True if validation passes, False if it fails; None if no message, or a string
message indicating the reason for failure
def validate_risk_notable_events(self) -> None:
"""
try:
# Validate risk events
if self.has_risk_analysis_action:
self.logger.debug("Checking for matching risk events")
if self.risk_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
# -> I've observed there being a missing risk event (15/16) on
# the first few tries, so this does help us check for true
# positives; BUT, if we have lots of failing detections, this
# will definitely add to the total wait time
# -> certain types of failures (e.g. risk message, or any value
# checking) should fail testing automatically
# -> other types, like those based on counts of risk events,
#       should fail more slowly as more events may be
# produced
self.validate_risk_events()
else:
raise ValidationFailed(
f"TEST FAILED: No matching risk event created for: {self.name}"
)
Validate the risk and notable events created by the saved search
"""
# Validate risk events
if self.has_risk_analysis_action:
self.logger.debug("Checking for matching risk events")
if self.risk_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
# -> I've observed there being a missing risk event (15/16) on
# the first few tries, so this does help us check for true
# positives; BUT, if we have lots of failing detections, this
# will definitely add to the total wait time
# -> certain types of failures (e.g. risk message, or any value
# checking) should fail testing automatically
# -> other types, like those based on counts of risk events,
#       should fail more slowly as more events may be
# produced
self.validate_risk_events()
else:
self.logger.debug(f"No risk action defined for '{self.name}'")

# Validate notable events
if self.has_notable_action:
self.logger.debug("Checking for matching notable events")
# NOTE: because we check this last, if both fail, the error message about notables will
# always be the last to be added and thus the one surfaced to the user
if self.notable_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
self.validate_notable_events()
pass
else:
raise ValidationFailed(
f"TEST FAILED: No matching notable event created for: {self.name}"
)
raise ValidationFailed(
f"TEST FAILED: No matching risk event created for: {self.name}"
)
else:
self.logger.debug(f"No risk action defined for '{self.name}'")

# Validate notable events
if self.has_notable_action:
self.logger.debug("Checking for matching notable events")
# NOTE: because we check this last, if both fail, the error message about notables will
# always be the last to be added and thus the one surfaced to the user
if self.notable_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
self.validate_notable_events()
pass
else:
self.logger.debug(f"No notable action defined for '{self.name}'")
raise ValidationFailed(
f"TEST FAILED: No matching notable event created for: {self.name}"
)
else:
self.logger.debug(f"No notable action defined for '{self.name}'")

def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None:
"""Dispatch the saved search and validate the risk/notable events

Dispatches the saved search, then repeatedly validates the resulting risk/notable events with
exponential backoff between attempts. If validation still fails once the retry window is
exhausted, raises a ValidationFailed exception.
:param elapsed_sleep_time: a single-entry dict used to accumulate the total time slept across attempts
"""
self.dispatch()

wait_time = TimeoutConfig.BASE_SLEEP
max_wait = TimeoutConfig.MAX_SLEEP
time_elapsed = 0
validation_failed = False

while time_elapsed <= TimeoutConfig.RETRY_DISPATCH:
start_time = time.time()

# reset validation_failed for each iteration
validation_failed = False

return True, None
except ValidationFailed as e:
self.logger.error(f"Risk/notable validation failed: {e}")
return False, str(e)
# hold off on sleeping until ADD_WAIT_TIME (30s) has elapsed, then back off exponentially between validation attempts
if time_elapsed > TimeoutConfig.ADD_WAIT_TIME:
time.sleep(wait_time)
elapsed_sleep_time["elapsed_sleep_time"] += wait_time
wait_time = min(max_wait, wait_time * 2)

try:
self.validate_risk_notable_events()
except ValidationFailed as e:
self.logger.error(f"Validation failed: {e}")
validation_failed = True

end_time = time.time()
time_elapsed += end_time - start_time

if not validation_failed:
self.logger.info(
f"Validation passed for {self.name} after {elapsed_sleep_time['elapsed_sleep_time']} seconds"
)
break

if validation_failed:
raise ValidationFailed(
f"TEST FAILED: No matching notable event created for: {self.name}"
)
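
For illustration (not part of this diff), the exponential backoff sleep schedule used by a loop like the one above can be computed directly; note the real loop also skips sleeping until ADD_WAIT_TIME has elapsed. The values below (BASE_SLEEP=2, MAX_SLEEP=30, 90s window) are assumptions for illustration only.

def backoff_schedule(base: int, cap: int, budget: int) -> list[int]:
    """Return the successive sleep intervals until the time budget is exhausted."""
    sleeps: list[int] = []
    wait, total = base, 0
    while total <= budget:
        sleeps.append(wait)
        total += wait
        wait = min(cap, wait * 2)  # double each time, capped at the MAX_SLEEP-style value
    return sleeps


print(backoff_schedule(2, 30, 90))  # -> [2, 4, 8, 16, 30, 30, 30]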

# NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls
# it for completion, but that seems more tricky
def test(
@@ -1023,7 +1032,7 @@ def test(
result: IntegrationTestResult | None = None

# keep track of time slept and number of attempts for exponential backoff (base 2)
elapsed_sleep_time = 0
elapsed_sleep_time = {'elapsed_sleep_time': 0}

try:
# first make sure the indexes are currently empty and the detection is starting from a disabled state
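
The counter is wrapped in a single-entry dict so dispatch_and_validate() can update the caller's total in place; a plain int argument would only be rebound locally. A small self-contained sketch (not part of this diff) of the pattern:

def sleep_and_record(counter: dict[str, int], seconds: int) -> None:
    # mutating the shared dict is visible to the caller, unlike rebinding an int parameter
    counter["elapsed_sleep_time"] += seconds


elapsed = {"elapsed_sleep_time": 0}
sleep_and_record(elapsed, 4)
sleep_and_record(elapsed, 8)
print(elapsed["elapsed_sleep_time"])  # -> 12
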
@@ -1054,47 +1063,31 @@ def test(
# force the detection to run
self.logger.info(f"Forcing a run on {self.name}")
self.update_pbar(TestingStates.FORCE_RUN)
self.force_run()

max_total_wait = TimeoutConfig.TOTAL_MAX_WAIT
wait_time = TimeoutConfig.BASE_SLEEP
max_wait = TimeoutConfig.MAX_SLEEP
time_elapsed = 0

while time_elapsed <= max_total_wait:
# wait at least 90 seconds to rerun the SavedSearch
if time_elapsed > TimeoutConfig.RETRY_DISPATCH:
self.dispatch()

start_time = time.time()

# wait at least 30 seconds before adding to the wait time
if time_elapsed > TimeoutConfig.ADD_WAIT_TIME:
time.sleep(wait_time)
elapsed_sleep_time += wait_time
wait_time = min(max_wait, wait_time * 2)
self.update_timeframe(refresh=False)
self.enable(refresh=False)

attempt = 1
while attempt <= 3:
# reset the result to None on each loop iteration
result = None

validate_pass, error = self.validate_risk_notable_events()

# if result is True, then all checks passed and we can break the loop
if validate_pass:
attempt += 1
try:
self.dispatch_and_validate(elapsed_sleep_time)
except ValidationFailed as e:
self.logger.error(f"Risk/notable validation failed: {e}")
result = IntegrationTestResult(
status=TestResultStatus.FAIL,
message=f"TEST FAILED: {e}",
wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
)
if result is None:
result = IntegrationTestResult(
status=TestResultStatus.PASS,
message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}",
wait_duration=elapsed_sleep_time,
wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
)
break
else:
result = IntegrationTestResult(
status=TestResultStatus.FAIL,
message=f"TEST FAILED: {error}",
)

end_time = time.time()
time_elapsed += end_time - start_time

# TODO (PEX-436): should cleanup be in a finally block so it runs even on exception?
# cleanup the created events, disable the detection and return the result
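
The retry structure in test() now boils down to at most three dispatch/validate cycles, with the result taken from the last cycle. A simplified sketch (not part of this diff); it assumes ValidationFailed, IntegrationTestResult and TestResultStatus from this module are in scope, and run_with_retries is a hypothetical name:

def run_with_retries(search, elapsed_sleep_time: dict[str, int], max_attempts: int = 3):
    """Dispatch and validate up to max_attempts times; PASS on first success, else last FAIL."""
    result = None
    for _ in range(max_attempts):
        try:
            search.dispatch_and_validate(elapsed_sleep_time)
        except ValidationFailed as e:
            # keep the failure result and try another dispatch/validate cycle
            result = IntegrationTestResult(
                status=TestResultStatus.FAIL,
                message=f"TEST FAILED: {e}",
                wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
            )
        else:
            # success: report PASS and stop retrying
            return IntegrationTestResult(
                status=TestResultStatus.PASS,
                message="TEST PASSED: expected risk and/or notable events were created",
                wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
            )
    return result
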
@@ -1106,7 +1099,7 @@ def test(
result = IntegrationTestResult(
status=TestResultStatus.ERROR,
message=f"TEST FAILED (ERROR): Exception raised during integration test: {e}",
wait_duration=elapsed_sleep_time,
wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
exception=e,
)
self.logger.exception(result.message) # type: ignore