# Copyright (C) 2013 - The MITRE Corporation
# For license information, see the LICENSE.txt file
# Contributors:
# * Mark Davidson - mdavidson@mitre.org
"""
Creating, handling, and parsing TAXII Default Queries.
"""
import numbers
import datetime
from operator import attrgetter
import os
import dateutil.parser
from lxml import etree
import libtaxii.messages_11 as tm11
from .common import TAXIIBase
from .validation import (do_check, uri_regex, targeting_expression_regex)
from .constants import *
import six
class CapabilityModule(object):
    """Describe one capability module and the relationships it offers.

    A capability module groups a set of valid relationships; every
    relationship in turn accepts zero or more parameters.

    :param capability_module_id: Identifier of the capability module
        (validated as a string)
    :param relationships: Iterable of :class:`Relationship` objects; they
        are stored in a dict keyed by each relationship's ``name``
    """

    def __init__(self, capability_module_id, relationships):
        self.capability_module_id = capability_module_id
        self.relationships = relationships

    @property
    def capability_module_id(self):
        """Identifier of this capability module."""
        return self._capability_module_id

    @capability_module_id.setter
    def capability_module_id(self, value):
        do_check(value, 'capability_module_id', type=six.string_types)
        self._capability_module_id = value

    @property
    def relationships(self):
        """Dict mapping relationship name to :class:`Relationship`."""
        return self._relationships

    @relationships.setter
    def relationships(self, value):
        do_check(value, 'relationships', type=Relationship)
        # Index the relationships by name; a later duplicate name wins.
        self._relationships = dict((rel.name, rel) for rel in value)
class Relationship(object):
    """Describe a relationship and the parameters it accepts.

    :param name: Name of the relationship (validated as a string)
    :param parameters: Optional iterable of :class:`Parameter` objects;
        they are stored in a dict keyed by each parameter's ``name``.
        A falsy value is treated as "no parameters".
    """

    def __init__(self, name, parameters=None):
        self.name = name
        self.parameters = parameters or []

    @property
    def name(self):
        """Name of this relationship."""
        return self._name

    @name.setter
    def name(self, value):
        do_check(value, 'name', type=six.string_types)
        self._name = value

    @property
    def parameters(self):
        """Dict mapping parameter name to :class:`Parameter`."""
        return self._parameters

    @parameters.setter
    def parameters(self, value):
        do_check(value, 'parameters', type=Parameter)
        # Index the parameters by name; a later duplicate name wins.
        self._parameters = dict((param.name, param) for param in value)
class Parameter(object):
    """Describe a single parameter of a :class:`Relationship`.

    :param name: Name of the parameter
    :param type: Type (or tuple of types) a value must be an instance of
    :param value_tuple: Optional tuple of the only values permitted
    """

    def __init__(self, name, type, value_tuple=None):
        self.name, self.type, self.value_tuple = name, type, value_tuple

    def verify(self, value):
        """Check ``value`` against this parameter's type and allowed values.

        The check is delegated to ``do_check``; when that call returns
        normally the value is accepted.

        :return: The tuple ``(True, 'OK')``
        """
        constraints = {'type': self.type, 'value_tuple': self.value_tuple}
        do_check(value, 'value', **constraints)
        return (True, 'OK')
# ---------------------------------------------------------------------------
# Parameters shared by the Core / Regex / Timestamp capability modules
# ---------------------------------------------------------------------------
param_str_value = Parameter(P_VALUE, six.string_types)
param_float_value = Parameter(P_VALUE, float)
param_ts_value = Parameter(P_VALUE, datetime.datetime)
param_match_type = Parameter(
    P_MATCH_TYPE,
    six.string_types,
    ('case_sensitive_string', 'case_insensitive_string', 'number'),
)
param_case_sensitive = Parameter(P_CASE_SENSITIVE, bool, (True, False))

# ---------------------------------------------------------------------------
# CORE relationships
# ---------------------------------------------------------------------------
rel_equals = Relationship(R_EQUALS, [param_str_value, param_match_type])
rel_not_equals = Relationship(R_NOT_EQUALS, [param_str_value, param_match_type])

# Numeric comparisons take a single float value.
rel_greater_than = Relationship(R_GREATER_THAN, [param_float_value])
rel_greater_than_or_equal = Relationship(R_GREATER_THAN_OR_EQUAL, [param_float_value])
rel_less_than = Relationship(R_LESS_THAN, [param_float_value])
rel_less_than_or_equal = Relationship(R_LESS_THAN_OR_EQUAL, [param_float_value])

# Existence checks take no parameters at all.
rel_dne = Relationship(R_DOES_NOT_EXIST)
rel_ex = Relationship(R_EXISTS)

# Substring checks take a case-sensitivity flag and a string value.
rel_begins_with = Relationship(R_BEGINS_WITH, [param_case_sensitive, param_str_value])
rel_ends_with = Relationship(R_ENDS_WITH, [param_case_sensitive, param_str_value])
rel_contains = Relationship(R_CONTAINS, [param_case_sensitive, param_str_value])

# ---------------------------------------------------------------------------
# REGEX relationships
# ---------------------------------------------------------------------------
rel_matches = Relationship(R_MATCHES, [param_case_sensitive, param_str_value])

# ---------------------------------------------------------------------------
# TIMESTAMP relationships (same relationship names as CORE, datetime values)
# ---------------------------------------------------------------------------
rel_ts_eq = Relationship(R_EQUALS, [param_ts_value])
rel_ts_gt = Relationship(R_GREATER_THAN, [param_ts_value])
rel_ts_gte = Relationship(R_GREATER_THAN_OR_EQUAL, [param_ts_value])
rel_ts_lt = Relationship(R_LESS_THAN, [param_ts_value])
rel_ts_lte = Relationship(R_LESS_THAN_OR_EQUAL, [param_ts_value])

# ---------------------------------------------------------------------------
# Capability modules
# ---------------------------------------------------------------------------
# CORE
cm_core = CapabilityModule(
    CM_CORE,
    [
        rel_equals,
        rel_not_equals,
        rel_greater_than,
        rel_greater_than_or_equal,
        rel_less_than,
        rel_less_than_or_equal,
        rel_dne,
        rel_ex,
        rel_begins_with,
        rel_contains,
        rel_ends_with,
    ],
)

# REGEX
cm_regex = CapabilityModule(CM_REGEX, [rel_matches])

# TIMESTAMP
cm_timestamp = CapabilityModule(
    CM_TIMESTAMP,
    [rel_ts_eq, rel_ts_gt, rel_ts_gte, rel_ts_lt, rel_ts_lte],
)

# Lookup table used when validating and parsing Test objects: capability
# module ID -> CapabilityModule.
capability_modules = {
    CM_CORE: cm_core,
    CM_REGEX: cm_regex,
    CM_TIMESTAMP: cm_timestamp,
}
class DefaultQueryInfo(tm11.SupportedQuery):
    """Used to describe the TAXII Default Queries that are supported.

    :param targeting_expression_infos: Describe the supported targeting expressions
    :type targeting_expression_infos: :class:`list` of :class:`TargetingExpressionInfo` objects
    :param capability_modules: Indicate the supported capability modules
    :type capability_modules: :class:`list` of :class:`str`
    """

    def __init__(self, targeting_expression_infos, capability_modules):
        super(DefaultQueryInfo, self).__init__(FID_TAXII_DEFAULT_QUERY_10)
        self.targeting_expression_infos = targeting_expression_infos
        self.capability_modules = capability_modules

    @property
    def targeting_expression_infos(self):
        """The supported targeting expressions."""
        return self._targeting_expression_infos

    @targeting_expression_infos.setter
    def targeting_expression_infos(self, value):
        do_check(value, 'targeting_expression_infos', type=DefaultQueryInfo.TargetingExpressionInfo)
        self._targeting_expression_infos = value

    @property
    def capability_modules(self):
        """The supported capability module IDs."""
        return self._capability_modules

    @capability_modules.setter
    def capability_modules(self, value):
        do_check(value, 'capability_modules', regex_tuple=uri_regex)
        self._capability_modules = value

    def to_etree(self):
        """Return the parent's element with a ``Default_Query_Info`` child added.

        The child holds one element per targeting expression info followed
        by one ``Capability_Module`` element per capability module ID.
        """
        q = super(DefaultQueryInfo, self).to_etree()
        dqi = etree.SubElement(q, '{%s}Default_Query_Info' % ns_map['tdq'])
        for expression_info in self.targeting_expression_infos:
            dqi.append(expression_info.to_etree())
        for cmod in self.capability_modules:
            cm = etree.SubElement(dqi, '{%s}Capability_Module' % ns_map['tdq'], nsmap=ns_map)
            cm.text = cmod
        return q

    def to_dict(self):
        """Return the parent's dict extended with this object's fields."""
        d = super(DefaultQueryInfo, self).to_dict()
        d['targeting_expression_infos'] = [
            expression_info.to_dict()
            for expression_info in self.targeting_expression_infos
        ]
        # TODO: This looks like a serialization bug
        d['capability_modules'] = self.capability_modules
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering, each added line prefixed with ``line_prepend``."""
        s = super(DefaultQueryInfo, self).to_text(line_prepend)
        for expression_info in self.targeting_expression_infos:
            s += expression_info.to_text(line_prepend + STD_INDENT)
        for capability_module in self.capability_modules:
            s += line_prepend + " Capability Module: %s\n" % capability_module
        return s

    def __hash__(self):
        # Hash is derived from the string form of the dict representation.
        return hash(str(self.to_dict()))

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`DefaultQueryInfo` from an element that contains
        a ``tdq:Default_Query_Info`` child."""
        texpr_infos = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Targeting_Expression_Info', namespaces=ns_map)
        texpr_info_list = [
            DefaultQueryInfo.TargetingExpressionInfo.from_etree(texpr_info)
            for texpr_info in texpr_infos
        ]

        cms = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Capability_Module', namespaces=ns_map)
        cms_list = [cm.text for cm in cms]
        return DefaultQueryInfo(texpr_info_list, cms_list)

    @staticmethod
    def from_dict(d):
        """Build a :class:`DefaultQueryInfo` from a dict holding the keys
        ``'targeting_expression_infos'`` and ``'capability_modules'``."""
        kwargs = {
            'targeting_expression_infos': [
                DefaultQueryInfo.TargetingExpressionInfo.from_dict(expression_info)
                for expression_info in d['targeting_expression_infos']
            ],
            'capability_modules': d['capability_modules'],
        }
        return DefaultQueryInfo(**kwargs)
class TargetingExpressionInfo(TAXIIBase):
    """Describe one supported Targeting Expression.

    :param string targeting_expression_id: The supported targeting expression ID
    :param preferred_scope: Indicates the preferred scope of queries
    :type preferred_scope: :class:`list` of :class:`string`
    :param allowed_scope: Indicates the allowed scope of queries
    :type allowed_scope: :class:`list` of :class:`string`
    """

    def __init__(self, targeting_expression_id, preferred_scope=None, allowed_scope=None):
        self.targeting_expression_id = targeting_expression_id
        # Falsy scopes (None, empty list) are normalised to a fresh list.
        self.preferred_scope = preferred_scope or []
        self.allowed_scope = allowed_scope or []

    @property
    def sort_key(self):
        """Key used for ordering: the targeting expression ID."""
        return self.targeting_expression_id

    @property
    def targeting_expression_id(self):
        """The targeting expression ID."""
        return self._targeting_expression_id

    @targeting_expression_id.setter
    def targeting_expression_id(self, value):
        do_check(value, 'targeting_expression_id', regex_tuple=uri_regex)
        self._targeting_expression_id = value

    @property
    def preferred_scope(self):
        """The preferred scopes."""
        return self._preferred_scope

    @preferred_scope.setter
    def preferred_scope(self, value):
        do_check(value, 'preferred_scope', type=six.string_types, regex_tuple=targeting_expression_regex)
        self._preferred_scope = value

    @property
    def allowed_scope(self):
        """The allowed scopes."""
        return self._allowed_scope

    @allowed_scope.setter
    def allowed_scope(self, value):
        do_check(value, 'allowed_scope', type=six.string_types, regex_tuple=targeting_expression_regex)
        self._allowed_scope = value

    def to_etree(self):
        """Return a ``Targeting_Expression_Info`` element for this object."""
        tdq_ns = ns_map['tdq']
        tei = etree.Element('{%s}Targeting_Expression_Info' % tdq_ns)
        tei.attrib['targeting_expression_id'] = self.targeting_expression_id
        # Preferred scopes are emitted before allowed scopes.
        for scope in self.preferred_scope:
            etree.SubElement(tei, '{%s}Preferred_Scope' % tdq_ns).text = scope
        for scope in self.allowed_scope:
            etree.SubElement(tei, '{%s}Allowed_Scope' % tdq_ns).text = scope
        return tei

    def to_dict(self):
        """Return a dict with the ID and both scope lists."""
        # TODO: Preferred / Allowed scope look like serialization bugs
        return {
            'targeting_expression_id': self.targeting_expression_id,
            'preferred_scope': self.preferred_scope,
            'allowed_scope': self.allowed_scope,
        }

    def to_text(self, line_prepend=''):
        """Return a text rendering, each line prefixed with ``line_prepend``."""
        pieces = [
            line_prepend + "=== Targeting Expression Info ===\n",
            line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id,
        ]
        for scope in self.preferred_scope:
            pieces.append(line_prepend + " Preferred Scope: %s\n" % scope)
        for scope in self.allowed_scope:
            pieces.append(line_prepend + " Allowed Scope: %s\n" % scope)
        return ''.join(pieces)

    def __hash__(self):
        # Hash is derived from the string form of the dict representation.
        return hash(str(self.to_dict()))

    @staticmethod
    def from_etree(etree_xml):
        """Build an instance from a ``Targeting_Expression_Info`` element."""
        expression_id = etree_xml.xpath('./@targeting_expression_id', namespaces=ns_map)[0]
        preferred = [
            node.text
            for node in etree_xml.xpath('./tdq:Preferred_Scope', namespaces=ns_map)
        ]
        allowed = [
            node.text
            for node in etree_xml.xpath('./tdq:Allowed_Scope', namespaces=ns_map)
        ]
        return DefaultQueryInfo.TargetingExpressionInfo(
            targeting_expression_id=expression_id,
            preferred_scope=preferred,
            allowed_scope=allowed,
        )

    @staticmethod
    def from_dict(d):
        """Build an instance using the dict's items as keyword arguments."""
        return DefaultQueryInfo.TargetingExpressionInfo(**d)
class DefaultQuery(tm11.Query):
    """Conveys a TAXII Default Query.

    :param string targeting_expression_id: The targeting_expression used in the query
    :param criteria: The criteria of the query
    :type criteria: :class:`DefaultQuery.Criteria`
    """

    def __init__(self, targeting_expression_id, criteria):
        super(DefaultQuery, self).__init__(FID_TAXII_DEFAULT_QUERY_10)
        self.targeting_expression_id = targeting_expression_id
        self.criteria = criteria

    @property
    def targeting_expression_id(self):
        """The targeting expression ID used by this query."""
        return self._targeting_expression_id

    @targeting_expression_id.setter
    def targeting_expression_id(self, value):
        do_check(value, 'targeting_expression_id', regex_tuple=uri_regex)
        self._targeting_expression_id = value

    @property
    def criteria(self):
        """The top-level :class:`DefaultQuery.Criteria` of this query."""
        return self._criteria

    @criteria.setter
    def criteria(self, value):
        do_check(value, 'criteria', type=DefaultQuery.Criteria)
        self._criteria = value

    def to_etree(self):
        """Return the parent's element with a ``Default_Query`` child that
        carries the targeting expression ID and the criteria element."""
        q = super(DefaultQuery, self).to_etree()
        dq = etree.SubElement(q, '{%s}Default_Query' % ns_map['tdq'], nsmap=ns_map)
        dq.attrib['targeting_expression_id'] = self.targeting_expression_id
        dq.append(self.criteria.to_etree())
        return q

    def to_dict(self):
        """Return the parent's dict extended with this object's fields."""
        d = super(DefaultQuery, self).to_dict()
        d['targeting_expression_id'] = self.targeting_expression_id
        d['criteria'] = self.criteria.to_dict()
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering, each added line prefixed with ``line_prepend``."""
        s = super(DefaultQuery, self).to_text(line_prepend)
        s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id
        s += self.criteria.to_text(line_prepend)
        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`DefaultQuery` from an element that contains a
        ``tdq:Default_Query`` child.

        An ``IndexError`` results if the ``targeting_expression_id``
        attribute or the ``tdq:Criteria`` child is absent, because the
        first XPath match is taken unconditionally.
        """
        tei = etree_xml.xpath('./tdq:Default_Query/@targeting_expression_id', namespaces=ns_map)[0]
        criteria_element = etree_xml.xpath('./tdq:Default_Query/tdq:Criteria', namespaces=ns_map)[0]
        criteria = DefaultQuery.Criteria.from_etree(criteria_element)
        return DefaultQuery(tei, criteria)

    @staticmethod
    def from_dict(d):
        """Build a :class:`DefaultQuery` from a dict holding the keys
        ``'targeting_expression_id'`` and ``'criteria'``."""
        tei = d['targeting_expression_id']
        criteria = DefaultQuery.Criteria.from_dict(d['criteria'])
        return DefaultQuery(tei, criteria)
class Criteria(TAXIIBase):
    """Represents criteria for a :class:`DefaultQuery`.

    **Note**: At least one criterion OR criteria MUST be present (this
    class does not itself enforce that).

    :param str operator: The logical operator (should be one of `OP_AND` or `OP_OR`)
    :param criteria: The criteria for the query
    :type criteria: :class:`list` of :class:`DefaultQuery.Criteria`
    :param criterion: The criterion for the query
    :type criterion: :class:`list` of :class:`DefaultQuery.Criterion`
    """

    def __init__(self, operator, criteria=None, criterion=None):
        self.operator = operator
        # Falsy arguments (None, empty list) are normalised to a fresh list.
        self.criteria = criteria or []
        self.criterion = criterion or []

    @property
    def sort_key(self):
        """Concatenation of the children's sort keys.

        Nested criteria keys come first, then criterion keys; each group
        is sorted on its own.
        """
        by_key = attrgetter('sort_key')
        # Sorting the extracted keys gives the same sequence as sorting the
        # objects by key and then extracting, but evaluates each child's
        # (possibly recursive) sort_key only once.
        keys = sorted(map(by_key, self.criteria))
        keys.extend(sorted(map(by_key, self.criterion)))
        return ''.join(keys)

    @property
    def operator(self):
        """The logical operator combining the children."""
        return self._operator

    @operator.setter
    def operator(self, value):
        do_check(value, 'operator', value_tuple=OP_TYPES)
        self._operator = value

    @property
    def criteria(self):
        """Nested :class:`DefaultQuery.Criteria` objects."""
        return self._criteria

    @criteria.setter
    def criteria(self, value):
        # The field name must be spelled correctly: do_check receives it as
        # the name of the value being validated.
        do_check(value, 'criteria', type=DefaultQuery.Criteria)
        self._criteria = value

    @property
    def criterion(self):
        """Child :class:`DefaultQuery.Criterion` objects."""
        return self._criterion

    @criterion.setter
    def criterion(self, value):
        do_check(value, 'criterion', type=DefaultQuery.Criterion)
        self._criterion = value

    def to_etree(self):
        """Return a ``Criteria`` element; nested criteria precede criterion children."""
        cr = etree.Element('{%s}Criteria' % ns_map['tdq'], nsmap=ns_map)
        cr.attrib['operator'] = self.operator
        for criteria in self.criteria:
            cr.append(criteria.to_etree())
        for criterion in self.criterion:
            cr.append(criterion.to_etree())
        return cr

    def to_dict(self):
        """Return a dict with keys ``'operator'``, ``'criteria'`` and ``'criterion'``."""
        d = {}
        d['operator'] = self.operator
        d['criteria'] = [criteria.to_dict() for criteria in self.criteria]
        d['criterion'] = [criterion.to_dict() for criterion in self.criterion]
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; children are indented by ``STD_INDENT``."""
        s = line_prepend + "=== Criteria ===\n"
        s += line_prepend + " Operator: %s\n" % self.operator
        child_prepend = line_prepend + STD_INDENT
        for criteria in self.criteria:
            s += criteria.to_text(child_prepend)
        for criterion in self.criterion:
            s += criterion.to_text(child_prepend)
        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`Criteria` from a ``Criteria`` element, recursing
        into direct ``tdq:Criteria`` and ``tdq:Criterion`` children."""
        kwargs = {}
        kwargs['operator'] = etree_xml.attrib['operator']
        kwargs['criteria'] = [
            DefaultQuery.Criteria.from_etree(criteria)
            for criteria in etree_xml.xpath('./tdq:Criteria', namespaces=ns_map)
        ]
        kwargs['criterion'] = [
            DefaultQuery.Criterion.from_etree(criterion)
            for criterion in etree_xml.xpath('./tdq:Criterion', namespaces=ns_map)
        ]
        return DefaultQuery.Criteria(**kwargs)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Criteria` from a dict.

        ``'operator'`` is required; ``'criteria'`` and ``'criterion'`` are
        optional and default to empty lists.
        """
        kwargs = {}
        kwargs['operator'] = d['operator']
        kwargs['criteria'] = [
            DefaultQuery.Criteria.from_dict(criteria)
            for criteria in d.get('criteria', [])
        ]
        kwargs['criterion'] = [
            DefaultQuery.Criterion.from_dict(criterion)
            for criterion in d.get('criterion', [])
        ]
        return DefaultQuery.Criteria(**kwargs)
class Criterion(TAXIIBase):
    """Represents criterion for a :class:`DefaultQuery.Criteria`

    :param string target: A targeting expression identifying the target
    :param test: The test to be applied to the target
    :type test: :class:`DefaultQuery.Criterion.Test`
    :param bool negate: Whether the result of applying the test to the target
        should be negated. ``None`` is accepted and means "not specified".
    """

    def __init__(self, target, test, negate=False):
        self.negate = negate
        self.target = target
        self.test = test

    @property
    def sort_key(self):
        """Key used for ordering: the target string."""
        return self.target

    @property
    def negate(self):
        """``True``, ``False`` or ``None`` (not specified)."""
        return self._negate

    @negate.setter
    def negate(self, value):
        do_check(value, 'negate', value_tuple=(True, False), can_be_none=True)
        self._negate = value

    @property
    def target(self):
        """The targeting expression identifying the target."""
        return self._target

    @target.setter
    def target(self, value):
        do_check(value, 'target', type=six.string_types)
        self._target = value

    @property
    def test(self):
        """The :class:`DefaultQuery.Criterion.Test` applied to the target."""
        return self._test

    @test.setter
    def test(self, value):
        # Pass the field name, not the value itself, as do_check's second
        # argument so the validation refers to 'test'.
        do_check(value, 'test', type=DefaultQuery.Criterion.Test)
        self._test = value

    def to_etree(self):
        """Return a ``Criterion`` element.

        The ``negate`` attribute is written (as ``'true'``/``'false'``)
        only when ``negate`` is not ``None``.
        """
        cr = etree.Element('{%s}Criterion' % ns_map['tdq'], nsmap=ns_map)
        if self.negate is not None:
            cr.attrib['negate'] = str(self.negate).lower()

        target = etree.SubElement(cr, '{%s}Target' % ns_map['tdq'], nsmap=ns_map)
        target.text = self.target

        cr.append(self.test.to_etree())
        return cr

    def to_dict(self):
        """Return a dict with keys ``'negate'``, ``'target'`` and ``'test'``."""
        d = {}
        # 'negate' is always present; it is None when not specified.
        d['negate'] = self.negate
        d['target'] = self.target
        d['test'] = self.test.to_dict()
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; an unspecified ``negate`` is shown as ``False``."""
        shown_negate = self.negate if self.negate is not None else False
        s = line_prepend + "=== Criterion ===\n"
        s += line_prepend + " Negate: %s\n" % shown_negate
        s += line_prepend + " Target: %s\n" % self.target
        s += self.test.to_text(line_prepend + STD_INDENT)
        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`Criterion` from a ``Criterion`` element.

        ``negate`` stays ``None`` when the attribute is absent; otherwise
        it is ``True`` only for the literal text ``'true'``.
        """
        negate_set = etree_xml.xpath('./@negate')
        negate = None
        if negate_set:
            negate = negate_set[0] == 'true'

        target = etree_xml.xpath('./tdq:Target', namespaces=ns_map)[0].text
        test = DefaultQuery.Criterion.Test.from_etree(etree_xml.xpath('./tdq:Test', namespaces=ns_map)[0])

        return DefaultQuery.Criterion(target, test, negate)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Criterion` from a dict.

        ``'target'`` and ``'test'`` are required; ``'negate'`` is optional
        and defaults to ``None``.
        """
        negate = d.get('negate', None)
        target = d['target']
        test = DefaultQuery.Criterion.Test.from_dict(d['test'])

        return DefaultQuery.Criterion(target, test, negate)
class Test(TAXIIBase):
    """The test applied to a target by a :class:`DefaultQuery.Criterion`.

    The instance is validated against the known capability modules at
    construction time (see :meth:`validate`).

    :param string capability_id: The ID of the capability module that defines the relationship & parameters
    :param string relationship: The relationship (e.g., equals)
    :param parameters: The parameters for the relationship.
    :type parameters: :class:`dict` of key/value pairs
    """

    def __init__(self, capability_id, relationship, parameters=None):
        self.capability_id = capability_id
        self.relationship = relationship
        self.parameters = parameters or {}

        self.validate()

    @property
    def capability_id(self):
        """The capability module ID."""
        return self._capability_id

    @capability_id.setter
    def capability_id(self, value):
        do_check(value, 'capability_id', regex_tuple=uri_regex)
        self._capability_id = value

    @property
    def relationship(self):
        """The relationship name."""
        return self._relationship

    @relationship.setter
    def relationship(self, value):
        # TODO: For known capability IDs, check that the relationship is valid
        # TODO: provide a way to register other capability IDs
        do_check(value, 'relationship', type=six.string_types)
        self._relationship = value

    @property
    def parameters(self):
        """Dict of parameter name to value."""
        return self._parameters

    @parameters.setter
    def parameters(self, value):
        do_check(list(value.keys()), 'parameters.keys()', regex_tuple=uri_regex)
        self._parameters = value

    # TODO: Can this be done better?
    def validate(self):
        """Validate relationship and parameters against the capability module.

        When ``capability_id`` is not one of the known ``capability_modules``
        nothing can be checked and the test is accepted as-is.

        :return: ``True`` when no problem was found
        :raises Exception: if the relationship is not defined by the
            capability module, or a parameter name is not defined for the
            relationship. Errors from ``Parameter.verify`` propagate.
        """
        capability_module = capability_modules.get(self.capability_id)
        if capability_module is None:  # Nothing is defined for this, validation not possible
            return True

        relationship = capability_module.relationships.get(self.relationship)
        if relationship is None:
            raise Exception('relationship not in defined relationships. %s not in %s' % (self.relationship, list(capability_module.relationships.keys())))

        for name, value in list(self.parameters.items()):
            param = relationship.parameters.get(name)
            if param is None:
                raise Exception('name not valid. %s not in %s' % (name, list(relationship.parameters.keys())))
            param.verify(value)

        # Report success explicitly so both non-raising paths agree.
        return True

    def to_etree(self):
        """Return a ``Test`` element with one ``Parameter`` child per parameter."""
        t = etree.Element('{%s}Test' % ns_map['tdq'], nsmap=ns_map)
        t.attrib['capability_id'] = self.capability_id
        t.attrib['relationship'] = self.relationship

        for k, v in list(self.parameters.items()):
            p = etree.SubElement(t, '{%s}Parameter' % ns_map['tdq'])
            p.attrib['name'] = k
            # bool must be tested before numbers.Number: bool is a Number.
            if isinstance(v, bool):
                p.text = str(v).lower()
            elif isinstance(v, datetime.datetime):
                p.text = v.isoformat()
            elif isinstance(v, numbers.Number):
                p.text = str(v)
            else:
                p.text = v

        return t

    def to_dict(self):
        """Return a dict with keys ``'capability_id'``, ``'relationship'``
        and ``'parameters'`` (the parameters dict itself, not a copy)."""
        d = {}
        d['capability_id'] = self.capability_id
        d['relationship'] = self.relationship
        d['parameters'] = self.parameters
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering, each line prefixed with ``line_prepend``."""
        s = line_prepend + "=== Test ==\n"
        s += line_prepend + " Capability ID: %s\n" % self.capability_id
        s += line_prepend + " Relationship: %s\n" % self.relationship
        for k, v in six.iteritems(self.parameters):
            s += line_prepend + " Parameter: %s = %s\n" % (k, v)

        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`Test` from a ``Test`` element.

        For a known capability module, parameter text is converted using
        the type declared for that parameter; otherwise the text is kept.

        :raises ValueError: if the capability module is known but does not
            define the relationship
        :raises KeyError: if the capability module is known and a parameter
            name is not defined for the relationship (and its text is not
            ``'true'``/``'false'``)
        """
        capability_id = etree_xml.attrib['capability_id']
        relationship = etree_xml.attrib['relationship']

        # r stays None for unknown capability modules: no type info then.
        r = None
        cm = capability_modules.get(capability_id, None)
        if cm is not None:
            r = cm.relationships.get(relationship, None)
            if r is None:
                # Report the relationship name that was looked up; r itself
                # is always None on this path.
                raise ValueError('Relationship (%s) not in CM (%s).' % (relationship, capability_id))

        parameters = {}
        for parameter in etree_xml.xpath('./tdq:Parameter', namespaces=ns_map):
            k = parameter.attrib['name']
            v = parameter.text
            # NOTE(review): this check runs before the declared type is
            # consulted, so a string-typed parameter whose text is literally
            # 'true' or 'false' also becomes a bool - confirm intended.
            if v in ('true', 'false'):  # bool is a special case
                parameters[k] = v == 'true'
            elif r is not None:
                type_ = r.parameters[k].type
                if type_ == six.string_types:  # basestring can't be instantiated, but str can be
                    type_ = str
                elif type_ == datetime.datetime:
                    # We can use this function to parse datetime strings.
                    type_ = dateutil.parser.parse
                parameters[k] = type_(v)
            else:
                parameters[k] = v

        return DefaultQuery.Criterion.Test(capability_id, relationship, parameters)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Test` using the dict's items as keyword arguments."""
        return DefaultQuery.Criterion.Test(**d)
# Attach the helper classes as attributes of their owners; the code above
# refers to them as DefaultQueryInfo.TargetingExpressionInfo,
# DefaultQuery.Criteria, DefaultQuery.Criterion and
# DefaultQuery.Criterion.Test.
Criterion.Test = Test
DefaultQuery.Criteria = Criteria
DefaultQuery.Criterion = Criterion
DefaultQueryInfo.TargetingExpressionInfo = TargetingExpressionInfo

# The XSD is looked up in an "xsd" directory next to this module.
package_dir = os.path.dirname(__file__)
package_filename = os.path.basename(__file__)
schema_file = os.path.join(package_dir, "xsd", "TAXII_DefaultQuery_Schema.xsd")

# Register the Default Query format with the TAXII 1.1 messages module.
tm11.register_query_format(format_id=FID_TAXII_DEFAULT_QUERY_10,
                           query=DefaultQuery,
                           query_info=DefaultQueryInfo,
                           schema=schema_file)