首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink应用集成测试

Flink应用集成测试
EN

Stack Overflow用户
提问于 2018-10-16 09:22:55
回答 1查看 1.1K关注 0票数 0

我正在尝试对我的flink应用程序进行集成测试。我的测试代码如下所示:

代码语言:javascript
复制
public class HttpsCsvIngestorTest extends AbstractTestBase {

    private final static Logger LOG = LoggerFactory.getLogger(HttpsCsvIngestorTest.class);

    @Test
    public void testHttpsCsvIngestion() throws Exception {

        Thread flinkJob = new Thread(new Runnable() {
            @Override
            public void run() {
                String[] args = new String[] { "--configFile", "the/path/to/config.properties", "--secretKey",
                        "12345" };
                JobExecutionResult execResult = CsvProcessorFlinkDriver.runFlinkJob(args);
            }
        });

        flinkJob.start();
        LOG.info("Starting flink job");

        Thread.sleep(10000);
        String[] args2 = new String[] { "localhost", filename }; 
        FileUploadClient.main(args2);

        Thread.sleep(30000);
        assertTrue(new File(System.getProperty("user.dir") + File.separator + "C:/Desktop/Result.csv")
                .exists());
        System.out.println("Test completed. Going to shutdown flink job");
    }

}

这里,我从一个子线程启动我的flink应用程序,并从主线程上传一个用于处理的文件。测试运行良好,我得到了预期的结果文件。但是,当应用程序被关闭时,在结束时我会得到以下错误:

代码语言:javascript
复制
2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) Task Task.java:843  - FATAL - exception in resource cleanup of task Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) (12d3e0627e62ad44c57c45b720682e56).
java.lang.IllegalStateException: Memory manager has been shut down.
    at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:470)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:824)
    at java.lang.Thread.run(Thread.java:745)
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:566)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:540)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.executeJobBlocking(FlinkMiniCluster.scala:714)
    at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
    at mycode.CsvProcessorFlinkDriver.flinkJettyExecution(CsvProcessorFlinkDriver.java:132)
    at mycode.CsvProcessorFlinkDriver.runFlinkJob(CsvProcessorFlinkDriver.java:56)
    at com.demo.code.HttpsCsvIngestorTest$1.run(HttpsCsvIngestorTest.java:30)
    at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/$a#-1711434410]] after [21474835000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(LightArrayRevolverScheduler.scala:338)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:142)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:141)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at akka.actor.LightArrayRevolverScheduler.close(LightArrayRevolverScheduler.scala:140)
    at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:892)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:826)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826)
    at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:842)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这里CsvProcessorFlinkDriver.java:132是executionResult = env.execute行。

我做错什么了吗?我还注意到,如果我在测试类的主线程中启动flink应用程序(而不是从子线程启动),则执行不会进展到从LOG.info开始的行(“启动flink作业”);

我的flink版本是1.5.0。即使使用flink-1.6.0,测试也会产生相同的误差。

EN

回答 1

Stack Overflow用户

发布于 2018-10-27 14:36:45

当flink作业结束时,代码使用的flink 卸货类。

如果在flink卸载这些类之后,有什么东西试图使用这些类,那么通常会出现ClassNotFound异常。因此,您必须确保没有任何代码接触到这些类。

在您的例子中,您可以在类NoSuchMethodError上得到一个org.apache.commons.io.IOUtils。似乎您使用了一个不同版本的公域-io和JettyServerFileSource试图AutoClose一些资源。

查看您是否依赖于与版本2.4不同的commons-io (这是flink使用的版本)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52832132

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档