Commit: Use dispatch() to trigger savedsearch
xqi-splunk committed Jun 13, 2025
commit b1aa8b9177e47cdd4d6d6836ba43ca4be94e54cb
94 changes: 73 additions & 21 deletions contentctl/objects/correlation_search.py

@@ -34,7 +34,7 @@
from contentctl.objects.risk_event import RiskEvent

# Suppress logging by default; enable for local testing
-ENABLE_LOGGING = False
+ENABLE_LOGGING = True
LOG_LEVEL = logging.DEBUG
LOG_PATH = "correlation_search.log"
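These module-level constants drive local debug logging for correlation_search.py. A minimal sketch of how they are presumably consumed — the real wiring lives elsewhere in the module and may differ:

import logging

ENABLE_LOGGING = True
LOG_LEVEL = logging.DEBUG
LOG_PATH = "correlation_search.log"

logger = logging.getLogger("correlation_search")
if ENABLE_LOGGING:
    logger.setLevel(LOG_LEVEL)
    logger.addHandler(logging.FileHandler(LOG_PATH))
else:
    logger.addHandler(logging.NullHandler())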

@@ -88,7 +88,7 @@ class ScheduleConfig(StrEnum):

    EARLIEST_TIME = "-5y@y"
    LATEST_TIME = "-1m@m"
-    CRON_SCHEDULE = "*/1 * * * *"
+    CRON_SCHEDULE = "0 0 1 1 *"


class ResultIterator:
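Regarding the CRON_SCHEDULE change above: "*/1 * * * *" fires every minute, while "0 0 1 1 *" fires only at 00:00 on January 1, so the saved search effectively never runs on its own schedule and is presumably only executed when dispatch() triggers it explicitly. A quick way to confirm the two expressions, using the third-party croniter package (illustration only; contentctl does not depend on it):

from datetime import datetime

from croniter import croniter  # pip install croniter

base = datetime(2025, 6, 13)
print(croniter("*/1 * * * *", base).get_next(datetime))  # old schedule: one minute after base
print(croniter("0 0 1 1 *", base).get_next(datetime))    # new schedule: 2026-01-01 00:00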
@@ -437,6 +437,18 @@ def enable(self, refresh: bool = True) -> None:
        if refresh:
            self.refresh()

+    def dispatch(self) -> splunklib.Job:
+        """Dispatches the SavedSearch
+
+        Dispatches the SavedSearch entity, returning a Job object representing the search job.
+        :return: a splunklib.Job object representing the search job
+        """
+        self.logger.debug(f"Dispatching {self.name}...")
+        try:
+            return self.saved_search.dispatch(trigger_actions=True)  # type: ignore
+        except HTTPError as e:
+            raise ServerError(f"HTTP error encountered while dispatching detection: {e}")

    def disable(self, refresh: bool = True) -> None:
        """Disables the SavedSearch

@@ -496,6 +508,18 @@ def force_run(self, refresh: bool = True) -> None:
        self.update_timeframe(refresh=False)
        if not self.enabled:

Contributor:
Do we still want this behavior where force_run will throw a warning if called after the detection is enabled?

Collaborator:
I think so? Is there a reason we would like to remove the warning?

Collaborator:
The force_run function is removed after refactoring, as discussed.

            self.enable(refresh=False)
+            job = self.dispatch()
+            self.logger.info(f"Force running detection '{self.name}' with job ID: {job.sid}")
+
+            time_to_execute = 0
+
+            # Poll until the dispatched job finishes
+            while not job.is_done():
+                self.logger.info(f"Job {job.sid} is still running...")
+                time.sleep(1)
+                time_to_execute += 1
+
+            self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.")
        else:
            self.logger.warning(f"Detection '{self.name}' was already enabled")
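
For context on the dispatch-and-poll flow added to force_run() above, the pattern maps onto the Splunk SDK roughly as sketched below. The connection details and saved-search name are placeholders, not values taken from this PR:

import time

import splunklib.client as client

# Placeholder connection details; contentctl supplies these from its own infrastructure config.
service = client.connect(host="localhost", port=8089, username="admin", password="changeme")

saved_search = service.saved_searches["ESCU - Example Detection - Rule"]  # hypothetical name
job = saved_search.dispatch(trigger_actions=True)  # returns a splunklib Job, as in dispatch() above

# Poll until the dispatched job completes, mirroring the while loop in force_run()
elapsed = 0
while not job.is_done():
    time.sleep(1)
    elapsed += 1
print(f"Job {job.sid} finished in {elapsed} seconds")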

@@ -946,24 +970,52 @@ def test(
        self.update_pbar(TestingStates.FORCE_RUN)
        self.force_run()

-        # loop so long as the elapsed time is less than max_sleep
-        while elapsed_sleep_time < max_sleep:
-            # sleep so the detection job can finish
-            self.logger.info(
-                f"Waiting {time_to_sleep} for {self.name} so it can finish"
-            )
-            self.update_pbar(TestingStates.VALIDATING)
-            time.sleep(time_to_sleep)
-            elapsed_sleep_time += time_to_sleep
+        # # loop so long as the elapsed time is less than max_sleep
+        # while elapsed_sleep_time < max_sleep:
+        #     # sleep so the detection job can finish
+        #     self.logger.info(
+        #         f"Waiting {time_to_sleep} for {self.name} so it can finish"
+        #     )
+        #     self.update_pbar(TestingStates.VALIDATING)
+        #     # time.sleep(time_to_sleep)
+        #     self.logger.info(
+        #         f"Skipping sleeping time for testing purposes"
+        #     )
+        #     elapsed_sleep_time += time_to_sleep

+        #     self.logger.info(
+        #         f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of "
+        #         f"{max_sleep} max)"
+        #     )

+        #     # reset the result to None on each loop iteration
+        #     result = None

+        max_retries = 10
+        initial_wait = 2
+        max_wait = 60
+        max_total_wait = 300

+        current_turn = 1
+        wait_time = initial_wait
+        total_waited = 0

+        while current_turn <= max_retries and total_waited < max_total_wait:
+            current_turn += 1

            self.logger.info(
-                f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of "
-                f"{max_sleep} max)"
+                f"Skipping sleeping time for testing purposes"
            )

+            if current_turn > 3:
+                time.sleep(wait_time)
+                total_waited += wait_time
+                self.logger.info(f"Waiting {wait_time}s before retry {current_turn}...")
+
+                wait_time = min(wait_time * 2, max_wait)

            # reset the result to None on each loop iteration
            result = None

            try:
                # Validate risk events
                if self.has_risk_analysis_action:
Expand Down Expand Up @@ -1023,15 +1075,15 @@ def test(
                )
                break

-            # increment number of attempts to validate detection
-            num_tries += 1
+            # # increment number of attempts to validate detection
+            # num_tries += 1

-            # compute the next time to sleep for
-            time_to_sleep = 2**num_tries
+            # # compute the next time to sleep for
+            # time_to_sleep = 2**num_tries

-            # if the computed time to sleep will exceed max_sleep, adjust appropriately
-            if (elapsed_sleep_time + time_to_sleep) > max_sleep:
-                time_to_sleep = max_sleep - elapsed_sleep_time
+            # # if the computed time to sleep will exceed max_sleep, adjust appropriately
+            # if (elapsed_sleep_time + time_to_sleep) > max_sleep:
+            #     time_to_sleep = max_sleep - elapsed_sleep_time

        # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception?
        # cleanup the created events, disable the detection and return the result
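
The new validation loop in test() is essentially a capped exponential backoff. Below is a standalone sketch of that pattern using the same parameter names — not a drop-in copy of the PR's loop (which, for instance, only starts sleeping after the third attempt), and with a placeholder validate() callable standing in for the risk/notable checks:

import time
from typing import Callable, Optional


def retry_with_backoff(
    validate: Callable[[], Optional[str]],  # placeholder: returns an error message, or None on success
    max_retries: int = 10,
    initial_wait: int = 2,
    max_wait: int = 60,
    max_total_wait: int = 300,
) -> Optional[str]:
    """Retry validate() with exponential backoff, capped per attempt and in total."""
    wait_time = initial_wait
    total_waited = 0
    result = None

    for attempt in range(1, max_retries + 1):
        result = validate()
        if result is None:
            break  # validation succeeded; stop retrying

        if total_waited + wait_time > max_total_wait:
            break  # total wait budget exhausted; give up with the last error

        print(f"Attempt {attempt} failed; waiting {wait_time}s before retrying")
        time.sleep(wait_time)
        total_waited += wait_time
        wait_time = min(wait_time * 2, max_wait)  # double the delay, capped at max_wait

    return result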