I am running a Kafka broker and pushing messages to it from a Python program. For efficient data exchange I use the Avro format. On the broker side, the message is picked up by a Camel route with a processor; in this processor I want to deserialize the message and eventually push the data to InfluxDB.
The pipeline works mechanically, but in the Camel route I do not get back the data I put in. On the Python side I create a dictionary:
testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = int(time.time() * 1000000000)  # nanoseconds; Python 3 has no long(), int suffices
The corresponding Avro schema on the Python side looks like this:
{
"namespace": "my.namespace",
"name": "myRecord",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "double_one", "type": "double"},
{"name": "double_two", "type": "double"},
{"name": "double_three", "type": "double"},
{"name": "time_stamp", "type": "long"}
]
}
The Python code that sends the Avro-formatted message to Kafka looks like this:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
-> FutureRecordMetadata:
"""
Method for sending message to kafka broker in the avro binary format
:param dataDict: data dictionary containing message data
:param topic_id: the Kafka topic to send message to
:param schemaDefinition: JSON schema definition
:return: FutureRecordMetadata
"""
schema = avro.schema.parse(schemaDefinition)
writer = avro.io.DatumWriter(schema)
bytes_stream = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_stream)
writer.write(dataDict, encoder)
raw_bytes = bytes_stream.getvalue()
messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')
result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
return result
The message arrives at the broker as expected, is picked up by Camel, and is processed by the following Java code:
from(kafkaEndpoint) //
.process(exchange -> {
Long kafkaInboundTime = Long
.parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
if (exchange.getIn().getHeader("kafka.KEY") != null) {
BinaryDecoder decoder = DecoderFactory.get()
.binaryDecoder(exchange.getIn().getBody(InputStream.class), null);
SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);
System.out.println(datumReader.read(null, decoder).toString());
}
}) //
.to(influxdbEndpoint);
avroSchema is currently hard-coded in the constructor of my class, like this:
avroSchema = SchemaBuilder.record("myRecord") //
.namespace("my.namespace") //
.fields() //
.requiredString("name") //
.requiredDouble("double_one") //
.requiredDouble("double_two") //
.requiredDouble("double_three") //
.requiredLong("time_stamp") //
.endRecord();
The output of System.out.println is:
{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}
Obviously something has gone wrong, but I cannot tell what. Any help is appreciated.
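A value like 6.803527358993313E-220 is a classic symptom of a double's eight bytes being read from the wrong offset or otherwise mangled in transit. A minimal, stdlib-only sketch (illustrative only, not the actual Camel code path) of what a one-byte misalignment does to an IEEE 754 double:

```python
import struct

# Avro writes a double as 8 bytes, little-endian IEEE 754.
raw = struct.pack('<d', 1.2345)

# Misaligned read: start one byte late and borrow a padding byte.
(garbled,) = struct.unpack('<d', raw[1:] + b'\x00')

# The result bears no resemblance to 1.2345.
print(garbled != 1.2345)  # True
```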
Update 1: The Java code runs on an Intel/Windows machine, while Kafka (in a VM) and the Python code run on a Linux machine of unknown architecture. Could this be caused by the systems having different endianness?
Update 1.1: Endianness can be ruled out. I checked on both sides; both are little-endian.
As a check, I changed the schema definition so that all fields are of type string. With this definition the values and keys are transferred correctly: the Python input and the Java/Camel output are identical.
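That the all-string schema arrives intact points at the transport treating the payload as text: UTF-8 strings survive a decode/encode round trip, but the raw bytes of Avro doubles and longs generally do not. A stdlib-only sketch of that failure mode (hypothetical, assuming the consumer side deserialized the value as a string):

```python
import struct

text_payload = 'avroTest'.encode('utf-8')   # valid UTF-8: survives a text round trip
binary_payload = struct.pack('<d', 1.2345)  # raw double bytes: not valid UTF-8

def text_round_trip(b: bytes) -> bytes:
    # What a String deserializer effectively does to the value bytes.
    return b.decode('utf-8', errors='replace').encode('utf-8')

print(text_round_trip(text_payload) == text_payload)      # True
print(text_round_trip(binary_payload) == binary_payload)  # False: bytes were mangled
```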
Update 3: The Camel route's Kafka endpoint has no special properties such as deserializers:
"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"
Posted on 2021-02-08 19:35:10
I found the solution to the problem. The following Python code produces the desired output to Kafka:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
-> None:
"""
Method for sending message to kafka broker in the avro binary format
:param dataDict: data dictionary containing message data
:param topic_id: the Kafka topic to send message to
:param schemaDefinition: JSON schema definition
:return: None
"""
schema = avro.schema.parse(schemaDefinition)
bytes_writer = io.BytesIO()
encoder = BinaryEncoder(bytes_writer)
writer = DatumWriter(schema)
writer.write(dataDict, encoder)
raw_bytes = bytes_writer.getvalue()
self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)
try:
# NOTE: I use the 'AVRO' key to separate avro formatted messages from others
result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
except Exception as err:
print(err)
self._messageBrokerWriterConnection.flush()
The key to the solution was adding valueDeserializer=... to the endpoint definition on the Apache Camel side:
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
...
TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());
The Apache Camel route, including the conversion to an InfluxDB point, can then be written like this:
@Component
public class Route_TEST_QUEUE extends RouteBuilder {
Schema avroSchema = null;
private Route_TEST_QUEUE() {
avroSchema = SchemaBuilder //
.record("ElectronCoolerCryoMessage") //
.namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
.fields() //
.requiredString("name") //
.requiredDouble("double_one") //
.requiredDouble("double_two") //
.requiredDouble("double_three") //
.requiredLong("time_stamp") //
.endRecord();
}
private String fromEndpoint = TEST_QUEUE.definitionString();
@Override
public void configure() throws Exception {
from(fromEndpoint) //
.process(messagePayload -> {
byte[] data = messagePayload.getIn().getBody(byte[].class);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
GenericRecord record = datumReader.read(null, decoder);
try {
Point.Builder influxPoint = Point
.measurement(record.get("name").toString());
long acqStamp = 0L;
if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
acqStamp = (long) record.get("time_stamp");
} else {
acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
}
influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);
Map<String, Object> fieldMap = new HashMap<>();
avroSchema.getFields().stream() //
.filter(field -> !field.name().equals("keyFieldname")) //
.forEach(field -> {
Object value = record.get(field.name());
fieldMap.put(field.name().toString(), value);
});
influxPoint.fields(fieldMap);
} catch (Exception e) {
MessageLogger.logError(e);
}
}) //
.to(...InfluxEndpoint...) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.to("stream:out");
}
}
This works for my purposes: no Confluent, just plain Kafka.
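For reference, the Avro binary layout these messages use can be reproduced by hand with the standard library alone: a string is a zigzag-varint length followed by its UTF-8 bytes, a double is 8 little-endian IEEE 754 bytes, and a long is a zigzag varint. A sketch of this record's wire format (illustrative only, independent of the avro package):

```python
import struct

def zigzag_varint(n: int) -> bytes:
    """Avro long/int encoding: zigzag, then little-endian base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def encode_record(d: dict) -> bytes:
    # Fields are written in schema order, with no field names or markers.
    name = d['name'].encode('utf-8')
    return (zigzag_varint(len(name)) + name
            + struct.pack('<d', d['double_one'])
            + struct.pack('<d', d['double_two'])
            + struct.pack('<d', d['double_three'])
            + zigzag_varint(d['time_stamp']))

payload = encode_record({'name': 'avroTest', 'double_one': 1.2345,
                         'double_two': 1.23, 'double_three': 2.345,
                         'time_stamp': 1612345678000000000})
print(payload[:9])  # b'\x10avroTest': zigzag(8) == 16, then the UTF-8 name
```

Any single corrupted byte in this stream shifts every following field, which is why one mangled double cascades into nonsense values for the rest of the record.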
https://stackoverflow.com/questions/66047812