大数据平台与编程第11章 Apache Flink入门.pptx
第11章ApacheFlink入门;?Flink概述;;;;;;;;;;;;;?Flink运行架构;;;;;;;;;?流处理DataStreamAPI;;;;;;;;;;;;;properties.setProperty(group.id,consumer-group);
properties.setProperty(key.deserializer,mon.serialization.StringDeserializer);
properties.setProperty(value.deserializer,mon.serialization.StringDeserializer);
properties.setProperty(auto.offset.reset,latest);
//2.从kafka读取数据(kafka从/Alingyuzi/article/details/115282103这里查看)
DataStreamStringdataStream=env.addSource(newFlinkKafkaConsumerString(
dblab,
newSimpleStringSchema(),//序列化与反序列化方式
properties));
//3.打印输出
dataStream.print();
//4.执行
env.execute();
}
};;//定义一个随机数发生器
Randomrandom=newRandom();
//设置10个传感器的初始温度
HashMapString,DoublesensorTempMap=newHashMap();
for(inti=0;i10;i++){
sensorTempMap.put(sensor_+(i+1),60+random.nextGaussian()*20);//生成高斯分布
}
while(running){
for(StringsensorId:
sensorTempMap.keySet()){
//在当前温度基础上随即波动
DoublenewTemp=sensorTempMap.get(sensorId)+random.nextGaussian();
sensorTempMap.put(sensorId,newTemp);
sourceContext.collect(newSensorReading(sensorId,System.currentTimeMillis(),newTemp));
}
//控制输出频率
Thread.sleep(1000L);
}
}
@Override
publicvoidcancel(){
running=false;
}
}
};;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;DataStreamStringinputStream=env.addSource(newFlinkKafkaConsumerString(sensor,newSimpleStringSchema(),properties));
//转换成SensorReading类型
DataStreamStringdataStream=inputStream.map(line-{
String[]fields=line.split(,);
returnnewSensorReading(fields[0],newLong(fiel