Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。
title: Flink对接OBS date: 2023-01-30T19:09:43Z lastmod: 2023-02-01T09:37:38Z
Flink对接OBS
概述
Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。
本文将以Flink 1.16.0版本为基础,介绍Flink对接OBS的整个步骤,并提供样例程序。
注意事项
-
flink-obs-fs-hadoop目前仅支持OBS并行文件系统。
-
不推荐state状态数据存储在OBS上。
-
为了减少日志输出,在/opt/flink-1.16.0/conf/log4j.properties文件中增加配置:
logger.obs.name=com.obs logger.obs.level=ERROR -
flink-obs-fs-hadoop的实现基于flink的plugin加载机制(flink从1.9开始引入),flink-obs-fs-hadoop必须通过flink的plugin机制进行加载,即将flink-obs-fs-hadoop放入/opt/flink-1.16.0/plugins/obs-fs-hadoop目录下。
对接步骤
前提
-
若要正常运行flink,必须提前安装好Java JDK。执行以下命令,若如下图所示,返回Java版本号信息,说明Java JDK安装成功。注意,具体返回内容因JavaJDK版本号不同有所差异。
java -version
-
已经开通华为云obs服务,并获取obs服务的Access Key,Secret Key和endPoint信息。
一、安装flink
若已经安装flink,可跳过此环节。下面以flink-1.16.0为例,介绍整个安装过程。
-
下载flink-1.16.0-bin-scala_2.12.tgz,并解压到/opt/flink-1.16.0目录。
$ tar -zxvf flink-1.16.0-bin-scala_2.12.tgz -
修改环境变量
a. 打开环境变量
$ sudo vi /etc/profileb. 添加环境变量
export FLINK_HOME=/opt/flink-1.12.1 export PATH=$FLINK_HOME/bin:$PATHc. 使环境变量在当前环境中生效
$ source /etc/profile
二、安装flink-obs-fs-hadoop插件
-
在Github下载flink-obs-fs-hadoop:下载地址。说明:
- flink-obs-fs-{flinkversion}-hadoop-${}${version}.jar版本规则:flinkversion为对应的flink版本号,version为flink-obs-fs-hadoop版本号。
- 若没有匹配版本的jar包,可自行修改flink-obs-fs-hadoop目录下pom文件中的flink版本重新编译生成。详情见编译指南。
-
在/opt/flink-1.16.0/plugins目录下创建obs-fs-hadoop目录,并将上述jar放入此目录。
三、修改程序并验证
-
在/opt/flink-1.16.0/conf/flink-conf.yaml文件中设置如下参数,说明如表1所示。
表1 flink对接obs所需参数说明
参数 说明 fs.obs.impl obs文件系统实现类,默认为org.apache.hadoop.fs.obs.OBSFileSystem fs.obs.access.key 已开通obs对应华为账号的Access Key。获取访问密钥(AK/SK) fs.obs.secret.key 已开通obs对应华为账号的Secret Key。获取访问密钥(AK/SK) fs.obs.endpoint OBS终端节点名,各区域的终端节点详情请参见地区和终端节点。 fs.obs.buffer.dir 写数据到OBS时需要的本地临时目录,是绝对路径,flink程序需具备此目录读写权限 fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem fs.obs.access.key: xxx fs.obs.secret.key: xxx fs.obs.endpoint: xxx fs.obs.buffer.dir: /data/buf #写数据到OBS时需要的本地临时目录,flink程序需具备此目录读写权限 -
修改flink程序,适配obs。
-
以下代码适用于flink 1.16.0版本。示例中的obs-bucket需要替换为已开通obs服务的桶名,下同。
a. Checkpoint设置为OBS中的路径。示例:
env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage("obs://obs-bucket/test/checkpoint");b. FileSink设置为OBS中的路径。示例:
FileSink<Alert> sink= FileSink.forRowFormat(new Path("obs://obs-bucket/test/sink"),new SimpleStringEncoder<Alert>("UTF-8")). withBucketAssigner(new BasePathBucketAssigner<Alert>()). withRollingPolicy(DefaultRollingPolicy.builder(). withMaxPartSize(10*1024).build()). withBucketCheckInterval(1000L).build(); -
以下代码适用于flink 1.12.1版本。
a. StateBackend设置为OBS中的路径。 示例:
env.setStateBackend(new mFsStateBackend("obs://obs-bucket/test/checkpoint"));b. FileSink设置为OBS中的路径。 示例:
final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path("obs://obs-bucket/test/data"),new SimpleStringEncoder<String>("UTF-8")) .withBucketAssigner(new BasePathBucketAssigner()) .withRollingPolicy(rollingPolicy) .withBucketCheckInterval(1000L) .build()
-
-
将程序打包,上传到/opt/flink-1.16.0/examples/路径下。
-
在/opt/flink-1.16.0/路径下,通过如下指令启动flink主进程。
$ ./bin/start-cluster.sh主进程启动后,通过如下指令启动flink任务进程。
$ ./bin/flink run ./examples/xxxx.jar -
登录华为云控制台,查看obs数据情况,如果成功创建文件夹,且内含计算结果数据,则说明flink对接obs插件已经生效。

四、异常场景问题定位
若测试过程出现问题,可以通过以下两个方法来定位问题。
-
在/opt/flink-1.16.0/log路径下,查看各项日志,是否有异常信息。
flink-root-standalonesession-0-ecs-a066.log:flink主进程日志。
flink-root-taskexecutor-0-ecs-a066.log:flink任务相关日志。

-
通过IDEA debug模式本地联调。
a. 修改启动配置,打开启动配置修改界面后,依次点击“Modify options”,“Add dependencies with "provided" scope to classpath”。将Flink相关依赖

b. 在src/resources/路径下,新增名为core-site.xml的配置文件,文件内容如下:
<configuration> <property> <name>fs.obs.access.key</name> <value>xxx</value> </property> <property> <name>fs.obs.secret.key</name> <value>xxx</value> </property> <property> <name>fs.obs.endpoint</name> <value>xxx</value> </property> <property> <name>fs.obs.buffer.dir</name> <value>D:\\tmp</value> </property> </configuration>c. 开启联调。
Introduction
Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。
Customize my domainDownloads
Total downloads (including clone, pull, ZIP & release downloads), updated by T+1.