用户指南
前提条件
请参考安装指南完成相应软件的安装。
使用特性
SQL算子和表达式支持情况
介绍在Flink 1.16.3、Flink 1.17.1和Flink 1.20.0引擎下,OmniStream Flink Native化特性对SQL算子及表达式(含数据类型)的支持范围、限制条件与使用规则。
OmniStream Flink Native化特性支持的算子、表达式、函数如表 2 支持的算子列表和表 3 支持的表达式列表所示,表格中使用符号表示算子和表达式是否支持,符号的含义请参见表 1 算子和表达式支持表格中符号的含义。
须知:
- 表 2 支持的算子列表和表 3 支持的表达式列表中仅描述了OmniStream Flink Native化特性支持或涉及的数据类型,未展示的数据类型是OmniStream Flink Native化特性不支持的。
- 如果使用OmniStream Flink Native化特性不支持的算子和表达式,会导致执行计划回退为原生执行,对性能会有影响。
- 使用sql-client交互式界面执行SQL时,推荐将SQL的结果输出到connector为blackhole的数据表中,具体可参考Nexmark Q0的执行方式。
- 由于内存限制,默认情况下只支持Calc和LookupJoin算子,其他支持的算子需要export FLINK_PERFORMANCE=false设置环境变量使能。
| 符号 | 含义 |
|---|---|
| S | 表示支持该算子或表达式。 |
| PS | 表示部分支持该算子或表达式,但存在一些限定条件。具体的限定条件请参见约束与限制。 |
| NS | 表示不支持该算子或表达式。 |
| NA | 表示不涉及该算子或表达式。开源版本Flink也没有此输入场景。 |
| [Blank Cell] | 表示不适用或需要确认。 |
| 支持算子名称 | BIGINT | VARCHAR | TIMESTAMP(3) |
|---|---|---|---|
| Calc | S | S | S |
| Sink | S | S | S |
| Csv Source | S | S | S |
| Kafka Source | S | S | S |
| Kafka Sink | S | S | S |
| Join | PS | PS | PS |
| LookupJoin | PS | PS | PS |
| GroupAggregate | PS | PS | PS |
| LocalGroupAggregate | PS | PS | PS |
| GlobalGroupAggregate | PS | PS | PS |
| IncrementalGroupAggregate | PS | PS | PS |
| LocalWindowAggregate | PS | PS | PS |
| GlobalWindowAggregate | PS | PS | PS |
| GroupWindowAggregate | PS | PS | PS |
| WindowAggregate | PS | PS | PS |
| WindowJoin | PS | PS | PS |
| Deduplicate | PS | PS | PS |
| Expand | PS | PS | PS |
| Rank | PS | PS | PS |
| 表达式 | 函数类型 | BIGINT | VARCHAR | NULL | TIMESTAMP(3) |
|---|---|---|---|---|---|
| * | Scalar Functions | S | NS | S | S |
| + | Scalar Functions | S | NS | S | S |
| - | Scalar Functions | S | NS | S | S |
| / | Scalar Functions | S | NS | S | S |
| LOWER | Scalar Functions | NA | S | NA | NA |
| SPLIT_INDEX | Scalar Functions | S | S | NA | NA |
| DATE_FORMAT | Scalar Functions | NA | NA | NA | S |
| COUNT_CHAR | Scalar Functions | NA | S | NA | NA |
| HOUR | Scalar Functions | S | NA | NS | S |
| REGEX_EXTRACT | Scalar Functions | NA | S | NS | NA |
| JSON_VALUE | Scalar Functions | NA | S | S | NA |
| JSON_QUERY | Scalar Functions | NA | S | S | NA |
| COALESCE | Scalar Functions | S | S | S | S |
| PROCTIME_MATERIALIZE | Scalar Functions | NA | NA | NA | S |
| CHAR_LENGTH | Scalar Functions | NA | S | NA | NA |
| TO_TIMESTAMP_LTZ | Scalar Functions | S | NA | S | NA |
DataStream算子和UDF支持情况
介绍在Flink 1.16.3引擎下,OmniStream Flink Native化特性对DataStream算子及用户自定义函数(UDF)的支持范围、限制条件与性能影响。
须知: 如果使用OmniStream Flink Native化特性不支持的DataStream算子和UDF,会导致执行计划回退为原生执行,对性能会有影响。
- OmniStream Flink Native化特性支持的DataStream算子包括Kafka Source、Kafka Sink、Map、Reduce、FlatMap和Filter。
- 从数据传输对象、Function类型、UDF依赖类及接口、Java类型翻译和Java语句翻译多个维度给出支持的UDF白名单,请参见支持的UDF白名单。
支持的数据传输对象包括Long、String和Tuple2<String, Long>。
支持的依赖类及接口如表 1 支持的表达式列表所示,其余约束请参见UDF翻译工具用户指南。环境配置不同可能会导致支持的表达式略有变化,如有差异,请联系华为一线工程师确认。
| Java类 | Java类接口 |
|---|---|
| Arrays | static <T> List<T> asList(Array); |
| HashMap(存取的元素均需要实现hashCode和equals方法) | Object get(Object key); Object put(Object key, Object value); void putAll(HashMap m); boolean containsKey(Object key); int size(); boolean remove(Object key)(与Java接口不同,当前不支持使用变量承接返回值。); Set<Map.Entry<Object,Object>> entrySet(); Set<Object> keySet(); HashMap clone(); |
| Iterator | boolean hasNext(); Object next(); |
| ArrayList | Object get(int index); void clear(); void add(Object e); Iterator iterator(); boolean contains(Object o); int size(); boolean isEmpty() |
| LinkedList | Object getFirst(); Object getLast(); void addLast(Object e); void addFirst(Object e); |
| Map.Entry(mapentry中的元素需实现hash和equals方法) | Object getKey(); Object getValue(); void setValue(Object value);(与Java接口不同,当前不支持使用变量承接返回值。) |
| HashSet(存取的元素需要实现hash和equals方法) | boolean addAll(ArrayList list); boolean add(Object e); boolean remove(Object o); boolean contains(Object o); int size(); void clear(); Iterator iterator(); |
| StringBuilder | StringBuilder append(String str); String toString(); |
| 数组(当前只支持对象类型一维数组,不支持基本类型数组及多维数组。) | 大小; 取元素; 存元素(只支持顺序存元素); |
| Integer | String toString(); bool equals(Integer *obj); overrideint intValue(); static Integer valueOf(String s); static Integer valueOf(int i); |
| Boolean | static Boolean valueOf; (boolean b)boolean booleanValue() |
| Long | int hashCode(); boolean equals(Long obj); String toString(); Long clone(); long longValue(); static Long valueOf(String s); static Long valueOf(long l); |
| Object | int hashCode(); bool equals(Object *obj); String toString(); Object clone(); |
| String | int hashCode(); boolean equals(String anObject); String toString(); Object clone(); String replace(String target, String replacement); String[] split(String regex);(暂时只支持字符串的split,不支持正则表达式。) String replaceAll(String regex, String replacement); int lastIndexOf(String str); int length(); String substring(int beginIndex); String substring(int beginIndex, int endIndex); boolean contains(String s); boolean endsWith(String suffix); boolean startsWith(String prefix); |
| Gson | String toJson(HashMap<String,String> map); Map fromJson(String json, Type typeOf);(只支持将String类型转为Map) |
| JsonObject | JsonObject getAsJsonObject(String memberName);(只支持String常量) |
| JsonParser | static JsonObject parseString(String json); |
| JsonPrimitive | boolean getAsBoolean(); |
| JsonElement | JsonObject getAsJsonObject(); double getAsDouble(); float getAsFloat(); int getAsInt(); long getAsLong(); short getAsShort(); boolean getAsBoolean(); String getAsString(); boolean isJsonNull(); String toString(); String toString(); |
| JsonArray | Iterator<JsonElement> iterator(); |
(SQL场景)使能OmniStream
在SQL场景下,详细描述从启动Flink集群到完成OmniStream使能的操作步骤。
-
进入flink_jm_8c32g容器,启动Flink集群。
docker exec -it flink_jm_8c32g /bin/bash source /etc/profile cd /usr/local/flink-1.16.3/bin ./start-cluster.sh
须知:
每次退出并重新进入容器后,需要执行source /etc/profile命令重新注入环境变量,避免运行任务找不到依赖组件。 -
查看Job Manager和Task Manager是否启动成功。
-
在flink_jm_8c32g容器中查看是否存在StandaloneSessionClusterEntrypoint进程。
source /etc/profile jps存在StandaloneSessionClusterEntrypoint进程,表示Job Manager启动成功。

-
分别进入flink_tm1_8c32g、flink_tm2_8c32g容器查看是否存在TaskManagerRunner进程。下述命令以flink_tm1_8c32g容器为例:
docker exec -it flink_tm1_8c32g /bin/bash source /etc/profile jps存在TaskManagerRunner进程,表示Task Manager启动成功。

-
-
在flink_jm_8c32g容器中启动Nexmark。
docker exec -it flink_jm_8c32g /bin/bash source /etc/profile cd /usr/local/nexmark/bin ./setup_cluster.sh -
分别进入flink_tm1_8c32g、flink_tm2_8c32g容器查看Nexmark是否启动成功。下述命令以flink_tm1_8c32g容器为例:
docker exec -it flink_tm1_8c32g /bin/bash source /etc/profile jps exit存在CpuMetricSender进程,表示Nexmark启动成功。

-
在flink_jm_8c32g容器执行Nexmark用例Query0。
docker exec -it flink_jm_8c32g /bin/bash source /etc/profile cd /usr/local/nexmark/bin sh run_query.sh q0 exit观察执行结果输出,预期结果:用例运行完成且无报错。

-
在Task Manager所在容器上查看Flink最新.out日志文件。
docker exec -it flink_tm1_8c32g /bin/bash cd /usr/local/flink-1.16.3/log- 日志中提示
Shared Memory Metric Manager Loading Succeed!,表示Native so库已经正常加载。 - 日志中提示
welcome to native,表示已经成功使能OmniStream。

- 日志中提示
(DataStream场景)使能OmniStream
在DataStream场景下,详细描述从启动Flink集群到完成OmniStream使能的操作步骤。
-
如果是在多Task Manager场景下运行DataStream任务,需要在flink-conf.yaml文件中添加配置omni.batch: true,以提升多该场景下的shuffle效率,以达到更优性能。
-
进入容器flink_jm_8c32g在flink-conf.yaml文件中添加配置omni.batch: true。
```bash docker exec -it flink_jm_8c32g /bin/bash ``` -
打开
/usr/local/flink/conf/flink-conf.yaml文件。vi /usr/local/flink/conf/flink-conf.yaml -
按
i进入编辑模式,增加如下配置。omni.batch: true -
按
Esc键,输入 :wq!,按Enter保存并退出编辑。 -
依次进入容器flink_tm1_8c32g和flink_tm2_8c32g在flink-conf.yaml文件中添加配置omni.batch: true。
docker exec -it flink_tm1_8c32g /bin/bash vi /usr/local/flink/conf/flink-conf.yaml omni.batch: true docker exec -it flink_tm2_8c32g /bin/bash vi /usr/local/flink/conf/flink-conf.yaml omni.batch: true
-
-
进入flink_jm_8c32g容器,启动Flink集群。
docker exec -it flink_jm_8c32g /bin/bash source /etc/profile cd /usr/local/flink-1.16.3/bin ./start-cluster.sh
须知:
每次退出并重新进入容器后,需要执行source /etc/profile命令重新注入环境变量,避免运行任务找不到依赖组件。 -
查看Job Manager和Task Manager是否启动成功。
-
在flink_jm_8c32g容器中查看是否存在StandaloneSessionClusterEntrypoint进程。
source /etc/profile jps存在StandaloneSessionClusterEntrypoint进程,表示Job Manager启动成功。

-
分别进入flink_tm1_8c32g、flink_tm2_8c32g容器查看是否存在TaskManagerRunner进程。下述命令以flink_tm1_8c32g容器为例。
docker exec -it flink_tm1_8c32g /bin/bash source /etc/profile jps存在TaskManagerRunner进程,表示Task Manager启动成功。

-
-
创建并配置Kafka消费者和生产者配置文件。
-
进入flink_tm1_8c32g容器。
docker exec -it flink_tm1_8c32g /bin/bash -
mkdir /opt/conf cd /opt/conf -
新增Kafka消费者配置文件kafka_consumer.conf。
fetch.queue.backoff.ms=20 group.id=omni max.poll.records=10000 -
新增Kafka生产者配置文件kafka_producer.conf。
queue.buffering.max.messages=2000000 queue.buffering.max.kbytes=20971520 queue.buffering.max.ms=5 linger.ms=5 batch.num.messages=200000 batch.size=3145728 max.push.records=10000 -
进入flink_tm2_8c32g,执行步骤4.ii~4.iv。
docker exec -it flink_tm2_8c32g /bin/bash
-
-
在物理机上启动ZooKeeper和Kafka,详情请参见《Kafka 部署指南》。
-
使用Kafka创建Topic并生成数据。
说明:
实际操作过程中,请将命令或脚本中所有物理机IP地址替换为实际的Kafka服务端的IP地址。-
创建Source和Sink的Topic。
cd /usr/local/kafka bin/kafka-topics.sh --create --bootstrap-server Kafka服务端的物理机IP地址:9092 --replication-factor 1 --partitions 1 --topic source_abcd bin/kafka-topics.sh --create --bootstrap-server Kafka服务端的物理机IP地址:9092 --replication-factor 1 --partitions 1 --topic result -
将下面的内容保存为脚本文件producer.sh。
#!/bin/bash # Kafka安装目录(请根据实际路径修改) KAFKA_HOME="/usr/local/kafka" TOPIC_NAME="source_abcd" # Kafka Topic名称 BROKER="IP地址:9092" # Kafka Broker服务端的IP地址 MESSAGE_COUNT=10 # 发送消息条数 # 检查Kafka console-producer.sh是否存在 if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then echo "错误: 未找到 kafka-console-producer.sh,请检查 KAFKA_HOME路径" exit 1 fi # 生成随机字符串并发送到Kafka for ((i=1; i<=$MESSAGE_COUNT; i++)); do # 生成4个随机字母(大小写混合) + 空格 + 1 RAND_STR=$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1) MESSAGE="${RAND_STR} 1" # 格式: 4字母 + 空格 + 1 # 调用Kafka Producer发送消息 echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \ --bootstrap-server "$BROKER" \ --topic "$TOPIC_NAME" echo "已发送: $MESSAGE" done -
执行脚本文件,生成测试数据并写入Source Topic。
./producer.sh
-
-
构建作业JAR包。
-
进入物理机
/opt路径,创建/opt/job/src/main/java/com/huawei/boostkit路径。mkdir -p /opt/job/src/main/java/com/huawei/boostkit cd /opt/job/ -
创建Flink作业Java文件。
-
打开
/opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java。vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java -
按
i进入编辑模式,添加如下内容。package com.huawei.boostkit; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import java.util.Properties; public class FlinkWordCount { public static void main(String[] args) throws Exception { String broker = "ip:port"; String sourceTopic = "source_abcd"; String targetTopic = "result"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName()); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers(broker) .setTopics(sourceTopic) .setGroupId("your-group-id") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .setProperties(properties) .build(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); properties.put(ProducerConfig.ACKS_CONFIG, "0"); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator"); KafkaSink<String> sink = KafkaSink.<String>builder() .setBootstrapServers(broker) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(targetTopic) .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setKafkaProducerConfig(properties) .build(); DataStream<String> source; source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining(); SingleOutputStreamOperator<String> result = source.map(line -> line ); result.sinkTo(sink); result.disableChaining(); env.execute("Wordcount"); } } -
按
Esc键,输入 :wq!,按Enter保存并退出编辑。
-
-
创建pom.xml文件。
-
打开
/opt/job/pom.xml。vi /opt/job/pom.xml -
按
i进入编辑模式,添加如下内容。<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.huawei.boostkit</groupId> <artifactId>ziliao</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <flink.version>1.16.3</flink.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <!-- Flink dependencies --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> <groupId>org.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> <groupId>org.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> <groupId>org.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> <groupId>org.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> <exclusions> <exclusion> <groupId>org.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>1.16.3</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project> -
按
Esc键,输入 :wq!,按Enter保存并退出编辑。
-
-
执行mvn clean package打包命令后,将会在target目录下生成ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar。再将该JAR包上传到flink_jm_8c32g容器的
/usr/local/flink目录。mvn clean package docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
-
-
在flink_jm_8c32g容器导出环境变量。
export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$C_INCLUDE_PATH export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH -
修改UDF配置文件。
-
设置运行用例包名udf_package和主类名main_class。
vim /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties -
按
i进入编辑模式,修改udf_package和main_class,修改为以下内容。udf_package=com.huawei.boostkit main_class=com.huawei.boostkit.FlinkWordCount -
按
Esc键,输入 wq!,按Enter保存并退出编辑。
-
-
翻译测试用例JAR包。
sh /opt/udf-trans-opt/udf-translator/bin/udf_translate.sh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink -
在flink_jm_8c32g容器提交作业。
cd /usr/local/flink bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar -
查看Sink Topic的数据。
消费Kafka数据查看作业是否正常运行。
cd /usr/local/kafka bin/kafka-console-consumer.sh --bootstrap-server 服务端的物理机IP地址:9092 --topic result --from-beginning
-
在flink_jm_8c32g容器上查看最新的Flink客户端日志flink-root-client-xxx.log。
cd /usr/local/flink-1.16.3/log确认无报错信息,表示已经成功使能OmniStream。

维护特性
升级或卸载OmniStream时请满足操作规范。
须知: 当前暂不支持通过工具进行升级,因此如需要升级,请下载安装包重新安装。
请通过鲲鹏社区下载要升级的OmniStream Flink Native化软件安装包。
须知:
- 当前步骤仅供需要卸载OmniStream时参考,不属于部署OmniStream的必要操作步骤。
- 卸载OmniStream之前,请确保Flink引擎没有处于任务执行的状态。
下述卸载过程以安装目录为/opt/Dependency_library和/usr/local/OmniStream为例进行说明。
-
删除
/opt/Dependency_library和/usr/local/OmniStream部署软件时加入的依赖软件包。 -
修改
$FLINK_HOME/bin目录下的config.sh文件,恢复Flink默认配置。具体操作为,将安装指南-安装OmniStream-步骤3中的修改还原至未修改前的状态。
-
修改
$FLINK_HOME/conf目录下的flink-conf.yaml文件,恢复Flink默认配置。具体操作为,将安装指南-安装OmniStream-步骤4中的修改还原至未修改前的状态。