首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >socketTextStream在pyflink中的位置

socketTextStream在pyflink中的位置
EN

Stack Overflow用户
提问于 2020-06-11 09:16:46
回答 2查看 332关注 0票数 2

我想将下面的代码转换成pyflink,然后在pyflink-shell.sh . it中运行它。

代码语言:javascript
复制
public class MapDemo {
    private static int index = 1;
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.map操作。
        DataStream<String> result = textStream.map(s -> (index++) + ".您输入的是:" + s);
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }

但我找不到socketTextStreamb_envbt_envs_envst_env

那么,pyflink api中的socketTextStream在哪里呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-06-12 15:04:25

从Flink 1.12开始,开箱即用的PyFlink似乎只支持这些连接器:

  • FlinkKafkaConsumer
  • FlinkKafkaProducer
  • JdbcSink
  • StreamingFileSink

https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py

因为socketTextStream不能完全支持一次语义,所以通常不鼓励使用它,而且它没有包含在PyFlink中。

票数 2
EN

Stack Overflow用户

发布于 2021-06-12 16:21:04

是的,您可以在pyflink中使用socketTextStream,尽管它还没有得到官方支持。示例:

代码语言:javascript
复制
from pyflink.datastream import DataStream, StreamExecutionEnvironment

if __name__ == '__main__':
    s_env = StreamExecutionEnvironment.get_execution_environment()
    socket_stream = DataStream(s_env._j_stream_execution_environment.socketTextStream('localhost', 9999))
    socket_stream.print()
    s_env.execute('socket_stream')
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62320993

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档