Merged
Changes from 11 commits
245 changes: 154 additions & 91 deletions contentctl/objects/correlation_search.py
@@ -69,14 +69,30 @@ class TimeoutConfig(IntEnum):
"""

# base amount to sleep for before beginning exponential backoff during testing
BASE_SLEEP = 60
BASE_SLEEP = 2

# NOTE: Some detections take longer to generate their risk/notables than others; testing has
# shown 270s to likely be sufficient for all detections in 99% of runs; however we have
# encountered a handful of transient failures in the last few months. Since our success rate
# is at 100% now, we will round this to a flat 300s to accomodate these outliers.
# shown 30s to be sufficient for all detections in 99% of runs, while fewer than 1% of detections
# need 60-90s for their risk/notables to appear; therefore 30s is a reasonable maximum
# backoff interval.
# Max amount to wait before timing out during exponential backoff
MAX_SLEEP = 300
MAX_SLEEP = 30

# NOTE: Based on testing, 99% of detections will generate risk/notables within 30s, and the remaining 1% of
# detections may need up to 150s to finish; so 180s is a reasonable total maximum wait time
# Total wait time before giving up on waiting for risk/notables to be generated
TOTAL_MAX_WAIT = 180

# NOTE: Based on testing, roughly 1% of detections do not generate their risk/notables from a single
# dispatch and need to be retried; 90s is a reasonable wait before re-dispatching the SavedSearch
# Wait time before retrying dispatch of the SavedSearch
RETRY_DISPATCH = 90

# NOTE: Based on testing, 99% of detections generate risk/notables within 30s, and validating the risks
# and notables itself takes around 5 to 10 seconds; so we let the validation passes act as the initial
# wait, and only start adding extra sleep time once ADD_WAIT_TIME has elapsed
# Time elapsed before additional wait time is added
ADD_WAIT_TIME = 30
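
Taken together, here is a minimal sketch (not part of this diff) of how these constants appear to drive the reworked wait/retry loop in CorrelationSearch.test further down; the `check` and `dispatch` callables are hypothetical stand-ins for the risk/notable validation and the SavedSearch dispatch, and `TimeoutConfig` is assumed to be defined as above:

```python
import time

def wait_for_events(check, dispatch) -> bool:
    wait_time = TimeoutConfig.BASE_SLEEP                      # start backoff at 2s
    time_elapsed = 0.0
    while time_elapsed <= TimeoutConfig.TOTAL_MAX_WAIT:       # give up after 180s total
        if time_elapsed > TimeoutConfig.RETRY_DISPATCH:       # after 90s, re-dispatch the search
            dispatch()
        start = time.time()
        if time_elapsed > TimeoutConfig.ADD_WAIT_TIME:        # after 30s, start sleeping between checks
            time.sleep(wait_time)
            wait_time = min(TimeoutConfig.MAX_SLEEP, wait_time * 2)  # double the sleep, capped at 30s
        if check():                                           # validation itself takes roughly 5-10s
            return True
        time_elapsed += time.time() - start
    return False
```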


# TODO (#226): evaluate sane defaults for timeframe for integration testing (e.g. 5y is good
@@ -88,7 +104,7 @@ class ScheduleConfig(StrEnum):

EARLIEST_TIME = "-5y@y"
LATEST_TIME = "-1m@m"
CRON_SCHEDULE = "*/1 * * * *"
CRON_SCHEDULE = "0 0 1 1 *"


class ResultIterator:
@@ -202,6 +218,9 @@ class CorrelationSearch(BaseModel):
# cleanup of this index
test_index: str | None = Field(default=None, min_length=1)

# The search ID of the last dispatched search; this is used to query for risk/notable events
sid: str | None = Field(default=None)

# The logger to use (logs all go to a null pipe unless ENABLE_LOGGING is set to True, so as not
# to conflict w/ tqdm)
logger: logging.Logger = Field(
@@ -437,6 +456,34 @@ def enable(self, refresh: bool = True) -> None:
if refresh:
self.refresh()

def dispatch(self) -> splunklib.Job:
"""Dispatches the SavedSearch

Dispatches the SavedSearch entity and blocks, polling until the resulting search job has finished running.
:return: the splunklib Job object for the completed search job
"""
self.logger.debug(f"Dispatching {self.name}...")
try:
job = self.saved_search.dispatch(trigger_actions=True)

time_to_execute = 0
# Check if the job is finished
while not job.is_done():
self.logger.info(f"Job {job.sid} is still running...")
time.sleep(1)
time_to_execute += 1
self.logger.info(
f"Job {job.sid} has finished running in {time_to_execute} seconds."
)

self.sid = job.sid

return job # type: ignore
except HTTPError as e:
raise ServerError(
f"HTTP error encountered while dispatching detection: {e}"
)
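
A hypothetical usage sketch (not part of the diff) of the new dispatch flow, assuming `cs` is an already constructed CorrelationSearch for the detection under test; it illustrates that the SID recorded here is what later pins the risk/notable queries:

```python
cs.enable(refresh=False)   # the saved search must be enabled before dispatching
job = cs.dispatch()        # blocks until the dispatched search job is done
assert cs.sid == job.sid   # this SID is used as orig_sid by get_risk_events()/get_notable_events()
cs.disable(refresh=False)
```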

def disable(self, refresh: bool = True) -> None:
"""Disables the SavedSearch

@@ -496,6 +543,10 @@ def force_run(self, refresh: bool = True) -> None:
self.update_timeframe(refresh=False)
if not self.enabled:
Contributor: Do we still want this behavior where force_run will throw a warning if called after the detection is enabled?

Collaborator: I think so? Is there a reason we would like to remove the warning?

Collaborator: The force_run function is removed after the refactoring, as discussed.

self.enable(refresh=False)
job = self.dispatch()
self.logger.info(
f"Finished running detection '{self.name}' with job ID: {job.sid}"
)
else:
self.logger.warning(f"Detection '{self.name}' was already enabled")

@@ -535,10 +586,15 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]:

# TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID
# Search for all risk events from a single scheduled search (indicated by orig_sid)
query = (
f'search index=risk search_name="{self.name}" [search index=risk search '
f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson'
)
if self.sid is None:
Contributor: What is the use case of maintaining this code path? Do we expect to be invoking get_risk_events without a successful dispatch returning an SID?

Collaborator: In lines 1029 to 1040, there is code to make sure the indexes are currently empty and that the detection is starting from a disabled state, by calling self.risk_event_exists().

These validations happen before dispatch(), so self.sid is always None in that case. If we drop the previous old query, the new query would always return empty and might give a false positive.

Contributor: Ahh I see. In that case I have two thoughts:

  1. Do we still need this cleanup check at the start of every test? This was very necessary when this was first implemented, as we did no filtering on SIDs of any kind, or even on the search name. We just said that the indexes needed to start empty, and if anything ended up in them, it was a success (so cleaning up beforehand was essential). Now, we search on both the detection name and the dispatched SID, so the cleanup prior to testing becomes less relevant. What do we think about dropping it? @xqi-splunk @pyth0n1c

  2. If we want to keep the cleanup prior to testing, then rework risk_event_exists() s.t. it takes a flag indicating whether we're checking for any risk/notable events related to this search, or for those specific to the SID, e.g. a param like for_sid which defaults to True. If for_sid is True and the SID is still None, it should throw an error. If for_sid is False, it should just search for any events matching the detection (no need for the subquery filtering).

Thoughts?
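
A rough sketch of what option 2 could look like; the build_risk_query helper and the for_sid flag are hypothetical illustrations, not part of this PR (and the approach was ultimately not adopted, per the resolution below):

```python
def build_risk_query(name: str, sid: str | None, for_sid: bool = True) -> str:
    if for_sid:
        if sid is None:
            raise ValueError("No SID recorded; dispatch the search before querying by SID")
        return f'search index=risk search_name="{name}" orig_sid="{sid}" | tojson'
    # pre-test cleanup path: match any risk events for this detection, regardless of SID
    return f'search index=risk search_name="{name}" | tojson'
```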

Contributor Author: Since we are filtering by SID (and, on top of that, the search name), I do not think this is required. We do run the post-test cleanup here as well:

Given that we are presently relying on the caller to do attack data cleanup, I think this change is okay and is a small speed optimization of a second or two per test due to running less SPL (having this cleanup function remove attack data, if we passed test_index as a value other than None, would cause issues here anyway).

tl;dr - I would remove the cleanup check at the start of every test.

Collaborator: For the second point, I wonder what the use case of for_sid set to False is? Since we allow a maximum of 3 retries, it feels a bit risky to simply search for any events matching the detection; that might give us doubled risk/notable objects. Suppose we hit an extreme case where the risk/notable objects are not generated on the first attempt but are somehow generated during the second attempt.

Contributor: For option 2, I was only suggesting that bool field in case we still wanted to do cleanup pre-test. But I agree with Eric for all the reasons mentioned. I would remove this code path (or throw on self.sid is None) and remove the pre-test cleanup.

Collaborator: Removed the cleanup check and removed the if self.sid is None: code path.

# query for validating that the detection is starting from a disabled state
query = (
f'search index=risk search_name="{self.name}" [search index=risk search '
f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson'
)
else:
# query after the detection has been enabled and dispatched
query = f'search index=risk search_name="{self.name}" orig_sid="{self.sid}" | tojson'
result_iterator = self._search(query)

# Iterate over the events, storing them in a list and checking for any errors
Expand Down Expand Up @@ -611,10 +667,15 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]:
return self._notable_events

# Search for all notable events from a single scheduled search (indicated by orig_sid)
query = (
f'search index=notable search_name="{self.name}" [search index=notable search '
f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson'
)
if self.sid is None:
Contributor: Same question as above.

Collaborator: Same reason as above.

Contributor: See my comment above.

Collaborator: Removed the if self.sid is None: code path.

# query for validating that the detection is starting from a disabled state
query = (
f'search index=notable search_name="{self.name}" [search index=notable search '
f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson'
)
else:
# query after the detection has been enabled and dispatched
query = f'search index=notable search_name="{self.name}" orig_sid="{self.sid}" | tojson'
result_iterator = self._search(query)

# Iterate over the events, storing them in a list and checking for any errors
@@ -881,6 +942,59 @@ def notable_in_risk_dm(self) -> bool:
return True
return False

def validate_risk_notable_events(self) -> tuple[bool, str | None]:
"""Validates the existence of risk and notable events

Returns a bool indicating whether validation of the risk and notable events succeeded,
and a message indicating the reason for failure (if any).

:return: a tuple (passed, message): passed is True if validation succeeded and False otherwise;
message is None on success, or a string describing the failure
"""
try:
# Validate risk events
if self.has_risk_analysis_action:
self.logger.debug("Checking for matching risk events")
if self.risk_event_exists():
# TODO (PEX-435): should this be in the retry loop? or outside it?
# -> I've observed there being a missing risk event (15/16) on
# the first few tries, so this does help us check for true
# positives; BUT, if we have lots of failing detections, this
# will definitely add to the total wait time
# -> certain types of failures (e.g. risk message, or any value
# checking) should fail testing automatically
# -> other types, like those based on counts of risk events,
# should fail more slowly as more events may be
# produced
self.validate_risk_events()
else:
raise ValidationFailed(
f"TEST FAILED: No matching risk event created for: {self.name}"
)
else:
self.logger.debug(f"No risk action defined for '{self.name}'")

# Validate notable events
if self.has_notable_action:
self.logger.debug("Checking for matching notable events")
# NOTE: because we check this last, if both fail, the error message about notables will
# always be the last to be added and thus the one surfaced to the user
if self.notable_event_exists():
# TODO (PEX-435): should this be in the retry loop?
self.validate_notable_events()
pass
else:
raise ValidationFailed(
f"TEST FAILED: No matching notable event created for: {self.name}"
)
else:
self.logger.debug(f"No notable action defined for '{self.name}'")

return True, None
except ValidationFailed as e:
self.logger.error(f"Risk/notable validation failed: {e}")
return False, str(e)

# NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls
# it for completion, but that seems more tricky
def test(
@@ -910,10 +1024,6 @@ def test(

# keep track of time slept and number of attempts for exponential backoff (base 2)
elapsed_sleep_time = 0
num_tries = 0

# set the initial base sleep time
time_to_sleep = TimeoutConfig.BASE_SLEEP

try:
# first make sure the indexes are currently empty and the detection is starting from a disabled state
@@ -946,92 +1056,45 @@ def test(
self.update_pbar(TestingStates.FORCE_RUN)
self.force_run()

# loop so long as the elapsed time is less than max_sleep
while elapsed_sleep_time < max_sleep:
# sleep so the detection job can finish
self.logger.info(
f"Waiting {time_to_sleep} for {self.name} so it can finish"
)
self.update_pbar(TestingStates.VALIDATING)
time.sleep(time_to_sleep)
elapsed_sleep_time += time_to_sleep
max_total_wait = TimeoutConfig.TOTAL_MAX_WAIT
Contributor: As mentioned earlier, max_total_wait and max_wait are slightly confusing as variable names.

Collaborator: After the refactoring, the max_total_wait = TimeoutConfig.TOTAL_MAX_WAIT assignment is no longer needed.

wait_time = TimeoutConfig.BASE_SLEEP
max_wait = TimeoutConfig.MAX_SLEEP
time_elapsed = 0

self.logger.info(
f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of "
f"{max_sleep} max)"
)
while time_elapsed <= max_total_wait:
# wait at least 90 seconds before re-dispatching the SavedSearch
if time_elapsed > TimeoutConfig.RETRY_DISPATCH:
self.dispatch()

start_time = time.time()

# only start adding sleep time once at least 30 seconds have elapsed
if time_elapsed > TimeoutConfig.ADD_WAIT_TIME:
time.sleep(wait_time)
elapsed_sleep_time += wait_time
wait_time = min(max_wait, wait_time * 2)

# reset the result to None on each loop iteration
result = None

try:
# Validate risk events
if self.has_risk_analysis_action:
self.logger.debug("Checking for matching risk events")
if self.risk_event_exists():
# TODO (PEX-435): should this in the retry loop? or outside it?
# -> I've observed there being a missing risk event (15/16) on
# the first few tries, so this does help us check for true
# positives; BUT, if we have lots of failing detections, this
# will definitely add to the total wait time
# -> certain types of failures (e.g. risk message, or any value
# checking) should fail testing automatically
# -> other types, like those based on counts of risk events,
# should happen should fail more slowly as more events may be
# produced
self.validate_risk_events()
else:
raise ValidationFailed(
f"TEST FAILED: No matching risk event created for: {self.name}"
)
else:
self.logger.debug(
f"No risk action defined for '{self.name}'"
)
validate_pass, error = self.validate_risk_notable_events()

# Validate notable events
if self.has_notable_action:
self.logger.debug("Checking for matching notable events")
# NOTE: because we check this last, if both fail, the error message about notables will
# always be the last to be added and thus the one surfaced to the user
if self.notable_event_exists():
# TODO (PEX-435): should this in the retry loop? or outside it?
self.validate_notable_events()
pass
else:
raise ValidationFailed(
f"TEST FAILED: No matching notable event created for: {self.name}"
)
else:
self.logger.debug(
f"No notable action defined for '{self.name}'"
)
except ValidationFailed as e:
self.logger.error(f"Risk/notable validation failed: {e}")
result = IntegrationTestResult(
status=TestResultStatus.FAIL,
message=f"TEST FAILED: {e}",
wait_duration=elapsed_sleep_time,
)

# if result is still None, then all checks passed and we can break the loop
if result is None:
# if validate_pass is True, then all checks passed and we can break the loop
if validate_pass:
result = IntegrationTestResult(
status=TestResultStatus.PASS,
message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}",
wait_duration=elapsed_sleep_time,
)
break
else:
result = IntegrationTestResult(
status=TestResultStatus.FAIL,
message=f"TEST FAILED: {error}",
)

# increment number of attempts to validate detection
num_tries += 1

# compute the next time to sleep for
time_to_sleep = 2**num_tries

# if the computed time to sleep will exceed max_sleep, adjust appropriately
if (elapsed_sleep_time + time_to_sleep) > max_sleep:
time_to_sleep = max_sleep - elapsed_sleep_time
end_time = time.time()
time_elapsed += end_time - start_time

# TODO (PEX-436): should cleanup be in a finally block so it runs even on exception?
# cleanup the created events, disable the detection and return the result
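
As a rough worked example of the new timing, assuming each validation pass takes about 10s (in line with the 5-10s noted in TimeoutConfig): the first 30-40s of elapsed time add no sleep at all, since the validation passes themselves act as the wait; after that, sleeps of 2s, 4s, 8s, 16s and then a capped 30s are inserted between passes; and once more than 90s have elapsed, each further iteration also re-dispatches the SavedSearch, until time_elapsed exceeds the 180s TOTAL_MAX_WAIT.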