Source code for libtaxii.clients

# Copyright (c) 2017, The MITRE Corporation
# For license information, see the LICENSE.txt file

TAXII Clients
import sys
import base64
import socket
import ssl
import warnings
from libtaxii.constants import *
import six
from six.moves import urllib

[docs]class HttpClient(object): # Constants for authentication types AUTH_NONE = 0 #: Do not offer any authentication credentials to the server AUTH_BASIC = 1 #: Offer HTTP Basic authentication credentials to the server AUTH_CERT = 2 #: Offer certificate based authentication credentials to the server AUTH_CERT_BASIC = 3 #: Offer certificate based auth and HTTP Basic credentials # Proxy values SYSTEM_PROXY = None NO_PROXY = 'noproxy' # Proxy Constants PROXY_HTTP = 'http' PROXY_HTTPS = 'https' # Header Constants HEADER_ACCEPT = 'accept' HEADER_CONTENT_TYPE = 'content-type' HEADER_X_TAXII_ACCEPT = 'x-taxii-accept' HEADER_X_TAXII_CONTENT_TYPE = 'x-taxii-content-type' HEADER_X_TAXII_PROTOCOL = 'x-taxii-protocol' HEADER_X_TAXII_SERVICES = 'x-taxii-services' def __init__(self, auth_type=AUTH_NONE, auth_credentials=None, use_https=False): self.use_https = use_https self.auth_type = auth_type self.auth_credentials = {} if auth_credentials is not None: self.set_auth_credentials(auth_credentials) # These cannot currently be set in the constructor self.proxy_string = None self.verify_server = False self.ca_file = None
[docs] def set_auth_type(self, auth_type): """Set the authentication type for this client. :param string auth_type: Must be one of :attr:`AUTH_NONE`, :attr:`AUTH_BASIC`, or :attr:`AUTH_CERT` """ # If this isn't a change, don't do anything if self.auth_type == auth_type: return if auth_type == HttpClient.AUTH_NONE: self.auth_type = HttpClient.AUTH_NONE elif auth_type == HttpClient.AUTH_BASIC: self.auth_type = HttpClient.AUTH_BASIC elif auth_type == HttpClient.AUTH_CERT: self.auth_type = HttpClient.AUTH_CERT elif auth_type == HttpClient.AUTH_CERT_BASIC: self.auth_type = HttpClient.AUTH_CERT_BASIC else: raise Exception('Invalid auth_type specified. Must be one of HttpClient AUTH_NONE, AUTH_BASIC, or AUTH_CERT')
[docs] def set_verify_server(self, verify_server=False, ca_file=None): """ Tell libtaxii whether to verify the server's ssl certificate using the provided ca_file. :param bool verify_server: Flag indicating whether or not libtaxii should verify the server. """ if verify_server and ca_file is None: raise ValueError('If verify_server is True, ca_file must not be None.') self.verify_server = verify_server self.ca_file = ca_file
@property def basic_auth_header(self): """Returns a Base64-encoded HTTP Basic Authorization Header.""" credentials = '{}:{}'.format( self.auth_credentials['username'], self.auth_credentials['password'] ).encode('utf-8') return b'Basic ' + base64.b64encode(credentials)
[docs] def set_proxy(self, proxy_string=None): """ Set the proxy settings to use when making a connection. :param string proxy_string: Proxy address formatted like Set to :attr:`SYSTEM_PROXY` to use the system proxy; set to :attr:`NO_PROXY` to use no proxy. """ self.proxy_string = proxy_string
[docs] def set_use_https(self, bool_): """Indicate whether the HttpClient should use HTTP or HTTPs. The default is HTTP. :param bool bool: The new use_https value. """ if bool_ is True: self.use_https = True elif bool_ is False: self.use_https = False else: raise Exception('Invalid argument value. Must be a boolean value of \'True\' or \'False\'.')
[docs] def set_auth_credentials(self, auth_credentials_dict): """Set the authentication credentials used later when making a request. Note that it is possible to pass in one dict containing credentials for different authentication types and swap between them later. :param auth_credentials_dict dict: The dictionary containing authentication credentials. e.g.: - {'key_file': '/path/to/key.key', 'cert_file': '/path/to/cert.crt'} - {'username': 'abc', 'password': 'xyz'} - Or both, if both username/password and certificate based auth are used """ if self.auth_type == HttpClient.AUTH_NONE: req_fields = [] elif self.auth_type == HttpClient.AUTH_BASIC: req_fields = ['username', 'password'] elif self.auth_type == HttpClient.AUTH_CERT: req_fields = ['key_file', 'cert_file'] elif self.auth_type == HttpClient.AUTH_CERT_BASIC: req_fields = ['key_file', 'cert_file', 'username', 'password'] for k in req_fields: if k not in auth_credentials_dict: raise Exception('Invalid auth credentials. Field %s is not present' % k) self.auth_credentials = auth_credentials_dict
def call_taxii_service(self, host, path, message_binding, post_data, port=None, get_params_dict=None): """ **DEPRECATED.** May be removed in the next version of `libtaxii`. Use :func:`call_taxii_service2` instead. Call a TAXII service. """ warnings.warn('Call to deprecated function: libtaxii.clients.HttpClient.call_taxii_service()', category=DeprecationWarning) if port is None: # If the caller did not specify a port, use the default if self.use_https: port = 443 else: port = 80 if get_params_dict is not None: # Add the query params to the URL path += '?' + urllib.parse.urlencode(get_params_dict) header_dict = {'Content-Type': 'application/xml', 'User-Agent': 'libtaxii.httpclient'} if self.auth_type == HttpClient.AUTH_CERT_BASIC: raise Exception('AuthType AUTH_CERT_BASIC not supported by call_taxii_service. Use call_taxii_service2.') if self.use_https: header_dict['X-TAXII-Protocol'] = VID_TAXII_HTTPS_10 if self.auth_type == HttpClient.AUTH_NONE: conn = six.moves.http_client.HTTPSConnection(host, port) elif self.auth_type == HttpClient.AUTH_BASIC: header_dict['Authorization'] = self.basic_auth_header conn = six.moves.http_client.HTTPSConnection(host, port) else: # AUTH_CERT key_file = self.auth_credentials['key_file'] cert_file = self.auth_credentials['cert_file'] conn = six.moves.http_client.HTTPSConnection(host, port, key_file, cert_file) else: # Not using https header_dict['X-TAXII-Protocol'] = VID_TAXII_HTTP_10 if self.auth_type == HttpClient.AUTH_NONE: conn = six.moves.http_client.HTTPConnection(host, port) # TODO: Consider deleting because this is a terrible idea elif self.auth_type == HttpClient.AUTH_BASIC: # Sending credentials in cleartext.. tsk tsk header_dict['Authorization'] = self.basic_auth_header conn = six.moves.http_client.HTTPConnection(host, port) else: # AUTH_CERT key_file = self.auth_credentials['key_file'] cert_file = self.auth_credentials['cert_file'] conn = six.moves.http_client.HTTPConnection(host, port, key_file, cert_file) header_dict['Content-Type'] = 'application/xml' header_dict['X-TAXII-Content-Type'] = message_binding req = conn.request('POST', path, post_data, headers=header_dict) response = conn.getresponse() return response
[docs] def call_taxii_service2(self, host, path, message_binding, post_data, port=None, get_params_dict=None, content_type=None, headers=None, user_agent=None, timeout=None): """Call a TAXII service. **Note:** this uses urllib2 instead of httplib, and therefore returns a different kind of object than :func:`call_taxii_service`. :return: :class:`urllib2.Response` """ header_dict = {} if headers is not None: for k, v in six.iteritems(headers): header_dict[k.lower()] = v if user_agent is not None: header_dict['User-Agent'] = user_agent else: header_dict['User-Agent'] = 'libtaxii.httpclient' header_dict[HttpClient.HEADER_X_TAXII_CONTENT_TYPE] = message_binding content_type_map = {VID_TAXII_XML_10: 'application/xml', VID_TAXII_XML_11: 'application/xml', VID_CERT_EU_JSON_10: 'application/json'} if content_type is not None: # Set the content type to the user-provided value header_dict[HttpClient.HEADER_CONTENT_TYPE] = content_type else: # If the user did not provide a value, attempt to find a known value if message_binding not in content_type_map: raise ValueError("content_type not specified, and the message_binding is unrecognized") header_dict[HttpClient.HEADER_CONTENT_TYPE] = content_type_map[message_binding] # States of Accept and X-TAXII-Accept headers: # # 1. Accept and X-TAXII-Accept headers both set. # - Do nothing. Assume user knows what they are doing # # 2. Accept header set and X-TAXII-Accept header not set # - Do nothing. Assume user knows what they are doing # # 3. Accept header not set and X-TAXII-Accept header set # - Do nothing. Bad practice, but not invalid. # Assume user knows what they are doing. # # 4. Accept header not set and X-TAXII-Accept header not set # - User hasn't specified anything. In this case, default behavior is: # Accept = Content-Type and # X-TAXII-Accept = X-TAXII-Content-Type. # This means that the client will only accept messages # in the same format that was sent. # # 5. Users of libtaxii that wish to accept everything should set the # Accept header to '*/*'. accept_set = header_dict.get(HttpClient.HEADER_ACCEPT) is not None x_taxii_accept_set = header_dict.get(HttpClient.HEADER_X_TAXII_ACCEPT) is not None if not accept_set and not x_taxii_accept_set: header_dict[HttpClient.HEADER_ACCEPT] = header_dict[HttpClient.HEADER_CONTENT_TYPE] header_dict[HttpClient.HEADER_X_TAXII_ACCEPT] = header_dict[HttpClient.HEADER_X_TAXII_CONTENT_TYPE] # If the X-TAXII-Services header is not set by the user, # Attempt to use the library's default mapping services_map = {VID_TAXII_XML_10: VID_TAXII_SERVICES_10, VID_TAXII_XML_11: VID_TAXII_SERVICES_11, VID_CERT_EU_JSON_10: VID_TAXII_SERVICES_10} if header_dict.get(HttpClient.HEADER_X_TAXII_SERVICES) is None: # The X-TAXII-Services header was not set by the user if message_binding not in services_map: raise ValueError('x-taxii-services header not specified, and the message_binding is unrecognized') header_dict[HttpClient.HEADER_X_TAXII_SERVICES] = services_map[message_binding] handler_list = [] if self.use_https: header_dict[HttpClient.HEADER_X_TAXII_PROTOCOL] = VID_TAXII_HTTPS_10 if (self.auth_type == HttpClient.AUTH_CERT or self.auth_type == HttpClient.AUTH_CERT_BASIC): key_file = self.auth_credentials['key_file'] cert_file = self.auth_credentials['cert_file'] key_password = self.auth_credentials.get('key_password') else: key_file = None cert_file = None key_password = None if (self.auth_type == HttpClient.AUTH_BASIC or self.auth_type == HttpClient.AUTH_CERT_BASIC): header_dict['Authorization'] = self.basic_auth_header verify_server = self.verify_server ca_file = self.ca_file handler_list.append(LibtaxiiHTTPSHandler( key_file=key_file, cert_file=cert_file, verify_server=verify_server, ca_certs=ca_file, key_password=key_password)) else: # Not using https header_dict[HttpClient.HEADER_X_TAXII_PROTOCOL] = VID_TAXII_HTTP_10 if self.auth_type == HttpClient.AUTH_NONE: handler_list.append(urllib.request.HTTPHandler()) elif self.auth_type == HttpClient.AUTH_BASIC: header_dict['Authorization'] = self.basic_auth_header elif self.auth_type == HttpClient.AUTH_CERT: k = self.auth_credentials['key_file'] c = self.auth_credentials['cert_file'] handler_list.append(HTTPClientAuthHandler(k, c)) elif self.auth_type == HttpClient.AUTH_CERT_BASIC: header_dict['Authorization'] = self.basic_auth_header k = self.auth_credentials['key_file'] c = self.auth_credentials['cert_file'] handler_list.append(HTTPSClientAuthHandler(k, c)) handler_list.append(urllib.request.HTTPHandler()) if self.proxy_string is not None: if self.proxy_string == 'noproxy': # Dont use any proxy, including the system-specified proxy handler_list.append(urllib.request.ProxyHandler({})) else: # Use a specific proxy handler_list.append(urllib.request.ProxyHandler({self.PROXY_HTTP: self.proxy_string, self.PROXY_HTTPS: self.proxy_string})) opener = urllib.request.build_opener(*handler_list) urllib.request.install_opener(opener) if port is None: # If the caller did not specify a port, use the default if self.use_https: port = 443 else: port = 80 if self.use_https: scheme = 'https://' else: scheme = 'http://' url = scheme + host + ':' + str(port) + path if get_params_dict is not None: url += '?' + urllib.parse.urlencode(get_params_dict) req = urllib.request.Request(url, post_data, header_dict) try: if timeout is not None: response = urllib.request.urlopen(req, timeout=timeout) else: # Defaults to socket.getdefaulttimeout() response = urllib.request.urlopen(req) return response except urllib.error.HTTPError as error: return error
# Backwards compatibility setAuthType = set_auth_type setVerifyServer = set_verify_server setProxy = set_proxy setUseHttps = set_use_https setAuthCredentials = set_auth_credentials callTaxiiService = call_taxii_service callTaxiiService2 = call_taxii_service2
# class LibtaxiiHTTPSHandler(urllib.request.HTTPSHandler): def __init__(self, key_file=None, cert_file=None, verify_server=False, ca_certs=None, key_password=None): urllib.request.HTTPSHandler.__init__(self) self.key_file = key_file self.cert_file = cert_file self.verify_server = verify_server self.ca_certs = ca_certs self.key_password = key_password def https_open(self, req): return self.do_open(self.get_connection, req) def get_connection(self, host, timeout=0): return VerifiableHTTPSConnection(host, key_file=self.key_file, cert_file=self.cert_file, verify_server=self.verify_server, ca_certs=self.ca_certs, key_password=self.key_password) class HTTPClientAuthHandler(urllib.request.HTTPSHandler): # TODO: Is this used / is this possible? def __init__(self, key, cert): urllib.request.HTTPSHandler.__init__(self) self.key = key self.cert = cert def https_open(self, req): return self.do_open(self.get_connection, req) def get_connection(self, host, timeout=0): return six.moves.http_client.HTTPSConnection(host, key_file=self.key, cert_file=self.cert, timeout=timeout) class VerifiableHTTPSConnection(six.moves.http_client.HTTPSConnection): """ The default httplib HTTPSConnection does not verify certificates. This class extends HTTPSConnection and requires certificate verification. Borrowed from (moved to """ def __init__(self, host, port=None, key_file=None, cert_file=None, key_password=None, strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None, verify_server=False, ca_certs=None): # The httplib.HTTPSConnection init arguments have changed over different Python versions: # Py 2.6: httplib.HTTPSConnection(host[, port[, key_file[, cert_file[, strict[, timeout]]]]]) # Py 2.7: httplib.HTTPSConnection(host[, port[, key_file[, cert_file[, strict[, timeout[, source_address[, context]]]]]]]) # Py 3.3: http.client.HTTPSConnection(host, port=None, key_file=None, cert_file=None[, strict][, timeout], source_address=None, *, context=None, check_hostname=None) # Py 3.4-3.6: http.client.HTTPSConnection(host, port=None, key_file=None, cert_file=None, [timeout, ]source_address=None, *, context=None, check_hostname=None) python_version = sys.version_info[0:2] # (e.g. 2.6, 2.7, 3.3, etc.) self.context = None if python_version == (2, 6): six.moves.http_client.HTTPSConnection.__init__( self, host, port, key_file, cert_file, strict, timeout) if key_password: warnings.warn('Key password is not supported in Python 2.6. Ignoring') elif python_version == (2, 7) or six.PY3: if hasattr(ssl, "create_default_context"): self.context = ssl.create_default_context( ssl.Purpose.SERVER_AUTH if verify_server else ssl.Purpose.CLIENT_AUTH, cafile=ca_certs) if cert_file or key_file: self.context.load_cert_chain( cert_file, key_file, password=key_password) if not self.context and key_password: warnings.warn('Key password is not supported in Python <2.7.9. Ignoring') if python_version == (2, 7): if self.context: six.moves.http_client.HTTPSConnection.__init__( self, host, port, strict=strict, timeout=timeout, source_address=source_address, context=self.context) else: six.moves.http_client.HTTPSConnection.__init__( self, host, port, strict=strict, timeout=timeout, source_address=source_address) elif six.PY3: super(VerifiableHTTPSConnection, self).__init__( host, port, timeout=timeout, source_address=source_address, context=self.context) else: raise RuntimeError("Unsupported Python version: '{0}'".format(python_version)) self.cert_file = cert_file self.key_file = key_file if verify_server: self.cert_reqs = ssl.CERT_REQUIRED else: self.cert_reqs = ssl.CERT_NONE self.ca_certs = ca_certs def connect(self): # overrides the version in httplib so that we do # certificate verification if sys.version_info.major == 2 and sys.version_info.minor == 6: # Python 2.6 socket.create_connection has no source_address argument: sock = socket.create_connection( (, self.port), self.timeout) else: sock = socket.create_connection( (, self.port), self.timeout, self.source_address) if self._tunnel_host: self.sock = sock self._tunnel() server_hostname = self._tunnel_host else: server_hostname = if self.context: self.sock = self.context.wrap_socket( sock, server_hostname=server_hostname) else: self.sock = ssl.wrap_socket( sock, keyfile=self.key_file, certfile=self.cert_file, cert_reqs=self.cert_reqs, ca_certs=self.ca_certs, server_hostname=server_hostname)