Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add version based validation
  • Loading branch information
xqi-splunk committed Oct 20, 2025
commit 0d649d7fce4616666dba4abbffb7378c1265fa1d
117 changes: 97 additions & 20 deletions contentctl/objects/content_versioning_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import splunklib.client as splunklib # type: ignore
from pydantic import BaseModel, Field, PrivateAttr, computed_field
from semantic_version import Version
from splunklib.binding import HTTPError, ResponseReader # type: ignore
from splunklib.data import Record # type: ignore

Expand All @@ -21,6 +22,9 @@
LOG_LEVEL = logging.DEBUG
LOG_PATH = "content_versioning_service.log"

# The app name of ES; needed to check ES version
ES_APP_NAME = "SplunkEnterpriseSecuritySuite"


class ContentVersioningService(BaseModel):
"""
Expand Down Expand Up @@ -77,6 +81,47 @@ def setup_functions(self) -> list[tuple[Callable[[], None], str]]:
(self.validate_content_against_cms, "Validating Against CMS"),
]

@cached_property
def es_version(self) -> Version | None:
"""
Returns the version of Enterprise Security installed on the instance; None if not installed.

:return: the version of ES, as a semver aware object
:rtype: :class:`semantic_version.Version`
"""
if ES_APP_NAME not in self.service.apps:
return None
return Version(self.service.apps[ES_APP_NAME]["version"]) # type: ignore

@cached_property
def kvstore_content_versioning(self) -> bool:
"""
Indicates whether we should test content versioning based on kvstore logic. Content versioning
should be tested with kvstore logic when ES is at least version 8.3.0.

:return: a bool indicating whether we should test content versioning with kvstore logic
:rtype: bool
"""
es_version = self.es_version
return es_version is not None and es_version >= Version("8.3.0")

@cached_property
def datastore_content_versioning(self) -> bool:
"""
Indicates whether we should test content versioning based on datastore logic. Content versioning
should be tested with datastore logic when ES is less than version 8.3.0 but greater than or equal
to version 8.0.0.

:return: a bool indicating whether we should test content versioning with datastore logic
:rtype: bool
"""
es_version = self.es_version
return (
es_version is not None
and es_version >= Version("8.0.0")
and es_version < Version("8.3.0")
)

def _query_content_versioning_service(
self, method: str, body: dict[str, Any] = {}
) -> Record:
Expand Down Expand Up @@ -182,13 +227,43 @@ def cms_fields(self) -> list[str]:
:returns: a list of strings, the fields we want
:rtype: list[str]
"""
return [
"app_name",
"detection_id",
"version",
"content",
"sourcetype",
]
if self.kvstore_content_versioning:
return [
"app_name",
"detection_id",
"version",
"content",
"sourcetype",
]
elif self.datastore_content_versioning:
return [
"app_name",
"detection_id",
"version",
"action.correlationsearch.label",
"sourcetype",
]
raise Exception(f"Something went wrong with es version: {self.es_version}")

def _get_cms_entry_label(self, cms_event: dict[str, Any]) -> str:
"""
Extracts the correlation search label from a CMS event.
Handles both old (< 8.3.0) and new (>= 8.3.0) CMS lookup structures.

:param cms_event: The event from the cms_main index
:type cms_event: dict[str, Any]
:return: The correlation search label
:rtype: str
"""

if self.kvstore_content_versioning:
# ES 8.3.0+: Parse content JSON to get label
content = json.loads(cms_event["content"])
return content["action.correlationsearch.label"]
elif self.datastore_content_versioning:
# ES < 8.3.0 and ES >= 8.0.0: Label is at top level
return cms_event["action.correlationsearch.label"]
raise Exception("Something went wrong with es version: {self.es_version}")

@property
def is_cms_parser_enabled(self) -> bool:
Expand Down Expand Up @@ -292,10 +367,18 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job:
)

# Construct the query looking for CMS events matching the content app name
query = (
f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}"
f"| fields {', '.join(self.cms_fields)}"
)
if self.kvstore_content_versioning:
query = (
f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}"
f"| fields {', '.join(self.cms_fields)}"
)
elif self.datastore_content_versioning:
query = (
f"search index=cms_main sourcetype=stash_common_detection_model "
f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}'
)
else:
raise Exception("Something went wrong with es version: {self.es_version}")
self.logger.debug(
f"[{self.infrastructure.instance_name}] Query on cms_main: {query}"
)
Expand Down Expand Up @@ -397,15 +480,10 @@ def validate_content_against_cms(self) -> None:
offset += 1

# Get the name of the search in the CMS event
content = json.loads(cms_event["content"])
self.logger.debug(
f"[TESTING DEBUG INFO] CMS Event content: {cms_event['content']}"
)
cms_entry_name = self._get_cms_entry_label(cms_event)
self.logger.debug(
f"[TESTING DEBUG INFO] CMS Event content after json load: {type(content)}"
f"[TESTING DEBUG INFO] CMS Event content: {cms_entry_name}"
)
# Get the name of the search in the CMS event
cms_entry_name = content["action.correlationsearch.label"]
self.logger.info(
f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry "
f"'{cms_entry_name}' against detections"
Expand Down Expand Up @@ -506,8 +584,7 @@ def validate_detection_against_cms_event(
self.global_config.app
)

content = json.loads(cms_event["content"])
cms_entry_name = content["action.correlationsearch.label"]
cms_entry_name = self._get_cms_entry_label(cms_event)

# Compare the correlation search label
if cms_entry_name != rule_name_from_detection:
Expand Down