Source code for libtaxii.taxii_default_query

# Copyright (c) 2017, The MITRE Corporation
# For license information, see the LICENSE.txt file

Creating, handling, and parsing TAXII Default Queries.

import numbers
import datetime
from operator import attrgetter
import os
import dateutil.parser

from lxml import etree

import libtaxii.messages_11 as tm11

from .common import TAXIIBase
from .validation import (do_check, uri_regex, targeting_expression_regex)
from .constants import *
import six

class CapabilityModule(object):

    A Capability Module has valid relationships
    Each relationship has 0-n valid parameters

    def __init__(self, capability_module_id, relationships):
        self.capability_module_id = capability_module_id
        self.relationships = relationships

    def capability_module_id(self):
        return self._capability_module_id

    def capability_module_id(self, value):
        do_check(value, 'capability_module_id', type=six.string_types)
        self._capability_module_id = value

    def relationships(self):
        return self._relationships

    def relationships(self, value):
        do_check(value, 'relationships', type=Relationship)
        self._relationships = {}
        for item in value:
            self._relationships[] = item

class Relationship(object):

    def __init__(self, name, parameters=None): = name
        self.parameters = parameters or []

    def name(self):
        return self._name

    def name(self, value):
        do_check(value, 'name', type=six.string_types)
        self._name = value

    def parameters(self):
        return self._parameters

    def parameters(self, value):
        do_check(value, 'parameters', type=Parameter)
        self._parameters = {}
        for item in value:
            self._parameters[] = item

class Parameter(object):

    def __init__(self, name, type, value_tuple=None): = name
        self.type = type
        self.value_tuple = value_tuple

    def verify(self, value):
        do_check(value, 'value', type=self.type, value_tuple=self.value_tuple)
        return True, 'OK'

# params - Define parameters for the Core/Regex/Timestamp capability modules
param_str_value = Parameter(P_VALUE, six.string_types)
param_float_value = Parameter(P_VALUE, float)
param_ts_value = Parameter(P_VALUE, datetime.datetime)
param_match_type = Parameter(P_MATCH_TYPE, six.string_types, ('case_sensitive_string', 'case_insensitive_string', 'number'))
param_case_sensitive = Parameter(P_CASE_SENSITIVE, bool, (True, False))

# CORE Relationships - Define relationships for the core capability module
rel_equals = Relationship(R_EQUALS, [param_str_value, param_match_type])
rel_not_equals = Relationship(R_NOT_EQUALS, [param_str_value, param_match_type])
rel_greater_than = Relationship(R_GREATER_THAN, [param_float_value])
rel_greater_than_or_equal = Relationship(R_GREATER_THAN_OR_EQUAL, [param_float_value])
rel_less_than = Relationship(R_LESS_THAN, [param_float_value])
rel_less_than_or_equal = Relationship(R_LESS_THAN_OR_EQUAL, [param_float_value])
rel_dne = Relationship(R_DOES_NOT_EXIST)
rel_ex = Relationship(R_EXISTS)
rel_begins_with = Relationship(R_BEGINS_WITH, [param_case_sensitive, param_str_value])
rel_ends_with = Relationship(R_ENDS_WITH, [param_case_sensitive, param_str_value])
rel_contains = Relationship(R_CONTAINS, [param_case_sensitive, param_str_value])

# REGEX relationships
rel_matches = Relationship(R_MATCHES, [param_case_sensitive, param_str_value])

# TIMESTAMP relationships
rel_ts_eq = Relationship(R_EQUALS, [param_ts_value])
rel_ts_gt = Relationship(R_GREATER_THAN, [param_ts_value])
rel_ts_gte = Relationship(R_GREATER_THAN_OR_EQUAL, [param_ts_value])
rel_ts_lt = Relationship(R_LESS_THAN, [param_ts_value])
rel_ts_lte = Relationship(R_LESS_THAN_OR_EQUAL, [param_ts_value])

# CORE - Define the Core Capability Module
cm_core = CapabilityModule(CM_CORE,
                           [rel_equals, rel_not_equals, rel_greater_than,
                            rel_greater_than_or_equal, rel_less_than,
                            rel_less_than_or_equal, rel_dne, rel_ex,
                            rel_begins_with, rel_contains, rel_ends_with]

# REGEX - Define the RegEx Capability Module
cm_regex = CapabilityModule(CM_REGEX, [rel_matches])

# TIMESTAMP - Define the timestamp Capability Module
cm_timestamp = CapabilityModule(CM_TIMESTAMP, [rel_ts_eq, rel_ts_gt, rel_ts_gte, rel_ts_lt, rel_ts_lte])

capability_modules = {CM_CORE: cm_core, CM_REGEX: cm_regex, CM_TIMESTAMP: cm_timestamp}

[docs]class DefaultQueryInfo(tm11.SupportedQuery): """ Used to describe the TAXII Default Queries that are supported. :param targeting_expression_infos: Describe the supported targeting expressions :type targeting_expression_infos: :class:`list` of :class:`TargetingExpressionInfo` objects :param capability_modules: Indicate the supported capability modules :type capability_modules: :class:`list` of :class:`str` """ def __init__(self, targeting_expression_infos, capability_modules): super(DefaultQueryInfo, self).__init__(FID_TAXII_DEFAULT_QUERY_10) self.targeting_expression_infos = targeting_expression_infos self.capability_modules = capability_modules @property def targeting_expression_infos(self): return self._targeting_expression_infos @targeting_expression_infos.setter def targeting_expression_infos(self, value): do_check(value, 'targeting_expression_infos', type=DefaultQueryInfo.TargetingExpressionInfo) self._targeting_expression_infos = value @property def capability_modules(self): return self._capability_modules @capability_modules.setter def capability_modules(self, value): do_check(value, 'capability_modules', regex_tuple=uri_regex) self._capability_modules = value def to_etree(self): q = super(DefaultQueryInfo, self).to_etree() dqi = etree.SubElement(q, '{%s}Default_Query_Info' % ns_map['tdq']) for expression_info in self.targeting_expression_infos: dqi.append(expression_info.to_etree()) for cmod in self.capability_modules: cm = etree.SubElement(dqi, '{%s}Capability_Module' % ns_map['tdq'], nsmap=ns_map) cm.text = cmod return q def to_dict(self): d = super(DefaultQueryInfo, self).to_dict() d['targeting_expression_infos'] = [] for expression_info in self.targeting_expression_infos: d['targeting_expression_infos'].append(expression_info.to_dict()) # TODO: This looks like a serialization bug d['capability_modules'] = self.capability_modules return d def to_text(self, line_prepend=''): s = super(DefaultQueryInfo, self).to_text(line_prepend) for expression_info in self.targeting_expression_infos: s += expression_info.to_text(line_prepend + STD_INDENT) for capability_module in self.capability_modules: s += line_prepend + " Capability Module: %s\n" % capability_module return s def __hash__(self): return hash(str(self.to_dict())) @staticmethod def from_etree(etree_xml): texpr_infos = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Targeting_Expression_Info', namespaces=ns_map) texpr_info_list = [] for texpr_info in texpr_infos: texpr_info_list.append(DefaultQueryInfo.TargetingExpressionInfo.from_etree(texpr_info)) cms = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Capability_Module', namespaces=ns_map) cms_list = [] for cm in cms: cms_list.append(cm.text) return DefaultQueryInfo(texpr_info_list, cms_list) @staticmethod def from_dict(d): kwargs = {} kwargs['targeting_expression_infos'] = [] for expression_info in d['targeting_expression_infos']: kwargs['targeting_expression_infos'].append(DefaultQueryInfo.TargetingExpressionInfo.from_dict(expression_info)) kwargs['capability_modules'] = d['capability_modules'] return DefaultQueryInfo(**kwargs)
class TargetingExpressionInfo(TAXIIBase): """This class describes supported Targeting Expressions :param string targeting_expression_id: The supported targeting expression ID :param preferred_scope: Indicates the preferred scope of queries :type preferred_scope: :class:`list` of :class:`string` :param allowed_scope: Indicates the allowed scope of queries :type allowed_scope: :class:`list` of :class:`string` """ def __init__(self, targeting_expression_id, preferred_scope=None, allowed_scope=None): self.targeting_expression_id = targeting_expression_id self.preferred_scope = preferred_scope or [] self.allowed_scope = allowed_scope or [] @property def sort_key(self): return self.targeting_expression_id @property def targeting_expression_id(self): return self._targeting_expression_id @targeting_expression_id.setter def targeting_expression_id(self, value): do_check(value, 'targeting_expression_id', regex_tuple=uri_regex) self._targeting_expression_id = value @property def preferred_scope(self): return self._preferred_scope @preferred_scope.setter def preferred_scope(self, value): do_check(value, 'preferred_scope', type=six.string_types, regex_tuple=targeting_expression_regex) self._preferred_scope = value @property def allowed_scope(self): return self._allowed_scope @allowed_scope.setter def allowed_scope(self, value): do_check(value, 'allowed_scope', type=six.string_types, regex_tuple=targeting_expression_regex) self._allowed_scope = value def to_etree(self): tei = etree.Element('{%s}Targeting_Expression_Info' % ns_map['tdq']) tei.attrib['targeting_expression_id'] = self.targeting_expression_id for scope in self.preferred_scope: preferred = etree.SubElement(tei, '{%s}Preferred_Scope' % ns_map['tdq']) preferred.text = scope for scope in self.allowed_scope: allowed = etree.SubElement(tei, '{%s}Allowed_Scope' % ns_map['tdq']) allowed.text = scope return tei def to_dict(self): d = {} d['targeting_expression_id'] = self.targeting_expression_id # TODO: Preferred / Allowed scope look like serialization bugs d['preferred_scope'] = self.preferred_scope d['allowed_scope'] = self.allowed_scope return d def to_text(self, line_prepend=''): s = line_prepend + "=== Targeting Expression Info ===\n" s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id for scope in self.preferred_scope: s += line_prepend + " Preferred Scope: %s\n" % scope for scope in self.allowed_scope: s += line_prepend + " Allowed Scope: %s\n" % scope return s def __hash__(self): return hash(str(self.to_dict())) @staticmethod def from_etree(etree_xml): kwargs = {} kwargs['targeting_expression_id'] = etree_xml.xpath('./@targeting_expression_id', namespaces=ns_map)[0] kwargs['preferred_scope'] = [] preferred_scope_set = etree_xml.xpath('./tdq:Preferred_Scope', namespaces=ns_map) for preferred in preferred_scope_set: kwargs['preferred_scope'].append(preferred.text) kwargs['allowed_scope'] = [] allowed_scope_set = etree_xml.xpath('./tdq:Allowed_Scope', namespaces=ns_map) for allowed in allowed_scope_set: kwargs['allowed_scope'].append(allowed.text) return DefaultQueryInfo.TargetingExpressionInfo(**kwargs) @staticmethod def from_dict(d): return DefaultQueryInfo.TargetingExpressionInfo(**d)
[docs]class DefaultQuery(tm11.Query): """Conveys a TAXII Default Query. :param string targeting_expression_id: The targeting_expression used in the query :param criteria: The criteria of the query :type criteria: :class:`DefaultQuery.Criteria` """ def __init__(self, targeting_expression_id, criteria): super(DefaultQuery, self).__init__(FID_TAXII_DEFAULT_QUERY_10) self.targeting_expression_id = targeting_expression_id self.criteria = criteria @property def targeting_expression_id(self): return self._targeting_expression_id @targeting_expression_id.setter def targeting_expression_id(self, value): do_check(value, 'targeting_expression_id', regex_tuple=uri_regex) self._targeting_expression_id = value @property def criteria(self): return self._criteria @criteria.setter def criteria(self, value): do_check(value, 'criteria', type=DefaultQuery.Criteria) self._criteria = value def to_etree(self): q = super(DefaultQuery, self).to_etree() dq = etree.SubElement(q, '{%s}Default_Query' % ns_map['tdq'], nsmap=ns_map) dq.attrib['targeting_expression_id'] = self.targeting_expression_id dq.append(self.criteria.to_etree()) return q def to_dict(self): d = super(DefaultQuery, self).to_dict() d['targeting_expression_id'] = self.targeting_expression_id d['criteria'] = self.criteria.to_dict() return d def to_text(self, line_prepend=''): s = super(DefaultQuery, self).to_text(line_prepend) s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id s += self.criteria.to_text(line_prepend) return s @staticmethod def from_etree(etree_xml): tei = etree_xml.xpath('./tdq:Default_Query/@targeting_expression_id', namespaces=ns_map)[0] # attrib['targeting_expression_id'] criteria = DefaultQuery.Criteria.from_etree(etree_xml.xpath('./tdq:Default_Query/tdq:Criteria', namespaces=ns_map)[0]) return DefaultQuery(tei, criteria) @staticmethod def from_dict(d): tei = d['targeting_expression_id'] criteria = DefaultQuery.Criteria.from_dict(d['criteria']) return DefaultQuery(tei, criteria)
class Criteria(TAXIIBase): """Represents criteria for a :class:`DefaultQuery`. **Note**: At least one criterion OR criteria MUST be present :param str operator: The logical operator (should be one of `OP_AND` or `OP_OR`) :param criteria: The criteria for the query :type criteria: :class:`DefaultQuery.Criteria` :param criterion: The criterion for the query :type criterion: :class:`DefaultQuery.Criterion` """ def __init__(self, operator, criteria=None, criterion=None): self.operator = operator self.criteria = criteria or [] self.criterion = criterion or [] @property def sort_key(self): key_list = [] ia = sorted(self.criteria, key=attrgetter('sort_key')) ion = sorted(self.criterion, key=attrgetter('sort_key')) for i in ia: key_list.append(i.sort_key) for i in ion: key_list.append(i.sort_key) return ''.join(key_list) @property def operator(self): return self._operator @operator.setter def operator(self, value): do_check(value, 'operator', value_tuple=OP_TYPES) self._operator = value @property def criteria(self): return self._criteria @criteria.setter def criteria(self, value): do_check(value, 'critiera', type=DefaultQuery.Criteria) self._criteria = value @property def criterion(self): return self._criterion @criterion.setter def criterion(self, value): do_check(value, 'criterion', type=DefaultQuery.Criterion) self._criterion = value def to_etree(self): cr = etree.Element('{%s}Criteria' % ns_map['tdq'], nsmap=ns_map) cr.attrib['operator'] = self.operator for criteria in self.criteria: cr.append(criteria.to_etree()) for criterion in self.criterion: cr.append(criterion.to_etree()) return cr def to_dict(self): d = {} d['operator'] = self.operator d['criteria'] = [] for criteria in self.criteria: d['criteria'].append(criteria.to_dict()) d['criterion'] = [] for criterion in self.criterion: d['criterion'].append(criterion.to_dict()) return d def to_text(self, line_prepend=''): s = line_prepend + "=== Criteria ===\n" s += line_prepend + " Operator: %s\n" % self.operator for criteria in self.criteria: s += criteria.to_text(line_prepend + STD_INDENT) for criterion in self.criterion: s += criterion.to_text(line_prepend + STD_INDENT) return s @staticmethod def from_etree(etree_xml): kwargs = {} kwargs['operator'] = etree_xml.attrib['operator'] kwargs['criteria'] = [] criteria_set = etree_xml.xpath('./tdq:Criteria', namespaces=ns_map) for criteria in criteria_set: kwargs['criteria'].append(DefaultQuery.Criteria.from_etree(criteria)) kwargs['criterion'] = [] criterion_set = etree_xml.xpath('./tdq:Criterion', namespaces=ns_map) for criterion in criterion_set: kwargs['criterion'].append(DefaultQuery.Criterion.from_etree(criterion)) return DefaultQuery.Criteria(**kwargs) @staticmethod def from_dict(d): kwargs = {} kwargs['operator'] = d['operator'] kwargs['criteria'] = [] criteria_set = d.get('criteria', []) for criteria in criteria_set: kwargs['criteria'].append(DefaultQuery.Criteria.from_dict(criteria)) kwargs['criterion'] = [] criterion_set = d.get('criterion', []) for criterion in criterion_set: kwargs['criterion'].append(DefaultQuery.Criterion.from_dict(criterion)) return DefaultQuery.Criteria(**kwargs) class Criterion(TAXIIBase): """Represents criterion for a :class:`DefaultQuery.Criteria` :param string target: A targeting expression identifying the target :param test: The test to be applied to the target :type test: :class:`DefaultQuery.Criterion.Test` :param bool negate: Whether the result of applying the test to the target should be negated """ def __init__(self, target, test, negate=False): self.negate = negate = target self.test = test @property def sort_key(self): return @property def negate(self): return self._negate @negate.setter def negate(self, value): do_check(value, 'negate', value_tuple=(True, False), can_be_none=True) self._negate = value @property def target(self): return self._target @target.setter def target(self, value): do_check(value, 'target', type=six.string_types) self._target = value @property def test(self): return self._test @test.setter def test(self, value): do_check(value, value, type=DefaultQuery.Criterion.Test) self._test = value def to_etree(self): cr = etree.Element('{%s}Criterion' % ns_map['tdq'], nsmap=ns_map) if self.negate is not None: cr.attrib['negate'] = str(self.negate).lower() target = etree.SubElement(cr, '{%s}Target' % ns_map['tdq'], nsmap=ns_map) target.text = cr.append(self.test.to_etree()) return cr def to_dict(self): d = {} d['negate'] = None if self.negate is not None: d['negate'] = self.negate d['target'] = d['test'] = self.test.to_dict() return d def to_text(self, line_prepend=''): s = line_prepend + "=== Criterion ===\n" s += line_prepend + " Negate: %s\n" % (self.negate if (None != self.negate) else False) s += line_prepend + " Target: %s\n" % s += self.test.to_text(line_prepend + STD_INDENT) return s @staticmethod def from_etree(etree_xml): negate_set = etree_xml.xpath('./@negate') negate = None if len(negate_set) > 0: negate = negate_set[0] == 'true' target = etree_xml.xpath('./tdq:Target', namespaces=ns_map)[0].text test = DefaultQuery.Criterion.Test.from_etree(etree_xml.xpath('./tdq:Test', namespaces=ns_map)[0]) return DefaultQuery.Criterion(target, test, negate) @staticmethod def from_dict(d): negate = d.get('negate', None) target = d['target'] test = DefaultQuery.Criterion.Test.from_dict(d['test']) return DefaultQuery.Criterion(target, test, negate) class Test(TAXIIBase): """ :param string capability_id: The ID of the capability module that defines the relationship & parameters :param string relationship: The relationship (e.g., equals) :param parameters: The parameters for the relationship. :type parameters: :class:`dict` of key/value pairs """ def __init__(self, capability_id, relationship, parameters=None): self.capability_id = capability_id self.relationship = relationship self.parameters = parameters or {} self.validate() @property def capability_id(self): return self._capability_id @capability_id.setter def capability_id(self, value): do_check(value, 'capability_id', regex_tuple=uri_regex) self._capability_id = value @property def relationship(self): return self._relationship @relationship.setter def relationship(self, value): # TODO: For known capability IDs, check that the relationship is valid # TODO: provide a way to register other capability IDs do_check(value, 'relationship', type=six.string_types) self._relationship = value @property def parameters(self): return self._parameters @parameters.setter def parameters(self, value): do_check(list(value.keys()), 'parameters.keys()', regex_tuple=uri_regex) self._parameters = value # TODO: Can this be done better? def validate(self): capability_module = capability_modules.get(self.capability_id) if capability_module is None: # Nothing is defined for this, validation not possible return True relationship = capability_module.relationships.get(self.relationship) if relationship is None: raise Exception('relationship not in defined relationships. %s not in %s' % (self.relationship, list(capability_module.relationships.keys()))) for name, value in list(self.parameters.items()): param = relationship.parameters.get(name) if param is None: raise Exception('name not valid. %s not in %s' % (name, list(relationship.parameters.keys()))) param.verify(value) def to_etree(self): t = etree.Element('{%s}Test' % ns_map['tdq'], nsmap=ns_map) t.attrib['capability_id'] = self.capability_id t.attrib['relationship'] = self.relationship for k, v in list(self.parameters.items()): p = etree.SubElement(t, '{%s}Parameter' % ns_map['tdq']) p.attrib['name'] = k if isinstance(v, bool): p.text = str(v).lower() elif isinstance(v, datetime.datetime): p.text = v.isoformat() elif isinstance(v, numbers.Number): p.text = str(v) else: p.text = v return t def to_dict(self): d = {} d['capability_id'] = self.capability_id d['relationship'] = self.relationship d['parameters'] = self.parameters return d def to_text(self, line_prepend=''): s = line_prepend + "=== Test ==\n" s += line_prepend + " Capability ID: %s\n" % self.capability_id s += line_prepend + " Relationship: %s\n" % self.relationship for k, v in six.iteritems(self.parameters): s += line_prepend + " Parameter: %s = %s\n" % (k, v) return s @staticmethod def from_etree(etree_xml): capability_id = etree_xml.attrib['capability_id'] relationship = etree_xml.attrib['relationship'] parameters = {} cm = capability_modules.get(capability_id, None) if cm is not None: r = cm.relationships.get(relationship, None) if r is None: raise ValueError('Relationship (%s) not in CM (%s).' % (r, capability_id)) else: r = None for parameter in etree_xml.xpath('./tdq:Parameter', namespaces=ns_map): k = parameter.attrib['name'] v = parameter.text if v in ('true', 'false'): # bool is a special case parameters[k] = v == 'true' elif r is not None: type_ = r.parameters[k].type if type_ == six.string_types: # basestring can't be instantiated, but str can be type_ = str elif type_ == datetime.datetime: # We can use this function to parse datetime strings. type_ = dateutil.parser.parse parameters[k] = type_(v) else: parameters[k] = v return DefaultQuery.Criterion.Test(capability_id, relationship, parameters) @staticmethod def from_dict(d): return DefaultQuery.Criterion.Test(**d) DefaultQueryInfo.TargetingExpressionInfo = TargetingExpressionInfo DefaultQuery.Criterion = Criterion DefaultQuery.Criteria = Criteria DefaultQuery.Criterion.Test = Test package_dir, package_filename = os.path.split(__file__) schema_file = os.path.join(package_dir, "xsd", "TAXII_DefaultQuery_Schema.xsd") tm11.register_query_format( format_id=FID_TAXII_DEFAULT_QUERY_10, query=DefaultQuery, query_info=DefaultQueryInfo, schema=schema_file)