diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 68a529ba..9e774f98 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -7,7 +7,14 @@ from typing import Any, Callable import splunklib.client as splunklib # type: ignore -from pydantic import BaseModel, Field, PrivateAttr, computed_field +from pydantic import ( + BaseModel, + Field, + PrivateAttr, + computed_field, + model_validator, +) +from semantic_version import Version from splunklib.binding import HTTPError, ResponseReader # type: ignore from splunklib.data import Record # type: ignore @@ -21,6 +28,58 @@ LOG_LEVEL = logging.DEBUG LOG_PATH = "content_versioning_service.log" +# The app name of ES; needed to check ES version +ES_APP_NAME = "SplunkEnterpriseSecuritySuite" + + +class CMSEvent(BaseModel): + """ + A model representing a CMS event. This is used to validate that detections have been installed + in a way that is compatible with content versioning. 
+ """ + + content: str # JSON string + + # The app name of the detection + app_name: str + + # The detection id of the detection + detection_id: str + + # The version of the detection + version: str + + # The saved search name of the detection + action_correlationsearch_label: str + + @model_validator(mode="before") + @classmethod + def extract_from_content(cls, data): + """Extract fields from content JSON if not already provided""" + if isinstance(data, dict) and "content" in data: + try: + content_str = data.get("content") + parsed = json.loads(content_str) + + # Extract metadata fields - note the key has dots in it + metadata_str = parsed.get("action.correlationsearch.metadata", {}) + metadata = ( + json.loads(metadata_str) + if isinstance(metadata_str, str) + else metadata_str + ) + data.setdefault("app_name", metadata.get("app_name")) + data.setdefault("detection_id", metadata.get("detection_id")) + data.setdefault("version", metadata.get("version")) + data.setdefault( + "action_correlationsearch_label", + parsed.get("action.correlationsearch.label"), + ) + except (json.JSONDecodeError, AttributeError, KeyError, TypeError): + # If parsing fails, let Pydantic handle validation errors + raise ValueError("Failed to parse content JSON {}".format(data)) + return data + class ContentVersioningService(BaseModel): """ @@ -77,6 +136,47 @@ def setup_functions(self) -> list[tuple[Callable[[], None], str]]: (self.validate_content_against_cms, "Validating Against CMS"), ] + @cached_property + def es_version(self) -> Version | None: + """ + Returns the version of Enterprise Security installed on the instance; None if not installed. 
+ + :return: the version of ES, as a semver aware object + :rtype: :class:`semantic_version.Version` + """ + if ES_APP_NAME not in self.service.apps: + return None + return Version(self.service.apps[ES_APP_NAME]["version"]) # type: ignore + + @cached_property + def kvstore_content_versioning(self) -> bool: + """ + Indicates whether we should test content versioning based on kvstore logic. Content versioning + should be tested with kvstore logic when ES is at least version 8.3.0. + + :return: a bool indicating whether we should test content versioning with kvstore logic + :rtype: bool + """ + es_version = self.es_version + return es_version is not None and es_version >= Version("8.3.0") + + @cached_property + def indexbased_content_versioning(self) -> bool: + """ + Indicates whether we should test content versioning based on indexbased logic. Content versioning + should be tested with indexbased logic when ES is less than version 8.3.0 but greater than or equal + to version 8.0.0. + + :return: a bool indicating whether we should test content versioning with indexbased logic + :rtype: bool + """ + es_version = self.es_version + return ( + es_version is not None + and es_version >= Version("8.0.0") + and es_version < Version("8.3.0") + ) + def _query_content_versioning_service( self, method: str, body: dict[str, Any] = {} ) -> Record: @@ -97,12 +197,19 @@ def _query_content_versioning_service( # Query the content versioning service try: - response = self.service.request( # type: ignore - method=method, - path_segment="configs/conf-feature_flags/general", - body=body, - app="SA-ContentVersioning", - ) + if method == "GET" and self.kvstore_content_versioning: + response = self.service.request( + method=method, + path_segment="content_versioning/versioning_apps", + app="SA-ContentVersioning", + ) + if self.indexbased_content_versioning: + response = self.service.request( # type: ignore + method=method, + path_segment="configs/conf-feature_flags/general", + body=body, + 
app="SA-ContentVersioning", + ) except HTTPError as e: # Raise on any HTTP errors raise HTTPError(f"Error querying content versioning service: {e}") from e @@ -141,9 +248,26 @@ def is_versioning_activated(self) -> bool: # Find the versioning_activated field and report any errors try: - for entry in data["entry"]: - if entry["name"] == "general": - return bool(int(entry["content"]["versioning_activated"])) + if self.kvstore_content_versioning: + if "content" in data: + for app in data["content"]: + if app.get("name") == "DA-ESS-ContentUpdate": + # If there is an error message, versioning is not activated properly + if "message" in app: + return False + + # If the installed version is not the same as the test version + if app.get("version") != self.global_config.app.version: + return False + + if app.get("status") == "active": + return True + else: + return False + if self.indexbased_content_versioning: + for entry in data["entry"]: + if entry["name"] == "general": + return bool(int(entry["content"]["versioning_activated"])) except KeyError as e: raise KeyError( "Cannot retrieve versioning status, unable to determine versioning status using " @@ -159,9 +283,19 @@ def activate_versioning(self) -> None: Activate the content versioning service """ # Post to the SA-ContentVersioning service to set versioning status - self._query_content_versioning_service( - method="POST", body={"versioning_activated": True} - ) + if self.indexbased_content_versioning: + self._query_content_versioning_service( + method="POST", body={"versioning_activated": True} + ) + + # Wait for versioning to be activated for ES 8.3.0+ + if self.kvstore_content_versioning: + timeout = 600 + while not self.is_versioning_activated: + time.sleep(60) + timeout -= 60 + if timeout <= 0: + break # Confirm versioning has been enabled if not self.is_versioning_activated: @@ -173,23 +307,6 @@ def activate_versioning(self) -> None: f"[{self.infrastructure.instance_name}] Versioning service successfully activated" ) -
@computed_field - @cached_property - def cms_fields(self) -> list[str]: - """ - Property listing the fields we want to pull from the cms_main index - - :returns: a list of strings, the fields we want - :rtype: list[str] - """ - return [ - "app_name", - "detection_id", - "version", - "action.correlationsearch.label", - "sourcetype", - ] - @property def is_cms_parser_enabled(self) -> bool: """ @@ -292,16 +409,37 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: ) # Construct the query looking for CMS events matching the content app name - query = ( - f"search index=cms_main sourcetype=stash_common_detection_model " - f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}' - ) + if self.kvstore_content_versioning: + query = ( + f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}" + f"| fields content" + ) + elif self.indexbased_content_versioning: + query = ( + f"search index=cms_main sourcetype=stash_common_detection_model " + f'app_name="{self.global_config.app.appid}" | fields _raw' + ) + else: + if self.kvstore_content_versioning: + raise Exception( + f"Unable to perform search to cms_content_lookup in ES version {self.es_version}" + ) + elif self.indexbased_content_versioning: + raise Exception( + f"Unable to perform search to cms_main index in ES version {self.es_version}" + ) + else: + raise Exception( + f"Unable to determine content versioning method for ES version {self.es_version}. " + "Expected ES version >= 8.0.0." 
+ ) self.logger.debug( f"[{self.infrastructure.instance_name}] Query on cms_main: {query}" ) # Get the job as a blocking operation, set the cache, and return self._cms_main_job = self.service.search(query, exec_mode="blocking") # type: ignore + return self._cms_main_job def get_num_cms_events(self, use_cache: bool = False) -> int: @@ -375,8 +513,13 @@ def validate_content_against_cms(self) -> None: # Increment the offset for each result offset += 1 + if self.kvstore_content_versioning: + cms_event = CMSEvent(content=cms_event["content"]) + elif self.indexbased_content_versioning: + cms_event = CMSEvent(content=cms_event["_raw"]) + # Get the name of the search in the CMS event - cms_entry_name = cms_event["action.correlationsearch.label"] + cms_entry_name = cms_event.action_correlationsearch_label self.logger.info( f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry " f"'{cms_entry_name}' against detections" @@ -456,14 +599,14 @@ def validate_content_against_cms(self) -> None: ) def validate_detection_against_cms_event( - self, cms_event: dict[str, Any], detection: Detection + self, cms_event: CMSEvent, detection: Detection ) -> Exception | None: """ Given an event from the cms_main index and the matched detection, compare fields and look for any inconsistencies :param cms_event: The event from the cms_main index - :type cms_event: dict[str, Any] + :type cms_event: CMSEvent :param detection: The matched detection :type detection: :class:`contentctl.objects.detection.Detection` @@ -472,17 +615,18 @@ def validate_detection_against_cms_event( """ # TODO (PEX-509): validate additional fields between the cms_event and the detection - cms_uuid = uuid.UUID(cms_event["detection_id"]) + cms_uuid = uuid.UUID(cms_event.detection_id) rule_name_from_detection = detection.get_action_dot_correlationsearch_dot_label( self.global_config.app ) + cms_entry_name = cms_event.action_correlationsearch_label + # Compare the correlation search label - if 
cms_event["action.correlationsearch.label"] != rule_name_from_detection: + if cms_entry_name != rule_name_from_detection: msg = ( f"[{self.infrastructure.instance_name}][{detection.name}]: Correlation search " - f"label in cms_event ('{cms_event['action.correlationsearch.label']}') does not " - "match detection name" + f"label in cms_event ('{cms_entry_name}') does not match detection name" ) self.logger.error(msg) return Exception(msg) @@ -494,12 +638,12 @@ def validate_detection_against_cms_event( ) self.logger.error(msg) return Exception(msg) - elif cms_event["version"] != f"{detection.version}.1": + elif cms_event.version != f"{detection.version}.1": # Compare the versions (we append '.1' to the detection version to be in line w/ the # internal representation in ES) msg = ( f"[{self.infrastructure.instance_name}] [{detection.name}]: Version in cms_event " - f"('{cms_event['version']}') does not match version in detection " + f"('{cms_event.version}') does not match version in detection " f"('{detection.version}.1')" ) self.logger.error(msg)