diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py
index 32643c68..ec8cb609 100644
--- a/contentctl/objects/correlation_search.py
+++ b/contentctl/objects/correlation_search.py
@@ -68,15 +68,20 @@ class TimeoutConfig(IntEnum):
     Configuration values for the exponential backoff timer
     """

+    # NOTE: Some detections take longer to generate their risk/notables than others; testing has
+    # shown that in a single run, 99% of detections could generate risk/notables within 30s, and
+    # less than 1% of detections (20 to 30 detections) needed 60 to 90s for risk/notables.
+
     # base amount to sleep for before beginning exponential backoff during testing
-    BASE_SLEEP = 60
+    BASE_SLEEP = 2

-    # NOTE: Some detections take longer to generate their risk/notables than other; testing has
-    # shown 270s to likely be sufficient for all detections in 99% of runs; however we have
-    # encountered a handful of transient failures in the last few months. Since our success rate
-    # is at 100% now, we will round this to a flat 300s to accomodate these outliers.
-    # Max amount to wait before timing out during exponential backoff
-    MAX_SLEEP = 300
+    # NOTE: Based on testing, 45 detections could not generate risk/notables within a single
+    # dispatch and needed to be retried; 90s is a reasonable wait time before retrying the dispatch
+    # Wait time before retrying dispatching the SavedSearch
+    RETRY_DISPATCH = 90
+
+    # Time elapsed before adding additional wait time
+    ADD_WAIT_TIME = 30
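+
+    # NOTE: these values are consumed by dispatch_and_validate() below: validation is retried in
+    # a tight loop for the first ADD_WAIT_TIME seconds, then with sleeps doubling from BASE_SLEEP
+    # (2s, 4s, 8s, ...) until RETRY_DISPATCH seconds have elapsed; test() then re-dispatches the
+    # SavedSearch (up to 3 attempts total)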


 # TODO (#226): evaluate sane defaults for timeframe for integration testing (e.g. 5y is good
@@ -88,7 +93,7 @@ class ScheduleConfig(StrEnum):

     EARLIEST_TIME = "-5y@y"
     LATEST_TIME = "-1m@m"
-    CRON_SCHEDULE = "*/1 * * * *"
+    CRON_SCHEDULE = "0 0 1 1 *"


 class ResultIterator:
@@ -202,6 +207,9 @@ class CorrelationSearch(BaseModel):
     # cleanup of this index
     test_index: str | None = Field(default=None, min_length=1)

+    # The search ID of the last dispatched search; this is used to query for risk/notable events
+    sid: str | None = Field(default=None)
+
     # The logger to use (logs all go to a null pipe unless ENABLE_LOGGING is set to True, so as not
     # to conflict w/ tqdm)
     logger: logging.Logger = Field(
@@ -437,6 +445,34 @@ def enable(self, refresh: bool = True) -> None:
         if refresh:
             self.refresh()

+    def dispatch(self) -> splunklib.Job:
+        """Dispatches the SavedSearch
+
+        Dispatches the SavedSearch entity and blocks until the resulting search job completes.
+        :return: a splunklib.Job object representing the finished search job
+        """
+        self.logger.debug(f"Dispatching {self.name}...")
+        try:
+            job = self.saved_search.dispatch(trigger_actions=True)
+
+            time_to_execute = 0
+            # Poll until the job is finished
+            while not job.is_done():
+                self.logger.debug(f"Job {job.sid} is still running...")
+                time.sleep(1)
+                time_to_execute += 1
+            self.logger.debug(
+                f"Job {job.sid} has finished running in {time_to_execute} seconds."
+            )
+
+            self.sid = job.sid
+
+            return job  # type: ignore
+        except HTTPError as e:
+            raise ServerError(
+                f"HTTP error encountered while dispatching detection: {e}"
+            )
+
     def disable(self, refresh: bool = True) -> None:
         """Disables the SavedSearch

@@ -486,22 +522,6 @@ def update_timeframe(
         if refresh:
             self.refresh()

-    def force_run(self, refresh: bool = True) -> None:
-        """Forces a detection run
-
-        Enables the detection, adjusts the cron schedule to run every 1 minute, and widens the earliest/latest window
-        to run on test data.
-        :param refresh: a bool indicating whether to refresh the metadata for the detection (default True)
-        """
-        self.update_timeframe(refresh=False)
-        if not self.enabled:
-            self.enable(refresh=False)
-        else:
-            self.logger.warning(f"Detection '{self.name}' was already enabled")
-
-        if refresh:
-            self.refresh()
-
     def risk_event_exists(self) -> bool:
         """Whether at least one matching risk event exists

@@ -533,12 +553,8 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]:
             )
             return self._risk_events

-        # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID
-        # Search for all risk events from a single scheduled search (indicated by orig_sid)
-        query = (
-            f'search index=risk search_name="{self.name}" [search index=risk search '
-            f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson'
-        )
+        # Search for all risk events from a single search (indicated by orig_sid)
+        query = f'search index=risk search_name="{self.name}" orig_sid="{self.sid}" | tojson'
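+        # e.g., with a hypothetical detection name and sid, the rendered SPL would be:
+        #   search index=risk search_name="ESCU - Example Detection - Rule"
+        #   orig_sid="1724444251.123" | tojson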
         result_iterator = self._search(query)

         # Iterate over the events, storing them in a list and checking for any errors
@@ -610,11 +626,8 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]:
             )
             return self._notable_events

-        # Search for all notable events from a single scheduled search (indicated by orig_sid)
-        query = (
-            f'search index=notable search_name="{self.name}" [search index=notable search '
-            f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson'
-        )
+        # Search for all notable events from a single search (indicated by orig_sid)
+        query = f'search index=notable search_name="{self.name}" orig_sid="{self.sid}" | tojson'
         result_iterator = self._search(query)

         # Iterate over the events, storing them in a list and checking for any errors
@@ -688,12 +701,10 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve
             )
             return self._risk_dm_events

-        # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID
-        # Search for all risk data model events from a single scheduled search (indicated by
+        # Search for all risk data model events from a single search (indicated by
         # orig_sid)
         query = (
-            f'datamodel Risk All_Risk flat | search search_name="{self.name}" [datamodel Risk '
-            f'All_Risk flat | search search_name="{self.name}" | tail 1 | fields orig_sid] '
+            f'datamodel Risk All_Risk flat | search search_name="{self.name}" orig_sid="{self.sid}" '
             "| tojson"
         )
         result_iterator = self._search(query)
@@ -881,10 +892,105 @@ def notable_in_risk_dm(self) -> bool:
                 return True
         return False

+    def validate_ara_events(self) -> None:
+        """Validate the risk and notable events created by the saved search
+
+        An exception is raised if validation fails for either risk or notable events.
+
+        :raises ValidationFailed: If the expected risk/notable events are not found or fail validation.
+        """
+        # Validate risk events
+        if self.has_risk_analysis_action:
+            self.logger.debug("Checking for matching risk events")
+            if self.risk_event_exists():
+                # TODO (PEX-435): should this be in the retry loop? or outside it?
+                #   -> I've observed there being a missing risk event (15/16) on
+                #       the first few tries, so this does help us check for true
+                #       positives; BUT, if we have lots of failing detections, this
+                #       will definitely add to the total wait time
+                #   -> certain types of failures (e.g. risk message, or any value
+                #       checking) should fail testing automatically
+                #   -> other types, like those based on counts of risk events,
+                #       should fail more slowly as more events may be produced
+                self.validate_risk_events()
+            else:
+                raise ValidationFailed(
+                    f"TEST FAILED: No matching risk event created for: {self.name}"
+                )
+        else:
+            self.logger.debug(f"No risk action defined for '{self.name}'")
+
+        # Validate notable events
+        if self.has_notable_action:
+            self.logger.debug("Checking for matching notable events")
+            # NOTE: because we check this last, if both fail, the error message about notables will
+            # always be the last to be added and thus the one surfaced to the user
+            if self.notable_event_exists():
+                # TODO (PEX-435): should this be in the retry loop? or outside it?
+                self.validate_notable_events()
+            else:
+                raise ValidationFailed(
+                    f"TEST FAILED: No matching notable event created for: {self.name}"
+                )
+        else:
+            self.logger.debug(f"No notable action defined for '{self.name}'")
+
+    def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None:
+        """Dispatch the saved search and validate the risk/notable events
+
+        Dispatches the saved search and validates the risk/notable events created by it. If
+        validation still fails once the retry window is exhausted, a ValidationFailed exception
+        is raised.
+
+        :param elapsed_sleep_time: Dictionary tracking the total elapsed sleep time across retries.
+        :type elapsed_sleep_time: dict[str, int]
+
+        :raises ValidationFailed: If validation of risk/notable events fails after all retries.
+        """
+        self.dispatch()
+
+        wait_time = TimeoutConfig.BASE_SLEEP
+        time_elapsed = 0
+        validation_error = None
+
+        while time_elapsed <= TimeoutConfig.RETRY_DISPATCH:
+            validation_start_time = time.time()
+
+            # reset validation_error for each iteration
+            validation_error = None
+
+            # hold off on sleeping for the first ADD_WAIT_TIME seconds (we expect the vast
+            # majority of detections to show results w/in that window)
+            if time_elapsed > TimeoutConfig.ADD_WAIT_TIME:
+                time.sleep(wait_time)
+                elapsed_sleep_time["elapsed_sleep_time"] += wait_time
+                wait_time = min(
+                    TimeoutConfig.RETRY_DISPATCH - int(time_elapsed), wait_time * 2
+                )
+
+            try:
+                self.validate_ara_events()
+            except ValidationFailed as e:
+                self.logger.error(f"Validation failed: {e}")
+                validation_error = e
+
+            # break out of the loop if validation passes
+            if validation_error is None:
+                self.logger.info(
+                    f"Validation passed for {self.name} after {elapsed_sleep_time['elapsed_sleep_time']} seconds"
+                )
+                break
+
+            validation_end_time = time.time()
+            time_elapsed += validation_end_time - validation_start_time
+
+        if validation_error is not None:
+            raise validation_error
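+
+    # NOTE: worst case, a failing detection now spends roughly 3 x (job runtime + RETRY_DISPATCH)
+    # seconds before test() reports the failure, vs. the old flat MAX_SLEEP of 300s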

     # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls
     # it for completion, but that seems more tricky
     def test(
-        self, max_sleep: int = TimeoutConfig.MAX_SLEEP, raise_on_exc: bool = False
+        self,
+        raise_on_exc: bool = False,
     ) -> IntegrationTestResult:
         """Execute the integration test

@@ -892,43 +998,17 @@ def test(
         and clear the indexes if so. Then, we force a run of the detection, wait for `sleep` seconds, and
         finally we validate that the appropriate risk/notable events seem to have been created.

         NOTE: assumes the data already exists in the instance

-        :param max_sleep: max number of seconds to sleep for after enabling the detection before we check for created
-            events; re-checks are made upon failures using an exponential backoff until the max is reached
         :param raise_on_exc: bool flag indicating if an exception should be raised when caught by the test
             routine, or if the error state should just be recorded for the test
         """
-        # max_sleep must be greater than the base value we must wait for the scheduled searchjob to run (jobs run every
-        # 60s)
-        if max_sleep < TimeoutConfig.BASE_SLEEP:
-            raise ClientError(
-                f"max_sleep value of {max_sleep} is less than the base sleep required "
-                f"({TimeoutConfig.BASE_SLEEP})"
-            )

         # initialize result as None
         result: IntegrationTestResult | None = None

-        # keep track of time slept and number of attempts for exponential backoff (base 2)
-        elapsed_sleep_time = 0
-        num_tries = 0
-
-        # set the initial base sleep time
-        time_to_sleep = TimeoutConfig.BASE_SLEEP
+        # keep track of total time slept across dispatch attempts (a dict so it can be updated
+        # by reference within dispatch_and_validate)
+        elapsed_sleep_time = {"elapsed_sleep_time": 0}

         try:
-            # first make sure the indexes are currently empty and the detection is starting from a disabled state
-            self.logger.debug("Cleaning up any pre-existing risk/notable events...")
-            self.update_pbar(TestingStates.PRE_CLEANUP)
-            if self.risk_event_exists():
-                self.logger.warning(
-                    f"Risk events matching '{self.name}' already exist; marking for deletion"
-                )
-            if self.notable_event_exists():
-                self.logger.warning(
-                    f"Notable events matching '{self.name}' already exist; marking for deletion"
-                )
-            self.cleanup()
-
             # skip test if no risk or notable action defined
             if not self.has_risk_analysis_action and not self.has_notable_action:
                 message = (
@@ -944,95 +1024,32 @@ def test(
             # force the detection to run
             self.logger.info(f"Forcing a run on {self.name}")
             self.update_pbar(TestingStates.FORCE_RUN)
-            self.force_run()
-
-            # loop so long as the elapsed time is less than max_sleep
-            while elapsed_sleep_time < max_sleep:
-                # sleep so the detection job can finish
-                self.logger.info(
-                    f"Waiting {time_to_sleep} for {self.name} so it can finish"
-                )
-                self.update_pbar(TestingStates.VALIDATING)
-                time.sleep(time_to_sleep)
-                elapsed_sleep_time += time_to_sleep
-
-                self.logger.info(
-                    f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of "
-                    f"{max_sleep} max)"
-                )
+            self.update_timeframe(refresh=False)
+            self.enable(refresh=False)

+            attempt = 1
+            while attempt <= 3:
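+                # NOTE: each attempt re-dispatches the detection; dispatch_and_validate itself
+                # polls for up to RETRY_DISPATCH seconds before we give up and retry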
                 # reset the result to None on each loop iteration
                 result = None
+                attempt += 1

                 try:
-                    # Validate risk events
-                    if self.has_risk_analysis_action:
-                        self.logger.debug("Checking for matching risk events")
-                        if self.risk_event_exists():
-                            # TODO (PEX-435): should this in the retry loop? or outside it?
-                            #   -> I've observed there being a missing risk event (15/16) on
-                            #       the first few tries, so this does help us check for true
-                            #       positives; BUT, if we have lots of failing detections, this
-                            #       will definitely add to the total wait time
-                            #   -> certain types of failures (e.g. risk message, or any value
-                            #       checking) should fail testing automatically
-                            #   -> other types, like those based on counts of risk events,
-                            #       should happen should fail more slowly as more events may be
-                            #       produced
-                            self.validate_risk_events()
-                        else:
-                            raise ValidationFailed(
-                                f"TEST FAILED: No matching risk event created for: {self.name}"
-                            )
-                    else:
-                        self.logger.debug(
-                            f"No risk action defined for '{self.name}'"
-                        )
-
-                    # Validate notable events
-                    if self.has_notable_action:
-                        self.logger.debug("Checking for matching notable events")
-                        # NOTE: because we check this last, if both fail, the error message about notables will
-                        # always be the last to be added and thus the one surfaced to the user
-                        if self.notable_event_exists():
-                            # TODO (PEX-435): should this in the retry loop? or outside it?
-                            self.validate_notable_events()
-                            pass
-                        else:
-                            raise ValidationFailed(
-                                f"TEST FAILED: No matching notable event created for: {self.name}"
-                            )
-                    else:
-                        self.logger.debug(
-                            f"No notable action defined for '{self.name}'"
-                        )
+                    self.dispatch_and_validate(elapsed_sleep_time)
                 except ValidationFailed as e:
                     self.logger.error(f"Risk/notable validation failed: {e}")
                     result = IntegrationTestResult(
                         status=TestResultStatus.FAIL,
                         message=f"TEST FAILED: {e}",
-                        wait_duration=elapsed_sleep_time,
+                        wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
                     )
-
-                # if result is still None, then all checks passed and we can break the loop
                 if result is None:
                     result = IntegrationTestResult(
                         status=TestResultStatus.PASS,
                         message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}",
-                        wait_duration=elapsed_sleep_time,
+                        wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
                     )
                     break

-                # increment number of attempts to validate detection
-                num_tries += 1
-
-                # compute the next time to sleep for
-                time_to_sleep = 2**num_tries
-
-                # if the computed time to sleep will exceed max_sleep, adjust appropriately
-                if (elapsed_sleep_time + time_to_sleep) > max_sleep:
-                    time_to_sleep = max_sleep - elapsed_sleep_time
-
             # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception?
             # cleanup the created events, disable the detection and return the result
             self.logger.debug("Cleaning up any created risk/notable events...")
@@ -1043,7 +1060,7 @@ def test(
             result = IntegrationTestResult(
                 status=TestResultStatus.ERROR,
                 message=f"TEST FAILED (ERROR): Exception raised during integration test: {e}",
-                wait_duration=elapsed_sleep_time,
+                wait_duration=elapsed_sleep_time["elapsed_sleep_time"],
                 exception=e,
             )
             self.logger.exception(result.message)  # type: ignore