Skip to content

Commit 0c8fa26

Browse files
Merge pull request #449 from splunk/PEX-653/kvstore-versioning-validation
PEX-653: KVStore Versioning Validation
2 parents 093d75b + 5a34656 commit 0c8fa26

File tree

1 file changed

+187
-43
lines changed

1 file changed

+187
-43
lines changed

contentctl/objects/content_versioning_service.py

Lines changed: 187 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@
77
from typing import Any, Callable
88

99
import splunklib.client as splunklib # type: ignore
10-
from pydantic import BaseModel, Field, PrivateAttr, computed_field
10+
from pydantic import (
11+
BaseModel,
12+
Field,
13+
PrivateAttr,
14+
computed_field,
15+
model_validator,
16+
)
17+
from semantic_version import Version
1118
from splunklib.binding import HTTPError, ResponseReader # type: ignore
1219
from splunklib.data import Record # type: ignore
1320

@@ -21,6 +28,58 @@
2128
LOG_LEVEL = logging.DEBUG
2229
LOG_PATH = "content_versioning_service.log"
2330

31+
# The app name of ES; needed to check ES version
32+
ES_APP_NAME = "SplunkEnterpriseSecuritySuite"
33+
34+
35+
class CMSEvent(BaseModel):
36+
"""
37+
A model representing a CMS event. This is used to validate that detections have been installed
38+
in a way that is compatible with content versioning.
39+
"""
40+
41+
content: str # JSON string
42+
43+
# The app name of the detection
44+
app_name: str
45+
46+
# The detection id of the detection
47+
detection_id: str
48+
49+
# The version of the detection
50+
version: str
51+
52+
# The saved search name of the detection
53+
action_correlationsearch_label: str
54+
55+
@model_validator(mode="before")
56+
@classmethod
57+
def extract_from_content(cls, data):
58+
"""Extract fields from content JSON if not already provided"""
59+
if isinstance(data, dict) and "content" in data:
60+
try:
61+
content_str = data.get("content")
62+
parsed = json.loads(content_str)
63+
64+
# Extract metadata fields - note the key has dots in it
65+
metadata_str = parsed.get("action.correlationsearch.metadata", {})
66+
metadata = (
67+
json.loads(metadata_str)
68+
if isinstance(metadata_str, str)
69+
else metadata_str
70+
)
71+
data.setdefault("app_name", metadata.get("app_name"))
72+
data.setdefault("detection_id", metadata.get("detection_id"))
73+
data.setdefault("version", metadata.get("version"))
74+
data.setdefault(
75+
"action_correlationsearch_label",
76+
parsed.get("action.correlationsearch.label"),
77+
)
78+
except (json.JSONDecodeError, AttributeError, KeyError, TypeError):
79+
# If parsing fails, let Pydantic handle validation errors
80+
raise ValueError("Failed to parse content JSON {}".format(data))
81+
return data
82+
2483

2584
class ContentVersioningService(BaseModel):
2685
"""
@@ -77,6 +136,47 @@ def setup_functions(self) -> list[tuple[Callable[[], None], str]]:
77136
(self.validate_content_against_cms, "Validating Against CMS"),
78137
]
79138

139+
@cached_property
140+
def es_version(self) -> Version | None:
141+
"""
142+
Returns the version of Enterprise Security installed on the instance; None if not installed.
143+
144+
:return: the version of ES, as a semver aware object
145+
:rtype: :class:`semantic_version.Version`
146+
"""
147+
if ES_APP_NAME not in self.service.apps:
148+
return None
149+
return Version(self.service.apps[ES_APP_NAME]["version"]) # type: ignore
150+
151+
@cached_property
152+
def kvstore_content_versioning(self) -> bool:
153+
"""
154+
Indicates whether we should test content versioning based on kvstore logic. Content versioning
155+
should be tested with kvstore logic when ES is at least version 8.3.0.
156+
157+
:return: a bool indicating whether we should test content versioning with kvstore logic
158+
:rtype: bool
159+
"""
160+
es_version = self.es_version
161+
return es_version is not None and es_version >= Version("8.3.0")
162+
163+
@cached_property
164+
def indexbased_content_versioning(self) -> bool:
165+
"""
166+
Indicates whether we should test content versioning based on indexbased logic. Content versioning
167+
should be tested with indexbased logic when ES is less than version 8.3.0 but greater than or equal
168+
to version 8.0.0.
169+
170+
:return: a bool indicating whether we should test content versioning with indexbased logic
171+
:rtype: bool
172+
"""
173+
es_version = self.es_version
174+
return (
175+
es_version is not None
176+
and es_version >= Version("8.0.0")
177+
and es_version < Version("8.3.0")
178+
)
179+
80180
def _query_content_versioning_service(
81181
self, method: str, body: dict[str, Any] = {}
82182
) -> Record:
@@ -97,12 +197,19 @@ def _query_content_versioning_service(
97197

98198
# Query the content versioning service
99199
try:
100-
response = self.service.request( # type: ignore
101-
method=method,
102-
path_segment="configs/conf-feature_flags/general",
103-
body=body,
104-
app="SA-ContentVersioning",
105-
)
200+
if method == "GET" and self.kvstore_content_versioning:
201+
response = self.service.request(
202+
method=method,
203+
path_segment="content_versioning/versioning_apps",
204+
app="SA-ContentVersioning",
205+
)
206+
if self.indexbased_content_versioning:
207+
response = self.service.request( # type: ignore
208+
method=method,
209+
path_segment="configs/conf-feature_flags/general",
210+
body=body,
211+
app="SA-ContentVersioning",
212+
)
106213
except HTTPError as e:
107214
# Raise on any HTTP errors
108215
raise HTTPError(f"Error querying content versioning service: {e}") from e
@@ -141,9 +248,26 @@ def is_versioning_activated(self) -> bool:
141248

142249
# Find the versioning_activated field and report any errors
143250
try:
144-
for entry in data["entry"]:
145-
if entry["name"] == "general":
146-
return bool(int(entry["content"]["versioning_activated"]))
251+
if self.kvstore_content_versioning:
252+
if "content" in data:
253+
for app in data["content"]:
254+
if app.get("name") == "DA-ESS-ContentUpdate":
255+
# If there is error message versioning is not activated properly
256+
if "message" in app:
257+
return False
258+
259+
# If the installed verion is not the same as the test version
260+
if app.get("version") != self.global_config.app.version:
261+
return False
262+
263+
if app.get("status") == "active":
264+
return True
265+
else:
266+
return False
267+
if self.indexbased_content_versioning:
268+
for entry in data["entry"]:
269+
if entry["name"] == "general":
270+
return bool(int(entry["content"]["versioning_activated"]))
147271
except KeyError as e:
148272
raise KeyError(
149273
"Cannot retrieve versioning status, unable to determine versioning status using "
@@ -159,9 +283,19 @@ def activate_versioning(self) -> None:
159283
Activate the content versioning service
160284
"""
161285
# Post to the SA-ContentVersioning service to set versioning status
162-
self._query_content_versioning_service(
163-
method="POST", body={"versioning_activated": True}
164-
)
286+
if self.indexbased_content_versioning:
287+
self._query_content_versioning_service(
288+
method="POST", body={"versioning_activated": True}
289+
)
290+
291+
# Wait for versioning to be activated for ES 8.3.0+
292+
if self.kvstore_content_versioning:
293+
timeout = 600
294+
while not self.is_versioning_activated:
295+
time.sleep(60)
296+
timeout -= 60
297+
if timeout <= 0:
298+
break
165299

166300
# Confirm versioning has been enabled
167301
if not self.is_versioning_activated:
@@ -173,23 +307,6 @@ def activate_versioning(self) -> None:
173307
f"[{self.infrastructure.instance_name}] Versioning service successfully activated"
174308
)
175309

176-
@computed_field
177-
@cached_property
178-
def cms_fields(self) -> list[str]:
179-
"""
180-
Property listing the fields we want to pull from the cms_main index
181-
182-
:returns: a list of strings, the fields we want
183-
:rtype: list[str]
184-
"""
185-
return [
186-
"app_name",
187-
"detection_id",
188-
"version",
189-
"action.correlationsearch.label",
190-
"sourcetype",
191-
]
192-
193310
@property
194311
def is_cms_parser_enabled(self) -> bool:
195312
"""
@@ -292,16 +409,37 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job:
292409
)
293410

294411
# Construct the query looking for CMS events matching the content app name
295-
query = (
296-
f"search index=cms_main sourcetype=stash_common_detection_model "
297-
f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}'
298-
)
412+
if self.kvstore_content_versioning:
413+
query = (
414+
f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}"
415+
f"| fields content"
416+
)
417+
elif self.indexbased_content_versioning:
418+
query = (
419+
f"search index=cms_main sourcetype=stash_common_detection_model "
420+
f'app_name="{self.global_config.app.appid}" | fields _raw'
421+
)
422+
else:
423+
if self.kvstore_content_versioning:
424+
raise Exception(
425+
f"Unable to perform search to cms_content_lookup in ES version {self.es_version}"
426+
)
427+
elif self.indexbased_content_versioning:
428+
raise Exception(
429+
f"Unable to perform search to cms_main index in ES version {self.es_version}"
430+
)
431+
else:
432+
raise Exception(
433+
f"Unable to determine content versioning method for ES version {self.es_version}. "
434+
"Expected ES version >= 8.0.0."
435+
)
299436
self.logger.debug(
300437
f"[{self.infrastructure.instance_name}] Query on cms_main: {query}"
301438
)
302439

303440
# Get the job as a blocking operation, set the cache, and return
304441
self._cms_main_job = self.service.search(query, exec_mode="blocking") # type: ignore
442+
305443
return self._cms_main_job
306444

307445
def get_num_cms_events(self, use_cache: bool = False) -> int:
@@ -375,8 +513,13 @@ def validate_content_against_cms(self) -> None:
375513
# Increment the offset for each result
376514
offset += 1
377515

516+
if self.kvstore_content_versioning:
517+
cms_event = CMSEvent(content=cms_event["content"])
518+
elif self.indexbased_content_versioning:
519+
cms_event = CMSEvent(content=cms_event["_raw"])
520+
378521
# Get the name of the search in the CMS event
379-
cms_entry_name = cms_event["action.correlationsearch.label"]
522+
cms_entry_name = cms_event.action_correlationsearch_label
380523
self.logger.info(
381524
f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry "
382525
f"'{cms_entry_name}' against detections"
@@ -456,14 +599,14 @@ def validate_content_against_cms(self) -> None:
456599
)
457600

458601
def validate_detection_against_cms_event(
459-
self, cms_event: dict[str, Any], detection: Detection
602+
self, cms_event: CMSEvent, detection: Detection
460603
) -> Exception | None:
461604
"""
462605
Given an event from the cms_main index and the matched detection, compare fields and look
463606
for any inconsistencies
464607
465608
:param cms_event: The event from the cms_main index
466-
:type cms_event: dict[str, Any]
609+
:type cms_event: CMSEvent
467610
:param detection: The matched detection
468611
:type detection: :class:`contentctl.objects.detection.Detection`
469612
@@ -472,17 +615,18 @@ def validate_detection_against_cms_event(
472615
"""
473616
# TODO (PEX-509): validate additional fields between the cms_event and the detection
474617

475-
cms_uuid = uuid.UUID(cms_event["detection_id"])
618+
cms_uuid = uuid.UUID(cms_event.detection_id)
476619
rule_name_from_detection = detection.get_action_dot_correlationsearch_dot_label(
477620
self.global_config.app
478621
)
479622

623+
cms_entry_name = cms_event.action_correlationsearch_label
624+
480625
# Compare the correlation search label
481-
if cms_event["action.correlationsearch.label"] != rule_name_from_detection:
626+
if cms_entry_name != rule_name_from_detection:
482627
msg = (
483628
f"[{self.infrastructure.instance_name}][{detection.name}]: Correlation search "
484-
f"label in cms_event ('{cms_event['action.correlationsearch.label']}') does not "
485-
"match detection name"
629+
f"label in cms_event ('{cms_entry_name}') does not match detection name"
486630
)
487631
self.logger.error(msg)
488632
return Exception(msg)
@@ -494,12 +638,12 @@ def validate_detection_against_cms_event(
494638
)
495639
self.logger.error(msg)
496640
return Exception(msg)
497-
elif cms_event["version"] != f"{detection.version}.1":
641+
elif cms_event.version != f"{detection.version}.1":
498642
# Compare the versions (we append '.1' to the detection version to be in line w/ the
499643
# internal representation in ES)
500644
msg = (
501645
f"[{self.infrastructure.instance_name}] [{detection.name}]: Version in cms_event "
502-
f"('{cms_event['version']}') does not match version in detection "
646+
f"('{cms_event.version}') does not match version in detection "
503647
f"('{detection.version}.1')"
504648
)
505649
self.logger.error(msg)

0 commit comments

Comments
 (0)