Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Param cleanup, IDs fix, DRYer, Unit testing
  • Loading branch information
jshcodes committed Dec 26, 2020
commit 57feb89927ac8b610225f225b56208fe3625a906
74 changes: 32 additions & 42 deletions src/falconpy/prevention_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,35 @@ def __call__(self, status_code, headers, body):

return self.result_obj

def queryCombinedPreventionPolicyMembers(self, parameters):
def queryCombinedPreventionPolicyMembers(self, parameters={}):
""" Search for members of a Prevention Policy in your environment by providing an FQL filter
and paging details. Returns a set of host details which match the filter criteria.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/queryCombinedPreventionPolicyMembers
FULL_URL = self.base_url+'/policy/combined/prevention-members/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def queryCombinedPreventionPolicies(self, parameters):
def queryCombinedPreventionPolicies(self, parameters={}):
""" Search for Prevention Policies in your environment by providing an FQL filter and
paging details. Returns a set of Prevention Policies which match the filter criteria.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/queryCombinedPreventionPolicies
FULL_URL = self.base_url+'/policy/combined/prevention/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -102,12 +100,11 @@ def performPreventionPoliciesAction(self, parameters, body):
HEADERS = self.headers
PARAMS = parameters
BODY = body
result = self.Result()
try:
response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -120,27 +117,25 @@ def setPreventionPoliciesPrecedence(self, body):
FULL_URL = self.base_url+'/policy/entities/prevention-precedence/v1'
HEADERS = self.headers
BODY = body
result = self.Result()
try:
response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def getPreventionPolicies(self, parameters):
def getPreventionPolicies(self, ids):
""" Retrieve a set of Prevention Policies by specifying their IDs. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/getPreventionPolicies
FULL_URL = self.base_url+'/policy/entities/prevention/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/policy/entities/prevention/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -150,27 +145,25 @@ def createPreventionPolicies(self, body):
FULL_URL = self.base_url+'/policy/entities/prevention/v1'
HEADERS = self.headers
BODY = body
result = self.Result()
try:
response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def deletePreventionPolicies(self, parameters):
def deletePreventionPolicies(self, ids):
""" Delete a set of Prevention Policies by specifying their IDs. """
# [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/deletePreventionPolicies
FULL_URL = self.base_url+'/policy/entities/prevention/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/policy/entities/prevention/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -180,45 +173,42 @@ def updatePreventionPolicies(self, body):
FULL_URL = self.base_url+'/policy/entities/prevention/v1'
HEADERS = self.headers
BODY = body
result = self.Result()
try:
response = requests.request("PATCH", FULL_URL, json=BODY, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def queryPreventionPolicyMembers(self, parameters):
def queryPreventionPolicyMembers(self, parameters={}):
""" Search for members of a Prevention Policy in your environment by providing an FQL filter
and paging details. Returns a set of Agent IDs which match the filter criteria.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/queryPreventionPolicyMembers
FULL_URL = self.base_url+'/policy/queries/prevention-members/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def queryPreventionPolicies(self, parameters):
def queryPreventionPolicies(self, parameters={}):
""" Search for Prevention Policies in your environment by providing an FQL filter
and paging details. Returns a set of Prevention Policy IDs which match the filter criteria.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/queryPreventionPolicies
FULL_URL = self.base_url+'/policy/queries/prevention/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned