Source code for libtaxii.common

"""
Common utility classes and functions used throughout libtaxii.
"""

from operator import attrgetter
import re
import sys
from uuid import uuid4

import dateutil.parser
from lxml import etree
import six
from six.moves.urllib.parse import urlparse

try:
    import simplejson as json
except ImportError:
    import json

from libtaxii.constants import *

_XML_PARSER = None


def parse(s, allow_file=True, allow_url=False):
    """
    Uses the default parser to parse a string or file-like object

    :param s: The XML String or File-like object to parse.
    :param allow_file: Allow `s` to be a file path.
    :param allow_url: Allow `s` to be a URL.
    :return: an etree._Element
    """
    # Do a simple validation that the given string (or URL)
    # has no protocol specified. Anything without parseable protocol
    # will be interpreted by lxml as string instead or path of external URL.
    if not allow_url and isinstance(s, six.string_types):
        parsed = urlparse(s)
        if parsed.scheme:
            raise ValueError('external URLs are not allowed')

    parser = get_xml_parser()

    # parse from string if no external paths allowed
    if not allow_file and not allow_url:
        return etree.fromstring(s, parser)

    # try to parse from file or string if files are allowed
    try:
        return etree.parse(s, parser).getroot()
    except IOError:
        return etree.XML(s, parser)


def parse_xml_string(xmlstr):
    """Parse an XML string (binary or unicode) with the default parser.

    :param xmlstr: An XML String to parse
    :return: an etree._Element
    """
    if isinstance(xmlstr, six.binary_type):
        xmlstr = six.BytesIO(xmlstr)
    elif isinstance(xmlstr, six.text_type):
        # LXML doesn't accept Unicode strings with an explicit encoding, so
        # try to detect and encode to bytes before passing to LXML.
        encoding = re.findall(r'encoding="([0-9A-Za-z_\-]+)"', xmlstr[:50], re.I)
        # re.findall returns a list of matching strings. We only care about the
        # first one.
        if encoding:
            xmlstr = six.BytesIO(xmlstr.encode(encoding[0]))
        else:
            xmlstr = six.StringIO(xmlstr)

    return parse(xmlstr, allow_file=True)


[docs]def get_xml_parser(): """Return the XML parser currently in use. If one has not already been set (via :py:func:`set_xml_parser()`), a new ``etree.XMLParser`` is constructed with ``no_network=True`` and ``huge_tree=False``. """ global _XML_PARSER if _XML_PARSER is None: _XML_PARSER = etree.XMLParser( attribute_defaults=False, dtd_validation=False, load_dtd=False, no_network=True, ns_clean=True, recover=False, remove_blank_text=False, remove_comments=False, remove_pis=False, strip_cdata=True, compact=True, # collect_ids=True, resolve_entities=False, huge_tree=False ) return _XML_PARSER.copy()
[docs]def set_xml_parser(xml_parser=None): """Set the libtaxii.messages XML parser. Args: xml_parser (etree.XMLParser): The parser to use to parse TAXII XML. """ global _XML_PARSER _XML_PARSER = xml_parser
def parse_datetime_string(datetime_string): """Parse a string into a :py:class:`datetime.datetime`. libtaxii users should not need to use this function directly. """ if not datetime_string: return None return dateutil.parser.parse(datetime_string)
[docs]def generate_message_id(maxlen=5, version=VID_TAXII_SERVICES_10): """Generate a TAXII Message ID. Args: maxlen (int): maximum length of the ID, in characters Example: .. code-block:: python msg_id = tm11.generate_message_id() message = tm11.DiscoveryRequest(msg_id) # Or... message = tm11.DiscoveryRequest(tm11.generate_message_id()) """ if version == VID_TAXII_SERVICES_10: message_id = str(uuid4().int % sys.maxsize) elif version == VID_TAXII_SERVICES_11: message_id = str(uuid4()) else: raise ValueError('Unknown TAXII Version: %s. Must be a TAXII Services Version ID!' % version) return message_id
def append_any_content_etree(etree_elt, content): """ General method for adding content to an etree element. This method can handle: * etree._ElementTree * etree._Element * any python type that can be cast to str * str :param etree_elt: The etree to append the content to :param content: The content to append :return: The etree_elt """ if isinstance(content, etree._ElementTree): # If content is an element tree, append the root element etree_elt.append(content.getroot()) return etree_elt if isinstance(content, etree._Element): # If content is an element, append it etree_elt.append(content) return etree_elt if not isinstance(content, six.string_types): # If content is a non-string, cast it to string and set etree_elt.text etree_elt.text = str(content) return etree_elt # If content is a string, need to check if it's XML or not try: etree_elt.append(etree.XML(content, get_xml_parser())) except etree.XMLSyntaxError: etree_elt.text = content return etree_elt def gen_filename(collection_name, format_part, date_string, extension): """ Creates a filename based on various properties of a Poll Request and Content Block :param collection_name: The collection name :param format_part: The format part (e.g., '_STIX_10_') :param date_string: A datestring :param extension: The file extension to use :return: A string containing the generated filename """ if six.PY3: return (collection_name.lstrip(".") + format_part + re.sub(r"[^a-zA-Z0-9]", "_", date_string) + extension ).translate('/\\:*?"<>|') else: return (collection_name.lstrip(".") + format_part + re.sub(r"[^a-zA-Z0-9]", "_", date_string) + extension ).translate(None, '/\\:*?"<>|')
[docs]class TAXIIBase(object): """ Base class for all TAXII Messages and Message component types. libtaxii users should not need to use this class directly. """ @property def sort_key(self): """ This property allows list of TAXII objects to be compared efficiently. The __eq__ method uses this property to sort the lists before comparisons are made. Subclasses must implement this property. """ raise NotImplementedError()
[docs] def to_etree(self): """Create an etree representation of this class. Subclasses must implement this method. """ raise NotImplementedError()
[docs] def to_dict(self): """Create a dictionary representation of this class. Subclasses must implement this method. """ raise NotImplementedError()
[docs] def to_json(self): """Create a JSON object of this class. Assumes any binary content will be UTF-8 encoded. """ content_dict = self.to_dict() _decode_binary_fields(content_dict) return json.dumps(content_dict)
[docs] def to_xml(self, pretty_print=False): """Create an XML representation of this class. Subclasses should not need to implement this method. """ return etree.tostring(self.to_etree(), pretty_print=pretty_print, encoding='utf-8')
[docs] def to_text(self, line_prepend=''): """Create a nice looking (this is a subjective term!) textual representation of this class. Subclasses should implement this method. Note that this is just a convenience method for making TAXII Messages nice to read for humans and may change drastically in future versions of libtaxii. """ raise NotImplementedError()
[docs] @classmethod def from_etree(cls, src_etree): """Create an instance of this class from an etree. Subclasses must implement this method. """ raise NotImplementedError()
[docs] @classmethod def from_dict(cls, d): """Create an instance of this class from a dictionary. Subclasses must implement this method. """ raise NotImplementedError()
[docs] @classmethod def from_xml(cls, xml): """Create an instance of this class from XML. Subclasses should not need to implement this method. """ etree_xml = parse_xml_string(xml) return cls.from_etree(etree_xml)
# Just noting that there is not a from_text() method. I also # don't think there will ever be one. def __str__(self): return self.to_xml(pretty_print=True) def __eq__(self, other, debug=False): """ Generic method used to check equality of objects of any TAXII type. Also allows for ``print``-based debugging output showing differences. In order for subclasses to use this function, they must meet the following criteria: 1. All class properties start with one underscore. 2. The sort_key property is implemented. Args: self (object): this object other (object): the object to compare ``self`` against. debug (bool): Whether or not to print debug statements as the equality comparison is performed. """ if other is None: if debug: print('other was None!') return False if self.__class__.__name__ != other.__class__.__name__: if debug: print('class names not equal: %s != %s' % (self.__class__.__name__, other.__class__.__name__)) return False # Get all member properties that start with '_' members = [attr for attr in vars(self) if attr.startswith('_') and not attr.startswith('__')] for member in members: if debug: print('member name: %s' % member) self_value = getattr(self, member) other_value = getattr(other, member) if isinstance(self_value, TAXIIBase): # A debuggable equals comparison can be made eq = self_value.__eq__(other_value, debug) elif isinstance(self_value, list): # We have lists to compare if len(self_value) != len(other_value): # Lengths not equal member = member + ' lengths' self_value = len(self_value) other_value = len(other_value) eq = False elif len(self_value) == 0: # Both lists are of size 0, and therefore equal eq = True else: # Equal sized, non-0 length lists. The list might contain # TAXIIBase objects, or it might not. Peek at the first # item to see whether it is a TAXIIBase object or not. if isinstance(self_value[0], TAXIIBase): # All TAXIIBase objects have the 'sort_key' property implemented self_value = sorted(self_value, key=attrgetter('sort_key')) other_value = sorted(other_value, key=attrgetter('sort_key')) for self_item, other_item in six.moves.zip(self_value, other_value): # Compare the ordered lists element by element eq = self_item.__eq__(other_item, debug) else: # Assume they don't... just do a set comparison eq = set(self_value) == set(other_value) elif isinstance(self_value, dict): # Dictionary to compare if len(set(self_value.keys()) - set(other_value.keys())) != 0: if debug: print('dict keys not equal: %s != %s' % (self_value, other_value)) eq = False for k, v in six.iteritems(self_value): if other_value[k] != v: if debug: print('dict values not equal: %s != %s' % (v, other_value[k])) eq = False eq = True elif isinstance(self_value, etree._Element): # Non-TAXII etree element (i.e. STIX) eq = (etree.tostring(self_value, encoding='utf-8') == etree.tostring(other_value, encoding='utf-8')) else: # Do a direct comparison eq = (self_value == other_value) # TODO: is this duplicate? if not eq: if debug: print('%s was not equal: %s != %s' % (member, self_value, other_value)) return False return True def __ne__(self, other, debug=False): return not self.__eq__(other, debug)
def get_required(etree_xml, xpath, ns_map): elements = etree_xml.xpath(xpath, namespaces=ns_map) if len(elements) == 0: raise ValueError('Element "%s" is required' % xpath) return elements[0] def get_optional(etree_xml, xpath, ns_map): try: return get_required(etree_xml, xpath, ns_map) except ValueError: pass def get_optional_text(etree_xml, xpath, ns_map): try: return get_required(etree_xml, xpath, ns_map).text except ValueError: pass def _decode_binary_fields(dict_obj): """Given a dict, decode any binary values, assuming UTF-8 encoding. Will recurse into nested dicts. Modifies the values in-place. """ for key, value in dict_obj.items(): if isinstance(value, six.binary_type): dict_obj[key] = value.decode('utf-8') elif isinstance(value, dict): _decode_binary_fields(value) def stringify_content(content): """Always a string or raises an error. Returns the string representation and whether the data is XML. """ # If it's an etree, it's definitely XML if isinstance(content, etree._ElementTree): return content.getroot(), True if isinstance(content, etree._Element): return content, True if hasattr(content, 'read'): # The content is file-like try: # Try to parse as XML xml = parse(content, allow_file=True) return xml, True except etree.XMLSyntaxError: # Content is not well-formed XML; just treat as a string return content.read(), False else: # The Content is not file-like try: # Attempt to parse string as XML xml = parse_xml_string(content) return xml, True except etree.XMLSyntaxError: # Content is not well-formed XML; just treat as a string if isinstance(content, six.string_types): # It's a string of some kind, unicode or otherwise return content, False else: # It's some other datatype that needs casting to string return str(content), False