kafka spark sparkstreaming kafka整合的结果怎么看

Michael G. Noll:整合Kafka到Spark Streaming——代码示例和挑战
查看: 2133|
评论: 0|原作者: Michael G. Noll|来自: CSDN
摘要: 作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中。期间, Mic ...
Spark Streaming中的并行Downstream处理在之前的章节中,我们覆盖了从Kafka的并行化读取,那么我们就可以在Spark中进行并行化处理。那么这里,你必须弄清楚Spark本身是如何进行并行化处理的。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关,&。在有些文档中,分区仍然被称为“slices”。在任何Spark应用程序中,一旦某个Spark Streaming应用程序接收到输入数据,其他处理都与非streaming应用程序相同。也就是说,与普通的Spark数据流应用程序一样,在Spark Streaming应用程序中,你将使用相同的工具和模式。更多详情可见&文档。因此,我们同样将获得两个控制手段:1. input DStreams的数量,也就是说,我们在之前章节中read parallelism的数量作为结果。这是我们的立足点,这样一来,我们在下一个步骤中既可以保持原样,也可以进行修改。2. DStream转化的重分配。这里将获得一个全新的DStream,其parallelism等级可能增加、减少,或者保持原样。在DStream中每个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是通过RDD.repartition实现。接下来将对RDD中的所有数据做随机的reshuffles,然后建立或多或少的分区,并进行平衡。同时,数据会在所有网络中进行shuffles。换句话说,DStream.repartition非常类似Storm中的shuffle grouping。因此,repartition是从processing parallelism解耦read parallelism的主要途径。在这里,我们可以设置processing tasks的数量,也就是说设置处理过程中所有core的数量。间接上,我们同样设置了投入machines/NICs的数量。一个DStream转换相关是&。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。通常情况下,你更愿意用StreamingContext的派生。一个union将返回一个由Union RDD支撑的UnionDStream。Union RDD由RDDs统一后的所有分区组成,也就是说,如果10个分区都联合了3个RDDs,那么你的联合RDD实例将包含30个分区。换句话说,union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。你是否使用union依赖于你的用例是否需要从所有Kafka分区进行“in one place”信息获取决定,因此这里大部分都是基于语义需求决定。举个例子,当你需要执行一个不用元素上的(全局)计数。注意:RDDs是无序的。因此,当你union RDDs时,那么结果RDD同样不会拥有一个很好的序列。如果你需要在RDD中进行sort。你的用例将决定需要使用的方法,以及你需要使用哪个。如果你的用例是CPU密集型的,你希望对zerg.hydra topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -& "terran", ...)
val readParallelism = 5
val topics = Map("zerg.hydra" -& 1)
val kafkaDStreams = (1 to readParallelism).map { _ =&
KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
//& collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
//& single DStream
val processingParallelism = 20
val processingDStream = unionDStream(processingParallelism)
//& single DStream but now with 20 partitions在下一节中,我将把所有部分结合到一起,并且联合实际数据处理进行讲解。写入到Kafka写入到Kafka需要从foreachRDD输出操作进行:通用的输出操作者都包含了一个功能(函数),让每个RDD都由Stream生成。这个函数需要将每个RDD中的数据推送到一个外部系统,比如将RDD保存到文件,或者通过网络将它写入到一个数据库。需要注意的是,这里的功能函数将在驱动中执行,同时其中通常会伴随RDD行为,它将会促使流RDDs的计算。注意:重提“功能函数是在驱动中执行”,也就是Kafka生产者将从驱动中进行,也就是说“功能函数是在驱动中进行评估”。当你使用foreachRDD从驱动中读取Design Patterns时,实际过程将变得更加清晰。在这里,建议大家去阅读Spark文档中的&一节,它将详细讲解使用foreachRDD读外部系统中的一些常用推荐模式,以及经常出现的一些陷阱。在我们这个例子里,我们将按照推荐来重用Kafka生产者实例,通过生产者池跨多个RDDs/batches。 我通过&实现了这样一个工具,已经上传到&。这个生产者池本身通过提供给tasks。最终结果看起来如下:val producerPool = {
// See the full code on GitHub for details on how the pool is created
val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
ssc.sparkContext.broadcast(pool)
stream.map { ... }.foreachRDD(rdd =& {
rdd.foreachPartition(partitionOfRecords =& {
// Get a producer from the shared pool
val p = producerPool.value.borrowObject()
partitionOfRecords.foreach { case tweet: Tweet =&
// Convert pojo back into Avro binary format
val bytes = converter.value.apply(tweet)
// Send the bytes to Kafka
p.send(bytes)
// Returning the producer to the pool also shuts it down
producerPool.value.returnObject(p)
})需要注意的是, Spark Streaming每分钟都会建立多个RDDs,每个都会包含多个分区,因此你无需为Kafka生产者实例建立新的Kafka生产者,更不用说每个Kafka消息。上面的步骤将最小化Kafka生产者实例的建立数量,同时也会最小化TCP连接的数量(通常由Kafka集群确定)。你可以使用这个池设置来精确地控制对流应用程序可用的Kafka生产者实例数量。如果存在疑惑,尽量用更少的。完整示例下面的代码是示例Spark Streaming应用程序的要旨(所有代码参见&)。这里,我做一些解释:并行地从Kafka topic中读取Avro-encoded数据。我们使用了一个最佳的read parallelism,每个Kafka分区都配置了一个单线程 input DStream。并行化Avro-encoded数据到pojos中,然后将他们并行写到binary,序列化可以通过&执行。通过Kafka生产者池将结果写回一个不同的Kafka topic。// Set up the input DStream to read from Kafka (in parallel)
val kafkaStream = {
val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
val kafkaParams = Map(
"zookeeper.connect" -& "zookeeper1:2181",
"group.id" -& "spark-streaming-test",
"zookeeper.connection.timeout.ms" -& "1000")
val inputTopic = "input-topic"
val numPartitionsOfInputTopic = 5
val streams = (1 to numPartitionsOfInputTopic) map { _ =&
KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -& 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
val unifiedStream = ssc.union(streams)
val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.
unifiedStream.repartition(sparkProcessingParallelism)
// We use accumulators to track global "counters" across the tasks of our streaming app
val numInputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages consumed")
val numOutputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages produced")
// We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.
val producerPool = {
val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
ssc.sparkContext.broadcast(pool)
// We also use a broadcast variable for our Avro Injection (Twitter Bijection)
val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])
// Define the actual data flow of the streaming job
kafkaStream.map { case bytes =&
numInputMessages += 1
// Convert Avro binary data to pojo
converter.value.invert(bytes) match {
case Success(tweet) =& tweet
case Failure(e) =& // ignore if the conversion failed
}.foreachRDD(rdd =& {
rdd.foreachPartition(partitionOfRecords =& {
val p = producerPool.value.borrowObject()
partitionOfRecords.foreach { case tweet: Tweet =&
// Convert pojo back into Avro binary format
val bytes = converter.value.apply(tweet)
// Send the bytes to Kafka
p.send(bytes)
numOutputMessages += 1
producerPool.value.returnObject(p)
// Run the streaming job
ssc.start()
ssc.awaitTermination()就我自己而言,我非常喜欢 Spark Streaming代码的简洁和表述。在Bobby Evans和 Tom Graves讲话中没有提到的是,Storm中这个功能的等价代码是非常繁琐和低等级的:&中的会运行一个Stormtopology来执行相同的计算。同时,规范文件本身只有非常少的代码,当然是除下说明语言,它们能更好的帮助理解;同时,需要注意的是,在Storm的Java API中,你不能使用上文Spark Streaming 示例中所使用的匿名函数,比如map和foreach步骤。取而代之的是,你必须编写完整的类来获得相同的功能,你可以查看&。这感觉是将Spark的API转换到Java,在这里使用匿名函数是非常痛苦的。最后,我同样也非常喜欢&,它非常适合初学者查看,甚至还包含了一些&。关于Kafka整合到Spark,上文已经基本介绍完成,但是我们仍然需要浏览mailing list和深挖源代码。这里,我不得不说,维护帮助文档的同学做的实在是太棒了。知晓Spark Streaming中的一些已知问题你可能已经发现在Spark中仍然有一些尚未解决的问题,下面我描述一些我的发现:一方面,在对Kafka进行读写上仍然存在一些含糊不清的问题,你可以在类似&和&&mailing list的讨论中发现。另一方面,Spark Streaming中一些问题是因为Spark本身的固有问题导致,特别是故障发生时的数据丢失问题。换句话说,这些问题让你不想在生产环境中使用Spark。在Spark 1.1版本的驱动中,Spark并不会考虑那些已经接收却因为种种原因没有进行处理的元数据(&)。因此,在某些情况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,当下已经释放。1.1版本中的Kafka连接器是基于Kafka的高等级消费者API。这样就会造成一个问题,Spark Streaming不可以依赖其自身的KafkaInputDStream将数据从Kafka中重新发送,从而无法解决下游数据丢失问题(比如Spark服务器发生故障)。有些人甚至认为这个版本中的Kafka连接器不应该投入生产环境使用,因为它是基于Kafka的高等级消费者API。取而代之,Spark应该使用简单的消费者API(就像Storm中的Kafka spout),它将允许你控制便宜和分区分配确定性。但是当下Spark社区已经在致力这些方面的改善,比如Dibyendu Bhattacharya的Kafka连接器。后者是Apache Storm Kafka spout的一个端口,它基于Kafka所谓的简单消费者API,它包含了故障发生情景下一个更好的重放机制。即使拥有如此多志愿者的努力,Spark团队更愿意非特殊情况下的Kafka故障恢复策略,他们的目标是“在所有转换中提供强保证,通用的策略”,这一点非常难以理解。从另一个角度来说,这是浪费Kafka本身的故障恢复策略。这里确实难以抉择。这种情况同样也出现在写入情况中,很可能会造成数据丢失。Spark的Kafka消费者参数auto.offset.reset的使用同样与Kafka的策略不同。在Kafka中,将auto.offset.reset设置为最小是消费者将自动的将offset设置为最小offset,这通常会发生在两个情况:第一,在ZooKeeper中不存在已有offsets;第二,已存在offset,但是不在范围内。而在Spark中,它会始终删除所有的offsets,并从头开始。这样就代表着,当你使用auto.offset.reset = "smallest"重启你的应用程序时,你的应用程序将完全重新处理你的Kafka应用程序。更多详情可以在下面的两个讨论中发现:和。Spark-1341:用于控制Spark Streaming中的数据传输速度。这个能力可以用于很多情况,当你已经受Kafka引起问题所烦恼时(比如auto.offset.reset所造成的),然后可能让你的应用程序重新处理一些旧数据。但是鉴于这里并没有内置的传输速率控制,这个功能可能会导致你的应用程序过载或者内存不足。在这些故障处理策略和Kafka聚焦的问题之外之外,扩展性和稳定性上的关注同样不可忽视。再一次,仔细观看&以获得更多细节。在Spark使用经验上,他们都永远比我更丰富。当然,我也有我的&,在 G1 garbage(在Java 1.7.0u4+中) 上可能也会存在问题。但是,我从来都没碰到这个问题。Spark使用技巧和敲门在我实现这个示例的代码时,我做了一些重要的笔记。虽然这不是一个全面的指南,但是在你开始Kafka整合时可能发挥一定的作用。它包含了&中的一些信息,也有一些是来自Spark用户的mailing list。通用当你建立你的Spark环境时,对Spark使用的cores数量配置需要特别投入精力。你必须为Spark配置receiver足够使用的cores(见下文),当然实际数据处理所需要的cores的数量也要进行配置。在Spark中,每个receiver都负责一个input DStream。同时,每个receiver(以及每个input DStream) occies一个core,这样做是服务于每个文件流中的读取(详见文档)。举个例子,你的作业需要从两个input streams中读取数据,但是只访问两个cores,这样一来,所有数据都只会被读取而不会被处理。注意,在一个流应用程序中,你可以建立多个input DStreams来并行接收多个数据流。在上文从Kafka并行读取一节中,我曾演示过这个示例作业。你可以使用 broadcast variables在不同主机上共享标准、只读参数,相关细节见下文的优化指导。在示例作业中,我使用了broadcast variables共享了两个参数:第一,Kafka生产者池(作业通过它将输出写入Kafka);第二,encoding/decoding Avro数据的注入(从Twitter Bijection中)。。你可以使用累加器参数来跟踪流作业上的所有全局“计数器”,这里可以对照Hadoop作业计数器。在示例作业中,我使用累加器分别计数所有消费的Kafka消息,以及所有对Kafka的写入。如果你对累加器进行命名,它们同样可以在Spark UI上展示。不要忘记import Spark和Spark Streaming环境:// Required to gain access to RDD transformations via implicits.
import org.apache.spark.SparkContext._
// Required when working on `PairDStreams` to gain access to e.g. `DStream.reduceByKey`
// (versus `DStream.transform(rddBatch =& rddBatch.reduceByKey()`) via implicits.
// See also
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions如果你是 Twitter Algebird的爱好者,你将会喜欢使用Count-Min Sketch和Spark中的一些特性,代表性的,你会使用reduce或者reduceByWindow这样的操作(比如,&)。Spark项目包含了&和&的示例介绍。如果你需要确定Algebird数据结构的内存介绍,比如Count-Min Sketch、HyperLogLog或者Bloom Filters,你可以使用SparkContext日志进行查看,更多细节参见&。Kafka整合我前文所述的一些增补:你可能需要修改Spark Streaming中的一些Kafka消费者配置。举个例子,如果你需要从Kafka中读取大型消息,你必须添加fetch.message.max.bytes消费设置。你可以使用KafkaUtils.createStream(...)将这样定制的Kafka参数给Spark Streaming传送。测试首先,确定已经&在一个finally bloc或者测试框架的teardown method中使用stop()关闭了StreamingContext 和/或 SparkContext,因为在同一个程序(或者JVM?)中Spark不支持并行运行两种环境。根据我的经验,在使用sbt时,你希望在测试中将你的建立配置到分支JVM中。最起码在kafka-storm-starter中,测试必须并行运行多个线程,比如ZooKeeper、Kafka和Spark的内存实例。开始时,你可以参考。同样,如果你使用的是Mac OS X,你可能期望关闭JVM上的IPv6用以阻止DNS相关超时。这个问题与Spark无关,你可以查看来获得关闭IPv6的方法。性能调优确定你理解作业中的运行时影响,如果你需要与外部系统通信,比如Kafka。在使用foreachRDD时,你应该阅读中&中的Design Patterns一节。举个例子,我的用例中使用Kafka产生者池来优化 Spark Streaming到Kafka的写入。在这里,优化意味着在多个task中共享同一个生产者,这个操作可以显著地减少由Kafka集群建立的新TCP连接数。使用Kryo做序列化,取代默认的Java serialization,详情可以访问&。我的例子就使用了Kryo和注册器,举个例子,使用Kryo生成的Avro-generated Java类(见&)。除此之外,在Storm中类似的问题也可以使用Kryo来解决。通过将spark.streaming.unpersist设置为true将Spark Streaming 作业设置到明确持续的RDDs。这可以显著地减少Spark在RDD上的内存使用,同时也可以改善GC行为。(点击访问&)通过MEMORY_ONLY_SER开始你的储存级别P&S测试(在这里,RDD被存储到序列化对象,每个分区一个字节)。取代反序列化,这样做更有空间效率,特别是使用Kryo这样的高速序列化工具时,但是会增加读取上的CPU密集操作。这个优化对 Spark Streaming作业也非常有效。对于本地测试来说,你可能并不想使用*_2派生(2=复制因子)。总结完整的Spark Streaming示例代码可以在&查看。这个应用包含了Kafka、Zookeeper、Spark,以及上文我讲述的示例。总体来说,我对我的初次Spark Streaming体验非常满意。当然,在Spark/Spark Streaming也存在一些需要特别指明的问题,但是我肯定Spark社区终将解决这些问题。在这个过程中,得到了Spark社区积极和热情的帮助,同时我也非常期待Spark 1.2版本的新特性。在大型生产环境中,基于Spark还需要一些TLC才能达到Storm能力,这种情况我可能将它投入生产环境中么?大部分情况下应该不会,更准确的说应该是现在不会。那么在当下,我又会使用Spark Streaming做什么样的处理?这里有两个想法,我认为肯定存在更多:它可以非常快的原型数据流。如果你因为数据流太大而遭遇扩展性问题,你可以运行 Spark Streaming,在一些样本数据或者一部分数据中。搭配使用Storm和Spark Streaming。举个例子,你可以使用Storm将原始、大规模输入数据处理到易管理等级,然后使用Spark Streaming来做下一步的分析,因为后者可以开箱即用大量有趣的算法、计算指令和用例。感谢Spark社区对大数据领域所作出的贡献!
上一篇:下一篇:
快毕业了,没工作经验,
找份工作好难啊?
赶紧去人才芯片公司磨练吧!!Spark_Streaming_大规模准实时流式数据处理_图文_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
评价文档:
Spark_Streaming_大规模准实时流式数据处理
上传于||文档简介
&&S​p​a​r​k​_​S​t​r​e​a​m​i​n​g​_​大​规​模​准​实​时​流​式​数​据​处​理
大小:3.47MB
登录百度文库,专享文档复制特权,财富值每天免费拿!
你可能喜欢微课堂第23期:Spark Streaming 流式计算实战实战 - StuQ
微课堂第23期:Spark Streaming 流式计算实战实战
Hi,大家好,我是祝威廉。具有多年基础框架/搜索研发经验。 2012年开始从事大数据平台架构相关工作,现专注在机器学习平台构建以及上层产品研发。业余时间喜欢研究 Spark/Yarn 等框架,对资源管理调度,自动化部署等感兴趣。
这次分享会比较实战些。具体业务场景描述:
我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成 &userName/year/month/day/hh/normal
userName/year/month/day/hh/normal/delay 结构,存储到HDFS中。如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 normal 目录。
& & & & & & & & & & &
Spark Streaming 与 Storm 适用场景分析
为什么这里不使用 Storm ?我们初期使用 Storm 去实现,然而使用 Storm 接受到数据后写入 HDFS 。这里面会产生两个问题:
& & *Storm
需要持有大量的 HDFS 文件句柄。我们事先并不知道有多少用户,所以根据不断进来的数据,我们会不断积累新的文件句柄
& & * 这要求使用HDFS
的 append 模式,不断追加
大量持有文件句柄以及在什么时候释放这些文件句柄都是一件很困难的事情。另外使用 HDFS 的追加内容模式也会有些问题。
后续我们就调研 Spark Streaming 。 Spark Streaming 有个好处,我可以攒个一分钟处理一次即可。这就意味着,我们可以隔一分钟(你可以设置)批量写一次集群,HDFS 对这种形态的文件存储还是非常友好的。这样就很轻易的解决了 Storm 遇到的两个问题。
而且能保证一定的实时性。譬如我的 batch interval 设置为 一分钟 那么我们就能保证一分钟左右的延迟&,事实上我们的业务场景是可以容忍一小时以下落地到 HDFS 延迟的。
事实上我们的业务场景是可以容忍一小时以下落地到 HDFS 延迟的&。
当然,Spark 处理完数据后,如何落到集群是比较麻烦的一件事情,不同的记录是要写到不同的文件里面去的,没办法简单的 saveAsTextFile 就搞定。这个我们通过自定义 Partitioner 来解决,第三个环节会告诉大家具体怎么做。&
上面大家其实可以看到 Spark Streaming 和 Storm 都作为流式处理的一个解决方案,但是在不同的场景下,其实有各自适合的时候。&
& & & & & & & & &
2. Spark Streaming 与 Kafka 集成方案选型
我们的数据来源是Kafka 我们之前也有应用来源于 HDFS文件系统监控的,不过建议都尽量对接 Kafka&。
Spark Streaming 对接Kafka 做数据接受的方案有两种:
& & *Receiver-based Approach
& & *Direct Approach (No Receivers)
两个方案具体优劣我专门写了文章分析,大家晚点可以看看这个链接和 Spark Streaming 相关的文章。&
我这里简单描述下:
& & *Receiver-based Approach 内存问题比较严重,因为她接受数据和处理数据是分开的。如果处理慢了,它还是不断的接受数据。容易把负责接受的节点给搞挂了。
& & * Direct Approach 是直接把 Kafka 的 partition 映射成 RDD 里的 partition 。 所以数据还是在 kafka 。只有在算的时候才会从 Kafka 里拿,不存在内存问题,速度也快。
所以建议都是用 Direct Approach 。具体调用方式是这样:
& & & & & & &
还是简单的。之后就可以像正常的 RDD 一样做处理了。
当然,Spark 处理完数据后,如何落到集群是比较麻烦的一件事情,不同的记录是要写到不同的文件里面去的,没办法简单的 saveAsTextFile 就搞定。这个我们通过自定义 Partitioner 来解决,第三个环节会告诉大家具体怎么做。&
& & & & & & & &&
3. 自定义 Partitioner 实现日志文件快速存储到 HDFS
& & & & & & & & & &&
& & & & & & & &
经过处理完成后&,我们拿到了logs 对象&。
到这一步位置,日志的每条记录其实是一个 tuple(path,line)& 也就是每一条记录都会被标记上一个路径。那么现在要根据路径,把每条记录都写到对应的目录去该怎么做呢?
& & & & & &
& & & & & &
首先收集到所有的路径。接着 for 循环 paths ,然后过滤再进行存储,类似这样:
& & & & & & & & &&
这里我们一般会把 rdd 给 cache 住,这样每次都直接到内存中过滤就行了。但如果 path 成百上千个呢? 而且数据量一分钟至少几百万,save 到磁盘也是需要时间的。所以这种方案肯定是不可行的。
我当时还把 paths 循环给并行化了,然而当前情况是 CPU 处理慢了,所以有改善,但是仍然达不到要求。
这个时候你可能会想,要是我能把每个路径的数据都事先收集起来,得到几个大的集合,然后把这些集合并行的写入到 HDFS 上就好了。事实上,后面我实施的方案也确实是这样的。所谓集合的概念,其实就是 Partition 的概念。而且这在Spark
中也是易于实现的,而实现的方式就是利用自定义 Partioner 。具体的方式如下:
& & & & & &&
通过上面的代码,我们就得到了路径和 partiton
id 的对应关系。接着遍历 partition 就行了。对应的 a 是分区号,b 则是分区的数据迭代器。接着做对应的写入操作就行。这些分区写入都是在各个 Executor 上执行的,并不是在 Driver 端,所以足够快。
我简单解释下代码&,首先我把收集到的路径 zipWithIndex 这样就把路径和数字一一对应了&;接着我新建了一个匿名类 实现了 Partitioner&。numPartitions 显然就是我们的路径集合的大小,遇到一个 key (其实就是个路径)时,则调用路径和数字的映射关系&,然后就把所有的数据根据路径 hash 到不同的 partition 了&。接着遍历 partition 就行了,对应的 a 是分区号,b 则是分区的数据迭代器。接着做对应的写入操作就行。这些分区写入都是在各个 Executor 上执行的,并不是在 Driver 端,所以足够快。我们在测试集群上五分钟大概 w 数据,90颗核,180G 内存,平均处理时间大概是2分钟左右。内存可以再降降 &我估计 100G 足够了&&。
& & & & & & & &
4. 在演示场景中,Spark Streaming 如何保证数据的完整性,不丢,不重
虽然 Spark Streaming 是作为一个24 * 7 不间断运行的程序来设计的,但是程序都会 crash ,那如果 crash 了,会不会导致数据丢失?会不会启动后重复消费?
关于这个,我也有专门的文章阐述( ),但是我这里直接给出结论:
使用 Direct
Approach 模式
启用 checkPoint
做了上面两步,就可以保证数据至少被消费一次。
那如何保证不重复消费呢?
这个需要业务自己来保证。简单来说,业务有两种:
自己保证事务
所谓幂等操作就是重复执行不会产生问题,如果是这种场景下,你不需要额外做任何工作。但如果你的应用场景是不允许数据被重复执行的,那只能通过业务自身的逻辑代码来解决了。
以当前场景为例,mideng 就是典型的幂等&,因为可以做写覆盖&,
& & & & & & & &
具体代码如上&,那如何保证写覆盖呢?&也就是幂等&。
文件名我采用了
job batch time 和 partition 的&id 作为名称。这样,如果假设系统从上一次失败的&job 重新跑的时候,相同的内容会被覆盖写,所以就不会存在重复的问题。
基本上 到目前为止&,就已经实现了我刚提到的业务逻辑&。
& & & & & &&
我们再来回顾下:&
我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成 userName/year/month/day/hh/normal & userName/year/month/day/hh/normal/delay 结构,存储到 HDFS 中。如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 normal 目录。&
我们分别作了哪四个方面的分析:&
& & 1. Spark Streaming 与 Storm 适用场景分析&;
& & 2. Spark Streaming 与 Kafka 集成方案选型,我们推荐Direct Approach 方案&;
& & 3. 自定义 Partitioner 实现日志文件快速存储到HDFS&;
& & 4. Spark Streaming 如何保证数据的完整性,不丢,不重&。
好的 &感谢大家 圣诞快乐&^_^
streaming 可以直接在后面连上 elasticsearch 么?
A1. 可以的。透露下,我马上也要做类似的实践。
Q2.&公司选用 storm 是由于它可以针对每条日志只做一次处理,spark streaming 可以做到么?
A3.&都是一样的。只是 spark streaming 需要攒一段时间,再一次性做处理。
比如我所提到的场景就适合 Spark Streaming。
& & & & & & & &&
Q3.&什么是文件句柄?
A3. HDFS 写入 你需要持有对应的文件的 client 。不可能来一条数据,就重新常见一个链接,然后用完就关掉。
Q4. Spark&分析流数据,分析好的数据怎么存到 mysql 比较好?
A4. 我没有这个实践过存储到 MySQL。一般数据量比较大,所以对接的会是 Reids/HBase/HDFS。
& & & & & &&
Q5.&有没有尝试过将数据写入 hive?
A5. 没有。但没有问题的。而且 Spark Streaming 里也可以使用 Spark SQL 。我不知道这会不会有帮助。
Q6.&幂等是什么?
A6. 就是反复操作不会有副作用。
& & & & & & & &&
可不可以分享一下 spark 完整的应用场景?
A7.&这个有点太大。 目前 spark 覆盖了离线计算,数据分析,机器学习,图计算,流式计算等多个领域,目标也是一个通用的数据平台,所以你一般你想到的都能用 spark 解决。
& & & & & & &
如何理解日志产生时间和到达时间相差超过一定的阈值?
A8. 每条日志都会带上自己产生的时间。同时,如果这条日志到我们的系统太晚了,我们就认为这属于延时日志。
& & & & & & & & & & & &
Q9.&目前这套体系稳定性如何?会不会有经常d节点的情况?
A9.&稳定性确实有待商榷 建议小范围尝试。
& & & & &&
Spark Streaming&内部是如何设计并解决 storm 存在的两个问题的?老师能分析一下细节吗?
A10.&这和 Spark Streaming 的设计是相关的。微批处理模式使得我们可以一个周期打开所有文件句柄,然后直接写入几千万条数据,然后关闭。第二个是使用 partition 并行加快写入速度。
& & & & & &&
Q11.&如何应对网络抖动导致阻塞?
A11. Spark 本身有重试机制,还有各种超时机制。
& & & & & &
Q12.&怎样保证消息的及时性?
A12. 依赖于数据源,kafka,Spark Streaming 是否处理能力充足,没有 delay . 所有环节都会影响消息的及时性。
Q13.&实际运用中,分析完的数据,本身有很大的结构关系,有时又需要对数据二次补充,处理完的数据量不大,该选哪种存储方式?
A13. 能用分布式存储的就用分布式存储。可以不做更新的,尽量不做更新。我一般推荐对接到 HBase 。
& & & & & &
Q14. Streaming
字面是流的意思,倒是课程中提到对日志有延迟的考虑,是 Spark &Streaming 是自定一个周期,处理周期到达的数据集合,通俗讲感觉像批处理,不是每条记录不一定要有时间戳?
A14. 你理解对了。每条记录没有时间戳。如果有,也是日志自己带的。Spark Streaming 并不会给每条记录带上时间。
& & & & & & &&
Q15.&我的理解 YARN 的最终定位是通用资源管理和调度系统,包括支持像类似 MapReduce,Spark 的短作业和类似 Web Service,MySQL 的长服务这样对吗,Spark Streaming 与 Kafka 集成方案选型是否也试用于YARN?
A15. 关于这个,可参看我的这篇关于文章:
/p/05ec21c1790d
谈及长任务 ( long running services ) 的支持,其实大可不必担心,譬如现在基于其上的 Spark Streaming 就是7x24小时运行的。一般而言,要支持长任务,需要考虑如下几个点:
*& Fault tolerance. 主要是 AM 的容错
Yarn Security. 如果开启了安全机制,令牌等的失效时间也是需要注意的
日志收集到集群
还有就是资源隔离和优先级
而这些社区都已经很好的解决了。
我理解你是想问 Yarn 对长任务的支持的问题。因为没怎么看懂……
& & & & & & &&
Q16. storm&避免重复是依赖 zookeeper,Spark Streaming 靠什么记录处理到哪行呢?
Q16. 通过 checkpoint 机制,自己维护了 zookeeper 的偏移量。
& & & & & &&
Q17.&请问一下 Spark
Streaming 处理日志数据的压测结果如何呢?
Q17. 刚刚说了,在我们的测试集群里, w 条记录,平均处理时间大约2分钟,90颗核,180G 内存。没有任何调优参数。理论内存可以继续降低,,因为不 cache 数据 。
& & & & &&
Q18. AMQ&与他们之间区别和联系?
也是消息队列?
Spark Streaming 支持相当多的消息队列。
& & & & & &
Q19.&国内 spark 集群部署在哪些云上?
A19.&没有用过云。
& & & & & & & &&
Q20.&请问老师,怎么能知道 storm 或者 spark streaming 占用了多少 hdfs 的句柄呢?
A20. storm
会一直累积。你需要定时清除。spark streaming 在你的处理周期里,需要生成多少文件,就需要并行打开多少文件
& & & & & & &&
Q21. zookeeper&目前 hbase 都不想依赖它了,因为会导致系统的不稳定,请问老师怎么看?
A21. 还好吧,产生问题主要是 client 太多。比如 hbase 依赖 zookeeper,所有使用 hbase 的,都需要先和 zookeeper 建立连接,这对 zookeeper 产生较大的压力。其他的系统也类似。如果共享 zookeeper 集群,那么它的连接数会成为一个瓶颈。
精品课程:提升职业技能的系统学习
活动视频:聆听专家在技术活动中的分享
技术访谈:名家名言访谈录
技术白板:心热技术名字快速解读
课程学习小组
智能硬件制作工坊
技术团队开发日
工作机会推荐
北京极客邦科技有限公司}

我要回帖

更多关于 streaming集成kafka 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信