obs-flink-plugins:Flink文件系统抽象实现,支持OBS作为数据读写与StateBackend载体

Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。

Branch3Tags0

title: Flink对接OBS date: 2023-01-30T19:09:43Z lastmod: 2023-02-01T09:37:38Z

Flink对接OBS

概述

Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。

本文将以Flink 1.16.0版本为基础,介绍Flink对接OBS的整个步骤,并提供样例程序。

注意事项

  • flink-obs-fs-hadoop目前仅支持OBS并行文件系统。

  • 不推荐state状态数据存储在OBS上。

  • 为了减少日志输出,在/opt/flink-1.16.0/conf/log4j.properties文件中增加配置:

    logger.obs.name=com.obs
    logger.obs.level=ERROR
    
  • flink-obs-fs-hadoop的实现基于flink的plugin加载机制(flink从1.9开始引入),flink-obs-fs-hadoop必须通过flink的plugin机制进行加载,即将flink-obs-fs-hadoop放入/opt/flink-1.16.0/plugins/obs-fs-hadoop目录下。

对接步骤

前提
  1. 若要正常运行flink,必须提前安装好Java JDK。执行以下命令,若如下图所示,返回Java版本号信息,说明Java JDK安装成功。注意,具体返回内容因JavaJDK版本号不同有所差异。

    java -version
    

    image

  2. 已经开通华为云obs服务,并获取obs服务的Access Key,Secret Key和endPoint信息。

若已经安装flink,可跳过此环节。下面以flink-1.16.0为例,介绍整个安装过程。

  1. 下载flink-1.16.0-bin-scala_2.12.tgz,并解压到/opt/flink-1.16.0目录。

    $ tar -zxvf  flink-1.16.0-bin-scala_2.12.tgz
    
  2. 修改环境变量

    a. 打开环境变量

    $ sudo vi /etc/profile
    

    b. 添加环境变量

    export FLINK_HOME=/opt/flink-1.12.1
    export PATH=$FLINK_HOME/bin:$PATH
    

    c. 使环境变量在当前环境中生效

    $ source /etc/profile
    
  1. 在Github下载flink-obs-fs-hadoop:下载地址。说明:

    • flink-obs-fs-{flinkversion}-hadoop-${}${version}.jar版本规则:flinkversion为对应的flink版本号,version为flink-obs-fs-hadoop版本号。
    • 若没有匹配版本的jar包,可自行修改flink-obs-fs-hadoop目录下pom文件中的flink版本重新编译生成。详情见编译指南
  2. 在/opt/flink-1.16.0/plugins目录下创建obs-fs-hadoop目录,并将上述jar放入此目录。

三、修改程序并验证
  1. 在/opt/flink-1.16.0/conf/flink-conf.yaml文件中设置如下参数,说明如表1所示。

    表1 flink对接obs所需参数说明

    参数 说明
    fs.obs.impl obs文件系统实现类,默认为org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key 已开通obs对应华为账号的Access Key。获取访问密钥(AK/SK)
    fs.obs.secret.key 已开通obs对应华为账号的Secret Key。获取访问密钥(AK/SK)
    fs.obs.endpoint OBS终端节点名,各区域的终端节点详情请参见地区和终端节点
    fs.obs.buffer.dir 写数据到OBS时需要的本地临时目录,是绝对路径,flink程序需具备此目录读写权限
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: xxx
    fs.obs.secret.key: xxx
    fs.obs.endpoint: xxx
    fs.obs.buffer.dir: /data/buf #写数据到OBS时需要的本地临时目录,flink程序需具备此目录读写权限
    
  2. 修改flink程序,适配obs。

    • 以下代码适用于flink 1.16.0版本。示例中的obs-bucket需要替换为已开通obs服务的桶名,下同。

      a. Checkpoint设置为OBS中的路径。示例:

      env.setStateBackend(new HashMapStateBackend());
      		env.getCheckpointConfig().setCheckpointStorage("obs://obs-bucket/test/checkpoint");
      

      b. FileSink设置为OBS中的路径。示例:

      FileSink<Alert> sink= FileSink.forRowFormat(new Path("obs://obs-bucket/test/sink"),new SimpleStringEncoder<Alert>("UTF-8")).
      	withBucketAssigner(new BasePathBucketAssigner<Alert>()).
      	withRollingPolicy(DefaultRollingPolicy.builder().
      	withMaxPartSize(10*1024).build()).
      	withBucketCheckInterval(1000L).build();
      
    • 以下代码适用于flink 1.12.1版本。

      a. StateBackend设置为OBS中的路径。 示例:

      env.setStateBackend(new mFsStateBackend("obs://obs-bucket/test/checkpoint"));
      

      b. FileSink设置为OBS中的路径。 示例:

      final StreamingFileSink<String> sink = StreamingFileSink
      	.forRowFormat(new Path("obs://obs-bucket/test/data"),new SimpleStringEncoder<String>("UTF-8"))
      	.withBucketAssigner(new BasePathBucketAssigner())
      	.withRollingPolicy(rollingPolicy)
      	.withBucketCheckInterval(1000L)
      	.build()
      
  3. 将程序打包,上传到/opt/flink-1.16.0/examples/路径下。

  4. 在/opt/flink-1.16.0/路径下,通过如下指令启动flink主进程。

    $ ./bin/start-cluster.sh 
    

    主进程启动后,通过如下指令启动flink任务进程。

    $ ./bin/flink run ./examples/xxxx.jar
    
  5. 登录华为云控制台,查看obs数据情况,如果成功创建文件夹,且内含计算结果数据,则说明flink对接obs插件已经生效。image

四、异常场景问题定位

若测试过程出现问题,可以通过以下两个方法来定位问题。

  1. 在/opt/flink-1.16.0/log路径下,查看各项日志,是否有异常信息。

    flink-root-standalonesession-0-ecs-a066.log:flink主进程日志。

    flink-root-taskexecutor-0-ecs-a066.log:flink任务相关日志。

    image

  2. 通过IDEA debug模式本地联调。

    a. 修改启动配置,打开启动配置修改界面后,依次点击“Modify options”,“Add dependencies with "provided" scope to classpath”。将Flink相关依赖

    image

    b. 在src/resources/路径下,新增名为core-site.xml的配置文件,文件内容如下:

    <configuration>
        <property>
            <name>fs.obs.access.key</name>
            <value>xxx</value>
        </property>
        <property>
            <name>fs.obs.secret.key</name>
            <value>xxx</value>
        </property>
        <property>
            <name>fs.obs.endpoint</name>
            <value>xxx</value>
        </property>
        <property>
            <name>fs.obs.buffer.dir</name>
            <value>D:\\tmp</value>
        </property>
    </configuration>
    

    c. 开启联调。

Introduction

Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。

Customize my domain

Downloads

0

Total downloads (including clone, pull, ZIP & release downloads), updated by T+1.

Languages

Java100%