2. 山东大学 计算机科学与技术学院, 山东 济南 250101
2. School of Computer Science and Technology, Shandong University, Ji'nan 250101, China
随着移动终端和无线网络的广泛应用, 现实中许多应用面临大规模数据流的处理.例如, 社交网络中的微博数据, 电子商务领域的顾客浏览、交易数据, 城市交通管控系统的过车数据, 环境监测领域的传感器数据等都包含了大规模数据流.在海量数据流中, 通常隐藏着大量用户感兴趣的特定模式, 发现这些特定模式对于改善应用服务质量, 提升数据应用价值具有重要意义.多数据流中的频繁伴随模式是由Yu等人[1]首次提出来的, 现实中若干应用都可以抽象为频繁伴随模式的发现问题.本文的研究目标是设计分布式挖掘算法, 实时发现海量数据流中的频繁伴随模式.为便于描述, 下文将采用FCP来表达频繁伴随模式.
给定一个无界数据流的集合S={s1, s2, …, sn}和一个元素集合O={o1, o2, …, ok}, 如果集合O是一个FCP, 那么它将满足以下条件:(1)该集合中的元素在小于τ的时间段内出现在至少θ个数据流上; (2)该集合的元素在每一个数据流上出现的时间间隔不大于ξ.这里, τ、θ、ξ都是用户指定的参数[1].频繁伴随模式表达了一组对象在多个数据流中的紧密关系, 许多应用都可以归结为数据流上的频繁伴随模式发现问题[1].
伴随车辆发现.当前城市主要道路和路口安装了监控抓拍设备, 车辆每次经过时, 抓拍设备都会生成一条结构化的过车记录(vehicle passing record, 简称VPR).这样, 每个抓拍设备将产生一个由连续过车记录组成的数据流, 而一些伴随行驶的车辆将会出现在由多个抓拍设备所产生的数据流中.因此, 从过车记录中发现伴随车辆相当于从这些数据流中发现FCP.现实中, 及时发现伴随车辆对于预防和打击罪犯嫌疑人驾驶多部车辆协同作案、尾随受害人作案等案件具有重要意义.
基于微博的热点话题发现.在新浪微博等社交平台, 可以认为每个用户对应着一个由自己发布的微博组成的数据流.较短时间内, 在许多用户微博中频繁出现的词汇组合通常预示着一个新的热点话题的出现.那么从海量用户对应的数据流中挖掘频繁伴随出现的热点词汇组合其本质也是多数据流的FCP发现问题.
位置信息服务.在一些签到应用中(如Foursquare), 用户会在到达一个地点之后签到, 并向数据中心报告自己的位置.在这类应用中, 可以认为每个签到的地点对应着一个数据流, 而每个数据流是由用户的签到信息组成.在这些数据流中实时挖掘FCP, 能够发现短时间内共同到达某些签到地点的用户群, 而发现这些用户具有潜在的商业价值, 例如可以作为某些广告更好地推送对象.
以上事例所要解决的基本问题都是发现较短时间内出现在多个数据流上的FCP(车辆、人、关键词等).与多数据流频繁伴随模式发现较为相似的问题是数据流中的频繁模式发现问题, 但两者有明显区别.数据流中的频繁模式是指某一个元素集合在一个数据流的多个“事务(transaction)”中出现, 且出现的频率大于给定的阈值; 本文所研究的频繁伴随模式则是指一个元素集合在多个数据流中出现, 并且所在数据流的数目和出现的间隔需要满足指定的条件.由于问题本身不同, 因此, 已有数据流频繁模式挖掘算法无法直接用来解决本文所研究的问题, 原因将在下一节进行详细阐述.
目前, 对于多数据流FCP发现问题的研究还很少.文献[1]提出该问题的同时, 给出了CooMine挖掘算法.该算法的目的是通过减少内存消耗和索引维护代价快速发现多数据流中的FCP.值得注意的是, CooMine算法是针对单个计算节点设计的集中式挖掘算法, 而单个计算节点受到硬件资源的限制, 很难应对大规模数据流.为解决这一问题, 最直接的办法是采用多个计算节点, 令每个计算节点独立运行CooMine算法, 处理一部分数据流.该策略虽然能够处理大规模数据流, 但缺点是形成某些FCP的数据流可能被分在不同的计算节点, 导致部分结果丢失.另一策略是将CooMine算法在多个计算节点上并行化实现, 但是CooMine算法采用的Seg-tree树型索引结构很难有效地并行化部署到多个计算节点上.因此, 当在分布式计算环境下解决大规模数据流FCP发现问题时无法直接采用CooMine算法, 仍需设计高效的分布式挖掘算法.
与集中式算法相比, 设计分布式挖掘算法面临以下挑战.
(1) 由于每个计算节点只存储部分数据, 这给FCP的生成和比对带来很大困扰.例如, 对于一个分布在计算节点n1上的长度为k的FCP, 计算节点n1无法得知该FCP应和哪些计算节点的长度为k的FCP合并, 生成长度为(k+1)的FCP.
(2) 采用多个计算节点处理海量数据流时, 可能出现不同计算节点上数据负载不均的情况, 从而影响整体的处理效率.这是因为, 每个FCP的生成需要多个计算单元协同合作, 负载失衡时, 负载较轻的计算单元往往需要等待负载较重的计算单元的计算结果, 从而影响了整个系统的挖掘效率.
(3) 由于大规模数据流连续到达, 新到达的数据不断地和已有数据共同构成新的FCP, 这就要求挖掘算法必须能够随着数据流的连续到达实时发现新生成的FCP, 实现FCP的增量挖掘.
为应对以上挑战, 本文提出FCP分布式挖掘方法(FCP distributed mining approach, 简称FCP-DM).该方法的核心思想是采用基于服务器集群的分布式计算模式, 从大规模数据流中实时连续发现FCP.FCP-DM首先将每个连续到达的数据流划分成若干segment片段, 将问题转化为从不同数据流的segment中发现FCP; 然后基于Actor-Model计算模型构建多级分布式挖掘框架, 由多计算节点实现对不同数据流的segment片段的逐级并行处理; 最后根据Apriori算法的迭代思想, 设计不同层级的数据分发策略, 通过对segment的多层计算和比对, 最终得到FCP.
本文的主要贡献如下:
●提出FCP-DM分布式挖掘方法并给出相应的分布式多级计算框架, 能够实现对大规模数据流的并行处理, 通过多层迭代计算和比对发现所有的FCP.FCP-DM方法具备良好的可扩展性, 仅通过增加硬件资源就可实现处理能力的线性增长.
●解决了分布式环境下多数据流FCP挖掘所面临的负载迁移、多计算节点之间FCP分发策略以及面向连续数据流的FCP增量挖掘等问题.
●在开源流数据处理平台S4上实现了FCP-DM方法, 并且以山东省会城市济南某天所有卡口的过车记录为测试数据集进行大量实验, 充分验证了该算法的各项性能.
2 相关工作多数据流频繁伴随模式发现问题是由Yu等人首次提出来的[1], 并给出两种集中式挖掘算法:DIMine算法和CooMine算法.这两种算法通过设计不同的索引结构来提高挖掘效率并达到节省存储空间的目的.然而, 这两种算法都是基于单机计算环境而设计, 难以直接应用到分布式环境中, 可扩展性较差, 无法应对当前规模急剧增长的海量数据流.
已有工作中, 与多数据流FCP发现问题最为相似的是数据流上的频繁模式挖掘问题.数据流上的频繁模式(frequent pattern, 简称FP)[2]是指给定一个由连续的transaction组成的数据流(每个transaction包含多个元素), 如果指定的某段时间内该数据流共有n个transaction, 而一个元素集合在其中的m(m≤n)个transaction中出现并且
近年来, 流数据FP挖掘问题受到国内外学者的广泛关注[2-12].Manku和Motwani提出了粘性抽样和有损耗的计数算法来计算数据流元素集合的近似频率[4].Yu等人提出了FDPM算法[5], 该算法能够使用有界的内存挖掘数据流中的频繁元素集合.该算法是以假的消极结果为导向的, 即某些特定的频繁元素集合可能不会被发现.这两项工作都是基于landmark模型, 它们的目标都是挖掘数据流从开始到当前时间所有的频繁模式.此外, 它们并不保证发现所有频繁模式, 而是保证获得一个错误率小于指定参数的近似结果集.
另外一些方法[2, 5-12]是能够从数据流中获得当前精确的频繁模式集合.Chang等人[2]提出了一种挖掘算法, 该算法通过自适应地减小过期事务的影响从而从在线数据流中发现频繁模式.Leung和Khan等人[6]提出树型索引结构(DSTtree), 该索引能够从数据流中捕获重要信息从而精确地挖掘频繁模式.Mozafari等人则将数据流划分成滑动窗口, 提出versification这一新的计数概念, 并基于此概念设计了SWIM算法.该算法是一种精确算法, 其性能和扩展性能够根据窗口的大小进行调整[8].KARP等人提出了一个简单、精确的数据流上的频繁项集发现算法, 并证明了该算法的时间复杂度为线性且空间复杂度为1/θ, θ为用户指定的支持度[9].Chi等人[10]主要研究内存受限情况下的数据流上闭合频繁项集挖掘问题, 并给出Moment(maintaining closed frequent itemsets by incremental updates)算法对数据流的闭合频繁项集进行持续监测.Silva等人提出了Star FP Stream算法, 用以解决从多维数据模型所产生的海量星型数据模式中发现频繁模式[11].李海峰等人研究了数据流上频繁项集挖掘时所采用的滑动窗口模型, 提出了基于事务的可变窗口滑动模型, 并在此基础上提出了频繁项集的挖掘算法FIMoTS[12].
以上这些方法虽然能够从数据流中挖掘频繁模式, 但是无法被直接用来解决FCP挖掘问题.首先, FP和FCP的定义有着很大的不同.FP挖掘问题是关注一个元素集合在某个数据流上出现的频率, 而FCP挖掘问题则关注一个元素集合在数据流内部和多个数据流之间的伴随关系.此外, 上述方法主要针对单个数据流的FP挖掘问题, 而本文要解决的是多数据流FCP发现问题, 需要对多个数据流同时处理, 两者之间存在明显区别.
值得注意的是, 文献[13]提出了H-stream算法, 目标是发现多个数据流上的频繁模式.表面上看, H-stream算法要解决的问题似乎与本文的问题非常相似, 但事实上有很大不同.虽然H-stream算法的目标是挖掘在多个数据流出现的FP, 但本质上还是先查找每个数据流的FP, 然后筛选符合条件的FP.而本文中, FCP是由多个数据流共同生成, 因此必须同时处理多个数据流才能发现FCP.此外, H-stream是一种近似算法, 而本文的目标是求解精确结果.因此, H-stream算法并不能解决本文研究的问题.此外, 毛宇星等人虽然也提出一种多层关联规则挖掘方法[14], 但是分层的目的是对每层的数据进行聚类, 而本文所设计的多层挖掘模型的目的是充分发挥多计算节点在处理多数据流时的并行计算能力, 研究的侧重点有很大不同.
频繁情节挖掘(frequent episode mining)是在频繁模式挖掘基础上衍生出来的又一问题, 它是指从一些序列数据中发现有价值的或者用户感兴趣的模式序列[15, 16].目前, 已有若干工作研究多个行业(如通信、制造业、金融、生物信息)的频繁情节挖掘问题, 但是绝大部分工作都是采用离线方式挖掘频繁情节, 这些方法也难以用于实时发现多数据流的频繁伴随模式.虽然Ao等人提出了基于episode trie的在线频繁情节挖掘方法[15], 然而该方法所关注的频繁情节也是基于单个数据流定义的, 与本文研究的问题有很大区别.此外, 该方法是针对单机计算环境所设计, episode trie索引结构很难直接部署到分布式计算环境, 也难以解决本文所研究的问题.
当前, 也有一些学者开始研究大数据环境下的模式发现[17-20], 但是这类工作通常关注的是静态数据集上的模式发现问题, 而本文关注的是从海量动态数据流上发现特定模式.
3 问题定义为便于算法描述, 本文引入并使用文献[1]所给出的相关定义.
定义 1(数据流(data stream)). 数据流是一组连续的按时间顺序到达的元素序列.对于数据流中的任意元素oi(i为该元素在数据流中的序列号), 它的标识符和时间戳分别是idi和ti, 其中, ti表示该元素出现的时间.
定义 2(伴随模式(co-occurrence pattern, 简称CP)). 给定数据流si上的一个元素集合O={o1, o2, …, ok}, 如果
定义 3(频繁伴随模式(frequent co-occurrence pattern, 简称FCP)). 如果一个伴随模式CP在l个数据流中出现, l个数据流可以表示为集合S={s1, s2, …, sl}.如果该CP满足以下条件:
(1)
(2)
那么, 该CP就是一个FCP.其中,
定义 4(segment片段). 给定一个数据流si, 一个segment片段G(o1, o2, …, om)是数据流si的子序列, 并且G满足以下条件.
(1) |ti–tj|≤ξ, 这里, oi, oj是G中任意两个元素, ti, tj分别是oi, oj出现的时间;
(2) 数据流si中不存在子序列Gʹ, 使得Gʹ是一个segment片段并且G是Gʹ的一个严格的子序列.
图 1给出由多个元素构成的一个数据流, 假设每个元素出现的时间如下:ta=2, tc=5, td=13, tg=16, te=20, tb=26, 并假设ξ=10, 那么该图中{a, c, d}和{d, g, e}为两个segment.元素a是segment片段G0的第1个元素, 因为td–ta < ξ, tg–ta>ξ, 所以元素d是G0的最后一个元素.当以元素c为第1个元素构建segment时, 由于tg–tc>ξ, {c, d, g}无法构成segment.进一步地, 我们以元素d为第1个元素构建segment时, 便可得{c, d, e}是一个segment.
4 FCP-DM挖掘方法
引入segment之后, 每个数据流被划分成多个segment片段, 那么FCP-DM的目标就是从海量数据流的segment中发现FCP.根据CP和FCP的定义, 可以推断出一个数据流的某个segment中的任意CP都可能与其他数据流的segment中的CP形成FCP.因此, 为了得到精确结果, FCP-DM算法需要对所有数据流的segment所包含的CP进行比对.
4.1 FCP-DM算法FCP-DM算法中, 每一个数据流首先被划分成多个segment.由于segment分布在不同的计算节点上, 那么如何比对不同segment中的CP是一个首先要解决的问题.
在本文构建的分布式计算环境中, 一个物理节点可以运行若干个逻辑计算单元(processing element, 简称PE).FCP-DM算法的思想是以CP本身为key构建分布式哈希索引(DH-index), 每个CP对应DH-index的一个索引单元.DH-index中每个索引单元的value值包括对应CP的出现时间以及它所在的segment和数据流的相关信息.这种情况下, 不同segment中相同的CP被插入到DH-index时, 这些CP将被映射到同一个索引单元.当FCP-DM被部署到分布式计算环境时, 令物理节点的一个PE负责一个索引单元, 该索引单元每增加一条新的记录(即来自某个segment的CP), PE都会判断该索引单元对应的CP是否为FCP.某个CP一旦满足FCP定义的条件, FCP-DM会实时发现该FCP.
在图 2中, 给出分别属于数据流(s1, s3, s2)的3个segment(G0, G1, G2), 根据给定的segment, 得到所有长度为2的CP集合.根据图 2的CP2集合建立DH-index索引, 如图 3所示.FCP-DM算法对DH-index进行扫描, 可以得到全部长度为2的FCP(FCP2).例如, 设定θ=3, 那么FCP-DM只需对{o1, o2}进行判断.如果{o1, o2}出现在s1, s3, s2的时间间隔小于τ, 那么{o1, o2}就是一个FCP.FCP-DM算法在获得FCP2后, 便利用Apriori启发式思想[3], 逐步地由FCPk(k≥2)的集合推导出FCPk+1(k≥2)的集合.
实际应用中, 海量数据流将产生规模巨大的CP, 超出了单个计算节点的处理能力.为此, 本文将FCP-DM算法部署到分布式计算环境, 通过利用多计算节点的存储和计算资源, 采用并行计算和比对方式, 实现海量数据流中FCP的实时发现.下面介绍FCP-DM算法的分布式计算框架.
4.2 FCP-DM分布式计算框架本文所提出的FCP-DM分布式计算框架(FCP-DM distributed framework, 简称FCP-DMDF)采用Actor- Model分布式计算模型, FCP-DMDF中基本逻辑处理单元是PE(processing element), 每个物理节点可以运行任意多个PE.PE之间通过发送和接收事件(event)进行数据传输.一个Event被表示为〈type, key, value〉, type表示Event的类型, key和value分别表示Event的键值和内容.每个PE指定其所接收的Event的type和key值.FCP-DMDF根据PE所指定的type和key值为其分发Event.任意一个PE可以接受其他PE发送的Event, 也可以将本地计算结果以Event的形式发送给其他PE.对于一个新的Event, 如果已有的PE无法对其进行处理, 那么FCP-DMDF将自动构建一个新的PE处理该Event.
FCP-DMDF是一个多级框架, 主要包括数据接收层、数据分割层和算法实现层.图 4是FCP-DMDF框架示意图.
数据接收层的任务是接收持续到达的数据流.每个数据流由一个ReceivePE负责, ReceivePE的任务是接收数据流并随时间延续将每个数据流分割成若干segment, 然后将segment以SegmentEvent的形式发送至数据分割层的PartitionPE.
数据分割层的任务是生成每个segment所有长度为2的CP.该层中每个PartitionPE的任务是接收来自不同数据流的segment, 并计算每个segment包含的所有长度为2的CP.然后以CP本身为key值将每一个CP以CPEvent的形式分发至算法实现层的PE.在该层中, 每个PartitionPE负责一个或多个segment, 并能够根据负载对segment进行数据迁移, 第4.3.1节将详细讨论该问题.
算法实现层是FCP-DMDF的主体部分.该层包含多个Computing-Level, Computing-Level-i表示第i层Computing-Level, 负责发现长度为(i+1)的FCP.FCP-DM算法在产生长度为i的FCP时包含两步操作:CPi的聚合和FCPi的检测, 因此, 每层Computing-Level-i包含两类PE:CPi-PE和FCPi-PE.当i≥3时, 由FCPi–1集合得到FCPi集合的迭代计算步骤如下.
① CPi-PE将可能构成CPi的一组FCPi–1进行聚合, 生成待检测的CPi, 并将符合条件的CPi以CPEvent的形式发送至FCPi-PE.分布式环境下, 如何将可能构成CPi的一组FCPi–1由不同的PE汇聚至同一个CPi-PE是一个不小的挑战, 为此, 本文设计了基于〈key, value〉的FCP分发策略(该策略将在第4.3.2节详细加以讨论);
② FCPi-PE接收来自不同CPi-PE的CPi进行检测并判断其是否为FCPi, 然后将发现的FCPi以FCPEvent的形式发送至Computing-Level-(i+1)层的CPi+1-PE.
③ 重复步骤①和②, CPi+1-PE将接收的FCPi合并生成CPi+1, 然后由FCPi+1-PE检测CPi+1是否为FCPi+1.
根据上述步骤, 随着i的增长, 算法实现层能够通过逐步迭代的方式发现所有的FCP.
需要说明的是, 当i=2时, 由于CP2由PartitionPE直接生成, 因此, 每个CP2-PE接收到的是一组来自不同数据流的CP2, CP2-PE将该组CP2进行聚合后发送至FCP2-PE.由于现实应用中FCP的最大长度无法预先确定, 因此FCP-DM计算框架将根据FCP的长度自适应地增加或减少Computing-Level的层数, 从而发现给定数据流中所有FCP.
4.3 构建FCP-DMDF面临的挑战与解决方案本节主要研究分布式环境下多数据流FCP挖掘所面临的负载迁移、FCP分发策略以及面向连续数据流的FCP增量挖掘等挑战.
4.3.1 数据分割层的负载均衡策略数据分割层将生成每一个segment片段所有的长度为2的CP, 并将CP进行分发.实际应用中, 数据分割层的每个计算单元可能负责若干个segment, 如果数据分布不均匀, 将造成不同计算单元之间的负载失衡, 降低整个计算框架的挖掘效率.
为解决这一问题, 本文设计动态迁移策略来保证计算单元之间的负载均衡.为便于描述, 令PEr表示数据接收层的任意一个ReceivePE, 令
对于数据分割层的任意逻辑计算单元
(1) 对
(2)
(3)
图 5是负载迁移执行过程的示意图.其中, 图 5(a)表示初始状态下数据输入层的
① 表示
② 表示
③ PEr将后续到达的位于[wj, wk]的segment发送至
FCP-DM算法逐步地由FCPk的集合推导出FCPk+1的集合, 这在单机环境下不难实现.但在分布式环境下却面临新的问题.
(1) 由于数据分布在多个计算节点上, 单个计算节点无法确定哪些FCPk能够合并生成FCPk+1.
(2) 为解决第1个问题, 需要将能够合并生成FCPk+1的FCPk分发至同一个计算单元, 但是如何将这些FCPk分发至同一个计算单元则又是一个新的挑战.例如, {a, b, c}和{b, c, d}是两个长度为3的FCP, FCP-DM算法将会对{a, b, c}和{b, c, d}进行合并, 生成长度为4的CP{a, b, c, d}, 继而判断{a, b, c, d}是否为FCP4.分布式环境下, {a, b, c}和{b, c, d}可能分布在不同的计算节点n1和n2上, n1和n2无法直接获取对方的数据, 这为生成伴随模式{a, b, c, d}增加了很大难度.
为解决上述问题, 本文设计基于〈key, value〉的FCP分发策略.该策略首先令能够合并生成CPk+1的FCPk具有相同的key值, 然后令每个key值由FCP-DMDF中唯一的计算单元负责.此时, 根据key值与计算单元的映射关系, 可将具有相同key值的FCPk分发至同一个计算单元, 继而由该计算单元对这些FCPk进行聚合, 生成相应的CPk+1, 最后判断CPk+1是否为FCPk+1.
假设FCPk和FCPʹk分别表示两个不同的长度为k的FCP, FCPk+1表示一个长度为(k+1)的FCP.如果FCPk ⊂FCPk+1, FCPʹk⊂FCPk+1且FCPk∪FCPʹk=FCPk+1, 那么FCPk和FCPʹk至少含有(k-1)个相同元素.将FCPk的任意一个由(k-1)个不同元素构成的集合记作ek, 那么FCPk将包含
上述策略采用(
由于数据流持续到达且没有边界, 新的数据流到达之后, 可能与已有数据形成新的FCP, 因此需要对连续到达的数据流持续监控, 设计增量挖掘方法实时发现由新到达的数据流所形成的FCP.在增量挖掘方法中, 我们始终维护FCP-DMDF框架, 并将已处理的有效数据始终在内存中进行维护.新的数据流到达之后, 只需将这些数据与已有的有效数据进行比对, 即可发现新生成的FCP.
由于数据流的规模无限扩大, 导致无法在内存存储所有数据, 需要对过期数据进行删除.由于形成FCP的所有元素的时间间隔不大于τ, 如果某个元素的产生时间与当前时间的间隔大于τ, 那么该元素可以被安全删除, 原因是它不可能和其他元素共同构成新的FCP.从节省内存角度考虑, 则应该实时删除过期元素.然而, 如果令每个计算节点持续监视其维护的每条数据是否过期, 那么该操作将产生很大的计算代价.为使内存使用效率和计算代价两方面达到一个平衡, 本文设计了延迟删除策略.该策略中, 每个计算节点每隔Δt时间令其所有的逻辑计算单元执行一次对过期数据的删除操作.实验部分, 我们将Δt的默认值设置为τ.Δt值越大, 删除操作的计算代价越小, 但是内存消耗增大; Δt值越小, 内存消耗越少, 但删除操作代价增大.因此, 可以调整Δt的值来平衡内存使用效率和删除计算代价.实验部分将对Δt对于算法内存使用情况的影响进行测试.
由于FCP-DMDF始终在内存中维护有效数据, 那么对于一个最新到达的segment, 如果它能够和已有数据形成FCP, 那么只需要在已有数据的基础上处理该segment, 便可发现由新到达的segment与已有segment所形成的所有FCP, 从而实现多数据流FCP的连续增量挖掘.为解决这一问题, 本文设计了Incremental-mining算法并给出算法的伪代码.对于一个新生成的segment, Incremental-mining算法首先生成该segment的所有CP2, 之后对应的PE将CP2与已有数据进行比对, 得到FCP2.然后, 基于Apriori启发式思想, 由对应的PE对FCP2所能构成的FCP进行逐级并行检测, 从而得到该segment与已有数据形成的所有FCP.在Incremental-mining算法对segment的处理过程中, 仅有少量PE参与计算, 因此可节省计算资源.
Incremental-mining算法.
输入:segment G;
输出:G和已有segment新形成的所有FCP.
$1. ${\mathcal{H}_2} = \emptyset ;$$ |
2. PartitionPE接收G并生成G的所有CP2;
3. 将CP2发送至对应的CP2-PE;
4. CP2-PE将CP2聚合, 并将其发送至FCP2-PE;
5. FCP2-PE检测CP2是否为FCP2, 添加FCP2至
6. For (i=3;
$7. ${\mathcal{H}_i} = \emptyset ;$$ |
8. CPi-PE接收FCPi–1, 将FCPi–1与已有FCPi–1进行聚合, 构成CPi;
9. 发送CPi至FCPi-PE;
10. If (CPi是FCPi) //由FCPi-PE执行
11. 添加FCPi至集合
12. 发送FCPi至对应的CPi+1-PE;
13. End If
14. End For
图 6给出在FCP-DMDF上利用Incremental-mining算法发现FCP3的一个例子.在该例子中, 假设数据流S1至Si的segment已被处理, 此时数据流Si+1产生一个新的segment(Gi+1), 下面是利用Incremental-mining算法发现由新产生的segment所形成的FCP.首先由ReceivePE将每个数据流划分成不同的segment; 对于一个segment, PartitionPE生成其包含的所有CP2.CP2-PE将来自不同数据流的相同CP2发送至FCP2-PE, 然后由FCP2-PE判断某个CP2是否为FCP2.FCP2-PE根据第4.3.2节介绍的分发策略, 将得到的FCP2发送至对应的CP3-PE, 每个CP3-PE将收到的FCP2进行聚合, 生成CP3, 最后由FCP3-PE检测CP3是否为FCP3.
4.3.4 Incremental-mining算法时间复杂度分析
分布式环境下, Incremental-mining算法处理一个长度为m的segment片段G的时间复杂度的下界为
证明:Incremental-mining算法处理G时, 首先生成
下面讨论Incremental-mining算法最坏情况下的时间复杂度.当G能够形成FCPm时, Incremental-mining算法的计算时间最长.一般情况下, 只有部分伴随模式能够构成频繁伴随模式.不失一般性, 假设G的长度为i的子集是FCPi的概率为p(0 < p < 1), 那么G所构成的FCPi的数量为
虽然Incremental-mining算法最坏情况下的时间复杂度为
(1) 真实情况下p值非常小, 因此需要检测的FCPi数量远小于
(2) Incremental-mining算法运行在多个物理计算节点之上, 能够利用大量的PE实现对FCPi的并行检测, 将大大缩短计算时间.因此, 物理节点越多, 总的PE的数量越多(即z值越大), 计算时间越短, 这与实验部分中图 10所示结果一致.
5 实验首先, 本文基于Apache开源流数据处理平台S4[21]实现了FCP-DMDF.S4中的逻辑处理单元称为PE, 每个物理节点可以运行任意多个PE.基于S4平台实现FCP-DMDF时, FCP-DMDF中PE的功能可以由S4的PE来实现.S4中PE之间通过发送和接收Event进行数据传输.每个PE指定其所接收的Event的类型和key值.任意一个PE可以接受其他PE发送的Event, 也可以将本地计算结果以Event的形式发送给其他PE.对一个新产生的Event, 如果已有的PE根据Event的类型和key值都无法处理该Event, 那么S4将自动创建一个新的PE来处理该Event.由于S4的并行处理思想被当前大多数流数据分布式计算平台所采用, 因此, 本文所设计的FCP-DM分布式挖掘方法也易于部署到Storm、Spark Streaming等平台.
5.1 实验配置实验采用8台戴尔R210的服务器, 每台服务器配置主频2.4GHz的Intel处理器和8G内存, 服务器之间通过1Gbps带宽的以太网相连.分布式平台采用S4-0.6.0版本(http://incubator.apache.org/s4/download/).以山东省济南市2015年5月1日7:00~12:00产生的320万条过车记录(vehicle passing records, 简称VPR)为测试数据集, 从中挖掘频繁伴随车辆.该数据集中每条过车记录可以看作是一个四元组〈ri, li, mi, tmi〉, ri表示该记录的编号, li表示该车的车牌号, mi表示该记录所对应的监控卡口编号, tmi表示该记录产生的时间.整个测试集中过车记录是由不同卡口产生的, 如果将同一个卡口连续产生的过车记录看成一个数据流, 那么若干卡口就对应若干数据流.实验中要查找的伴随车辆是指很短时间内连续通过某卡口, 并在一段时间内以同样方式通过多个卡口的一组车辆.因此, 在测试数据集中发现伴随车辆就相当于从多个卡口产生的数据流中发现FCP.FCP定义中相关参数(ξ、τ和θ)将对FCP-DM算法的效率和挖掘结果产生影响, 因此, 每组实验均标明各个参数的取值.缺省情况下, ξ=60(s), θ=4, τ=2(h).每组实验均重复5次, 取5次结果的平均值作为最终实验结果.
5.2 实验内容 5.2.1 FCP-DM算法效率评估图 7将测试数据集模拟为523个数据流, 分别以20 000VPR/s、40 000VPR/s、60 000 VPR/s、80 000 VPR/s、100 000 VPR/s的速率发送至FCP-DMDF, 以测试系统的抗压性.实验结果表明, FCP-DM算法的最大处理能力约为60 000条/s.在该组实验中, 我们在内存中设置一个队列Q用以缓存到达的VPR.FCP-DM(t)表示FCP-DM处理测试数据集所需总的时间, FCP-DM(Q)表示队列Q缓存VPR的最大数量.在图 7中, 当数据到达速率小于60 000条/s时, 随着数据速率的提高, FCP-DM(t)明显下降.此时, 由于FCP-DM处理速率大于数据到达速率, 所以FCP-DM(Q)趋近于0.当速率达到60 000 VPR/s后, FCP-DM(t)趋于平稳, 表明算法是以最大处理能力运行的, FCP-DM(Q)开始明显增加.
图 8表明, Yu等人提出的CooMine算法的最大处理能力约为8 000VPR/s(单个计算节点)[1].通过图 7和图 8的比较可以发现, FCP-DM在8个计算节点上的处理能力几乎是CooMine算法的8倍.也就是说, FCP-DM在8个计算节点的处理效率几乎相当于在8个计算节点上分别运行CooMine算法的效率总和.这是因为, CooMine算法是针对单机环境设计的多数据流频繁伴随模式发现算法, 该算法为了节省内存(压缩segment)和减少索引维护代价, 设计和采用Seg-tree树型索引结构.本文设计的FCP-DM算法是针对分布式计算环境, 拥有较充裕的内存, 因此采用了分布式哈希索引.虽然CooMine算法对基于Seg-tree的数据查找操作进行了大量优化, 但是FCP-DM算法中基于哈希索引的查询操作(如查找元素、伴随模式等)要比CooMine算法中基于Seg-tree的查询操作具备更高的查询效率, 而哈希索引的内存使用效率要低于Seg-tree索引结构.因此, 在8台机器上运行FCP-DM算法的计算效率接近于在8个计算节点上单独运行的CooMine算法的计算效率之和.
为更加直观地比较FCP-DM算法和CooMine算法的性能, 分别令这两种算法处理相同规模的过车记录数据集, 并调整过车记录数据集的规模来比较两者的处理时间.该组实验是将整个数据集一次性地加载到系统中, 算法处理时间是指从处理第1条数据开始至最后一条数据处理完成的时间跨度.图 9所示的实验结果表明, FCP-DM算法(采用8个物理节点)的处理时间要明显小于CooMine算法的处理时间, 并且随着数据规模的扩大, FCP-DM算法处理时间的增长速率明显小于CooMine算法的时间增长速率.
图 10测试数据到达速率为40 000条/s时FCP-DM算法处理不同规模数据时的可扩展性.该组实验中, 分别采用2、4、6、8个计算节点处理不同规模的数据集.实验结果表明, 随着计算节点个数的增加, FCP-DM的处理时间明显下降.VPR数量越多, 处理时间的下降趋势越明显, 表明FCP-DM方法在处理大规模数据流时具备良好的可扩展性.
5.2.2 FCP-DM算法相关参数对算法性能影响的评价
图 11和图 12主要对本文所采取的负载迁移策略的性能进行验证, 主要测试单个PartitionPE所能承载的segment的最大数量β、数据到达速率以及数据集规模对于PartitionPE数量的影响.该组实验共采用4个物理计算节点.实验开始之前, 我们对测试数据集中的所有元素的标识进行遍历, 得到一个能够包含测试数据集中所有元素的标识范围[ws, we].实验开始时, 初始化一个PartitionPE, 令其key值范围为[ws, we], 也就是说, 所有初始的PartitionPE能够接收所有的segment.当初始PartitionPE接收但来不及处理的segment数量达到阈值β时, 该PartitionPE就会分裂, 这样就会产生多个PartitionPE.这里需要注意的是, PartitionPE一旦创建就不会被删除, 本组实验将记录处理过程中所有的PartitionPE的数量.
图 11的实验结果表明, 当β值一定时, 随着数据到达速率的增加, PartitionPE的个数明显增多, 这是因为数据到达越快, PartitionPE所积压的未处理的segment越多, 这样发生负载迁移的次数越多, 导致更多的PartitionPE产生.当数据到达速率一定时(例如30 000VPR/s时), 随着β值的增大, PartitionPE的数量逐渐减少.这是因为随着β值的增大, PartitionPE能够缓存的segment增多, 使得PartitionPE的数量减少.图 12中, 令FCP-DM处理不同规模的数据集并观察PartitionPE的数量.当数据到达速率和β值固定时, 我们发现数据集的规模对PartitionPE的数量没有显著影响, 这表明PartitionPE的数量只与数据到达速率和β值有关, 与待处理的数据集规模无关.
在延迟删除策略中, 调整参数Δt的值可以改变对过期数据的删除周期.图 13测试了参数Δt对于算法运行时内存使用状况的影响.实验策略是随机抽取100万条VPR作为测试数据集, 令数据发送速率为10 000VPR/s, 然后观察不同时刻的所有计算节点的内存使用状态.图 13的纵坐标表示所有计算节点运行FCP-DM算法时消耗的内存之和, 横坐标表示算法的运行时间.实验结果表明, 算法运行初期Δt取不同值时, 内存使用量均增大, 这是因为此时内存中没有过期数据, 致使内存累积的数据增加.当算法运行50s之后, 内存使用量出现明显波动, 这是因为算法每隔Δt时间都会对过期数据进行删除.总的来说, Δt取值越小, 平均的内存使用量也越少, 但是删除次数也相对增加.图 14测试了参数ξ对FCP-DM处理时间的影响.图 14所示实验结果表明, 算法处理时间随着ξ值的增大而增加.这是因为, ξ越大, segment长度越大, 此时所产生的CP数量也越大.由于FCP-DM需要对所有的CP进行处理, 因此, FCP-DM算法的处理时间随着CP数量的增大而明显增加.
FCP-DM算法的目标是准确发现给定数据集中所有的FCP, 即查全率和查准率应为100%.由于CooMine算法的目标也是准确发现给定数据流中所有的FCP[1], 其查全率和查准率均为100%.表 1比较了FCP-DM算法和CooMine算法在同一数据集上所发现的FCP数量.当参数θ和k取值一定时, 两种算法在该实验数据集上发现的FCP数量相同, 从而验证了FCP-DM算法的查全率和查准率均为100%.
5.2.3 FCP-DM挖掘结果评价
图 15和图 16主要测试数据规模和参数θ对FCP-DM算法挖掘结果的影响, 其中, k表示FCP的长度, ξ、τ和θ的含义见定义2和定义3.该组实验中, 一个FCP代表一组频繁伴随车辆.图 15所示实验结果表明, 当FCP定义中相关参数(ξ、τ、θ)的值固定时, 随着数据规模的扩大, FCP-DM发现的FCP数量逐渐增多; 当数据规模一定时, FCP的长度(k值)越大, FCP的数量越少, 也就是说, 用户指定一组频繁伴随车辆中的车辆数目越大, 现实中这样的车辆组合越少, 越符合我们的直观认识.FCP定义中参数θ将对FCP的数量产生较大影响, 图 16所示实验结果表明, 随着θ值的增大, FCP数量变小, 也就是说, 如果用户要求一组FCP伴随出现的数据流越多, 那么这样的FCP数量越少, 实验结果同样符合预期.目前, FCP-DM算法已被应用至山东省某地市智能交通综合管控平台, 用于快速发现频繁伴随车辆.图 17和图 18给出了FCP-DM算法在该交通管控平台发现频繁伴随车辆的部分结果展示.其中, 地图中的红色箭头是车辆伴随行驶方向, 图片右侧是发现的频繁伴随车辆组合.
6 总结
本文研究了海量多数据流FCP挖掘问题, 提出了FCP-DM——一个可部署在分布式流数据处理平台的分布式挖掘方法.在该方法中, 本文重点研究分布式环境下多计算节点协同挖掘大规模数据流所形成的FCP时面临的负载均衡问题、数据分布式存储背景下的FCP生成问题以及连续数据流的FCP增量挖掘问题, 最后通过大量实验对FCP-DM方法的各项性能和挖掘结果进行充分验证, 并给出了算法在实际应用中的效果展示.后续工作将继续研究如何对发现的大量FCP进行排序优化, 从而将更符合用户偏好的FCP排在挖掘结果的前面.
[1] |
Yu ZQ, Yu XH, Liu Y, Li WZ, Pei J. Mining frequent co-occurrence patterns across multiple data streams. In: Proc. of the 19th Int'l Conf. on Extending Database Technology. Brussels, 2015. 73-84. http://59.80.44.48/www.cs.sfu.ca/~jpei/publications/Co-occurrence%20patterns%20EDBT15.pdf
|
[2] |
Chang JH, Lee WS. Finding recent frequent itemsets adaptively over online data streams. In: Proc. of the 9th SIGKDD Int'l Conf. on Knowledge Discovery and Data Mining. Washington, 2003. 487-492. https://dl.acm.org/citation.cfm?id=956807
|
[3] |
Agrawal R, Srikant R. Fast algorithms for mining association rules. In: Proc. of the 20th Int'l Conf. on Very Large Data Bases. Santiago de Chile, 1994, 1215: 487-499.
|
[4] |
Manku GS, Motwani R. Approximate frequency counts over data streams. In: Proc. of the 28th Int'l Conf. on Very Large Data Bases. Hong Kong, 2002. 346-357. https://www.researchgate.net/publication/242445578_Approximate_frequency_counts_over_data_streams
|
[5] |
Yu JX, Chong Z, Lu H, Zhou A. False positive or false negative: Mining frequent itemsets from high speed transactional data streams. In: Proc. of the 30th Int'l Conf. on Very Large Data Bases. Toronto, 2004. 204-215. https://www.sciencedirect.com/science/article/pii/B9780120884698500218
|
[6] |
Leung CS, Khan QI. Dstree: A tree structure for the mining of frequent sets from data streams. In: Proc. of the 2006 IEEE Int'l Conf. on Data Mining. Hong Kong, 2006. 928-932. https://www.researchgate.net/publication/220765869_DSTree_A_Tree_Structure_for_the_Mining_of_Frequent_Sets_from_Data_Streams
|
[7] |
Li J, Maier D, Tufte K, et al. No pane, no gain: Efficient evaluation of sliding-window aggregates over data streams. In: Proc. of the 11th ACM SIGKDD Int'l Conf. on Knowledge Discovery and Data Mining. Baltimore, 2005. 39-44. https://www.researchgate.net/publication/220416232_No_pane_no_gain_Efficient_evaluation_of_sliding-window_aggregates_over_data_streams
|
[8] |
Mozafari B, Thakkar H, Zaniolo C. Verifying and mining frequent patterns from large windows over data streams. In: Proc. of the 29th Int'l Conf. on Data Engineering. Cancun, 2008. 179-188. https://www.researchgate.net/publication/4331010_Verifying_and_Mining_Frequent_Patterns_from_Large_Windows_over_Data_Streams
|
[9] |
Karp RM, Papadimitriou CH, Shenker S. A simple algorithm for finding frequent elements in streams and bags. ACM Trans. on Database Systems, 2003, 28(1): 51-55.
[doi:10.1145/762471] |
[10] |
Chi Y, Wang H, Yu PS, Muntz R. Moment: Maintaining closed frequent itemsets over a stream sliding window. In: Proc. of the Int'l Conf. on Data Mining. Brighton, 2004. 59-66.
|
[11] |
Silva A, Antunes C. Multi-relational pattern mining over data streams. Data Mining and Knowledge Discovery, 2015, 29(6): 1783-1814.
[doi:10.1007/s10618-014-0394-6] |
[12] |
Li HF, Zhang N, Zhu JM, Cao HH. Frequent itemset mining over time-sensitive streams. Chinese Journal of Computers, 2012, 35(11): 2283-2293(in Chinese with English abstract).
http://d.old.wanfangdata.com.cn/Periodical/jsjxb201211007 |
[13] |
Guo J, Zhang P, Tan J. Mining frequent patterns across multiple data streams. In: Proc. of the 20th ACM Conf. on Information and Knowledge Management. Glasgow, 2011. 2325-2328. https://www.researchgate.net/publication/221614236_Mining_frequent_patterns_across_multiple_data_streams
|
[14] |
Mao YX, Chen TB, Shi BL. Efficient method for mining multiple-level and generalized association rules. Ruan Jian Xue Bao/Journal of Software, 2011, 22(12): 2965-2980(in Chinese with English abstract).
[doi:10.3724/SP.J.1001.2011.03907] |
[15] |
Ao X, Luo P, Li C, et al. Online frequent episode mining. In: Proc. of the 31st Int'l Conf. on Data Engineering. Seoul, 2015. 891-902. https://www.researchgate.net/publication/283105915_Online_Frequent_Episode_Mining?ev=prf_high
|
[16] |
Patnaik D, Laxman S, Chandramouli B, Ramakrishnan N. Efficient episode mining of dynamic event streams. In: Proc. of the 12th IEEE Int'l Conf. on Data Mining. Brussels, 2012. 605-614. https://www.researchgate.net/publication/260188436_Efficient_Episode_Mining_of_Dynamic_Event_Streams
|
[17] |
Vanchinathan HP, Marfurt A, Robelin CA. Discovering valuable items from massive data. In: Proc. of the 21st ACM SIGKDD Int'l Conf. on Knowledge Discovery and Data Mining. Melbourne, 2015. 1195-1204.
|
[18] |
Saber S, Reza A, Florent M. Fast parallel mining of maximally informative k-itemsets in big data. In: Proc. of the 15th IEEE Int'l Conf. on Data Mining. 2015. 350-368. https://www.researchgate.net/publication/281529944_Fast_Parallel_Mining_of_Maximally_Informative_k-Itemsets_in_Big_Data
|
[19] |
Wang L, Yang B, Chen YH, Zhang XQ, Orchard JF. Improving neural-network classifiers using nearest neighbor partitioning. IEEE Trans. on Neural Networks and Learning Systems, 2017, 28(10): 2255-2267.
[doi:10.1109/TNNLS.2016.2580570] |
[20] |
Han SY, Chen YH, Tang GY. Fault diagnosis and fault-tolerant tracking control for discrete-time systems with faults and delays in actuator and measurement. Journal of the Franklin Institute, 2017, 354(12): 4719-4738.
[doi:10.1016/j.jfranklin.2017.05.027] |
[21] |
Neumeyer L, Robbins B, Nair A, Kesari A. S4: Distributed stream computing platform. In: Proc. of the 2010 IEEE Int'l Conf. on Data Mining Workshops. Washington, 2010. 170-177. https://www.researchgate.net/publication/220766108_S4_Distributed_Stream_Computing_Platform
|
[12] |
李海峰, 章宁, 朱建明, 曹怀虎. 时间敏感数据流上的频繁项集挖掘算法. 计算机学报, 2012, 35(11): 2283-2293.
http://d.old.wanfangdata.com.cn/Periodical/jsjxb201211007 |
[14] |
毛宇星, 陈彤兵, 施伯乐. 一种高效的多层和概化关联规则挖掘方法. 软件学报, 2011, 22(12): 2965-2980.
[doi:10.3724/SP.J.1001.2011.03907] |