User Guide

This document describes how to use the OmniStateStore feature. Before you begin, ensure that OmniStateStore has been installed by following the instructions in the Installation Guide.

Using OmniStateStore

  1. Set the related configuration items in the $FLINK_HOME/conf/flink-conf.yaml file based on the service usage and operating environment. Note that the modification must be performed on the JobManager and all TaskManagers.

    The configuration item format is [Configuration item name] + [Colon] + [Space] + [Configuration item value]. For details about how to set the parameters, see [Configuration Items](#Configuration Items). The following is a configuration example:

        # Enable the RocksDB state backend.
        state.backend: rocksdb
        state.backend.rocksdb.localdir: /data/rocksdb

        # Set the OmniStateStore parameters.
        state.backend.rocksdb.options-factory: com.huawei.falcon.state.RocksDBOptOptionsFactory
        state.backend.rocksdb.falcon.use-partition-filter: true
        state.backend.rocksdb.falcon.use-range-filter: true
        state.backend.rocksdb.falcon.prefix-extractor.length: 13
        state.backend.rocksdb.falcon.use-hash-memtable: true
        state.backend.rocksdb.falcon.use-opt-join: true
        state.backend.rocksdb.falcon.use-state-cache: true
        state.backend.rocksdb.falcon.state-cache-sizeLimit: 20000
        state.backend.rocksdb.falcon.state-cache-bypass-hitRatio: 0.2
        state.backend.rocksdb.falcon.use-merge: true
  2. Start the Flink task, then check the logs to verify that the configuration items are set correctly and that OmniStateStore is enabled. For details, see [Observing the Enabling Status of OmniStateStore](#Observing the Enabling Status of OmniStateStore).
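
Before distributing the file to the JobManager and TaskManagers, it can help to check that every line follows the name, colon, space, value format described in step 1. The following Python sketch is one possible way to do that. The function names `parse_flink_conf` and `omnistatestore_configured` are hypothetical helpers written for this example and are not part of Flink or OmniStateStore.

```python
def parse_flink_conf(text):
    """Parse 'name: value' lines into a dict, skipping blank lines and '#' comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # The guide's format is name + colon + space + value.
        key, sep, value = line.partition(": ")
        if not sep:
            raise ValueError(f"not in 'name: value' form: {raw!r}")
        settings[key.strip()] = value.strip()
    return settings


def omnistatestore_configured(settings):
    """Return True if the backend is rocksdb and the options factory
    from the configuration example is set (exact string match)."""
    return (
        settings.get("state.backend") == "rocksdb"
        and settings.get("state.backend.rocksdb.options-factory")
        == "com.huawei.falcon.state.RocksDBOptOptionsFactory"
    )
```

A typical use is to read `$FLINK_HOME/conf/flink-conf.yaml` on each node, pass its contents to `parse_flink_conf`, and compare the resulting dictionaries across nodes.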

Configuration Items

Table 1 OmniStateStore configuration items

| Configuration Item | Example Value | Description | Remarks |
| --- | --- | --- | --- |
| state.backend | rocksdb | State backend type, either in-memory or RocksDB. | OmniStateStore can be enabled only when the state backend is set to RocksDB. |
| state.backend.rocksdb.localdir | /data/rocksdb | Drive path for the state backend. | You are advised to set it to an NVMe drive path and ensure that the drive has sufficient storage space. |
| state.backend.rocksdb.options-factory | com.huawei.falcon.state.RocksDBOptOptionsFactory | Indicates whether to enable dynamic filter. The subfeatures of this technology can be configured separately. The default value is "null". | - |
| state.backend.rocksdb.falcon.use-partition-filter | true | Subfeature 1 of dynamic filter, used to optimize point read/write operations on state. The default value is "false". | - |
| state.backend.rocksdb.falcon.use-hash-memtable | true | Subfeature 2 of dynamic filter, used to optimize ValueState read/write operations. The default value is "false". | - |
| state.backend.rocksdb.falcon.use-range-filter | true | Subfeature 3 of dynamic filter, used to optimize range queries on MapState. The default value is "false". | - |
| state.backend.rocksdb.falcon.prefix-extractor.length | 13 | Parameter for subfeature 3 of dynamic filter. Its value specifies the storage length of the state prefix filter. A larger value means that fewer filters can be stored but state filtering is more accurate. The default value is 13. | The recommended maximum value is 21. Avoid increasing it beyond this limit. |
| state.backend.rocksdb.falcon.use-opt-join | true | Indicates whether to optimize StreamingJoinOperator data caching, used to reduce the frequency of MapState range queries in the operator. The default value is "false". | - |
| state.backend.rocksdb.falcon.use-merge | true | Indicates whether to enable merge read/write optimization for StreamingJoinOperator, used to reduce the MapState read/write overhead of the operator. The default value is "false". | - |
| state.backend.rocksdb.falcon.use-state-cache | true | Indicates whether to optimize ValueState caching, used to reduce the RocksDBValueState read and write overhead. The default value is "false". | - |
| state.backend.rocksdb.falcon.state-cache-sizeLimit | 20000 | Number of ValueState entries that can be cached by a TaskManager. The state cache uses the heap memory of Flink. You need to evaluate the TaskManager memory usage based on the service type and data characteristics. The default value is 12,000. If the KV size is 200 bytes, each TaskManager will consume an additional 2.2 MB of memory. | The recommended maximum value is 20,000. If memory resources are limited, the value can be reduced accordingly. |
| state.backend.rocksdb.falcon.state-cache-bypass-hitRatio | 0.2 | Threshold for bypassing ValueState caching. If the cache hit ratio falls below this value, state caching is disabled, and read/write operations revert to the native Flink mechanism. The default value is -1, indicating that state caching is never disabled. | The recommended maximum value is 0.5 to ensure that state caching optimization is enabled in most scenarios. The recommended minimum value is 0.05 to prevent state caching when the cache hit ratio is low, avoiding additional performance overhead. |
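
The recommended limits in the Remarks column can be checked mechanically before a job is started. The sketch below assumes the settings have already been loaded into a dictionary of strings. It treats 0.05 and 0.5 as the lower and upper bounds of the bypass threshold, which is this example's reading of the Remarks column, and accepts -1 as "never bypass". The helper names `check_recommendations` and `cache_payload_bytes` are hypothetical. The payload estimate is a simple entries-times-size product that ignores JVM object overhead, so use it only as a rough guide.

```python
PREFIX_LEN = "state.backend.rocksdb.falcon.prefix-extractor.length"
CACHE_SIZE = "state.backend.rocksdb.falcon.state-cache-sizeLimit"
BYPASS_RATIO = "state.backend.rocksdb.falcon.state-cache-bypass-hitRatio"


def check_recommendations(settings):
    """Return a list of warnings for values outside the recommended limits.

    Keys that are absent are skipped, because their defaults apply.
    """
    warnings = []
    if PREFIX_LEN in settings and int(settings[PREFIX_LEN]) > 21:
        warnings.append(f"{PREFIX_LEN} exceeds the recommended maximum of 21")
    if CACHE_SIZE in settings and int(settings[CACHE_SIZE]) > 20000:
        warnings.append(f"{CACHE_SIZE} exceeds the recommended maximum of 20000")
    if BYPASS_RATIO in settings:
        ratio = float(settings[BYPASS_RATIO])
        # -1 is the documented default and means caching is never bypassed.
        if ratio != -1 and not (0.05 <= ratio <= 0.5):
            warnings.append(f"{BYPASS_RATIO} is outside the recommended 0.05 to 0.5 range")
    return warnings


def cache_payload_bytes(entries, kv_bytes):
    """Rough key-value payload held by the state cache, without JVM overhead."""
    return entries * kv_bytes
```

For example, `check_recommendations` returns an empty list for the values in the configuration example, and `cache_payload_bytes(20000, 200)` gives a first-order estimate of the extra heap needed at the recommended maximum cache size.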

Observing the Enabling Status of OmniStateStore

After starting a Flink task, check the Flink logs to verify whether the OmniStateStore feature is enabled. The following table shows how to verify whether each OmniStateStore subfeature is enabled in the corresponding application scenario.

Table 2 Observing the enabling status of OmniStateStore

| Feature | Application Scenario | How to Observe |
| --- | --- | --- |
| Dynamic filter: Flink intelligent multi-stream awareness algorithm | For ValueState, the MemTable structure is changed from a SkipList to a HashLinkList to improve point read/write performance. | If "[FALCON] {StateName} is valueState, use HashLinkList as memTable structure" is displayed, the subfeature is enabled. |
| Dynamic filter: Prefix filtering | For MapState, prefix filtering is used to eliminate redundant drive lookups, improving state range query performance. | If "[FALCON] {StateName} is map, use range filter" is displayed, the subfeature is enabled. |
| Flink semantic state caching: Join operator data caching | For StreamingJoinOperator, data caching is used to reduce the number of MapState range queries. | If "[FALCON] enable miniBatch process for StreaminJoinOperator" is displayed, the subfeature is enabled. |
| Flink semantic state caching: ValueState caching | For ValueState, state caching is used to reduce the overhead of point query and point write. | If "[FALCON] <{StateName}, VALUE> enable falcon cache" is displayed, the subfeature is enabled. |
| Merge read/write optimization | For StreamingJoinOperator, the Merge interface of RocksDB is used to replace the RMW operation. | If "[FALCON] merge operation is used for left-records" is displayed, the subfeature is enabled. |
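
Searching the logs for these messages by hand can be tedious when several subfeatures are enabled. The following Python sketch scans log lines for the messages listed in Table 2. Because {StateName} varies per job, it matches only the fixed part of each message, and the fragments are shortened so that the match does not depend on the exact operator spelling or record side in the log. The function name `enabled_subfeatures` and the labels used as dictionary keys are hypothetical and chosen for this example.

```python
# Fixed fragments of the "[FALCON]" messages listed in Table 2.
MARKERS = {
    "hash memtable (ValueState)": "is valueState, use HashLinkList as memTable structure",
    "range filter (MapState)": "is map, use range filter",
    "join operator data caching": "enable miniBatch process for",
    "ValueState caching": "enable falcon cache",
    "merge read/write optimization": "merge operation is used for",
}


def enabled_subfeatures(log_lines):
    """Map each subfeature label to True if its message appears in the log."""
    found = {name: False for name in MARKERS}
    for line in log_lines:
        # Only lines tagged by OmniStateStore are relevant.
        if "[FALCON]" not in line:
            continue
        for name, fragment in MARKERS.items():
            if fragment in line:
                found[name] = True
    return found
```

The function accepts any iterable of strings, so an open log file can be passed directly. The location of the Flink log files depends on the deployment and is not specified in this guide.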

Maintaining the Feature

To upgrade OmniStateStore, install the new version by following the instructions in the Installation Guide. You do not need to uninstall the existing version.

To uninstall OmniStateStore, follow the instructions in the Installation Guide and delete the related configuration items from the $FLINK_HOME/conf/flink-conf.yaml file.
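
Deleting the configuration items can be scripted when many nodes are involved. The Python sketch below removes lines from the configuration text under one assumption that this guide does not state explicitly: the "related configuration items" are the keys beginning with `state.backend.rocksdb.falcon.` plus the `state.backend.rocksdb.options-factory` entry when it points to a falcon class. The function name `strip_omnistatestore_settings` is hypothetical. Review the result and back up the original file before replacing it.

```python
FALCON_PREFIX = "state.backend.rocksdb.falcon."
FACTORY_KEY = "state.backend.rocksdb.options-factory"


def strip_omnistatestore_settings(text):
    """Return the configuration text without OmniStateStore-related lines.

    Comments, blank lines, and all other settings are kept unchanged.
    """
    kept = []
    for raw in text.splitlines():
        key, _, value = raw.strip().partition(":")
        is_falcon_key = key.startswith(FALCON_PREFIX)
        # Assumption: only an options factory in a falcon package belongs to OmniStateStore.
        is_falcon_factory = key == FACTORY_KEY and "falcon" in value
        if not (is_falcon_key or is_falcon_factory):
            kept.append(raw)
    return "\n".join(kept) + ("\n" if kept else "")
```

The RocksDB backend settings (`state.backend` and `state.backend.rocksdb.localdir`) are left in place, because the job may continue to use RocksDB without OmniStateStore.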