
Confused about encoding and decoding with Kafka Connect

Stack Overflow user
Asked 2022-10-11 23:45:13
1 answer, 142 views, 0 followers, 0 votes

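Here is a high-level view of my pipeline:

MQTT -> Kafka/MQTT bridge producer -> Kafka Connect -> AWS Kinesis data stream -> Lambda function (decodes kinesis.record.data) -> store in DynamoDB

Basically the whole flow works, but when I try to read kinesis.record.data I get a strange "JSON" format.

It is not plain UTF-8. It contains UTF-8 encoded, quoted-string-encoded JSON records separated by binary headers (so it looks as if the JSON went through a quoted-printable filter and was then packed into a binary format). I don't think this is anything Kinesis-specific, and I have wasted countless hours debugging; it has to come from the Kafka producer or from Kafka Connect in the middle.

Here is the Python code where I collect my metrics from various MQTT topics and send the messages through my Kafka producer using kafka-python: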

import json
from datetime import datetime
from kafka import KafkaProducer  # kafka-python

kafka_producer = KafkaProducer(bootstrap_servers="10.0.0.129:9092",
                               value_serializer=lambda v: json.dumps(v).encode("utf-8"))
# msg is the incoming MQTT message (topic and payload)
data = {
    "datetime": str(datetime.now()),
    "topic": str(msg.topic),
    "value": str(msg.payload.decode()),
    "environment": "test-v1",
}
kafka_producer.send("greenforge-events", data)

My properties for running Kafka Connect in distributed mode (connect-distributed):

bootstrap.servers=PLAINTEXT://kafka:9092
group.id=connect-cluster
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins

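I have read a few things about encoding and decoding with Kafka Connect, but I am quite new to this. As you can see from my Python code, I am successfully sending my JSON (as a string; in the properties file I also tried setting the converter to JsonConverter, with no success).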
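One detail that shows up in the log output further down is that `value` arrives as a JSON string nested inside JSON (for example `"{\"celsius\": 8}"`), because the producer embeds the decoded MQTT payload as text. Below is a minimal sketch of building the event so the payload is nested as an object instead, assuming the MQTT payloads are themselves JSON. `build_event` and `serialize` are hypothetical helpers for illustration, not part of kafka-python.

```python
import json
from datetime import datetime


def build_event(topic: str, payload: bytes, environment: str = "test-v1") -> dict:
    """Build the event dict, nesting the MQTT payload as JSON when possible."""
    text = payload.decode("utf-8")
    try:
        value = json.loads(text)   # payload was JSON: keep it as an object
    except json.JSONDecodeError:
        value = text               # otherwise fall back to the raw string
    return {
        "datetime": str(datetime.now()),
        "topic": topic,
        "value": value,
        "environment": environment,
    }


def serialize(event: dict) -> bytes:
    """Same idea as the producer's value_serializer: dict -> UTF-8 JSON bytes."""
    return json.dumps(event).encode("utf-8")


event = build_event("greenforge/uuid/utility1/airpressure", b'{"hecto pascals": 642}')
wire = serialize(event)
```

This only changes how `value` is nested. It does not affect the binary framing described in the rest of the question.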

Here is my connector configuration:

name=greenforges
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=greenforge-events
region=us-east-2
streamName=greenforge-prototype-stream
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if 
# threshold for outstandings records have reached 
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms) 
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

Finally, here is what I get when I try to read kinesis.record.data:

const stream = "84mawgoBMBqfAQgAGpoBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE0MzY0OSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlycHJlc3N1cmUiLCAidmFsdWUiOiAie1wiaGVjdG8gcGFzY2Fsc1wiOiA2NDJ9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqdAQgAGpgBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE1ODMxOCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvY2FzaW5ndGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xNzc3MzgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xODUwNjAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMTkyNzg4IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqjAQgAGp4BeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjIwNTU3NiIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTkvaGlnaGxldmVsZmxvYXRzd2l0Y2giLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiB0cnVlfSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anwEIABqaAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjIyMDAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L2FpcnByZXNzdXJlIiwgInZhbHVlIjogIntcImhlY3RvIHBhc2NhbHNcIjogNzQ4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0argEIABqpAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjc4ODgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk2L2Nhc2luZ3RlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogMTUuMDMzODQ4NDg3NTEwNzY5fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anQEIABqYAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yNjU4ODIiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L3NvbGVub2lkdmFsdmUiLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiBmYWxzZX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12
MSJ9Gq8BCAAaqgF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMjcxMTg2IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5OS9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IC0yLjk5NDY5NTUwNzkxNjc0MDh9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqsAQgAGqcBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjE3MTI0NCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiAtMTMuMjE1NTQ4ODAzNjQxMTU3fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xNzc5OTMiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xODU2NzAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMTkzMTUzIiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqqAQgAGqUBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjIxOTIwMSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTYvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA0NS44MDQ0OTA2MDc4OTEwNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9Gq4BCAAaqQF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMjI5OTc3IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5Ni9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IDE1LjAzMzg0ODQ4NzUxMDc2OX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ90UczjB09DYqhb40gF9vKEA==";
        
var payload = Buffer.from(stream, 'base64').toString('utf8');
console.log('Decoded payload: %s', payload);

Here is the log:

Decoded payload: ��
0��
{
    "datetime": "2022-10-11 14:46:16.143649",
    "topic": "greenforge/uuid/utility1/airpressure",
    "value": "{\"hecto pascals\": 642}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.158318",
    "topic": "greenforge/uuid/utility1/casingtemperature",
    "value": "{\"celsius\": 8}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.177738",
    "topic": "greenforge/uuid/utility1/co2",
    "value": "{\"parts per million\": 1346}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.185060",
    "topic": "greenforge/uuid/utility3/airtemperature",
    "value": "{\"celsius\": -22.902578536320426}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.192788",
    "topic": "greenforge/uuid/utility3/airpressure",
    "value": "{\"hecto pascals\": 1054}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.205576",
    "topic": "greenforge/uuid/utility9/highlevelfloatswitch",
    "value": "{\"enabled\": true}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.222200",
    "topic": "greenforge/uuid/utility9/airpressure",
    "value": "{\"hecto pascals\": 748}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.227888",
    "topic": "greenforge/uuid/utility6/casingtemperature",
    "value": "{\"celsius\": 15.033848487510769}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.265882",
    "topic": "greenforge/uuid/utility9/solenoidvalve",
    "value": "{\"enabled\": false}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.271186",
    "topic": "greenforge/uuid/utility9/casingtemperature",
    "value": "{\"celsius\": -2.9946955079167408}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.171244",
    "topic": "greenforge/uuid/utility1/airtemperature",
    "value": "{\"celsius\": -13.215548803641157}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.177993",
    "topic": "greenforge/uuid/utility1/co2",
    "value": "{\"parts per million\": 1346}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.185670",
    "topic": "greenforge/uuid/utility3/airtemperature",
    "value": "{\"celsius\": -22.902578536320426}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.193153",
    "topic": "greenforge/uuid/utility3/airpressure",
    "value": "{\"hecto pascals\": 1054}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.219201",
    "topic": "greenforge/uuid/utility6/airtemperature",
    "value": "{\"celsius\": 45.80449060789106}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.229977",
    "topic": "greenforge/uuid/utility6/casingtemperature",
    "value": "{\"celsius\": 15.033848487510769}",
    "environment": "test-v1"
}
�G3�=
��o� ��
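The `�` characters are what a UTF-8 decoder emits for bytes that are not valid UTF-8. Here is a small Python sketch of the same effect, using only the first 24 base64 characters of the sample record above. It is a Python illustration of the `Buffer.from(...).toString('utf8')` step, not the asker's code.

```python
import base64

# First 24 base64 characters of the sample record (18 bytes once decoded)
prefix = "84mawgoBMBqfAQgAGpoBeyJk"
raw = base64.b64decode(prefix)

# Decoding binary framing bytes as UTF-8 produces U+FFFD replacement characters
text = raw.decode("utf-8", errors="replace")

print(raw[:4].hex())   # f3899ac2
print(repr(text))      # replacement characters, then the start of the JSON
```

The readable JSON survives because it is valid UTF-8. Only the binary bytes around it are replaced.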

I am 110% sure the weird characters represent the stream record data structure, something like:

(
 topic='greenforge-events',
 partition=0,
 offset=24,
 timestamp=21321421312312,
 key=None,
 value={JSON_Object},
 checksum=321321,
 serialized_key_size=-1,
 serialized_value_size=49
)

What am I missing here?


1 Answer

Stack Overflow user

Accepted answer

Answered 2022-10-12 18:09:33

Based on the payload printed, the data is being batched into Kinesis in some custom binary format.
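The leading bytes of the sample, `f3 89 9a c2`, match the magic number that the Kinesis Producer Library (KPL) puts in front of aggregated records. That layout is a 4-byte magic number, a protobuf-encoded message holding the individual user records, and a 16-byte MD5 checksum of that message. The following is a minimal sketch of unpacking it by hand, assuming that framing and the field numbers of the KPL aggregation format (field 3 of the outer message is a user record, field 3 of each record is its data). `read_varint`, `iter_fields` and `deaggregate` are hypothetical helper names, and a maintained deaggregation library would be preferable in production.

```python
import hashlib
from typing import Iterator, List, Tuple

KPL_MAGIC = bytes.fromhex("f3899ac2")   # prefix of a KPL aggregated record
MD5_SIZE = 16                           # trailing checksum length in bytes


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a protobuf varint at pos; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def iter_fields(buf: bytes) -> Iterator[Tuple[int, object]]:
    """Yield (field_number, value) pairs from a protobuf-encoded message.

    Only varint (wire type 0) and length-delimited (wire type 2) fields
    are handled, which is all this sketch needs.
    """
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field, wire_type = key >> 3, key & 0x07
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        yield field, value


def deaggregate(blob: bytes) -> List[bytes]:
    """Return the user-record payloads packed into one Kinesis record."""
    if not blob.startswith(KPL_MAGIC):
        return [blob]                   # not aggregated: a single plain record
    body, digest = blob[len(KPL_MAGIC):-MD5_SIZE], blob[-MD5_SIZE:]
    if hashlib.md5(body).digest() != digest:
        raise ValueError("checksum mismatch")
    payloads: List[bytes] = []
    for field, record in iter_fields(body):
        if field == 3:                  # outer field 3: one user record
            payloads += [v for f, v in iter_fields(record) if f == 3]
    return payloads
```

Each element returned by `deaggregate` would then be one JSON document that can be parsed on its own.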

You probably want individual Kinesis records that can be parsed and inserted row by row into the downstream system, so you can set

aggregation=false

https://github.com/awslabs/kinesis-kafka-connector#kafka-kinesis-streams-connectorproperties

Then JSON.parse(payload) in the Lambda should work.
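With aggregation disabled, each Kinesis record should carry exactly one JSON document. Below is a minimal Python sketch of the decoding step (`json.loads` being the Python counterpart of `JSON.parse`). It assumes the Kinesis event shape that Lambda passes to a handler, with base64 data under `Records[].kinesis.data`. The function names and the second parse of the nested `value` string are illustrative assumptions, and the DynamoDB write is left out.

```python
import base64
import json


def decode_record(record: dict) -> dict:
    """Decode one Kinesis event record into a Python dict."""
    raw = base64.b64decode(record["kinesis"]["data"])
    item = json.loads(raw.decode("utf-8"))
    # The producer stored the MQTT payload as a JSON string, so "value"
    # may need a second parse to become an object.
    if isinstance(item.get("value"), str):
        try:
            item["value"] = json.loads(item["value"])
        except json.JSONDecodeError:
            pass  # leave non-JSON values as plain strings
    return item


def handler(event, context):
    """Lambda entry point: return the decoded items for inspection."""
    return [decode_record(r) for r in event["Records"]]
```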

If you do want to batch the records, you can do the batching in Python.

kafka_producer = ...

counter = 0
events = []
for msg in mqtt_consumer:
    data = {
        "datetime": str(datetime.now()),
        "topic": msg.topic,
        "value": msg.payload.decode(),
        "environment": "test-v1"
    }
    events.append(data)
    counter += 1
    if counter % 10 == 0:  # for example, 10 at a time
        kafka_producer.send("greenforge-events", events)
        events.clear()
        counter = 0
if events:  # send any remaining events once the for-loop ends
    kafka_producer.send("greenforge-events", events)
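The batching logic can also be separated from Kafka so that it is easier to test. This is a minimal sketch in which `batched` is a hypothetical helper (not part of kafka-python) and a list of small dicts stands in for the MQTT-derived events.

```python
from typing import Iterable, Iterator, List


def batched(items: Iterable[dict], size: int) -> Iterator[List[dict]]:
    """Yield lists of up to `size` items; the last list may be shorter."""
    batch: List[dict] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []          # start a fresh list instead of clearing in place
    if batch:
        yield batch             # remainder once the input is exhausted


events = [{"n": i} for i in range(25)]
sizes = [len(b) for b in batched(events, 10)]
print(sizes)  # [10, 10, 5]
```

Each yielded list could be passed to `kafka_producer.send("greenforge-events", batch)`. With a JSON serializer like the one in the question, one Kafka message would then hold a JSON array, so the Lambda would need to iterate over the parsed array.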
Votes: 1

Page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/74035089
