How to fix FlowFiles expiring prematurely in Apache NiFi?

Stack Overflow user
Asked 2018-12-05 04:39:24
Answers 1 · Views 833 · Followers 0 · Votes 3

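I have a flow in which I receive files from a third party. I have to wait for all the files to arrive, and once they are all saved in a folder, I have to invoke a processor (only once). So I built the following flow.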

GetHTTP -> gReadTotalCount -> run GetHTTP in a loop -> save files -> Notify -> Wait (until TotalCount) -> release all files -> ControlRate (release the 1st file) -> end

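  1. ControlRate has the following properties (release one file per minute), which is what I want.
  2. The connection going into ControlRate has FlowFile Expiration set to 59 seconds (so that all the remaining files get dropped, as desired).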
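The intent of setting 1 above (one FlowFile per one-minute window) can be sketched as follows. This is a simplified fixed-window model written for illustration, not NiFi's actual ControlRate implementation; the class name `FixedWindowRateLimiter` and its method are hypothetical.

```python
class FixedWindowRateLimiter:
    """Simplified model: at most `max_count` releases per `window_secs` window."""

    def __init__(self, max_count, window_secs):
        self.max_count = max_count
        self.window_secs = window_secs
        self.window_start = None  # start time of the current window, in seconds
        self.released = 0         # releases counted in the current window

    def try_release(self, now):
        """Return True if an item may be released at time `now` (seconds)."""
        # Open a fresh window on first use or once the current one has elapsed.
        if self.window_start is None or now - self.window_start >= self.window_secs:
            self.window_start = now
            self.released = 0
        if self.released < self.max_count:
            self.released += 1
            return True
        return False


# Mirrors "Maximum Rate = 1" and "Time Duration = 1 min" from the template.
limiter = FixedWindowRateLimiter(max_count=1, window_secs=60)
for t in (0, 1, 59, 60):
    print(t, limiter.try_release(t))
```

Under this model, only the first file passes within the first minute, and the 59-second expiration on the incoming connection is meant to drop the files still waiting before the next window opens.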

Notify/Wait takes time (anywhere between 30 minutes and 1 hour); functionally, this is fine.

However, by the time the files reach the ControlRate queue, all of them are dropped.

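I used to think that the age checked at a queue was the time spent in that particular queue, not the total age since the FlowFile first entered the flow.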

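If you think this behavior is correct, it still seems confusing: how would anyone know how old a FlowFile already is before it enters a particular queue?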

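I wrote a smaller flow to show what is happening: if you keep LogAttribute stopped and start it after 1 minute, the FlowFile never even reaches ControlRate, which completely breaks my flow.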

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
    <description></description>
    <groupId>01671244-06f3-1392-593a-e93025e7fe98</groupId>
    <name>testing control flow - dnu</name>
    <snippet>
        <connections>
            <id>c3137278-6c82-3a3a-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
            <destination>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>d8dcf169-30aa-384d-0000-000000000000</id>
                <type>PROCESSOR</type>
            </destination>
            <flowFileExpiration>59 sec</flowFileExpiration>
            <labelIndex>1</labelIndex>
            <name></name>
            <selectedRelationships>success</selectedRelationships>
            <source>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>69cadcc5-6fee-3ce8-0000-000000000000</id>
                <type>PROCESSOR</type>
            </source>
            <zIndex>0</zIndex>
        </connections>
        <connections>
            <id>1ed5ea0a-00a3-3e32-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
            <destination>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>51f1202e-a045-3ab3-0000-000000000000</id>
                <type>PROCESSOR</type>
            </destination>
            <flowFileExpiration>0 sec</flowFileExpiration>
            <labelIndex>1</labelIndex>
            <name></name>
            <selectedRelationships>success</selectedRelationships>
            <source>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>d8dcf169-30aa-384d-0000-000000000000</id>
                <type>PROCESSOR</type>
            </source>
            <zIndex>0</zIndex>
        </connections>
        <connections>
            <id>210554a9-111e-3693-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
            <destination>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>69cadcc5-6fee-3ce8-0000-000000000000</id>
                <type>PROCESSOR</type>
            </destination>
            <flowFileExpiration>0 sec</flowFileExpiration>
            <labelIndex>1</labelIndex>
            <name></name>
            <selectedRelationships>success</selectedRelationships>
            <source>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>d2690b7b-6518-3e94-0000-000000000000</id>
                <type>PROCESSOR</type>
            </source>
            <zIndex>0</zIndex>
        </connections>
        <connections>
            <id>3bfbd1da-ac16-30d3-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
            <destination>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>ecaf1958-503d-3788-0000-000000000000</id>
                <type>PROCESSOR</type>
            </destination>
            <flowFileExpiration>0 sec</flowFileExpiration>
            <labelIndex>1</labelIndex>
            <name></name>
            <selectedRelationships>failure</selectedRelationships>
            <source>
                <groupId>bb618021-2bd4-3942-0000-000000000000</groupId>
                <id>d8dcf169-30aa-384d-0000-000000000000</id>
                <type>PROCESSOR</type>
            </source>
            <zIndex>0</zIndex>
        </connections>
        <processors>
            <id>d2690b7b-6518-3e94-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <position>
                <x>570.0000076293945</x>
                <y>68.99999487400055</y>
            </position>
            <bundle>
                <artifact>nifi-standard-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.6.0</version>
            </bundle>
            <config>
                <bulletinLevel>WARN</bulletinLevel>
                <comments></comments>
                <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                <descriptors>
                    <entry>
                        <key>File Size</key>
                        <value>
                            <name>File Size</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Batch Size</key>
                        <value>
                            <name>Batch Size</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Data Format</key>
                        <value>
                            <name>Data Format</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Unique FlowFiles</key>
                        <value>
                            <name>Unique FlowFiles</name>
                        </value>
                    </entry>
                    <entry>
                        <key>generate-ff-custom-text</key>
                        <value>
                            <name>generate-ff-custom-text</name>
                        </value>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>
                            <name>character-set</name>
                        </value>
                    </entry>
                </descriptors>
                <executionNode>ALL</executionNode>
                <lossTolerant>false</lossTolerant>
                <penaltyDuration>30 sec</penaltyDuration>
                <properties>
                    <entry>
                        <key>File Size</key>
                        <value>0B</value>
                    </entry>
                    <entry>
                        <key>Batch Size</key>
                        <value>1</value>
                    </entry>
                    <entry>
                        <key>Data Format</key>
                        <value>Text</value>
                    </entry>
                    <entry>
                        <key>Unique FlowFiles</key>
                        <value>false</value>
                    </entry>
                    <entry>
                        <key>generate-ff-custom-text</key>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>UTF-8</value>
                    </entry>
                </properties>
                <runDurationMillis>0</runDurationMillis>
                <schedulingPeriod>1 d</schedulingPeriod>
                <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                <yieldDuration>1 sec</yieldDuration>
            </config>
            <name>GenerateFlowFile</name>
            <relationships>
                <autoTerminate>false</autoTerminate>
                <name>success</name>
            </relationships>
            <state>STOPPED</state>
            <style/>
            <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
        </processors>
        <processors>
            <id>d8dcf169-30aa-384d-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <position>
                <x>589.0000076293945</x>
                <y>357.99999487400055</y>
            </position>
            <bundle>
                <artifact>nifi-standard-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.6.0</version>
            </bundle>
            <config>
                <bulletinLevel>WARN</bulletinLevel>
                <comments></comments>
                <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                <descriptors>
                    <entry>
                        <key>Rate Control Criteria</key>
                        <value>
                            <name>Rate Control Criteria</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Maximum Rate</key>
                        <value>
                            <name>Maximum Rate</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Rate Controlled Attribute</key>
                        <value>
                            <name>Rate Controlled Attribute</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Time Duration</key>
                        <value>
                            <name>Time Duration</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Grouping Attribute</key>
                        <value>
                            <name>Grouping Attribute</name>
                        </value>
                    </entry>
                </descriptors>
                <executionNode>ALL</executionNode>
                <lossTolerant>false</lossTolerant>
                <penaltyDuration>30 sec</penaltyDuration>
                <properties>
                    <entry>
                        <key>Rate Control Criteria</key>
                        <value>flowfile count</value>
                    </entry>
                    <entry>
                        <key>Maximum Rate</key>
                        <value>1</value>
                    </entry>
                    <entry>
                        <key>Rate Controlled Attribute</key>
                    </entry>
                    <entry>
                        <key>Time Duration</key>
                        <value>1 min</value>
                    </entry>
                    <entry>
                        <key>Grouping Attribute</key>
                    </entry>
                </properties>
                <runDurationMillis>0</runDurationMillis>
                <schedulingPeriod>0 sec</schedulingPeriod>
                <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                <yieldDuration>1 sec</yieldDuration>
            </config>
            <name>ControlRate</name>
            <relationships>
                <autoTerminate>false</autoTerminate>
                <name>failure</name>
            </relationships>
            <relationships>
                <autoTerminate>false</autoTerminate>
                <name>success</name>
            </relationships>
            <state>RUNNING</state>
            <style/>
            <type>org.apache.nifi.processors.standard.ControlRate</type>
        </processors>
        <processors>
            <id>ecaf1958-503d-3788-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <position>
                <x>1003.9999465942383</x>
                <y>0.0</y>
            </position>
            <bundle>
                <artifact>nifi-standard-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.6.0</version>
            </bundle>
            <config>
                <bulletinLevel>WARN</bulletinLevel>
                <comments></comments>
                <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                <descriptors>
                    <entry>
                        <key>Log Level</key>
                        <value>
                            <name>Log Level</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Log Payload</key>
                        <value>
                            <name>Log Payload</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Attributes to Log</key>
                        <value>
                            <name>Attributes to Log</name>
                        </value>
                    </entry>
                    <entry>
                        <key>attributes-to-log-regex</key>
                        <value>
                            <name>attributes-to-log-regex</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Attributes to Ignore</key>
                        <value>
                            <name>Attributes to Ignore</name>
                        </value>
                    </entry>
                    <entry>
                        <key>attributes-to-ignore-regex</key>
                        <value>
                            <name>attributes-to-ignore-regex</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Log prefix</key>
                        <value>
                            <name>Log prefix</name>
                        </value>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>
                            <name>character-set</name>
                        </value>
                    </entry>
                </descriptors>
                <executionNode>ALL</executionNode>
                <lossTolerant>false</lossTolerant>
                <penaltyDuration>30 sec</penaltyDuration>
                <properties>
                    <entry>
                        <key>Log Level</key>
                        <value>info</value>
                    </entry>
                    <entry>
                        <key>Log Payload</key>
                        <value>false</value>
                    </entry>
                    <entry>
                        <key>Attributes to Log</key>
                    </entry>
                    <entry>
                        <key>attributes-to-log-regex</key>
                        <value>.*</value>
                    </entry>
                    <entry>
                        <key>Attributes to Ignore</key>
                    </entry>
                    <entry>
                        <key>attributes-to-ignore-regex</key>
                    </entry>
                    <entry>
                        <key>Log prefix</key>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>UTF-8</value>
                    </entry>
                </properties>
                <runDurationMillis>0</runDurationMillis>
                <schedulingPeriod>0 sec</schedulingPeriod>
                <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                <yieldDuration>1 sec</yieldDuration>
            </config>
            <name>LogAttribute</name>
            <relationships>
                <autoTerminate>false</autoTerminate>
                <name>success</name>
            </relationships>
            <state>STOPPED</state>
            <style/>
            <type>org.apache.nifi.processors.standard.LogAttribute</type>
        </processors>
        <processors>
            <id>51f1202e-a045-3ab3-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <position>
                <x>1187.0000076293945</x>
                <y>229.99999487400055</y>
            </position>
            <bundle>
                <artifact>nifi-standard-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.6.0</version>
            </bundle>
            <config>
                <bulletinLevel>WARN</bulletinLevel>
                <comments></comments>
                <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                <descriptors>
                    <entry>
                        <key>Log Level</key>
                        <value>
                            <name>Log Level</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Log Payload</key>
                        <value>
                            <name>Log Payload</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Attributes to Log</key>
                        <value>
                            <name>Attributes to Log</name>
                        </value>
                    </entry>
                    <entry>
                        <key>attributes-to-log-regex</key>
                        <value>
                            <name>attributes-to-log-regex</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Attributes to Ignore</key>
                        <value>
                            <name>Attributes to Ignore</name>
                        </value>
                    </entry>
                    <entry>
                        <key>attributes-to-ignore-regex</key>
                        <value>
                            <name>attributes-to-ignore-regex</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Log prefix</key>
                        <value>
                            <name>Log prefix</name>
                        </value>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>
                            <name>character-set</name>
                        </value>
                    </entry>
                </descriptors>
                <executionNode>ALL</executionNode>
                <lossTolerant>false</lossTolerant>
                <penaltyDuration>30 sec</penaltyDuration>
                <properties>
                    <entry>
                        <key>Log Level</key>
                        <value>info</value>
                    </entry>
                    <entry>
                        <key>Log Payload</key>
                        <value>false</value>
                    </entry>
                    <entry>
                        <key>Attributes to Log</key>
                    </entry>
                    <entry>
                        <key>attributes-to-log-regex</key>
                        <value>.*</value>
                    </entry>
                    <entry>
                        <key>Attributes to Ignore</key>
                    </entry>
                    <entry>
                        <key>attributes-to-ignore-regex</key>
                    </entry>
                    <entry>
                        <key>Log prefix</key>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>UTF-8</value>
                    </entry>
                </properties>
                <runDurationMillis>0</runDurationMillis>
                <schedulingPeriod>0 sec</schedulingPeriod>
                <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                <yieldDuration>1 sec</yieldDuration>
            </config>
            <name>LogAttribute</name>
            <relationships>
                <autoTerminate>false</autoTerminate>
                <name>success</name>
            </relationships>
            <state>STOPPED</state>
            <style/>
            <type>org.apache.nifi.processors.standard.LogAttribute</type>
        </processors>
        <processors>
            <id>69cadcc5-6fee-3ce8-0000-000000000000</id>
            <parentGroupId>bb618021-2bd4-3942-0000-000000000000</parentGroupId>
            <position>
                <x>0.0</x>
                <y>221.9999796152115</y>
            </position>
            <bundle>
                <artifact>nifi-standard-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.6.0</version>
            </bundle>
            <config>
                <bulletinLevel>WARN</bulletinLevel>
                <comments></comments>
                <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                <descriptors>
                    <entry>
                        <key>Log Level</key>
                        <value>
                            <name>Log Level</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Log Payload</key>
                        <value>
                            <name>Log Payload</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Attributes to Log</key>
                        <value>
                            <name>Attributes to Log</name>
                        </value>
                    </entry>
                    <entry>
                        <key>attributes-to-log-regex</key>
                        <value>
                            <name>attributes-to-log-regex</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Attributes to Ignore</key>
                        <value>
                            <name>Attributes to Ignore</name>
                        </value>
                    </entry>
                    <entry>
                        <key>attributes-to-ignore-regex</key>
                        <value>
                            <name>attributes-to-ignore-regex</name>
                        </value>
                    </entry>
                    <entry>
                        <key>Log prefix</key>
                        <value>
                            <name>Log prefix</name>
                        </value>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>
                            <name>character-set</name>
                        </value>
                    </entry>
                </descriptors>
                <executionNode>ALL</executionNode>
                <lossTolerant>false</lossTolerant>
                <penaltyDuration>30 sec</penaltyDuration>
                <properties>
                    <entry>
                        <key>Log Level</key>
                        <value>info</value>
                    </entry>
                    <entry>
                        <key>Log Payload</key>
                        <value>false</value>
                    </entry>
                    <entry>
                        <key>Attributes to Log</key>
                    </entry>
                    <entry>
                        <key>attributes-to-log-regex</key>
                        <value>.*</value>
                    </entry>
                    <entry>
                        <key>Attributes to Ignore</key>
                    </entry>
                    <entry>
                        <key>attributes-to-ignore-regex</key>
                    </entry>
                    <entry>
                        <key>Log prefix</key>
                    </entry>
                    <entry>
                        <key>character-set</key>
                        <value>UTF-8</value>
                    </entry>
                </properties>
                <runDurationMillis>0</runDurationMillis>
                <schedulingPeriod>0 sec</schedulingPeriod>
                <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                <yieldDuration>1 sec</yieldDuration>
            </config>
            <name>LogAttribute</name>
            <relationships>
                <autoTerminate>false</autoTerminate>
                <name>success</name>
            </relationships>
            <state>STOPPED</state>
            <style/>
            <type>org.apache.nifi.processors.standard.LogAttribute</type>
        </processors>
    </snippet>
    <timestamp>12/05/2018 04:20:47 UTC</timestamp>
</template>

1 Answer

Stack Overflow user

Answered 2020-07-02 07:49:57

I think Cloudera's documentation explains this most clearly:

user-guide/content/flowfile-expiration.html

In short: this is a feature. Expiration depends on the age of the FlowFile, not on how long it has been in a particular queue.
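As a rough illustration of the difference, the sketch below compares the age-based check described here with the per-queue check the question assumed. It is a toy model, not NiFi code: the `FlowFile` dataclass, its field names, and both functions are hypothetical, and times are plain seconds.

```python
from dataclasses import dataclass


@dataclass
class FlowFile:
    entered_flow_at: float   # when the FlowFile entered the NiFi flow (seconds)
    entered_queue_at: float  # when it arrived in the current connection (seconds)


def is_expired(flow_file, expiration_secs, now):
    """Age-based check: measured from when the FlowFile entered the flow."""
    if expiration_secs == 0:  # "0 sec" means the data never expires
        return False
    return now - flow_file.entered_flow_at > expiration_secs


def is_expired_queue_based(flow_file, expiration_secs, now):
    """The question's assumption (NOT the described behavior): time in this queue only."""
    if expiration_secs == 0:
        return False
    return now - flow_file.entered_queue_at > expiration_secs


# A file that spent 30 minutes in Notify/Wait reaches the ControlRate queue at t = 1800 s.
ff = FlowFile(entered_flow_at=0.0, entered_queue_at=1800.0)
print(is_expired(ff, 59, now=1800.0))              # age-based check
print(is_expired_queue_based(ff, 59, now=1800.0))  # per-queue assumption
```

With the age-based check, the file is already past the 59-second limit the moment it arrives in the queue, which matches the behavior reported in the question.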

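I think this makes sense for most use cases, because the main goal of expiration is not to manage the workflow but to make sure that no resources are spent on messages that are too old to be of value.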

I hope that with this knowledge, redesigning your flow won't be too hard.

Votes 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/53625278
