"""
Common utility classes and functions used throughout libtaxii.
"""
from operator import attrgetter
import re
import sys
from uuid import uuid4
import dateutil.parser
from lxml import etree
import six
try:
import simplejson as json
except ImportError:
import json
from libtaxii.constants import *
_XML_PARSER = None
def parse(s):
    """
    Parse XML with the default parser and hand back the root element.

    ``s`` is first given to ``etree.parse``. If that raises ``IOError``
    (presumably because ``s`` is XML text rather than something that can be
    opened), ``s`` is parsed as literal XML via ``etree.XML`` instead.

    :param s: The XML String or File-like object to parse
    :return: an etree._Element
    """
    try:
        tree = etree.parse(s, get_xml_parser())
        return tree.getroot()
    except IOError:
        # Not readable as a file/path: treat the argument as the XML itself.
        return etree.XML(s, get_xml_parser())
def parse_xml_string(xmlstr):
    """Parse an XML string (binary or unicode) with the default parser.

    Binary input is wrapped in a ``BytesIO``. Unicode input that carries an
    explicit ``encoding`` pseudo-attribute in its first 50 characters is
    encoded to bytes using that encoding before parsing; other unicode input
    is wrapped in a ``StringIO``. Any other type is passed to
    :py:func:`parse` unchanged.

    :param xmlstr: An XML String to parse
    :return: an etree._Element
    """
    if isinstance(xmlstr, six.binary_type):
        xmlstr = six.BytesIO(xmlstr)
    elif isinstance(xmlstr, six.text_type):
        # LXML doesn't accept Unicode strings with an explicit encoding, so
        # try to detect and encode to bytes before passing to LXML.
        # The XML declaration may quote the encoding name with either single
        # or double quotes and may put whitespace around '=', so accept all
        # of those spellings (the name itself may also contain '.').
        declared = re.search(
            r'''encoding\s*=\s*["']([0-9A-Za-z_.\-]+)["']''',
            xmlstr[:50],
            re.I,
        )
        if declared:
            xmlstr = six.BytesIO(xmlstr.encode(declared.group(1)))
        else:
            xmlstr = six.StringIO(xmlstr)

    return parse(xmlstr)
def get_xml_parser():
    """Return a copy of the XML parser currently in use.

    If one has not already been set (via :py:func:`set_xml_parser()`), a new
    ``etree.XMLParser`` is constructed with ``no_network=True`` and
    ``huge_tree=False`` (among the other options listed below) and stored in
    the module-level ``_XML_PARSER``. Callers always receive
    ``_XML_PARSER.copy()``, never the stored instance itself.
    """
    global _XML_PARSER

    if _XML_PARSER is None:
        default_options = dict(
            attribute_defaults=False,
            dtd_validation=False,
            load_dtd=False,
            no_network=True,
            ns_clean=True,
            recover=False,
            remove_blank_text=False,
            remove_comments=False,
            remove_pis=False,
            strip_cdata=True,
            compact=True,
            # collect_ids=True,
            resolve_entities=False,
            huge_tree=False,
        )
        _XML_PARSER = etree.XMLParser(**default_options)

    return _XML_PARSER.copy()
def set_xml_parser(xml_parser=None):
    """Replace the module-wide XML parser used for TAXII XML.

    The value is stored in the module-level ``_XML_PARSER``. Passing ``None``
    (the default) clears it, so the next :py:func:`get_xml_parser()` call
    builds the default parser again.

    Args:
        xml_parser (etree.XMLParser): The parser to use to parse TAXII XML.
    """
    global _XML_PARSER
    _XML_PARSER = xml_parser
def parse_datetime_string(datetime_string):
    """Convert a string into a :py:class:`datetime.datetime`.

    A falsy argument (``None``, empty string) yields ``None``; anything else
    is handed to ``dateutil.parser.parse``.

    libtaxii users should not need to use this function directly.
    """
    return dateutil.parser.parse(datetime_string) if datetime_string else None
def generate_message_id(maxlen=5, version=VID_TAXII_SERVICES_10):
    """Generate a TAXII Message ID.

    Args:
        maxlen (int): maximum length of the ID, in characters. Note that the
            current implementation does not consult this value.
        version (str): TAXII Services Version ID selecting the ID format.
            ``VID_TAXII_SERVICES_10`` gives the decimal string of a random
            UUID's integer value modulo ``sys.maxsize``;
            ``VID_TAXII_SERVICES_11`` gives the random UUID's string form.

    Raises:
        ValueError: if ``version`` is neither of the two IDs above.

    Example:
        .. code-block:: python

            msg_id = tm11.generate_message_id()
            message = tm11.DiscoveryRequest(msg_id)
            # Or...
            message = tm11.DiscoveryRequest(tm11.generate_message_id())
    """
    if version == VID_TAXII_SERVICES_10:
        return str(uuid4().int % sys.maxsize)

    if version == VID_TAXII_SERVICES_11:
        return str(uuid4())

    raise ValueError('Unknown TAXII Version: %s. Must be a TAXII Services Version ID!' % version)
def append_any_content_etree(etree_elt, content):
    """
    Attach arbitrary content to an etree element.

    How ``content`` is attached depends on its type:

    * ``etree._ElementTree`` - its root element is appended as a child
    * ``etree._Element`` - appended as a child
    * any non-string value - ``str(content)`` becomes ``etree_elt.text``
    * a string - appended as a child element if it parses as XML with the
      default parser, otherwise stored as ``etree_elt.text``

    :param etree_elt: The etree to append the content to
    :param content: The content to append
    :return: The etree_elt
    """
    if isinstance(content, etree._ElementTree):
        etree_elt.append(content.getroot())
    elif isinstance(content, etree._Element):
        etree_elt.append(content)
    elif not isinstance(content, six.string_types):
        etree_elt.text = str(content)
    else:
        # A string may or may not be XML; let the parser decide.
        try:
            parsed_child = etree.XML(content, get_xml_parser())
            etree_elt.append(parsed_child)
        except etree.XMLSyntaxError:
            etree_elt.text = content

    return etree_elt
def gen_filename(collection_name, format_part, date_string, extension):
    """
    Creates a filename based on various properties of a Poll Request and Content Block

    Leading dots are stripped from the collection name, every
    non-alphanumeric character of the date string becomes ``_``, and the
    characters ``/ \\ : * ? " < > |`` are removed from the assembled name.

    :param collection_name: The collection name
    :param format_part: The format part (e.g., '_STIX_10_')
    :param date_string: A datestring
    :param extension: The file extension to use
    :return: A string containing the generated filename
    """
    raw_name = (collection_name.lstrip(".") +
                format_part +
                re.sub(r"[^a-zA-Z0-9]", "_", date_string) +
                extension)

    # str.translate() has incompatible signatures on Python 2 and 3, and
    # passing the character list as the Python 3 "table" removes nothing.
    # A regex substitution behaves the same on both versions and on both
    # byte and unicode strings.
    return re.sub(r'[/\\:*?"<>|]', '', raw_name)
class TAXIIBase(object):
    """
    Base class for all TAXII Messages and Message component types.

    libtaxii users should not need to use this class directly.
    """

    @property
    def sort_key(self):
        """
        This property allows list of TAXII objects to be compared efficiently.
        The __eq__ method uses this property to sort the lists before
        comparisons are made.

        Subclasses must implement this property.
        """
        raise NotImplementedError()

    def to_etree(self):
        """Create an etree representation of this class.

        Subclasses must implement this method.
        """
        raise NotImplementedError()

    def to_dict(self):
        """Create a dictionary representation of this class.

        Subclasses must implement this method.
        """
        raise NotImplementedError()

    def to_json(self):
        """Create a JSON object of this class.

        Assumes any binary content will be UTF-8 encoded.
        """
        content_dict = self.to_dict()

        _decode_binary_fields(content_dict)

        return json.dumps(content_dict)

    def to_xml(self, pretty_print=False):
        """Create an XML representation of this class.

        Subclasses should not need to implement this method.
        """
        return etree.tostring(self.to_etree(), pretty_print=pretty_print)

    def to_text(self, line_prepend=''):
        """Create a nice looking (this is a subjective term!)
        textual representation of this class. Subclasses should
        implement this method.

        Note that this is just a convenience method for making
        TAXII Messages nice to read for humans and may change
        drastically in future versions of libtaxii.
        """
        raise NotImplementedError()

    @classmethod
    def from_etree(cls, src_etree):
        """Create an instance of this class from an etree.

        Subclasses must implement this method.
        """
        raise NotImplementedError()

    @classmethod
    def from_dict(cls, d):
        """Create an instance of this class from a dictionary.

        Subclasses must implement this method.
        """
        raise NotImplementedError()

    @classmethod
    def from_xml(cls, xml):
        """Create an instance of this class from XML.

        Subclasses should not need to implement this method.
        """
        etree_xml = parse_xml_string(xml)
        return cls.from_etree(etree_xml)

    # Just noting that there is not a from_text() method. I also
    # don't think there will ever be one.

    def __str__(self):
        """Return the pretty-printed XML form as a native string."""
        xml = self.to_xml(pretty_print=True)
        # On Python 3 the serialised XML is a byte string, and __str__ must
        # return text. On Python 2 ``bytes is str`` so nothing is decoded.
        if isinstance(xml, bytes) and not isinstance(xml, str):
            xml = xml.decode('utf-8')
        return xml

    def __eq__(self, other, debug=False):
        """
        Generic method used to check equality of objects of any TAXII type.

        Also allows for ``print``-based debugging output showing differences.

        In order for subclasses to use this function, they must meet the
        following criteria:
        1. All class properties start with one underscore.
        2. The sort_key property is implemented.

        Instance attributes are compared one by one:

        * TAXIIBase values are compared recursively.
        * Lists of TAXIIBase objects are sorted by ``sort_key`` and every
          pair must be equal; other lists are compared as sets.
        * Dicts must have the same keys and equal values.
        * etree elements are compared by their serialised form.
        * Anything else is compared with ``==``.

        Args:
            self (object): this object
            other (object): the object to compare ``self`` against.
            debug (bool): Whether or not to print debug statements as the
                equality comparison is performed.

        Returns:
            bool: True if every compared attribute is equal, else False.
        """
        if other is None:
            if debug:
                print('other was None!')
            return False

        if self.__class__.__name__ != other.__class__.__name__:
            if debug:
                print('class names not equal: %s != %s' % (self.__class__.__name__, other.__class__.__name__))
            return False

        # Get all member properties that start with '_'
        members = [attr for attr in vars(self) if attr.startswith('_') and not attr.startswith('__')]
        for member in members:
            if debug:
                print('member name: %s' % member)
            self_value = getattr(self, member)
            other_value = getattr(other, member)

            if isinstance(self_value, TAXIIBase):
                # A debuggable equals comparison can be made
                eq = self_value.__eq__(other_value, debug)
            elif isinstance(self_value, list):
                # We have lists to compare
                if len(self_value) != len(other_value):
                    # Lengths not equal; report the lengths in debug output
                    member = member + ' lengths'
                    self_value = len(self_value)
                    other_value = len(other_value)
                    eq = False
                elif len(self_value) == 0:
                    # Both lists are of size 0, and therefore equal
                    eq = True
                elif isinstance(self_value[0], TAXIIBase):
                    # Equal sized, non-0 length lists of TAXIIBase objects
                    # (judged by peeking at the first item). All TAXIIBase
                    # objects have the 'sort_key' property implemented.
                    self_value = sorted(self_value, key=attrgetter('sort_key'))
                    other_value = sorted(other_value, key=attrgetter('sort_key'))
                    # Every pair must match, not merely the last one.
                    eq = all(self_item.__eq__(other_item, debug)
                             for self_item, other_item in zip(self_value, other_value))
                else:
                    # Assume plain values... just do a set comparison
                    eq = set(self_value) == set(other_value)
            elif isinstance(self_value, dict):
                # Dictionary to compare: same key set, then equal values
                if set(self_value.keys()) != set(other_value.keys()):
                    if debug:
                        print('dict keys not equal: %s != %s' % (self_value, other_value))
                    eq = False
                else:
                    eq = True
                    for k, v in self_value.items():
                        if other_value[k] != v:
                            if debug:
                                print('dict values not equal: %s != %s' % (v, other_value[k]))
                            eq = False
                            break
            elif isinstance(self_value, etree._Element):
                # Non-TAXII etree element (i.e. STIX)
                eq = (etree.tostring(self_value) == etree.tostring(other_value))
            else:
                # Do a direct comparison
                eq = (self_value == other_value)

            if not eq:
                if debug:
                    print('%s was not equal: %s != %s' % (member, self_value, other_value))
                return False

        return True

    def __ne__(self, other, debug=False):
        """Return the negation of :py:meth:`__eq__`."""
        return not self.__eq__(other, debug)
def get_required(etree_xml, xpath, ns_map):
    """Return the first result of evaluating ``xpath`` on ``etree_xml``.

    :param etree_xml: The object whose ``xpath()`` method is called
    :param xpath: The XPath expression to evaluate
    :param ns_map: Passed to ``xpath()`` as its ``namespaces`` argument
    :return: The first item of the result
    :raises ValueError: if the result is empty
    """
    matches = etree_xml.xpath(xpath, namespaces=ns_map)
    if len(matches) != 0:
        return matches[0]
    raise ValueError('Element "%s" is required' % xpath)
def get_optional(etree_xml, xpath, ns_map):
    """Like :py:func:`get_required`, but return ``None`` instead of raising.

    :param etree_xml: The object to evaluate the XPath expression on
    :param xpath: The XPath expression to evaluate
    :param ns_map: Namespace map for the XPath evaluation
    :return: The first result, or ``None`` when ``get_required`` raises
        ``ValueError``
    """
    try:
        element = get_required(etree_xml, xpath, ns_map)
    except ValueError:
        element = None
    return element
def get_optional_text(etree_xml, xpath, ns_map):
    """Return the ``text`` of the first XPath result, or ``None``.

    :param etree_xml: The object to evaluate the XPath expression on
    :param xpath: The XPath expression to evaluate
    :param ns_map: Namespace map for the XPath evaluation
    :return: The ``text`` attribute of the first result, or ``None`` when
        ``get_required`` raises ``ValueError``
    """
    try:
        text = get_required(etree_xml, xpath, ns_map).text
    except ValueError:
        text = None
    return text
def _decode_binary_fields(dict_obj):
"""Given a dict, decode any binary values, assuming UTF-8 encoding.
Will recurse into nested dicts.
Modifies the values in-place.
"""
for key, value in dict_obj.items():
if isinstance(value, six.binary_type):
dict_obj[key] = value.decode('utf-8')
elif isinstance(value, dict):
_decode_binary_fields(value)
def stringify_content(content):
    """Normalise content to a parsed XML element or a string.

    Returns a 2-tuple ``(value, is_xml)``:

    * ``etree._ElementTree`` / ``etree._Element`` input gives the (root)
      element and ``True``.
    * File-like input (anything with a ``read`` attribute) is read once; if
      the data parses as XML the root element and ``True`` are returned,
      otherwise the data that was read and ``False``.
    * Byte or text strings are parsed as XML if possible (element, ``True``);
      otherwise the string form and ``False`` are returned.
    * Any other value is returned as ``str(content)`` with ``False``.
    """
    # If it's an etree, it's definitely XML
    if isinstance(content, etree._ElementTree):
        return content.getroot(), True
    if isinstance(content, etree._Element):
        return content, True

    if hasattr(content, 'read'):  # The content is file-like
        # Read the stream exactly once, *before* attempting to parse. Parsing
        # the stream directly would consume it, leaving little or nothing for
        # the non-XML fallback to return.
        data = content.read()
        try:  # Try to parse as XML
            return parse_xml_string(data), True
        except etree.XMLSyntaxError:  # Not well-formed XML; just treat as a string
            return data, False

    # The content is not file-like
    if not isinstance(content, (six.binary_type, six.text_type)):
        # Not a string of any kind, so it cannot be parsed as XML; handing it
        # to the parser would fail with an error other than XMLSyntaxError.
        return str(content), False

    try:  # Attempt to parse string as XML
        return parse_xml_string(content), True
    except etree.XMLSyntaxError:  # Not well-formed XML; just treat as a string
        if isinstance(content, six.string_types):  # It's a string of some kind, unicode or otherwise
            return content, False
        # A byte string on Python 3 is not one of six.string_types
        return str(content), False