首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在warp10中摄取数据-性能提示

在warp10中摄取数据-性能提示
EN

Stack Overflow用户
提问于 2021-07-28 10:43:40
回答 1查看 57关注 0票数 0

我们正在寻找在warp10中摄取数据的最佳方法。我们使用的是一种主要使用Kafka的微服务架构。有两种解决办法:

正如这里所描述的,从现在开始,我们基于数据的聚合(x秒)使用了Ingress解决方案,并调用Ingress来发送每个数据包的数据。(而不是每次我们需要插入一些东西时调用API )。

几天来,我们一直在试验卡夫卡插件。我们成功地设置了插件并创建了一个.mc2,负责使用来自给定主题的数据,然后使用UPDATE将它们插入到warp10中。

问题:

  1. 使用了卡夫卡插件,当我们使用汇入端点时,使用相同的缓冲机制会更好吗?或者,在UPDATE Kafka插件中是否有任何特定的实现,允许使用主题中的每条消息并为每个消息调用函数?
  2. 今天,由于这两种解决方案都在工作,我们正在努力寻找差异,以获得最佳的性能结果,在摄取数据。如果可能的话,也不必应用任何缓冲机制,因为我们正在尽量实时地使用.

MC2文件:

代码语言:javascript
复制
    {                                                                                                                                                                                                                                                                             
    'topics' [ 'our_topic_name' ] // List of Kafka topics to subscribe to                                                                                                                                                                                                    
    'parallelism' 1                // Number of threads to start for processing the incoming messages. Each thread will handle a certain number of partitions.                                                                                                                  
    'config' { // Map of Kafka consumer parameters                                                                                                                                                                                                                              
        'bootstrap.servers' 'kafka-headless:9092'                                                                                                                                                                                                                                  
        'group.id' 'senx-consumer'                                                                                                                                                                                                                                                 
        'enable.auto.commit' 'true'                                                                                                                                                                                                                                                
    }                                                                                                                                                                                                                                                                          
    'macro' <%                                                                                                                                                                                                                                                                  
        // macro executed each time a kafka record is consumed                                                                                                                                                                                                                          
        /*                                                                                                                                                                                                                                                                        
            // received record format :                                                                                                                                                                                                                                           
            {                                                                                                                                                                                                                                                                     
                'timestamp' 123        // The record timestamp                                                                                                                                                                                                                    
                'timestampType' 'type' // The type of timestamp, can be one of 'NoTimestampType', 'CreateTime', 'LogAppendTime'                                                                                                                                                   
                'topic' 'topic_name'   // Name of the topic which received the message                                                                                                                                                                                            
                'offset' 123           // Offset of the message in 'topic'                                                                                                                                                                                                        
                'partition' 123        // Id of the partition which received the message                                                                                                                                                                                          
                'key' ...              // Byte array of the message key                                                                                                                                                                                                           
                'value' ...            // Byte array of the message value                                                                                                                                                                                                         
                'headers' { }          // Map of message headers                                                                                                                                                                                                                  
            }                                                                                                                                                                                                                                                                     
        */                                                                                                                                                                                                                                                                        
        "recordArray" STORE                                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                                    
        "preprod.write" "token" STORE                                                                                                                                                                                                                                                 

        // macro can be called on timeout with an empty entry map
        $recordArray SIZE 0 !=
        <%
            $recordArray 'value' GET // kafka record value is retrieved in bytes
            'UTF-8' BYTES-> // convert bytes to string (WARP10 INGRESS format)
            JSON->
            "value" STORE
            "Records received through Kafka" LOGMSG
            $value LOGMSG
            $value
            <%
                DROP
                PARSE
                // PARSE outputs a gtsList, including only one gts
                0 GET
                // GTS rename is required to use UPDATE function
                "gts" STORE
                $gts $gts NAME RENAME
            %>
            LMAP

            // Store GTS in Warp10
            $token
            UPDATE                                                                                                                                                                                                                                                                    
        %>                                                                                                                                                                                                                                                                       
        IFT                                                                                                                                                                                                                                                                      
    %> // end macro                                                                                                                                                                                                                                                            
    'timeout' 10000 // Polling timeout (in ms), if no message is received within this delay, the macro will be called with an empty map as input                                                                                                                 
}
EN

回答 1

Stack Overflow用户

发布于 2021-07-28 13:28:37

如果您想在Warp 10中缓存一些东西以避免每秒进行大量更新,则可以使用SHM (SHared存储器)。这是一个你需要激活的内置扩展。

激活之后,将其与SHMSTORE和SHMLOAD一起使用,以便在两次WarpScript执行期间将对象保存在内存中。

在您的示例中,您可以使用+!将所有传入的GTS推送到列表或GTS列表中,以将元素添加到现有列表中。

然后,缓存中所有GTS的合并(按名称+标签)和数据库中的更新可以在运行程序中完成(不要忘记使用MUTEX)。

别忘了总运营成本:

  • 如果您不重复类名和标签,如果您按照gts收集行,则可以对入口格式进行优化,以提高摄入速度。看这里
  • 从Warp 10入口格式解析反序列化数据。
  • 将数据更新为Warp 10优化的入口格式(并将其推送到更新端点)。
  • 更新终结点再次反序列化。

如果输入数据远不是最佳的入口格式,那么执行这些反序列化/序列化/反序列化操作是有意义的。如果您想要RANGECOMPACT您的数据以节省磁盘空间,或者进行任何预处理,这也是有意义的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68558921

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档