一、Flink 实时应用场景
二、实时数据体系架构
2.1 实时数据体系整体架构
其中,平台层详细工作如下:
平台监控部分一是对任务运行状态进行监控,对异常的任务进行报警并根据设定的参数对任务进行自动拉起与恢复,二是针对 Flink 任务要对 Kafka 消费处理延迟进行监控并实时报警。
数据据监控则分为两个部分,首先流式 ETL 是整个实时数据流转过程中重要的一环,ETL 的过程中会关联各种维表,实时关联时,定时对没有关联上的记录上报异常日志到监控平台,当数量达到一定阈值时触发报警, 其次,部分关键实时指标采用了 lambda 架构,因此需要对历史的实时指标与离线 hive 计算的数据定时做对比,提供实时数据的数据质量监控,对超过阈值的指标数据进行报警。
为了配合数据监控,需要做实时数据血缘,主要是梳理实时数据体系中数据依赖关系,以及实时任务的依赖关系,从底层ODS 到 DW 再到 DM,以及 DM 层被哪些模型用到, 将整个链条串联起来,这样做在数据/任务主动调整时可以通知关联的下游,指标异常时借助血缘定位问题,同时基于血缘关系的分析,我们也能评估数据的应用价值,核算数据的计算成本。
2.2 实时数据模型分层
公共维度层,基于维度建模理念思想,建立整个业务过程的一致性维度,降低数据计算口径和算法不统一风险;
DIM 层数据来源于两部分:一部分是Flink程序实时处理ODS层数据得到,另外一部分是通过离线任务出仓得到;
DIM 层维度数据主要使用 MySQL、Hbase、Redis 三种存储引擎,对于维表数据比较少的情况可以使用 MySQL,对于单条数据大小比较小,查询 QPS 比较高的情况,可以使用 Redis 存储,降低机器内存资源占用,对于数据量比较大,对维表数据变化不是特别敏感的场景,可以使用HBase 存储。
(1)数据集市层
以数据域+业务域的理念建设公共汇总层,对于DM层比较复杂,需要综合考虑对于数据落地的要求以及具体的查询引擎来选择不同的存储方式,分为轻度汇总层和高度汇总层,同时产出,高度汇总层数据用于前端比较简单的KV查询, 提升查询性能,比如实时大屏,实时报表等,数据的时效性要求为秒级,轻度汇总层Kafka中宽表实时写入OLAP存储引擎,用于前端产品复杂的OLAP查询场景,满足自助分析和产出复杂报表的需求,对数据的时效性要求可容忍到分钟级;
(2)轻度汇总层
轻度汇总层由明细层通过Streaming ETL得到,主要以宽表的形式存在,业务明细汇总是由业务事实明细表和维度表join得到,流量明细汇总是由流量日志按业务线拆分和维度表join得到;
轻度汇总层数据存储比较多样化,首先利用Flink实时消费DWD层Kafka中明细数据join业务过程需要的维表,实时打宽后写入该层的Kafka中,以Json或PB格式存储;
同时对多维业务明细汇总数据通过Flink实时写入Kudu,用于查询明细数据和更复杂的多维数据分析需求,对于流量数据通过Flink分别写入HDFS和ClickHouse用于复杂的多维数据分析, 实时特征数据则通过Flink join维表后实时写入HDFS,用于下游的离线ETL消费;
对于落地Kudu和HDFS的宽表数据,可用Spark SQL做分钟级的预计算,满足业务方复杂数据分析需求,提供分钟级延迟的数据,从而加速离线ETL过程的延迟, 另外随着Flink SQL与Hive生态集成的不断完善,可尝试用Flink SQL做离线ETL和OLAP计算任务(Flink流计算基于内存计算的特性,和presto非常类似,这使其也可以成为一个OLAP计算引擎),用一套计算引擎解决实时离线需求,从而实现批流统一;
对于Kudu中的业务明细数据、ClickHouse中的流量明细数据,也可以满足业务方的个性化数据分析需求,利用强大的OLAP计算引擎,实时查询明细数据,在10s量级的响应时间内给出结果,这类需求也即是实时OLAP需求,灵活性比较高。
(3)高度汇总层
高度汇总层由明细数据层或轻度汇总层通过聚合计算后写入到存储引擎中,产出一部分实时数据指标需求,灵活性比较差;
计算引擎使用Flink Datastream API和Flink SQL,指标存储引擎根据不同的需求,对于常见的简单指标汇总模型可直接放在MySQL里面,维度比较多的、写入更新比较大的模型会放在HBase里面, 还有一种是需要做排序、对查询QPS、响应时间要求非常高、且不需要持久化存储如大促活动期间在线TopN商品等直接存储在Redis里面;
在秒级指标需求中,需要混用Lambda和Kappa架构,大部分实时指标使用Kappa架构完成计算,少量关键指标(如金额相关)使用Lambda架构用批处理重新处理计算,增加一次校对过程。
总体来说 DM 层对外提供三种时效性的数据:
首先是 Flink 等实时计算引擎预计算好的秒级实时指标,这种需求对数据的时效性要求非常高,用于实时大屏、计算维度不复杂的实时报表需求。
其次是 Spark SQL 预计算的延迟在分钟级的准实时指标, 该类指标满足一些比较复杂但对数据时效性要求不太高的数据分析场景,可能会涉及到多个事实表的join,如销售归因等需求。
最后一种则是不需要预计算,ad-hoc查询的复杂多维数据分析场景,此类需求比较个性化,灵活性比较高,如果 OLAP 计算引擎性能足够强大,也可完全满足秒级计算需求的场景; 对外提供的秒级实时数据和另外两种准实时数据的比例大致为 3:7,绝大多数的业务需求都优先考虑准实时计算或 ad-hoc 方式,可以降低资源使用、提升数据准确性,以更灵活的方式满足复杂的业务场景。
2.3 实时数据体系建设方式
2.4 流批一体实时数据架构发展
三、Flink SQL 实时计算 UV 指标
3.1 Kafka 源数据解析
public class PageViewDeserializationSchema implements DeserializationSchema<Row> {
public static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);
protected SimpleDateFormat dayFormatter;
private final RowTypeInfo rowTypeInfo;
public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){
dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
this.rowTypeInfo = rowTypeInfo;
}
public Row deserialize(byte[] message) throws IOException {
Row row = new Row(rowTypeInfo.getArity());
MobilePage mobilePage = null;
try {
mobilePage = MobilePage.parseFrom(message);
String mid = mobilePage.getMid();
row.setField(0, mid);
Long timeLocal = mobilePage.getTimeLocal();
String logDate = dayFormatter.format(timeLocal);
row.setField(1, logDate);
row.setField(2, timeLocal);
}catch (Exception e){
String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";
LOG.error("error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e);
}
return null;
}
3.2 编写 Flink Job 主程序
public class RealtimeUV {
public static void main(String[] args) throws Exception {
//step1 从properties配置文件中解析出需要的Kakfa、Hbase配置信息、checkpoint参数信息
Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);
String topic = config.get("source.kafka.topic");
String groupId = config.get("source.group.id");
String sourceBootStrapServers = config.get("source.bootstrap.servers");
String hbaseTable = config.get("hbase.table.name");
String hbaseZkQuorum = config.get("hbase.zk.quorum");
String hbaseZkParent = config.get("hbase.zk.parent");
int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));
int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//step2 设置Checkpoint相关参数,用于Failover容错
sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,
ProtobufSerializer.class);
sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);
sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);
sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);
sEnv.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//step3 使用Blink planner、创建TableEnvironment,并且设置状态过期时间,避免Job OOM
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);
tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);
sourceProperties.setProperty("auto.commit.interval.ms", "3000");
sourceProperties.setProperty("group.id", groupId);
//step4 初始化KafkaTableSource的Schema信息,笔者这里使用register TableSource的方式将源表注册到Flink中,而没有用register DataStream方式,也是因为想熟悉一下如何注册KafkaTableSource到Flink中
TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();
Optional<String> proctimeAttribute = Optional.empty();
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();
Map<String, String> fieldMapping = new HashMap<>();
List<String> columnNames = new ArrayList<>();
RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
columnNames.addAll(Arrays.asList(schema.getFieldNames()));
columnNames.forEach(name -> fieldMapping.put(name, name));
PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema(
rowTypeInfo);
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
Kafka011TableSource kafkaTableSource = new Kafka011TableSource(
schema,
proctimeAttribute,
rowtimeAttributeDescriptors,
Optional.of(fieldMapping),
topic,
sourceProperties,
deserializationSchema,
StartupMode.EARLIEST,
specificOffsets);
tEnv.registerTableSource("pageview", kafkaTableSource);
//step5 初始化Hbase TableSchema、写入参数,并将其注册到Flink中
HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();
hBaseTableSchema.setRowKey("log_date", String.class);
hBaseTableSchema.addColumn("f", "UV", Long.class);
HBaseOptions hBaseOptions = HBaseOptions.builder()
.setTableName(hbaseTable)
.setZkQuorum(hbaseZkQuorum)
.setZkNodeParent(hbaseZkParent)
.build();
HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxRows(1000)
.setBufferFlushIntervalMillis(1000)
.build();
HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);
tEnv.registerTableSink("uv_index", hBaseSink);
//step6 实时计算当天UV指标sql, 这里使用最简单的group by agg,没有使用minibatch或窗口,在大数据量优化时最好使用后两种方式
String uvQuery = "insert into uv_index "
+ "select log_date,\n"
+ "ROW(count(distinct mid) as UV)\n"
+ "from pageview\n"
+ "group by log_date";
tEnv.sqlUpdate(uvQuery);
//step7 执行Job
sEnv.execute("UV Job");
}
}
2020 首场 Meetup 杭州站
好久不见,2020 你们心心念念的 Meetup 重磅回归啦!
5月16日,2020 首场 Meetup 重磅上线。一如既往,本次 Meetup 邀请了来自袋鼠云、网易云音乐、有赞及阿里巴巴的四位技术专家为您现场直播,让您足不出户,有直播看、有干货学、有奖品拿~
识别二维码,收看直播~~
关于我们: