首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Clickhouse Kafka引擎抛出异常

Clickhouse Kafka引擎抛出异常
EN

Stack Overflow用户
提问于 2018-01-08 09:00:38
回答 2查看 1K关注 0票数 1

我正在尝试使用Clickhouse Kafka引擎来摄取数据。数据采用CSV格式。在数据摄入过程中,有时我会遇到异常。

代码语言:javascript
复制
2018.01.08 08:41:47.016826 [ 3499 ] <Debug> StorageKafka (consumer_queue): Started streaming to 1 attached views
2018.01.08 08:41:47.016906 [ 3499 ] <Trace> StorageKafka (consumer_queue): Creating formatted reader
2018.01.08 08:41:49.680816 [ 3499 ] <Error> void DB::StorageKafka::streamThread(): Code: 117, e.displayText() = DB::Exception: Expected end of line, e.what() = DB::Exception, Stack trace:

0. clickhouse-server(StackTrace::StackTrace()+0x16) [0x3221296]
1. clickhouse-server(DB::Exception::Exception(std::string const&, int)+0x1f) [0x144a02f]
2. clickhouse-server() [0x36e6ce1]
3. clickhouse-server(DB::CSVRowInputStream::read(DB::Block&)+0x1a0) [0x36e6f60]
4. clickhouse-server(DB::BlockInputStreamFromRowInputStream::readImpl()+0x64) [0x36e3454]
5. clickhouse-server(DB::IProfilingBlockInputStream::read()+0x16e) [0x2bcae0e]
6. clickhouse-server(DB::KafkaBlockInputStream::readImpl()+0x6c) [0x32f6e7c]
7. clickhouse-server(DB::IProfilingBlockInputStream::read()+0x16e) [0x2bcae0e]
8. clickhouse-server(DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::atomic<bool>*)+0x55) [0x35b3e25]
9. clickhouse-server(DB::StorageKafka::streamToViews()+0x366) [0x32f54f6]
10. clickhouse-server(DB::StorageKafka::streamThread()+0x143) [0x32f58c3]
11. clickhouse-server() [0x40983df]
12. /lib/x86_64-linux-gnu/libpthread.so.0(+0x76ba) [0x7f4d115d06ba]
13. /lib/x86_64-linux-gnu/libc.so.6(clone+0x6d) [0x7f4d10bf13dd]

下表是

代码语言:javascript
复制
CREATE TABLE test.consumer_queue (ID Int32,  DAY Date) ENGINE = Kafka('broker-ip:port', 'clickhouse-kyt-test','clickhouse-kyt-test-group', '**CSV**')

CREATE TABLE test.consumer_request ( ID Int32,  DAY Date) ENGINE = MergeTree PARTITION BY DAY ORDER BY (DAY, ID) SETTINGS index_granularity = 8192

CREATE MATERIALIZED VIEW test.consumer_view TO test.consumer_request (ID Int32, DAY Date) AS SELECT ID, DAY FROM test.consumer_queue

CSV数据

代码语言:javascript
复制
10034,"2018-01-05"
10035,"2018-01-05"
10036,"2018-01-05"
10037,"2018-01-05"
10038,"2018-01-05"
10039,"2018-01-05"

Clickhouse服务器版本1.1.54318.

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-01-11 13:39:31

ClickHouse似乎读取了卡夫卡的一批消息,然后尝试将所有这些消息解码为一个CSV。并且这个CSV中的消息应该用新的行字符分隔。所以所有的消息都应该在结尾处有新的行字符。

我不确定它是ClickHouse的一个特性还是一个bug。

您可以尝试只发送一条消息给卡夫卡,并检查它在ClickHouse中是否正确。

如果您用脚本kafka-console-producer.sh向Kafka发送消息,则该脚本(类ConsoleProducer.scala)从文件中读取行,并将每行发送到没有新行字符的Kafka主题,因此无法正确处理此类消息。

如果您用自己的脚本/应用程序发送消息,那么您可以尝试修改它,并在每条消息的末尾添加新的行字符。这应该能解决这个问题。或者你可以为卡夫卡引擎使用另一种格式,例如JSONEachRow。

票数 1
EN

Stack Overflow用户

发布于 2019-07-19 09:11:30

同意@mikhail的答案,我想,试试kafka_row_delimiter = '\n‘在设置卡夫卡引擎

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48146978

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档