Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Param cleanup, IDs fix, DRYer, Unit testing
  • Loading branch information
jshcodes committed Dec 26, 2020
commit 5a22b449afbb77626d9004c1263ed42fc8fb40e3
108 changes: 51 additions & 57 deletions src/falconpy/intel.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,63 +61,60 @@ def __call__(self, status_code, headers, body):

return self.result_obj

def QueryIntelActorEntities(self, parameters):
def QueryIntelActorEntities(self, parameters={}):
""" Get info about actors that match provided FQL filters. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelActorEntities
FULL_URL = self.base_url+'/intel/combined/actors/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def QueryIntelIndicatorEntities(self, parameters):
def QueryIntelIndicatorEntities(self, parameters={}):
""" Get info about indicators that match provided FQL filters. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelIndicatorEntities
FULL_URL = self.base_url+'/intel/combined/indicators/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def QueryIntelReportEntities(self, parameters):
def QueryIntelReportEntities(self, parameters={}):
""" Get info about reports that match provided FQL filters. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelReportEntities
FULL_URL = self.base_url+'/intel/combined/reports/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def GetIntelActorEntities(self, parameters):
def GetIntelActorEntities(self, ids, parameters={}):
""" Retrieve specific actors using their actor IDs. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelActorEntities
FULL_URL = self.base_url+'/intel/entities/actors/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/intel/entities/actors/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -127,42 +124,43 @@ def GetIntelIndicatorEntities(self, body):
FULL_URL = self.base_url+'/intel/entities/indicators/GET/v1'
HEADERS = self.headers
BODY = body
result = self.Result()
try:
response = requests.request("POST", FULL_URL, json=BODY, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def GetIntelReportPDF(self, parameters):#Probably need to not do result.json() here. Check the swagger
def GetIntelReportPDF(self, parameters):
""" Return a Report PDF attachment. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelReportPDF
FULL_URL = self.base_url+'/intel/entities/report-files/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
if response.headers.get('content-type') == "application/json":
returned = self.Result()(response.status_code, response.headers, response.json())
else:
returned = response.content
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def GetIntelReportEntities(self, parameters):
def GetIntelReportEntities(self, ids, parameters={}):
""" Retrieve specific reports using their report IDs. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelReportEntities
FULL_URL = self.base_url+'/intel/entities/reports/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/intel/entities/reports/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -172,12 +170,14 @@ def GetIntelRuleFile(self, parameters):#There is an optional header you can see
FULL_URL = self.base_url+'/intel/entities/rules-files/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
if response.headers.get('content-type') == "application/json":
returned = self.Result()(response.status_code, response.headers, response.json())
else:
returned = response.content
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -187,72 +187,67 @@ def GetLatestIntelRuleFile(self, parameters):#There is an optional header you ca
FULL_URL = self.base_url+'/intel/entities/rules-latest-files/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def GetIntelRuleEntities(self, parameters):
def GetIntelRuleEntities(self, ids):
""" Retrieve details for rule sets for the specified ids. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/GetIntelRuleEntities
FULL_URL = self.base_url+'/intel/entities/rules/v1'
ID_LIST = str(ids).replace(",","&ids=")
FULL_URL = self.base_url+'/intel/entities/rules/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
response = requests.request("GET", FULL_URL, headers=HEADERS, verify=False)
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def QueryIntelActorIds(self, parameters):
def QueryIntelActorIds(self, parameters={}):
""" Get actor IDs that match provided FQL filters. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelActorIds
FULL_URL = self.base_url+'/intel/queries/actors/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def QueryIntelIndicatorIds(self, parameters):
def QueryIntelIndicatorIds(self, parameters={}):
""" Get indicators IDs that match provided FQL filters. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelIndicatorIds
FULL_URL = self.base_url+'/intel/queries/indicators/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

def QueryIntelReportIds(self, parameters):
def QueryIntelReportIds(self, parameters={}):
""" Get report IDs that match provided FQL filters. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/intel/QueryIntelReportIds
FULL_URL = self.base_url+'/intel/queries/reports/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned

Expand All @@ -262,11 +257,10 @@ def QueryIntelRuleIds(self, parameters):
FULL_URL = self.base_url+'/intel/queries/rules/v1'
HEADERS = self.headers
PARAMS = parameters
result = self.Result()
try:
response = requests.request("GET", FULL_URL, params=PARAMS, headers=HEADERS, verify=False)
returned = result(response.status_code, response.headers, response.json())
returned = self.Result()(response.status_code, response.headers, response.json())
except Exception as e:
returned = result(500, {}, str(e))
returned = self.Result()(500, {}, str(e))

return returned