From 25418326ffbfb740428686945b08613caca3f4b8 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Thu, 24 Dec 2020 18:11:16 -0500 Subject: [PATCH 01/20] Default parameter debugging. Ran the code through the DRYer. --- src/falconpy/cloud_connect_aws.py | 68 +++++++++++++------------------ 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/src/falconpy/cloud_connect_aws.py b/src/falconpy/cloud_connect_aws.py index c007a185..64c0cbfc 100644 --- a/src/falconpy/cloud_connect_aws.py +++ b/src/falconpy/cloud_connect_aws.py @@ -61,7 +61,7 @@ def __call__(self, status_code, headers, body): return self.result_obj - def QueryAWSAccounts(self, parameters): + def QueryAWSAccounts(self, parameters={}): """ Search for provisioned AWS Accounts by providing an FQL filter and paging details. Returns a set of AWS accounts which match the filter criteria. """ @@ -69,12 +69,11 @@ def QueryAWSAccounts(self, parameters): FULL_URL = self.base_url+'/cloud-connect-aws/combined/accounts/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
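Review bot: The new default `parameters={}` on `QueryAWSAccounts` is a mutable default argument, so one dict object is shared by every call and every instance that omits the argument. Nothing visible in this patch writes to it, but any later code that adds a key to `PARAMS` would leak that filter into unrelated requests. Prefer `def QueryAWSAccounts(self, parameters=None):` with `PARAMS = parameters or {}` in the body, which continues in the next hunk. The same default is introduced for `ProvisionAWSAccounts`, `VerifyAWSAccountAccess` (`body={}`) and `QueryAWSAccountsForIDs` in this file, and for the query methods in hosts.py, incidents.py, detects.py and device_control_policies.py.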
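Review bot: The request in `QueryAWSAccounts` still passes `verify=False`, so the server certificate is never validated while `self.headers` is sent; those headers presumably carry the API credentials, which leaves them and the response data readable to anyone able to intercept the connection. Since this line's neighbours are being rewritten across the whole series anyway, this is a good moment to drop the argument (requests verifies certificates by default) or to route it through one instance-level setting that defaults to verification on, e.g. `verify=self.ssl_verify`. The constructor is outside this diff, so that attribute would be new. Every `requests.request` call visible in patches 01 through 08 carries the same argument.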
@@ -83,58 +82,52 @@ def GetAWSSettings(self): # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/cloud-connect-aws/GetAWSSettings FULL_URL = self.base_url+'/cloud-connect-aws/combined/settings/v1' HEADERS = self.headers - result = self.Result() try: response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetAWSAccounts(self, parameters, ids): + def GetAWSAccounts(self, ids): """ Retrieve a set of AWS Accounts by specifying their IDs.""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/cloud-connect-aws/GetAWSAccounts ID_LIST = str(ids).replace(",","&ids=") FULL_URL = self.base_url+'/cloud-connect-aws/entities/accounts/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def ProvisionAWSAccounts(self, parameters, body): + def ProvisionAWSAccounts(self, body, parameters={}): """ Provision AWS Accounts by specifying details about the accounts to provision. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/cloud-connect-aws/ProvisionAWSAccounts FULL_URL = self.base_url+'/cloud-connect-aws/entities/accounts/v1' PARAMS=parameters BODY=body - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=self.headers, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def DeleteAWSAccounts(self, parameters, ids): + def DeleteAWSAccounts(self, ids): """ Delete a set of AWS Accounts by specifying their IDs. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/cloud-connect-aws/DeleteAWSAccounts ID_LIST = str(ids).replace(",","&ids=") FULL_URL = self.base_url+'/cloud-connect-aws/entities/accounts/v1?ids={}'.format(ID_LIST) - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=self.headers, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=self.headers, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -143,12 +136,11 @@ def UpdateAWSAccounts(self, body): # [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/cloud-connect-aws/UpdateAWSAccounts FULL_URL = self.base_url+'/cloud-connect-aws/entities/accounts/v1' BODY=body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, json=BODY, headers=self.headers, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
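Review bot: The single `except Exception` in `UpdateAWSAccounts` covers both the request and `response.json()`, so a PATCH that the API accepted but answered with an empty or non-JSON body is reported to the caller as a synthetic 500, and a certificate, DNS or timeout failure looks the same as a server error. Splitting the decode from the transport error keeps the real status code and lets unexpected exceptions surface instead of being flattened into a string. Whether `Result.__call__` accepts a plain string body on a non-500 status is not visible here, so confirm that before adopting the sketch below. The same handler is repeated in every method this series touches.

```python
        try:
            # … unchanged: requests.request("PATCH", …) call …
            try:
                payload = response.json()
            except ValueError:
                # keep the real status when the body is empty or not JSON
                payload = response.text
            returned = self.Result()(response.status_code, response.headers, payload)
        except requests.exceptions.RequestException as e:
            returned = self.Result()(500, {}, str(e))
```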
@@ -158,33 +150,30 @@ def CreateOrUpdateAWSSettings(self, body): FULL_URL = self.base_url+'/cloud-connect-aws/entities/settings/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def VerifyAWSAccountAccess(self, parameters, body, ids): + def VerifyAWSAccountAccess(self, ids, body={}): """ Performs an Access Verification check on the specified AWS Account IDs. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/cloud-connect-aws/VerifyAWSAccountAccess ID_LIST = str(ids).replace(",","&ids=") FULL_URL = self.base_url+'/cloud-connect-aws/entities/verify-account-access/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - BODY = body #payload does not appear to be required - result = self.Result() + BODY=body try: - response = requests.request("POST", FULL_URL, json=BODY, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryAWSAccountsForIDs(self, parameters): + def QueryAWSAccountsForIDs(self, parameters={}): """ Search for provisioned AWS Accounts by providing an FQL filter and paging details. Returns a set of AWS account IDs which match the filter criteria. """ 
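Review bot: The reorder of `VerifyAWSAccountAccess` from `(parameters, body, ids)` to `(ids, body={})` breaks positional callers with a `TypeError`, which is at least loud; the matching reorder of `ProvisionAWSAccounts` in the previous hunk is silent. An old call `ProvisionAWSAccounts(parameters, body)` still runs, but now sends the account details as the query string and the parameters as the JSON body, and query strings tend to end up in proxy and access logs. Making the optional argument keyword-only, e.g. `def ProvisionAWSAccounts(self, body, *, parameters=None):`, turns the old call into an immediate error. A docstring line stating the new argument order on both methods would help anyone upgrading.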
@@ -192,11 +181,10 @@ def QueryAWSAccountsForIDs(self, parameters): FULL_URL = self.base_url+'/cloud-connect-aws/queries/accounts/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 1798e4ec24a240d678277b74fa40feb5ec868b6c Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Thu, 24 Dec 2020 23:36:56 -0500 Subject: [PATCH 02/20] Param defaults, fixes to GetDeviceDetails, DRYer --- src/falconpy/hosts.py | 44 ++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/falconpy/hosts.py b/src/falconpy/hosts.py index 57c8c220..6a06f4ce 100644 --- a/src/falconpy/hosts.py +++ b/src/falconpy/hosts.py 
@@ -68,58 +68,68 @@ def PerformActionV2(self, parameters, body): HEADERS = self.headers PARAMS = parameters BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetDeviceDetails(self, parameters): + def GetDeviceDetails(self, ids): """ Get details on one or more hosts by providing agent IDs (AID). You can get a host's agent IDs (AIDs) from the /devices/queries/devices/v1 endpoint, the Falcon console or the Streaming API. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/GetDeviceDetails - FULL_URL = self.base_url+'/devices/entities/devices/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/devices/entities/devices/v1?ids={}'.format(ID_LIST) + HEADERS = self.headers + try: + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) + except Exception as e: + returned = self.Result()(500, {}, str(e)) + + return returned + + def QueryHiddenDevices(self, parameters={}): + """ Perform the specified action on the Prevention Policies specified in the request. 
""" + # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/QueryHiddenDevices + FULL_URL = self.base_url+'/devices/queries/devices-hidden/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - - def QueryDevicesByFilterScroll(self, parameters): + + def QueryDevicesByFilterScroll(self, parameters={}): """ Perform the specified action on the Prevention Policies specified in the request. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/QueryDevicesByFilterScroll FULL_URL = self.base_url+'/devices/queries/devices-scroll/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryDevicesByFilter(self, parameters, body): + def QueryDevicesByFilter(self, parameters={}): """ Search for hosts in your environment by platform, hostname, IP, and other criteria. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/QueryDevicesByFilter FULL_URL = self.base_url+'/devices/queries/devices/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From c0a060b5f20a11ffcd8ba465843defb6ffe4b9ac Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 25 Dec 2020 00:13:28 -0500 Subject: [PATCH 03/20] Parameter defaults, DRYer, Unit testing --- src/falconpy/incidents.py | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/src/falconpy/incidents.py b/src/falconpy/incidents.py index e84eb5b7..dfafb0b2 100644 --- a/src/falconpy/incidents.py +++ b/src/falconpy/incidents.py @@ -61,18 +61,17 @@ def __call__(self, status_code, headers, body): return self.result_obj - def CrowdScore(self, parameters): + def CrowdScore(self, parameters={}): """ Query environment wide CrowdScore and return the entity data. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/incidents/CrowdScore FULL_URL = self.base_url+'/incidents/combined/crowdscores/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -82,12 +81,11 @@ def GetBehaviors(self, body): FULL_URL = self.base_url+'/incidents/entities/behaviors/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -99,12 +97,11 @@ def PerformIncidentAction(self, body): FULL_URL = self.base_url+'/incidents/entities/incident-actions/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -114,41 +111,38 @@ def GetIncidents(self, body): FULL_URL = self.base_url+'/incidents/entities/incidents/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryBehaviors(self, parameters): + def QueryBehaviors(self, parameters={}): """ Search for behaviors by providing an FQL filter, sorting, and paging details. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/incidents/QueryBehaviors FULL_URL = self.base_url+'/incidents/queries/behaviors/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIncidents(self, parameters): + def QueryIncidents(self, parameters={}): """ Search for incidents by providing an FQL filter, sorting, and paging details. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/incidents/QueryIncidents FULL_URL = self.base_url+'/incidents/queries/incidents/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From db407db0129d78c4c2411fd2a759dc94d7a75bdd Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 25 Dec 2020 00:49:33 -0500 Subject: [PATCH 04/20] Parameter cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/spotlight_vulnerabilities.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/falconpy/spotlight_vulnerabilities.py b/src/falconpy/spotlight_vulnerabilities.py index a6888c6b..ca5a9102 100644 --- a/src/falconpy/spotlight_vulnerabilities.py +++ b/src/falconpy/spotlight_vulnerabilities.py 
@@ -61,18 +61,17 @@ def __call__(self, status_code, headers, body): return self.result_obj - def getVulnerabilities(self, parameters): + def getVulnerabilities(self, ids): """ Get details on vulnerabilities by providing one or more IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/spotlight-vulnerabilities/getVulnerabilities - FULL_URL = self.base_url+'/spotlight/entities/vulnerabilities/v2' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/spotlight/entities/vulnerabilities/v2?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -84,11 +83,10 @@ def queryVulnerabilities(self, parameters): FULL_URL = self.base_url+'/spotlight/queries/vulnerabilities/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From c8797c292f8a9142a986729fcd685658eeae7770 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 25 Dec 2020 01:25:25 -0500 Subject: [PATCH 05/20] Param, partition, content-type fixes. Unit tests. --- src/falconpy/event_streams.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) 
diff --git a/src/falconpy/event_streams.py b/src/falconpy/event_streams.py index f7239b88..01320f6e 100644 --- a/src/falconpy/event_streams.py +++ b/src/falconpy/event_streams.py @@ -61,19 +61,18 @@ def __call__(self, status_code, headers, body): return self.result_obj - def refreshActiveStreamSession(self, parameters): + def refreshActiveStreamSession(self, parameters, partition=0): """ Refresh an active event stream. Use the URL shown in a GET /sensors/entities/datafeed/v2 response. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/event-streams/refreshActiveStreamSession - FULL_URL = self.base_url+'/sensors/entities/datafeed-actions/v1/{}'.format(parameters['partition']) + FULL_URL = self.base_url+'/sensors/entities/datafeed-actions/v1/{}'.format(str(partition)) HEADERS = self.headers PARAMS = parameters - result = self.Result() try: - response = requests.request("POST", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json={}, params=PARAMS, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -83,12 +82,11 @@ def listAvailableStreamsOAuth2(self, parameters): FULL_URL = self.base_url+'/sensors/entities/datafeed/v2' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned \ No newline at end of file 
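Review bot: `refreshActiveStreamSession` now takes the partition from the new `partition=0` argument instead of `parameters['partition']`, so an existing caller that still puts the partition in `parameters` silently refreshes partition 0 while the stream it meant to keep alive goes unrefreshed. `parameters` is still forwarded as the query string, so the stale `partition` key is sent along as well. Dropping the default, `def refreshActiveStreamSession(self, parameters, partition):`, makes the omission an error instead of a wrong request. The value is also formatted into the URL path as-is; if partitions are always integers, as the default suggests, `.format(int(partition))` would reject anything else before it reaches the URL.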
From 3cdc60989e733e1ba17013e56de698547b33a08f Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 25 Dec 2020 18:26:46 -0500 Subject: [PATCH 06/20] Parameter cleanup, DRYer --- src/falconpy/detects.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/falconpy/detects.py b/src/falconpy/detects.py index abdd0918..afa589d9 100644 --- a/src/falconpy/detects.py +++ b/src/falconpy/detects.py @@ -61,19 +61,17 @@ def __call__(self, status_code, headers, body): return self.result_obj - def GetAggregateDetects(self, body, parameters): + def GetAggregateDetects(self, body): """ Get detect aggregates as specified via json in request body. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/GetAggregateDetects FULL_URL = self.base_url+'/detects/aggregates/detects/GET/v1' HEADERS = self.headers - PARAMS = parameters BODY = body - result = self.Result() try: - response = requests.request("POST", FULL_URL, json=BODY, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -83,42 +81,38 @@ def UpdateDetectsByIdsV2(self, body): FULL_URL = self.base_url+'/detects/entities/detects/v2' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetDetectSummaries(self, body, parameters): + def GetDetectSummaries(self, body): """ View information about detections. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/GetDetectSummaries FULL_URL = self.base_url+'/detects/entities/summaries/GET/v1' HEADERS = self.headers - PARAMS = parameters BODY = body - result = self.Result() try: - response = requests.request("POST", FULL_URL, json=BODY, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryDetects(self, parameters): + def QueryDetects(self, parameters={}): """ Search for detection IDs that match a given query. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/QueryDetects FULL_URL = self.base_url+'/detects/queries/detects/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 8a91ae4d03e2d004ac80866bc3536bbc0eefef47 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 25 Dec 2020 19:43:51 -0500 Subject: [PATCH 07/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/device_control_policies.py | 74 +++++++++++-------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/src/falconpy/device_control_policies.py b/src/falconpy/device_control_policies.py index 839a0ad7..9fec2b5e 100644 --- a/src/falconpy/device_control_policies.py +++ b/src/falconpy/device_control_policies.py @@ -61,7 +61,7 @@ def __call__(self, status_code, headers, body): return self.result_obj - def queryCombinedDeviceControlPolicyMembers(self, parameters): + def queryCombinedDeviceControlPolicyMembers(self, parameters={}): """ Search for members of a Device Control Policy in your environment by providing an FQL filter and paging details. Returns a set of host details which match the filter criteria. """ 
@@ -69,16 +69,15 @@ def queryCombinedDeviceControlPolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/combined/device-control-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedDeviceControlPolicies(self, parameters): + def queryCombinedDeviceControlPolicies(self, parameters={}): """ Search for Device Control Policies in your environment by providing an FQL filter and paging details. Returns a set of Device Control Policies which match the filter criteria. """ @@ -86,12 +85,11 @@ def queryCombinedDeviceControlPolicies(self, parameters): FULL_URL = self.base_url+'/policy/combined/device-control/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -104,12 +102,11 @@ def performDeviceControlPoliciesAction(self, parameters, body): HEADERS = self.headers BODY = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -122,27 +119,25 @@ def setDeviceControlPoliciesPrecedence(self, body): FULL_URL = self.base_url+'/policy/entities/device-control-precedence/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def getDeviceControlPolicies(self, parameters): + def getDeviceControlPolicies(self, ids): """ Retrieve a set of Device Control Policies by specifying their IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/device-control-policies/getDeviceControlPolicies - FULL_URL = self.base_url+'/policy/entities/device-control/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/device-control/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -152,27 +147,25 @@ def createDeviceControlPolicies(self, body): FULL_URL = self.base_url+'/policy/entities/device-control/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def deleteDeviceControlPolicies(self, parameters): + def deleteDeviceControlPolicies(self, ids): """ Delete a set of Device Control Policies by specifying their IDs. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/device-control-policies/createDeviceControlPolicies - FULL_URL = self.base_url+'/policy/entities/device-control/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/device-control/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
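Review bot: The reference comment above `deleteDeviceControlPolicies` is tagged `[DELETE]` but its Swagger link ends in `createDeviceControlPolicies`, so it points at the create operation; it should end in `deleteDeviceControlPolicies`. Since the signature changes from `parameters` to `ids` here, the docstring should also state that `ids` is a comma-separated string of policy IDs, which is what `str(ids).replace(",","&ids=")` assumes.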
@@ -182,16 +175,15 @@ def updateDeviceControlPolicies(self, body): FULL_URL = self.base_url+'/policy/entities/device-control/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryDeviceControlPolicyMembers(self, parameters): + def queryDeviceControlPolicyMembers(self, parameters={}): """ Search for members of a Device Control Policy in your environment by providing an FQL filter and paging details. Returns a set of Agent IDs which match the filter criteria. """ @@ -199,16 +191,15 @@ def queryDeviceControlPolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/queries/device-control-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryDeviceControlPolicies(self, parameters): + def queryDeviceControlPolicies(self, parameters={}): """ Search for Device Control Policies in your environment by providing an FQL filter and paging details. Returns a set of Device Control Policy IDs which match the filter criteria. """ 
@@ -216,11 +207,10 @@ def queryDeviceControlPolicies(self, parameters): FULL_URL = self.base_url+'/policy/queries/device-control/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 8bb115d5c2afb456b999143ad0da3a9ac278db60 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 25 Dec 2020 22:35:31 -0500 Subject: [PATCH 08/20] Param cleanup, IDs fix, DRYer, More to do here --- src/falconpy/falconx_sandbox.py | 66 ++++++++++++++++----------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/src/falconpy/falconx_sandbox.py b/src/falconpy/falconx_sandbox.py index 41b9a6d9..a901df73 100644 --- a/src/falconpy/falconx_sandbox.py +++ b/src/falconpy/falconx_sandbox.py 
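Review bot: The `except Exception` branch in `queryDeviceControlPolicies` turns every failure into `self.Result()(500, {}, str(e))`, including the case where the HTTP exchange succeeded and only `response.json()` raised on a non-JSON body. A 401, 403 or 429 answered with an HTML or empty body is therefore reported to the caller as a 500, which hides authentication and rate-limit failures from anyone monitoring the integration. Parsing the body in its own `try` keeps the real status code, as sketched below. The pattern is identical in every method of the series, and the function starts above the hunk.

```python
        try:
            # … unchanged: the requests.request("GET", ...) call …
            try:
                payload = response.json()
            except ValueError:
                # Body was not JSON: report the real status code, not a synthetic 500.
                payload = response.text
            returned = self.Result()(response.status_code, response.headers, payload)
        # … unchanged: except Exception branch and return …
```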
@@ -61,49 +61,49 @@ def __call__(self, status_code, headers, body): return self.result_obj - def GetArtifacts(self, parameters):#This function will probably need to not do a result.json() if used... See Swagger + def GetArtifacts(self, parameters): """ Download IOC packs, PCAP files, and other analysis artifacts. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/GetArtifacts FULL_URL = self.base_url+'/falconx/entities/artifacts/v1' HEADERS = self.headers - HEADERS['Accept-Encoding'] = 'gzip' + HEADERS['Accept-Encoding'] = 'gzip' #Force gzip compression PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + if response.headers.get('content-type') == "application/json": + returned = self.Result()(response.status_code, response.headers, response.json()) + else: + returned = response.content except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetSummaryReports(self, parameters): + def GetSummaryReports(self, ids): """ Get a short summary version of a sandbox report. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/GetSummaryReports - FULL_URL = self.base_url+'/falconx/entities/report-summaries/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/falconx/entities/report-summaries/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetSubmissions(self, parameters): + def GetSubmissions(self, ids): """ Check the status of a sandbox analysis. Time required for analysis varies but is usually less than 15 minutes. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/GetSubmissions - FULL_URL = self.base_url+'/falconx/entities/submissions/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/falconx/entities/submissions/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
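Review bot: In `GetArtifacts`, the new test `response.headers.get('content-type') == "application/json"` is an exact match, so a JSON error reply labelled `application/json; charset=utf-8` falls into the `else` branch and is handed back as raw bytes. Any non-JSON reply is also returned as if it were the artifact, whatever `response.status_code` says, so callers have to guess whether they received a result dict or a download. `if response.headers.get('content-type', '').startswith('application/json'):` handles the parameterised header, and the `else` branch should return `response.content` only for a 2xx status. The docstring should state that the method returns either the result object or the downloaded bytes.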
@@ -113,31 +113,29 @@ def Submit(self, body): FULL_URL = self.base_url+'/falconx/entities/submissions/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryReports(self, parameters): + def QueryReports(self, parameters={}): """ Find sandbox reports by providing an FQL filter and paging details. Returns a set of report IDs that match your criteria. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/QueryReports FULL_URL = self.base_url+'/falconx/queries/reports/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QuerySubmissions(self, parameters): + def QuerySubmissions(self, parameters={}): """ Find submission IDs for uploaded files by providing an FQL filter and paging details. Returns a set of submission IDs that match your criteria. """ 
@@ -145,12 +143,11 @@ def QuerySubmissions(self, parameters): FULL_URL = self.base_url+'/falconx/queries/submissions/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -159,13 +156,16 @@ def UploadSampleV2(self, parameters, body): # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/UploadSampleV2 FULL_URL = self.base_url+'/samples/entities/samples/v2' HEADERS = self.headers + HEADERS['Content-Type'] = 'application/octet-stream' BODY = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, data=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned + + +# TODO: Missing methods - GetReports, DeleteReport, GetSampleV2, DeleteSampleV2, QuerySampleV1 \ No newline at end of file From f954ffc06e007ea694e259d681900a34993f5a5e Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 00:33:24 -0500 Subject: [PATCH 09/20] Param cleanup, IDs fixes, DRYer --- src/falconpy/firewall_management.py | 179 +++++++++++++--------------- 1 file changed, 83 insertions(+), 96 deletions(-) diff --git a/src/falconpy/firewall_management.py b/src/falconpy/firewall_management.py index ceee52ad..6947fbb8 100644 --- a/src/falconpy/firewall_management.py +++ b/src/falconpy/firewall_management.py 
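Review bot: `HEADERS = self.headers` in `UploadSampleV2` binds the instance's own dict rather than a copy, so the added `HEADERS['Content-Type'] = 'application/octet-stream'` stays on the object after the upload. requests fills in `Content-Type` for a `json=` body only when the header is absent, so a later `Submit` call on the same instance would go out labelled as octet-stream (assuming `self.headers` is a plain dict, which the constructor outside this hunk would confirm). Copying first avoids it: `HEADERS = {**self.headers, 'Content-Type': 'application/octet-stream'}`. `GetArtifacts` (`Accept-Encoding`) and the `X-CS-USERNAME` assignments in the firewall_management.py patch alias the dict the same way, and there a stale username would mis-attribute later changes.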
@@ -67,12 +67,12 @@ def aggregate_events(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/events/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() + try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -82,12 +82,12 @@ def aggregate_policy_rules(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/policy-rules/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() + try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -97,12 +97,12 @@ def aggregate_rule_groups(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/rule-groups/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() + try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
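Review bot: The added line in `aggregate_events` reads `self.Result()()(response.status_code, ...)`, with one pair of parentheses too many. If `Result.__call__` takes `status_code, headers, body` as in the other service classes of this series, the empty call raises `TypeError`, the broad `except` catches it, and the method reports a 500 even though the POST has already been sent and may have succeeded. It should match the sibling hunks: `returned = self.Result()(response.status_code, response.headers, response.json())`. The follow-up commit (PATCH 10/20) edits the same function and leaves the line unchanged.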
@@ -112,75 +112,72 @@ def aggregate_rules(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/rules/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() + try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def get_events(self, parameters): + def get_events(self, ids): """ Get events entities by ID and optionally version. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/get_events - FULL_URL = self.base_url+'/fwmgr/entities/events/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/events/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def get_firewall_fields(self, parameters): + def get_firewall_fields(self, ids): """ Get the firewall field specifications by ID. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/get_firewall_fields - FULL_URL = self.base_url+'/fwmgr/entities/firewall-fields/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/firewall-fields/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def get_platforms(self, parameters): + def get_platforms(self, ids): """ Get platforms by ID, e.g., windows or mac or droid. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/get_platforms - FULL_URL = self.base_url+'/fwmgr/entities/platforms/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/platforms/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def get_policy_containers(self, parameters): + def get_policy_containers(self, ids): """ Get policy container entities by policy ID. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/get_policy_containers - FULL_URL = self.base_url+'/fwmgr/entities/policies/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/policies/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned + #TODO: Update dynamic documentation to handle the cs_username parameter def update_policy_container(self, body, cs_username): """ Update an identified policy container. """ # [PUT] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/update_policy_container 
@@ -188,180 +185,170 @@ def update_policy_container(self, body, cs_username): HEADERS = self.headers HEADERS['X-CS-USERNAME'] = cs_username BODY = body - result = self.Result() try: response = requests.request("PUT", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def get_rule_groups(self, parameters): + def get_rule_groups(self, ids): """ Get rule group entities by ID. These groups do not contain their rule entites, just the rule IDs in precedence order. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/get_rule_groups - FULL_URL = self.base_url+'/fwmgr/entities/rule-groups/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/rule-groups/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def create_rule_group(self, parameters, body, cs_username): + def create_rule_group(self, body, cs_username, parameters={}): """ Create new rule group on a platform for a customer with a name and description, and return the ID. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/create_rule_group FULL_URL = self.base_url+'/fwmgr/entities/rule-groups/v1' HEADERS = self.headers HEADERS['X-CS-USERNAME'] = cs_username PARAMS = parameters - BODY = body - result = self.Result() + BODY = body try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def delete_rule_groups(self, parameters, cs_username): + def delete_rule_groups(self, ids, cs_username, parameters={}): """ Delete rule group entities by ID. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/QueryReports - FULL_URL = self.base_url+'/fwmgr/entities/rule-groups/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/rule-groups/v1?ids={}'.format(ID_LIST) HEADERS = self.headers HEADERS['X-CS-USERNAME'] = cs_username PARAMS = parameters - result = self.Result() try: response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def update_rule_group(self, parameters, cs_username): + def update_rule_group(self, body, cs_username, parameters={}): """ Update name, description, or enabled status of a rule group, or create, edit, delete, or reorder rules. 
""" # [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/update_rule_group FULL_URL = self.base_url+'/fwmgr/entities/rule-groups/v1' HEADERS = self.headers HEADERS['X-CS-USERNAME'] = cs_username PARAMS = parameters - result = self.Result() + BODY = body try: - response = requests.request("PTACH", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("PTACH", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def get_rules(self, parameters): + def get_rules(self, ids): """ Get rule entities by ID (64-bit unsigned int as decimal string) or Family ID (32-character hexadecimal string). """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/get_rules - FULL_URL = self.base_url+'/fwmgr/entities/rules/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/fwmgr/entities/rules/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def query_events(self, parameters): + def query_events(self, parameters={}): """ Find all event IDs matching the query with filter. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/query_events FULL_URL = self.base_url+'/fwmgr/queries/events/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def query_firewall_fields(self, parameters): + def query_firewall_fields(self, parameters={}): """ Get the firewall field specification IDs for the provided platform. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/query_firewall_fields FULL_URL = self.base_url+'/fwmgr/queries/firewall-fields/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def query_platforms(self, parameters): + def query_platforms(self, parameters={}): """ Get the list of platform names. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/query_platforms FULL_URL = self.base_url+'/fwmgr/queries/platforms/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def query_policy_rules(self, parameters): + def query_policy_rules(self, parameters={}): """ Find all firewall rule IDs matching the query with filter, and return them in precedence order. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/query_policy_rules FULL_URL = self.base_url+'/fwmgr/queries/policy-rules/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def query_rule_groups(self, parameters): + def query_rule_groups(self, parameters={}): """ Find all rule group IDs matching the query with filter. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/query_rule_groups FULL_URL = self.base_url+'/fwmgr/queries/rule-groups/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def query_rules(self, parameters): + def query_rules(self, parameters={}): """ Find all rule IDs matching the query with filter. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-management/query_rule_groups FULL_URL = self.base_url+'/fwmgr/queries/rules/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From df1e1a311e0213188e429fad42fb0101c9cd0d87 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 00:35:01 -0500 Subject: [PATCH 10/20] Param cleanup, IDs fixes, DRYer --- src/falconpy/firewall_management.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/falconpy/firewall_management.py b/src/falconpy/firewall_management.py index 6947fbb8..7add3e2f 100644 --- a/src/falconpy/firewall_management.py +++ b/src/falconpy/firewall_management.py 
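Review bot: `update_rule_group` still passes the method string `"PTACH"`; the commit rewrites this very line to add `json=BODY` but keeps the misspelling, so the request goes out with an unknown HTTP verb and the rule-group update is unlikely to be applied. Changing the first argument to `"PATCH"` matches the `# [PATCH]` comment above it. In the same hunk, the reference comment on `delete_rule_groups` points at `falconx-sandbox/QueryReports` and should presumably name the firewall-management delete operation instead.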
@@ -67,7 +67,6 @@ def aggregate_events(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/events/GET/v1' HEADERS = self.headers BODY = body - try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) returned = self.Result()()(response.status_code, response.headers, response.json()) @@ -82,7 +81,6 @@ def aggregate_policy_rules(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/policy-rules/GET/v1' HEADERS = self.headers BODY = body - try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) returned = self.Result()(response.status_code, response.headers, response.json()) @@ -97,7 +95,6 @@ def aggregate_rule_groups(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/rule-groups/GET/v1' HEADERS = self.headers BODY = body - try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) returned = self.Result()(response.status_code, response.headers, response.json()) @@ -112,7 +109,6 @@ def aggregate_rules(self, body): FULL_URL = self.base_url+'/fwmgr/aggregates/rules/GET/v1' HEADERS = self.headers BODY = body - try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) returned = self.Result()(response.status_code, response.headers, response.json()) From 295198933c5b06e306805aff7ed5a4949f062095 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 00:44:40 -0500 Subject: [PATCH 11/20] Param cleanup, IDs fix, DRYer --- src/falconpy/firewall_policies.py | 76 ++++++++++++++----------------- 1 file changed, 33 insertions(+), 43 deletions(-) diff --git a/src/falconpy/firewall_policies.py b/src/falconpy/firewall_policies.py index 74417d2c..0d697d45 100644 --- a/src/falconpy/firewall_policies.py +++ b/src/falconpy/firewall_policies.py 
@@ -61,7 +61,7 @@ def __call__(self, status_code, headers, body): return self.result_obj - def queryCombinedFirewallPolicyMembers(self, parameters): + def queryCombinedFirewallPolicyMembers(self, parameters={}): """ Search for members of a Firewall Policy in your environment by providing an FQL filter and paging details. Returns a set of host details which match the filter criteria. """ @@ -69,16 +69,15 @@ def queryCombinedFirewallPolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/combined/firewall-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedFirewallPolicies(self, parameters): + def queryCombinedFirewallPolicies(self, parameters={}): """ Search for Firewall Policies in your environment by providing an FQL filter and paging details. Returns a set of Firewall Policies which match the filter criteria. """ @@ -86,12 +85,11 @@ def queryCombinedFirewallPolicies(self, parameters): FULL_URL = self.base_url+'/policy/combined/firewall/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -102,12 +100,11 @@ def performFirewallPoliciesAction(self, parameters, body): HEADERS = self.headers BODY = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -120,58 +117,54 @@ def setFirewallPoliciesPrecedence(self, body): FULL_URL = self.base_url+'/policy/entities/firewall-precedence/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def getFirewallPolicies(self, parameters): + def getFirewallPolicies(self, ids): """ Retrieve a set of Firewall Policies by specifying their IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-policies/getFirewallPolicies - FULL_URL = self.base_url+'/policy/entities/firewall/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/firewall/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def createFirewallPolicies(self, parameters, body): + def createFirewallPolicies(self, body, parameters={}): """ Create Firewall Policies by specifying details about the policy to create. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-policies/createFirewallPolicies FULL_URL = self.base_url+'/policy/entities/firewall/v1' HEADERS = self.headers PARAMS = parameters BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def deleteFirewallPolicies(self, parameters): + def deleteFirewallPolicies(self, ids): """ Delete a set of Firewall Policies by specifying their IDs. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/firewall-policies/deleteFirewallPolicies - FULL_URL = self.base_url+'/policy/entities/firewall/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/firewall/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
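Review bot: `createFirewallPolicies` changes from `(self, parameters, body)` to `(self, body, parameters={})`, which silently swaps the meaning of existing positional calls: a caller written as `createFirewallPolicies(params, body)` would now send the policy definition in the query string and the query parameters as the JSON body, with no local error. Making the optional argument keyword-only, `def createFirewallPolicies(self, body, *, parameters=None):`, turns such stale calls into an immediate `TypeError`. `create_rule_group` in the firewall_management.py patch is reordered the same way. The reordering should be recorded in the release notes as a breaking change.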
@@ -181,16 +174,15 @@ def updateFirewallPolicies(self, body): FULL_URL = self.base_url+'/policy/entities/firewall/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryFirewallPolicyMembers(self, parameters): + def queryFirewallPolicyMembers(self, parameters={}): """ Search for members of a Firewall Policy in your environment by providing an FQL filter and paging details. Returns a set of Agent IDs which match the filter criteria. """ @@ -198,16 +190,15 @@ def queryFirewallPolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/queries/firewall-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryFirewallPolicies(self, parameters): + def queryFirewallPolicies(self, parameters={}): """ Search for Firewall Policies in your environment by providing an FQL filter and paging details. Returns a set of Firewall Policy IDs which match the filter criteria. """ 
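Review bot: `def queryFirewallPolicyMembers(self, parameters={})` uses a mutable dict as the default, which Python creates once and shares across every call and every instance. Nothing visible here mutates it, but any later code that adds a key to `parameters` would carry that filter into subsequent calls. `def queryFirewallPolicyMembers(self, parameters=None):` avoids that, and requests accepts `params=None`, so the body (which continues in the following hunk) needs no change. The same default is added to `createFirewallPolicies` in the previous hunk, `queryFirewallPolicies` in the next one, and the other query methods in the later patches.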
@@ -215,11 +206,10 @@ def queryFirewallPolicies(self, parameters): FULL_URL = self.base_url+'/policy/queries/firewall/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 65c5fe822756f8afae4bb8d38522ad922826eaa4 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 01:15:15 -0500 Subject: [PATCH 12/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/host_group.py | 69 +++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 39 deletions(-) diff --git a/src/falconpy/host_group.py b/src/falconpy/host_group.py index bfa5187e..01c87588 100644 --- a/src/falconpy/host_group.py +++ b/src/falconpy/host_group.py @@ -61,7 +61,7 @@ def __call__(self, status_code, headers, body): return self.result_obj - def queryCombinedGroupMembers(self, parameters): + def queryCombinedGroupMembers(self, parameters={}): """ Search for members of a Host Group in your environment by providing an FQL filter and paging details. Returns a set of host details which match the filter criteria. """ 
@@ -69,16 +69,15 @@ def queryCombinedGroupMembers(self, parameters): FULL_URL = self.base_url+'/devices/combined/host-group-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedHostGroups(self, parameters): + def queryCombinedHostGroups(self, parameters={}): """ Search for Host Groups in your environment by providing an FQL filter and paging details. Returns a set of Host Groups which match the filter criteria. """ @@ -86,12 +85,11 @@ def queryCombinedHostGroups(self, parameters): FULL_URL = self.base_url+'/devices/combined/host-groups/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -102,27 +100,25 @@ def performGroupAction(self, parameters, body): HEADERS = self.headers PARAMS = parameters BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def getHostGroups(self, parameters): + def getHostGroups(self, ids): """ Retrieve a set of Host Groups by specifying their IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/host-group/getHostGroups - FULL_URL = self.base_url+'/devices/entities/host-groups/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/devices/entities/host-groups/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
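Review bot: The rewritten request line in `getHostGroups` still passes `verify=False`, which turns off TLS certificate validation for a call that sends `self.headers` (presumably the API credentials; the header contents are not shown in this hunk). Without validation the client cannot tell the real API host from one sitting on the path, so both the headers and the returned data are exposed to interception or tampering. Dropping the argument restores requests' default verification: `response = requests.request("GET", FULL_URL, headers=HEADERS)`. If some environments need to opt out, make it a constructor option that defaults to verification on. Every request line touched by these patches carries the same flag.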
@@ -132,27 +128,25 @@ def createHostGroups(self, body): FULL_URL = self.base_url+'/devices/entities/host-groups/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def deleteHostGroups(self, parameters): + def deleteHostGroups(self, ids): """ Delete a set of Host Groups by specifying their IDs. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/host-group/deleteHostGroups - FULL_URL = self.base_url+'/devices/entities/host-groups/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/devices/entities/host-groups/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned def updateHostGroups(self, body): 
@@ -161,16 +155,15 @@ def updateHostGroups(self, body): FULL_URL = self.base_url+'/devices/entities/host-groups/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryGroupMembers(self, parameters): + def queryGroupMembers(self, parameters={}): """ Search for members of a Host Group in your environment by providing an FQL filter and paging details. Returns a set of Agent IDs which match the filter criteria. """ @@ -178,16 +171,15 @@ def queryGroupMembers(self, parameters): FULL_URL = self.base_url+'/devices/queries/host-group-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryHostGroups(self, parameters): + def queryHostGroups(self, parameters={}): """ Search for Host Groups in your environment by providing an FQL filter and paging details. Returns a set of Host Group IDs which match the filter criteria. """ 
@@ -195,11 +187,10 @@ def queryHostGroups(self, parameters): FULL_URL = self.base_url+'/devices/queries/host-groups/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned \ No newline at end of file From 5a22b449afbb77626d9004c1263ed42fc8fb40e3 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 02:50:25 -0500 Subject: [PATCH 13/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/intel.py | 108 ++++++++++++++++++++---------------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/src/falconpy/intel.py b/src/falconpy/intel.py index 8eb14b17..4e39b9b8 100644 --- a/src/falconpy/intel.py +++ b/src/falconpy/intel.py 
@@ -61,63 +61,60 @@ def __call__(self, status_code, headers, body): return self.result_obj - def QueryIntelActorEntities(self, parameters): + def QueryIntelActorEntities(self, parameters={}): """ Get info about actors that match provided FQL filters. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelActorEntities FULL_URL = self.base_url+'/intel/combined/actors/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIntelIndicatorEntities(self, parameters): + def QueryIntelIndicatorEntities(self, parameters={}): """ Get info about indicators that match provided FQL filters. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelIndicatorEntities FULL_URL = self.base_url+'/intel/combined/indicators/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIntelReportEntities(self, parameters): + def QueryIntelReportEntities(self, parameters={}): """ Get info about reports that match provided FQL filters. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelReportEntities FULL_URL = self.base_url+'/intel/combined/reports/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetIntelActorEntities(self, parameters): + def GetIntelActorEntities(self, ids, parameters={}): """ Retrieve specific actors using their actor IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelActorEntities - FULL_URL = self.base_url+'/intel/entities/actors/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/intel/entities/actors/v1?ids={}'.format(ID_LIST) HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -127,42 +124,43 @@ def GetIntelIndicatorEntities(self, body): FULL_URL = self.base_url+'/intel/entities/indicators/GET/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetIntelReportPDF(self, parameters):#Probably need to not do result.json() here. Check the swagger + def GetIntelReportPDF(self, parameters): """ Return a Report PDF attachment. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelReportPDF FULL_URL = self.base_url+'/intel/entities/report-files/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + if response.headers.get('content-type') == "application/json": + returned = self.Result()(response.status_code, response.headers, response.json()) + else: + returned = response.content except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetIntelReportEntities(self, parameters): + def GetIntelReportEntities(self, ids, parameters={}): """ Retrieve specific reports using their report IDs. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelReportEntities - FULL_URL = self.base_url+'/intel/entities/reports/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/intel/entities/reports/v1?ids={}'.format(ID_LIST) HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -172,12 +170,14 @@ def GetIntelRuleFile(self, parameters):#There is an optional header you can see FULL_URL = self.base_url+'/intel/entities/rules-files/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + if response.headers.get('content-type') == "application/json": + returned = self.Result()(response.status_code, response.headers, response.json()) + else: + returned = response.content except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
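Review bot: The new branch in `GetIntelReportPDF` (and the identical one in `GetIntelRuleFile` in the next hunk) compares the header with `== "application/json"`, so a JSON reply sent as `application/json; charset=utf-8` falls into the `else` and is returned as raw bytes. The `else` also returns `response.content` without looking at `response.status_code`, so a non-JSON error body would reach the caller as if it were the file, and the method now returns either a result object or bytes. A prefix test covers the first part: `if response.headers.get('content-type', '').startswith('application/json'):`. `GetLatestIntelRuleFile` in the following hunk still calls `response.json()` unconditionally; if it also returns a file, it likely needs the same branch.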
@@ -187,72 +187,67 @@ def GetLatestIntelRuleFile(self, parameters):#There is an optional header you ca FULL_URL = self.base_url+'/intel/entities/rules-latest-files/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def GetIntelRuleEntities(self, parameters): + def GetIntelRuleEntities(self, ids): """ Retrieve details for rule sets for the specified ids. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelRuleEntities - FULL_URL = self.base_url+'/intel/entities/rules/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/intel/entities/rules/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIntelActorIds(self, parameters): + def QueryIntelActorIds(self, parameters={}): """ Get actor IDs that match provided FQL filters. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelActorIds FULL_URL = self.base_url+'/intel/queries/actors/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIntelIndicatorIds(self, parameters): + def QueryIntelIndicatorIds(self, parameters={}): """ Get indicators IDs that match provided FQL filters. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelIndicatorIds FULL_URL = self.base_url+'/intel/queries/indicators/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIntelReportIds(self, parameters): + def QueryIntelReportIds(self, parameters={}): """ Get report IDs that match provided FQL filters. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelReportIds FULL_URL = self.base_url+'/intel/queries/reports/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -262,11 +257,10 @@ def QueryIntelRuleIds(self, parameters): FULL_URL = self.base_url+'/intel/queries/rules/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 1d3117ab2a0b5103df6bfedd4b3eb10656822514 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 13:00:43 -0500 Subject: [PATCH 14/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/iocs.py | 55 ++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/src/falconpy/iocs.py b/src/falconpy/iocs.py index bf5f1c68..b5ce81c6 100644 --- a/src/falconpy/iocs.py +++ b/src/falconpy/iocs.py 
@@ -67,12 +67,11 @@ def DevicesCount(self, parameters): FULL_URL = self.base_url+'/indicators/aggregates/devices-count/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -82,12 +81,11 @@ def GetIOC(self, parameters): FULL_URL = self.base_url+'/indicators/entities/iocs/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -97,12 +95,11 @@ def CreateIOC(self, body): FULL_URL = self.base_url+'/indicators/entities/iocs/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -112,12 +109,11 @@ def DeleteIOC(self, parameters): FULL_URL = self.base_url+'/indicators/entities/iocs/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -128,12 +124,11 @@ def UpdateIOC(self, parameters, body): HEADERS = self.headers PARAMS = parameters BODY = body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -143,27 +138,25 @@ def DevicesRanOn(self, parameters): FULL_URL = self.base_url+'/indicators/queries/devices/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def QueryIOCs(self, parameters): + def QueryIOCs(self, parameters={}): """ Search the custom IOCs in your customer account. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/iocs/QueryIOCs FULL_URL = self.base_url+'/indicators/queries/iocs/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -173,26 +166,24 @@ def ProcessesRanOn(self, parameters): FULL_URL = self.base_url+'/indicators/queries/processes/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def entities_processes(self, parameters): + def entities_processes(self, ids): """ For the provided ProcessID retrieve the process details. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/iocs/entities_processes - FULL_URL = self.base_url+'/processes/entities/processes/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/processes/entities/processes/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 57feb89927ac8b610225f225b56208fe3625a906 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 14:06:01 -0500 Subject: [PATCH 15/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/prevention_policy.py | 74 +++++++++++++------------------ 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/src/falconpy/prevention_policy.py b/src/falconpy/prevention_policy.py index 400c39ea..5a78d308 100644 --- a/src/falconpy/prevention_policy.py 
+++ b/src/falconpy/prevention_policy.py @@ -61,7 +61,7 @@ def __call__(self, status_code, headers, body): return self.result_obj - def queryCombinedPreventionPolicyMembers(self, parameters): + def queryCombinedPreventionPolicyMembers(self, parameters={}): """ Search for members of a Prevention Policy in your environment by providing an FQL filter and paging details. Returns a set of host details which match the filter criteria. """ @@ -69,16 +69,15 @@ def queryCombinedPreventionPolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/combined/prevention-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedPreventionPolicies(self, parameters): + def queryCombinedPreventionPolicies(self, parameters={}): """ Search for Prevention Policies in your environment by providing an FQL filter and paging details. Returns a set of Prevention Policies which match the filter criteria. """ @@ -86,12 +85,11 @@ def queryCombinedPreventionPolicies(self, parameters): FULL_URL = self.base_url+'/policy/combined/prevention/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -102,12 +100,11 @@ def performPreventionPoliciesAction(self, parameters, body): HEADERS = self.headers PARAMS = parameters BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -120,27 +117,25 @@ def setPreventionPoliciesPrecedence(self, body): FULL_URL = self.base_url+'/policy/entities/prevention-precedence/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def getPreventionPolicies(self, parameters): + def getPreventionPolicies(self, ids): """ Retrieve a set of Prevention Policies by specifying their IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/getPreventionPolicies - FULL_URL = self.base_url+'/policy/entities/prevention/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/prevention/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -150,27 +145,25 @@ def createPreventionPolicies(self, body): FULL_URL = self.base_url+'/policy/entities/prevention/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def deletePreventionPolicies(self, parameters): + def deletePreventionPolicies(self, ids): """ Delete a set of Prevention Policies by specifying their IDs. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/prevention-policies/deletePreventionPolicies - FULL_URL = self.base_url+'/policy/entities/prevention/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/prevention/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -180,16 +173,15 @@ def updatePreventionPolicies(self, body): FULL_URL = self.base_url+'/policy/entities/prevention/v1' HEADERS = self.headers BODY = body - result = self.Result() try: response = requests.request("PATCH", FULL_URL, json=BODY, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryPreventionPolicyMembers(self, parameters): + def queryPreventionPolicyMembers(self, parameters={}): """ Search for members of a Prevention Policy in your environment by providing an FQL filter and paging details. Returns a set of Agent IDs which match the filter criteria. """ @@ -197,16 +189,15 @@ def queryPreventionPolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/queries/prevention-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryPreventionPolicies(self, parameters): + def queryPreventionPolicies(self, parameters={}): """ Search for Prevention Policies in your environment by providing an FQL filter and paging details. Returns a set of Prevention Policy IDs which match the filter criteria. """ 
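Review bot: `def queryPreventionPolicyMembers(self, parameters={}):` and `def queryPreventionPolicies(self, parameters={}):` use a mutable default, which is a single dict shared by every call. Nothing visible mutates it, but any later change that adds a key to `PARAMS` would leak that key into unrelated requests. Prefer `parameters=None` in the signature and `PARAMS = parameters or {}` in the body. The same default is added to the query* methods of sensor_update_policy.py and the Batch* methods of real_time_response.py. The body of queryPreventionPolicies runs past the end of the second hunk on this line.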
@@ -214,11 +205,10 @@ def queryPreventionPolicies(self, parameters): FULL_URL = self.base_url+'/policy/queries/prevention/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 9f4758134411972ddeb42e7ab0adb5e2dc24ae28 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 14:57:20 -0500 Subject: [PATCH 16/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/sensor_update_policy.py | 146 ++++++++++++--------------- 1 file changed, 62 insertions(+), 84 deletions(-) diff --git a/src/falconpy/sensor_update_policy.py b/src/falconpy/sensor_update_policy.py index b968714a..ffae87fb 100644 --- a/src/falconpy/sensor_update_policy.py +++ b/src/falconpy/sensor_update_policy.py 
@@ -61,40 +61,37 @@ def __call__(self, status_code, headers, body): return self.result_obj - def revealUninstallToken(self, parameters, body): + def revealUninstallToken(self, body): """ Reveals an uninstall token for a specific device. To retrieve the bulk maintenance token pass the value 'MAINTENANCE' as the value for 'device_id'. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/revealUninstallToken FULL_URL = self.base_url+'/policy/combined/reveal-uninstall-token/v1' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedSensorUpdateBuilds(self, parameters): + def queryCombinedSensorUpdateBuilds(self, parameters={}): """ Retrieve available builds for use with Sensor Update Policies. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/queryCombinedSensorUpdateBuilds FULL_URL = self.base_url+'/policy/combined/sensor-update-builds/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedSensorUpdatePolicyMembers(self, parameters): + def queryCombinedSensorUpdatePolicyMembers(self, parameters={}): """ Search for members of a Sensor Update Policy in your environment by providing an FQL filter and paging details. Returns a set of host details which match the filter criteria. """ @@ -102,16 +99,15 @@ def queryCombinedSensorUpdatePolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/combined/sensor-update-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedSensorUpdatePolicies(self, parameters): + def queryCombinedSensorUpdatePolicies(self, parameters={}): """ Search for Sensor Update Policies in your environment by providing an FQL filter and paging details. Returns a set of Sensor Update Policies which match the filter criteria. """ 
@@ -119,16 +115,15 @@ def queryCombinedSensorUpdatePolicies(self, parameters): FULL_URL = self.base_url+'/policy/combined/sensor-update/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def queryCombinedSensorUpdatePoliciesV2(self, parameters): + def queryCombinedSensorUpdatePoliciesV2(self, parameters={}): """ Search for Sensor Update Policies with additional support for uninstall protection in your environment by providing an FQL filter and paging details. Returns a set of Sensor Update Policies which match the filter criteria. """ @@ -136,12 +131,11 @@ def queryCombinedSensorUpdatePoliciesV2(self, parameters): FULL_URL = self.base_url+'/policy/combined/sensor-update/v2' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -152,16 +146,15 @@ def performSensorUpdatePoliciesAction(self, parameters, body): HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def setSensorUpdatePoliciesPrecedence(self, parameters, body): + def setSensorUpdatePoliciesPrecedence(self, body): """ Sets the precedence of Sensor Update Policies based on the order of IDs specified in the request. The first ID specified will have the highest precedence and the last ID specified will have the lowest. You must specify all non-Default Policies for a platform when updating precedence. 
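Review bot: In performSensorUpdatePoliciesAction the `except Exception` branch turns every local failure into `self.Result()(500, {}, str(e))`, so a connection error, a TLS failure or a non-JSON body from `response.json()` reaches the caller as an HTTP 500 from the API. For a call that changes policy state, that hides whether the request was delivered, and the real `response.status_code` is discarded when only the JSON decode failed. Separating transport errors from decode errors keeps the real status code, as sketched below. The pattern is the same in every method of this series, and this function starts outside the hunk.

```python
        try:
            # … unchanged: response = requests.request("POST", ...) …
            try:
                payload = response.json()
            except ValueError:
                # Body was not JSON: keep the real status code instead of reporting 500.
                payload = response.text
            returned = self.Result()(response.status_code, response.headers, payload)
        except requests.exceptions.RequestException as e:
            # Transport failure: the request may not have reached the API.
            returned = self.Result()(500, {}, str(e))
        return returned
```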
@@ -170,126 +163,113 @@ def setSensorUpdatePoliciesPrecedence(self, parameters, body): FULL_URL = self.base_url+'/policy/entities/sensor-update-precedence/v1' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def getSensorUpdatePolicies(self, parameters): + def getSensorUpdatePolicies(self, ids): """ Retrieve a set of Sensor Update Policies by specifying their IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/getSensorUpdatePolicies - FULL_URL = self.base_url+'/policy/entities/sensor-update/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/sensor-update/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def createSensorUpdatePolicies(self, parameters, body): + def createSensorUpdatePolicies(self, body): """ Create Sensor Update Policies by specifying details about the policy to create. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/createSensorUpdatePolicies FULL_URL = self.base_url+'/policy/entities/sensor-update/v1' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def deleteSensorUpdatePolicies(self, parameters): + def deleteSensorUpdatePolicies(self, ids): """ Delete a set of Sensor Update Policies by specifying their IDs. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/deleteSensorUpdatePolicies - FULL_URL = self.base_url+'/policy/entities/sensor-update/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/sensor-update/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def updateSensorUpdatePolicies(self, parameters, body): + def updateSensorUpdatePolicies(self, body): """ Update Sensor Update Policies by specifying the ID of the policy and details to update. 
""" # [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/updateSensorUpdatePolicies FULL_URL = self.base_url+'/policy/entities/sensor-update/v1' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("PATCH", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("PATCH", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def getSensorUpdatePoliciesV2(self, parameters): + def getSensorUpdatePoliciesV2(self, ids): """ Retrieve a set of Sensor Update Policies with additional support for uninstall protection by specifying their IDs. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/getSensorUpdatePoliciesV2 - FULL_URL = self.base_url+'/policy/entities/sensor-update/v2' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/policy/entities/sensor-update/v2?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def createSensorUpdatePoliciesV2(self, parameters, body): + def createSensorUpdatePoliciesV2(self, body): """ Create Sensor Update Policies by specifying details about the policy to 
create with additional support for uninstall protection. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/createSensorUpdatePoliciesV2 FULL_URL = self.base_url+'/policy/entities/sensor-update/v2' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def updateSensorUpdatePoliciesV2(self, parameters, body): + def updateSensorUpdatePoliciesV2(self, body): """ Update Sensor Update Policies by specifying the ID of the policy and details to update with additional support for uninstall protection. 
""" # [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/sensor-update-policies/updateSensorUpdatePoliciesV2 FULL_URL = self.base_url+'/users/entities/users/v1' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("PATCH", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("PATCH", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def querySensorUpdatePolicyMembers(self, parameters): + def querySensorUpdatePolicyMembers(self, parameters={}): """ Search for members of a Sensor Update Policy in your environment by providing an FQL filter and paging details. Returns a set of Agent IDs which match the filter criteria. """ @@ -297,16 +277,15 @@ def querySensorUpdatePolicyMembers(self, parameters): FULL_URL = self.base_url+'/policy/queries/sensor-update-members/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def querySensorUpdatePolicies(self, parameters): + def querySensorUpdatePolicies(self, parameters={}): """ Search for Sensor Update Policies in your environment by providing an FQL filter and paging details. Returns a set of Sensor Update Policy IDs which match the filter criteria. """ 
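Review bot: updateSensorUpdatePoliciesV2 still sends its PATCH to `self.base_url+'/users/entities/users/v1'`, the user-management path, although its comment references sensor-update-policies/updateSensorUpdatePoliciesV2 and the sibling createSensorUpdatePoliciesV2 and getSensorUpdatePoliciesV2 in this hunk use `/policy/entities/sensor-update/v2`. A policy body would be delivered to the users API instead of updating the policy, and this commit edits the function without correcting the path. The line should read `FULL_URL = self.base_url+'/policy/entities/sensor-update/v2'`. setSensorUpdatePoliciesPrecedence at the top of the hunk starts outside it.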
@@ -314,11 +293,10 @@ def querySensorUpdatePolicies(self, parameters): FULL_URL = self.base_url+'/policy/queries/sensor-update/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 067fbe7885e4b37d53b768b43a8730f08dc41b18 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 15:43:33 -0500 Subject: [PATCH 17/20] Param cleanup, IDs fix, DRYer, Unit testing --- src/falconpy/user_management.py | 86 ++++++++++++++------------------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/src/falconpy/user_management.py b/src/falconpy/user_management.py index 61f216d9..a957771c 100644 --- a/src/falconpy/user_management.py +++ b/src/falconpy/user_management.py 
@@ -61,18 +61,17 @@ def __call__(self, status_code, headers, body): return self.result_obj - def GetRoles(self, parameters): + def GetRoles(self, ids): """ Get info about a role. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/GetRoles - FULL_URL = self.base_url+'/user-roles/entities/user-roles/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/user-roles/entities/user-roles/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -83,27 +82,26 @@ def GrantUserRoleIds(self, parameters, body): HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RevokeUserRoleIds(self, parameters): + def RevokeUserRoleIds(self, ids, parameters): """ Revoke one or more roles from a user. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/RevokeUserRoleIds - FULL_URL = self.base_url+'/user-roles/entities/user-roles/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/user-roles/entities/user-roles/v1?ids={}'.format(ID_LIST) HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -114,12 +112,11 @@ def GetAvailableRoleIds(self): # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/GetAvailableRoleIds FULL_URL = self.base_url+'/user-roles/queries/user-role-ids-by-cid/v1' HEADERS = self.headers - result = self.Result() try: response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -129,43 +126,39 @@ def GetUserRoleIds(self, parameters): FULL_URL = self.base_url+'/user-roles/queries/user-role-ids-by-user-uuid/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RetrieveUser(self, parameters): + def RetrieveUser(self, ids): """ Get info about a user. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/RetrieveUser - FULL_URL = self.base_url+'/users/entities/users/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/users/entities/users/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def CreateUser(self, parameters, body): + def CreateUser(self, body): """ Create a new user. After creating a user, assign one or more roles with POST /user-roles/entities/user-roles/v1. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/CreateUser FULL_URL = self.base_url+'/users/entities/users/v1' HEADERS = self.headers DATA = body - PARAMS = parameters - result = self.Result() try: - response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -175,12 +168,11 @@ def DeleteUser(self, parameters): FULL_URL = self.base_url+'/users/entities/users/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -191,12 +183,11 @@ def UpdateUser(self, parameters, body): HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("PATCH", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -205,12 +196,11 @@ def RetrieveEmailsByCID(self): # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/RetrieveEmailsByCID FULL_URL = self.base_url+'/users/queries/emails-by-cid/v1' HEADERS = self.headers - result = self.Result() try: response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -219,12 +209,11 @@ def RetrieveUserUUIDsByCID(self): # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/user-management/RetrieveUserUUIDsByCID FULL_URL = self.base_url+'/users/queries/user-uuids-by-cid/v1' HEADERS = self.headers - result = self.Result() try: response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -234,11 +223,10 @@ def RetrieveUserUUID(self, parameters): FULL_URL = self.base_url+'/users/queries/user-uuids-by-email/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From d4215cd8bbaa52c6ebcaadb9e2a06fa3ca93f4d3 Mon Sep 17 00:00:00 2001 
From: Joshua Hiller Date: Sat, 26 Dec 2020 16:23:23 -0500 Subject: [PATCH 18/20] Param cleanup, IDs fix, DRYer, Unit test prep --- src/falconpy/real_time_response.py | 145 ++++++++++++++++------------- 1 file changed, 79 insertions(+), 66 deletions(-) diff --git a/src/falconpy/real_time_response.py b/src/falconpy/real_time_response.py index e3990f14..4c76b57b 100644 --- a/src/falconpy/real_time_response.py +++ b/src/falconpy/real_time_response.py 
@@ -67,44 +67,41 @@ def RTR_AggregateSessions(self, body): FULL_URL = self.base_url+'/real-time-response/aggregates/sessions/GET/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def BatchActiveResponderCmd(self, parameters, body): + def BatchActiveResponderCmd(self, body, parameters={}): """ Batch executes a RTR active-responder command across the hosts mapped to the given batch ID. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/BatchActiveResponderCmd FULL_URL = self.base_url+'/real-time-response/combined/batch-active-responder-command/v1' HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def BatchCmd(self, parameters, body): + def BatchCmd(self, body, parameters={}): """ Batch executes a RTR read-only command across the hosts mapped to the given batch ID. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/BatchActiveResponderCmd FULL_URL = self.base_url+'/real-time-response/combined/batch-command/v1' HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -114,16 +111,15 @@ def BatchGetCmdStatus(self, parameters): FULL_URL = self.base_url+'/real-time-response/combined/batch-get-command/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def BatchGetCmd(self, parameters, body): + def BatchGetCmd(self, body, parameters={}): """ Batch executes `get` command across hosts to retrieve files. After this call is made `/real-time-response/combined/get-command-status/v1` is used to query for the results. """ 
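Review bot: `def BatchActiveResponderCmd(self, body, parameters={}):` and `def BatchCmd(self, body, parameters={}):` swap the positional order from `(parameters, body)`, so an existing positional call still runs but now sends the old query dict as the JSON body and the command body as query parameters. For methods that run RTR commands across a batch of hosts, that mix-up should fail loudly rather than silently: make the optional argument keyword-only, `def BatchActiveResponderCmd(self, body, *, parameters=None):` with `PARAMS = parameters or {}`. BatchGetCmd, BatchInitSessions and BatchRefreshSessions in the following hunks get the same reordering. RTR_AggregateSessions at the top of the hunk starts outside it.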
@@ -132,44 +128,41 @@ def BatchGetCmd(self, parameters, body): HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def BatchInitSessions(self, parameters, body): + def BatchInitSessions(self, body, parameters={}): """ Batch initialize a RTR session on multiple hosts. Before any RTR commands can be used, an active session is needed on the host. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/BatchInitSessions FULL_URL = self.base_url+'/real-time-response/combined/batch-init-session/v1' HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def BatchRefreshSessions(self, parameters, body): + def BatchRefreshSessions(self, body, parameters={}): """ Batch refresh a RTR session on multiple hosts. RTR sessions will expire after 10 minutes unless refreshed. 
""" # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/BatchRefreshSessions FULL_URL = self.base_url+'/real-time-response/combined/batch-refresh-session/v1' HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -179,12 +172,11 @@ def RTR_CheckActiveResponderCommandStatus(self, parameters): FULL_URL = self.base_url+'/real-time-response/entities/active-responder-command/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -194,12 +186,11 @@ def RTR_ExecuteActiveResponderCommand(self, body): FULL_URL = self.base_url+'/real-time-response/entities/active-responder-command/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -209,12 +200,11 @@ def RTR_CheckCommandStatus(self, parameters): FULL_URL = self.base_url+'/real-time-response/entities/command/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -224,12 +214,11 @@ def RTR_ExecuteCommand(self, body): FULL_URL = self.base_url+'/real-time-response/entities/command/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned @@ -239,12 +228,14 @@ def RTR_GetExtractedFileContents(self, parameters): FULL_URL = self.base_url+'/real-time-response/entities/extracted-file-contents/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + if response.headers.get('content-type') == "application/json": + returned = self.Result()(response.status_code, response.headers, response.json()) + else: + returned = response.content except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
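Review bot: The new branch in RTR_GetExtractedFileContents tests the header with `== "application/json"`, so a JSON error reply labelled `application/json; charset=utf-8` falls into the `else` and is handed back as raw bytes. A caller that writes the result to disk would then store an API error document as if it were the extracted file, and the status code is lost on that path. I would match on the prefix, `if response.headers.get('content-type', '').lower().startswith('application/json'):`, and only return `response.content` when the status code indicates success. The method now returns either a result dict or bytes, which the docstring (above the hunk) should state.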
@@ -254,27 +245,26 @@ def RTR_ListFiles(self, parameters): FULL_URL = self.base_url+'/real-time-response/entities/file/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_DeleteFile(self, parameters): + def RTR_DeleteFile(self, ids, parameters): """ Delete a RTR session file. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/RTR_DeleteFile - FULL_URL = self.base_url+'/real-time-response/entities/file/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/real-time-response/entities/file/v1?ids={}'.format(ID_LIST) HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
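Review bot: `ID_LIST = str(ids).replace(",","&ids=")` in RTR_DeleteFile builds the query string by hand with no URL encoding, so an `ids` value containing `&`, `=` or `#` adds or truncates query parameters on a DELETE request, and a Python list passed as `ids` is rendered as `['a', 'b']`. Letting requests do the encoding avoids both: keep `FULL_URL` without the `?ids=` suffix and pass `PARAMS = {**parameters, 'ids': str(ids).split(',')}` (assuming `parameters` is a dict), which requests sends as repeated, escaped `ids=` keys. The same hand-built `?ids=` string is added in RTR_GetPut_Files, RTR_DeletePut_Files, RTR_GetScripts and RTR_DeleteScripts in the later hunks. Note also that inserting a required `ids` before `parameters` changes what an existing `RTR_DeleteFile(parameters)` call means.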
@@ -284,12 +274,11 @@ def RTR_PulseSession(self, body): FULL_URL = self.base_url+'/real-time-response/entities/refresh-session/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -299,27 +288,39 @@ def RTR_ListSessions(self, body): FULL_URL = self.base_url+'/real-time-response/entities/sessions/GET/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) + except Exception as e: + returned = self.Result()(500, {}, str(e)) + + return returned + + def RTR_ListQueuedSessions(self, body): + """ Get session metadata by session id. """ + # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/RTR_ListSessions + FULL_URL = self.base_url+'/real-time-response/entities/queued-sessions/GET/v1' + HEADERS = self.headers + DATA = body + try: + response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def InitSessionMixin0(self, body): + def RTR_InitSession(self, body): """ Initialize a new session with the RTR cloud. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/InitSessionMixin0 FULL_URL = self.base_url+'/real-time-response/entities/sessions/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
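Review bot: The added RTR_ListQueuedSessions carries the docstring and swagger link of RTR_ListSessions (`""" Get session metadata by session id. """`, `.../real-time-response/RTR_ListSessions`) although it posts to `/queued-sessions/GET/v1`. I would change the docstring to `""" Get queued session metadata by session ID. """` and point the link at the queued-sessions operation (presumably `#/real-time-response/RTR_ListQueuedSessions`). RTR_DeleteQueuedSession in the next hunk has the same copy-paste, with a link to RTR_DeleteSession. Separately, the rename of InitSessionMixin0 to RTR_InitSession removes the old name; a class-level `InitSessionMixin0 = RTR_InitSession` after the method would keep existing callers working.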
@@ -329,26 +330,38 @@ def RTR_DeleteSession(self, parameters): FULL_URL = self.base_url+'/real-time-response/entities/sessions/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) + except Exception as e: + returned = self.Result()(500, {}, str(e)) + + return returned + + def RTR_DeleteQueuedSession(self, parameters): + """ Delete a queued session. """ + # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/RTR_DeleteSession + FULL_URL = self.base_url+'/real-time-response/entities/queued-sessions/command/v1' + HEADERS = self.headers + PARAMS = parameters + try: + response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_ListAllSessions(self, parameters): + def RTR_ListAllSessions(self, parameters={}): """ Get a list of session_ids. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response/RTR_ListAllSessions FULL_URL = self.base_url+'/real-time-response/queries/sessions/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
From 903b7d801046c1c54feff50c199d6657262438b7 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 16:39:51 -0500 Subject: [PATCH 19/20] Param cleanup, IDs fix, DRYer, Basic unit tests --- src/falconpy/real_time_response_admin.py | 101 +++++++++++------------ 1 file changed, 46 insertions(+), 55 deletions(-) diff --git a/src/falconpy/real_time_response_admin.py b/src/falconpy/real_time_response_admin.py index 34d6d0f4..7e4b9e21 100644 --- a/src/falconpy/real_time_response_admin.py +++ b/src/falconpy/real_time_response_admin.py @@ -61,19 +61,18 @@ def __call__(self, status_code, headers, body): return self.result_obj - def BatchAdminCmd(self, parameters, body): + def BatchAdminCmd(self, body, parameters={}): """ Batch executes a RTR administrator command across the hosts mapped to the given batch ID. """ # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/BatchAdminCmd FULL_URL = self.base_url+'/real-time-response/combined/batch-admin-command/v1' HEADERS = self.headers DATA = body PARAMS = parameters - result = self.Result() try: response = requests.request("POST", FULL_URL, params=PARAMS, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -83,12 +82,11 @@ def RTR_CheckAdminCommandStatus(self, parameters): FULL_URL = self.base_url+'/real-time-response/entities/admin-command/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -98,27 +96,25 @@ def RTR_ExecuteAdminCommand(self, body): FULL_URL = self.base_url+'/real-time-response/entities/admin-command/v1' HEADERS = self.headers DATA = body - result = self.Result() try: response = requests.request("POST", FULL_URL, json=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_GetPut_Files(self, parameters): + def RTR_GetPut_Files(self, ids): """ Get put-files based on the ID's given. These are used for the RTR `put` command. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_GetPut_Files - FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -127,44 +123,42 @@ def RTR_CreatePut_Files(self, data, files): # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_CreatePut_Files FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1' HEADERS = self.headers + HEADERS['Content-Type'] = 'multipart/form-data' DATA = data FILES = files - result = self.Result() try: response = requests.request("POST", FULL_URL, data=DATA, files=FILES, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_DeletePut_Files(self, parameters): + def RTR_DeletePut_Files(self, ids): """ Delete a put-file based on the ID given. Can only delete one file at a time. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_DeletePut_Files - FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/real-time-response/entities/put-files/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_GetScripts(self, parameters): + def RTR_GetScripts(self, ids): """ Get custom-scripts based on the ID's given. These are used for the RTR `runscript` command. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_GetScripts - FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -173,29 +167,28 @@ def RTR_CreateScripts(self, data, files): # [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_CreateScripts FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1' HEADERS = self.headers + HEADERS['Content-Type'] = 'multipart/form-data' DATA = data FILES = files - result = self.Result() try: response = requests.request("POST", FULL_URL, data=DATA, files=FILES, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_DeleteScripts(self, parameters): + def RTR_DeleteScripts(self, ids): """ Delete a custom-script based on the ID given. Can only delete one script at a time. """ # [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_DeleteScripts - FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1' + ID_LIST = str(ids).replace(",","&ids=") + FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1?ids={}'.format(ID_LIST) HEADERS = self.headers - PARAMS = parameters - result = self.Result() try: - response = requests.request("DELETE", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + response = requests.request("DELETE", FULL_URL, headers=HEADERS, verify=False) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned 
@@ -204,43 +197,41 @@ def RTR_UpdateScripts(self, data, files): # [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_UpdateScripts FULL_URL = self.base_url+'/real-time-response/entities/scripts/v1' HEADERS = self.headers + HEADERS['Content-Type'] = 'multipart/form-data' DATA = data FILES = files - result = self.Result() try: response = requests.request("PATCH", FULL_URL, data=DATA, files=FILES, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_ListPut_Files(self, parameters): + def RTR_ListPut_Files(self, parameters={}): """ Get a list of put-file ID's that are available to the user for the `put` command. """ # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_ListPut_Files FULL_URL = self.base_url+'/real-time-response/queries/put-files/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned - def RTR_ListScripts(self, parameters): + def RTR_ListScripts(self, parameters={}): """ Get a list of custom-script ID's that are available to the user for the `runscript` command. 
""" # [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/real-time-response-admin/RTR_ListScripts FULL_URL = self.base_url+'/real-time-response/queries/scripts/v1' HEADERS = self.headers PARAMS = parameters - result = self.Result() try: response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False) - returned = result(response.status_code, response.headers, response.json()) + returned = self.Result()(response.status_code, response.headers, response.json()) except Exception as e: - returned = result(500, {}, str(e)) + returned = self.Result()(500, {}, str(e)) return returned From 11f6b8144f64289735f7a4cbb32ea142c0f73562 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Sat, 26 Dec 2020 16:41:36 -0500 Subject: [PATCH 20/20] Quick run thru the DRYer --- src/falconpy/oauth2.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/falconpy/oauth2.py b/src/falconpy/oauth2.py index fc500830..dced4ce3 100644 --- a/src/falconpy/oauth2.py +++ b/src/falconpy/oauth2.py @@ -73,12 +73,11 @@ def token(self): 'client_id': self.creds['client_id'], 'client_secret': self.creds['client_secret'] } - result = self.Result() try: response = requests.request("POST", FULL_URL, data=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code,response.json()) + returned = self.Result()(response.status_code,response.json()) except Exception as e: - returned = result(500, str(e)) + returned = self.Result()(500, str(e)) return returned 
@@ -87,11 +86,10 @@ def revoke(self, token): FULL_URL = self.base_url+'/oauth2/revoke' HEADERS = { 'Authorization': 'basic {}'.format(token) } DATA = { 'token': '{}'.format(token) } - result = self.Result() try: response = requests.request("POST", FULL_URL, data=DATA, headers=HEADERS, verify=False) - returned = result(response.status_code, response.json()) + returned = self.Result()(response.status_code, response.json()) except Exception as e: - returned = result(500, str(e)) + returned = self.Result()(500, str(e)) return returned