Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 187 additions & 43 deletions contentctl/objects/content_versioning_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
from typing import Any, Callable

import splunklib.client as splunklib # type: ignore
from pydantic import BaseModel, Field, PrivateAttr, computed_field
from pydantic import (
BaseModel,
Field,
PrivateAttr,
computed_field,
model_validator,
)
from semantic_version import Version
from splunklib.binding import HTTPError, ResponseReader # type: ignore
from splunklib.data import Record # type: ignore

Expand All @@ -21,6 +28,58 @@
LOG_LEVEL = logging.DEBUG
LOG_PATH = "content_versioning_service.log"

# The app name of ES; needed to check ES version
ES_APP_NAME = "SplunkEnterpriseSecuritySuite"


class CMSEvent(BaseModel):
"""
A model representing a CMS event. This is used to validate that detections have been installed
in a way that is compatible with content versioning.
"""

content: str # JSON string

# The app name of the detection
app_name: str

# The detection id of the detection
detection_id: str

# The version of the detection
version: str

# The saved search name of the detection
action_correlationsearch_label: str

@model_validator(mode="before")
@classmethod
def extract_from_content(cls, data):
"""Extract fields from content JSON if not already provided"""
if isinstance(data, dict) and "content" in data:
try:
content_str = data.get("content")
parsed = json.loads(content_str)

# Extract metadata fields - note the key has dots in it
metadata_str = parsed.get("action.correlationsearch.metadata", {})
metadata = (
json.loads(metadata_str)
if isinstance(metadata_str, str)
else metadata_str
)
data.setdefault("app_name", metadata.get("app_name"))
data.setdefault("detection_id", metadata.get("detection_id"))
data.setdefault("version", metadata.get("version"))
data.setdefault(
"action_correlationsearch_label",
parsed.get("action.correlationsearch.label"),
)
except (json.JSONDecodeError, AttributeError, KeyError, TypeError):
# If parsing fails, let Pydantic handle validation errors
raise ValueError("Failed to parse content JSON {}".format(data))
return data


class ContentVersioningService(BaseModel):
"""
Expand Down Expand Up @@ -77,6 +136,47 @@ def setup_functions(self) -> list[tuple[Callable[[], None], str]]:
(self.validate_content_against_cms, "Validating Against CMS"),
]

@cached_property
def es_version(self) -> Version | None:
"""
Returns the version of Enterprise Security installed on the instance; None if not installed.

:return: the version of ES, as a semver aware object
:rtype: :class:`semantic_version.Version`
"""
if ES_APP_NAME not in self.service.apps:
return None
return Version(self.service.apps[ES_APP_NAME]["version"]) # type: ignore

@cached_property
def kvstore_content_versioning(self) -> bool:
"""
Indicates whether we should test content versioning based on kvstore logic. Content versioning
should be tested with kvstore logic when ES is at least version 8.3.0.

:return: a bool indicating whether we should test content versioning with kvstore logic
:rtype: bool
"""
es_version = self.es_version
return es_version is not None and es_version >= Version("8.3.0")

@cached_property
def indexbased_content_versioning(self) -> bool:
"""
Indicates whether we should test content versioning based on indexbased logic. Content versioning
should be tested with indexbased logic when ES is less than version 8.3.0 but greater than or equal
to version 8.0.0.

:return: a bool indicating whether we should test content versioning with indexbased logic
:rtype: bool
"""
es_version = self.es_version
return (
es_version is not None
and es_version >= Version("8.0.0")
and es_version < Version("8.3.0")
)

def _query_content_versioning_service(
self, method: str, body: dict[str, Any] = {}
) -> Record:
Expand All @@ -97,12 +197,19 @@ def _query_content_versioning_service(

# Query the content versioning service
try:
response = self.service.request( # type: ignore
method=method,
path_segment="configs/conf-feature_flags/general",
body=body,
app="SA-ContentVersioning",
)
if method == "GET" and self.kvstore_content_versioning:
response = self.service.request(
method=method,
path_segment="content_versioning/versioning_apps",
app="SA-ContentVersioning",
)
if self.indexbased_content_versioning:
response = self.service.request( # type: ignore
method=method,
path_segment="configs/conf-feature_flags/general",
body=body,
app="SA-ContentVersioning",
)
except HTTPError as e:
# Raise on any HTTP errors
raise HTTPError(f"Error querying content versioning service: {e}") from e
Expand Down Expand Up @@ -141,9 +248,26 @@ def is_versioning_activated(self) -> bool:

# Find the versioning_activated field and report any errors
try:
for entry in data["entry"]:
if entry["name"] == "general":
return bool(int(entry["content"]["versioning_activated"]))
if self.kvstore_content_versioning:
if "content" in data:
for app in data["content"]:
if app.get("name") == "DA-ESS-ContentUpdate":
# If there is error message versioning is not activated properly
if "message" in app:
return False

# If the installed verion is not the same as the test version
if app.get("version") != self.global_config.app.version:
return False

if app.get("status") == "active":
return True
else:
return False
if self.indexbased_content_versioning:
for entry in data["entry"]:
if entry["name"] == "general":
return bool(int(entry["content"]["versioning_activated"]))
except KeyError as e:
raise KeyError(
"Cannot retrieve versioning status, unable to determine versioning status using "
Expand All @@ -159,9 +283,19 @@ def activate_versioning(self) -> None:
Activate the content versioning service
"""
# Post to the SA-ContentVersioning service to set versioning status
self._query_content_versioning_service(
method="POST", body={"versioning_activated": True}
)
if self.indexbased_content_versioning:
self._query_content_versioning_service(
method="POST", body={"versioning_activated": True}
)

# Wait for versioning to be activated for ES 8.3.0+
if self.kvstore_content_versioning:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this step needed? kvstore versioning should be activated by default; I don't think we need to wait for it to activate

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to David, although the kvstore versioning is activated by default, it takes some time for it to be ready. So I added the wait logic here, just to ensure that it's activated successfully (like some time for versioning to setup properly) before searching in kvstore.
From the testing pipeline, the setup time is usually very quick.

Copy link
Contributor

@cmcginley-splunk cmcginley-splunk Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like we're waiting for versioning initialization, not activation, no? Possible I'm not understanding something in the new model, but previously versioning activation was instantaneous, and we were just waiting for the parser to ingest everything

timeout = 600
while not self.is_versioning_activated:
time.sleep(60)
timeout -= 60
if timeout <= 0:
break

# Confirm versioning has been enabled
if not self.is_versioning_activated:
Expand All @@ -173,23 +307,6 @@ def activate_versioning(self) -> None:
f"[{self.infrastructure.instance_name}] Versioning service successfully activated"
)

@computed_field
@cached_property
def cms_fields(self) -> list[str]:
"""
Property listing the fields we want to pull from the cms_main index

:returns: a list of strings, the fields we want
:rtype: list[str]
"""
return [
"app_name",
"detection_id",
"version",
"action.correlationsearch.label",
"sourcetype",
]

@property
def is_cms_parser_enabled(self) -> bool:
"""
Expand Down Expand Up @@ -292,16 +409,37 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job:
)

# Construct the query looking for CMS events matching the content app name
query = (
f"search index=cms_main sourcetype=stash_common_detection_model "
f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}'
)
if self.kvstore_content_versioning:
query = (
f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}"
f"| fields content"
)
elif self.indexbased_content_versioning:
query = (
f"search index=cms_main sourcetype=stash_common_detection_model "
f'app_name="{self.global_config.app.appid}" | fields _raw'
)
else:
if self.kvstore_content_versioning:
raise Exception(
f"Unable to perform search to cms_content_lookup in ES version {self.es_version}"
)
elif self.indexbased_content_versioning:
raise Exception(
f"Unable to perform search to cms_main index in ES version {self.es_version}"
)
else:
raise Exception(
f"Unable to determine content versioning method for ES version {self.es_version}. "
"Expected ES version >= 8.0.0."
)
self.logger.debug(
f"[{self.infrastructure.instance_name}] Query on cms_main: {query}"
)

# Get the job as a blocking operation, set the cache, and return
self._cms_main_job = self.service.search(query, exec_mode="blocking") # type: ignore

return self._cms_main_job

def get_num_cms_events(self, use_cache: bool = False) -> int:
Expand Down Expand Up @@ -375,8 +513,13 @@ def validate_content_against_cms(self) -> None:
# Increment the offset for each result
offset += 1

if self.kvstore_content_versioning:
cms_event = CMSEvent(content=cms_event["content"])
elif self.indexbased_content_versioning:
cms_event = CMSEvent(content=cms_event["_raw"])

# Get the name of the search in the CMS event
cms_entry_name = cms_event["action.correlationsearch.label"]
cms_entry_name = cms_event.action_correlationsearch_label
self.logger.info(
f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry "
f"'{cms_entry_name}' against detections"
Expand Down Expand Up @@ -456,14 +599,14 @@ def validate_content_against_cms(self) -> None:
)

def validate_detection_against_cms_event(
self, cms_event: dict[str, Any], detection: Detection
self, cms_event: CMSEvent, detection: Detection
) -> Exception | None:
"""
Given an event from the cms_main index and the matched detection, compare fields and look
for any inconsistencies

:param cms_event: The event from the cms_main index
:type cms_event: dict[str, Any]
:type cms_event: CMSEvent
:param detection: The matched detection
:type detection: :class:`contentctl.objects.detection.Detection`

Expand All @@ -472,17 +615,18 @@ def validate_detection_against_cms_event(
"""
# TODO (PEX-509): validate additional fields between the cms_event and the detection

cms_uuid = uuid.UUID(cms_event["detection_id"])
cms_uuid = uuid.UUID(cms_event.detection_id)
rule_name_from_detection = detection.get_action_dot_correlationsearch_dot_label(
self.global_config.app
)

cms_entry_name = cms_event.action_correlationsearch_label

# Compare the correlation search label
if cms_event["action.correlationsearch.label"] != rule_name_from_detection:
if cms_entry_name != rule_name_from_detection:
msg = (
f"[{self.infrastructure.instance_name}][{detection.name}]: Correlation search "
f"label in cms_event ('{cms_event['action.correlationsearch.label']}') does not "
"match detection name"
f"label in cms_event ('{cms_entry_name}') does not match detection name"
)
self.logger.error(msg)
return Exception(msg)
Expand All @@ -494,12 +638,12 @@ def validate_detection_against_cms_event(
)
self.logger.error(msg)
return Exception(msg)
elif cms_event["version"] != f"{detection.version}.1":
elif cms_event.version != f"{detection.version}.1":
# Compare the versions (we append '.1' to the detection version to be in line w/ the
# internal representation in ES)
msg = (
f"[{self.infrastructure.instance_name}] [{detection.name}]: Version in cms_event "
f"('{cms_event['version']}') does not match version in detection "
f"('{cms_event.version}') does not match version in detection "
f"('{detection.version}.1')"
)
self.logger.error(msg)
Expand Down
Loading