flink计算pv uv代码实现海量资源.pdf
本文由简悦SimpRead转码,原文地址
上一我们学习了Flink消费Kafka数据计算PV和UV的水印和窗口设计,并且定义了窗口计算的
触发器,完成了计算PV和UV前的所有准备工作。
接下来就需要计算PV和UV了。在当前业务场景下,根据userId进行统计,PV需要对userId进行统
计,而UV则需要对userId进行去重统计。
下面我们使用不同的方法来统计PV和UV。
单窗口内存统计
这种方法需要把一天内所有的数据进行缓存,然后在内存中遍历接收的数据,进行PV和UV的叠加统
计。
StreamExecutionEnvironmentenv
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStateBackend(newMemoryStateBackend(true));
Propertiesproperties=newProperties();
properties.setProperty(bootstrap.servers,:9092);
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_M
ILLIS,10);
FlinkKafkaConsumerStringconsumer=newFlinkKafkaConsumer
(log_user_action,newSimpleStringSchema(),properties);
consumer.setStartFromEarliest();
DataStreamUserClickdataStream=env
.addSource(consumer)
.name(log_user_action)
.map(message-{
JSONObjectrecord=JSON.parseObject(message);
returnnewUserClick(
record.getString(user_id),
record.getLong(timestamp),
record.getString(action)
);
})
.returns(TypeInformation.of(UserClick.class));
SingleOutputStreamOperatorUserClickuserClickSingleOutputStreamOperator
dataStream.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractorUserClick(Time.seconds(30)){
@Override
publiclongextractTimestamp(UserClickelement){
returnelement.getTimest
}
});
userClickSingleOutputStreamOperator
.windowAll(TumblingProcessingTimeWindows.of(Time.days(1),
Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
...
在上一中我们已经定义了全局的窗口,并且自定义触发器,每20秒触发一次计算输出中间结果。
我们在后面可以继续调用process方法,自定义ProcessFunction如下:
publicclassMyProcessFunctionextends
ProcessAllWindowFu