diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a1c78cb --- /dev/null +++ b/.gitignore @@ -0,0 +1,61 @@ +# https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ +test_results/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..bb3ec5f --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include README.md diff --git a/README.md b/README.md index c374aa5..5ca601e 100644 --- a/README.md +++ b/README.md @@ -55,3 +55,310 @@ except Exception as e: print e ``` + +# infoblox.infoblox Module + + +## infoblox.infoblox.Infoblox Objects + + + +##### `__init__(self, iba_ipaddr, iba_user, iba_password, iba_wapi_version, iba_dns_view, iba_network_view, iba_verify_ssl=False)` + +> Class initialization method +> :param iba_ipaddr: IBA IP address of management interface +> :param iba_user: IBA user name +> :param iba_password: IBA user password +> :param iba_wapi_version: IBA WAPI version (example: 1.0) +> :param iba_dns_view: IBA default view +> :param iba_network_view: IBA default network view +> :param iba_verify_ssl: IBA SSL certificate validation (example: False) + + + +##### `add_host_alias(self, host_fqdn, alias_fqdn)` + +> Implements IBA REST API call to add an alias to IBA host record +> :param host_fqdn: host record name in FQDN +> :param alias_fqdn: host record name in FQDN + + + +##### `create_cname_record(self, canonical, name)` + +> Implements IBA REST API call to create IBA cname record +> :param canonical: canonical name in FQDN format +> :param name: the name for a CNAME record in FQDN format + + + +##### `create_dhcp_range(self, start_ip_v4, end_ip_v4)` + +> Implements IBA REST API call to add DHCP range for given +> start and end addresses +> :param start_ip_v4: IP v4 address +> :param end_ip_v4: IP v4 address + + + +##### `create_host_record(self, address, fqdn)` + +> Implements IBA REST API call to create IBA host record +> Returns IP v4 address assigned to the host +> :param address: IP v4 address or NET v4 address in CIDR format to get +> next_available_ip from +> :param fqdn: hostname in FQDN + + + +##### `create_network(self, network)` + +> Implements IBA REST API call to create DHCP network object +> :param network: network in CIDR format + + + +##### `create_networkcontainer(self, networkcontainer)` + +> Implements IBA REST API call to create DHCP network containert object +> :param networkcontainer: network container in CIDR format + + + +##### `create_txt_record(self, text, fqdn)` + +> Implements IBA REST API call to create IBA txt record +> Returns IP v4 address assigned to the host +> :param text: free text to be added to the record +> :param fqdn: hostname in FQDN + + + +##### `delete_cname_record(self, fqdn)` + +> Implements IBA REST API call to delete IBA cname record +> :param fqdn: cname in FQDN + + + +##### `delete_dhcp_range(self, start_ip_v4, end_ip_v4)` + +> Implements IBA REST API call to delete DHCP range for given +> start and end addresses +> :param start_ip_v4: IP v4 address +> :param end_ip_v4: IP v4 address + + + +##### `delete_host_alias(self, host_fqdn, alias_fqdn)` + +> Implements IBA REST API call to add an alias to IBA host record +> :param host_fqdn: host record name in FQDN +> :param alias_fqdn: host record name in FQDN + + + +##### `delete_host_record(self, fqdn)` + +> Implements IBA REST API call to delete IBA host record +> :param fqdn: hostname in FQDN + + + +##### `delete_network(self, network)` + +> Implements IBA REST API call to delete DHCP network object +> :param network: network in CIDR format + + + +##### `delete_network_extattrs(self, network, attributes)` + +> Implements IBA REST API call to delete network extensible attributes +> :param network: network in CIDR format +> :param attributes: array of extensible attribute names + + + +##### `delete_networkcontainer(self, networkcontainer)` + +> Implements IBA REST API call to delete DHCP network container object +> :param networkcontainer: network container in CIDR format + + + +##### `delete_txt_record(self, fqdn)` + +> Implements IBA REST API call to delete IBA TXT record +> :param fqdn: hostname in FQDN + + + +##### `get_host(self, fqdn, fields=None)` + +> Implements IBA REST API call to retrieve host record fields +> Returns hash table of fields with field name as a hash key +> :param fqdn: hostname in FQDN +> :param fields: comma-separated list of field names (optional) + + + +##### `get_host_by_extattrs(self, attributes)` + +> Implements IBA REST API call to find host by it's extensible attributes +> Returns array of hosts in FQDN +> :param attributes: comma-separated list of attrubutes name/value +> pairs in the format: +> attr_name=attr_value - exact match for attribute value +> attr_name:=attr_value - case insensitive match for attribute value +> attr_name~=regular_expression - match attribute value by regular +> expression +> attr_name>=attr_value - search by number greater than value +> attr_name<=attr_value - search by number less than value +> attr_name!=attr_value - search by number not equal of value + + + +##### `get_host_by_ip(self, ip_v4)` + +> Implements IBA REST API call to find hostname by IP address +> Returns array of host names in FQDN associated with given IP address +> :param ip_v4: IP v4 address + + + +##### `get_host_by_regexp(self, fqdn)` + +> Implements IBA REST API call to retrieve host records by fqdn regexp filter +> Returns array of host names in FQDN matched to given regexp filter +> :param fqdn: hostname in FQDN or FQDN regexp filter + + + +##### `get_host_extattrs(self, fqdn, attributes=None)` + +> Implements IBA REST API call to retrieve host extensible attributes +> Returns hash table of attributes with attribute name as a hash key +> :param fqdn: hostname in FQDN +> :param attributes: array of extensible attribute names (optional) + + + +##### `get_ip_by_host(self, fqdn)` + +> Implements IBA REST API call to find IP addresses by hostname +> Returns array of IP v4 addresses associated with given hostname +> :param fqdn: hostname in FQDN + + + +##### `get_network(self, network, fields=None)` + +> Implements IBA REST API call to retrieve network object fields +> Returns hash table of fields with field name as a hash key +> :param network: network in CIDR format +> :param fields: comma-separated list of field names +> (optional, returns network in CIDR format and netmask if +> not specified) + + + +##### `get_network_by_extattrs(self, attributes)` + +> Implements IBA REST API call to find a network by it's +> extensible attributes +> Returns array of networks in CIDR format +> :param attributes: comma-separated list of attrubutes name/value +> pairs in the format: +> attr_name=attr_value - exact match for attribute value +> attr_name:=attr_value - case insensitive match for attribute value +> attr_name~=regular_expression - match attribute value by regular +> expression +> attr_name>=attr_value - search by number greater than value +> attr_name<=attr_value - search by number less than value +> attr_name!=attr_value - search by number not equal of value + + + +##### `get_network_by_ip(self, ip_v4)` + +> Implements IBA REST API call to find network by IP address which +> belongs to this network +> Returns network in CIDR format +> :param ip_v4: IP v4 address + + + +##### `get_network_extattrs(self, network, attributes=None)` + +> Implements IBA REST API call to retrieve network extensible attributes +> Returns hash table of attributes with attribute name as a hash key +> :param network: network in CIDR format +> :param attributes: array of extensible attribute names (optional) + + + +##### `get_next_available_ip(self, network)` + +> Implements IBA next_available_ip REST API call +> Returns IP v4 address +> :param network: network in CIDR format + + + +##### `get_next_available_network(self, networkcontainer, cidr)` + +> Implements IBA REST API call to retrieve next available network +> of network container +> Returns network address in CIDR format +> :param networkcontainer: network container address in CIDR format +> :param cidr: requested network length (from 0 to 32) + + + +##### `get_txt_by_regexp(self, fqdn)` + +> Implements IBA REST API call to retrieve TXT records by fqdn +> regexp filter +> Returns dictonary of host names in FQDN matched to given regexp +> filter with the TXT value +> :param fqdn: hostname in FQDN or FQDN regexp filter + + + +##### `update_cname_record(self, canonical, name)` + +> Implements IBA REST API call to update or repoint IBA cname record +> :param canonical: canonical name in FQDN format +> :param name: the name for the new CNAME record in FQDN format + + + +##### `update_network_extattrs(self, network, attributes)` + +> Implements IBA REST API call to add or update network extensible attributes +> :param network: network in CIDR format +> :param attributes: hash table of extensible attributes with attribute +> name as a hash key + + + +## infoblox.infoblox.InfobloxBadInputParameter Objects + + + +## infoblox.infoblox.InfobloxGeneralException Objects + + + +## infoblox.infoblox.InfobloxNoIPavailableException Objects + + + +## infoblox.infoblox.InfobloxNoNetworkAvailableException Objects + + + +## infoblox.infoblox.InfobloxNotFoundException Objects + + diff --git a/infoblox.py b/infoblox.py deleted file mode 100644 index 051fd6c..0000000 --- a/infoblox.py +++ /dev/null @@ -1,1043 +0,0 @@ -# -# Copyright 2014 "Igor Feoktistov" -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import re -import requests -import json - -class InfobloxNotFoundException(Exception): - pass - -class InfobloxNoIPavailableException(Exception): - pass - -class InfobloxNoNetworkAvailableException(Exception): - pass - -class InfobloxGeneralException(Exception): - pass - -class InfobloxBadInputParameter(Exception): - pass - -class Infoblox(object): - """ Implements the following subset of Infoblox IPAM API via REST API - create_network - delete_network - create_networkcontainer - delete_networkcontainer - get_next_available_network - create_host_record - create_txt_record - delete_txt_record - delete_host_record - add_host_alias - delete_host_alias - create_cname_record - delete_cname_record - update_cname_record - create_dhcp_range - delete_dhcp_range - get_next_available_ip - get_host - get_host_by_ip - get_ip_by_host - get_host_by_regexp - get_txt_by_regexp - get_host_by_extattrs - get_host_extattrs - get_network - get_network_by_ip - get_network_by_extattrs - get_network_extattrs - update_network_extattrs - delete_network_extattrs - """ - - def __init__(self, iba_ipaddr, iba_user, iba_password, iba_wapi_version, iba_dns_view, iba_network_view, iba_verify_ssl=False): - """ Class initialization method - :param iba_ipaddr: IBA IP address of management interface - :param iba_user: IBA user name - :param iba_password: IBA user password - :param iba_wapi_version: IBA WAPI version (example: 1.0) - :param iba_dns_view: IBA default view - :param iba_network_view: IBA default network view - :param iba_verify_ssl: IBA SSL certificate validation (example: False) - """ - self.iba_host = iba_ipaddr - self.iba_user = iba_user - self.iba_password = iba_password - self.iba_wapi_version = iba_wapi_version - self.iba_dns_view = iba_dns_view - self.iba_network_view = iba_network_view - self.iba_verify_ssl = iba_verify_ssl - - def get_next_available_ip(self, network): - """ Implements IBA next_available_ip REST API call - Returns IP v4 address - :param network: network in CIDR format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?network=' + network + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - net_ref = r_json[0]['_ref'] - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + net_ref + '?_function=next_available_ip&num=1' - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - ip_v4 = r_json['ips'][0] - return ip_v4 - else: - if 'text' in r_json: - if 'code' in r_json and r_json['code'] == 'Client.Ibap.Data': - raise InfobloxNoIPavailableException(r_json['text']) - else: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxNotFoundException("No requested network found: " + network) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def create_host_record(self, address, fqdn): - """ Implements IBA REST API call to create IBA host record - Returns IP v4 address assigned to the host - :param address: IP v4 address or NET v4 address in CIDR format to get next_available_ip from - :param fqdn: hostname in FQDN - """ - if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+\/[0-9]+$", address): - ipv4addr = 'func:nextavailableip:' + address - else: - if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$", address): - ipv4addr = address - else: - raise InfobloxBadInputParameter('Expected IP or NET address in CIDR format') - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host' + '?_return_fields=ipv4addrs' - payload = '{"ipv4addrs": [{"configure_for_dhcp": false,"ipv4addr": "' + ipv4addr + '"}],"name": "' + fqdn + '","view": "' + self.iba_dns_view + '"}' - try: - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - if r.status_code == 200 or r.status_code == 201: - return r_json['ipv4addrs'][0]['ipv4addr'] - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def create_txt_record(self, text, fqdn): - """ Implements IBA REST API call to create IBA txt record - Returns IP v4 address assigned to the host - :param text: free text to be added to the record - :param fqdn: hostname in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:txt' - payload = '{"text": "' + text + '","name": "' + fqdn + '","view": "' + self.iba_dns_view + '"}' - try: - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - if r.status_code == 200 or r.status_code == 201: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_host_record(self, fqdn): - """ Implements IBA REST API call to delete IBA host record - :param fqdn: hostname in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + fqdn + '&view=' + self.iba_dns_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - host_ref = r_json[0]['_ref'] - if host_ref and re.match("record:host\/[^:]+:([^\/]+)\/", host_ref).group(1) == fqdn: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + host_ref - r = requests.delete(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) - else: - raise InfobloxNotFoundException("No requested host found: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_txt_record(self, fqdn): - """ Implements IBA REST API call to delete IBA TXT record - :param fqdn: hostname in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:txt?name=' + fqdn + '&view=' + self.iba_dns_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - host_ref = r_json[0]['_ref'] - if host_ref and re.match("record:txt\/[^:]+:([^\/]+)\/", host_ref).group(1) == fqdn: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + host_ref - r = requests.delete(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) - else: - raise InfobloxNotFoundException("No requested host found: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def add_host_alias(self, host_fqdn, alias_fqdn): - """ Implements IBA REST API call to add an alias to IBA host record - :param host_fqdn: host record name in FQDN - :param alias_fqdn: host record name in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + host_fqdn + '&view=' + self.iba_dns_view + '&_return_fields=name,aliases' - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - host_ref = r_json[0]['_ref'] - if host_ref and re.match("record:host\/[^:]+:([^\/]+)\/", host_ref).group(1) == host_fqdn: - if 'aliases' in r_json[0]: - aliases = r_json[0]['aliases'] - aliases.append(alias_fqdn) - payload = '{"aliases": ' + json.JSONEncoder().encode(aliases) + '}' - else: - payload = '{"aliases": ["' + alias_fqdn + '"]}' - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + host_ref - r = requests.put(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) - else: - raise InfobloxNotFoundException("No requested host found: " + host_fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_host_alias(self, host_fqdn, alias_fqdn): - """ Implements IBA REST API call to add an alias to IBA host record - :param host_fqdn: host record name in FQDN - :param alias_fqdn: host record name in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + host_fqdn + '&view=' + self.iba_dns_view + '&_return_fields=name,aliases' - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - host_ref = r_json[0]['_ref'] - if host_ref and re.match("record:host\/[^:]+:([^\/]+)\/", host_ref).group(1) == host_fqdn: - if 'aliases' in r_json[0]: - aliases = r_json[0]['aliases'] - aliases.remove(alias_fqdn) - payload = '{"aliases": ' + json.JSONEncoder().encode(aliases) + '}' - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + host_ref - r = requests.put(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxNotFoundException("No requested host alias found: " + alias_fqdn) - else: - raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) - else: - raise InfobloxNotFoundException("No requested host found: " + host_fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def create_cname_record(self, canonical, name): - """ Implements IBA REST API call to create IBA cname record - :param canonical: canonical name in FQDN format - :param name: the name for a CNAME record in FQDN format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:cname' - payload = '{"canonical": "' + canonical + '","name": "' + name + '","view": "' + self.iba_dns_view + '"}' - try: - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - if r.status_code == 200 or r.status_code == 201: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_cname_record(self, fqdn): - """ Implements IBA REST API call to delete IBA cname record - :param fqdn: cname in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:cname?name=' + fqdn + '&view=' + self.iba_dns_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - cname_ref = r_json[0]['_ref'] - if cname_ref and re.match("record:cname\/[^:]+:([^\/]+)\/", cname_ref).group(1) == fqdn: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + cname_ref - r = requests.delete(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("Received unexpected cname record reference: " + cname_ref) - else: - raise InfobloxNotFoundException("No requested cname record found: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def update_cname_record(self, canonical, name): - """ Implements IBA REST API call to update or repoint IBA cname record - :param canonical: canonical name in FQDN format - :param name: the name for the new CNAME record in FQDN format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:cname' - payload = json.dumps({'name': name}) - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - # RFC1912 - A CNAME can not coexist with any other data, we should expect utmost one entry - if r.status_code == 200 and len(r_json) == 1: - ibx_cname = r.json()[0] - cname_ref = ibx_cname['_ref'] - payload = '{"canonical": ' + json.JSONEncoder().encode(canonical) + '}' - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + cname_ref - r = requests.put(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - if r.status_code == 200 or r.status_code == 201: - return - else: - r.raise_for_status() - elif len(r_json) == 0: - raise InfobloxNotFoundException("CNAME: " + name + " not found.") - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def create_dhcp_range(self, start_ip_v4, end_ip_v4): - """ Implements IBA REST API call to add DHCP range for given start and end addresses - :param start_ip_v4: IP v4 address - :param end_ip_v4: IP v4 address - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/range' - payload = '{"start_addr": "' + start_ip_v4 + '","end_addr": "' + end_ip_v4 + '"}' - try: - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - if r.status_code == 200 or r.status_code == 201: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_dhcp_range(self, start_ip_v4, end_ip_v4): - """ Implements IBA REST API call to delete DHCP range for given start and end addresses - :param start_ip_v4: IP v4 address - :param end_ip_v4: IP v4 address - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/range?start_addr=' + start_ip_v4 + '?end_addr=' + end_ip_v4 + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - range_ref = r_json[0]['_ref'] - if range_ref: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + range_ref - r = requests.delete(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("No range reference received in IBA reply") - else: - raise InfobloxNotFoundException("No requested range found: " + start_ip_v4 + "-" + end_ip_v4) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_host(self, fqdn, fields=None): - """ Implements IBA REST API call to retrieve host record fields - Returns hash table of fields with field name as a hash key - :param fqdn: hostname in FQDN - :param fields: comma-separated list of field names (optional) - """ - if fields: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + fqdn + '&view=' + self.iba_dns_view + '&_return_fields=' + fields - else: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + fqdn + '&view=' + self.iba_dns_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - return r_json[0] - else: - raise InfobloxNotFoundException("No hosts found: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxNotFoundException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_host_by_regexp(self, fqdn): - """ Implements IBA REST API call to retrieve host records by fqdn regexp filter - Returns array of host names in FQDN matched to given regexp filter - :param fqdn: hostname in FQDN or FQDN regexp filter - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name~=' + fqdn + '&view=' + self.iba_dns_view - hosts = [] - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - for host in r_json: - hosts.append(host['name']) - return hosts - else: - raise InfobloxNotFoundException("No hosts found for regexp filter: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_txt_by_regexp(self, fqdn): - """ Implements IBA REST API call to retrieve TXT records by fqdn regexp filter - Returns dictonary of host names in FQDN matched to given regexp filter with the TXT value - :param fqdn: hostname in FQDN or FQDN regexp filter - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:txt?name~=' + fqdn + '&view=' + self.iba_dns_view - hosts = {} - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - for host in r_json: - hosts[host['name']] = host['text'] - return hosts - else: - raise InfobloxNotFoundException("No txt records found for regexp filter: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_host_by_ip(self, ip_v4): - """ Implements IBA REST API call to find hostname by IP address - Returns array of host names in FQDN associated with given IP address - :param ip_v4: IP v4 address - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/ipv4address?ip_address=' + ip_v4 + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - if len(r_json[0]['names']) > 0: - return r_json[0]['names'] - else: - raise InfobloxNotFoundException("No host records found for IP: " + ip_v4) - else: - raise InfobloxNotFoundException("No IP found: " + ip_v4) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_ip_by_host(self, fqdn): - """ Implements IBA REST API call to find IP addresses by hostname - Returns array of IP v4 addresses associated with given hostname - :param fqdn: hostname in FQDN - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + fqdn + '&view=' + self.iba_dns_view - ipv4addrs = [] - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - if len(r_json[0]['ipv4addrs']) > 0: - for ipv4addr in r_json[0]['ipv4addrs']: - ipv4addrs.append(ipv4addr['ipv4addr']) - return ipv4addrs - else: - raise InfobloxNotFoundException("No host records found for FQDN: " + fqdn) - else: - raise InfobloxNotFoundException("No hosts found: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_host_extattrs(self, fqdn, attributes=None): - """ Implements IBA REST API call to retrieve host extensible attributes - Returns hash table of attributes with attribute name as a hash key - :param fqdn: hostname in FQDN - :param attributes: array of extensible attribute names (optional) - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?name=' + fqdn + '&view=' + self.iba_dns_view + '&_return_fields=name,extattrs' - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - extattrs = {} - if attributes: - for attribute in attributes: - if attribute in r_json[0]['extattrs']: - extattrs[attribute] = r_json[0]['extattrs'][attribute]['value'] - else: - raise InfobloxNotFoundException("No requested attribute found: " + attribute) - else: - for attribute in r_json[0]['extattrs'].keys(): - extattrs[attribute] = r_json[0]['extattrs'][attribute]['value'] - return extattrs - else: - raise InfobloxNotFoundException("No requested host found: " + fqdn) - else: - if 'text' in r_json: - raise InfobloxNotFoundException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_network(self, network, fields=None): - """ Implements IBA REST API call to retrieve network object fields - Returns hash table of fields with field name as a hash key - :param network: network in CIDR format - :param fields: comma-separated list of field names - (optional, returns network in CIDR format and netmask if not specified) - """ - if not fields: - fields = 'network,netmask' - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?network=' + network + '&network_view=' + self.iba_network_view + '&_return_fields=' + fields - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - return r_json[0] - else: - raise InfobloxNotFoundException("No requested network found: " + network) - else: - if 'text' in r_json: - raise InfobloxNotFoundException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_network_by_ip(self, ip_v4): - """ Implements IBA REST API call to find network by IP address which belongs to this network - Returns network in CIDR format - :param ip_v4: IP v4 address - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/ipv4address?ip_address=' + ip_v4 + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - if 'network' in r_json[0]: - return r_json[0]['network'] - else: - raise InfobloxNotFoundException("No network found for IP: " + ip_v4) - else: - raise InfobloxNotFoundException("No IP found: " + ip_v4) - else: - if 'text' in r_json: - raise InfobloxNotFoundException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_network_by_extattrs(self, attributes): - """ Implements IBA REST API call to find a network by it's extensible attributes - Returns array of networks in CIDR format - :param attributes: comma-separated list of attrubutes name/value pairs in the format: - attr_name=attr_value - exact match for attribute value - attr_name:=attr_value - case insensitive match for attribute value - attr_name~=regular_expression - match attribute value by regular expression - attr_name>=attr_value - search by number greater than value - attr_name<=attr_value - search by number less than value - attr_name!=attr_value - search by number not equal of value - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?*' + "&*".join(attributes.split(",")) + '&network_view=' + self.iba_network_view - networks = [] - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - for network in r_json: - if 'network' in network: - networks.append(network['network']) - return networks - else: - raise InfobloxNotFoundException("No networks found for extensible attributes: " + attributes) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_host_by_extattrs(self, attributes): - """ Implements IBA REST API call to find host by it's extensible attributes - Returns array of hosts in FQDN - :param attributes: comma-separated list of attrubutes name/value pairs in the format: - attr_name=attr_value - exact match for attribute value - attr_name:=attr_value - case insensitive match for attribute value - attr_name~=regular_expression - match attribute value by regular expression - attr_name>=attr_value - search by number greater than value - attr_name<=attr_value - search by number less than value - attr_name!=attr_value - search by number not equal of value - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/record:host?*' + "&*".join(attributes.split(",")) + '&view=' + self.iba_dns_view - hosts = [] - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - for host in r_json: - if 'name' in host: - hosts.append(host['name']) - return hosts - else: - raise InfobloxNotFoundException("No hosts found for extensible attributes: " + attributes) - else: - if 'text' in r_json: - raise InfobloxNotFoundException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_network_extattrs(self, network, attributes=None): - """ Implements IBA REST API call to retrieve network extensible attributes - Returns hash table of attributes with attribute name as a hash key - :param network: network in CIDR format - :param attributes: array of extensible attribute names (optional) - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?network=' + network + '&network_view=' + self.iba_network_view + '&_return_fields=network,extattrs' - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - extattrs = {} - if attributes: - for attribute in attributes: - if attribute in r_json[0]['extattrs']: - extattrs[attribute] = r_json[0]['extattrs'][attribute]['value'] - else: - raise InfobloxNotFoundException("No requested attribute found: " + attribute) - else: - for attribute in r_json[0]['extattrs'].keys(): - extattrs[attribute] = r_json[0]['extattrs'][attribute]['value'] - return extattrs - else: - raise InfobloxNotFoundException("No requested network found: " + network) - else: - if 'text' in r_json: - raise InfobloxNotFoundException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def update_network_extattrs(self, network, attributes): - """ Implements IBA REST API call to add or update network extensible attributes - :param network: network in CIDR format - :param attributes: hash table of extensible attributes with attribute name as a hash key - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?network=' + network + '&network_view=' + self.iba_network_view + '&_return_fields=network,extattrs' - extattrs = {} - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - network_ref = r_json[0]['_ref'] - if network_ref: - extattrs = r_json[0]['extattrs'] - for attr_name, attr_value in attributes.iteritems(): - extattrs[attr_name]['value'] = attr_value - payload = '{"extattrs": ' + json.JSONEncoder().encode(extattrs) + '}' - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + network_ref - r = requests.put(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("No network reference received in IBA reply for network: " + network) - else: - raise InfobloxNotFoundException("No requested network found: " + network) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_network_extattrs(self, network, attributes): - """ Implements IBA REST API call to delete network extensible attributes - :param network: network in CIDR format - :param attributes: array of extensible attribute names - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?network=' + network + '&network_view=' + self.iba_network_view + '&_return_fields=network,extattrs' - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - network_ref = r_json[0]['_ref'] - if network_ref: - extattrs = r_json[0]['extattrs'] - for attribute in attributes: - if attribute in extattrs: - del extattrs[attribute] - payload = '{"extattrs": ' + json.JSONEncoder().encode(extattrs) + '}' - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + network_ref - r = requests.put(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("No network reference received in IBA reply for network: " + network) - else: - raise InfobloxNotFoundException("No requested network found: " + network) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def create_network(self, network): - """ Implements IBA REST API call to create DHCP network object - :param network: network in CIDR format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network' - payload = '{"network": "' + network + '","network_view": "' + self.iba_network_view + '"}' - try: - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - if r.status_code == 200 or r.status_code == 201: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_network(self, network): - """ Implements IBA REST API call to delete DHCP network object - :param network: network in CIDR format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/network?network=' + network + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - network_ref = r_json[0]['_ref'] - if network_ref: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + network_ref - r = requests.delete(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("No network reference received in IBA reply for network: " + network) - else: - raise InfobloxNotFoundException("No network found: " + network) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def create_networkcontainer(self, networkcontainer): - """ Implements IBA REST API call to create DHCP network containert object - :param networkcontainer: network container in CIDR format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/networkcontainer' - payload = '{"network": "' + networkcontainer + '","network_view": "' + self.iba_network_view + '"}' - try: - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl, data=payload) - r_json = r.json() - if r.status_code == 200 or r.status_code == 201: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def delete_networkcontainer(self, networkcontainer): - """ Implements IBA REST API call to delete DHCP network container object - :param networkcontainer: network container in CIDR format - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/networkcontainer?network=' + networkcontainer + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - network_ref = r_json[0]['_ref'] - if network_ref: - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + network_ref - r = requests.delete(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - if r.status_code == 200: - return - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxGeneralException("No network container reference received in IBA reply for network container: " + networkcontainer) - else: - raise InfobloxNotFoundException("No network container found: " + networkcontainer) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise - - def get_next_available_network(self, networkcontainer, cidr): - """ Implements IBA REST API call to retrieve next available network of network container - Returns network address in CIDR format - :param networkcontainer: network container address in CIDR format - :param cidr: requested network length (from 0 to 32) - """ - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/networkcontainer?network=' + networkcontainer + '&network_view=' + self.iba_network_view - try: - r = requests.get(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - if len(r_json) > 0: - net_ref = r_json[0]['_ref'] - rest_url = 'https://' + self.iba_host + '/wapi/v' + self.iba_wapi_version + '/' + net_ref + '?_function=next_available_network&cidr=' + str(cidr) + '&num=1' - r = requests.post(url=rest_url, auth=(self.iba_user, self.iba_password), verify=self.iba_verify_ssl) - r_json = r.json() - if r.status_code == 200: - network = r_json['networks'][0] - return network - else: - if 'text' in r_json: - if 'code' in r_json and r_json['code'] == 'Client.Ibap.Data': - raise InfobloxNoNetworkAvailableException(r_json['text']) - else: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - else: - raise InfobloxNotFoundException("No requested network container found: " + networkcontainer) - else: - if 'text' in r_json: - raise InfobloxGeneralException(r_json['text']) - else: - r.raise_for_status() - except ValueError: - raise Exception(r) - except Exception: - raise diff --git a/infoblox/__init__.py b/infoblox/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/infoblox/infoblox.py b/infoblox/infoblox.py new file mode 100644 index 0000000..ec66900 --- /dev/null +++ b/infoblox/infoblox.py @@ -0,0 +1,1289 @@ +# +# Copyright 2014 "Igor Feoktistov" +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +import requests +import json + + +class InfobloxNotFoundException(Exception): + pass + + +class InfobloxNoIPavailableException(Exception): + pass + + +class InfobloxNoNetworkAvailableException(Exception): + pass + + +class InfobloxGeneralException(Exception): + pass + + +class InfobloxBadInputParameter(Exception): + pass + + +class Infoblox(object): + """ Implements the following subset of Infoblox IPAM API via REST API + create_network + delete_network + create_networkcontainer + delete_networkcontainer + get_next_available_network + create_host_record + create_txt_record + delete_txt_record + delete_host_record + add_host_alias + delete_host_alias + create_cname_record + delete_cname_record + update_cname_record + create_dhcp_range + delete_dhcp_range + get_next_available_ip + get_host + get_host_by_ip + get_ip_by_host + get_host_by_regexp + get_txt_by_regexp + get_host_by_extattrs + get_host_extattrs + get_network + get_network_by_ip + get_network_by_extattrs + get_network_extattrs + update_network_extattrs + delete_network_extattrs + """ + + def __init__(self, + iba_ipaddr, + iba_user, + iba_password, + iba_wapi_version, + iba_dns_view, + iba_network_view, + iba_verify_ssl=False): + """ Class initialization method + :param iba_ipaddr: IBA IP address of management interface + :param iba_user: IBA user name + :param iba_password: IBA user password + :param iba_wapi_version: IBA WAPI version (example: 1.0) + :param iba_dns_view: IBA default view + :param iba_network_view: IBA default network view + :param iba_verify_ssl: IBA SSL certificate validation (example: False) + """ + self.iba_host = iba_ipaddr + self.iba_user = iba_user + self.iba_password = iba_password + self.iba_wapi_version = iba_wapi_version + self.iba_dns_view = iba_dns_view + self.iba_network_view = iba_network_view + self.iba_verify_ssl = iba_verify_ssl + + def get_next_available_ip(self, network): + """ Implements IBA next_available_ip REST API call + Returns IP v4 address + :param network: network in CIDR format + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?network=' \ + + network + '&network_view=' + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + net_ref = r_json[0]['_ref'] + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + net_ref + \ + '?_function=next_available_ip&num=1' + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + ip_v4 = r_json['ips'][0] + return ip_v4 + else: + if 'text' in r_json: + if ('code' in r_json + and r_json['code'] == 'Client.Ibap.Data'): + raise InfobloxNoIPavailableException(r_json['text']) + else: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxNotFoundException("No requested network found: " + network) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def create_host_record(self, address, fqdn): + """ Implements IBA REST API call to create IBA host record + Returns IP v4 address assigned to the host + :param address: IP v4 address or NET v4 address in CIDR format to get + next_available_ip from + :param fqdn: hostname in FQDN + """ + if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+\/[0-9]+$", address): + ipv4addr = 'func:nextavailableip:' + address + else: + if re.match("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$", address): + ipv4addr = address + else: + raise InfobloxBadInputParameter('Expected IP or NET address in CIDR format') + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host' + \ + '?_return_fields=ipv4addrs' + payload = '{"ipv4addrs": [{"configure_for_dhcp": false,"ipv4addr": "' \ + + ipv4addr + '"}],"name": "' + fqdn + \ + '","view": "' + self.iba_dns_view + '"}' + try: + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + if r.status_code == 200 or r.status_code == 201: + return r_json['ipv4addrs'][0]['ipv4addr'] + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def create_txt_record(self, text, fqdn): + """ Implements IBA REST API call to create IBA txt record + Returns IP v4 address assigned to the host + :param text: free text to be added to the record + :param fqdn: hostname in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:txt' + payload = '{"text": "' + text + '","name": "' + fqdn + \ + '","view": "' + self.iba_dns_view + '"}' + try: + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + if r.status_code == 200 or r.status_code == 201: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_host_record(self, fqdn): + """ Implements IBA REST API call to delete IBA host record + :param fqdn: hostname in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + fqdn + '&view=' \ + + self.iba_dns_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + host_ref = r_json[0]['_ref'] + if (host_ref and + re.match("record:host\/[^:]+:([^\/]+)\/", + host_ref).group(1) == fqdn): + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + host_ref + r = requests.delete(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) + else: + raise InfobloxNotFoundException("No requested host found: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_txt_record(self, fqdn): + """ Implements IBA REST API call to delete IBA TXT record + :param fqdn: hostname in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:txt?name=' + fqdn + \ + '&view=' + self.iba_dns_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + host_ref = r_json[0]['_ref'] + if (host_ref and + re.match("record:txt\/[^:]+:([^\/]+)\/", + host_ref).group(1) == fqdn): + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + host_ref + r = requests.delete(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) + else: + raise InfobloxNotFoundException("No requested host found: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def add_host_alias(self, host_fqdn, alias_fqdn): + """ Implements IBA REST API call to add an alias to IBA host record + :param host_fqdn: host record name in FQDN + :param alias_fqdn: host record name in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + host_fqdn + \ + '&view=' + self.iba_dns_view + '&_return_fields=name,aliases' + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + host_ref = r_json[0]['_ref'] + if (host_ref and + re.match("record:host\/[^:]+:([^\/]+)\/", + host_ref).group(1) == host_fqdn): + if 'aliases' in r_json[0]: + aliases = r_json[0]['aliases'] + aliases.append(alias_fqdn) + payload = '{"aliases": ' + \ + json.JSONEncoder().encode(aliases) + '}' + else: + payload = '{"aliases": ["' + alias_fqdn + '"]}' + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + host_ref + r = requests.put(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) + else: + raise InfobloxNotFoundException("No requested host found: " + host_fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_host_alias(self, host_fqdn, alias_fqdn): + """ Implements IBA REST API call to add an alias to IBA host record + :param host_fqdn: host record name in FQDN + :param alias_fqdn: host record name in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + host_fqdn + \ + '&view=' + self.iba_dns_view + '&_return_fields=name,aliases' + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + host_ref = r_json[0]['_ref'] + if (host_ref + and re.match("record:host\/[^:]+:([^\/]+)\/", + host_ref).group(1) == host_fqdn): + if 'aliases' in r_json[0]: + aliases = r_json[0]['aliases'] + aliases.remove(alias_fqdn) + payload = '{"aliases": ' \ + + json.JSONEncoder().encode(aliases) + '}' + rest_url = 'https://' + self.iba_host + \ + '/wapi/v' + self.iba_wapi_version + \ + '/' + host_ref + r = requests.put(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxNotFoundException("No requested host alias found: " + alias_fqdn) + else: + raise InfobloxGeneralException("Received unexpected host reference: " + host_ref) + else: + raise InfobloxNotFoundException("No requested host found: " + host_fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def create_cname_record(self, canonical, name): + """ Implements IBA REST API call to create IBA cname record + :param canonical: canonical name in FQDN format + :param name: the name for a CNAME record in FQDN format + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:cname' + payload = '{"canonical": "' + canonical + '","name": "' + name + \ + '","view": "' + self.iba_dns_view + '"}' + try: + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + if r.status_code == 200 or r.status_code == 201: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_cname_record(self, fqdn): + """ Implements IBA REST API call to delete IBA cname record + :param fqdn: cname in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:cname?name=' + \ + fqdn + '&view=' + self.iba_dns_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + cname_ref = r_json[0]['_ref'] + if (cname_ref and + re.match("record:cname\/[^:]+:([^\/]+)\/", + cname_ref).group(1) == fqdn): + rest_url = 'https://' + self.iba_host + '/wapi/v' \ + + self.iba_wapi_version + '/' + cname_ref + r = requests.delete(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("Received unexpected cname record reference: " + cname_ref) + else: + raise InfobloxNotFoundException("No requested cname record found: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def update_cname_record(self, canonical, name): + """ Implements IBA REST API call to update or repoint IBA cname record + :param canonical: canonical name in FQDN format + :param name: the name for the new CNAME record in FQDN format + """ + rest_url = 'https://' + self.iba_host + \ + '/wapi/v' + self.iba_wapi_version + '/record:cname' + payload = json.dumps({'name': name}) + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + # RFC1912 - A CNAME can not coexist with any other data, we + # should expect utmost one entry + if r.status_code == 200 and len(r_json) == 1: + ibx_cname = r.json()[0] + cname_ref = ibx_cname['_ref'] + payload = '{"canonical": ' + \ + json.JSONEncoder().encode(canonical) + '}' + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + cname_ref + r = requests.put(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + if r.status_code == 200 or r.status_code == 201: + return + else: + r.raise_for_status() + elif len(r_json) == 0: + raise InfobloxNotFoundException("CNAME: " + + name + " not found.") + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def create_dhcp_range(self, start_ip_v4, end_ip_v4): + """ Implements IBA REST API call to add DHCP range for given + start and end addresses + :param start_ip_v4: IP v4 address + :param end_ip_v4: IP v4 address + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/range' + payload = '{"start_addr": "' + start_ip_v4 + \ + '","end_addr": "' + end_ip_v4 + '"}' + try: + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + if r.status_code == 200 or r.status_code == 201: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_dhcp_range(self, start_ip_v4, end_ip_v4): + """ Implements IBA REST API call to delete DHCP range for given + start and end addresses + :param start_ip_v4: IP v4 address + :param end_ip_v4: IP v4 address + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/range?start_addr=' + \ + start_ip_v4 + '?end_addr=' + end_ip_v4 + '&network_view=' + \ + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + range_ref = r_json[0]['_ref'] + if range_ref: + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + range_ref + r = requests.delete(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("No range reference received in IBA reply") + else: + raise InfobloxNotFoundException("No requested range found: " + start_ip_v4 + "-" + end_ip_v4) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_host(self, fqdn, fields=None): + """ Implements IBA REST API call to retrieve host record fields + Returns hash table of fields with field name as a hash key + :param fqdn: hostname in FQDN + :param fields: comma-separated list of field names (optional) + """ + if fields: + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + \ + fqdn + '&view=' + self.iba_dns_view + \ + '&_return_fields=' + fields + else: + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + \ + fqdn + '&view=' + self.iba_dns_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + return r_json[0] + else: + raise InfobloxNotFoundException("No hosts found: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxNotFoundException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_host_by_regexp(self, fqdn): + """ Implements IBA REST API call to retrieve host records by fqdn regexp filter + Returns array of host names in FQDN matched to given regexp filter + :param fqdn: hostname in FQDN or FQDN regexp filter + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name~=' + \ + fqdn + '&view=' + self.iba_dns_view + hosts = [] + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + for host in r_json: + hosts.append(host['name']) + return hosts + else: + raise InfobloxNotFoundException("No hosts found for regexp filter: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_txt_by_regexp(self, fqdn): + """ Implements IBA REST API call to retrieve TXT records by fqdn + regexp filter + Returns dictonary of host names in FQDN matched to given regexp + filter with the TXT value + :param fqdn: hostname in FQDN or FQDN regexp filter + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:txt?name~=' + fqdn + \ + '&view=' + self.iba_dns_view + hosts = {} + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + for host in r_json: + hosts[host['name']] = host['text'] + return hosts + else: + raise InfobloxNotFoundException("No txt records found for regexp filter: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_host_by_ip(self, ip_v4): + """ Implements IBA REST API call to find hostname by IP address + Returns array of host names in FQDN associated with given IP address + :param ip_v4: IP v4 address + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/ipv4address?ip_address=' + \ + ip_v4 + '&network_view=' + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + if len(r_json[0]['names']) > 0: + return r_json[0]['names'] + else: + raise InfobloxNotFoundException("No host records found for IP: " + ip_v4) + else: + raise InfobloxNotFoundException("No IP found: " + ip_v4) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_ip_by_host(self, fqdn): + """ Implements IBA REST API call to find IP addresses by hostname + Returns array of IP v4 addresses associated with given hostname + :param fqdn: hostname in FQDN + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + \ + fqdn + '&view=' + self.iba_dns_view + ipv4addrs = [] + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + if len(r_json[0]['ipv4addrs']) > 0: + for ipv4addr in r_json[0]['ipv4addrs']: + ipv4addrs.append(ipv4addr['ipv4addr']) + return ipv4addrs + else: + raise InfobloxNotFoundException("No host records found for FQDN: " + fqdn) + else: + raise InfobloxNotFoundException("No hosts found: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_host_extattrs(self, fqdn, attributes=None): + """ Implements IBA REST API call to retrieve host extensible attributes + Returns hash table of attributes with attribute name as a hash key + :param fqdn: hostname in FQDN + :param attributes: array of extensible attribute names (optional) + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?name=' + fqdn + \ + '&view=' + self.iba_dns_view + '&_return_fields=name,extattrs' + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + extattrs = {} + if attributes: + for attribute in attributes: + if attribute in r_json[0]['extattrs']: + extattrs[attribute] = \ + r_json[0]['extattrs'][attribute]['value'] + else: + raise InfobloxNotFoundException("No requested attribute found: " + attribute) + else: + for attribute in r_json[0]['extattrs'].keys(): + extattrs[attribute] = \ + r_json[0]['extattrs'][attribute]['value'] + return extattrs + else: + raise InfobloxNotFoundException("No requested host found: " + fqdn) + else: + if 'text' in r_json: + raise InfobloxNotFoundException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_network(self, network, fields=None): + """ Implements IBA REST API call to retrieve network object fields + Returns hash table of fields with field name as a hash key + :param network: network in CIDR format + :param fields: comma-separated list of field names + (optional, returns network in CIDR format and netmask if + not specified) + """ + if not fields: + fields = 'network,netmask' + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?network=' + network + \ + '&network_view=' + self.iba_network_view + '&_return_fields=' + \ + fields + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + return r_json[0] + else: + raise InfobloxNotFoundException("No requested network found: " + network) + else: + if 'text' in r_json: + raise InfobloxNotFoundException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_network_by_ip(self, ip_v4): + """ Implements IBA REST API call to find network by IP address which + belongs to this network + Returns network in CIDR format + :param ip_v4: IP v4 address + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/ipv4address?ip_address=' + ip_v4 + \ + '&network_view=' + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + if 'network' in r_json[0]: + return r_json[0]['network'] + else: + raise InfobloxNotFoundException("No network found for IP: " + ip_v4) + else: + raise InfobloxNotFoundException("No IP found: " + ip_v4) + else: + if 'text' in r_json: + raise InfobloxNotFoundException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_network_by_extattrs(self, attributes): + """ Implements IBA REST API call to find a network by it's + extensible attributes + Returns array of networks in CIDR format + :param attributes: comma-separated list of attrubutes name/value + pairs in the format: + attr_name=attr_value - exact match for attribute value + attr_name:=attr_value - case insensitive match for attribute value + attr_name~=regular_expression - match attribute value by regular + expression + attr_name>=attr_value - search by number greater than value + attr_name<=attr_value - search by number less than value + attr_name!=attr_value - search by number not equal of value + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?*' + \ + "&*".join(attributes.split(",")) + '&network_view=' + \ + self.iba_network_view + networks = [] + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + for network in r_json: + if 'network' in network: + networks.append(network['network']) + return networks + else: + raise InfobloxNotFoundException("No networks found for extensible attributes: " + attributes) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_host_by_extattrs(self, attributes): + """ Implements IBA REST API call to find host by it's extensible attributes + Returns array of hosts in FQDN + :param attributes: comma-separated list of attrubutes name/value + pairs in the format: + attr_name=attr_value - exact match for attribute value + attr_name:=attr_value - case insensitive match for attribute value + attr_name~=regular_expression - match attribute value by regular + expression + attr_name>=attr_value - search by number greater than value + attr_name<=attr_value - search by number less than value + attr_name!=attr_value - search by number not equal of value + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/record:host?*' + \ + "&*".join(attributes.split(",")) + '&view=' + self.iba_dns_view + hosts = [] + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + for host in r_json: + if 'name' in host: + hosts.append(host['name']) + return hosts + else: + raise InfobloxNotFoundException("No hosts found for extensible attributes: " + attributes) + else: + if 'text' in r_json: + raise InfobloxNotFoundException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_network_extattrs(self, network, attributes=None): + """ Implements IBA REST API call to retrieve network extensible attributes + Returns hash table of attributes with attribute name as a hash key + :param network: network in CIDR format + :param attributes: array of extensible attribute names (optional) + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?network=' + network + \ + '&network_view=' + self.iba_network_view + \ + '&_return_fields=network,extattrs' + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + extattrs = {} + if attributes: + for attribute in attributes: + if attribute in r_json[0]['extattrs']: + extattrs[attribute] = \ + r_json[0]['extattrs'][attribute]['value'] + else: + raise InfobloxNotFoundException("No requested attribute found: " + attribute) + else: + for attribute in r_json[0]['extattrs'].keys(): + extattrs[attribute] = \ + r_json[0]['extattrs'][attribute]['value'] + return extattrs + else: + raise InfobloxNotFoundException("No requested network found: " + network) + else: + if 'text' in r_json: + raise InfobloxNotFoundException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def update_network_extattrs(self, network, attributes): + """ Implements IBA REST API call to add or update network extensible attributes + :param network: network in CIDR format + :param attributes: hash table of extensible attributes with attribute + name as a hash key + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?network=' + \ + network + '&network_view=' + self.iba_network_view + \ + '&_return_fields=network,extattrs' + extattrs = {} + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + network_ref = r_json[0]['_ref'] + if network_ref: + extattrs = r_json[0]['extattrs'] + for attr_name, attr_value in attributes.iteritems(): + extattrs[attr_name]['value'] = attr_value + payload = '{"extattrs": ' + \ + json.JSONEncoder().encode(extattrs) + '}' + rest_url = 'https://' + self.iba_host + \ + '/wapi/v' + self.iba_wapi_version + \ + '/' + network_ref + r = requests.put(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("No network reference received in IBA reply for network: " + network) + else: + raise InfobloxNotFoundException("No requested network found: " + network) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_network_extattrs(self, network, attributes): + """ Implements IBA REST API call to delete network extensible attributes + :param network: network in CIDR format + :param attributes: array of extensible attribute names + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?network=' + \ + network + '&network_view=' + self.iba_network_view + \ + '&_return_fields=network,extattrs' + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + network_ref = r_json[0]['_ref'] + if network_ref: + extattrs = r_json[0]['extattrs'] + for attribute in attributes: + if attribute in extattrs: + del extattrs[attribute] + payload = '{"extattrs": ' + \ + json.JSONEncoder().encode(extattrs) + '}' + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + network_ref + r = requests.put(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("No network reference received in IBA reply for network: " + network) + else: + raise InfobloxNotFoundException("No requested network found: " + network) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def create_network(self, network): + """ Implements IBA REST API call to create DHCP network object + :param network: network in CIDR format + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network' + payload = '{"network": "' + network + '","network_view": "' + \ + self.iba_network_view + '"}' + try: + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + if r.status_code == 200 or r.status_code == 201: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_network(self, network): + """ Implements IBA REST API call to delete DHCP network object + :param network: network in CIDR format + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/network?network=' + \ + network + '&network_view=' + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + network_ref = r_json[0]['_ref'] + if network_ref: + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + network_ref + r = requests.delete(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("No network reference received in IBA reply for network: " + network) + else: + raise InfobloxNotFoundException("No network found: " + network) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def create_networkcontainer(self, networkcontainer): + """ Implements IBA REST API call to create DHCP network containert object + :param networkcontainer: network container in CIDR format + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/networkcontainer' + payload = '{"network": "' + networkcontainer + \ + '","network_view": "' + self.iba_network_view + '"}' + try: + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl, + data=payload) + r_json = r.json() + if r.status_code == 200 or r.status_code == 201: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def delete_networkcontainer(self, networkcontainer): + """ Implements IBA REST API call to delete DHCP network container object + :param networkcontainer: network container in CIDR format + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/networkcontainer?network=' + \ + networkcontainer + '&network_view=' + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + network_ref = r_json[0]['_ref'] + if network_ref: + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + network_ref + r = requests.delete(url=rest_url, + auth=(self.iba_user, + self.iba_password), + verify=self.iba_verify_ssl) + if r.status_code == 200: + return + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxGeneralException("No network container reference received in IBA reply for network container: " + networkcontainer) + else: + raise InfobloxNotFoundException("No network container found: " + networkcontainer) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise + + def get_next_available_network(self, networkcontainer, cidr): + """ Implements IBA REST API call to retrieve next available network + of network container + Returns network address in CIDR format + :param networkcontainer: network container address in CIDR format + :param cidr: requested network length (from 0 to 32) + """ + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/networkcontainer?network=' + \ + networkcontainer + '&network_view=' + self.iba_network_view + try: + r = requests.get(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + if len(r_json) > 0: + net_ref = r_json[0]['_ref'] + rest_url = 'https://' + self.iba_host + '/wapi/v' + \ + self.iba_wapi_version + '/' + net_ref + \ + '?_function=next_available_network&cidr=' + \ + str(cidr) + '&num=1' + r = requests.post(url=rest_url, + auth=(self.iba_user, self.iba_password), + verify=self.iba_verify_ssl) + r_json = r.json() + if r.status_code == 200: + network = r_json['networks'][0] + return network + else: + if 'text' in r_json: + if ('code' in r_json and + r_json['code'] == 'Client.Ibap.Data'): + raise InfobloxNoNetworkAvailableException(r_json['text']) + else: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + else: + raise InfobloxNotFoundException("No requested network container found: " + networkcontainer) + else: + if 'text' in r_json: + raise InfobloxGeneralException(r_json['text']) + else: + r.raise_for_status() + except ValueError: + raise Exception(r) + except Exception: + raise diff --git a/setup.py b/setup.py index fd291bd..a93d013 100644 --- a/setup.py +++ b/setup.py @@ -7,10 +7,12 @@ setup( name='infoblox', - version='1.1', + version='1.1.1', description='The module implements Infoblox IPAM API via REST API', long_description=README, license = 'Licensed under the Apache License, Version 2.0', - packages=['.'], + packages=['infoblox'], install_requires=['requests'], + tests_require=['responses'], + test_suite='infoblox.tests', ) diff --git a/testing_requirements.txt b/testing_requirements.txt new file mode 100644 index 0000000..0e62791 --- /dev/null +++ b/testing_requirements.txt @@ -0,0 +1,6 @@ +nose==1.3.7 +flake8==2.5.0 +pep8==1.5.7 +pyflakes==1.0.0 +responses==0.5.0 +coverage==4.0.2 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/data/alias_add.json b/tests/data/alias_add.json new file mode 100644 index 0000000..edbcd19 --- /dev/null +++ b/tests/data/alias_add.json @@ -0,0 +1 @@ +[{"name": "host.domain.com", "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default"}] diff --git a/tests/data/alias_delete.json b/tests/data/alias_delete.json new file mode 100644 index 0000000..8624310 --- /dev/null +++ b/tests/data/alias_delete.json @@ -0,0 +1 @@ +[{"aliases": ["alias.domain.com"], "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default", "name": "host.domain.com"}] diff --git a/tests/data/cname_create.json b/tests/data/cname_create.json new file mode 100644 index 0000000..b77430a --- /dev/null +++ b/tests/data/cname_create.json @@ -0,0 +1 @@ +"record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFwaXRlc3Q:host.domain.com/default" diff --git a/tests/data/cname_delete.json b/tests/data/cname_delete.json new file mode 100644 index 0000000..728ff42 --- /dev/null +++ b/tests/data/cname_delete.json @@ -0,0 +1 @@ +[{"_ref":"record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default","view":"default", "name": "cname.domain.com", "canonical": "target.domain.com"}] diff --git a/tests/data/cname_update.json b/tests/data/cname_update.json new file mode 100644 index 0000000..9b88936 --- /dev/null +++ b/tests/data/cname_update.json @@ -0,0 +1 @@ +[{"_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default", "view": "default", "name": "cname.domain.com", "canonical": "host.domain.com"}] diff --git a/tests/data/host_create.json b/tests/data/host_create.json new file mode 100644 index 0000000..ec594c1 --- /dev/null +++ b/tests/data/host_create.json @@ -0,0 +1 @@ +{"ipv4addrs": [{"configure_for_dhcp": false, "ipv4addr": "192.168.40.10", "host": "host.domain.com", "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuY29tLmVxdWlmYXgudXMubGFicy5jaWEuYWFhLXRlc3Rob3N0LjEyLjYuNC4yLg:192.168.40.10/host.domain.com/default"}], "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default"} diff --git a/tests/data/host_delete.json b/tests/data/host_delete.json new file mode 100644 index 0000000..1370319 --- /dev/null +++ b/tests/data/host_delete.json @@ -0,0 +1 @@ +[{"ipv4addrs": [{"ipv4addr": "192.168.40.10", "host": "host.domain.com", "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuY29tLmVxdWlmYXgudXMubGFicy5jaWEuYWFhLXRlc3Rob3N0LjEyLjYuNC4yLg:12.6.4.2/host.domain.com/default", "configure_for_dhcp": false}], "name": "host.domain.com", "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default", "view": "default"}] diff --git a/tests/data/host_get.json b/tests/data/host_get.json new file mode 100644 index 0000000..80b7fca --- /dev/null +++ b/tests/data/host_get.json @@ -0,0 +1 @@ +[{"name": "host.domain.com", "view": "default", "ipv4addrs": [{"host": "host.domain.com", "ipv4addr": "192.168.40.10", "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuY29tLmVxdWlmYXgudXMubGFicy5jaWEuYWFhLXRlc3Rob3N0LjEyLjYuNC4yLg:192.168.40.10/host.domain.com/default", "configure_for_dhcp": false}], "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default"}] diff --git a/tests/data/host_getbyip.json b/tests/data/host_getbyip.json new file mode 100644 index 0000000..ac9cd52 --- /dev/null +++ b/tests/data/host_getbyip.json @@ -0,0 +1 @@ +[{"name": "host.domain.com", "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default", "ipv4addrs": [{"configure_for_dhcp": false, "host": "host.domain.com", "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuY29tLmVxdWlmYXgudXMubGFicy5jaWEuYWFhLXRlc3Rob3N0LjEyLjYuNC4yLg:192.168.40.10/host.domain.com/default", "ipv4addr": "192.168.40.10"}], "view": "default"}] diff --git a/tests/data/txt_create.json b/tests/data/txt_create.json new file mode 100644 index 0000000..56b353e --- /dev/null +++ b/tests/data/txt_create.json @@ -0,0 +1 @@ +"record:txt/ZG5zLmJpbmRfdHh0JC5fZGVmYXVsdC5jb20uZXF1aWZheC51cy5sYWJzLmNpYS5hYWFfbW91c3RhY2hlLiJoaXBzdGVydHh0Mi5jaWEubGFicy51cy5lcXVpZmF4LmNvbSI:target.domain.com/default" diff --git a/tests/data/txt_delete.json b/tests/data/txt_delete.json new file mode 100644 index 0000000..0aa99e6 --- /dev/null +++ b/tests/data/txt_delete.json @@ -0,0 +1 @@ +[{"text": "textrecord", "name": "host.domain.com", "_ref": "record:txt/ZG5zLmJpbmRfdHh0JC5fZGVmYXVsdC5jb20uZXF1aWZheC51cy5sYWJzLmNpYS5hYWFfbW91c3RhY2hlLiJoaXBzdGVydHh0LmNpYS5sYWJzLnVzLmVxdWlmYXguY29tIg:textrecord.domain.com/default", "view": "default"}] diff --git a/tests/test_addhostalias.py b/tests/test_addhostalias.py new file mode 100644 index 0000000..061a203 --- /dev/null +++ b/tests/test_addhostalias.py @@ -0,0 +1,48 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestAddHostAlias(testcasefixture.TestCaseWithFixture): + fixture_name = 'alias_add' + + @classmethod + def setUpClass(cls): + super(TestAddHostAlias, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=cls.body, + status=200) + res.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + cls.ip = cls.iba_ipa.add_host_alias('host.domain.com', + 'alias.domain.com') + + def test_add_host_alias(self): + self.assertIsNone(self.ip) + + @responses.activate + def test_add_host__alias_nohost(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + self.iba_ipa.add_host_alias('nohost.domain.com', + 'alias.domain.com') + + @responses.activate + def test_add_host_alias_badresponse(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.add_host_alias('nohost.domain.com', + 'alias.domain.com') diff --git a/tests/test_constructor.py b/tests/test_constructor.py new file mode 100644 index 0000000..07cd536 --- /dev/null +++ b/tests/test_constructor.py @@ -0,0 +1,29 @@ +from infoblox import infoblox +from . import testcasefixture + + +class TestConstructor(testcasefixture.TestCaseWithFixture): + fixture_name = 'alias_add' + + @classmethod + def setUpClass(cls): + super(TestConstructor, cls).setUpClass() + + + def test_iba_host_is_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_host, '10.10.10.10') + + def test_iba_user_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_user, 'foo') + + def test_iba_password_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_password, 'bar') + + def test_iba_wapi_version_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_wapi_version, '1.6') + + def test_iba_dns_view_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_dns_view, 'default') + + def test_iba_network_view_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_network_view, 'default') diff --git a/tests/test_createcname.py b/tests/test_createcname.py new file mode 100644 index 0000000..fb04781 --- /dev/null +++ b/tests/test_createcname.py @@ -0,0 +1,53 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestCreateCname(testcasefixture.TestCaseWithFixture): + fixture_name = 'cname_create' + + @classmethod + def setUpClass(cls): + super(TestCreateCname, cls).setUpClass() + cls.iba_ipa = infoblox.Infoblox('10.10.10.10', 'foo', 'bar', + '1.6', 'default', 'default') + + with responses.RequestsMock() as res: + res.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=cls.body, + status=201) + cls.ip = cls.iba_ipa.create_cname_record('target.domain.com', + 'cname.domain.com') + + def test_cname_create(self): + self.assertIsNone(self.ip) + + def test_iba_host_is_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_host, '10.10.10.10') + + def test_iba_user_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_user, 'foo') + + def test_iba_password_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_password, 'bar') + + def test_iba_wapi_version_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_wapi_version, '1.6') + + def test_iba_dns_view_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_dns_view, 'default') + + def test_iba_network_view_set_from_init(self): + self.assertEqual(self.iba_ipa.iba_network_view, 'default') + + @responses.activate + def test_cname_create_bad_reponse(self): + responses.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=self.body, + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.create_cname_record('target.domain.com', + 'cname.domain.com') diff --git a/tests/test_createhost.py b/tests/test_createhost.py new file mode 100644 index 0000000..c12607c --- /dev/null +++ b/tests/test_createhost.py @@ -0,0 +1,42 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestCreateHostRecord(testcasefixture.TestCaseWithFixture): + fixture_name = 'host_create' + + @classmethod + def setUpClass(cls): + super(TestCreateHostRecord, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=cls.body, + status=200) + cls.ip = cls.iba_ipa.create_host_record('192.168.40.10', + 'host.domain.com') + + def test_host_create(self): + self.assertEqual(self.ip, '192.168.40.10') + + @responses.activate + def test_host_create_badip(self): + responses.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=self.body, + status=200) + with self.assertRaises(infoblox.InfobloxBadInputParameter): + self.iba_ipa.create_host_record('not.an.ip.com', + 'host.domain.com') + + @responses.activate + def test_host_create_badresponse(self): + responses.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.create_host_record('192.168.40.10', + 'host.domain.com') diff --git a/tests/test_createtxtrecord.py b/tests/test_createtxtrecord.py new file mode 100644 index 0000000..4f185f6 --- /dev/null +++ b/tests/test_createtxtrecord.py @@ -0,0 +1,32 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestCreateTxtRecord(testcasefixture.TestCaseWithFixture): + fixture_name = 'txt_create' + + @classmethod + def setUpClass(cls): + super(TestCreateTxtRecord, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:txt', + body=cls.body, + status=200) + cls.ip = cls.iba_ipa.create_txt_record('txtrecord.domain.com', + 'target.domain.com') + + def test_txt_create(self): + self.assertIsNone(self.ip) + + @responses.activate + def test_txt_create_error(self): + responses.add(responses.POST, + 'https://10.10.10.10/wapi/v1.6/record:txt', + body=self.body, + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.create_txt_record('txtrecord.domain.com', + 'target.domain.com') diff --git a/tests/test_deletecname.py b/tests/test_deletecname.py new file mode 100644 index 0000000..5523192 --- /dev/null +++ b/tests/test_deletecname.py @@ -0,0 +1,60 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestDeleteCname(testcasefixture.TestCaseWithFixture): + fixture_name = 'cname_delete' + + @classmethod + def setUpClass(cls): + super(TestDeleteCname, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=cls.body, + status=200) + res.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + status=200) + cls.ip = cls.iba_ipa.delete_cname_record('cname.domain.com') + + def test_cname_delete(self): + self.assertIsNone(self.ip) + + @responses.activate + def test_cname_delete_badhostname(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body='[]', + status=200) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + self.iba_ipa.delete_cname_record('nocname.domain.com') + + @responses.activate + def test_cname_delete_unexpected(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=self.body, + status=200) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxGeneralException): + self.iba_ipa.delete_cname_record('genexp.domain.com') + + @responses.activate + def test_cname_delete_bad_response(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=self.body, + status=500) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + status=200) + with self.assertRaises(HTTPError): + self.iba_ipa.delete_cname_record('cname.domain.com') diff --git a/tests/test_deletehost.py b/tests/test_deletehost.py new file mode 100644 index 0000000..3627c68 --- /dev/null +++ b/tests/test_deletehost.py @@ -0,0 +1,49 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestDeleteHost(testcasefixture.TestCaseWithFixture): + fixture_name = 'host_delete' + + @classmethod + def setUpClass(cls): + super(TestDeleteHost, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=cls.body, + status=200) + res.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + cls.ip = cls.iba_ipa.delete_host_record('host.domain.com') + + def test_host_delete(self): + self.assertIsNone(self.ip) + + @responses.activate + def test_host_delete_hostnotfound(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=200) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + ip = self.iba_ipa.delete_host_record('hostnotfound.domain.com') + + @responses.activate + def test_host_delete_servererror(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=self.body, + status=500) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + with self.assertRaises(HTTPError): + ip = self.iba_ipa.delete_host_record('host.domain.com') + diff --git a/tests/test_deletehostalias.py b/tests/test_deletehostalias.py new file mode 100644 index 0000000..2d9e4ba --- /dev/null +++ b/tests/test_deletehostalias.py @@ -0,0 +1,78 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestDeleteHostAlias(testcasefixture.TestCaseWithFixture): + fixture_name = 'alias_delete' + + @classmethod + def setUpClass(cls): + super(TestDeleteHostAlias, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=cls.body, + status=200) + res.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + cls.ip = cls.iba_ipa.delete_host_alias('host.domain.com', + 'alias.domain.com') + + def test_delete_host_alias(self): + self.assertIsNone(self.ip) + + + @responses.activate + def test_delete_host_alias_unexpectedhost(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=self.body, + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxGeneralException): + self.iba_ipa.delete_host_alias('badhost.domain.com', + 'alias.domain.com') + + @responses.activate + def test_delete_host_alias_hostnotfound(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + self.iba_ipa.delete_host_alias('nohost.domain.com', + 'alias.domain.com') + + @responses.activate + def test_delete_host_alias_servererroronget(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=500) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=200) + with self.assertRaises(HTTPError): + self.iba_ipa.delete_host_alias('host.domain.com', + 'alias.domain.com') + + @responses.activate + def test_delete_host_alias_servererroronpost(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=self.body, + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default', + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.delete_host_alias('host.domain.com', + 'alias.domain.com') diff --git a/tests/test_deletetxtrecord.py b/tests/test_deletetxtrecord.py new file mode 100644 index 0000000..5a4b477 --- /dev/null +++ b/tests/test_deletetxtrecord.py @@ -0,0 +1,60 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestDeleteTxtRecord(testcasefixture.TestCaseWithFixture): + fixture_name = 'txt_delete' + + @classmethod + def setUpClass(cls): + super(TestDeleteTxtRecord, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:txt', + body=cls.body, + status=200) + res.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:txt/ZG5zLmJpbmRfdHh0JC5fZGVmYXVsdC5jb20uZXF1aWZheC51cy5sYWJzLmNpYS5hYWFfbW91c3RhY2hlLiJoaXBzdGVydHh0LmNpYS5sYWJzLnVzLmVxdWlmYXguY29tIg:textrecord.domain.com/default', + status=200) + cls.ip = cls.iba_ipa.delete_txt_record('textrecord.domain.com') + + def test_txt_delete(self): + self.assertIsNone(self.ip) + + @responses.activate + def test_txt_norecordfound(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:txt', + body='[]', + status=200) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:txt/ZG5zLmJpbmRfdHh0JC5fZGVmYXVsdC5jb20uZXF1aWZheC51cy5sYWJzLmNpYS5hYWFfbW91c3RhY2hlLiJoaXBzdGVydHh0LmNpYS5sYWJzLnVzLmVxdWlmYXguY29tIg:textrecord.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + self.iba_ipa.delete_txt_record('norecord.domain.com') + + @responses.activate + def test_txt_delete_exception(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:txt', + body=self.body, + status=200) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:txt/ZG5zLmJpbmRfdHh0JC5fZGVmYXVsdC5jb20uZXF1aWZheC51cy5sYWJzLmNpYS5hYWFfbW91c3RhY2hlLiJoaXBzdGVydHh0LmNpYS5sYWJzLnVzLmVxdWlmYXguY29tIg:textrecord.domain.com/default', + status=200) + with self.assertRaises(infoblox.InfobloxGeneralException): + self.iba_ipa.delete_txt_record('badtextrecord.domain.com') + + @responses.activate + def test_txt_delete_badresponse(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:txt', + body=self.body, + status=500) + responses.add(responses.DELETE, + 'https://10.10.10.10/wapi/v1.6/record:txt/ZG5zLmJpbmRfdHh0JC5fZGVmYXVsdC5jb20uZXF1aWZheC51cy5sYWJzLmNpYS5hYWFfbW91c3RhY2hlLiJoaXBzdGVydHh0LmNpYS5sYWJzLnVzLmVxdWlmYXguY29tIg:textrecord.domain.com/default', + status=200) + with self.assertRaises(HTTPError): + self.iba_ipa.delete_txt_record('textrecord.domain.com') diff --git a/tests/test_gethost.py b/tests/test_gethost.py new file mode 100644 index 0000000..ae84da8 --- /dev/null +++ b/tests/test_gethost.py @@ -0,0 +1,47 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestGetHost(testcasefixture.TestCaseWithFixture): + fixture_name = 'host_get' + + @classmethod + def setUpClass(cls): + super(TestGetHost, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=cls.body, + status=200) + cls.ip = cls.iba_ipa.get_host('host.domain.com') + + def test_get_host(self): + response_test_dict = {'view': 'default', + 'name': 'host.domain.com', + 'ipv4addrs': [{'host': + 'host.domain.com', + 'ipv4addr': '192.168.40.10', + '_ref': 'record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuY29tLmVxdWlmYXgudXMubGFicy5jaWEuYWFhLXRlc3Rob3N0LjEyLjYuNC4yLg:192.168.40.10/host.domain.com/default', + 'configure_for_dhcp': False}], + '_ref': 'record:host/ZG5zLmhvc3QkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYS10ZXN0aG9zdA:host.domain.com/default'} + self.assertDictEqual(self.ip, response_test_dict) + + @responses.activate + def test_get_host_nohostfound(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + ip = self.iba_ipa.get_host('host.domain.com') + + @responses.activate + def test_get_host_servererror(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=500) + with self.assertRaises(HTTPError): + ip = self.iba_ipa.get_host('host.domain.com') diff --git a/tests/test_getipbyhost.py b/tests/test_getipbyhost.py new file mode 100644 index 0000000..0996bd2 --- /dev/null +++ b/tests/test_getipbyhost.py @@ -0,0 +1,41 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestGetIpByHost(testcasefixture.TestCaseWithFixture): + fixture_name = 'host_getbyip' + + @classmethod + def setUpClass(cls): + super(TestGetIpByHost, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body=cls.body, + status=200) + cls.ip = cls.iba_ipa.get_ip_by_host('192.168.40.10') + + + def test_get_ip_by_host(self): + response_test_list = ['192.168.40.10'] + self.assertListEqual(self.ip, response_test_list) + + @responses.activate + def test_get_ip_by_hostnotfound(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + self.iba_ipa.get_ip_by_host('127.0.0.1') + + @responses.activate + def test_get_ip_by_host_serverfail(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:host', + body='[]', + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.get_ip_by_host('192.168.40.10') diff --git a/tests/test_updatecname.py b/tests/test_updatecname.py new file mode 100644 index 0000000..02f2ffc --- /dev/null +++ b/tests/test_updatecname.py @@ -0,0 +1,82 @@ +import responses +from requests.exceptions import HTTPError +from infoblox import infoblox +from . import testcasefixture + + +class TestUpdateCname(testcasefixture.TestCaseWithFixture): + fixture_name = 'cname_update' + + @classmethod + def setUpClass(cls): + super(TestUpdateCname, cls).setUpClass() + with responses.RequestsMock() as res: + res.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=cls.body, + status=200) + res.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + body='', + status=200) + cls.ip = cls.iba_ipa.update_cname_record('newtarget.domain.com', + 'cname.domain.com') + + def test_cname_update(self): + self.assertIsNone(self.ip) + + @responses.activate + def test_cname_update_nocname(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body='[]', + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + body='', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + ip = self.iba_ipa.update_cname_record('newtarget.domain.com', + 'nocname.domain.com') + + @responses.activate + def test_cname_update_other(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body='{"text": "don\'t ever trust the needle"}', + status=203) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + body='', + status=200) + with self.assertRaises(infoblox.InfobloxGeneralException): + self.iba_ipa.update_cname_record('badtarget.domain.com', + 'cname.domain.com') + + @responses.activate + def test_cname_update_badupdate(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body=self.body, + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + body='', + status=500) + with self.assertRaises(HTTPError): + self.iba_ipa.update_cname_record('newtarget.domain.com', + 'cname.domain.com') + + @responses.activate + def test_cname_update_nocname(self): + responses.add(responses.GET, + 'https://10.10.10.10/wapi/v1.6/record:cname', + body='[]', + status=200) + responses.add(responses.PUT, + 'https://10.10.10.10/wapi/v1.6/record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0LmNvbS5lcXVpZmF4LnVzLmxhYnMuY2lhLmFhYWNuYW1ldGVzdA:cname.domain.com/default', + body='', + status=200) + with self.assertRaises(infoblox.InfobloxNotFoundException): + ip = self.iba_ipa.update_cname_record('newtarget.domain.com', + 'nocname.domain.com') diff --git a/tests/testcasefixture.py b/tests/testcasefixture.py new file mode 100644 index 0000000..4d2bccb --- /dev/null +++ b/tests/testcasefixture.py @@ -0,0 +1,22 @@ +from infoblox import infoblox +import unittest +import os + + +class TestCaseWithFixture(unittest.TestCase): + location = os.path.dirname(__file__) + fixture_name = None + + @classmethod + def setUpClass(cls): + super(TestCaseWithFixture, cls).setUpClass() + cls.body = cls.load_fixture(cls.fixture_name) + cls.iba_ipa = infoblox.Infoblox('10.10.10.10', 'foo', 'bar', + '1.6', 'default', 'default') + + @classmethod + def load_fixture(cls, fixture_name): + filename = "{}/data/{}.json".format(cls.location, + cls.fixture_name) + with open(filename, 'r') as file: + return file.read() diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..3efd957 --- /dev/null +++ b/tox.ini @@ -0,0 +1,35 @@ +# Tox (http://tox.testrun.org/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. +[setup] +results = {toxinidir}/test_results + +[tox] +envlist = py27, py35, py34 + + +[testenv] +deps = + -rtesting_requirements.txt +setenv = +passenv = * +whitelist_externals=mkdir +commands = + mkdir -p {[setup]results} + nosetests \ + --with-xunit \ + --xunit-file {[setup]results}/nose.xml \ + --with-coverage \ + --cover-erase \ + --cover-tests \ + --cover-branches \ + --cover-xml \ + --cover-xml-file {[setup]results}/coverage.xml \ + --cover-html \ + --cover-html-dir {[setup]results}/coverage \ + --cover-package infoblox + flake8 \ + --show-source \ + --output-file {[setup]results}/flake8.txt \ + --exit-zero infoblox