文档详情

大数据平台与编程第10章 Spark Streaming.pptx

发布:2024-09-08约5.27千字共44页下载文档
文本预览下载声明

第10章SparkStreaming;?SparkStreaming概述;;;;?SparkStreaming工作原理;;;;;;?DStream创建;;//4.创建QueueInputDstream

valinputStream=ssc.queueStream(rddQueue,oneAtATime=false)

//5.处理队列中的RDD数据

valmappedStream=inputstream.map((_,1))

valreducedstream=mappedstream.reduceByKey(_+_)

//6.打印结果

reducedstream.print()

//7.启动任务

ssc.start()

//8.循环创建并向RDD队列中放入RDD

for(i-1to5){

rddQueue+=ssc.sparkContext.makeRDD(1to300,10)

Thread.sleep(2000)

}

ssc.awaitTermination()

}

};;//读数据并将数据发送给Spark

defreceive():Unit={

//创建一个socket

varsocket:socket=newSocket(host,port)

//定义一个变量,用来接收端口传过来的数据

varinput:string=nu11

//创建一个BufferedReader用于读取端口传来的数据

valreader=newBufferedReader(newInputStreamReader(socket.getInputStream,

StandardCharsets.UTF_8))

//读取数据

input=reader.readLine()

//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark

while(!isStopped()input!=nu11){

store(input)

input=reader.readLine()

};

//跳出循环则关闭资源

reader.close()

socket.close()

//重启任务

restart(restart)

}

overridedefonstop():Unit={}

};2)使用自定义的数据源采集数据

objectFilestrean{

defmain(args:Array[String]):Unit={

//1.初始化spark配置信息

valsparkConf=newSparkConf().setMaster(local[*])

.setAppName(StreamWordCount)

//2.初始化SparkStreamingContext

valssc=newStreamingContext(sparkConf,Seconds(5))

//3.创建自定义receiver的streaming

vallineStream=ssc.receiverStream(newCustomerReceiver(hadoop102,9999))

//4.将每一行数据做切分,形成一个个单词

valwordstream=linestream.flatMap(_.split(\t));

//5.将单词映射成元组(word,1)

valwordAndOnestream=wordstream.map((_,1))

//6.将相同的单词次数做统计

valwordAndCountStream=wordAndOneStream.reduceByKey(_+_)

//7.打印

wordAndCountstream.print()

//8.启动sparkstreamingContext

ssc.start()

ssc.awaitTerminati

显示全部
相似文档