Function executes successfully in production, but not in a Flink test

Stack Overflow user
Asked 2021-06-17 14:27:31
Answers: 1 · Views: 141 · Followers: 0 · Votes: 0

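I wrote an integration test in Flink 1.12.3 that tests the execute method of my StreamingJob class. Surprisingly, this method successfully outputs records to the sink in production, but outputs nothing in the local test. How can I fix this so that the test works?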

This may be related

    // Sink instance shared by the test; its collected values are printed after the job runs.
    private static final DeviceIdSink deviceIdSink = new DeviceIdSink();

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(2)
                            .build());

    @Test
    public void testingAStreamingJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Load the test events from a resource file and use them as the input stream.
        List<JsonNode> events = getListFromResource("events.json");
        DataStream<JsonNode> testStream = env.fromCollection(events);

        StreamingJob job = new StreamingJob(env, Time.seconds(60),
                 testStream, deviceIdSink);

        job.execute();

        System.out.println(deviceIdSink.values);
    }

1 Answer

Stack Overflow user

Accepted answer

Posted 2021-06-17 22:16:45

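The job terminates as soon as the testStream source is exhausted. So if any time-based windowing is involved, there will be pending results that are never emitted.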
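
To make the mechanism concrete, here is a minimal plain-Java sketch. It is a model, not Flink code: `PendingWindowDemo`, `runJob`, and the millisecond values are hypothetical names and numbers chosen for illustration, and a `ScheduledExecutorService` stands in for a window timer. It assumes the window fires on a wall-clock timer and that shutting the job down discards timers that have not fired yet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Plain-Java model (not Flink code) of a job whose pending window timer is dropped at shutdown. */
final class PendingWindowDemo {

    /**
     * Buffers all elements in one "window" that fires after windowMillis, keeps the "source"
     * alive for keepAliveMillis after its last element, then shuts the "job" down.
     * Returns whatever reached the sink.
     */
    static List<Integer> runJob(List<Integer> elements, long windowMillis, long keepAliveMillis) {
        List<Integer> buffer = new CopyOnWriteArrayList<>();
        List<Integer> sink = new CopyOnWriteArrayList<>();
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();

        // Window timer: flush the buffered elements to the sink once the window closes.
        timers.schedule(() -> { sink.addAll(buffer); }, windowMillis, TimeUnit.MILLISECONDS);

        // Source: emit every element immediately, like a bounded collection source.
        buffer.addAll(elements);

        try {
            // 0 ms models a source that finishes right after its last element.
            Thread.sleep(keepAliveMillis);

            // Job termination: timers that have not fired yet are discarded.
            timers.shutdownNow();
            timers.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while running the model job", e);
        }
        return new ArrayList<>(sink);
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3);
        // The source ends before the 500 ms window closes: prints []
        System.out.println(runJob(events, 500, 0));
        // The source outlives the window: prints [1, 2, 3]
        System.out.println(runJob(events, 500, 1500));
    }
}
```

In the first call the timer is still pending when the job shuts down, so nothing reaches the sink. This is the same symptom as the empty `deviceIdSink.values` in the question, under the assumptions stated above.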

I use a MockSource that does not terminate until its cancel() method is called, for example:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A very simple, non-parallel source based on a list of elements. You can optionally
 * specify a delay between emitted elements.
 *
 * @param <T> the type of the elements emitted by this source
 */
@SuppressWarnings("serial")
public class MockSource<T> implements SourceFunction<T>, ResultTypeQueryable<T>, Serializable {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MockSource.class);
    
    private int listSize;
    private byte[] elementsSerialized;
    private TypeInformation<T> typeInfo;
    private TypeSerializer<T> serializer;
    private Time delay = null;
    
    private transient volatile boolean running;

    // Constructor for cases where you want an empty list as the source.
    public MockSource(TypeInformation<T> typeInfo) throws IOException {
        this(Collections.emptyList(), typeInfo);
    }

    @SuppressWarnings("unchecked")
    public MockSource(T... elements) throws IOException {
        this((List<T>) Arrays.asList(elements));
    }

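    /**
     * Creates a source from {@code data}, which cannot be empty. For an empty source, use the
     * constructor that takes only a {@code typeInfo} argument.
     *
     * @param data the elements to emit; must contain at least one element
     * @throws IOException if serializing the elements fails
     */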
    public MockSource(List<T> data) throws IOException {
        this(data, TypeExtractor.getForObject(data.get(0)));
    }

    public MockSource(List<T> data, TypeInformation<T> typeInfo) throws IOException {
        this.typeInfo = typeInfo;
        this.serializer = typeInfo.createSerializer(new ExecutionConfig());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);

        listSize = 0;
        try {
            for (T element : data) {
                serializer.serialize(element, wrapper);
                listSize++;
            }
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }

        this.elementsSerialized = baos.toByteArray();
    }

    public MockSource<T> setDelay(Time delay) {
        this.delay = delay;
        return this;
    }
    
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        running = true;
        Object lock = ctx.getCheckpointLock();

        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
        final DataInputView input = new DataInputViewStreamWrapper(bais);

        int i = 0;
        while (running && (i < this.listSize)) {
            T next;
            try {
                next = serializer.deserialize(input);
            } catch (Exception e) {
                throw new IOException("Failed to deserialize an element from the source. "
                        + "If you are using user-defined serialization (Value and Writable types), check the "
                        + "serialization functions.\nSerializer is " + serializer, e);
            }

            // Emit under the checkpoint lock so emission does not interleave with a checkpoint.
            synchronized (lock) {
                ctx.collect(next);
                i++;
            }

            // Sleep outside the lock so checkpoints are not blocked during the delay.
            if (delay != null) {
                LOGGER.debug("MockSource delaying for {}ms", delay.toMilliseconds());

                Thread.sleep(delay.toMilliseconds());
            }
        }

        // Stay alive after the last element so the job keeps running until cancel() is called.
        while (running) {
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return typeInfo;
    }
}
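
A test built on a source like this has to stop the job itself once it has seen the output it expects. The following plain-Java sketch models that pattern without a cluster. It is not Flink code: `IdleUntilCancelledDemo`, `IdlingSource`, and `runUntilSeen` are hypothetical names, and a plain thread stands in for the running job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Plain-Java model (not Flink code) of a source that idles until cancel() is called. */
final class IdleUntilCancelledDemo {

    /** Emits a fixed list, then stays alive until cancel() clears the flag. */
    static final class IdlingSource implements Runnable {
        private final List<String> elements;
        private final List<String> sink;
        private volatile boolean running = true;

        IdlingSource(List<String> elements, List<String> sink) {
            this.elements = elements;
            this.sink = sink;
        }

        @Override
        public void run() {
            for (String element : elements) {
                if (!running) {
                    return;
                }
                sink.add(element);
            }
            // All elements emitted: idle instead of returning, so the "job" stays up.
            while (running) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source in the background, waits for the expected output, then cancels it. */
    static List<String> runUntilSeen(List<String> elements, int expectedCount, long timeoutMillis) {
        List<String> sink = new CopyOnWriteArrayList<>();
        IdlingSource source = new IdlingSource(elements, sink);
        Thread job = new Thread(source, "mock-job");
        job.start();

        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            // Poll the sink rather than sleeping for a fixed amount of time.
            while (sink.size() < expectedCount && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            // The test, not the source, decides when the job ends.
            source.cancel();
            job.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the job", e);
        }
        if (job.isAlive()) {
            throw new IllegalStateException("Source did not stop after cancel()");
        }
        return sink;
    }

    public static void main(String[] args) {
        // Prints [a, b, c]
        System.out.println(runUntilSeen(Arrays.asList("a", "b", "c"), 3, 5_000));
    }
}
```

In a real Flink test, the rough equivalent would be to submit the job with `StreamExecutionEnvironment.executeAsync()`, poll the sink, and then call `cancel()` on the returned `JobClient`. Whether `StreamingJob` from the question exposes a way to do that is not shown, so treat this as an assumption about how the test would be wired.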
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/68014044
