首页
学习
活动
专区
圈层
工具
发布
    • 综合排序
    • 最热优先
    • 最新优先
    时间不限
  • 来自专栏Panda诚

    FlowFile存储库原理

    FlowFile元数据包括与FlowFile相关联的所有attributes,指向FlowFile实际内容的指针(该内容存在于内容存储库中)以及FlowFile的状态,例如FlowFile所属的Connection FlowFile存储库充当NiFi的预写日志,因此当FlowFile在系统中流动时,每个更改在作为事务工作单元发生之前都会记录在FlowFile存储库中。 因为FlowFile对象保存在内存中,所以处理器要获得FlowFile所要做的就是请求ProcessSession从队列中获取它。 源码跟踪 那么我们应该着重关注一下一个FlowFile变更了后是怎么将变更信息存到FlowFile存储库(即FlowFile是怎么预写日志的) FlowFileRepository接口 在以下接口中, /flowfile_repository nifi.flowfile.repository.checkpoint.interval=20 secs nifi.flowfile.repository.always.sync

    1.8K10发布于 2021-03-08
  • 来自专栏Panda诚

    Apache NIFI ExecuteScript组件脚本使用教程

    flowFile) return Jython flowFile = session.get() if (flowFile ! flowFile = session.get(); if (flowFile ! 通常,用于存储FlowFile引用的变量将被更改FlowFile的方法返回的最新版本覆盖(中间的FlowFile引用将被自动丢弃)。 flowFile) return flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue') Jython flowFile = session.get flowFile) return myAttr = flowFile.getAttribute('filename') Jython flowFile = session.get() if (flowFile

    7.2K40发布于 2020-09-01
  • 来自专栏码匠的流水账

    聊聊nifi的AbstractBinlogTableEventWriter

    flowFile = session.create(); flowFile = session.write(flowFile, (outputStream) -> { flowFile = session.create(); flowFile = session.write(flowFile, (outputStream) -> { flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> {

    44020发布于 2020-06-03
  • 来自专栏python前行者

    [707]Apache NiFi安装及简单使用

    NiFi 组件 1.FlowFile FlowFile代表每个被系统处理的数据对象。每个FlowFile由两部分组成:属性和内容。 包含的属性,路由FlowFile ScanAttribute:扫描FlowFile的属性,看是否有匹配的属性 RouteOnContent:通过FlowFile内容 路由FlowFile ScanContent HashContent:对FlowFile的内容执行散列函数,并将哈希值作为属性添加。 IdentifyMimeType:评估FlowFile的内容,以便确定FlowFile封装的文件类型。 FlowFile的内容可选择作为附件发送。 PutFile:将 FlowFile的内容写入本地(或网络连接)文件系统上的目录。 PutFTP:将 FlowFile的内容复制到远程FTP服务器。 FlowFile可以作为单个消息发送,或者可以指定分隔符,例如新行,以便为单个FlowFile发送许多消息。

    11K21发布于 2020-01-13
  • 来自专栏Lansonli技术博客

    大数据NiFi(十九):实时Json日志数据导入到Hive

    根据处理器的配置,这些表达式的结果被赋值给FlowFile属性,或者被写入FlowFile本身的内容。 如果目标是"flowfile-attribute",而表达式不匹配任何内容,那么将使用空字符串作为属性的值,并且FlowFile将始终被路由到"matched"。 ▪flowfile-attribute 指示是否将JsonPath计算结果写入FlowFile内容或FlowFile属性;如果使用flowfile-attribute,则必须指定属性名称。 选择"auto-detect","flowfile-content"的返回类型自动设置为"json","flowfile-attribute"的返回类型自动设置为"scalar"标量。 中的内容,生成新的FlowFile内容。

    3.6K91编辑于 2023-02-24
  • 来自专栏Lansonli技术博客

    大数据NiFi(十五):NiFi入门案例二

    一、配置“GenerateFlowFile”处理器这个处理器可以生成随机的FlowFile数据或者生成自定义内容的FlowFile。多用于负载测试和模拟生成数据测试。 Unique FlowFiles(唯一FlowFile)falsetruefalse如果为true,每次生成的FlowFile独一无二,如果为false,每个FlowFile随机内容相同,吞吐量大。 二、配置“ReplaceText”处理器“ReplaceText”处理器会替换正则表达式匹配到的FlowFile中的内容,生成新的FlowFile内容。 s)(^.*$)对FlowFile内容匹配的正则表达式。仅用于“Regex Replace”和“Literal Replace”替换策略。 “评估模式”如果选择了“Entire Text”,并且FlowFile大于这个值,那么FlowFile将被路由到“failure”;在“Line-by-Line”模式下,如果一行文本比这个值大,那么FlowFile

    2.2K121编辑于 2023-02-08
  • 来自专栏大数据杂货铺

    Apache Nifi的工作原理

    最后,FlowFile Controller负责管理这些组件之间的资源。 ? 处理器、FlowFile、连接器和FlowFile控制器:NiFi中的四个基本概念 让我们看看它是如何工作的。 FlowFile流文件 在NiFi中,FlowFile 是在管道处理器中移动的信息包。 ? 内容存储库存储FlowFile的内容 为了访问内容,FlowFile 从内容存储库中声明 资源。稍后会跟踪内容所在位置的确切磁盘偏移,并将其流回FlowFile。 对于系统中当前存在的每个FlowFileFlowFile存储库存储: • FlowFile属性 • 指向位于FlowFile存储库中的FlowFile内容的指针 • FlowFile的状态。 例如:Flowfile在此瞬间属于哪个队列。 ? FlowFile存储库包含有关流中当前文件的元数据。 FlowFile存储库为我们提供了流程的最新状态;因此,它是从中断中恢复的强大工具。

    5.6K12发布于 2020-03-10
  • 来自专栏Lansonli技术博客

    大数据NiFi(二):NiFi架构

    FlowFile ProcessorProcessor 是实际操作数据的模块。Processor负责创建、接收、发送、转换、路由、拆分、合并、处理FlowFile。 Processor可以访问零到多个FlowFile的属性和内容,可以提交或回退提交的任务。 当一个FlowFile被发送到某个Relationship时,它就被加到了对应的COnnect队列里。 参照上述表格,简单来讲FlowFile是在各个节点间流动的数据;FlowFile Processor 是数据的处理模块;Connection是各个处理模块间的一个队列;Flow Controllers是复杂流程的调度 FlowFile Repository(FlowFile 存储库):FlowFile Repository 负责保存在目前活动流中FlowFile的状态。

    3.6K71编辑于 2022-12-31
  • 来自专栏Lansonli技术博客

    大数据NiFi(六):NiFi Processors(处理器)

    例如,可以配置处理器将FlowFile拆分为多个FlowFile,每个FlowFile只有一行。SplitJson:将JSON对象拆分成多个FlowFile。 三、数据出口/发送数据PutFile:将FlowFile的内容写入指定的目录。 PutKafka:将FlowFile的内容作为消息发送到Apache Kafka,可以将FlowFile中整个内容作为一个消息也可以指定分隔符将其封装为多个消息发送。 PutHDFS : 将FlowFile数据写入Hadoop分布式文件系统HDFS。四、数据库访问ExecuteSQL:执行用户定义的SQL SELECT命令,将结果写入Avro格式的FlowFile。 PutHiveQL:通过执行FlowFile内容定义的HiveQL DDM语句来更新Hive数据库。

    3.2K122编辑于 2023-01-08
  • 来自专栏Panda诚

    自定义Processor组件

    flowFile = session.get(); if (flowFile == null) { return; } String String constantValue = context.getProperty(CONSTANT_VALUE).evaluateAttributeExpressions(flowFile).getValue (); ArrayNode arrayNode = validateAndEstablishJsonArray(session, flowFile); for (JsonNode > rootNodeRef = new AtomicReference<>(null); try { session.read(flowFile, in -> { , pe.toString()}, pe); session.transfer(flowFile, REL_FAILURE); return null;

    2.1K21发布于 2020-09-01
  • 来自专栏码匠的流水账

    聊聊nifi的AbstractBinlogTableEventWriter

    flowFile = session.create(); flowFile = session.write(flowFile, (outputStream) -> { flowFile = session.create(); flowFile = session.write(flowFile, (outputStream) -> { flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { ​ flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { ​ flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { ​

    40500发布于 2020-05-28
  • 来自专栏Panda诚

    Apache NIFI 讲解(读完立即入门)

    最后,FlowFile Controller负责管理这些组件之间的资源。 ? 让我们看看它是如何工作的。 FlowFile 在NIFI中,FlowFile是在pipeline处理器中移动的信息包。 FlowFile不包含数据本身,否则会严重限制pipeline的吞吐量。相反,FlowFile保留的是一个指针,该指针引用存储在本地存储中某个位置的数据。 为了访问内容,FlowFile从内容存储库中声明资源(claims),然后将跟踪内容所在位置的确切磁盘偏移,并将其返回FlowFile。 对于系统中当前存在的每个FlowFileFlowFile Repository存储: FlowFile属性 指向FlowFile内容的指针 FlowFile的状态。 例如:Flowfile在此瞬间属于哪个队列。 ? FlowFile Repository为我们提供了流程的最新状态;因此,它是从中断中恢复的强大工具。

    26K93发布于 2020-09-01
  • 来自专栏Lansonli技术博客

    大数据NiFi(二十):实时同步MySQL数据到Hive

    属性,将FlowFile通过“ReplaceText”处理器获取上游FowFile属性,动态拼接sql替换所有的FlowFile内容,将拼接好的sql组成FlowFile路由到“PutHiveQL”将数据写入到 CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作发生时的顺序输出为单独的FlowFile文件。 ,“cdc.event.type”是上游FlowFile中的属性,“equales”是对应的方法,“delete”使用单引号引起,表示匹配的CDC事件。 默认false指的是如果在处理FlowFile时发生错误,则FlowFile将根据错误类型路由到“failure”或“retry”关系,处理器继续处理下一个FlowFile。 相反,可以设置为true回滚当前已处理的FlowFile,并立即停止进一步的处理。

    4.6K121编辑于 2023-02-27
  • 来自专栏Lansonli技术博客

    大数据NiFi(十八):离线同步MySQL数据到HDFS

    Max Rows Per Flow File (每个FlowFile行数) 0 在一个FlowFile文件中的数据行数。通过这个参数可以将很大的结果集分到多个FlowFile中。 默认设置为0,所有结果存入一个FlowFile。 Output Batch Size (数据输出批次量) 0 输出的FlowFile批次数据大小,当设置为0代表所有数据输出到下游关系。 Max Rows Per Flow File (每个FlowFile行数) 0 在一个FlowFile文件中的数据行数。通过这个参数可以将很大的结果集分到多个FlowFile中。 默认设置为0,所有结果存入一个FlowFile。 如果传入的FlowFile不包含任何记录,则输出一个空JSON对象。

    6.2K91编辑于 2023-02-21
  • 来自专栏Lansonli技术博客

    大数据NiFi(十六):处理器Connection连接

    ​处理器Connection连接一、查看队列中的FlowFile单独启动“GenerateFlowFile”处理器后,可以观察到对应的Connection连接队列中有数据,在Connection连接上右键 “List Queue”可以查看队列中的FlowFile信息:​二、查看FlowFile自定义属性值队列中的FlowFile属性中还可以查看自定义的属性信息,例如:在“GenerateFlowFile” 处理器中设置自定义属性“mykey”,对应的value值设置为“myvalue”:单独启动“GenerateFlowFile”生产部分数据,查看队列中的FlowFile属性如下:三、​​​​​​​Connection 配置针对Connectiond连接,可以通过“Configure”配置更多信息:弹出页面点击“SETTINGS”:“FlowFile expiration”数据过期:设置FlowFile expiration

    1.9K61编辑于 2023-02-10
  • 来自专栏Panda诚

    深入理解 Apache NIFI Connection

    NiFi FlowFiles由FlowFile内容和FlowFile属性/元数据组成。FlowFile内容永远不会保存在Connection中。 Connection仅将FlowFile属性/元数据放置在堆中。 数据大小基于与每个排队的FlowFile相关联的内容的累积大小。 一些处理器一次处理一个FlowFile,另一些处理器处理批量的FlowFile,还有一些处理器可能处理传入连接队列中的每个FlowFile。 (当然,如果你打算合并40000个FlowFile,则传入连接中必须有40,000个Flowfile

    1.6K31发布于 2020-09-01
  • 来自专栏Lansonli技术博客

    大数据NiFi(二十一):监控日志文件生产到Kafka

    二、配置“PublishKafka_1_0”处理器“PublishKafka_1_0”处理器作用是使用Kafka 1.0生产者API将FlowFile的内容作为消息发送给Apache Kafka。 发送的内容可以是单独的FlowFile,也可以通过用户指定分隔符分割的FlowFile内容。 该选项就是如果消息被单个Kafka节点接收到,FlowFile将被路由到成功,无论它是否被复制,但如果Kafka节点崩溃,可能会导致数据丢失。 Best Effort (尽力交付,相当于ack=0): 在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。这提供了最好的性能,但可能会导致数据丢失。 该选项就是如果消息被单个Kafka节点接收到,FlowFile将被路由到成功,无论它是否被复制,但如果Kafka节点崩溃,可能会导致数据丢失。

    1.4K71编辑于 2023-03-05
  • 来自专栏Panda诚

    UpdateAttribute

    属性名称 属性值 描述 用户自由定义的属性名称(将要update的属性名) 用户自由定义的属性值 用动态属性的值指定的值更新由动态属性的键指定的FlowFile属性支持表达式语言:true(只使用变量注册表进行计算 应用场景 该处理器基本用法最为常用,及增加,修改或删除流属性; 此处理器使用用户添加的属性或规则更新FlowFile的属性。有三种方法可以使用此处理器添加或修改属性。 一种方法是“基本用法”; 默认更改通过处理器的每个FlowFile的匹配的属性。第二种方式是“高级用法”; 可以进行条件属性更改,只有在满足特定条件时才会影响FlowFile。 也就是说,“删除属性表达式”仅适用于输入FlowFile中存在的属性,如果属性是由此处理器添加的,则“删除属性表达式”将不会匹配到它。 示例说明 1:基本用法增加一个属性 ? 结果输出: ?

    1.2K10发布于 2020-09-01
  • 来自专栏Panda诚

    SplitAvro

    输出策略决定split后的文件是Avro数据文件,还是只保留Avro记录(在FlowFile属性中包含元数据信息 )。输出总是二进制编码的。 属性配置 在下面的列表中,必需属性的名称以粗体显示。 如果输出策略是Bare Record,则元数据将存储为FlowFile属性,否则将存储在数据文件头中。 Record 分解传入数据文件的策略。 如果输出策略是Bare Record,则元数据将存储为FlowFile属性,否则将存储在数据文件头中。

    72230发布于 2020-08-31
  • 来自专栏Lansonli技术博客

    大数据NiFi(十七):NiFi术语

    二、FlowFile FlowFile代表NiFi中的单个数据。FlowFile由属性(attribute)和内容(content)组成。 内容是FlowFile表示的数据,属性由键值对组成,提供有关数据的信息或上下文的特征。所有FlowFiles都具有以下标准属性: uuid:一个通用唯一标识符,用于区分各个FlowFiles。 这些关系指示如何对FlowFile进行处理:处理器处理完FlowFile后,它会将FlowFile路由(传输)到其中一个关系。 DFM能够将每一个关系连接到其他组件,以指定FlowFile应该在哪里进行下一步处理。 五、Connection Connection可以将不同的Processor连接在一起创建自动的数据处理流程。

    2.6K11编辑于 2023-02-20
领券