DStream的Transformation/output算子 有更新!

Ttransformation算子

SparkStreaming Dstream的Transformation算子和SparkCore的操作基本类似,毕竟DStream的底层还是RDD.



TransformationMeaning
map对传入的每个元素返回一个新的元素
flatMap对传入的每个元素返回一个或多个元素
filter过滤掉不符合条件的元素/选择符合条件的元素
union将两个DStream合并
count返回元素的个数
reduce对所有的Values进行聚合
countByValue按值分组并统计每组的总数,返回(K,V)的格式
reduceByKey按K分组,对Values进行聚合
cogroup对两个Dstream进行链接操作,一个Key连接起来的两个RDD的数据,都会以Iterable'<'V'>'的形式出现在一个Tuple中
join对两个Dstream进行链接操作,每个连接起来的Pair作为Dstream的RDD的一个新元素
transform对Dstream进行转换,转换成RDD,Dataset.
updateStateBykey为每个Key维护一个状态,并进行更新
window对滑动窗口内的数据执行操作
可以看到大部分的算子跟RDD的操作算子基本相同,独有的算子如`transform,updateStateByKey,window`,也是SparkStreaming中比较有用的算子。

Window 滑动窗口

ac7a14b4f0604b3f9fd247a3b626aaa9-image.png

window:滑动窗口,最早在计算机网络中接触,用于做流量控制,这里的滑动窗口用于选择某一段时间内的RDD构成一个Dstream进行计算,实时流计算设置的Batch interval只能让我们获取一段一段的数据,每一段数据之间是不会交叉重叠的,对于数据分析来说,两组数据之间可能存在断层,阶跃。window可以让数据之间进行平滑的过度。当然有一定的局限性,如果我们的目标仅仅里类似对数据进行一些处理,不去探寻数据间的关系的话,就没有必要,因为这会让每一条数据进行多次处理,处理次数取决于window算子的参数。

Window 使用:

参数1:窗口长度
参数2:滑动间隔
这两个参数必须是batch inveral的整数倍。
类似上图,窗口长度为3秒,时间间隔为2秒。每三秒获取的inputDStream聚合为一个窗口进行计算,间隔两秒再计算一次…

Window 相关算子:

TransformationMeaning
window对每个滑动窗口的数据执行自定义计算
countByWindow对每个窗口的数据执行count操作
countByVaueAndWindow对每个窗口的数据执行countByValue操作
reduceByWindow对每个窗口的数据执行reduce操作
reduceBykeyAndWindow对每个窗口的数据执行reduceBykey操作
groupBykeyAndWindow对每个窗口的数据执行groupBykey操作

使用起来比较简单:只需要把wordcounnt中的Dstream执行window操作得到一个Dstream,对这个Dstream进行后续的操作即是对window的操作.

//3秒作为一个窗口,间隔为2秒
JavaPairDStream<String,Integer> pairDStreamWindows = pairDStream.window(Durations.seconds(3),Durations.seconds(2));
JavaPairDStream<String,Integer> wordCount =  pairDStreamWindows.reduceByKey((x1,x2)->(x1+x2));
//scala
def wordCountwindow(){
  val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount");
  val ssc  = new StreamingContext(conf,Seconds(1))
  val words = ssc.socketTextStream("localhost",9999)
				  .flatMap((line)=>line.split(" "))
				  .map(x=>(x,1))
	
  //window()此函数有多种重载。
  val w3window = words.window(Seconds(3),Seconds(2))
  
  val wordcount = w3window.reduceByKey(+)
  wordcount.print()
  ssc.start()
  ssc.awaitTermination()
}

如此我们从一个每一秒进行一次wordcount的统计就变成了每隔2秒统计3秒内的数据作为一个窗口统计一次wordcount。

def wordCountRBwindow(){
  val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount");
  val ssc  = new StreamingContext(conf,Seconds(1))
  val words = ssc.socketTextStream("localhost",9999)
    .flatMap((line)=>line.split(" "))
    .map(x=>(x,1))

  //reduceByKeyAndWindow(reduceFunc,windowDuration,slideDuration),此函数有多种参数重载。
  val wordcount20 = words.reduceByKeyAndWindow((x1:Int,x2:Int)=>x1+x2,Seconds(20),Seconds(2))

  wordcount20.print()
  ssc.start()
  ssc.awaitTermination()
}

其他的window算子类似。print算子是为了触发job操作。

updateStateByKey

updateStateBykey可以为每一个Key维护一个state,并在每次数据产生时对state进行更新。前提条件是必须开启CheckPoint机制。checkPoint可以保证在内存中长期存贮的state故障丢失的时候可以得到恢复。

  • 定义一个state,可以为任意数据类型。
  • 设置更新函数,更新函数会会执行如何来进行更新。

对于每一个batch的数据,spark都会为之前已经存在的key更新state[无论这个batch中是否有和key相同的数据],对于新出现的key也会进行state个更新。如果更新函数返回none,那么key的state就会被删除。

回顾前面的wordcount程序,从开始的离线操作记录某个文件的wordcount。到实时的batch inveral 一个时间延迟内的wordcount。再到滑动窗口的多个batch inveral 内的worcount。

而使用updateStateByKey,可以实现全局的worcount。我们用每个key的state保存word的count,每次一batch将数据state进行累加,即每一次batch和前面所有的数据进行一次worcount的reduceByKey操作。<如果不借助updateStateByKey我们就需要将每个batch的wordCount保存/缓存起来,进行累加。>

Window案例:基于缓存的全局WordCount

开启Checkpoint,不使用reduceByKey而使用updateStateByKey:

private static  void wordCountUpdateStateByKey() throws InterruptedException{
	SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[*]");

	JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
	 //开启checkpoint
	jsc.checkpoint("hdfs://spark:9000/checkPointDir");

	//ReceiverInputDStream
	JavaReceiverInputDStream lines =   jsc.socketTextStream("localhost",9999);

	JavaDStream listDstream =  lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator());
	JavaPairDStream pairDStream =  listDstream.mapToPair(x->new Tuple2<>(x,1));
	
	//这里不再使用reduceByKey 而使用updateStateByKey
	JavaPairDStream wordCount =  pairDStream.updateStateByKey(
			new Function2, Optional, Optional>() {
				public Optional call(List values,Optional state) throws Exception {
					int vl =0;
					if(state.isPresent()){
						vl = state.get();
					}
					for(Integer value : values) {
						vl += value;
					}

					return Optional.of(vl);
				}
			});
	//这里是Lambda版本
	JavaPairDStream wordCount2 =  pairDStream.updateStateByKey(
				(values,state)->{
					int vl =0;
					if(state.isPresent()){
						vl = state.get();
					}
					for(Integer value : values) {
						vl += value;
					}
					return Optional.of(vl);
				 });

	wordCount.print();
	jsc.start();
	jsc.awaitTermination();
	jsc.close();
}

这里的Optional类是Java8中的一种防御性检查机制,用于消除空指针异常。
上述的这些Transformation算子都是懒加载的都不能出发job操作。最终的操作结果需要output操作才能输出/保存起来。

Output算子:

TransformationMeaning
print直接输出batch中的前十条数据,通常用来测试,在没有output操做的时候用于触发job
saveAsTextFile(prefix, [suffix])将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIMEINMS[.suffix]
savaAsObjectFile同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。
saveAsHadoopFile同上,将数据保存到Hadoop中
foreachRDD最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。

Dstream中的所有的操作最终都是由output操作触发的,没有output操作spark是不会执行前面的操作逻辑。其中较为特殊的是foreachRDD,尽管是用了这个output算子也不会触发job,在其中还需要action算子才能够真正的开始触发job执行.

foreachRDD:

顾名思义,遍历RDD,然后对每个RDD进行操作。多数情况下时进行持久化操作,写入到外部存储。通常需要建立一个Connection,比如JDBC Connection。然后通过Connnection将数据写入外部存储。如下:

1、在foreach操作的外部创建Connection,这种方式会导致Connection对象被序列化后传输到每个task中,实际上这种Connection对象是不能被序列化的,所以这是一种错误的操作。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()
  rdd.foreach { 
	record => connection.send(record)
  }
}

2、在foreach内部创建Connection对象,这种方式会让每一条数据都创建一个Connection,非常消耗内存和性能。

dstream.foreachRDD { rdd =>
  rdd.foreach { 
    val connection = createNewConnection()
	record => connection.send(record)
	connection.close()
  }
}

3、使用foreachPartition代替foreach,这样只会对一个Partition创建一个Connection,节省开销。

dstream.foreachRDD { 
  rdd =>rdd.foreachPartition { 
		partitionOfRecords =>
		val connection = createNewConnection()
		partitionOfRecords.foreach(record => connection.send(record))
		connection.close()
        }
}

总结:

SparkStreaming的操作算子也分为两类,Transformation和output(类似Action),其中最实用的window和updateStateByKey,以及最常用的foreachRDD,对于foreachRDD需要注意写法,是一个值得优化的地方。

. - - —— ————THE END——— —— - - .

⚠求而不得,往往不求而得!
⚠此文章为原创作品,转载务必保留本文地址及原作者。

评论

发表评论

validate