Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Param cleanup, IDs fix, DRYer, Basic unit tests
  • Loading branch information
jshcodes committed Dec 26, 2020
commit 903b7d801046c1c54feff50c199d6657262438b7
101 changes: 46 additions & 55 deletions src/falconpy/real_time_response_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,18 @@ def __call__(self, status_code, headers, body):

return self.result_obj

def BatchAdminCmd(self, parameters, body):
def BatchAdminCmd(self, body, parameters={}):
""" Batch executes a RTR administrator command across the hosts mapped to the given batch ID. """
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/BatchAdminCmd
FULL_URL = self.base_url+'/real-time-response/combined/batch-admin-command/v1'
HEADERS = self.headers
DATA = body
PARAMS = parameters
result = self.Result()
try:
response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -83,12 +82,11 @@ def RTR_CheckAdminCommandStatus(self, parameters):
FULL_URL = self.base_url+'/real-time-response/entities/admin-command/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -98,27 +96,25 @@ def RTR_ExecuteAdminCommand(self, body):
FULL_URL = self.base_url+'/real-time-response/entities/admin-command/v1'
HEADERS = self.headers
DATA = body
result = self.Result()
try:
response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def RTR_GetPut_Files(self, parameters):
def RTR_GetPut_Files(self, ids):
""" Get put-files based on the ID's given. These are used for the RTR `put` command. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_GetPut_Files
FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -127,44 +123,42 @@ def RTR_CreatePut_Files(self, data, files):
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_CreatePut_Files
FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1'
HEADERS = self.headers
HEADERS['Content-Type'] = 'multipart/form-data'
DATA = data
FILES = files
result = self.Result()
try:
response = requests.request("POST", FULL_URL, data=DATA, files=FILES, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def RTR_DeletePut_Files(self, parameters):
def RTR_DeletePut_Files(self, ids):
""" Delete a put-file based on the ID given. Can only delete one file at a time. """
# [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_DeletePut_Files
FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def RTR_GetScripts(self, parameters):
def RTR_GetScripts(self, ids):
""" Get custom-scripts based on the ID's given. These are used for the RTR `runscript` command. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_GetScripts
FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -173,29 +167,28 @@ def RTR_CreateScripts(self, data, files):
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_CreateScripts
FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1'
HEADERS = self.headers
HEADERS['Content-Type'] = 'multipart/form-data'
DATA = data
FILES = files
result = self.Result()
try:
response = requests.request("POST", FULL_URL, data=DATA, files=FILES, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def RTR_DeleteScripts(self, parameters):
def RTR_DeleteScripts(self, ids):
""" Delete a custom-script based on the ID given. Can only delete one script at a time. """
# [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_DeleteScripts
FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -204,43 +197,41 @@ def RTR_UpdateScripts(self, data, files):
# [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_UpdateScripts
FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1'
HEADERS = self.headers
HEADERS['Content-Type'] = 'multipart/form-data'
DATA = data
FILES = files
result = self.Result()
try:
response = requests.request("PATCH", FULL_URL, data=DATA, files=FILES, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def RTR_ListPut_Files(self, parameters):
def RTR_ListPut_Files(self, parameters={}):
""" Get a list of put-file ID's that are available to the user for the `put` command. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_ListPut_Files
FULL_URL = self.base_url+'/real-time-response/queries/put-files/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def RTR_ListScripts(self, parameters):
def RTR_ListScripts(self, parameters={}):
""" Get a list of custom-script ID's that are available to the user for the `runscript` command. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_ListScripts
FULL_URL = self.base_url+'/real-time-response/queries/scripts/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned