Spark Streaming简介

Apache Spark 是当今最流行的开源大数据处理框架。和人们耳熟能详的 MapReduce 一样,Spark 用于进行分布式、大规模的数据处理,但 Spark 作为 MapReduce 的接任者,提供了更高级的编程接口、更高的性能。除此之外,Spark 不仅能进行常规的批处理计算,还提供了流式计算支持。

Apache Spark 诞生于大名鼎鼎的 AMPLab(这里还诞生过 Mesos 和 Alluxio),从创立之初就带有浓厚的学术气质,其设计目标是为各种大数据处理需求提供一个统一的技术栈。如今 Spark 背后的商业公司 Databricks 创始人也是来自 AMPLab 的博士毕业生。

Spark 本身使用 Scala 语言编写,Scala 是一门融合了面向对象与函数式的“双范式”语言,运行在 JVM 之上。Spark 大量使用了它的函数式、即时代码生成等特性。Spark 目前提供了 Java、Scala、Python、R 四种语言的 API,前两者因为同样运行在 JVM 上可以达到更原生的支持。

1 Introduction

从时间维度可以将数据分析可以分为历史数据的分析和实时数据的分析,例如Hive可以实现对于历史全量数据的计算,但是花费时间往往较长。实际场景中,如“双11”各大电商平台实时计算当前订单的情况时,需要实时对各个订单的数据依次进行采集、分析处理、存储等步骤,对于数据处理的速度要求很高,而且需要保持一段时期内不间断的运行。对于这类问题,Spark通过Spark Streaming组件提供了支持,Spark Streaming可以实现对于高吞吐量、实时的数据流的处理和分析,支持多样化的数据源如Kafka、Flume、HDFS、Kinesis和Twitter等。

2 DStream Model

类似于RDD之于Spark,Spark Streaming也有自己的核心数据对象,称为DStream(Discretized Stream,离散流)。使用Spark Streaming需要基于一些数据源创建DStream,然后在DStream上进行一些数据操作,这里的DStream可以近似地看作是一个比RDD更高一级的数据结构或者是RDD的序列。

虽然Spark Streaming声称是进行流数据处理的大数据系统,但从DStream的名称中就可以看出,它本质上仍然是基于批处理的。DStream将数据流进行分片,转化为多个batch,然后使用Spark Engine对这些batch进行处理和分析。

这里的batch是基于时间间隔来进行分割的,这里的__批处理时间间隔__(batch interval)需要人为确定,每一个batch的数据对应一个RDD实例。

2.1 Input DStream

Input DStream是一种特殊的DStream,它从外部数据源中获取原始数据流,连接到Spark Streaming中。Input DStream可以接受两种类型的数据输入:

(1)基本输入源:文件流、套接字连接和Akka Actor。

(2)高级输入源:Kafka、Flume、Kineis、Twitter等。

基本输入源可以直接应用于StreamingContext API,而对于高级输入源则需要一些额外的依赖包。

由于Input DStream要接受外部的数据输入,因此每个Input DStream(不包括文件流)都会对应一个单一的接收器(Receiver)对象,每个接收器对象都对应接受一个数据流输入。由于接收器需要持续运行,因此会占用分配给Spark Streaming的一个核,如果可用的核数不大于接收器的数量,会导致无法对数据进行其他变换操作。

2.2 DStream Transformation

DStream的转换也和RDD的转换类似,即对于数据对象进行变换、计算等操作,但DStream的Transformation中还有一些特殊的操作,如updateStateByKey()、transform()以及各种Window相关的操作。下面列举了一些主要的DStream操作:

TransformationInterpretation
map(func)对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.
flatMap(func)与map方法类似,只不过各个输入项可以被输出为零个或多个输出项.
filter(func)过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream.
repartition(numPartitions)增加或减少DStream中的分区数,从而改变DStream的并行度.
union(otherStream)将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count()通过对DStream中的各个RDD中的元素进行计数.
reduce(func)对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue()对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数.
reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream.
join(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStream.
cogroup(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream.
transform(func)通过RDD-to-RDD函数作用于DStream中的各个RDD,返回一个新的RDD.
updateStateByKey(func)根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream.

Spark Streaming提供了基于窗口(Window)的计算,即可以通过一个滑动窗口,对原始DStream的数据进行转换,得到一个新的DStream。这里涉及到两个参数的设定:

(1)窗口长度(window length):一个窗口覆盖的流数据的时间长度,必须是批处理时间间隔的倍数。窗口长度决定了一个窗口内包含多少个batch的数据。

(2)窗口滑动时间间隔(slide interval):前一个窗口到后一个窗口所经过的时间长度,必须是批处理时间间隔的倍数。

2.3 DStream Output Operations

DStream的输出操作(Output Operations)可以将DStream的数据输出到外部的数据库或文件系统。与RDD的Action类似,当某个Output Operation被调用时,Spark Streaming程序才会开始真正的计算过程。

下面列举了一些具体的输出操作:

Output OperationsInterpretation
print()打印到控制台.
saveAsTextFiles(prefix, [suffix])保存DStream的内容为文本文件,文件名为”prefix-TIME_IN_MS[.suffix]”.
saveAsObjectFiles(prefix, [suffix])保存DStream的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”.
saveAsHadoopFiles(prefix, [suffix])保存DStream的内容为Hadoop文件,文件名为”prefix-TIME_IN_MS[.suffix]”.
foreachRDD(func)对Dstream里面的每个RDD执行func,并将结果保存到外部系统,如保存到RDD文件中或写入数据库.

3 Fault Tolerance

容错(Fault Tolerance)指的是一个系统在部分模块出现故障时仍旧能够提供服务的能力。一个分布式数据处理程序要能够长期不间断运行,这就要求计算模型具有很高的容错性。

Spark操作的数据一般存储与类似HDFS这样的文件系统上,这些文件系统本身就有容错能力。但是由于Spark Streaming处理的很多数据是通过网络接收的,即接收到数据的时候没有备份,为了让Spark Streaming程序中的RDD都能够具有和普通RDD一样的容错性,这些数据需要被复制到多个Worker节点的Executor内存中。

Spark Streaming通过检查点(Check Point)的方式来平衡容错能力和代价问题。DStream依赖的RDD是可重复的数据集,每一个RDD从建立之初都记录了每一步的计算过程,如果RDD某个分区由于一些原因数据丢失了,就可以重新执行计算来恢复数据。随着运行时间的增加,数据恢复的代价也会随着计算过程而增加,因此Spark提供了检查点功能,定期记录计算的中间状态,避免数据恢复时的漫长计算过程。Spark Streaming支持两种类型的检查点:

(1)元数据检查点。这种检查点可以记录Driver的配置、操作以及未完成的批次,可以让Driver节点在失效重启后可以继续运行。

(2)数据检查点。这种检查点主要用来恢复有状态的转换操作,例如updateStateByKey或者reduceByKeyAndWindow操作,它可以记录数据计算的中间状态,避免在数据恢复时,长依赖链带来的恢复时间的无限增长。

开启检查点功能需要设置一个可靠的文件系统路径来保存检查点信息,代码如下:

streamingContext.checkpoing(checkPointDirectory)    //参数是文件路径

为了让Driver自动重启时能够开启检查点功能,需要对原始StreamingContext进行简单的修改,创建一个检查检查点目录的函数,代码如下:

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)
    ssc.checkpoint(checkpointDirecroty)    //设置检查点目录
    ...
    val lines = ssc.socketTextStream(...)  //创建DStream
}

//从检查点目录恢复一个StreamingContext或者创建一个新的
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
                                      functionToCreateContext())
//启动context
context.start()
context.awaitTermination()

参考文献

[1] 于俊. Spark核心技术与高级应用[M]. 机械工业出版社, 2016.

[2] 陈欢林世飞. Spark最佳实践[M].人民邮电出版社, 2016.

1、Spark Streaming用于处理流式计算问题。能够和Spark的其他模块无缝集成。

2、Spark Streaming是一个粗粒度的框架【也就是只能对一批数据指定处理方法】,核心是采用微批次架构。和Storm采用的以条处理的不同。

3、Spark Streaming会运行接收器来不断的接收输入的数据流,然后根据程序配置的时间,将时间范围内的所有数据打成一个RDD,发送给Spark Core去进行处理。依次来打成对数据流的计算。

4、Spark Streaming有它自己的抽象,叫DStream Discretized Stream离散化流

5、如果入水口的速度大于出水口的速度,那么势必导致水管爆裂,Spark Streaming也存在这个问题,内部采用背压机制来进行处理,会通过ReceiverRateController来不断计算RDD的处理速度和RDD的生成速度,来通过令牌桶机制进行速度控制。只要是控制令牌的生成周期。

Spark Streaming编程指南

Overview

Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。

它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的api函数比如 mapreducejoinwindow等操作,还可以直接使用内置的机器学习算法、图算法包来处理数据。

它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。

它支持的数据流叫Dstream,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs,下面是一个例子帮助大家理解Dstream。

A Quick Example

// 创建StreamingContext,1秒一个批次
val ssc = new StreamingContext(sparkConf, Seconds(1));

// 获得一个DStream负责连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);

// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);

// 输出结果
wordCounts.print();

ssc.start();             // 开始
ssc.awaitTermination();  // 计算完毕退出

具体的代码可以访问这个页面:

https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala

如果已经装好Spark的朋友,我们可以通过下面的例子试试。

首先,启动Netcat,这个工具在Unix-like的系统都存在,是个简易的数据服务器。

使用下面这句命令来启动Netcat:

$ nc -lk 9999

接着启动example

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

在Netcat这端输入hello world,看Spark这边的

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

Basics

下面这块是如何编写代码的啦,哇咔咔!

首先我们要在SBT或者Maven工程添加以下信息:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating
//需要使用一下数据源的,还要添加相应的依赖<br>Source    Artifact
Kafka     spark-streaming-kafka_2.10
Flume     spark-streaming-flume_2.10
Twitter     spark-streaming-twitter_2.10
ZeroMQ     spark-streaming-zeromq_2.10
MQTT     spark-streaming-mqtt_2.10

接着就是实例化

new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])

这是之前的例子对DStream的操作。

Input Sources

除了sockets之外,我们还可以这样创建Dstream

streamingContext.fileStream(dataDirectory)

这里有3个要点:

(1)dataDirectory下的文件格式都是一样

(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的

(3)一旦文件进去之后就不能再改变

假设我们要创建一个Kafka的Dstream。

import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)

如果我们需要自定义流的receiver,可以查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html

Operations

对于Dstream,我们可以进行两种操作,transformations 和 output 

Transformations

Transformation                          Meaning
map(func)                        对每一个元素执行func方法
flatMap(func)                    类似map函数,但是可以map到0+个输出
filter(func)                     过滤
repartition(numPartitions)       增加分区,提高并行度     
union(otherStream)               合并两个流
count()                    统计元素的个数
reduce(func)                     对RDDs里面的元素进行聚合操作,2个输入参数,1个输出参数
countByValue()                   针对类型统计,当一个Dstream的元素的类型是K的时候,调用它会返回一个新的Dstream,包含<K,Long>键值对,Long是每个K出现的频率。
reduceByKey(func, [numTasks])    对于一个(K, V)类型的Dstream,为每个key,执行func函数,默认是local是2个线程,cluster是8个线程,也可以指定numTasks 
join(otherStream, [numTasks])    把(K, V)和(K, W)的Dstream连接成一个(K, (V, W))的新Dstream 
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream连接成一个(K, Seq[V], Seq[W])的新Dstream 
transform(func)                  转换操作,把原来的RDD通过func转换成一个新的RDD
updateStateByKey(func)           针对key使用func来更新状态和值,可以将state该为任何值
UpdateStateByKey Operation

使用这个操作,我们是希望保存它状态的信息,然后持续的更新它,使用它有两个步骤:

(1)定义状态,这个状态可以是任意的数据类型

(2)定义状态更新函数,从前一个状态更改新的状态

下面展示一个例子:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

它可以用在包含(word, 1) 的Dstream当中,比如前面展示的example

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

它会针对里面的每个word调用一下更新函数,newValues是最新的值,runningCount是之前的值。

Transform Operation

和transformWith一样,可以对一个Dstream进行RDD->RDD操作,比如我们要对Dstream流里的RDD和另外一个数据集进行join操作,但是Dstream的API没有直接暴露出来,我们就可以使用transform方法来进行这个操作,下面是例子:

val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information

val cleanedDStream = inputDStream.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

另外,我们也可以在里面使用机器学习算法和图算法。

Window Operations

先举个例子吧,比如前面的word count的例子,我们想要每隔10秒计算一下最近30秒的单词总数。

我们可以使用以下语句:

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

这里面提到了windows的两个参数:

(1)window length:window的长度是30秒,最近30秒的数据

(2)slice interval:计算的时间间隔

通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。

下面是window的一些操作函数,还是有点儿理解不了window的概念,Meaning就不翻译了,直接删掉

Transformation                                                                              Meaning
window(windowLength, slideInterval)     
countByWindow(windowLength, slideInterval)     
reduceByWindow(func, windowLength, slideInterval)     
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])     
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])    
countByValueAndWindow(windowLength, slideInterval, [numTasks])     

Output Operations

Output Operation                                      Meaning
print()                                 打印到控制台
foreachRDD(func)                        对Dstream里面的每个RDD执行func,保存到外部系统
saveAsObjectFiles(prefix, [suffix])     保存流的内容为SequenceFile, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix])       保存流的内容为文本文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix])     保存流的内容为hadoop文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".

Persistence

 Dstream中的RDD也可以调用persist()方法保存在内存当中,但是基于window和state的操作,reduceByWindow,reduceByKeyAndWindow,updateStateByKey它们就是隐式的保存了,系统已经帮它自动保存了。

从网络接收的数据(such as, Kafka, Flume, sockets, etc.),默认是保存在两个节点来实现容错性,以序列化的方式保存在内存当中。

RDD Checkpointing

 状态的操作是基于多个批次的数据的。它包括基于window的操作和updateStateByKey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔时间是比较合适的。

ssc.checkpoint(hdfsPath)  //设置检查点的保存位置
dstream.checkpoint(checkpointInterval)  //设置检查点间隔

对于必须设置检查点的Dstream,比如通过updateStateByKeyreduceByKeyAndWindow创建的Dstream,默认设置是至少10秒。

Performance Tuning

对于调优,可以从两个方面考虑:

(1)利用集群资源,减少处理每个批次的数据的时间

(2)给每个批次的数据量的设定一个合适的大小

Level of Parallelism

像一些分布式的操作,比如reduceByKey和reduceByKeyAndWindow,默认的8个并发线程,可以通过对应的函数提高它的值,或者通过修改参数spark.default.parallelism来提高这个默认值。

Task Launching Overheads

通过进行的任务太多也不好,比如每秒50个,发送任务的负载就会变得很重要,很难实现压秒级的时延了,当然可以通过压缩来降低批次的大小。

Setting the Right Batch Size

要使流程序能在集群上稳定的运行,要使处理数据的速度跟上数据流入的速度。最好的方式计算这个批量的大小,我们首先设置batch size为5-10秒和一个很低的数据输入速度。确实系统能跟上数据的速度的时候,我们可以根据经验设置它的大小,通过查看日志看看Total delay的多长时间。如果delay的小于batch的,那么系统可以稳定,如果delay一直增加,说明系统的处理速度跟不上数据的输入速度。

24/7 Operation

Spark默认不会忘记元数据,比如生成的RDD,处理的stages,但是Spark Streaming是一个24/7的程序,它需要周期性的清理元数据,通过spark.cleaner.ttl来设置。比如我设置它为600,当超过10分钟的时候,Spark就会清楚所有元数据,然后持久化RDDs。但是这个属性要在SparkContext 创建之前设置。

但是这个值是和任何的window操作绑定。Spark会要求输入数据在过期之后必须持久化到内存当中,所以必须设置delay的值至少和最大的window操作一致,如果设置小了,就会报错。

Monitoring

除了Spark内置的监控能力,还可以StreamingListener这个接口来获取批处理的时间, 查询时延, 全部的端到端的试验。

Memory Tuning

Spark Stream默认的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY

默认的,所有持久化的RDD都会通过被Spark的LRU算法剔除出内存,如果设置了spark.cleaner.ttl,就会周期性的清理,但是这个参数设置要很谨慎。一个更好的方法是设置spark.streaming.unpersist为true,这就让Spark来计算哪些RDD需要持久化,这样有利于提高GC的表现。

推荐使用concurrent mark-and-sweep GC,虽然这样会降低系统的吞吐量,但是这样有助于更稳定的进行批处理。

Fault-tolerance Properties

Failure of a Worker Node

下面有两种失效的方式:

1.使用hdfs上的文件,因为hdfs是可靠的文件系统,所以不会有任何的数据失效。

2.如果数据来源是网络,比如Kafka和Flume,为了防止失效,默认是数据会保存到2个节点上,但是有一种可能性是接受数据的节点挂了,那么数据可能会丢失,因为它还没来得及把数据复制到另外一个节点。

Failure of the Driver Node

为了支持24/7不间断的处理,Spark支持驱动节点失效后,重新恢复计算。Spark Streaming会周期性的写数据到hdfs系统,就是前面的检查点的那个目录。驱动节点失效之后,StreamingContext可以被恢复的。

为了让一个Spark Streaming程序能够被回复,它需要做以下操作:

(1)第一次启动的时候,创建 StreamingContext,创建所有的streams,然后调用start()方法。

(2)恢复后重启的,必须通过检查点的数据重新创建StreamingContext。

下面是一个实际的例子:

通过StreamingContext.getOrCreate来构造StreamingContext,可以实现上面所说的。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

在stand-alone的部署模式下面,驱动节点失效了,也可以自动恢复,让别的驱动节点替代它。这个可以在本地进行测试,在提交的时候采用supervise模式,当提交了程序之后,使用jps查看进程,看到类似DriverWrapper就杀死它,如果是使用YARN模式的话就得使用其它方式来重新启动了。

这里顺便提一下向客户端提交程序吧,之前总结的时候把这块给落下了。

./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
   <cluster-url> <application-jar-url> <main-class> \
   [application-options]

cluster-url: master的地址.
application-jar-url: jar包的地址,最好是hdfs上的,带上hdfs://...否则要所有的节点的目录下都有这个jar的 
main-class: 要发布的程序的main函数所在类. 
Client Options: 
--memory <count> (驱动程序的内存,单位是MB) 
--cores <count> (为你的驱动程序分配多少个核心) 
--supervise (节点失效的时候,是否重新启动应用) 
--verbose (打印增量的日志输出)

在未来的版本,会支持所有的数据源的可恢复性。

为了更好的理解基于HDFS的驱动节点失效恢复,下面用一个简单的例子来说明:

Time     Number of lines in input file     Output without driver failure     Output with driver failure
1      10                     10                    10
2      20                     20                    20
3      30                     30                    30
4      40                     40                    [DRIVER FAILS] no output
5      50                     50                    no output
6      60                     60                    no output
7      70                     70                    [DRIVER RECOVERS] 40, 50, 60, 70
8      80                     80                    80
9      90                     90                    90
10     100                     100                   100

在4的时候出现了错误,40,50,60都没有输出,到70的时候恢复了,恢复之后把之前没输出的一下子全部输出。

发表评论

电子邮件地址不会被公开。 必填项已用*标注