我想将下面的代码转换成pyflink,然后在pyflink-shell.sh . it中运行它。
public class MapDemo {
private static int index = 1;
public static void main(String[] args) throws Exception {
//1.获取执行环境配置信息
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.定义加载或创建数据源(source),监听9000端口的socket消息
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
//3.map操作。
DataStream<String> result = textStream.map(s -> (index++) + ".您输入的是:" + s);
//4.打印输出sink
result.print();
//5.开始执行
env.execute();
}但我找不到socketTextStream在b_env,bt_env,s_env,st_env
那么,pyflink api中的socketTextStream在哪里呢?
发布于 2020-06-12 15:04:25
从Flink 1.12开始,开箱即用的PyFlink似乎只支持这些连接器:
见https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py。
因为socketTextStream不能完全支持一次语义,所以通常不鼓励使用它,而且它没有包含在PyFlink中。
发布于 2021-06-12 16:21:04
是的,您可以在pyflink中使用socketTextStream,尽管它还没有得到官方支持。示例:
from pyflink.datastream import DataStream, StreamExecutionEnvironment
if __name__ == '__main__':
s_env = StreamExecutionEnvironment.get_execution_environment()
socket_stream = DataStream(s_env._j_stream_execution_environment.socketTextStream('localhost', 9999))
socket_stream.print()
s_env.execute('socket_stream')https://stackoverflow.com/questions/62320993
复制相似问题