文档详情

大数据平台与编程第11章 Apache Flink入门.pptx

发布:2024-09-12约2.28千字共77页下载文档
文本预览下载声明

第11章ApacheFlink入门;?Flink概述;;;;;;;;;;;;;?Flink运行架构;;;;;;;;;?流处理DataStreamAPI;;;;;;;;;;;;;properties.setProperty(group.id,consumer-group);

properties.setProperty(key.deserializer,mon.serialization.StringDeserializer);

properties.setProperty(value.deserializer,mon.serialization.StringDeserializer);

properties.setProperty(auto.offset.reset,latest);

//2.从kafka读取数据(kafka从/Alingyuzi/article/details/115282103这里查看)

DataStreamStringdataStream=env.addSource(newFlinkKafkaConsumerString(

dblab,

newSimpleStringSchema(),//序列化与反序列化方式

properties));

//3.打印输出

dataStream.print();

//4.执行

env.execute();

}

};;//定义一个随机数发生器

Randomrandom=newRandom();

//设置10个传感器的初始温度

HashMapString,DoublesensorTempMap=newHashMap();

for(inti=0;i10;i++){

sensorTempMap.put(sensor_+(i+1),60+random.nextGaussian()*20);//生成高斯分布

}

while(running){

for(StringsensorId:

sensorTempMap.keySet()){

//在当前温度基础上随即波动

DoublenewTemp=sensorTempMap.get(sensorId)+random.nextGaussian();

sensorTempMap.put(sensorId,newTemp);

sourceContext.collect(newSensorReading(sensorId,System.currentTimeMillis(),newTemp));

}

//控制输出频率

Thread.sleep(1000L);

}

}

@Override

publicvoidcancel(){

running=false;

}

}

};;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;DataStreamStringinputStream=env.addSource(newFlinkKafkaConsumerString(sensor,newSimpleStringSchema(),properties));

//转换成SensorReading类型

DataStreamStringdataStream=inputStream.map(line-{

String[]fields=line.split(,);

returnnewSensorReading(fields[0],newLong(fiel

显示全部
相似文档