Co-authored-by: 絵空事スピリット <wanglichen@starrocks.com>
This commit is contained in:
parent
fadb8061f3
commit
3d2a0d5301
|
|
@ -187,6 +187,12 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
|
|||
**Default value**: -1<br/>
|
||||
**Description**: Supported since 1.2.10. The time duration for which the HTTP client waits for data. Unit: ms. The default value `-1` means there is no timeout.
|
||||
|
||||
### sink.sanitize-error-log
|
||||
|
||||
**Required**: No<br/>
|
||||
**Default value**: false<br/>
|
||||
**Description**: Supported since 1.2.12. Whether to sanitize sensitive data in the error log for production security. When this item is set to `true`, sensitive row data and column values in Stream Load error logs are redacted in both the connector and SDK logs. The value defaults to `false` for backward compatibility.
|
||||
|
||||
### sink.wait-for-continue.timeout-ms
|
||||
|
||||
**Required**: No<br/>
|
||||
|
|
@ -259,6 +265,12 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
|
|||
**Default value**: NONE<br/>
|
||||
**Description**: The compression algorithm used for Stream Load. Valid values: `lz4_frame`. Compression for the JSON format requires Flink connector 1.2.10+ and StarRocks v3.2.7+. Compression for the CSV format only requires Flink connector 1.2.11+.
|
||||
|
||||
### sink.properties.prepared_timeout
|
||||
|
||||
**Required**: No<br/>
|
||||
**Default value**: NONE<br/>
|
||||
**Description**: Supported since 1.2.12 and only effective when `sink.version` is set to `V2`. Requires StarRocks 3.5.4 or later. Sets the timeout in seconds for the Transaction Stream Load phase from `PREPARED` to `COMMITTED`. Typically, only needed for exactly-once; at-least-once usually does not require setting this (the connector defaults to 300s). If not set in exactly-once, StarRocks FE configuration `prepared_transaction_default_timeout_second` (default 86400s) applies. See [StarRocks Transaction timeout management](./Stream_Load_transaction_interface.md#transaction-timeout-management).
|
||||
|
||||
## Data type mapping between Flink and StarRocks
|
||||
|
||||
| Flink data type | StarRocks data type |
|
||||
|
|
@ -312,16 +324,15 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
|
|||
in Flink in this [blogpost](https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/).
|
||||
|
||||
- If the label prefix is not specified, lingering transactions will be cleaned up by StarRocks only after they time out. However the number of running transactions can reach the limitation of StarRocks `max_running_txn_num_per_db` if
|
||||
Flink jobs fail frequently before transactions time out. The timeout length is controlled by StarRocks FE configuration
|
||||
`prepared_transaction_default_timeout_second` whose default value is `86400` (1 day). You can set a smaller value to it
|
||||
to make transactions expired faster when the label prefix is not specified.
|
||||
Flink jobs fail frequently before transactions time out. You can set a smaller timeout for `PREPARED` transactions
|
||||
to make them expired faster when the label prefix is not specified. See the following about how to set the prepared timeout.
|
||||
|
||||
- If you are certain that the Flink job will eventually recover from checkpoint or savepoint after a long downtime because of stop or continuous failover,
|
||||
please adjust the following StarRocks configurations accordingly, to avoid data loss.
|
||||
|
||||
- `prepared_transaction_default_timeout_second`: StarRocks FE configuration, default value is `86400`. The value of this configuration needs to be larger than the downtime
|
||||
of the Flink job. Otherwise, the lingering transactions that are included in a successful checkpoint may be aborted because of timeout before you restart the
|
||||
Flink job, which leads to data loss.
|
||||
- Adjust `PREPARED` transaction timeout. See the following about how to set the timeout.
|
||||
|
||||
The timeout needs to be larger than the downtime of the Flink job. Otherwise, the lingering transactions that are included in a successful checkpoint may be aborted because of timeout before you restart the Flink job, which leads to data loss.
|
||||
|
||||
Note that when you set a larger value to this configuration, it is better to specify the value of `sink.label-prefix` so that the lingering transactions can be cleaned according to the label prefix and some information in
|
||||
checkpoint, instead of due to timeout (which may cause data loss).
|
||||
|
|
@ -329,13 +340,11 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
|
|||
- `label_keep_max_second` and `label_keep_max_num`: StarRocks FE configurations, default values are `259200` and `1000`
|
||||
respectively. For details, see [FE configurations](./loading_introduction/loading_considerations.md#fe-configurations). The value of `label_keep_max_second` needs to be larger than the downtime of the Flink job. Otherwise, the Flink connector can not check the state of transactions in StarRocks by using the transaction labels saved in the Flink's savepoint or checkpoint and figure out whether these transactions are committed or not, which may eventually lead to data loss.
|
||||
|
||||
These configurations are mutable and can be modified by using `ADMIN SET FRONTEND CONFIG`:
|
||||
- How to set the timeout for PREPARED transactions
|
||||
|
||||
```SQL
|
||||
ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
|
||||
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
|
||||
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000");
|
||||
```
|
||||
- For Connector 1.2.12+ and StarRocks 3.5.4+, you can set the timeout by configuring the connector parameter `sink.properties.prepared_timeout`. By default, the value is not set, and it falls back to the StarRocks FE's global configuration `prepared_transaction_default_timeout_second` (default value is `86400`).
|
||||
|
||||
- For other versions of Connector or StarRocks, you can set the timeout by configuring the StarRocks FE's global configuration `prepared_transaction_default_timeout_second` (default value is `86400`).
|
||||
|
||||
### Flush Policy
|
||||
|
||||
|
|
|
|||
|
|
@ -187,6 +187,12 @@ Maven プロジェクトの `pom.xml` ファイルに、以下の形式で Flink
|
|||
**デフォルト値**: -1<br/>
|
||||
**説明**: 1.2.10 以降でサポートされています。HTTP クライアントがデータを待機する時間の長さ。単位: ms。デフォルト値 `-1` はタイムアウトがないことを意味します。
|
||||
|
||||
### sink.sanitize-error-log
|
||||
|
||||
**必須**: いいえ<br/>
|
||||
**デフォルト値**: false<br/>
|
||||
**説明**: 1.2.12以降でサポートされています。 生産環境のセキュリティのために、エラーログ内の機密データをサニタイズするかどうか。 この項目が` true` に設定されている場合、Stream Load エラーログ内の機密行データおよび列値は、コネクタと SDK の両方のログで編集されます。 互換性維持のため、デフォルト値は `false` です。
|
||||
|
||||
### sink.wait-for-continue.timeout-ms
|
||||
|
||||
**必須**: いいえ<br/>
|
||||
|
|
@ -259,6 +265,12 @@ Maven プロジェクトの `pom.xml` ファイルに、以下の形式で Flink
|
|||
**デフォルト値**: NONE<br/>
|
||||
**説明**: Stream Load に使用する圧縮アルゴリズム。有効な値:`lz4_frame`。JSON フォーマットの圧縮には、Flink connector 1.2.10+ と StarRocks v3.2.7+ が必要です。CSV フォーマットの圧縮には、Flink コネクタ 1.2.11+ のみが必要です。
|
||||
|
||||
### sink.properties.prepared_timeout
|
||||
|
||||
**必須**: いいえ<br/>
|
||||
**デフォルト値**: NONE<br/>
|
||||
**説明**: Flink コネクタ 1.2.12 以降でサポートされ、`sink.version` が `V2` に設定されている場合にのみ有効です。StarRocks 3.5.4 以降が必要です。トランザクションストリームロードフェーズにおける `PREPARED` から `COMMITTED` までのタイムアウトを秒単位で設定します。通常、exactly-once のみに必要です。at-least-once では通常設定不要(コネクタのデフォルトは300秒)。exactly-once で設定されていない場合、StarRocks FE 設定の `prepared_transaction_default_timeout_second`(デフォルト 86400 秒)が適用されます。詳細は[StarRocks トランザクションのタイムアウト管理](./Stream_Load_transaction_interface.md#トランザクションのタイムアウト管理)を参照してください。
|
||||
|
||||
## Flink と StarRocks 間のデータ型マッピング
|
||||
|
||||
| Flink データ型 | StarRocks データ型 |
|
||||
|
|
@ -299,23 +311,21 @@ Maven プロジェクトの `pom.xml` ファイルに、以下の形式で Flink
|
|||
|
||||
- ラベルプレフィックスが指定されている場合、Flink コネクタはラベルプレフィックスを使用して、Flink の失敗シナリオで生成される可能性のある残存トランザクションをクリーンアップします。これらの残存トランザクションは、Flink ジョブがチェックポイント中に失敗した場合などに一般的に `PREPARED` ステータスになります。Flink ジョブがチェックポイントから復元されると、Flink コネクタはラベルプレフィックスとチェックポイントの情報に基づいてこれらの残存トランザクションを見つけて中止します。Flink ジョブが終了すると、exactly-once を実装するための二段階コミットメカニズムのため、Flink コネクタはこれらのトランザクションが成功したチェックポイントに含まれるべきかどうかの通知を受け取っていないため、Flink ジョブが終了するときに中止することはできません。これらのトランザクションが中止されるとデータが失われる可能性があります。Flink でエンドツーエンドの exactly-once を達成する方法については、この [ブログ記事](https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/) を参照してください。
|
||||
|
||||
- ラベルプレフィックスが指定されていない場合、残存トランザクションはタイムアウト後にのみ StarRocks によってクリーンアップされます。ただし、Flink ジョブが頻繁に失敗すると、トランザクションがタイムアウトする前に StarRocks の `max_running_txn_num_per_db` の制限に達する可能性があります。タイムアウトの長さは、StarRocks FE の設定 `prepared_transaction_default_timeout_second` によって制御され、デフォルト値は `86400`(1 日)です。ラベルプレフィックスが指定されていない場合は、トランザクションがより早く期限切れになるように、これを小さい値に設定できます。
|
||||
- ラベルプレフィックスが指定されていない場合、長期間実行中のトランザクションはタイムアウト後にのみ StarRocks によってクリーンアップされます。ただし、トランザクションがタイムアウトする前にFlinkジョブが頻繁に失敗すると、実行中のトランザクション数がStarRocksの `max_running_txn_num_per_db` 制限に達する可能性があります。ラベルプレフィックスが指定されていない場合、`PREPARED` トランザクションのタイムアウトを短く設定することで、より早く期限切れにすることができます。準備済みトランザクションのタイムアウト設定方法については以下を参照してください。
|
||||
|
||||
- Flink ジョブが停止または継続的なフェイルオーバーの後に長時間のダウンタイムから最終的にチェックポイントまたはセーブポイントから復元されることが確実である場合、データ損失を避けるために次の StarRocks 設定を適切に調整してください。
|
||||
|
||||
- `prepared_transaction_default_timeout_second`: StarRocks FE の設定で、デフォルト値は `86400` です。この設定の値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink ジョブを再起動する前にタイムアウトのために成功したチェックポイントに含まれる残存トランザクションが中止され、データ損失が発生する可能性があります。
|
||||
- `PREPARED` トランザクションのタイムアウトを調整します。タイムアウトの設定方法については以下を参照してください。
|
||||
|
||||
この設定に大きな値を設定する場合、ラベルプレフィックスの値を指定することをお勧めします。これにより、残存トランザクションはタイムアウトによる(データ損失を引き起こす可能性がある)代わりに、ラベルプレフィックスとチェックポイントの情報に基づいてクリーンアップされます。
|
||||
タイムアウトは Flink ジョブのダウンタイムよりも長く設定する必要があります。そうしないと、正常なチェックポイントに含まれる未処理トランザクションが、Flink ジョブを再起動する前にタイムアウトにより中止され、データ損失が発生する可能性があります。
|
||||
|
||||
- `label_keep_max_second` および `label_keep_max_num`: StarRocks FE の設定で、デフォルト値はそれぞれ `259200` および `1000` です。詳細については、[FE 設定](./loading_introduction/loading_considerations.md#fe-configurations) を参照してください。`label_keep_max_second` の値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink コネクタは Flink のセーブポイントまたはチェックポイントに保存されたトランザクションラベルを使用して StarRocks のトランザクションの状態を確認し、これらのトランザクションがコミットされているかどうかを判断できず、最終的にデータ損失につながる可能性があります。
|
||||
|
||||
これらの設定は変更可能であり、`ADMIN SET FRONTEND CONFIG` を使用して変更できます。
|
||||
- PREPARED トランザクションのタイムアウト設定方法
|
||||
|
||||
```SQL
|
||||
ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
|
||||
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
|
||||
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000");
|
||||
```
|
||||
- コネクタ 1.2.12 以降および StarRocks 3.5.4 以降では、コネクタパラメータ `sink.properties.prepared_timeout` を設定することでタイムアウトを設定できます。デフォルトでは値は設定されておらず、StarRocks FE のグローバル設定 `prepared_transaction_default_timeout_second`(デフォルト値は `86400`)がフォールバックされます。
|
||||
|
||||
- その他のバージョンのコネクタまたは StarRocks では、StarRocks FEのグローバル設定 `prepared_transaction_default_timeout_second`(デフォルト値は`86400`)を設定することでタイムアウトを設定できます。
|
||||
|
||||
### フラッシュポリシー
|
||||
|
||||
|
|
|
|||
|
|
@ -112,6 +112,7 @@ Flink connector JAR 文件的命名格式如下:
|
|||
| sink.max-retries | No | 3 | Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。该参数只在 `sink.version` 为 `V1` 才会生效。 |
|
||||
| sink.connect.timeout-ms | No | 30000 | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 Flink connector v1.2.9 之前,默认值为 `1000`。 |
|
||||
| sink.socket.timeout-ms | No | -1 | 此参数自 Flink connector 1.2.10 开始支持。HTTP 客户端等待数据的超时时间。单位:毫秒。默认值 `-1` 表示没有超时时间。|
|
||||
| sink.sanitize-error-log | No | false | 此参数自 Flink connector 1.2.12 开始支持。用于控制是否对错误日志中的敏感数据进行清理,保护生产环境数据安全。当此项设置为 `true` 时,Stream Load 错误日志中的敏感行数据和列值将在连接器和 SDK 日志中被屏蔽。为保持向后兼容性,默认值为 `false`。 |
|
||||
| sink.wait-for-continue.timeout-ms | No | 10000 | 此参数自 Flink connector 1.2.7 开始支持。等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
|
||||
| sink.ignore.update-before | No | TRUE | 此参数自 Flink connector 1.2.8 开始支持。将数据导入到主键表时,是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果将此参数设置为 false,则将该记录在主键表中视为 DELETE 操作。 |
|
||||
| sink.parallelism | No | NONE | 写入的并行度。仅适用于 Flink SQL。如果未设置, Flink planner 将决定并行度。**在多并行度的场景中,用户需要确保数据按正确顺序写入。** |
|
||||
|
|
@ -124,6 +125,7 @@ Flink connector JAR 文件的命名格式如下:
|
|||
| sink.properties.partial_update_mode | No | row | 指定部分更新的模式,取值包括 `row` 和 `column`。<ul><li>`row`(默认值),指定使用行模式执行部分更新,比较适用于较多列且小批量的实时更新场景。</li><li>`column`,指定使用列模式执行部分更新,比较适用于少数列并且大量行的批处理更新场景。在该场景,开启列模式,更新速度更快。例如,在一个包含 100 列的表中,每次更新 10 列(占比 10%)并更新所有行,则开启列模式,更新性能将提高 10 倍。</li></ul> |
|
||||
| sink.properties.strict_mode | No | false | 是否为 Stream Load 启用严格模式。在导入数据中出现不合格行(如列值不一致)时,严格模式会影响导入行为。有效值: `true` 和 `false`。具体参考 [STREAM LOAD](../sql-reference/sql-statements/loading_unloading/STREAM_LOAD.md)。 |
|
||||
| sink.properties.compression | No | NONE | 用于 Stream Load 的压缩算法。有效值:`lz4_frame`。压缩 JSON 格式需要 Flink Connector 1.2.10+ 和 StarRocks v3.2.7+。压缩 CSV 格式仅需要 Flink Connector 1.2.11+。 |
|
||||
| sink.properties.prepared_timeout | No | NONE | 自 Flink Connector 1.2.12版本起支持,且仅当 `sink.version` 为 `V2` 时生效。需StarRocks 3.5.4 及以上版本。设置事务 Stream Load 阶段从 `PREPARED` 到 `COMMITTED` 的超时时间(单位:秒)。通常仅需在 exactly-once 模式下设置;at-least-once 模式通常无需设置(Connector 默认值为 300 秒)。若在精确一次模式下未设置,则采用 StarRocks FE 配置项 `prepared_transaction_default_timeout_second`(默认 86400 秒)。详见[StarRocks 事务超时管理](./Stream_Load_transaction_interface.md#超时管理)。 |
|
||||
|
||||
## 数据类型映射
|
||||
|
||||
|
|
@ -167,23 +169,24 @@ Flink connector JAR 文件的命名格式如下:
|
|||
- 如果 Flink connector 版本为 1.2.8 及更高,则建议指定 `sink.label-prefix` 的值。需要注意的是,label 前缀在 StarRocks 的所有类型的导入作业中必须是唯一的,包括 Flink job、Routine Load 和 Broker Load。
|
||||
|
||||
- 如果指定了 label 前缀,Flink connector 将使用 label 前缀清理因为 Flink job 失败而生成的未完成事务,例如在checkpoint 进行过程中 Flink job 失败。如果使用 `SHOW PROC '/transactions/<db_id>/running';` 查看这些事务在 StarRock 的状态,则返回结果会显示事务通常处于 `PREPARED` 状态。当 Flink job 从 checkpoint 恢复时,Flink connector 将根据 label 前缀和 checkpoint 中的信息找到这些未完成的事务,并中止事务。当 Flink job 因某种原因退出时,由于采用了两阶段提交机制来实现 exactly-once语义,Flink connector 无法中止事务。当 Flink 作业退出时,Flink connector 尚未收到来自 Flink checkpoint coordinator 的通知,说明这些事务是否应包含在成功的 checkpoint 中,如果中止这些事务,则可能导致数据丢失。您可以在这篇[文章](https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)中了解如何在 Flink 中实现端到端的 exactly-once。
|
||||
- 如果未指定 label 前缀,则未完成的事务将在超时后由 StarRocks 清理。然而,如果 Flink job 在事务超时之前频繁失败,则运行中的事务数量可能会达到 StarRocks 的 `max_running_txn_num_per_db` 限制。超时长度由 StarRocks FE 配置 `prepared_transaction_default_timeout_second` 控制,默认值为 `86400`(1天)。如果未指定 label 前缀,您可以设置一个较小的值,使事务更快超时。
|
||||
|
||||
- 若未指定 label 前缀,StarRocks 仅会在超时后清理滞留事务。但若 Flink 作业在事务超时前频繁失败,运行中的事务数量可能达到 StarRocks `max_running_txn_num_per_db` 的限制。当标签前缀未指定时,可为 `PREPARED` 事务设置更短的超时时间使其更快失效。关于预备状态超时设置方法,请参阅以下说明。
|
||||
|
||||
- 如果您确定 Flink job 将在长时间停止后最终会使用 checkpoint 或 savepoint 恢复,则为避免数据丢失,请调整以下 StarRocks 配置:
|
||||
|
||||
- `prepared_transaction_default_timeout_second`:StarRocks FE 参数,默认值为 `86400`。此参数值需要大于 Flink job 的停止时间。否则,在重新启动 Flink job 之前,可能会因事务超时而中止未完成事务,这些事务可能包含在成功 checkpoint 中的,如果中止,则会导致数据丢失。
|
||||
|
||||
请注意,当您设置一个较大的值时,则建议指定 `sink.label-prefix` 的值,则 Flink connector 可以根据 label 前缀和检查点中的一些信息来清理未完成的事务,而不是因事务超时后由 StarRocks 清理(这可能会导致数据丢失)。
|
||||
- 调整 `PREPARED` 事务超时。关于如何设置超时,请参阅以下说明。
|
||||
|
||||
该超时时间需大于 Flink 作业的停机时间。否则,在重启 Flink 作业前,包含在成功 checkpoint 中的滞留事务可能因超时而被中止,导致数据丢失。
|
||||
|
||||
请注意:当您将此配置值设为较大数值时,建议同时指定 `sink.label-prefix` 的值,以便根据标签前缀和检查点中的信息清理滞留事务,而非依赖超时机制(后者可能导致数据丢失)。
|
||||
|
||||
- `label_keep_max_second` 和 `label_keep_max_num`:StarRocks FE 参数,默认值分别为 `259200` 和 `1000`。更多信息,参见[FE 配置](./loading_introduction/loading_considerations.md#fe-配置)。`label_keep_max_second` 的值需要大于 Flink job 的停止时间。否则,Flink connector 无法使用保存在 Flink 的 savepoint 或 checkpoint 中的事务 label 来检查事务在 StarRocks 中的状态,并判断这些事务是否已提交,最终可能导致数据丢失。
|
||||
|
||||
您可以使用 `ADMIN SET FRONTEND CONFIG` 修改上述配置。
|
||||
- 如何设置 `PREPARED` 事务的超时时间
|
||||
|
||||
```SQL
|
||||
ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
|
||||
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
|
||||
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000");
|
||||
```
|
||||
- 对于 Connector 1.2.12+ 和 StarRocks 3.5.4+,可通过配置连接器参数 `sink.properties.prepared_timeout` 设置超时值。默认情况下该值未设置,此时将回退至 StarRocks FE 的全局配置 `prepared_transaction_default_timeout_second`(默认值为 `86400`)。
|
||||
|
||||
- 对于其他版本的连接器或 StarRocks,可通过配置 StarRocks FE 的全局配置项 `prepared_transaction_default_timeout_second`(默认值为 `86400`)来设置超时。
|
||||
|
||||
### Flush 策略
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue