Common utility classes and functions used throughout libtaxii.


from operator import attrgetter
from re import sub as resub
import dateutil.parser
import random
from libtaxii.constants import *
from lxml import etree
from uuid import uuid4
import sys
import six

    import simplejson as json
except ImportError:
    import json


def parse(s):
    Uses the default parser to parse a string or file-like object

    :param s: The XML String or File-like object to parse
    :return: an etree._Element

        e = etree.parse(s, get_xml_parser()).getroot()
    except IOError:
        e = etree.XML(s, get_xml_parser())

    return e

def parse_xml_string(xmlstr):
    """Parse an XML string (binary or unicode) with the default parser.

    :param xmlstr: An XML String to parse
    :return: an etree._Element
    if isinstance(xmlstr, six.binary_type):
        xmlstr = six.BytesIO(xmlstr)
    elif isinstance(xmlstr, six.text_type):
        xmlstr = six.StringIO(xmlstr)

    return parse(xmlstr)

[docs]def get_xml_parser(): """Return the XML parser currently in use. If one has not already been set (via :py:func:`set_xml_parser()`), a new ``etree.XMLParser`` is constructed with ``no_network=True`` and ``huge_tree=True``. """ global _XML_PARSER if _XML_PARSER is None: _XML_PARSER = etree.XMLParser(attribute_defaults=False, dtd_validation=False, load_dtd=False, no_network=True, ns_clean=True, recover=False, remove_blank_text=False, remove_comments=False, remove_pis=False, strip_cdata=True, compact=True, # collect_ids=True, resolve_entities=False, huge_tree=False) return _XML_PARSER
[docs]def set_xml_parser(xml_parser=None): """Set the libtaxii.messages XML parser. Args: xml_parser (etree.XMLParser): The parser to use to parse TAXII XML. """ global _XML_PARSER _XML_PARSER = xml_parser
def parse_datetime_string(datetime_string): """Parse a string into a :py:class:`datetime.datetime`. libtaxii users should not need to use this function directly. """ if not datetime_string: return None return dateutil.parser.parse(datetime_string)
[docs]def generate_message_id(maxlen=5, version=VID_TAXII_SERVICES_10): """Generate a TAXII Message ID. Args: maxlen (int): maximum length of the ID, in characters Example: .. code-block:: python msg_id = tm11.generate_message_id() message = tm11.DiscoveryRequest(msg_id) # Or... message = tm11.DiscoveryRequest(tm11.generate_message_id()) """ if version == VID_TAXII_SERVICES_10: message_id = str(uuid4().int % sys.maxsize) elif version == VID_TAXII_SERVICES_11: message_id = str(uuid4()) else: raise ValueError('Unknown TAXII Version: %s. Must be a TAXII Services Version ID!' % version) return message_id
def append_any_content_etree(etree_elt, content): """ General method for adding content to an etree element. This method can handle: * etree._ElementTree * etree._Element * any python type that can be cast to str * str :param etree_elt: The etree to append the content to :param content: The content to append :return: The etree_elt """ if isinstance(content, etree._ElementTree): # If content is an element tree, append the root element etree_elt.append(content.getroot()) return etree_elt if isinstance(content, etree._Element): # If content is an element, append it etree_elt.append(content) return etree_elt if not isinstance(content, six.string_types): # If content is a non-string, cast it to string and set etree_elt.text etree_elt.text = str(content) return etree_elt # If content is a string, need to check if it's XML or not try: etree_elt.append(etree.XML(content, get_xml_parser())) except etree.XMLSyntaxError: etree_elt.text = content return etree_elt def gen_filename(collection_name, format_part, date_string, extension): """ Creates a filename based on various properties of a Poll Request and Content Block :param collection_name: The collection name :param format_part: The format part (e.g., '_STIX_10_') :param date_string: A datestring :param extension: The file extension to use :return: A string containing the generated filename """ filename = (collection_name.lstrip(".") + format_part + resub(r"[^a-zA-Z0-9]", "_", date_string) + extension ).translate(None, '/\\:*?"<>|') return filename
[docs]class TAXIIBase(object): """ Base class for all TAXII Messages and Message component types. libtaxii users should not need to use this class directly. """ @property def sort_key(self): """ This property allows list of TAXII objects to be compared efficiently. The __eq__ method uses this property to sort the lists before comparisons are made. Subclasses must implement this property. """ raise NotImplementedError()
[docs] def to_etree(self): """Create an etree representation of this class. Subclasses must implement this method. """ raise NotImplementedError()
[docs] def to_dict(self): """Create a dictionary representation of this class. Subclasses must implement this method. """ raise NotImplementedError()
[docs] def to_json(self): """Create a JSON object of this class. Assumes any binary content will be UTF-8 encoded. """ content_dict = self.to_dict() _decode_binary_fields(content_dict) return json.dumps(content_dict)
[docs] def to_xml(self, pretty_print=False): """Create an XML representation of this class. Subclasses should not need to implement this method. """ return etree.tostring(self.to_etree(), pretty_print=pretty_print)
[docs] def to_text(self, line_prepend=''): """Create a nice looking (this is a subjective term!) textual representation of this class. Subclasses should implement this method. Note that this is just a convenience method for making TAXII Messages nice to read for humans and may change drastically in future versions of libtaxii. """ raise NotImplementedError()
[docs] def from_etree(cls, src_etree): """Create an instance of this class from an etree. Subclasses must implement this method. """ raise NotImplementedError()
[docs] def from_dict(cls, d): """Create an instance of this class from a dictionary. Subclasses must implement this method. """ raise NotImplementedError()
[docs] def from_xml(cls, xml): """Create an instance of this class from XML. Subclasses should not need to implement this method. """ etree_xml = parse_xml_string(xml) return cls.from_etree(etree_xml) # Just noting that there is not a from_text() method. I also # don't think there will ever be one.
def __str__(self): return self.to_xml(pretty_print=True)
[docs] def __eq__(self, other, debug=False): """ Generic method used to check equality of objects of any TAXII type. Also allows for ``print``-based debugging output showing differences. In order for subclasses to use this function, they must meet the following criteria: 1. All class properties start with one underscore. 2. The sort_key property is implemented. Args: self (object): this object other (object): the object to compare ``self`` against. debug (bool): Whether or not to print debug statements as the equality comparison is performed. """ if other is None: if debug: print('other was None!') return False if self.__class__.__name__ != other.__class__.__name__: if debug: print('class names not equal: %s != %s' % (self.__class__.__name__, other.__class__.__name__)) return False # Get all member properties that start with '_' members = [attr for attr in vars(self) if attr.startswith('_') and not attr.startswith('__')] for member in members: if debug: print('member name: %s' % member) self_value = getattr(self, member) other_value = getattr(other, member) if isinstance(self_value, TAXIIBase): # A debuggable equals comparison can be made eq = self_value.__eq__(other_value, debug) elif isinstance(self_value, list): # We have lists to compare if len(self_value) != len(other_value): # Lengths not equal member = member + ' lengths' self_value = len(self_value) other_value = len(other_value) eq = False elif len(self_value) == 0: # Both lists are of size 0, and therefore equal eq = True else: # Equal sized, non-0 length lists. The list might contain # TAXIIBase objects, or it might not. Peek at the first # item to see whether it is a TAXIIBase object or not. if isinstance(self_value[0], TAXIIBase): # All TAXIIBase objects have the 'sort_key' property implemented self_value = sorted(self_value, key=attrgetter('sort_key')) other_value = sorted(other_value, key=attrgetter('sort_key')) for self_item, other_item in, other_value): # Compare the ordered lists element by element eq = self_item.__eq__(other_item, debug) else: # Assume they don't... just do a set comparison eq = set(self_value) == set(other_value) elif isinstance(self_value, dict): # Dictionary to compare if len(set(self_value.keys()) - set(other_value.keys())) != 0: if debug: print('dict keys not equal: %s != %s' % (self_value, other_value)) eq = False for k, v in six.iteritems(self_value): if other_value[k] != v: if debug: print('dict values not equal: %s != %s' % (v, other_value[k])) eq = False eq = True elif isinstance(self_value, etree._Element): # Non-TAXII etree element (i.e. STIX) eq = (etree.tostring(self_value) == etree.tostring(other_value)) else: # Do a direct comparison eq = (self_value == other_value) # TODO: is this duplicate? if not eq: if debug: print('%s was not equal: %s != %s' % (member, self_value, other_value)) return False return True
def __ne__(self, other, debug=False): return not self.__eq__(other, debug)
def get_required(etree_xml, xpath, ns_map): elements = etree_xml.xpath(xpath, namespaces=ns_map) if len(elements) == 0: raise ValueError('Element "%s" is required' % xpath) return elements[0] def get_optional(etree_xml, xpath, ns_map): try: return get_required(etree_xml, xpath, ns_map) except ValueError: pass def get_optional_text(etree_xml, xpath, ns_map): try: return get_required(etree_xml, xpath, ns_map).text except ValueError: pass def _decode_binary_fields(dict_obj): """Given a dict, decode any binary values, assuming UTF-8 encoding. Will recurse into nested dicts. Modifies the values in-place. """ for key, value in dict_obj.items(): if isinstance(value, six.binary_type): dict_obj[key] = value.decode('utf-8') elif isinstance(value, dict): _decode_binary_fields(value)