# Copyright (c) 2017, The MITRE Corporation
# For license information, see the LICENSE.txt file
"""
Creating, handling, and parsing TAXII Default Queries.
"""
import numbers
import datetime
from operator import attrgetter
import os
import dateutil.parser
from lxml import etree
import libtaxii.messages_11 as tm11
from .common import TAXIIBase
from .validation import (do_check, uri_regex, targeting_expression_regex)
from .constants import *
import six
class CapabilityModule(object):
"""
A Capability Module has valid relationships
Each relationship has 0-n valid parameters
"""
def __init__(self, capability_module_id, relationships):
self.capability_module_id = capability_module_id
self.relationships = relationships
@property
def capability_module_id(self):
return self._capability_module_id
@capability_module_id.setter
def capability_module_id(self, value):
do_check(value, 'capability_module_id', type=six.string_types)
self._capability_module_id = value
@property
def relationships(self):
return self._relationships
@relationships.setter
def relationships(self, value):
do_check(value, 'relationships', type=Relationship)
self._relationships = {}
for item in value:
self._relationships[item.name] = item
class Relationship(object):
def __init__(self, name, parameters=None):
self.name = name
self.parameters = parameters or []
@property
def name(self):
return self._name
@name.setter
def name(self, value):
do_check(value, 'name', type=six.string_types)
self._name = value
@property
def parameters(self):
return self._parameters
@parameters.setter
def parameters(self, value):
do_check(value, 'parameters', type=Parameter)
self._parameters = {}
for item in value:
self._parameters[item.name] = item
class Parameter(object):
def __init__(self, name, type, value_tuple=None):
self.name = name
self.type = type
self.value_tuple = value_tuple
def verify(self, value):
do_check(value, 'value', type=self.type, value_tuple=self.value_tuple)
return True, 'OK'
# params - Define parameters for the Core/Regex/Timestamp capability modules
param_str_value = Parameter(P_VALUE, six.string_types)
param_float_value = Parameter(P_VALUE, float)
param_ts_value = Parameter(P_VALUE, datetime.datetime)
param_match_type = Parameter(P_MATCH_TYPE, six.string_types, ('case_sensitive_string', 'case_insensitive_string', 'number'))
param_case_sensitive = Parameter(P_CASE_SENSITIVE, bool, (True, False))
# CORE Relationships - Define relationships for the core capability module
rel_equals = Relationship(R_EQUALS, [param_str_value, param_match_type])
rel_not_equals = Relationship(R_NOT_EQUALS, [param_str_value, param_match_type])
rel_greater_than = Relationship(R_GREATER_THAN, [param_float_value])
rel_greater_than_or_equal = Relationship(R_GREATER_THAN_OR_EQUAL, [param_float_value])
rel_less_than = Relationship(R_LESS_THAN, [param_float_value])
rel_less_than_or_equal = Relationship(R_LESS_THAN_OR_EQUAL, [param_float_value])
rel_dne = Relationship(R_DOES_NOT_EXIST)
rel_ex = Relationship(R_EXISTS)
rel_begins_with = Relationship(R_BEGINS_WITH, [param_case_sensitive, param_str_value])
rel_ends_with = Relationship(R_ENDS_WITH, [param_case_sensitive, param_str_value])
rel_contains = Relationship(R_CONTAINS, [param_case_sensitive, param_str_value])
# REGEX relationships
rel_matches = Relationship(R_MATCHES, [param_case_sensitive, param_str_value])
# TIMESTAMP relationships
rel_ts_eq = Relationship(R_EQUALS, [param_ts_value])
rel_ts_gt = Relationship(R_GREATER_THAN, [param_ts_value])
rel_ts_gte = Relationship(R_GREATER_THAN_OR_EQUAL, [param_ts_value])
rel_ts_lt = Relationship(R_LESS_THAN, [param_ts_value])
rel_ts_lte = Relationship(R_LESS_THAN_OR_EQUAL, [param_ts_value])
# CORE - Define the Core Capability Module
cm_core = CapabilityModule(CM_CORE,
[rel_equals, rel_not_equals, rel_greater_than,
rel_greater_than_or_equal, rel_less_than,
rel_less_than_or_equal, rel_dne, rel_ex,
rel_begins_with, rel_contains, rel_ends_with]
)
# REGEX - Define the RegEx Capability Module
cm_regex = CapabilityModule(CM_REGEX, [rel_matches])
# TIMESTAMP - Define the timestamp Capability Module
cm_timestamp = CapabilityModule(CM_TIMESTAMP, [rel_ts_eq, rel_ts_gt, rel_ts_gte, rel_ts_lt, rel_ts_lte])
capability_modules = {CM_CORE: cm_core, CM_REGEX: cm_regex, CM_TIMESTAMP: cm_timestamp}
[docs]class DefaultQueryInfo(tm11.SupportedQuery):
""" Used to describe the TAXII Default Queries that are supported.
:param targeting_expression_infos: Describe the supported targeting expressions
:type targeting_expression_infos: :class:`list` of :class:`TargetingExpressionInfo` objects
:param capability_modules: Indicate the supported capability modules
:type capability_modules: :class:`list` of :class:`str`
"""
def __init__(self, targeting_expression_infos, capability_modules):
super(DefaultQueryInfo, self).__init__(FID_TAXII_DEFAULT_QUERY_10)
self.targeting_expression_infos = targeting_expression_infos
self.capability_modules = capability_modules
@property
def targeting_expression_infos(self):
return self._targeting_expression_infos
@targeting_expression_infos.setter
def targeting_expression_infos(self, value):
do_check(value, 'targeting_expression_infos', type=DefaultQueryInfo.TargetingExpressionInfo)
self._targeting_expression_infos = value
@property
def capability_modules(self):
return self._capability_modules
@capability_modules.setter
def capability_modules(self, value):
do_check(value, 'capability_modules', regex_tuple=uri_regex)
self._capability_modules = value
def to_etree(self):
q = super(DefaultQueryInfo, self).to_etree()
dqi = etree.SubElement(q, '{%s}Default_Query_Info' % ns_map['tdq'])
for expression_info in self.targeting_expression_infos:
dqi.append(expression_info.to_etree())
for cmod in self.capability_modules:
cm = etree.SubElement(dqi, '{%s}Capability_Module' % ns_map['tdq'], nsmap=ns_map)
cm.text = cmod
return q
def to_dict(self):
d = super(DefaultQueryInfo, self).to_dict()
d['targeting_expression_infos'] = []
for expression_info in self.targeting_expression_infos:
d['targeting_expression_infos'].append(expression_info.to_dict())
# TODO: This looks like a serialization bug
d['capability_modules'] = self.capability_modules
return d
def to_text(self, line_prepend=''):
s = super(DefaultQueryInfo, self).to_text(line_prepend)
for expression_info in self.targeting_expression_infos:
s += expression_info.to_text(line_prepend + STD_INDENT)
for capability_module in self.capability_modules:
s += line_prepend + " Capability Module: %s\n" % capability_module
return s
def __hash__(self):
return hash(str(self.to_dict()))
@staticmethod
def from_etree(etree_xml):
texpr_infos = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Targeting_Expression_Info', namespaces=ns_map)
texpr_info_list = []
for texpr_info in texpr_infos:
texpr_info_list.append(DefaultQueryInfo.TargetingExpressionInfo.from_etree(texpr_info))
cms = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Capability_Module', namespaces=ns_map)
cms_list = []
for cm in cms:
cms_list.append(cm.text)
return DefaultQueryInfo(texpr_info_list, cms_list)
@staticmethod
def from_dict(d):
kwargs = {}
kwargs['targeting_expression_infos'] = []
for expression_info in d['targeting_expression_infos']:
kwargs['targeting_expression_infos'].append(DefaultQueryInfo.TargetingExpressionInfo.from_dict(expression_info))
kwargs['capability_modules'] = d['capability_modules']
return DefaultQueryInfo(**kwargs)
class TargetingExpressionInfo(TAXIIBase):
"""This class describes supported Targeting Expressions
:param string targeting_expression_id: The supported targeting expression ID
:param preferred_scope: Indicates the preferred scope of queries
:type preferred_scope: :class:`list` of :class:`string`
:param allowed_scope: Indicates the allowed scope of queries
:type allowed_scope: :class:`list` of :class:`string`
"""
def __init__(self, targeting_expression_id, preferred_scope=None, allowed_scope=None):
self.targeting_expression_id = targeting_expression_id
self.preferred_scope = preferred_scope or []
self.allowed_scope = allowed_scope or []
@property
def sort_key(self):
return self.targeting_expression_id
@property
def targeting_expression_id(self):
return self._targeting_expression_id
@targeting_expression_id.setter
def targeting_expression_id(self, value):
do_check(value, 'targeting_expression_id', regex_tuple=uri_regex)
self._targeting_expression_id = value
@property
def preferred_scope(self):
return self._preferred_scope
@preferred_scope.setter
def preferred_scope(self, value):
do_check(value, 'preferred_scope', type=six.string_types, regex_tuple=targeting_expression_regex)
self._preferred_scope = value
@property
def allowed_scope(self):
return self._allowed_scope
@allowed_scope.setter
def allowed_scope(self, value):
do_check(value, 'allowed_scope', type=six.string_types, regex_tuple=targeting_expression_regex)
self._allowed_scope = value
def to_etree(self):
tei = etree.Element('{%s}Targeting_Expression_Info' % ns_map['tdq'])
tei.attrib['targeting_expression_id'] = self.targeting_expression_id
for scope in self.preferred_scope:
preferred = etree.SubElement(tei, '{%s}Preferred_Scope' % ns_map['tdq'])
preferred.text = scope
for scope in self.allowed_scope:
allowed = etree.SubElement(tei, '{%s}Allowed_Scope' % ns_map['tdq'])
allowed.text = scope
return tei
def to_dict(self):
d = {}
d['targeting_expression_id'] = self.targeting_expression_id
# TODO: Preferred / Allowed scope look like serialization bugs
d['preferred_scope'] = self.preferred_scope
d['allowed_scope'] = self.allowed_scope
return d
def to_text(self, line_prepend=''):
s = line_prepend + "=== Targeting Expression Info ===\n"
s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id
for scope in self.preferred_scope:
s += line_prepend + " Preferred Scope: %s\n" % scope
for scope in self.allowed_scope:
s += line_prepend + " Allowed Scope: %s\n" % scope
return s
def __hash__(self):
return hash(str(self.to_dict()))
@staticmethod
def from_etree(etree_xml):
kwargs = {}
kwargs['targeting_expression_id'] = etree_xml.xpath('./@targeting_expression_id', namespaces=ns_map)[0]
kwargs['preferred_scope'] = []
preferred_scope_set = etree_xml.xpath('./tdq:Preferred_Scope', namespaces=ns_map)
for preferred in preferred_scope_set:
kwargs['preferred_scope'].append(preferred.text)
kwargs['allowed_scope'] = []
allowed_scope_set = etree_xml.xpath('./tdq:Allowed_Scope', namespaces=ns_map)
for allowed in allowed_scope_set:
kwargs['allowed_scope'].append(allowed.text)
return DefaultQueryInfo.TargetingExpressionInfo(**kwargs)
@staticmethod
def from_dict(d):
return DefaultQueryInfo.TargetingExpressionInfo(**d)
[docs]class DefaultQuery(tm11.Query):
"""Conveys a TAXII Default Query.
:param string targeting_expression_id: The targeting_expression used in the query
:param criteria: The criteria of the query
:type criteria: :class:`DefaultQuery.Criteria`
"""
def __init__(self, targeting_expression_id, criteria):
super(DefaultQuery, self).__init__(FID_TAXII_DEFAULT_QUERY_10)
self.targeting_expression_id = targeting_expression_id
self.criteria = criteria
@property
def targeting_expression_id(self):
return self._targeting_expression_id
@targeting_expression_id.setter
def targeting_expression_id(self, value):
do_check(value, 'targeting_expression_id', regex_tuple=uri_regex)
self._targeting_expression_id = value
@property
def criteria(self):
return self._criteria
@criteria.setter
def criteria(self, value):
do_check(value, 'criteria', type=DefaultQuery.Criteria)
self._criteria = value
def to_etree(self):
q = super(DefaultQuery, self).to_etree()
dq = etree.SubElement(q, '{%s}Default_Query' % ns_map['tdq'], nsmap=ns_map)
dq.attrib['targeting_expression_id'] = self.targeting_expression_id
dq.append(self.criteria.to_etree())
return q
def to_dict(self):
d = super(DefaultQuery, self).to_dict()
d['targeting_expression_id'] = self.targeting_expression_id
d['criteria'] = self.criteria.to_dict()
return d
def to_text(self, line_prepend=''):
s = super(DefaultQuery, self).to_text(line_prepend)
s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id
s += self.criteria.to_text(line_prepend)
return s
@staticmethod
def from_etree(etree_xml):
tei = etree_xml.xpath('./tdq:Default_Query/@targeting_expression_id', namespaces=ns_map)[0] # attrib['targeting_expression_id']
criteria = DefaultQuery.Criteria.from_etree(etree_xml.xpath('./tdq:Default_Query/tdq:Criteria', namespaces=ns_map)[0])
return DefaultQuery(tei, criteria)
@staticmethod
def from_dict(d):
tei = d['targeting_expression_id']
criteria = DefaultQuery.Criteria.from_dict(d['criteria'])
return DefaultQuery(tei, criteria)
class Criteria(TAXIIBase):
"""Represents criteria for a :class:`DefaultQuery`. **Note**: At least one criterion OR criteria MUST be present
:param str operator: The logical operator (should be one of `OP_AND` or `OP_OR`)
:param criteria: The criteria for the query
:type criteria: :class:`DefaultQuery.Criteria`
:param criterion: The criterion for the query
:type criterion: :class:`DefaultQuery.Criterion`
"""
def __init__(self, operator, criteria=None, criterion=None):
self.operator = operator
self.criteria = criteria or []
self.criterion = criterion or []
@property
def sort_key(self):
key_list = []
ia = sorted(self.criteria, key=attrgetter('sort_key'))
ion = sorted(self.criterion, key=attrgetter('sort_key'))
for i in ia:
key_list.append(i.sort_key)
for i in ion:
key_list.append(i.sort_key)
return ''.join(key_list)
@property
def operator(self):
return self._operator
@operator.setter
def operator(self, value):
do_check(value, 'operator', value_tuple=OP_TYPES)
self._operator = value
@property
def criteria(self):
return self._criteria
@criteria.setter
def criteria(self, value):
do_check(value, 'critiera', type=DefaultQuery.Criteria)
self._criteria = value
@property
def criterion(self):
return self._criterion
@criterion.setter
def criterion(self, value):
do_check(value, 'criterion', type=DefaultQuery.Criterion)
self._criterion = value
def to_etree(self):
cr = etree.Element('{%s}Criteria' % ns_map['tdq'], nsmap=ns_map)
cr.attrib['operator'] = self.operator
for criteria in self.criteria:
cr.append(criteria.to_etree())
for criterion in self.criterion:
cr.append(criterion.to_etree())
return cr
def to_dict(self):
d = {}
d['operator'] = self.operator
d['criteria'] = []
for criteria in self.criteria:
d['criteria'].append(criteria.to_dict())
d['criterion'] = []
for criterion in self.criterion:
d['criterion'].append(criterion.to_dict())
return d
def to_text(self, line_prepend=''):
s = line_prepend + "=== Criteria ===\n"
s += line_prepend + " Operator: %s\n" % self.operator
for criteria in self.criteria:
s += criteria.to_text(line_prepend + STD_INDENT)
for criterion in self.criterion:
s += criterion.to_text(line_prepend + STD_INDENT)
return s
@staticmethod
def from_etree(etree_xml):
kwargs = {}
kwargs['operator'] = etree_xml.attrib['operator']
kwargs['criteria'] = []
criteria_set = etree_xml.xpath('./tdq:Criteria', namespaces=ns_map)
for criteria in criteria_set:
kwargs['criteria'].append(DefaultQuery.Criteria.from_etree(criteria))
kwargs['criterion'] = []
criterion_set = etree_xml.xpath('./tdq:Criterion', namespaces=ns_map)
for criterion in criterion_set:
kwargs['criterion'].append(DefaultQuery.Criterion.from_etree(criterion))
return DefaultQuery.Criteria(**kwargs)
@staticmethod
def from_dict(d):
kwargs = {}
kwargs['operator'] = d['operator']
kwargs['criteria'] = []
criteria_set = d.get('criteria', [])
for criteria in criteria_set:
kwargs['criteria'].append(DefaultQuery.Criteria.from_dict(criteria))
kwargs['criterion'] = []
criterion_set = d.get('criterion', [])
for criterion in criterion_set:
kwargs['criterion'].append(DefaultQuery.Criterion.from_dict(criterion))
return DefaultQuery.Criteria(**kwargs)
class Criterion(TAXIIBase):
"""Represents criterion for a :class:`DefaultQuery.Criteria`
:param string target: A targeting expression identifying the target
:param test: The test to be applied to the target
:type test: :class:`DefaultQuery.Criterion.Test`
:param bool negate: Whether the result of applying the test to the target should be negated
"""
def __init__(self, target, test, negate=False):
self.negate = negate
self.target = target
self.test = test
@property
def sort_key(self):
return self.target
@property
def negate(self):
return self._negate
@negate.setter
def negate(self, value):
do_check(value, 'negate', value_tuple=(True, False), can_be_none=True)
self._negate = value
@property
def target(self):
return self._target
@target.setter
def target(self, value):
do_check(value, 'target', type=six.string_types)
self._target = value
@property
def test(self):
return self._test
@test.setter
def test(self, value):
do_check(value, value, type=DefaultQuery.Criterion.Test)
self._test = value
def to_etree(self):
cr = etree.Element('{%s}Criterion' % ns_map['tdq'], nsmap=ns_map)
if self.negate is not None:
cr.attrib['negate'] = str(self.negate).lower()
target = etree.SubElement(cr, '{%s}Target' % ns_map['tdq'], nsmap=ns_map)
target.text = self.target
cr.append(self.test.to_etree())
return cr
def to_dict(self):
d = {}
d['negate'] = None
if self.negate is not None:
d['negate'] = self.negate
d['target'] = self.target
d['test'] = self.test.to_dict()
return d
def to_text(self, line_prepend=''):
s = line_prepend + "=== Criterion ===\n"
s += line_prepend + " Negate: %s\n" % (self.negate if (None != self.negate) else False)
s += line_prepend + " Target: %s\n" % self.target
s += self.test.to_text(line_prepend + STD_INDENT)
return s
@staticmethod
def from_etree(etree_xml):
negate_set = etree_xml.xpath('./@negate')
negate = None
if len(negate_set) > 0:
negate = negate_set[0] == 'true'
target = etree_xml.xpath('./tdq:Target', namespaces=ns_map)[0].text
test = DefaultQuery.Criterion.Test.from_etree(etree_xml.xpath('./tdq:Test', namespaces=ns_map)[0])
return DefaultQuery.Criterion(target, test, negate)
@staticmethod
def from_dict(d):
negate = d.get('negate', None)
target = d['target']
test = DefaultQuery.Criterion.Test.from_dict(d['test'])
return DefaultQuery.Criterion(target, test, negate)
class Test(TAXIIBase):
"""
:param string capability_id: The ID of the capability module that defines the relationship & parameters
:param string relationship: The relationship (e.g., equals)
:param parameters: The parameters for the relationship.
:type parameters: :class:`dict` of key/value pairs
"""
def __init__(self, capability_id, relationship, parameters=None):
self.capability_id = capability_id
self.relationship = relationship
self.parameters = parameters or {}
self.validate()
@property
def capability_id(self):
return self._capability_id
@capability_id.setter
def capability_id(self, value):
do_check(value, 'capability_id', regex_tuple=uri_regex)
self._capability_id = value
@property
def relationship(self):
return self._relationship
@relationship.setter
def relationship(self, value):
# TODO: For known capability IDs, check that the relationship is valid
# TODO: provide a way to register other capability IDs
do_check(value, 'relationship', type=six.string_types)
self._relationship = value
@property
def parameters(self):
return self._parameters
@parameters.setter
def parameters(self, value):
do_check(list(value.keys()), 'parameters.keys()', regex_tuple=uri_regex)
self._parameters = value
# TODO: Can this be done better?
def validate(self):
capability_module = capability_modules.get(self.capability_id)
if capability_module is None: # Nothing is defined for this, validation not possible
return True
relationship = capability_module.relationships.get(self.relationship)
if relationship is None:
raise Exception('relationship not in defined relationships. %s not in %s' % (self.relationship, list(capability_module.relationships.keys())))
for name, value in list(self.parameters.items()):
param = relationship.parameters.get(name)
if param is None:
raise Exception('name not valid. %s not in %s' % (name, list(relationship.parameters.keys())))
param.verify(value)
def to_etree(self):
t = etree.Element('{%s}Test' % ns_map['tdq'], nsmap=ns_map)
t.attrib['capability_id'] = self.capability_id
t.attrib['relationship'] = self.relationship
for k, v in list(self.parameters.items()):
p = etree.SubElement(t, '{%s}Parameter' % ns_map['tdq'])
p.attrib['name'] = k
if isinstance(v, bool):
p.text = str(v).lower()
elif isinstance(v, datetime.datetime):
p.text = v.isoformat()
elif isinstance(v, numbers.Number):
p.text = str(v)
else:
p.text = v
return t
def to_dict(self):
d = {}
d['capability_id'] = self.capability_id
d['relationship'] = self.relationship
d['parameters'] = self.parameters
return d
def to_text(self, line_prepend=''):
s = line_prepend + "=== Test ==\n"
s += line_prepend + " Capability ID: %s\n" % self.capability_id
s += line_prepend + " Relationship: %s\n" % self.relationship
for k, v in six.iteritems(self.parameters):
s += line_prepend + " Parameter: %s = %s\n" % (k, v)
return s
@staticmethod
def from_etree(etree_xml):
capability_id = etree_xml.attrib['capability_id']
relationship = etree_xml.attrib['relationship']
parameters = {}
cm = capability_modules.get(capability_id, None)
if cm is not None:
r = cm.relationships.get(relationship, None)
if r is None:
raise ValueError('Relationship (%s) not in CM (%s).' % (r, capability_id))
else:
r = None
for parameter in etree_xml.xpath('./tdq:Parameter', namespaces=ns_map):
k = parameter.attrib['name']
v = parameter.text
if v in ('true', 'false'): # bool is a special case
parameters[k] = v == 'true'
elif r is not None:
type_ = r.parameters[k].type
if type_ == six.string_types: # basestring can't be instantiated, but str can be
type_ = str
elif type_ == datetime.datetime:
# We can use this function to parse datetime strings.
type_ = dateutil.parser.parse
parameters[k] = type_(v)
else:
parameters[k] = v
return DefaultQuery.Criterion.Test(capability_id, relationship, parameters)
@staticmethod
def from_dict(d):
return DefaultQuery.Criterion.Test(**d)
DefaultQueryInfo.TargetingExpressionInfo = TargetingExpressionInfo
DefaultQuery.Criterion = Criterion
DefaultQuery.Criteria = Criteria
DefaultQuery.Criterion.Test = Test
package_dir, package_filename = os.path.split(__file__)
schema_file = os.path.join(package_dir, "xsd", "TAXII_DefaultQuery_Schema.xsd")
tm11.register_query_format(
format_id=FID_TAXII_DEFAULT_QUERY_10,
query=DefaultQuery,
query_info=DefaultQueryInfo,
schema=schema_file)