Survey Report on Data Skew in Big Data

1 Introduction

信息时代产生了大量的数据,运用和使用数据已经成为一个公司乃至一个国家核心实力的重要组成部分。当代大数据一般指的是:数据量巨大,需要运用新处理模式才能具有更强的决策力、洞察力和流程优化能力的海量、高增长和多样化的信息资产。大数据的特征有四个层面:第一:数据量巨大,从TB级别,跃升到PB级别;第二,数据类型繁多,包括网络日志,视频,图片和地理信息等;第三,价值密度低,商业价值高,以视频为例,在连续不间断的监控过程中,可能有用的数据仅仅只有一两秒;第四,处理速度快。也就是4V——Volumn、Varity、Value和Velocity[1]。 大数据对系统提出了很多极限要求,不论是存储、传输还是计算,现有计算技术都难以满足大数据的需求,因此整个IT架构的革命性重构势在必行,存储能力的增长远远赶不上数据的增长,设计合理的封层存储架构已经成为了信息系统的关键[2]。分布式存储架构不仅要求scale up式的可扩展性,也需要scale out式的可扩展性,因此大数据处理离不开云计算技术。云计算可为大数据提供弹性可扩展的基础设施支撑环境以及数据服务的高效模式,大数据则为云计算提供了新的商业价值[3]。 为了应对大数据带来的困难和挑战,以Google, Microsoft, Facebook为代表的互联网或传统软件公司不断推出新的技术和大数据处理系统。借助于巧妙设计的处理系统,机器学习,可视化技术等大数据分析技术近几年也在不断涌现。 云计算的关键技术包括分布式并行计算、分布式存储以及分布式数据管理技术,而Hadoop就是一个实现了Google云计算系统的开源平台[4],包括并行计算模型MapReduce[5],分布式文件系统HDFS[6]以及分布式数据库Hbase,同时Hadoop的相关项目也很丰富,包括Zookeeper,Pig,Chukwa,Hive,Mahout等等。 然而,在计算的时候,由于数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算或者由于节点的性能等因素,这些数据的计算速度远远低于平均计算速度,从而导致整个计算过程过慢,这种现象称之为数据偏斜[7]。数据偏斜对大数据计算框架性能的影响巨大,而且数据偏斜在现实世界中普遍存在,比如PageRank任务中存在的高访问量网站,社交网络中高热度节点。因此,解决数据偏斜是降低大数据处理时间的关键之一。 由于云计算能够将作业自动并行处理为多个短任务,并能够透明地处理在分布式环境中执行任务带来的挑战,云计算技术带来了广泛的欢迎。一个基本挑战是所有云计算框架(如MapReduce,Dryad和Spark)面临的落伍(stragging)任务。落伍任务(stragger)是指比其他任务运行速度慢得多的任务,并且由于任务必须在上游任务完成时才执行,所以落伍任务会延迟整个工作的完成[8]。落伍任务特别影响小型工作,即仅仅由几项任务组成的工作。这类工作通常可以一次完成所有任务。因此,即使单个任务很慢,或不稳定,整个工作也会显著延迟。小型工作普遍存在,数据中心运营商的经验表明,这些小型工作通常用于执行交互式和探索性分析。实现此类工作的低延迟对于数据分析师有效探索数据至关重要。为了获得较低的延迟,分析师已经将他们的查询限制在小而精的数据集中,这也就导致大多工作只包含一些短任务。这种探索性分析的趋势从Facebook的Hadoop生产群集和Microsoft Bing的Dryad群集的分析中显而易见。超过80%的Hadoop工作和超过60%的Dryad工作都很小,只有不到10个任务。实现这些小型互动作业的低延迟是数据中心运营商关注的首要问题。落伍任务(stagger)问题已经引起了相当大的关注,正在开发一系列偏斜缓解技术以解决该问题。这些技术大致可以分为两类:黑名单和推测性执行。然而,实验显示,即使在应用最先进的黑名单和推测性执行技术之后,含有数据偏斜问题的任务比正常任务慢八倍。因此,落后任务仍然是小型工作的问题。 这两种方法有其固有的局限性。黑名单标识机器运行状况不佳(例如由于磁盘故障),并避免在其上安排任务[9]。事实上,Facebook和Bing集群大约有10%的机器黑名单。然而,由于IO争用,定期维护操作和后台服务的干扰以及硬件行为等复杂原因,未列入黑名单的机器上会出现任务落伍现象。出于这个原因,投机性执行被用来处理落后节点。 推测性执行观察任务的进度并启动较慢任务的克隆[10]。然而,推测性执行技术在处理小型工作时有一个根本的限制:任何有意义的观测都需要等待收集具有统计意义的任务进度样本,这种等待限制了他们在处理小型工作中落伍节点时的敏捷性,因为他们经常同时开始他们的所有任务。如果某些任务在执行完成后开始出现混乱,则问题会加剧。在这一点上产生一个推测性副本可能为时太晚而无法提供帮助。

2 Background
2.1 大数据概况

在过去的20年中,数据在各个领域都有了大规模的增长。根据国际数据公司(IDC)的报告,2011年,全球创建和复制的数据量为1.8ZB(≈1021B),在五年内增加近9倍。这个数字在不久的将来至少每两年翻一番。在全球数据爆炸性增长的情况下,大数据这个术语主要用来代表大数据集。与传统数据集相比,大数据通常包括大量非结构化数据,需要更多的实时分析。此外,大数据也为发现新的价值提供了新的机会,有助于我们深入了解隐藏的价值观,同时也带来了新的挑战,例如如何有效地组织和管理这些数据集。最近,行业开始关注大数据的巨大潜力,许多政府机构宣布了加速大数据研究和应用的主要计划。此外,大数据方面的问题经常在“经济学人”,纽约时报和国家公共广播电台等公共媒体中报道。两大首屈一指的科学期刊Nature和Science也开设了专栏来讨论大数据的挑战和影响[11]。大数据时代已经不容置疑。例如,谷歌处理数百PB的数据,Facebook生成10PB以上的日志数据,百度每天处理数十PB的数据,淘宝是阿里巴巴的子公司,每天因为在线交易产生数十兆字节(TB)的数据。我们可以从广泛分布的数据源中获得了大量的数据,并将大量数据整合在一起。而云计算和物联网(物联网)则进一步促进了数据的急剧增长。云计算为数据资产提供了保护、访问站点和通道。现代应用中,世界各地的传感器都在收集和传输数据,以便在云中存储和处理数据。在这些数据将远远超过IT体系结构和现有企业的基础设施的能力。越来越多的数据导致了庞大的异构数据集的存储和分析问题[12]。如何可视化和预测的过程中有效地“挖掘”数据,从而揭示其内在属性并改进决策过程,是一个具有挑战性的任务。

2.2 大数据发展

20世纪70年代末,“数据库”的概念出现了,这是一种专门用于存储和分析数据的技术。随着数据量的增加,单一主机计算机系统的存储和处理能力不足。在20世纪80年代,人们提出了一个并行的数据库系统,以满足不断增长的数据量的需求。没有任何系统架构是基于集群的使用,而每台机器都有自己的处理器、存储和磁盘。Teradata系统是第一个成功的商业并行数据库系统。这样的数据库最近变得非常流行。在1986年6月2日,当Teradata交付第一个并行数据库系统时,一个里程碑事件发生了,它的存储容量为1 TB,以帮助北美的大型零售公司扩展其数据仓库[13]。在20世纪90年代末,并行数据库的优势在数据库领域得到了广泛的认可。然而,在大数据上出现了许多挑战。随着网络服务的发展,索引和查询内容的快速增长。因此,搜索引擎公司不得不面对处理如此大数据的挑战。谷歌提出了GFS和MapReduce编程模型,以应对数据管理和互联网规模分析带来的挑战。此外,用户、传感器和其他无处不在的数据源所生成的内容也会影响到体量巨大的数据流,这需要对架构和大型数据处理机制进行基本的改变。2007年1月,数据库软件的先驱吉姆格雷将这种转变称为“第四范式”[14]。他还认为,应对这种模式的唯一方法是开发新一代的计算工具来管理、可视化和分析大量数据。2011年6月,又发生了另一个里程碑事件:emc/idc发布了一份题为《从混沌中提取价值》的研究报告,该报告首次介绍了大数据的概念和潜力。这一研究报告引发了业界和学术界对大数据的极大兴趣。在过去的几年中,几乎所有的大公司,包括EMC、甲骨文、IBM、微软、谷歌、亚马逊和Facebook等都已经开始了他们的大数据项目。以IBM为例,自2005年以来,IBM已经在30项与大数据相关的收购上投资了160亿美元。在学术界,大数据也成为人们关注的焦点。2008年,自然杂志发布了一个大数据特刊。2011年,科学还对大数据中“数据处理”的关键技术提出了一个特殊的问题。2012年,欧洲信息与数学研究联盟(ERCIM)发布了一份关于bigdata的特殊问题。在2012年初,“大数据”——bigdata——达沃斯论坛在瑞士举行,宣布大数据就像货币或黄金一样已经成为一种新的经济资产。许多国家政府,如美国,也非常关注大数据。2012年3月,奥巴马政府宣布投资2亿美元,启动“大数据研究与发展计划”,这是1993年“信息高速公路”倡议后的第二个重大科技发展倡议。2012年7月,日本内政部和通信部发布的“充满活力的信息通信技术日本”项目表明,大数据开发应该是一项国家战略,应用技术应该是重点。2012年7月,联合国发布了大数据发展报告,该报告总结了政府如何利用大数据更好地服务和保护人民。

2.3 大数据的挑战

大数据时代急剧增加的数据流给数据采集、存储、管理和分析带来了巨大的挑战。传统的数据管理和分析系统是基于关系数据库管理系统(RDBMS)的。然而,这种RDBMS只适用于结构化数据,而不是半结构化或非结构化数据。此外,RDBMS越来越多地使用越来越昂贵的硬件。很明显,传统的RDBMS无法处理大数据的巨大容量和异构性。关于庞大数据流的存储分析的研究成为迫在眉睫的任务,例如,云计算被用来满足大数据基础设施的需求。对于大规模无序数据集的永久存储和管理的解决方案,分布式文件系统和nosql数据库都是不错的选择。此外,部署大数据分析系统并非易事。主要的挑战如下:(1)数据表示:许多数据集具有一定程度的结构、语义、组织、粒度和可访问性方面的不同。数据表示旨在使数据对计算机分析和用户解释更有意义。然而,不适当的数据表示将降低原始数据的价值,甚至可能阻碍有效的数据分析。有效的数据表示应能够反映数据的结构和类型,以便在不同的数据集上实现高效的操作。(2)减少冗余和数据压缩:一般来说,数据集有很高的冗余。减少冗余和数据压缩对于减少整个系统的间接成本是有效的,前提是数据的潜在价值不受影响。例如,传感器网络生成的大多数数据都是高度冗余的,可以按数量级进行过滤和压缩。(3)数据生命周期管理:与存储系统相对缓慢的进展相比,无处不在的传感和计算正在以前所未有的速度和规模生成数据。我们面临着许多紧迫的挑战,其中之一就是当前的存储系统无法支持如此大规模的数据。一般来说,隐藏在大数据中的价值依赖于数据的时效性。因此,应开发与分析值相关的数据重要性原则,以决定应存储哪些数据,哪些数据应被丢弃。大数据分析系统应在有限的时间内处理大量的异构数据[15]。然而,传统的RDBMS是严格设计的,缺乏可伸缩性和可扩展性,无法满足性能要求。非关系数据库在处理非结构化数据方面显示了它们的独特优势开始成为大数据分析的主流。即便如此,在它们的性能和特定的应用程序中仍然存在一些非关系数据库的问题。我们将在关系型数据库和非关系数据库之间找到一个折衷的解决方案。一些企业使用了混合数据库架构,集成了这两种数据库的优点(例如Facebook和淘宝)。(4)数据机密性:由于容量有限,目前大多数大型数据服务提供商或所有者无法有效地维护和分析如此庞大的数据集。他们必须依靠专业人员或工具来分析这些数据,这增加了潜在的安全风险。例如,事务数据集通常包括一组完整的操作数据来驱动关键业务流程。这些数据包含了最低粒度的细节和一些敏感信息,比如信用卡号。因此,只有当采取适当的预防措施来保护这些敏感数据,以确保其安全性时,对大数据的分析才可能被交付给第三方[16]。(5)能源管理:大型计算系统的能源消耗已经引起了人们对经济和环境的关注。随着数据量的增加和分析需求的增加,大数据的处理、存储和传输将不可避免地消耗更多的电能。因此,在保证大数据的可扩展性和可访问性的同时,需要建立系统级的电力消耗控制和管理机制。E.可扩展性和可伸缩性:大数据的分析系统必须支持当前和未来的数据集。分析算法必须能够处理越来越多的扩展和更复杂的数据集。

3 The Existing Techniques

3.1 Hadoop

Hadoop是一种分布式基础架构系统,是Google的云计算基础架构的开源实现。Google采集系统的核心组件有两个:第一个是GFS(Google FileSystem),一个分布式文件系统,隐藏下层的负载均衡,冗余复制等细节,对上层程序提供一个统一的文件系统的API接口[17];第二个是MapReduce计算模型,Google发现大多数分布式计算均可以抽象为Map/Reduce操作。Map是把输入Input分解为Key/Value对,Reduce是把Key/Value合成最终输出Output[5]。这两个函数都需要用户提供给系统,下层的设施将Map和Reduce操作分配给集群的各个节点上运行,将最终结果存储在GFS上。
而Hadoop是Google集群的开源实现,主要由HDFS和MapReduce组成,其中HDFS是GFS的开源实现,而MapReduce是Google MapReduce的开源实现。用户只需要分别实现Map和Reduce,并注册Job即可自动分布式运行,用户使用十分方便快捷。 一般而言,狭义的Hadoop就是指HDFS和MapReduce,这是一种典型的Master-Slave框架,Master逻辑节点使用NameNode和JobTracker组成,NameNode是HDFS的Master,主要负责Hadoop分布式文件系统的元数据的管理工作,JobTracker是MapReduce的Master,主要职责是启动,跟踪,调度各个TaskTracker的任务执行,每一个Slave逻辑节点通常同时具有DataNode以及TaskTracker的功能。

MapReduce的一个主要优势是它具有高效的容错机制。如果一个节点崩溃,MapReduce会在另一台机器上重新运行它的任务。同样重要的是,如果一个节点可用但表现不佳,这种情况称为失散/落伍(straggler),MapReduce会在另一台机器上运行其任务的副本(也称为“备份任务”),以更快地完成计算。如果没有这种推测性执行机制,任务会因为最慢的节点而使整个任务执行缓慢,或者由于多种原因而导致失败。谷歌注意到,推测执行可以将工作响应时间提高44%。
但是,推测执行是一个复杂的问题,原因有几个。首先,投机任务不是免费的,它们与其他正在运行的任务竞争某些资源,例如网络。其次,选择节点运行推测性任务与选择任务同等重要。第三,在异构环境中,可能难以区分比平均值和离散值稍慢的节点。最后,应尽早确定落后者,以缩短响应时间。因此在MapReduce中,为了应对数据偏斜带来的影响,设计高效的推测执行机制是非常有必要的。

3.2 Spark

Spark 是UC Berkeley AMPLab于2009年发起的,然后被Apache软件基金会接管的类Hadoop MapReduce通用性并行计算框架,是当前大数据领域最活跃的开源项目之一。Spark是基于MapReduce计算框架实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是,中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark更适用于数据挖掘与机器学习等需要迭代的算法。Spark由Scala语言实现的,Scala是一种基于JVM的函数式编程语言,提供了类似DryadLINQ的编程接口。而且Spark还提供修改的Scala语言解释器,能方便地用于交互式编程,用户可以定义变量、函数、类以及RDD[18]。
因此,Spark比Hadoop的传统处理方式MapReduce有着很大的差别,效率至少提高100倍以上。Spark分为四大模块:Spark SQL(类SQL数据管理),MLlib(机器学习),Graphx(图计算),Spark Streaming(实时处理)。这四个部分的数据处理单元都是RDD。所以整个框架形成了大数据处理各种应用场景编程的一致性。同时,Spark是基于内存的编程模型,它可以把中间的迭代过程不放在磁盘中,直接数据不落地在内存中执行,极大地提高了它的执行速度。下面来介绍它的各个模块。

(1)Spark SQL允许在Spark中使用SQL和HiveQL 的相关查询表达式。这个组件的核心是一个新型的RDD,JavaSchemaRDD。JavaSchemaRDD是由Row对象和Schema描述行中每一列的数据类型。JavaSchemaRDD类似一个关系型数据库中的表。一个JavaSchemaRDD可以通过已存在的RDD,Parquet文件,一个JSON数据集或者存储在Apache Hive 通过HiveQL运行的数据来创建[19]。 (2)MLlib包含一些常用机器学习算法和工具,分类,回归,聚类,协同过滤,降维,以及相关的优化原语[20]。许多标准的机器学习方法可以归结为一个凸优化问题,例如一项任务,去找到一个凸面函数F的最小值,这个凸面函数依赖于可变的Vectorw向量,这个向量在一个Node 中叫做Weights权值。MLlib 包括的评价指标有:精确度,召回率,F值,ROC,曲线下面积(AUC)。 (3)GraphX是Spark用于图表和图形,并行计算的的API。在一个高层次上, GraphX 延伸了Spark RDD, 通过引入Resilient Distributed Property Graph (弹性分布式属性图):一个有向多重图与附加到每个顶点和边的属性。为了支持图形计算,,GraphX公开了一组基本的运算符(subgraph ,joinVertices,mapReduceTriplets) [21]。从社交网络到语言建模,由于图形数据的增长规模和重要性,带动众多新图形并行系统(例如Giraph and GraphLab)的发展。通过限制可以被表示计算的类型和引入新技术来划分和分配图形,这些系统能够有效地执行复杂的图形算法命令并远快于大多数普通的数据并行系统。 (4)Spark Streaming。如果要用一句话来概括Spark Streaming的处理思路的话,那就是“将连续的数据持久化,离散化,然后进行批量处理”。Spark Streaming是Spark核心的扩展API,允许使高通量、容错实时数据流的流处理。数据可以从许多来源摄取,如Kafka,Flume,Twitter,ZeroMQ或普通TCP套接字。最后处理过的数据可以放到文件系统,数据库和可视化仪表板上。事实上,你可以引用内置的Spark的机器学习算法,数据流图处理算法。 像Hadoop和Spark这样的大规模数据分析框架现在已经得到广泛应用。因此,学术界和产业界都付出了巨大的努力来改善这些框架的性能。研究人员致力于解决三种限制大数据分析框架的关键问题:(1)网络是一个瓶颈。这引起了一系列网络优化的工作,包括跨多个路径的负载平衡、聚合数据以减少流量、隔离等等。(2)磁盘是一个瓶颈。这就导致了众多缓存策略的产生。(3)落后的任务大大延长了工作完成时间,这推动着推测运行和负载均衡的发展。研究人员已经定位到导致该问题的原因,如数据偏斜和流行度偏差。

3.3 Making Sense of Performance in Data Analytics Frameworks

阻塞时间分析,这是一种量化性能瓶颈的方法。由于分布式中普遍使用的并行性,识别数据,分析框架的瓶颈是一项巨大的挑战:作业由许多并行任务组成,每个任务使用流水线来并行使用网络,磁盘和CPU资源。一个任务可能会在不同的资源执行过程中受到瓶颈限制,并且在任何时候,同一个任务可能会受到不同资源的瓶颈。该方法提出的阻塞时间分析,是使用大量的白盒日志记录来度量每个任务在给定资源上被阻塞的时间。使用每项任务的测量结果,可以将缓慢任务与长时间阻塞时间关联起来,以了解其中的原因[22]。也就是说,针对特定作业的任务度量,允许我们了解,如果磁盘或网络能够无限快速,任务完成需要多长时间,这为优化网络或磁盘性能提供了上限。

3.4 SkewTune in Action: Mitigating Skew in MapReduce Applications

SkewTune将Hadoop作业作为输入。为了避免偏斜,SkewTune将作业的拓扑和reduce阶段视为单独的UDO(user-defined operations, 用户自定义操作)。在SkewTune中,与在Hadoop中一样,UDO从前一个UDO的输出中提取输入,并将其输入到本地。假定UDO一次处理输入的一条记录,而在各个输入记录之间不保留状态。键值对(即mapper输入)和键组(即reducer输入)均被认为是记录的特殊情况。每个UDO被并行化为任务,并且每个任务在集群中被分配一个槽(slot)。任务完成后,插槽即变为可用[23]。 SkewTune的偏斜缓解技术专为MapReduce类型的数据处理引擎而设计。这些引擎相对于倾斜处理的三个重要特征如下:(1)coordinator-worker体系结构,其中coordinator节点进行调度决策并且worker节点运行其分配的任务。完成一项任务后,worker节点向coordinator请求一项新任务。 (2)解耦执行:operator不会对上游operator施压。相反,他们彼此独立执行。 (3)独立记录处理:任务正在执行一个UDO,它相互独立地处理每个输入记录(可能是嵌套的)。(4)任务进度估计,估计每个任务的剩余时间。每个worker定期向coordinator报告这一估计。(5)按任务统计:每项任务都会跟踪一些基本统计信息,例如未处理字节和记录的总数。

3.5 Camdoop: Exploiting In-network Aggregation for Big Data Applications

CamCube上运行的类似MapReduce的系统,支持数据流的全路径聚合。执行聚合投影时,根据服务器的中间数据集中共有多少键,利用中间数据的来源构建聚合树。这可以减少网络流量,因为在每跳处只有一小部分接收到的数据被转发。所有转发流量的服务器都将有效地参与reduce阶段,分担计算负载,这对于输出大小通常远小于中间数据大小的工作负载很重要。可以证明,当使用关联和交换reduce函数时,可以有显著的性能提升,最高达两个数量级[24]。Camdoop同样提供了容错机制,以及我们如何确保中间数据中的每个条目在出现故障时只在最终输出中包含一次。实验还表明,即使reduce函数不具有关联性和交换性,路径上聚合仍然具有以下优点:Camdoop可以将负载分布到所有服务器上,并支持将reduce阶段并行化,从而进一步缩短总工作时间。最后,Camdoop利用定制的传输层,通过利用应用程序级先验允许基于内容的数据包优先级调度。即使没有执行聚合,这也有助于确保reducer不会因为缺乏处理任务而停止。Camdoop支持在MapReduce 中使用相同功能集,它旨在成为Hadoop的插件,并且与现有的MapReduce作业兼容。由于MapReduce广泛的适用性和普及性,我们选择其作为编程模型。但是,该方法也可以扩展到其他使用分区/聚合模型的平台,例如Dryad 或Storm。

3.6 Dolly系统,Effective Straggler Mitigation: Attack of the Clones

该系统提出一种不同的方法。不是等待并试图预测落伍任务,而是将推测执行推向极端,并建议对每个任务启动多个副本,并仅使用首先完成的副本的结果。这种技术既通用又强大,因为它避免了等待,推测以及寻找复杂的相关性。在处理小型互动工作时,这种积极的克隆将显著提高缓解落后任务的敏捷性。克隆带来两个主要挑战:第一个挑战是额外的克隆可能会使用额外的资源。但是,对生产日志的分析显示,最小90%的工作消耗的资源少于6%,试图改进的交互式作业都属于这类小型工作。因此,我们可以通过使用少量额外资源来改善它们。第二个挑战是额外克隆由于创建中间数据而产生的潜在问题,这可能会影响工作表现。有效的克隆需要我们克隆每个任务并使用首先完成的任务克隆的输出。但是,这可能会导致在作业的不同阶段(例如map, reduce, join)任务之间争用中间数据;框架通常以下游阶段的任务(例如reduce)读取上游阶段的任务(例如map)的输出拓扑来组合作业。如果所有下游克隆从首先完成的上游克隆中读取,则它们将争用IO带宽。避免这种争用的替代方案是使每个下游克隆仅从单个上游克隆中读取。但是这使下游克隆的开始时间滞后。对争用问题的解决方案是延迟分配。它基于直觉,除了少数落伍任务之外,大多数克隆几乎同时完成。使用成本效益分析来克服这些克隆之间的这种差异,它会检查克隆是否可以在将下游克隆分配给上游输出的可用副本之前获得独占副本。成本优势拟合分析是通用的,以说明阶段之间的不同通信模式。因此构建了Dolly,这是一个在资源预算内运行时执行克隆以减轻落后任务影响的系统。使用来自Facebook和Bing的生产工作负载对150个节点群集进行评估表明,以LATE 和Mantri为基准,Dolly分别将小型作业的平均完成时间提高了34%至46%。通过挑选每个任务的最快克隆,Dolly有效地将最慢的任务从运行速度平均降低8倍提高到降低1.06倍,从而有效地消除任务延迟[25]。

3.7 Reining in the Outliers in Map-Reduce Clusters using Mantri

Mantri,这是一个监视任务并根据其原因筛选异常值的系统。它使用以下技术:(1)重新启动观测到的资源约束和工作不平衡的异常任务,(2)基于网络感知的任务配置,(3)基于成本效益分析任务的输出。Mantri采用的详细分析和决策流程与Map-Reduce实施中异常值缓解的技术不同,前者不仅仅关注重复任务。MapReduce不能防止数据丢失导致的重新计算或者网络拥塞导致的异常值。而Mantri根据数据源的位置以及当前网络连接的使用情况来安排任务,在任务完成时,Mantri会复制其输出。此外,Mantri执行异常值的智能重启。长时间运行的任务,因为它有更多的工作要做,不会重新启动;如果由于通过低带宽网路读取数据而滞后,则只有存在更有利的网络位置可用时才会重新启动任务。与当前只在某个阶段结束时复制任务的方法不同,Mantri使用实时进度报告以决定任务的复制[26]。

3.8 Rock You like a Hurricane: Taming Skew in Large Scale Analytics

Hurricane是一个与Hadoop或Spark类似的大数据计算框架,创新之处在于,它基于节点在运行时观察到的负载,自适应工作分区。超载节点在执行期间可以随时产生任务的克隆,每个任务克隆处理一个原始数据的子集。该系统核心思想是任务克隆,其中在空闲节点上,高负荷节点可以克隆其任务,并让每个克隆处理原始输入的子集。这使得Hurricane能够自适应地调整任务的并行,基于节点动态负载平衡[27]。对于表现不佳的任务在其执行期间可能会分裂到多个空闲节点中运行,这可以分担一部分任务负载。相比于Hadoop和Spark等大数据处理框架能够更好的处理数据偏斜。Hurricane通过联合两个新颖的手段实现任务克隆:细粒度的独立数据访问和支持合并的编程模型,这些技术可以帮助程序员写出高性能,抗偏斜的应用。

4 Categorize and Compare

Hurricane是第一个基于任务执行期间worker观察到的负载自适应分配工作的集群计算框架。通过多个worker之间细粒度的数据共享和程序员定义的合并程序,可以比较容易的解决数据偏斜带来的问题。
相比于Dolly或者Mantri,已经提出了几种技术将分析作业分成更小的任务,以便减轻倾斜并改善负载平衡。这些技术不但需要程序员的手动干预,而且是特定于应用程序和输入。例如,他们需要微调程序员定义的分割函数,利用交换性和关联性来组合相同的键值,或跨多个分区分割相同键值的记录。Hurricane通过在克隆任务时动态分割分区,以独立于应用程序的方式缓解偏斜。可以表明,该方法可以有效地缓解偏斜,而无需调整应用程序,并且适用于任意操作,例如查找唯一值。 传统的集群计算框架如Hadoop或者Spark,将数据分割成分区,并使用混洗和排序将它们合并回应用。这通常是以排序中间输出为代价的,并且需要防止将具有相同key的记录发送到多个reducer,这可能导致存在偏斜时的负载不平衡。更重要的是,这种方法限制了分区的大小,使得以平衡的方式重新分配分区变得更加困难。Hurricane通过赋予应用程序适时使用开发人员提供的自定义合并方法,以采取不同的方法。这种合并包含传统的混洗和排序,但同时更灵活,因为它允许在执行过程中的任何节点创建的克隆的输出以特定于应用程序的方式进行合并。尽管向现有框架添加合并过程相对简单,但充分利用该过程需要重新设计和更改执行模型,以便可以即时重新分配任务,容错机制也需要进行调整以解决可能存在多个输出子集的问题。 SkewTune。SkewTune通过识别慢速任务并将其重新分区以在空闲节点上运行,来缓解MapReduce程序中的偏差。由于该系统旨在成为MapReduce的直接替代品,因此它受到类似的限制,即必须保留输出顺序并将数据放置在worker本地。虽然这种方法可以帮助缓解数据偏斜,但也会导致重大的数据移动,这可能会使已经超负荷的节点情况更加糟糕。SkewTune还可能会通过对即将完成的任务进行重新分区而无意中恶化了性能[27]。
Camdoop在MapReduce应用程序的混洗阶段执行网络内数据聚合,这可以通过减少移动的数据量和网络上的整体负载来帮助缓解数据偏斜。不幸的是,这个解决方案需要目前不可用的特殊硬件。
对落伍节点(Straggler)的任务分析工作负载是一项挑战。一种常用的处理落伍节点的方法是推测性执行,其中包括尽快侦测落伍任务,并在另一台机器上重新启动任务副本[10]。虽然这种方法有助于解决机器偏斜问题,但它不能解决数据或计算歪斜问题。Hurricane让较慢的worker通过克隆来分配任务,避免了从头开始重新开始任务的需要。

5 Summery

现实生活中,数据偏斜是十分普遍的,此外任务运行时间还可能受到机器偏差的影响,例如不同性能的机器,故障机器等,而数据偏斜对大数据执行框架的执行时间影响显著。一般消除偏斜影响的技术大致有:推测性执行,黑名单机制以及近些年出现的自适用负载均衡。也出现大量为了解决这类问题而特殊设计的大数据计算框架,比如SkewTune,Camdoop,Spongefiles,Sparrow 和Hawk。而Hurricane通过细粒度的数据共享以及自主的任务克隆,可以有效减少偏斜带来的任务延时,并且使用简单,可以帮助用户开发出高性能,抗偏斜的应用。 6 References

[1] Guojie L, Xueqi C. 大数据研究: 未来科技及经济社会发展的重大战略领域——大数据的研究现状与科学思考[J]. 中国科学院院刊, 2012 (2012 年 06): 647-657.
[2] 邬贺铨. 大数据时代的机遇与挑战[J]. 唯实: 现代管理, 2013 (5): 33-34.
[3] 陈全, 邓倩妮. 云计算及其关键技术[J]. 计算机应用, 2009, 29(9): 2562-2567.
[4] Zikopoulos P, Eaton C. Understanding big data: Analytics for enterprise class hadoop and streaming data[M]. McGraw-Hill Osborne Media, 2011.
[5] Dean J, Ghemawat S. MapReduce: simplified data processing on large clusters[J]. Communications of the ACM, 2008, 51(1): 107-113.
[6] Shvachko K, Kuang H, Radia S, et al. The hadoop distributed file system[C]//Mass storage systems and technologies (MSST), 2010 IEEE 26th symposium on. Ieee, 2010: 1-10.
[7] Gasbarro J A, Horowitz M A, Barth R M, et al. Method and circuitry for minimizing clock-data skew in a bus system: U.S. Patent 5,432,823[P]. 1995-7-11.
[8] McCarthy P. Camus: The Stranger[M]. Cambridge University Press, 2004.
[9] Garduno E, Kavulya S, Tan J, et al. Theia: Visual Signatures for Problem Diagnosis in Large Hadoop Clusters[C]//LISA. 2012: 33-42.
[10] Chen Q, Liu C, Xiao Z. Improving MapReduce performance using smart speculative execution strategy[J]. IEEE Transactions on Computers, 2014, 63(4): 954-967.
[11] 王忠. 美国推动大数据技术发展的战略价值及启示[J]. 中国发展观察, 2012 (6): 44-45.
[12] 李伯虎, 张霖, 任磊, 等. 再论云制造[J]. 计算机集成制造系统, 2011, 17(3): 449-457.
[13] 孟小峰, 周龙骧, 王珊. 数据库技术发展趋势[J]. 软件学报, 2004, 15(12):1822-1836.
[14] Tony Hey, Stewart Tansley, Kristin Tolle. 第四范式[M]. 科学出版社, 2012.
[15] 孟小峰, 慈祥. 大数据管理:概念、技术与挑战[J]. 计算机研究与发展, 2013, 50(1):146-169.
[16] 冯登国, 张敏, 李昊. 大数据安全与隐私保护[J]. 计算机学报, 2014, 37(1):246-258.
[17] Ghemawat S, Gobioff H, Leung S T. The Google file system[M]. ACM, 2003.
[18] Zaharia M, Xin R S, Wendell P, et al. Apache spark: a unified engine for big data processing[J]. Communications of the ACM, 2016, 59(11): 56-65.
[19] Armbrust M, Xin R S, Lian C, et al. Spark sql: Relational data processing in spark[C]//Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015: 1383-1394.
[20] Meng X, Bradley J, Yavuz B, et al. Mllib: Machine learning in apache spark[J]. The Journal of Machine Learning Research, 2016, 17(1): 1235-1241.
[21] Xin R S, Gonzalez J E, Franklin M J, et al. Graphx: A resilient distributed graph system on spark[C]//First International Workshop on Graph Data Management Experiences and Systems. ACM, 2013: 2.
[22] Ousterhout K, Rasti R, Ratnasamy S, et al. Making Sense of Performance in Data Analytics Frameworks[C]//NSDI. 2015, 15: 293-307.
[23] Kwon Y C, Balazinska M, Howe B, et al. Skewtune in action: Mitigating skew in mapreduce applications[J]. Proceedings of the VLDB Endowment, 2012, 5(12): 1934-1937.
[24] Costa P, Donnelly A, Rowstron A, et al. Camdoop: Exploiting in-network aggregation for big data applications[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation. USENIX Association, 2012: 3-3.
[25] Ananthanarayanan G, Ghodsi A, Shenker S, et al. Effective Straggler Mitigation: Attack of the Clones[C]//NSDI. 2013, 13: 185-198.
[26] Ananthanarayanan G, Kandula S, Greenberg A G, et al. Reining in the Outliers in Map-Reduce Clusters using Mantri[C]//OSDI. 2010, 10(1): 24.
[27] Bindschaedler L, Malicevic J, Schiper N, et al. Rock you like a hurricane: taming skew in large scale analytics[R]. 2018.