作为一款高性能 MPP 数据库,Apache Doris 在 PB 级数据分析场景中表现出色,但许多用户在数据导入阶段常陷入选择困境。本文将彻底拆解 Stream Load、Broker Load、Routine Load、Insert into 四大核心方案,从原理解析到导入调优,从数据转换到故障排查,提供一站式解决方案。
Client(提交任务) → [FE (协调任务)] → BE (数据写入/均衡)
FE节点:负责元数据管理、任务调度
BE节点:数据导入、副本同步、数据分片存储
★注:stream load可以直接指定BE导入数据
维度 | Stream Load | Broker Load | Routine Load | Insert into |
|---|---|---|---|---|
使用场景 | 本地文件 / 程序写入 | 对象存储 / HDFS | Kafka 实时写入 | JBDC 接口 / 外部表 / 对象存储 / HDFS |
单次导入数据量 | 小于 10GB | 可达数百 GB | 微批导入 MB 到 GB | 视内存而定 |
支持文件格式 | CSV、JSON、Parquet、ORC | CSV、JSON、Parquet、ORC | CSV、JSON | SQL 方式获取 |
转化案例
LOAD LABEL db.transform_demo (
DATA INFILE("hdfs://path/data.csv")
INTO TABLE target_table
COLUMNS TERMINATED BY ","
(raw_ts, uid, temp_value, dyn_col)
PRECEDING FILTER uid = 666
SET (
event_time = from_unixtime(raw_ts),
value = if(temp_value = 'null', null, cast(temp_value as DECIMAL(10,2))),
category = case
when dyn_col in ('A','B') then'Group1'
else'Group2'
end
)
WHERE raw_ts > 666
)
关键技术点:
优先判断是不是资源的瓶颈,搜集CPU、内存、IO、网卡的监控!!!
通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。
主要针对:
参数配置优化:
参数 | 默认值 | 优化建议 |
|---|---|---|
group_commit_interval_ms | 1000 | 根据业务对数据可见性延迟的容忍度来设置 |
group_commit_data_bytes | 104857600 | 根据系统内存资源和数据可靠性要求来权衡。 |
group_commit | off_mode | sync_mode:适用于高并发写入场景;async_mode:适用于写入延迟敏感以及高频写入 |
MemTable 前移进一步减少导入过程中的开销,MemTable 将生成的 Segment 数据发给下游节点,减少了数据多次编码的开销,同时使内存反压更准确和及时。此外,我们使用了 Streaming RPC 来替代了 Ping-pong RPC,减少了数据传输过程中的等待。
MemTable 前移在 2.1 版本中默认开启。
★如果在使用过程中遇到问题、希望回退到原有的导入方式,可以在 MySQL 连接中设置环境变量 enable_memtable_on_sink_node=false 来关闭 MemTable 前移。
该能力会从多个副本中选择一个副本作为主副本(其他副本为从副本),且只对主副本进行计算,当主副本的数据文件都写入成功后,通知从副本所在节点直接拉取主副本的数据文件,实现副本间的数据同步,当所有从副本节点拉取完后进行返回或超时返回(大多数副本成功即返回成功)。
开启参数
FE 配置:enable_single_replica_load = true
BE 配置:enable_single_replica_load = true
环境变量(insert into): set experimental_enable_single_replica_insert = true;
Stream Load
设置sync_tablet_meta=false(无高可用需求时),减少元数据同步。
控制并发数不超过 BE 的 HTTP Server 线程数(默认 48)(be 参数webserver_num_workers)。
Routine Load
任务配置:
增大批次参数:max_batch_interval=60s
max_batch_size=1G。(注意数据可见性的时间要求)
调整并发数:desired_concurrent_number需小于 BE 的routine_load_consumer_pool_size(默认 10)。
Fe:max_routine_load_task_num_per_be=1024,max_routine_load_task_concurrent_num=256
Be:routine_load_consumer_pool_size=10
Broker Load
并行度调优:
设置任务参数:load_parallelism=8、send_batch_parallelism=1。(后者仅在关闭 memtable 前移时生效)
调整 FE 全局参数:max_broker_concurrency=10。
★本次导入并发数 = Math.min(源文件大小/min_bytes_per_broker_scanner,max_broker_concurrency,当前BE节点个数 * load_parallelism)
提高compaction速度
BE 参数
max_base_compaction_task_num_per_disk:默认值 2,每个磁盘最大compaction任务数 (视情况增大)
vertical_compaction_num_columns_per_group:默认值 5,在列式 compaction 中,组成一个合并组的列个数 (视情况增大)
max_base_compaction_threads :默认值 2, Base Compaction 线程池中线程数量的最大值,-1 表示每个磁盘一个线程。(视情况增大)
FE 参数
async_loading_load_task_pool_size:默认值10, loading_load任务执行程序池大小。该池大小限制了正在运行的最大 loading_load任务数。当前,它仅限制 broker load的 loading_load任务的数量。
通过本文的系统性梳理,读者可精准匹配业务场景与技术方案。建议收藏本文作为 Doris 数据导入的案头手册,随时应对各类数据接入挑战。