文档详情

flink计算pv uv代码实现海量资源.pdf

发布:2024-12-20约1.67万字共17页下载文档
文本预览下载声明

本文由简悦SimpRead转码,原文地址

上一我们学习了Flink消费Kafka数据计算PV和UV的水印和窗口设计,并且定义了窗口计算的

触发器,完成了计算PV和UV前的所有准备工作。

接下来就需要计算PV和UV了。在当前业务场景下,根据userId进行统计,PV需要对userId进行统

计,而UV则需要对userId进行去重统计。

下面我们使用不同的方法来统计PV和UV。

单窗口内存统计

这种方法需要把一天内所有的数据进行缓存,然后在内存中遍历接收的数据,进行PV和UV的叠加统

计。

StreamExecutionEnvironmentenv

StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setStateBackend(newMemoryStateBackend(true));

Propertiesproperties=newProperties();

properties.setProperty(bootstrap.servers,:9092);

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_M

ILLIS,10);

FlinkKafkaConsumerStringconsumer=newFlinkKafkaConsumer

(log_user_action,newSimpleStringSchema(),properties);

consumer.setStartFromEarliest();

DataStreamUserClickdataStream=env

.addSource(consumer)

.name(log_user_action)

.map(message-{

JSONObjectrecord=JSON.parseObject(message);

returnnewUserClick(

record.getString(user_id),

record.getLong(timestamp),

record.getString(action)

);

})

.returns(TypeInformation.of(UserClick.class));

SingleOutputStreamOperatorUserClickuserClickSingleOutputStreamOperator

dataStream.assignTimestampsAndWatermarks(new

BoundedOutOfOrdernessTimestampExtractorUserClick(Time.seconds(30)){

@Override

publiclongextractTimestamp(UserClickelement){

returnelement.getTimest

}

});

userClickSingleOutputStreamOperator

.windowAll(TumblingProcessingTimeWindows.of(Time.days(1),

Time.hours(-8)))

.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))

...

在上一中我们已经定义了全局的窗口,并且自定义触发器,每20秒触发一次计算输出中间结果。

我们在后面可以继续调用process方法,自定义ProcessFunction如下:

publicclassMyProcessFunctionextends

ProcessAllWindowFu

显示全部
相似文档