大数据平台与编程第10章 Spark Streaming.pptx
第10章SparkStreaming;?SparkStreaming概述;;;;?SparkStreaming工作原理;;;;;;?DStream创建;;//4.创建QueueInputDstream
valinputStream=ssc.queueStream(rddQueue,oneAtATime=false)
//5.处理队列中的RDD数据
valmappedStream=inputstream.map((_,1))
valreducedstream=mappedstream.reduceByKey(_+_)
//6.打印结果
reducedstream.print()
//7.启动任务
ssc.start()
//8.循环创建并向RDD队列中放入RDD
for(i-1to5){
rddQueue+=ssc.sparkContext.makeRDD(1to300,10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
};;//读数据并将数据发送给Spark
defreceive():Unit={
//创建一个socket
varsocket:socket=newSocket(host,port)
//定义一个变量,用来接收端口传过来的数据
varinput:string=nu11
//创建一个BufferedReader用于读取端口传来的数据
valreader=newBufferedReader(newInputStreamReader(socket.getInputStream,
StandardCharsets.UTF_8))
//读取数据
input=reader.readLine()
//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
while(!isStopped()input!=nu11){
store(input)
input=reader.readLine()
};
//跳出循环则关闭资源
reader.close()
socket.close()
//重启任务
restart(restart)
}
overridedefonstop():Unit={}
};2)使用自定义的数据源采集数据
objectFilestrean{
defmain(args:Array[String]):Unit={
//1.初始化spark配置信息
valsparkConf=newSparkConf().setMaster(local[*])
.setAppName(StreamWordCount)
//2.初始化SparkStreamingContext
valssc=newStreamingContext(sparkConf,Seconds(5))
//3.创建自定义receiver的streaming
vallineStream=ssc.receiverStream(newCustomerReceiver(hadoop102,9999))
//4.将每一行数据做切分,形成一个个单词
valwordstream=linestream.flatMap(_.split(\t));
//5.将单词映射成元组(word,1)
valwordAndOnestream=wordstream.map((_,1))
//6.将相同的单词次数做统计
valwordAndCountStream=wordAndOneStream.reduceByKey(_+_)
//7.打印
wordAndCountstream.print()
//8.启动sparkstreamingContext
ssc.start()
ssc.awaitTerminati