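Here is a high-level view of my pipeline:
MQTT -> Kafka/MQTT bridge producer -> Kafka Connect -> AWS Kinesis Data Streams -> Lambda function (decodes Kinesis.record.data) -> store in DynamoDB
Basically, the whole flow works, but when I try to read kinesis.record.data I get a strange "JSON" format.
It isn't UTF-8. It contains UTF-8-encoded, quoted-string-encoded JSON records separated by binary headers (so it looks as if the JSON went through a quoted-printable filter and was then packed into a binary format). I don't think this is anything Kinesis-specific, and I have already wasted countless hours debugging; it must come from the Kafka producer or from Kafka Connect in between.
Below is the Python code in which I take my metrics from various MQTT topics and send the messages to Kafka using kafka-python: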
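import json
from datetime import datetime
from kafka import KafkaProducer
kafka_producer = KafkaProducer(
    bootstrap_servers="10.0.0.129:9092",
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# msg is the incoming MQTT message (e.g. the one passed to an MQTT client's on_message callback)
data = {
    "datetime": str(datetime.now()),
    "topic": str(msg.topic),
    "value": str(msg.payload.decode()),
    "environment": "test-v1"
}
kafka_producer.send("greenforge-events", data)
And here are the properties I use to run Kafka Connect in distributed mode: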
bootstrap.servers=PLAINTEXT://kafka:9092
group.id=connect-cluster
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
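plugin.path=/opt/kafka/plugins
I've read a few articles about encoding and decoding in Kafka Connect, but I'm new to this. As you can see from my Python code, I'm sending my JSON successfully (as a string; in the properties file I also tried setting the converters to JsonConverter, without success).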
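The escaped quotes inside the "value" field (\"hecto pascals\") can be reproduced with the producer's value_serializer alone: msg.payload.decode() is already a JSON string, so json.dumps encodes it a second time as a quoted string. A minimal sketch, assuming the MQTT payloads are JSON text like the ones in the log below; serialize is a hypothetical name for the same lambda passed to KafkaProducer:

```python
import json


def serialize(value):
    """Same logic as the value_serializer lambda passed to KafkaProducer."""
    return json.dumps(value).encode("utf-8")


# Example MQTT payload taken from the decoded log (assumption: payloads are JSON text).
mqtt_payload = b'{"hecto pascals": 642}'

# As in the original code: the payload is embedded as a string, so its quotes get escaped.
as_string = serialize({"value": mqtt_payload.decode()})

# Alternative: parse the payload first so it is embedded as a nested JSON object.
as_object = serialize({"value": json.loads(mqtt_payload)})

print(as_string.decode())  # {"value": "{\"hecto pascals\": 642}"}
print(as_object.decode())  # {"value": {"hecto pascals": 642}}
```

Neither variant adds any binary framing: the bytes written to Kafka are plain UTF-8 JSON, and StringConverter hands them on as text, so the binary headers are most likely added further downstream.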
Here is my connector configuration:
name=greenforges
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=greenforge-events
region=us-east-2
streamName=greenforge-prototype-stream
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstandings records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true
Finally, this is what I get when I try to read kinesis.record.data:
const stream = "84mawgoBMBqfAQgAGpoBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE0MzY0OSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlycHJlc3N1cmUiLCAidmFsdWUiOiAie1wiaGVjdG8gcGFzY2Fsc1wiOiA2NDJ9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqdAQgAGpgBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE1ODMxOCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvY2FzaW5ndGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xNzc3MzgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xODUwNjAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMTkyNzg4IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqjAQgAGp4BeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjIwNTU3NiIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTkvaGlnaGxldmVsZmxvYXRzd2l0Y2giLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiB0cnVlfSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anwEIABqaAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjIyMDAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L2FpcnByZXNzdXJlIiwgInZhbHVlIjogIntcImhlY3RvIHBhc2NhbHNcIjogNzQ4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0argEIABqpAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjc4ODgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk2L2Nhc2luZ3RlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogMTUuMDMzODQ4NDg3NTEwNzY5fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anQEIABqYAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yNjU4ODIiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L3NvbGVub2lkdmFsdmUiLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiBmYWxzZX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12
MSJ9Gq8BCAAaqgF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMjcxMTg2IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5OS9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IC0yLjk5NDY5NTUwNzkxNjc0MDh9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqsAQgAGqcBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjE3MTI0NCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiAtMTMuMjE1NTQ4ODAzNjQxMTU3fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xNzc5OTMiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xODU2NzAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMTkzMTUzIiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqqAQgAGqUBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjIxOTIwMSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTYvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA0NS44MDQ0OTA2MDc4OTEwNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9Gq4BCAAaqQF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMjI5OTc3IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5Ni9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IDE1LjAzMzg0ODQ4NzUxMDc2OX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ90UczjB09DYqhb40gF9vKEA==";
var payload = Buffer.from(stream, 'base64').toString('utf8');
console.log('Decoded payload: %s', payload);
And this is the log:
Decoded payload: ��
0��
{
"datetime": "2022-10-11 14:46:16.143649",
"topic": "greenforge/uuid/utility1/airpressure",
"value": "{\"hecto pascals\": 642}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.158318",
"topic": "greenforge/uuid/utility1/casingtemperature",
"value": "{\"celsius\": 8}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.177738",
"topic": "greenforge/uuid/utility1/co2",
"value": "{\"parts per million\": 1346}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.185060",
"topic": "greenforge/uuid/utility3/airtemperature",
"value": "{\"celsius\": -22.902578536320426}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.192788",
"topic": "greenforge/uuid/utility3/airpressure",
"value": "{\"hecto pascals\": 1054}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.205576",
"topic": "greenforge/uuid/utility9/highlevelfloatswitch",
"value": "{\"enabled\": true}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.222200",
"topic": "greenforge/uuid/utility9/airpressure",
"value": "{\"hecto pascals\": 748}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.227888",
"topic": "greenforge/uuid/utility6/casingtemperature",
"value": "{\"celsius\": 15.033848487510769}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.265882",
"topic": "greenforge/uuid/utility9/solenoidvalve",
"value": "{\"enabled\": false}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:16.271186",
"topic": "greenforge/uuid/utility9/casingtemperature",
"value": "{\"celsius\": -2.9946955079167408}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:17.171244",
"topic": "greenforge/uuid/utility1/airtemperature",
"value": "{\"celsius\": -13.215548803641157}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:17.177993",
"topic": "greenforge/uuid/utility1/co2",
"value": "{\"parts per million\": 1346}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:17.185670",
"topic": "greenforge/uuid/utility3/airtemperature",
"value": "{\"celsius\": -22.902578536320426}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:17.193153",
"topic": "greenforge/uuid/utility3/airpressure",
"value": "{\"hecto pascals\": 1054}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:17.219201",
"topic": "greenforge/uuid/utility6/airtemperature",
"value": "{\"celsius\": 45.80449060789106}",
"environment": "test-v1"
}
��
{
"datetime": "2022-10-11 14:46:17.229977",
"topic": "greenforge/uuid/utility6/casingtemperature",
"value": "{\"celsius\": 15.033848487510769}",
"environment": "test-v1"
}
�G3�=
��o� ��
I'm 110% sure the weird characters represent the stream record data structure, for example:
(
topic='greenforge-events',
partition=0,
offset=24,
timestamp=21321421312312,
key=None,
value={JSON_Object},
checksum=321321,
serialized_key_size=-1
serialized_value_size=49
)我在这里错过了什么?
Posted on 2022-10-12 18:09:33
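Based on the printed payload, the records are being aggregated (batched) into a binary format before they land in Kinesis. The leading bytes F3 89 9A C2 (the 84ma prefix of the Base64 string) are the magic number of the Kinesis Producer Library (KPL) record aggregation format, which the connector uses because aggregation=true is set.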
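If you would rather keep aggregation enabled, the records can be unpacked in the consumer instead. The sketch below follows the KPL aggregated-record layout (4 magic bytes, a protobuf AggregatedRecord message, then a 16-byte MD5 digest of that message) and decodes only the protobuf fields it needs. It is written in Python for illustration; deaggregate and decode_kinesis_event are hypothetical helper names, and for production the maintained deaggregators in the awslabs kinesis-aggregation project (which include a Node.js module) are the safer choice.

```python
import base64
import hashlib
import json

# Every KPL-aggregated record starts with these 4 bytes ("84ma" in Base64).
KPL_MAGIC = b"\xf3\x89\x9a\xc2"
DIGEST_LEN = 16  # trailing MD5 digest of the protobuf message


def _read_varint(buf, pos):
    """Decode a protobuf base-128 varint at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def _iter_fields(buf):
    """Yield (field_number, value) for each field of a protobuf message."""
    pos = 0
    while pos < len(buf):
        key, pos = _read_varint(buf, pos)
        field, wire_type = key >> 3, key & 0x07
        if wire_type == 0:  # varint, e.g. partition_key_index
            value, pos = _read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited: strings, bytes, sub-messages
            length, pos = _read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        else:
            raise ValueError(f"unexpected wire type {wire_type}")
        yield field, value


def deaggregate(blob):
    """Return the user payloads packed into one Kinesis record.

    Records that are not KPL-aggregated are returned unchanged in a 1-item list.
    """
    if not blob.startswith(KPL_MAGIC) or len(blob) <= len(KPL_MAGIC) + DIGEST_LEN:
        return [blob]
    body, digest = blob[len(KPL_MAGIC):-DIGEST_LEN], blob[-DIGEST_LEN:]
    if hashlib.md5(body).digest() != digest:
        return [blob]  # checksum mismatch: treat it as a plain record
    payloads = []
    for field, value in _iter_fields(body):
        if field == 3:  # AggregatedRecord.records
            for sub_field, sub_value in _iter_fields(value):
                if sub_field == 3:  # Record.data
                    payloads.append(sub_value)
    return payloads


def decode_kinesis_event(event):
    """Base64-decode, deaggregate and JSON-parse every record in a Lambda event."""
    docs = []
    for record in event["Records"]:
        raw = base64.b64decode(record["kinesis"]["data"])
        docs.extend(json.loads(payload) for payload in deaggregate(raw))
    return docs
```

Falling back to the raw bytes for non-aggregated input means the same handler keeps working if aggregation is later switched off.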
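You probably want individual Kinesis records that you can parse and insert one by one into the downstream system, so you can set
aggregation=false
(see https://github.com/awslabs/kinesis-kafka-connector#kafka-kinesis-streams-connectorproperties).
Then JSON.parse(payload) in the Lambda should work.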
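The same step in a Python Lambda, as a minimal sketch: with aggregation=false each Kinesis record should carry one Kafka message, so a Base64 decode followed by json.loads is enough. The sketch also unwraps the nested "value" string seen in the log and accepts a JSON array per record, which is what arrives if the producer batches events into lists. handler and parse_value are hypothetical names, and the DynamoDB write is left out.

```python
import base64
import json


def parse_value(value):
    """Decode the nested "value" field, which the producer stores as a JSON string."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value  # not JSON: keep the raw string
    return value


def handler(event, context=None):
    """Return one dict per metric found in a Kinesis-triggered Lambda event."""
    items = []
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        doc = json.loads(payload)
        # A producer that batches events sends a JSON array instead of one object.
        for item in doc if isinstance(doc, list) else [doc]:
            if "value" in item:
                item["value"] = parse_value(item["value"])
            items.append(item)
    return items


# Usage with one record shaped like the ones in the log above.
sample = {"datetime": "2022-10-11 14:46:16.143649",
          "topic": "greenforge/uuid/utility1/airpressure",
          "value": "{\"hecto pascals\": 642}",
          "environment": "test-v1"}
event = {"Records": [{"kinesis": {"data": base64.b64encode(json.dumps(sample).encode()).decode()}}]}
print(handler(event)[0]["value"])  # {'hecto pascals': 642}
```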
If you do want to batch the records, you can do the batching in Python instead:
kafka_producer = ...
counter = 0
events = []
# mqtt_consumer: placeholder for whatever iterable yields your incoming MQTT messages
for msg in mqtt_consumer:
    data = {
        "datetime": str(datetime.now()),
        "topic": msg.topic,
        "value": msg.payload.decode(),
        "environment": "test-v1"
    }
    events.append(data)
    counter += 1
    if counter % 10 == 0:  # for example, 10 at a time
        kafka_producer.send("greenforge-events", events)
        events.clear()
        counter = 0
if events:  # send any remaining events once the loop ends
    kafka_producer.send("greenforge-events", events)
https://stackoverflow.com/questions/74035089