博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink 读取Kafka写入Hive
阅读量:7087 次
发布时间:2019-06-28

本文共 1315 字,大约阅读时间需要 4 分钟。

hot3.png

官方文档:

在流式处理系统中,Flink和kafka的结合很是经典。我们可以通过Flink消费Kafka数据,层层处理后,丢到Kafka另一个Topic,下游再处理该Topic的数据。而对于OLAP查询需求,我们往往需要将数据输出到 Hive。一般的,我们使用Parquet格式来存储(Spark对parquet的支持较好)。

Flink提供了bucket sink的模式将流式数据写入到文件中,在官方给的demo中,代码如下

import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;DataStream
input = ...;final StreamingFileSink
sink = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8")) .build();input.addSink(sink);

为了使用Parquet格式,我们还需要转换代码:

StreamingFileSink
streamingFileSink = StreamingFileSink. forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(LogTest.class)) .withBucketAssigner(bucketAssigner) .build();

在测试过程中,会发现目录创建了,但文件全为空且处于inprogress状态。经过多番搜索未解决该问题。最后在官方文档中发现了这么一句:

IMPORTANT: Bulk-encoding formats can only be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every checkpoint.

这说明Flink将一直缓存从Flink消费出来的数据,只有当Checkpoint 触发的时候,才把数据刷新到目标目录--即我们定义的parquet路径中。 加上启用CheckPoint之后,重新执行程序,可以发现文件成功写入了。

env.enableCheckpointing(3000);

其他思考:消费kafka输出到Parquet这一个过程,Flink能否保证一致性语义?

转载于:https://my.oschina.net/u/992559/blog/3057926

你可能感兴趣的文章
蓝牙音箱连接成功但没有声音还是电脑的声音
查看>>
ng-file-upload结合springMVC使用
查看>>
005 Hadoop的三种模式区别
查看>>
在笛卡尔坐标系上描绘函数 y=4x^2-2/4x-3
查看>>
ubuntu 下无损扩展分区
查看>>
Caused by: org.xml.sax.SAXParseException; lineNumber: 1
查看>>
手机资源共享
查看>>
Mahout-DistanceMeasure (数据点间的距离计算方法)
查看>>
在线研讨会网络视频讲座 - 方案设计利器Autodesk Infrastructure Modeler 2013
查看>>
【转】批量杀进程
查看>>
通过file_get_contents执行带参数的php
查看>>
Java 公历转农历,然后农历减一年(或者几天或者任意天),再把这个日期转成公历...
查看>>
Hibernate HQL查询:
查看>>
系统吞吐量(TPS)、用户并发量、性能测试概念和公式
查看>>
R语言笔记1--向量、数组、矩阵、数据框、列表
查看>>
大数进制转换 poj1220
查看>>
练习--LINUX进程间通信之有名管理FIFO
查看>>
使用memcached加速web应用实例
查看>>
Educational Codeforces Round 11 C. Hard Process 二分
查看>>
Android Camera 使用一例,视频聊天app
查看>>