
Reading pipe-delimited values in KSQL

Stack Overflow user
Asked 2020-03-11 07:31:47
1 answer · 362 views · 0 followers · 0 votes

I am writing a POC in which I have to read a file of pipe-delimited values and insert those records into MS SQL Server. I am using Confluent 5.4.1 and create the stream with the VALUE_DELIMITER property, but it throws the exception: Delimeter only supported with DELIMITED format

1. Start Confluent (version: 5.4.1):

[Dev root @ myip ~]
# confluent local start
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.vHhSRAnj
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
[Dev root @ myip ~]
# jps
49923 KafkaRestMain
50099 ConnectDistributed
49301 QuorumPeerMain
50805 KsqlServerMain
49414 SupportedKafka
52103 Jps
51020 ControlCenter
1741
49646 SchemaRegistryMain
[Dev root @ myip ~]
#

2. Create the topic:

[Dev root @ myip ~]
# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic SampleData
Created topic SampleData.

3. Feed pipe-delimited data into the SampleData topic:

[Dev root @ myip ~]
# kafka-console-producer --broker-list localhost:9092 --topic SampleData <<EOF
> this is col1|and now col2|and col 3 :)
> EOF
>>[Dev root @ myip ~]
#

4. Start KSQL:

[Dev root @ myip ~]
# ksql

                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2019 Confluent Inc.

CLI v5.4.1, Server v5.4.1 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

5. Declare a schema for the existing topic SampleData:

ksql> CREATE STREAM sample_delimited (
>       column1 varchar(1000),
>       column2 varchar(1000),
>       column3 varchar(1000))
>       WITH (KAFKA_TOPIC='SampleData', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='|');

 Message
----------------
 Stream created
----------------
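
As an aside, what the DELIMITED format with VALUE_DELIMITER='|' does for a simple record like this is split the message value on the pipe character and map the pieces to the declared columns in order. A minimal Python sketch of that behavior (an illustration only, not ksqlDB's actual parser; the column names come from the stream definition above):

```python
# Illustrates how a DELIMITED value with VALUE_DELIMITER='|' maps to columns.
# This mimics ksqlDB's behavior for this simple case; it is NOT the real parser.
def parse_delimited(value: str, columns: list, delimiter: str = "|") -> dict:
    parts = value.split(delimiter)
    return dict(zip(columns, parts))

record = parse_delimited("this is col1|and now col2|and col 3 :)",
                         ["COLUMN1", "COLUMN2", "COLUMN3"])
print(record)
# {'COLUMN1': 'this is col1', 'COLUMN2': 'and now col2', 'COLUMN3': 'and col 3 :)'}
```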

6. Validate the data in the KSQL stream:

ksql>  SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM sample_delimited emit changes limit 1;
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|ROWTIME                    |ROWKEY                     |COLUMN1                    |COLUMN2                    |COLUMN3                    |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|1584339233947              |null                       |this is col1               |and now col2               |and col 3 :)               |
Limit Reached
Query terminated

7. Write to a new Kafka topic, SampleDataAvro, serializing all the data from the sample_delimited stream into Avro format:

ksql> CREATE STREAM sample_avro WITH (KAFKA_TOPIC='SampleDataAvro', VALUE_FORMAT='AVRO') AS SELECT * FROM sample_delimited;
Delimeter only supported with DELIMITED format
ksql>

8. The statement above throws the exception:

Delimeter only supported with DELIMITED format

9. Load the MS SQL Kafka Connect sink configuration:

confluent local load test-sink -- -d ./etc/kafka-connect-jdbc/sink-quickstart-mssql.properties
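
The contents of sink-quickstart-mssql.properties are not shown in the question. A plausible sketch of such a file, assuming the standard Confluent JDBC sink connector options and reusing the connection details that appear in the answer's CREATE SINK CONNECTOR statement below (the host, user, and password are placeholders from that example, not the asker's real values):

```properties
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=SampleDataAvro
connection.url=jdbc:sqlserver://mssql:1433
connection.user=sa
connection.password=Admin123
auto.create=true
insert.mode=insert
```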


1 Answer

Stack Overflow user

Answered 2020-03-11 13:38:48

The only time you need to specify the delimiter is when defining the stream that reads from the source topic.

Here's my working example:

  1. Populate the topic with pipe-delimited data:

     $ kafkacat -b localhost:9092 -t SampleData -P <<EOF
     this is col1|and now col2|and col 3 :)
     EOF

  2. Declare a stream on top of it:

     CREATE STREAM sample_delimited (
       column1 varchar(1000),
       column2 varchar(1000),
       column3 varchar(1000))
       WITH (KAFKA_TOPIC='SampleData', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='|');

  3. Query the stream to make sure it works:

     ksql> SET 'auto.offset.reset' = 'earliest';
     Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
     ksql> SELECT * FROM sample_delimited EMIT CHANGES LIMIT 1;

  4. Reserialize the data to Avro:

     CREATE STREAM sample_avro WITH (KAFKA_TOPIC='SampleDataAvro', VALUE_FORMAT='AVRO') AS SELECT * FROM sample_delimited;

  5. Dump the contents of the topic; note that it is now Avro:

     ksql> PRINT SampleDataAvro;
     Key format: UNDEFINED
     Value format: AVRO
     3/11/20 1:33:04 PM UTC, key: <null>, value: {"COLUMN1":"this is col1","COLUMN2":"and now col2","COLUMN3":"and col3 :)"}

The error you are hitting is the result of issue #4200. You can either wait for the next release of Confluent Platform, or use standalone ksqlDB, in which the issue is already fixed.

The following uses ksqlDB 0.7.1 to stream the data to MS SQL:

CREATE SINK CONNECTOR SINK_MSSQL WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:sqlserver://mssql:1433',
    'connection.user'     = 'sa',
    'connection.password' = 'Admin123',
    'topics'              = 'SampleDataAvro',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'insert.mode'         = 'insert'
  );

Now query the data in MS SQL:

1> Select @@version
2> go

---------------------------------------------------------------------
Microsoft SQL Server 2017 (RTM-CU17) (KB4515579) - 14.0.3238.1 (X64)
        Sep 13 2019 15:49:57
        Copyright (C) 2017 Microsoft Corporation
        Developer Edition (64-bit) on Linux (Ubuntu 16.04.6 LTS)

(1 rows affected)
1> SELECT * FROM SampleDataAvro;
2> GO
COLUMN3        COLUMN2         COLUMN1     
-------------- --------------- ------------------
and col 3 :)   and now col2    this is col1

(1 rows affected)
Votes: 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/60631130
