我们正在寻找在warp10中摄取数据的最佳方法。我们使用的是一种主要使用Kafka的微服务架构。有两种解决办法:
正如这里所描述的,从现在开始,我们基于数据的聚合(x秒)使用了Ingress解决方案,并调用Ingress来发送每个数据包的数据。(而不是每次我们需要插入一些东西时调用API )。
几天来,我们一直在试验卡夫卡插件。我们成功地设置了插件并创建了一个.mc2,负责使用来自给定主题的数据,然后使用UPDATE将它们插入到warp10中。
问题:
UPDATE Kafka插件中是否有任何特定的实现,允许使用主题中的每条消息并为每个消息调用函数?MC2文件:
{
'topics' [ 'our_topic_name' ] // List of Kafka topics to subscribe to
'parallelism' 1 // Number of threads to start for processing the incoming messages. Each thread will handle a certain number of partitions.
'config' { // Map of Kafka consumer parameters
'bootstrap.servers' 'kafka-headless:9092'
'group.id' 'senx-consumer'
'enable.auto.commit' 'true'
}
'macro' <%
// macro executed each time a kafka record is consumed
/*
// received record format :
{
'timestamp' 123 // The record timestamp
'timestampType' 'type' // The type of timestamp, can be one of 'NoTimestampType', 'CreateTime', 'LogAppendTime'
'topic' 'topic_name' // Name of the topic which received the message
'offset' 123 // Offset of the message in 'topic'
'partition' 123 // Id of the partition which received the message
'key' ... // Byte array of the message key
'value' ... // Byte array of the message value
'headers' { } // Map of message headers
}
*/
"recordArray" STORE
"preprod.write" "token" STORE
// macro can be called on timeout with an empty entry map
$recordArray SIZE 0 !=
<%
$recordArray 'value' GET // kafka record value is retrieved in bytes
'UTF-8' BYTES-> // convert bytes to string (WARP10 INGRESS format)
JSON->
"value" STORE
"Records received through Kafka" LOGMSG
$value LOGMSG
$value
<%
DROP
PARSE
// PARSE outputs a gtsList, including only one gts
0 GET
// GTS rename is required to use UPDATE function
"gts" STORE
$gts $gts NAME RENAME
%>
LMAP
// Store GTS in Warp10
$token
UPDATE
%>
IFT
%> // end macro
'timeout' 10000 // Polling timeout (in ms), if no message is received within this delay, the macro will be called with an empty map as input
}发布于 2021-07-28 13:28:37
如果您想在Warp 10中缓存一些东西以避免每秒进行大量更新,则可以使用SHM (SHared存储器)。这是一个你需要激活的内置扩展。
激活之后,将其与SHMSTORE和SHMLOAD一起使用,以便在两次WarpScript执行期间将对象保存在内存中。
在您的示例中,您可以使用+!将所有传入的GTS推送到列表或GTS列表中,以将元素添加到现有列表中。
然后,缓存中所有GTS的合并(按名称+标签)和数据库中的更新可以在运行程序中完成(不要忘记使用MUTEX)。
别忘了总运营成本:
如果输入数据远不是最佳的入口格式,那么执行这些反序列化/序列化/反序列化操作是有意义的。如果您想要RANGECOMPACT您的数据以节省磁盘空间,或者进行任何预处理,这也是有意义的。
https://stackoverflow.com/questions/68558921
复制相似问题