2. 软件开发环境国家重点实验室北京航空航天大学, 北京 100191;
3. 北京市网络技术重点实验室北京航空航天大学, 北京 100191;
4. 深圳市腾讯计算机系统有限公司, 广东 深圳 518057
2. State Key Laboratory of Software Development Environment BeiHang University, Beijing 100191, China;
3. Beijing Key Laboratory of Network Technology BeiHang University, Beijing 100191, China;
4. Tencent Computer System Co., Ltd., Shenzhen 518057, China
随着大数据处理的深入发展,系统规模日渐扩大,系统需求日趋复杂,其产生的数据量也在不断增加.与此同时,在许多应用,诸如实时监控、股票在线分析以及医疗生命保障等对系统响应速度具有较高要求的系统中,针对大规模复杂数据进行快速处理乃至实时处理的需求更加突出.作为一种针对大量数据进行实时处理的技术,复杂事件处理实现的是将用户对特定的事件序列的查询需求映射到某个一般化算法上,从多个持续事件流中分析并提取满足特定模式的事件序列.
在复杂事件处理中,每一组到达的数据都被认为是一个事件,由用户输入查询条件,系统则根据查询语义,对输入的事件进行持续搜索,找出满足查询语义的事件组合.通过使用复杂事件处理技术,用户定制相应的查询规则,输出相应的事件组合作为查询结果.例如,股票在线分析系统就对数据的实时处理有较高的要求,因为股价波动速度较快,用户需要在极短的时间内针对符合买入、卖出条件的股票进行操作.因此,用户可以通过复杂事件处理技术探测股价波动、交易状况以及相关新闻等多种类型数据之间的关联关系,规定相应的买卖条件,一旦股票满足其规定的条件,即可及时进行相应的买入或卖出操作.此外,在实时系统监控中,将复杂事件处理技术应用于故障探测也具有明显的优势,国内的大数据处理领域较为突出的公司之一,腾讯也在尝试使用复杂事件处理技术对分布在全球各地的服务器进行故障检测.其原有系统通过简单的阈值设置以及逻辑组合的方式实现故障检测及报警,但与之俱来的是经常出现重复报警、无法追溯故障根源、故障信息列表难以维护等诸多问题.在改进后,将底层获取的基本信息(如计算资源监控数据、存储状况监控数据、网络访问状况监控数据)以及定制的基本事件(DOS攻击事件、用户访问事件、查询操作事件)进行封装,作为事件输入到复杂事件处理的引擎.用户根据已知的故障模型定制查询语句,要求引擎在输入的事件中查找符合故障模型的组合.在故障检测实施过程中,引擎根据用户定制的查询语句构建识别模型,使底层获取的事件持续流入该模型.由该识别结构在这些事件中搜索符合查询条件的集合作为结果,一旦获得结果,就表示满足故障模型的事件已经出现,即,故障已经发生.该公司此前遇到的大多数问题,诸如重复报警、故障根源追溯等,都可以通过复杂事件处理技术中的克林闭包查询、时序查询等方式来解决,其他特殊需求还可以通过复杂事件处理的算子扩展等方式进行处理.作为流处理技术的一个分支,复杂事件处理技术正在更加广泛地应用于各类实时系统中.与此同时,当前的复杂事件处理技术面临如下的挑战:
· 海量输入数据的实时处理.单位时间事件的处理速度,是复杂事件处理系统最根本的性能保障.首先,随着系统规模日渐庞大,单位时间产生的数据规模也日渐增多.因此,足够高的吞吐率才能保证复杂事件处理引擎能够被有效地应用于大数据环境.同时,在实时处理需求较高的环境中,处理时间成为非常重要的衡量标准.如何保证在尽量短的时间内返回符合条件的查询结果,也是复杂事件处理技术的重要性能指标.例如,在云环境的监控中,由于每分钟的计算资源、存储资源以及带宽都被列为计费项目,一旦故障产生而不能及时被探测,将浪费大量的资源.
· 查询条件的可扩展性.在数据处理的过程中,系统不可能预置全部的查询模式,同时,许多系统在演化过程中会产生新的模式,一种支持扩展语义的复杂事件处理模型是非常必要的.例如,在股票在线分析系统中,用户除了需要软件查询已知的如股价V字波动以外,还需要根据自己的经验自行定制股价波动模式和相应的查询条件;在系统监控过程中,不同的系统存在许多特定的故障,其数学特征未必明确,探测这些故障也往往需要用户自行定制.因此,语义的可扩展支持是实时复杂事件处理非常迫切的需求之一.
针对上述问题,本文提出一种基于算子的可扩展复杂事件处理模型.本文主要贡献如下:
(1) 提出了基于算子的事件匹配模型,将事件流的各种复合操作过程映射到一个基本算子上,并通过算子的匹配判定函数、复合计算函数以及输出界定函数描述事件流的复合过程;
(2) 通过建立算子与现有语义的映射,实现用算子的方式描述用户的查询规则,从而保证通过算子构建的复杂事件处理模型在满足查询规则的前提下支持可扩展性;
(3) 对算子以及由算子构建的查询规则树结构进行时间消耗评估,并基于此进行优化,得到算子树的最优构建策略,并在实验环节进行相关验证.
1 相关工作有别于数据库的存储-索引-查询模式[1, 2, 3, 4]以及查询订阅模式[5,6],复杂事件处理实现了对流数据的实时处理.在复杂事件流引擎的实际使用过程中,引擎首先根据用户的每条查询中的约束构建识别结构,保证流出该结构的数据符合查询中所有约束;而后,在实际运行过程中,由流数据不断流入识别结构,识别结构根据约束自行选取符合的数据[7].当前主要的识别结构包括NFA结构、树形结构、图以及Petri网等几类.
NFA模型是根据时序查询条件生成一个非确定性状态机模型,每当出现符合查询条件的事件后,变更当前存在状态机列表的状态,使状态机到达最终状态的事件集合,即为一组满足查询结果的事件集合.该状态机的构建过程需要分析全部的时序约束之后方能完成.该模型主要存在的问题是构建算法必须考虑到所有的时序组合构建情况.例如,在NFA模型中,时序查询中的“否约束”无法被单独定义以及实现,只能通过与序列算子联合定义才具有实际意义;同时,新扩展的时序约束可能导致构建整个 NFA算法发生改变,如克林约束,因此,该模型不具有良好的可扩展性.Cayuga[8,9]首先提出了NFA的代数逻辑以及基于其上的支持复杂事件查询的发布订阅系统.Diao[10,11,12,13,14]等人在其基础上提出了SASE+系统,改进NFA模型为NFAb[13]模型,使其更好地支持克林语义以及否语义,并在近年来关注于复杂事件处理的乱序问题[15].
树形复杂事件处理的实现则是根据每个查询规则生成一个树节点,数据流从树的叶子节点流入,在节点中复合生成新的事件,并向上流动直至根节点,即为输出结果.树形结构则因为结构灵活、扩展方便等特点而被关注,GEM[2]系统首先提出了树模型,其主要用于系统监控方面的应用.而后,MIT的Zstream[16]给出了树形构建的一般策略以及代价模型,但却没有明确给出定量化最优构建算法.因此,对于树结构如何充分利用中间节点找寻最优构建方式以及丰富语义仍有待研究.而后,在国内也有部分相关研究[5,6,17],这些研究工作都着重于系统内部性能的优化,在可扩展性方面没有涉及.由于树结构处理单元之间由数据流连接,可以很容易地将不同的处理单元分布到不同的节点进行处理.相比之下,前人的工作着重于根据查询语义实现识别结构,而并没有太多的规范一般化实现方法.针对具有新增查询语义需求时,往往需要更改整个识别结构.本研究虽然也是基于树结构的识别模型,但着重考虑复杂事件流引擎的一般化实现方法.通过使用每个算子代表一次事件流的复合过程,使算子能够作为构建识别结构的基本单元,并通过定义F,Q,Z这3个函数实现对算子的完整描述.
除此之外,其他实现工作则主要是在功能上进行了扩展,包括ZStream[18],TelegraphCQ[19],Aurora[10]等. STREAM提出了CQL[18]语义,TelegraphCQ则根据PostgreSQL实现了引擎原型,Aurora则通过拖拽的方式实现了具有图形化的复杂事件处理引擎.综合而言,在复杂事件处理方面的研究工作已经较为全面,各种结构均有多种实现,但绝大部分都是仅针对特定的语义.NFAb模型虽然被证明了其具有完备性,但也没有考虑系统的扩展性问题.在性能优化上,优化算法也局限在性能调优的范畴,没有给出最优化算法.
2 事件模型复杂事件处理技术最终解决的是将用户对特定的事件序列的实时查询需求映射到某个一般化算法上的过程,因此,本节首先清晰地界定事件、事件流、事件的复合以及事件流的复合等基本概念.
2.1 事件与事件流事件原本用于描述状态的变化,在处理过程中被泛指为任意一组数据.系统往往把任何一组数据、一个网络上传播的数据包、一个类、一个xml文件,都认为是一个事件,用字母e表示.比如,在系统监控中,监控探针在某一时刻获取了某个机器的硬件当前状态,这个机器当前状态若附加了时间属性即可被认定为一个事件.在监控系统运行的过程中,探针会源源不断地从被监控系统中获取数据.因此,众多与该事件结构相同的数据会按照时间顺序不断到达监控中心,如同水管中的水不断流出.为了更好地表述这一特征,定义同类型的事件按照时间先后顺序排列的集合为一条事件流,用E表示.每个事件由3部分组成:起始时间、结束时间以及该事件所包含的其他数据.因此,定义一个事件e为
e=ástartTime,endTime,datañ (1)
其中,startTime与endTime表征该事件的起始以及结束时间,data表征该事件中其他属性.
而事件流E则表示为
E=átype,states {e}ñ (2)
其中,type表征事件流的类型,为一个字符串,对于任意事件流E1,E2,当且仅当E1.type=E2.type时,称E1,E2是两个相同的事件流;{e}为事件集合,表征事件流E中的全部事件,该集合往往是一个无穷的集合;states为一组key- Value对,用于表征事件流的一些状态特征.以后在不加说明的情况下,事件流均用大写字母表示,该事件流中的事件用与之相同的小写字母表示.
2.2 事件的复合运算在监控、股票、医疗等领域,系统可获得诸如“CPU变化”、“股价变动”、“病人心跳”变化等基本事件,但用户希望得到的往往是更加抽象的信息,如,“需要进行维护系统了”、“该在某时刻买入某股票”、“某病人需要呼吸机”等,这些事件往 往不是一个可以简单地获取的变量,而是由众多数据或者事件根据特定条件组合的结果.
因此,复杂事件处理技术允许用户编写相应的规则.该规则描述了各种类型的事件如何组合成更加抽象复合的事件.由于这些抽象的事件具有新的含义,因此也往往被认定为新的事件.即,事件的复合是将多个事件按照一定的规则进行组合,生成新事件的过程.例如,“系统被DDOS攻击导致服务不可访问”事件是一个相对抽象的事件,实际上,该事件由多个“DDOS攻击”事件以及“服务不可访问”事件组合形成;同时,“DDOS攻击”事件同样由“网络监控报告”、“防火墙提醒”、“操作系统日志”等多个基本事件组合而成.图 1就是将多个类型的事件进行组合以及抽象的过程.通过类似此类事件的层层抽象,系统可以直接向上层用户反映更加直观、具体的信息.用户通过特定的查询语句描述这些事件应当如何进行复合操作,并要求引擎返回符合条件的复合事件.
![]() | Fig. 1 Join process of the events图 1 事件的复合过程 |
事件的复合仅适用于描述一组特定事件的组合操作,但用户的查询需求则是一个持续不断的过程.例如,查询系统故障需要每当出现相应的特征后就产生警报,即需要持续不断地对输入的事件进行复合操作.因此引入事件流的复合的概念:将1个或多个事件流中的事件,持续不断地按照指定的方法进行复合,并持续地输出新事件的过程就是事件流的复合过程.新生成的事件构成了一条新的事件流,即,复合的结果流.
仍以查询“DDOS攻击导致服务不可访问”为例,由图 1的事件复合过程可知,用户需要查找一系列事件的组合.但是这些事件分别来自不同的组件,因此,我们可以简单地把每个组件输出的信息封装成一种事件类型,即,每个组件都输出一条事件流.用户的需求是,当出现“DDOS攻击导致服务不可访问”时就产生报警,因此,引擎需要将流入的事件按照查询规则持续地进行组合,只有满足查询的事件组合才能被输出,作为结果.即,产生了一条新的流.其整个过程如图 2所示.
![]() | Fig. 2 Implements of the operator S by function Q,F,Z图 2 算子S通过函数Q,F,Z进行实现的方式 |
不同于图 1的是,该系统不是仅仅将1组事件组合即可,而是持续不断地将每一组符合条件的事件全部组合.在具体的复合过程中,以一个简单的序列复合为例,查询A®B表示系统需要输出所有aÎAÙbÎBÙ a.starttime<b.starttime的事件 .实现该查询需要构建一个能够持续处理的结构.该结构不断输入属于A,B的事件,并将其组合.具体而言,该过程需要3个函数:
(1) 由于属于A,B事件类型的事件不断到达,且要求二者需要满足符号“®”所代表的特定规则: a.starttime<b.starttime.因此在每次复合操作之前,都需要先由一个函数判定即将进行组合的这些事件是否满足进行复合的条件,此处即为a.starttime<b.starttime,用事件匹配判定函数Q来表示.该函数输出布尔型结果,只有Q函数为真,才可以对这组事件组合进行复合操作.
(2) a,b事件如果通过了事件匹配判定函数,则可以进行事件的复合计算,但二者应当如何组成新的事件?新的事件属性应当如何决定?因此,使用事件复合计算函数F描述对满足匹配函数的事件进行事件组合操作过程.
(3) a,b事件复合之后,输出的事件是一种新的事件,这种事件的集合构成了一条新的事件流,通过使用流状态计算函数Z描述了新的事件流的相关属性.
更一般地来说,通过Q,F,Z可以描述事件流之间的一次复合过程.例如,A®B在构建A与B的复合过程中,输入的是复合运算符“®”,首先由流状态计算函数Z根据该符号确定复合之后新产生的事件流的属性、类型以及状态.在数据到达后,由匹配判定函数Q判定一组数据是否满足 该符号的需求:若满足,则交由复合计算函数F生成新的事件;否则,事件被抛弃.新生成的事件作为一条新的事件流输出,该流的属性则由Z决定.为描述方便,我们使用S=áZ,Q,Fñ描述一次事件流的复合过程,我们称S为一个事件流复合算子,简称算子.
2.4 算子S的形式化描述事件流的复合过程分为两部分,即构建过程和运行过程.构建过程规定了事件流中事件应当进行何种计算过程;而在运行过程中,事件流中的事件则按照规定的运算规则进行计算.假设事件流E1,E2,…,En通过算子S=áZ,Q,Fñ复合生成新的事件流Enew,则算子定义如下:
定义1(算子定义). 若Enew=Z(E1,E2,…,En),且对任意enewÎEnew,存在e1,e2,…,ekÎE1,E2,…,En,使得F(e1,e2,…,ek)=enew且Q(e1,e2,…,ek)=True,称事件流Enew由E1,E2,…,En复合而成,映射关系S=áZ,Q,Fñ即为事件流的复合函数,或称为一个算子.函数F为算子S的事件复合计算函数,函数Z为算子S的流状态计算函数,函数Q为算子S的匹配判定函数.按照Q,Z,F这3个函数的具体职能,3个函数的子函数定义见表 1.
![]() |
Table 1 Operator S and its three sub functions 表 1 算子S及其3个子函数 |
本节主要介绍如何通过用户的查询条件构建事件识别模型的过程.通过算子与用户查询的映射过程,说明用户的查询均可通过算子的方式进行描述;在构建识别模型的过程中,通过将算子实现成为树节点的方式,构成树形复杂事件处理模型.
3.1 用户查询中约束与其实现用户的查询以众多约束组合的形式呈现,随着应用需求日趋广泛,查询约束种类繁多,按照查询对构建系统的影响程度,可分为时序约束和谓词约束两类.时序约束是复杂事件处理中特有的一类约束,其包括序列约束、否约束(negative)、或者克林(Kleen)约束以及窗口约束(window)等,这些约束往往对事件的时间有一定要求;谓词约束则与数据库查询类似,包括比较、排序以及各种聚合函数等.
下面以一条系统监控中用于稳定性保障的查询语句为例进行说明.管理员可以通过该句侦测出“由于带宽不足而强行取消用户服务”的情况.其具体查询如下:当90%的用户操作响应时间超过100 ms之后,继而出现平均数据传输带宽小于128 KB/S的情况,而后出现用户取消请求,则输出相应事件集合.其描述如下:
select percentile(A.responseTime,90),avg(B.inTransferRate)
from pattern[(A=ClientPMs(appId=‘app1’)®(B=DataTransPMs(endpoint=‘zeus-1’))® (C=ClientCancel(appId=‘app1’)))].win:time(5min)
having percentile(A.responseTime,90)>100 or avg(B.inTransRate)<128
限于篇幅,有关于语法解析方面的内容将不再复述.针对上述查询,其包含的约束可按照如下方式分类:WE0为时序约束,其他的WE1,WE2,WE3为普通的谓词约束.输出的结果应该是全部满足如上约束的事件集合.下面对两类约束的实现分别进行描述:
输入的事件流:A(ClientPMs),B(DataTransPMs),C(ClientCancel)
约束: WE0:(A®B®C).win:time(5min)
WE1:A.appId=‘app1’
WE2:B.endpoint=‘zeus-1’
WE3:(pertile(A.responseTime,90)>100 or avg(B.inTransRate)<128)
3.1.1 时序约束时序约束用于描述被查询事件在时间上应满足的关系.目前,时序约束的种类有很多,不同的事件流查询引擎实现的约束方式也不尽相同.时序约束除了被单独使用以外,很多查询中还会联合使用多个时序约束.比如在上例中,约束WE0=(A®B®C).win:time(5min)是2次序列约束以及1次时间范围约束的组合.其寻找的是A,B,C这3种类型的事件先后发生并且其间的时间间隔小于5分钟.另外,这些事件同样应该满足其他约束,包括WE1,WE2和WE3.虽然没有明确指出,但时序约束之间都用“与”进行链接,即,要求事件同时满足所有的时序约束.
3.1.2 谓词约束类似于数据库处理中的where部分查询,谓词约束同样也是用户对事件的筛选条件.谓词约束往往被表示为由多个and以及or连接而成的布尔表达式,并使用事件中的多个属性进行聚合、比较等操作.我们将整个查询条件转化为多个WE通过与(and)关系连接,其目的是方便后面的数据处理.
我们定义谓词约束为如下形式:
WhereQuery::=WE {and WE}* (3)
WE::=BoolExpression {or BoolExpression}* (4)
其中,BoolExpression是一个最基本的布尔表达式,其可以是几个简单的比较表达式,如约束WE1:ppId=‘app1’,也可以是由聚合函数组成的比较表达式,例如WE3:avg(DataTransPerfMs.inTransRate)<128.
3.1.3 用户查询的算子描述时序约束与谓词约束事实上都是对事件属性的限定条件,但在实现上却有极大的区别:
· 首先,谓词约束是对事件除时间外的属性的约束,针对谓词约束的实现,早在数据库理论中就有诸多研究工作,算法实现效率高,能够进一步优化之处较少;时序约束是对一组事件的时间进行限定,而事件则基本都是按照时间先后顺序到达,因此,算法有较大的优化空间.例如,序列约束A®B要求A类型事件先于 B类型事件先到达,因此,系统只需在接收到每个A类型事件后,将其与所有在其后到达的B类型事件组合作为结果即可.而不像谓词约束的Join操作,需要对所有事件进行全匹配.因此,本部分重点研究时序约束的实现,而针对谓词约束,则采用公认的高效实现算法.
· 其次,时序约束会将原有的事件集合组合成新的事件,因此在计算顺序上存在一定的先后关系,但谓词约束中往往没有此类限定条件.例如上例中的时序约束WE0=(A®B®C).win:time(5min),其中,序列约束A®B®C可被看作事件流A与B复合后形成的新的事件流再与C流复合,即(A®B)®C;也可以看作是B与C复合后再与A复合的过程,即A®(B®C).虽然这两种计算方式最终都会获得同样的结果,但是不能存在除此之外的其他计算方式.且其最终结果都要通过事件范围约束win:time(5min),该约束限定的是A®B®C的结果,即ABC流复合的结果流.由于时序约束进行的是事件流的复合过程,因此考虑使用算子对时序约束进行描述.
图 2是通过算子S的3个子函数,即事件匹配判定函数Q、复合计算函数F以及流状态计算函数Z来实现时序约束的过程.通过更改算子S的3个子函数,可以对现有的时序约束进行充分的描述.为了验证其表述能力,表 1列出了使用算子S的方式描述NFAb中所使用 的时序约束.由于YanleiDiao等人已经证明NFAb模型的语义完备性[13],可推得通过算子描述的时序模型同样具有语义完备性,即,算子描述模型可支持任何现有时序语义.由于时序约束可以统一地通过算子S进行描述,上例中的时序约束WE0用中缀表达式可表示为
Swin(S®2(S®1(A,B),C)).
对谓词约束而言,谓词约束仅仅包含对事件中属性的要求,因此,谓词约束的实现并非新添加一个算子,而直接将谓词约束添加到某个已经存在的算子中即可.由于算子S的复合匹配函数Q用于验证数据是否满足算子的要求,所以可直接更改其Q函数为Qnew=QoldÙWE,保证了通过该算子的数据都是满足了谓词约束的数据.以第3.1节的用户查询为例,对 事件流A,B的复合过程A®B而言,系统需要构建一个算子S1完成该过程.谓词约束WE1,WE2,WE3均为对A,B流中的事件的约束,因此,系统生成一个算子S1实现“A®B”时赋予S1.Q=A.endtime< B.starttime,用于保障每个B事件都发生在A事件之后.在实现谓词约束时,需要添加约束WE1,WE2,WE3到算子S1中.按照前述方法,算子更改后的匹配函数改为S1.Qnew=A.endtime<B.starttim eÙWE1ÙWE2ÙWE3.此时,A,B事件流中的事件如果需要通过算子S,则必须满足S1.Qnew,从而保障了输出的事件组合既满足时序约束S1,也满足谓词约束WE1ÙWE2ÙWE3.
3.1.4 算子的扩展支持本节开始时列举了部分时序约束,但实际使用的时序约束并非只包括如上这些查询.在使用过程中,很多查询可能与这些查询类似,却在细微位置有部分更改.如下列出两例进行说明:
例1:在去除重复告警事件的过程中,可使用扩展克林约束对完全相同的告警事件进行过滤,但由于告警事件的数量不确定,不能直接使用扩展克林约束中的参数“n”,而应该要求该约束匹配任意数量连续地报警.此外,扩展克林约束会组合全部的同类型事件之后才组成一个新的事件并输出,即,告警信息会在所有重复告警全部到达后才被输出,此时与第1个告警信息到达时间相比已经延后了很长时间,在实际使用中,应该在第1个告 警事件到达时就进行报警,因此需要在扩展克林的原有语义上进行修改或扩展.
例2:在一般的序列约束A®B中,A,B事件会构成新的事件,且新事件以A事件的起始时间为开始时间、以B事件的结束时间为结束时间.但在股票价格分析过程中,其数学模型均以单个时间点作为事件触发条件,不存在时间段的概念,因此在金融领域,探测双峰波动等事件时,不能使用原有的序列约束,需要对序列约束的部分实现进行扩展或调整.
以上两例描述的仅仅是两个较小的改动需求.事实上,虽然第3.1.1节描述的几个约束组合已经被证明具有完备性,但由于实际使用中存在的差异,用户对时序约束的需要也在不断变化,需要对原有约束进行扩展和调整.而算子S的3个子函数Q,F,Z可对时序约束进行充分描述,因此,用户可直接更改S的3个子函数以实现新的时序约束.例1中的需求可通过在原有克林闭包算子的基础上修改其匹配判定函数Q进行实现.例2中对序列约束的需求可通过修改其复合计算函数F进行实现.这两个新实现的算子对应于两个新的时序约束.通过更改算子S的3个子函数,可对算子S表述的语义进行扩展.
3.2 树结构的算子模型通过单个算子S可实现单个的时序约束,但由于用户查询往往由多个时序约束组合而成,因此需要一个统一结构对事件进行处理.本节描述如何将实现单个时序约束的算子构建出统一的树形识别结构,注意,此时对于单个算子而言,其对树结构的影响只有其3个子函数Q,F,Z,因此无论是系统内置的时序算子还是用户自行添加的算子,在构建树结构的过程中都是完全相同的.
3.2.1 树模型运行样例分析由于复杂事件处理技术最终将多个基本事件组合成1个复合事件,是一个自底向上的汇聚过程.每个实现了时序约束的算子作为树的分支节点,而流入该引擎的事件类型名作为树的叶子节点.如图 3所示为针对查询Equery=Swin(S®2(S®1(A,B),C))构建的树形结构,输入系统的事件类型名A,B,C全部作为树的叶子节点,算子Swin,S®2,S®1作为树结构的分支节点.节点间通过边相连,若某个节点M以另一个节点N的输出事件流作为输入,则有一条从M节点到N节点的边.在运行阶段,分别属于A,B,C类型的事件a,b,c事件持续从A,B,C节点持续流入,沿树形结构的边向上流入到分支节点进行数据处理.流入的数据首先由匹配判定函数Q检测是否需要进行复合操作,Q函数判定无误后交由复合计算函数F将流入的事件复合成新事件,新事件的属性则有Z函数根据流入事件的属性确定.最终,从根节点流出的事件即为查询的结果.图 3演示了在运行阶段数据的流向过程,分别属于A,B的事件经由自己所属类型的叶节点流入引擎,沿树结构的边流入节点S1.该节点由序列约束实现,因此要求a事件发生在b事件之前.输入的事件a1,a2与b1,b2进行全匹配之后,生成4种组合,并由S1的Q函数判定.由图 3可知,事件发生的先后顺序为a1,b1,c1,c2,a2,b2,因为a2发生在b1之后,不满足该条件,因此a2b1的组合无法通过Q函数,而其他3组数据则通过该函数,按照S1的事件复合计算函数F生成3个新的事件组合:a1b1,a1b2a2b2.这3组事件组合成新的事件并向节点上方流动,到达S2.S2同样是序列算子,这3个新的事件与c1,c2进行全匹配后交由S2的Q,F,Z这3个函数进行处理.输出的结果同样按照树结构的边流向下一算子,如此往复.最终仅有1组事件符合全部的查询约束a1b1c1,即为查询结果.
![]() | Fig. 3 Operator-Based tree model implements图 3 基于算子的树结构模型实现 |
上节通过查询WE0=(A®B®C).win:time(5min)演示了树形识别结构的构造方法以及运行过程.更一般地来说,对于任意用户查询,最终通过语法解析转化为算子的表示形式Equery=S1S2S3…Sm(E1,E2,…,En),其中,E1~En是输入系统的事件流,S1~Sm是通过算子方式描述的时序约束.这些约束可能是现有的时序约束,也可以是用户自行扩展的时序约束.给定算子计算的先后顺序(i1,i2…imÎ[1,m]),存在唯一的识别树结构与之映射.
具体构造算法如下:
算法1. Construction and runtime algorithm of factor-based tree model.
1. Date Streams E¬{E1,E2,E3,…,En-1}
2. Factors S¬{S1,S2,S3,…,Sn}
3. User Query EC=S1S2S3…Sm(E1,E2,…,En)
4. Factor Compute Priority i1,i2,…,in-1Î[1,n-1]
//Tree building process
5. for jin {i1,i2,…,in-1}
6. Enew¬make tree node use factor Sjand related event stream Ej,Ej+1
7. Enew.property=Sj.Z(Ej,Ej+1)
8. Ej¬Enew
9. Ej+1¬Enew
10. //Runtime for each event processing in tree model
11. if S is father node of E and eÎE
12. if S.Q(e)=true
13. enew=S.F(e)
4 复杂事件处理模型性能分析与优化在复杂事件处理中,作为实时数据处理引擎,衡量效率的最重要标准为吞吐率,即,单位时间能够处理的事件数量.因此,本节着重于针对处理事件的速度进行分析,研究不同的算子组合结构对系统处理事件的速度造成的影响,并提出了最优的树形构造模型.
4.1 单个算子内部的时间消耗整个树结构都由多个算子构成,因此,首先需要衡量各个算子在处理事件的时间消耗,用字母C表示.因为考虑时间消耗,因此单位为ms.下面分别讨论算子S的3个子函数:Q,Z以及F对应的消耗函数CQ,CZ,CF.
4.1.1 基本单位定义首先定义几种基本操作的时间消耗,推导获得时间消耗以及部分匹配函数的通过概率定义,见表 2.
![]() |
Table 2 Meaning of the basic variables 表 2 基本变量定义 |
结合表 1中算子3个子函数的定义,每个算子处理一组数据的时间消耗应该为其3个子函数的时间消耗之和.因此,如果用CZ,CQ,CF表示算子S的3个子函数对一组数据的时间消耗,因为Q函数和Z函数对通过的每组数据都适用,但只有通过Q函数的数据才能进入F函数进行组合计算,假设通过Q函数的概率为PQ,则总消耗C可表示为
C=CQ+PQCF+CZ (5)
下面分别计算3个函数的时间消耗,限于篇幅,直接给出3个函数时间消耗如下:
Q函数的总时间消耗为Q函数的通过概率
其中,Qtype为类型判定函数.因为类型验证在识别树构建过程中完成,一个算子的输入一定满足其输入要求判定.因此,运行时此判定被忽略
QT为时间判定函数,除了Conjunction以及Disjunction之外,其他算子都需要进行一次时间的比较,
QA为属性判定函数,即,加载算子上的谓词约束.每个算子的QA中谓词约束条件各不相同,假设某算子有n个谓词约束条件,QA=WE1 and WE2 and …. and WEn,通过的概率分别为P1,P2,P3,…,Pn,每个的时间消耗为C1,C2,C3,…,Cn,则数据通过全部谓词约束的概率为
(6)
在约束匹配过程中,谓词约束仅需要对数据进行属性判断,因此谓词约束的时间消耗基本相同,即CW=C1= C2=…=Cn,其占用时间的期望最小值为
(7)
通过匹配函数的概率,即,通过概算子的概率为
(8)
特殊情况下,当概算子没有谓词约束条件时,即Pi=1,CW=0,带入得:CQ=n+2u,PQ=1/2.复合计算函数F的运行是在匹配函数全部运算完毕且全部通过之后,根据表 1,F由3个子函数FS,FE,FA构成,但因为3个函数较为简单,故直接给出
对于一组用于匹配的数据,消耗的时间对于匹配最复杂的节点为
(9)
通过的概率与公式(8)相同,其中,u,j,CW均为恒定值,Pj为各谓词约束通过的概率.当且仅当P1<P2<P3<…<Pn时,公式(9)中的第2项达到最小.由此得出:
优化策略1. 当一个算子存在多个约束条件时,应当先判断通过概率低的约束条件.
4.2 匹配树COST值计算
我们已经获得单个算子进行一次匹配所消耗的时间C,以及单个节点通过概率P.在整个识别树的运行过程中,每个算子构成一个中间节点,大量数据从叶节点流入,匹配成功后向上层父节点发送.每个中间节点Si耗的时间总量为匹配数据量Numi与每次匹配消耗的时间Ci之积,即NumixCi,节点的通过概率为Pi,该节点输出的数据量为该节点输入的数据量与通过概率之积NumixP.对于模式匹配查询EC=E1S1E2S2E3…Sn-1En,当算子表达式时,模型下总时间消耗为各个节点消耗时间之和.
(10)
其中,若存在Q1,Q2,…,QiÎ[K1,Ki]且分别为i,i+1,i+2,…,j-1或j,j+1,j+2,…,i-1,则Pij=PjxNumi;否则,.由定义知,P函数恒小于1,且C函数值基本恒定不变可知,显然,只有当
时,cost值达到最小.因此有:
优化策略2. 在分配谓词约束时,应当尽可能地分配到先结合的算子上.
4.3 基于COST模型的优化树构建策略由于即使对于同一个算子语义表达式EC=E1S1E2S2E3…Sn-1En,根据算子计算式SPattern的不同,其构造的树结构也各不相同.根据算子结合序列SP的定义,其构建方式共有(n-1)!种,现给出基于算子的匹配树最优构建策略如下.由于基于SP的树构建策略已经在第3节给出,因此此处仅给出如何搜索得到最优SP的策略:
算法2. Construction of most efficiency algorithm of factor-based tree model.
1. Date Streams E¬{E1,E2,E3,…,En-1}
2. Factors S¬{S1,S2,S3,…,Sn}
3. User Query EC=E1S1E2S2 E3…Sn-1En
4. WhereQuery=W1 and W2 and W3 and … and Wk
5. //Tree building process
6. for Wj in WhereQuery
7. for Ei in E
8. if W related to E
9.
10. for Si in {S} order by Qi ASC
11. SPattern+=Si
12. Make Tree use SPattern
5 性能评测本节将通过实验验证在第4节模型性能分析中获得的几个优化结论以及最优算法的正确性,而后通过自行开发的引擎Cesar与NFA结构的开源项目Esper进行性能比较.我们通过实验论证影响Cesar性能的主要因素,并优化其运行效率.所有实验全部使用同一套实验设备:IBM刀片服务器HS21,8核Intel(R) Xeon(R) CPU E5405@2.00 GHz CPU,8 G内存,132 GB SCSI硬盘,操作系统CentOS5.2,内核为 2.6.18-164.11.1.el5xen #1 SMP,JDK版本为1.6.0_01-b06.实验用例则采用分布式系统监控场景,事件中的数值分布采用中国国家网格CNGridEye中两个主节点(中国科学院计算机网络信息中心以及上海超级计算中心)2011年系统监控数据,其中包括集群内部网络使用情况、队列使用情况、主机运行性能、作业完成情况、请求应答信息以及应用运行状况这6类数据,依照如上分类封装为6类事件,总计约600 G条,21 TB.
5.1 性能优化对比由于采用统一的系统,因此计算量消耗与消耗时间基本成正比.在性能优化当中,我们主要对比不同的策略下,系统的总时间消耗.具体而言,时间消耗主要包括生成事件所用时间(t1)、获取属性所用时间(t2)、谓词判断所用时间(t3)这3部分.
5.1.1 数据量对优化策略的影响本实验验证输入事件的比例不同是否会对识别效率造成影响.首先验证在不存在谓词约束的情况下,树结构的构建策略.值得说明的是,在实际使用中,都会使用窗口算子对数据的到达事件进行约束,因此输入事件集合不会上升至无限大,因此,本实验中,A,B,C,D都输入1 000个事件,结果事件集在千万级.针对时序约束A®!B® C®D,其查询的是A{C}nD这种模式,并且要求A与C之间不能存在任何事件B.由于否算子“!”以及克林算子“*”为一元算子,所以必须先进行计算.基 于算子的识别树有5种(共6种,但其中有两种是等同的)构建方式,定义其名称为T1~T5,见表 3.并且,A,B,C,D这4种事件数据量相同,均为10 000条,在不添加其他谓词约束的情况下,系统将产生100004组中间结果.
![]() |
Table 3 Names of the match trees with different construction form 表 3 不同构造结构的识别树结构命名 |
T1~T5这5种时间消耗如图 4(b)所示,由于使用数据量较大且没有谓词约束,所以匹配事件较长,在实际使用情况中不会出现此类情况.
![]() | Fig. 4 Match efficiency influenced by different predicate constraints图 4 不同的谓词约束对识别效率的影响 |
由表 1可知,带有克林算子的事件通过概率也为序列算子的1/n(在此,根据概率计算,n约为5),以上问题即演化为PattenQuery=A®B®C®D,但A,B,C,D这4类事件的输入比例各不相同的问题.
根据第4节的结论,应当先计算通过概率低的算子.因为没有其他的谓词约束条件,4种算子同为序列算子,通过概率相同.因此,T1~T5这5种识别模式的事件消耗相近,最高效率与最低效率相差为21.7%.具体而言,消耗时间最多的是生成新事件消耗的时间,其次为谓词判断所用时间.在此,虽然已经没有人为添加谓词约束,但事实上,因为序列算子A®B本身就要求A.endtime<B.starttime,所以在事件复合的过程中,仍然需要进行一定的属性比较,这一部分耗时也加入到谓词判断当中.
5.1.2 谓词约束放置位置对效率的影响为了只验证谓词约束放置的不同位置给系统带来的效率差异,我们采用最普通的查询模式A®B®C®D.此时,加入谓词约束WE1~WE6,其具体信息见表 4,相关事件类型表示该约束使用的事件.如WE1=A.a1>B.b1,由于在生成a1以及b1的过程中应用同种构造函数,因此,WE1实现的概率为1/2.其他约束条件同样具有类似的性质.
![]() |
Table 4 Features of the basic predicate constraints 表 4 基本谓词约束特征 |
· 加载谓词约束的算子选择
谓词约束放置的位置将在很大程度上影响识别效率,因此,本实验验证谓词约束选择的不同位置的算子给识别效率带来的影响.任意添加谓词约束WE3,WE4,WE5,WE6中的3种,按照第4.3节给出的最优构建算法,对于PattenQuery=A®B®C®D=A,S1,B,S2,C,S3,D,无论使用哪种构造模式,在最简单的情况下,我们可以将全部约束记载在最后计算的算子上,因为概算子已经聚合了所有的事件,必然能够判断所有的约束,记为策略1.同时,也可以尽可能地将算子放置在较低的位置,即,一旦事件中包括了谓词约束的全部事件后,即对谓词约束进行判断,如,因为WE3仅包含事件AB,因此S1算子一旦进行计算,则必然包含AB事件,因此可以加载谓词约束WE3;同理,S2加载WE4,S2加载WE5,WE6,记为策略2.
对比两种策略下各个算子的时间消耗,如图 4(b)所示,策略2消耗的总时间明显要小于策略1.具体而言,如图 4(b)所示,在识别树T3模式下,策略1在S1,S2算子上消耗时间很少,由于前两个算子上没有约束条件,数据将全部两两匹配,达到N3的程度,由于谓词约束全部加载在S3上,因此要进行N3的匹配.相比之下,策略2在开始的两算子即进行数据筛选,在开始就由部分数据被筛选,因此虽然前两个算子时间消耗较多,但极大地减小了最后一个算子的数据量,从而达到整体最优.
更一般地来说,我们将T1~T5这5种构建模型在谓词约束WE1~WE6下分别进行测试,所获得的效率如图 5(a)所示,使用谓词约束下推都获得了明显的效率提升.提升效率最不明显的T4提高效率也在7.6倍左右,最高的T2缩短处理时间达到31倍.当然,该明显效果也与较多的谓词约束以及谓词约束通过概率较低有较大的影响.
![]() | Fig. 5 Verification of best strategy on building the tree图 5 最优树结构建策略验证 |
· 约束判断的先后顺序
在1个算子上仍然有可能存在多个谓词约束.本实验验证如何排列1个算子中多个谓词约束判断的计算先后才能达到高效的识别速度.考虑到约束条件中,WE1~WE3均为针对AB的约束条件采用AB事件各1 000 000个.为了避免其他操作耗时对系统带来的影响,仅针对PattenQuery=A®B以及约束条件,WE1~WE3.约束条件有6种放置方式,用K123表示依次判断WE1,WE2以及WE3,则这6种方式为K123,K132,K231,K213,K312,K321.6种方式的识别效率如图 5(b)所示,K213获得最少的处理时间,与模型分析的结论相同:当1个算子存在多个约束条件时,应当先判断通过概率低的约束条件.该问题仍然可以从数据量的角度进行解释,假设每次谓词约束判断所消耗的计算量相仿,则系统识别过程消耗的时间完全取决于数据量的多少.显然,只有先判断通过概率低的谓词约束,才能尽早地降低数据量,从而减少整体判断的次数,进而获得最小的时间消耗.
5.2 外部比较由于Cesar的可扩展性已得到验证,所以本实验主要对Cesar的处理速度进行评价,即在数据大量到达的前提下,单位时间内能够处理的输入事件数量,也称为吞吐率.其计算方法为:输入数据总量/处理时间.其中,该处理时间已经是去除了数据读入以及类型转化的时间.本实验将优化后的Cesar系统与其他复杂事件流处理引擎进行对比,选取的是同样由Java语言开发的,目前被广泛使用的开源事件流引擎Esper作为对比基准,二者使用同样的输入数据以及查询语句.
5.2.1 多算子识别结构性能评价在复杂事件处理引擎的使用过程中,更多的时候是使用多个约束相结合的查询条件.因此,本节对复杂的查询条件的识别效率进行验证,采用的输入查询条件选自另一篇使用复杂事件流引擎进行系统监控的工作[17]. Cesar和Esper在相同输入情况下的识别速度如图 6(a)所示,Cesar的识别效率均高于基于NFA实现的Esper.但考虑到Esper因为是开源实现,其内部对内置函数的支持较为丰富,且需要对各种类型输入进行兼容,因此实现起来可能比Cesar更为臃肿.
![]() | Fig. 6 Throughput comparisons between Esper with Cesar when face multiple time constraints图 6 当存在多个时序约束时,Ceser与Esper吞吐率的对比 |
但即使去除实现复杂度的影响,在一些特殊算子的识别过程中,Cesar的实现效率仍然明显高于Esper,例如加入了否约束“!”的查询.对比两条查询语句“A®B®C”以及“A®!B®C”的实现效率可以看出:在Esper的实现中,带有否算子的结构识别效率要略低于没有否算子的实现;而在Cesar中,二者的识别效率基本相同,且带有否算子的识别结构“A®!B®C”吞吐率还高于没带有否算子的查询“A®B®C”.因为Esper基于NFA实现,NFA模型对否算子和克林算子的实现时间复杂度较高,因此带有此类约束的实现效率较低.而在树模型中,此类约束同样被实现为一个具有Q,F,Z函数的算子,因此吞吐率基本上与其他查询约束类似.同时,由图 5(b)也可以看出:随着时序查询语句的增加,系统的吞吐率也在逐渐下降.且在时序约束增加的开始,吞吐率下降明显;而在时序约束已经具有一定数量后,再进一步增加时序约束则不会使吞吐率明显下降.其主要原因是:在时序约束较少时,符合查询的事件组合较多,而增加时序约束需要对所有符合查询条件的事件做进一步的处理,因此增加了处理时间,较大幅度地增加了吞吐率.而在已经具有一定数量的时序约束之后,符合查询条件的数据已经很少,增加约束也只需对较少的数据做进一步的筛选,因此仅增加极少的数据处理时间,对吞吐率影响减少.
5.2.2 附带自扩展算子的识别结构性能评价本节对用户自行扩展的算子识别效率进行评价,主要采用第3.1.4节提出的两个时序约束进行了扩展,并将其与原有算子的识别速度进行比较.例1中,在原有的克林算子上进行了扩展,更改其匹配判定函数Q以及符合计算函数F,因此在实验过程中主要用此算子的识别效率与原有的克林算子进行比较,结果如图 6(b)所示:随着数据量的增加,新扩展的算子吞吐率明显略低于原有的克林算子,但二者吞吐率随输入数据变化的趋势基本保持一致.其主要因是新算子修正了原有克林算子的Q函数以及F函数,与其原有的两函数相比,其计算量增大,因此每个到达的事件都会比原有克林算子进行更多的计算,导致吞吐率下降是正常的,但因为二者处理方式基本相同,因此二者波动变化类似.
图 6(b)同时也对比了第3.1.4节例2扩展的序列算子以及原有序列算子,因为扩展的序列算子仅修改了复合计算函数F为Fe=a.starttime,这种更改仅仅改变了生成新事件的结束时间,并没有增加任何计算量,因此可以看到:新扩展的算子与原有算子的计算量应该是完全相同的,二者吞吐率完全相同.由此可见:使用扩展的算子虽然有可能在一定程度上降低识别速度,但其识别效率还是由其定义的Q,F,Z这3函数的计算量决定.并不会因为增加扩展算子而显著影响整个识别结构的效率.
6 结论及下一步工作本文通过形式化定义事件以及事件流之间的复合关系,明确提出了算子的概念,并将复杂事件处理中的时序约束、谓词约束映射到算子相应的组件,将算子作为事件复合的基本单元,使算子能够在支持现有语义的同时,可以完善地支持新增语义.本工作通过算子的方式构建树结构事件识别模型,并对该模型的识别效率进行了评估和优化,提出了一套定量化的优化方案.最后,在性能评测中验证了评估模型中各条优化结论,与当前使用最广泛的开源事件流引擎Esper进行了对比.在下一步工作中,将进一步完善事件模型,在多种事件筛选策略、乱序事件处理等方面继续进行研究.
[1] | Eugster PTH, Felber PA, Guerraoui R, Kermarrec AMG. The many faces of publish/subscribe. ACM Computing Surveys (CSUR), 2003,35(2):114-131 . |
[2] | Mansouri-Samani M, Sloman M. GEM: A generalized event monitoring language for distributed systems. Distributed Systems Engineering, 1997,4(2):96-108 . |
[3] | Gruber RE, Krishnamurthy B, Panagos E. The architecture of the READY event notification service. In: Proc. of the 19th IEEE Int’l Conf. on Distributed Computing Systems Workshops on Electronic Commerce and Web-Based Applications/Middleware. IEEE, 1999 . |
[4] | Hong MS, Demers A, Gehrke J, Koch C, Riedewald M, White W. Massively multi-query join processing in publish/subscribe systems. In: Proc. of the SIGMOD 2007. 2007. |
[5] | Hu JF, Jin BH, Zhuo W, Chen HB, Zhang LF. Spatial event detection and optimization. Ruan Jian Xue Bao/Journal of Software, 2011,22:147-156 (in Chinese with English abstract). http://www.jos.org.cn/1000-9825/11035.htm |
[6] | Fu Y, Zhou MQ, Wang XS, Luan H. On-Line event detection from Web news stream. Ruan Jian Xue Bao/Journal of Software, 2010,21:363-372 (in Chinese with English abstract). http://www.jos.org.cn/1000-9825/10037.htm |
[7] | Balis B, Kowalewski B, Bubak M. Real-Time grid monitoring based on complex event processing. Future Generation Computer Systems, 2011,27(8):1103-1112 . |
[8] | Demerset A, Gehrke J, Panda B, Riedewald M, Sharma V, White W. Cayuga: A general purpose event monitoring system. In: Proc. of the CIDR 2007. 2007.412-422 . |
[9] | Dayal U, Blaustein B, Buchmann A, Chakravarthy U, Hsu M, Ledin R, McCarthy D, Rosenthal A, Sarin S, Carey MJ, Livny M, Jauhari R. The hipac project: Combining activedatabases and timing constraints. SIGMOD RECORD, 1988,17(1):51-70 . |
[10] | Abbadi D, Carney D, Cetintemel U, Cherniack M, Convey C, Lee S, Stonebraker M, Tatbul N, Zdonik S. Aurora: A new model and architecture for data stream management. In: Proc. of the Brown Computer Science (CS-02-10).2002 . |
[11] | Gyllstrom D, Wu E, Chae HJ, Diao YL, Stahlberg P, Anderson G. SASE: Complex event processing over streams. arXiv preprint cs/0612128, 2006. |
[12] | Diao YL, Immerman N, Gyllstrom D. SASE+: An agile language for kleene closure over event streams. Technical Report, Department of Computer Science, University of Massachusetts Amherst, 2007. |
[13] | Agrawal J, Diao YL, Gyllstrom D, Immerman N. Efficient pattern matching over event streams. In: Proc. of the SIGMOD 2008.2008 . |
[14] | Wu E, Diao YL, Rizvi S. High-Performance complex event processing over streams. In: Proc. of the SIGMOD 2006.2006 . |
[15] | Chandramouli B, Goldstein J. High-Performance dynamic pattern matching over disordered streams. Proc. of the VLDB Endowment, 2010,3(1-2):220-231 . |
[16] | Yuan M, Madden S. ZStream: A cost-based query processor for adaptively detecting composite events. In: Proc. of the SIGMOD 2009.2009 . |
[17] | Cheng SJ, Wang YJ, Meng Y, Cheng ZD, Luan ZZ, Qian DP. PMTree: An efficient pattern matching method for event stream processing. Journal of Computer Research and Development, 2012,49(11):2481-2493 (in Chinese with English abstract). |
[18] | Mei Y, Madden S. Zstream: A cost-based query processor for adaptively detecting composite events. In: Proc. of the 2009 ACM SIGMOD Int’l Conf. on Management of Data.ACM Press, 2009 . |
[19] | Chandrasekaran S, Cooper O, Deshpande A, Franklin MJ, Hellerstein JM, Hong W, Krishnamurthy S, Madden S, Reiss F, Shah MA. TelegraphCQ: Continuous dataflow processing. In: Proc. of the ACM SIGMOD Conf.2003 . |
[5] | 胡佳锋,金蓓弘,禚伟,陈海彪,张利锋.空间事件的检测及优化策略.软件学报,2011,22:147-156 (in Chinese with English abstract). http://www.jos.org.cn/1000-9825/11035.htm |
[6] | 付艳,周明全,王学松,栾华.面向互联网新闻的在线事件检测.软件学报,2010,21:363-372 (in Chinese with English abstract). http://www.jos.org.cn/1000-9825/10037.htm |
[17] | 程苏珺,王永剑,孟由,程振东,栾钟治,钱德沛.PMTree:一种高效的事件流模式匹配方法.计算机研究与发展,2012,49(11): 2481-2493. |