首页
学习
活动
专区
圈层
工具
发布
    • 综合排序
    • 最热优先
    • 最新优先
    时间不限
  • 来自专栏java编程那点事

    输入DStream之基础数据源

    conf = new SparkConf()​​​​.setMaster("local[2]")​​​​.setAppName("HDFSWordCount"); JavaStreamingContext jssc )); ​​// 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流 ​​JavaDStream<String> lines = jssc.textFileStream Integer v1, Integer v2) throws Exception { ​​​​​​return v1 + v2; ​​​​​}​​​​ ​​​​}); wordCounts.print(); ​​jssc.start (); ​​jssc.awaitTermination(); ​​jssc.close(); ​} } 验证: Hadoop fs –mkdir /wordCount_dir

    42620编辑于 2023-02-25
  • 来自专栏积累沉淀

    Spark实时流计算Java案例

    NetworkWordCount").set("spark.testing.memory", "2147480000"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); System.out.println(jssc); / ("master", 9999); //JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream = new JavaStreamingContext(conf, Durations.seconds(1)); System.out.println(jssc); / ("master", 9999); JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream"

    3K60发布于 2018-01-11
  • 来自专栏扎心了老铁

    java spark-streaming接收TCP/Kafka数据

    new SparkConf().setMaster("local").setAppName("streaming word count"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaDStream<String> lines = jssc.socketTextStream , String>>(); for (int i = 0; i < part; i++) { list.add(KafkaUtils.createStream(jssc Exception { return x + y; } }); counts.print(); jssc.start (); } finally { jssc.close(); } } } 执行方法 $ spark-submit --queue=root.XXX

    1.1K40发布于 2018-03-05
  • 来自专栏码字搬砖

    Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版

    final SparkConf conf = new SparkConf().setAppName("scan"); final JavaStreamingContext jssc JavaStreamingContext(conf, Durations.seconds(60)); final HiveContext sqlContext = new HiveContext(jssc.sc //checkpoint失败后会自动重启,会造成数据丢失 //sqlContext.setConf("hive.optimize.ppd", "false"); //jssc.checkpoint Long.parseLong(offset)); } KafkaUtils.createDirectStream( jssc test1", 1), 6600L); JavaInputDStream<String> inputDStream = KafkaUtils.createDirectStream( jssc

    19000编辑于 2025-05-16
  • 来自专栏java编程那点事

    updateStateByKey

    SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount"); ​​JavaStreamingContext jssc 那么spark streaming是要求必须用checkpoint的,以便于在 ​​// 内存数据丢失的时候,可以从checkpoint中恢复数据 // 开启checkpoint机制,很简单,只要调用jssc 的checkpoint()方法,设置一个hdfs目录即可 ​​jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint"); ​​// 然后先实现基础的 wordcount逻辑 ​​JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999); ​​JavaDStream (); ​​jssc.awaitTermination(); ​​jssc.close(); ​} }

    45240编辑于 2023-02-25
  • 来自专栏全栈程序员必看

    代码在线编辑工具_php代码编辑器安卓版

    问题补充:再补充一个在javaeye发现的强大的js语法高亮库:JSSC JSSC 3 rc(js语法高亮器 ver3 rc)&& JSSC 2.2 发布: http://www.iteye.com /topic/189502 时隔很久了,JSSC4 beta版终于亮相…… http://www.iteye.com/topic/291314 jssc ver5.0 alpha http://www.iteye.com/topic/459788 关于jssc的项目主页以及之前的老版本或者想使用它的,请看这里: http://code.google.com/p/ jssc/ http://jssc.googlecode.com/ 问题补充: kjah 写道 又找到一个更好的 http://www.open-open.com/ajax/ajax20080713173520

    3.2K10编辑于 2022-09-22
  • 来自专栏stream process

    spark-streaming-kafka包源码分析

    消费流数据的调用代码分别如下 JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port * @return DStream of (Kafka message key, Kafka message value) */ def createStream( jssc * @param jssc JavaStreamingContext object * @param keyTypeClass Key type of DStream *

    78510发布于 2020-03-04
  • 来自专栏java编程那点事

    Kafka基于Receiver的开发

    conf = new SparkConf()​​.setMaster("local[2]")​​.setAppName("KafkaWordCount"); JavaStreamingContext jssc topicThreadMap.put("WordCount", 1); JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc Integer v1, Integer v2) throws Exception { ​​​​return v1 + v2; ​​​} ​​}); wordCounts.print(); jssc.start (); jssc.awaitTermination(); jssc.close(); ​} }

    62920编辑于 2023-02-25
  • 来自专栏成长道路

    SparkStreaming小例子

    bin-hadoop2.6"); //  JavaSparkContext sc = new JavaSparkContext(sparkConf);   JavaStreamingContext jssc Durations.seconds(3));   Logger.getRootLogger().setLevel(Level.OFF);   JavaReceiverInputDStream<String> lines=jssc.socketTextStream Integer arg1) throws Exception {     return arg0+arg1;    }   });   javaPairDStream.print();   jssc.start ();   jssc.awaitTermination();  } } 4.截图 ?

    1.5K00发布于 2017-12-28
  • 来自专栏码字搬砖

    Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版

    final SparkConf conf = new SparkConf().setAppName("scan"); final JavaStreamingContext jssc JavaStreamingContext(conf, Durations.seconds(60)); final HiveContext sqlContext = new HiveContext(jssc.sc //checkpoint失败后会自动重启,会造成数据丢失 //sqlContext.setConf("hive.optimize.ppd", "false"); //jssc.checkpoint Long.parseLong(offset)); } KafkaUtils.createDirectStream( jssc test1", 1), 6600L); JavaInputDStream<String> inputDStream = KafkaUtils.createDirectStream( jssc

    1.3K20发布于 2018-10-24
  • 来自专栏Java

    Spark中的Spark Streaming是什么?请解释其作用和用途。

    SparkStreamingExample").setMaster("local[2]"); // 创建JavaStreamingContext对象 JavaStreamingContext jssc "topic2"); JavaDStream<String> kafkaStream = KafkaUtils.createDirectStream( jssc word.startsWith("a")); // 输出处理结果 processedStream.print(); // 启动StreamingContext jssc.start (); // 等待StreamingContext停止 jssc.awaitTermination(); } } 在这个示例中,我们首先创建了一个SparkConf

    1.2K10编辑于 2025-01-21
  • 【读取串口技术】

    一个使用jCommSerial库读取串口通讯的示例程序: import jssc.SerialPort; import jssc.SerialPortException; import jssc.SerialPortList

    36310编辑于 2025-08-29
  • 来自专栏java编程那点事

    window滑动窗口

    new SparkConf()​​​​.setMaster("local[2]")​​​​.setAppName("WindowHotWord"); ​​JavaStreamingContext jssc leo hello ​​// tom world // hello // worldss ​​JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream searchWordCountsRDD; ​​​​​} ​​​​}); // 这个无关紧要,只是为了触发job的执行,所以必须有output操作 ​​finalDStream.print(); ​​jssc.start (); ​​jssc.awaitTermination(); ​​jssc.close(); ​} }

    1.2K10编辑于 2023-02-25
  • 来自专栏大数据生态

    Spark读写ES最佳实践

    set("spark.es.nodes.wan.only","true");//指定es端口 //指定5秒获取一次kafka数据 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); jssc.sparkContext().setLogLevel("WARN rddstream JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc Elasticsearch JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc"); //启动streaming jssc.start (); // 等待生产者发送数据 jssc.awaitTermination(); jssc.stop(); }}也可以直接写入ES或者带上指定了数据结构的

    1.3K20编辑于 2023-11-14
  • 来自专栏悦思悦读

    Spark Tips3: 在Spark Streaming job中读取Kafka messages及其offsetRange

    ------------ JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc ----------- JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc ------------ JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc ------------ JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc

    1.8K120发布于 2018-03-15
  • 来自专栏java编程那点事

    基于Direct的方式

    new SparkConf()​​​​.setMaster("local[2]").setAppName("KafkaDirectWordCount"); JavaStreamingContext jssc ); ​​// 创建输入DStream ​JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(​​​​jssc Integer v1, Integer v2) throws Exception { ​​​​​​return v1 + v2; ​​​​​} ​​​​}); wordCounts.print(); ​​jssc.start (); ​​jssc.awaitTermination(); ​​jssc.close(); ​} }

    54420编辑于 2023-02-25
  • 来自专栏java编程那点事

    WordCount案例

    对象对象之外 ​​// 还必须接收一个batch interval参数,就是说,每收集多长时间的数据,划分为一个batch,进行处理 ​​// 这里设置一秒 JavaStreamingContext jssc ​​// socketTextStream()方法接收两个基本参数,第一个是监听哪个主机上的端口,第二个是监听哪个端 JavaReceiverInputDStream<String> lines = jssc.socketTextStream 进行一下后续处理 ​​// 必须调用JavaStreamingContext的start()方法,整个Spark Streaming ​​// Application才会启动执行 ​​// 否则是不会执行的 ​​jssc.start (); ​​jssc.awaitTermination(); ​​jssc.close(); ​} } Scala版本 import org.apache.spark.SparkConf import

    56720编辑于 2023-02-25
  • 结合参数解读智能战斗服柜的核心技术

    用户之前使用的RFID驱动示例使用了jssc库进行串口通信,因此电机驱动可能也需要类似的串口操作。需要确定电机控制器的通信协议,例如命令格式、波特率、数据位等参数。 以下是为智能消防作战服柜的 电机驱动模块 编写的Java程序代码,结合技术规范中的 雷赛步进电机+行星减速器 控制需求,实现柜体自动旋转功能:import jssc.SerialPort;import jssc.SerialPortException;import jssc.SerialPortList;/** * 智能单警装备柜RFID阅读器驱动 * 支持多标签批量读取、过滤及数据上报 */public catch (SerialPortException e) { System.err.println("Data read error: " + e.g关键功能说明:通信层:使用JSSC 库实现串口通信(需引入jssc-2.8.0.jar)支持自动检测可用串口实现命令发送与异步数据接收协议解析:示例解析超高频RFID常见帧结构提取EPC码和信号强度(RSSI)实际开发需根据设备文档调整解析逻辑数据模型

    39610编辑于 2025-01-27
  • 来自专栏LhWorld哥陪你聊算法

    【Spark篇】---SparkStreaming算子操作transform和updateStateByKey

    setMaster("local[2]") .setAppName("WindowHotWord"); JavaStreamingContext jssc * 没有优化的窗口函数可以不设置checkpoint目录 * 优化的窗口函数必须设置checkpoint目录 */ // jssc.checkpoint ("hdfs://node1:9000/spark/checkpoint"); jssc.checkpoint(". /checkpoint"); JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node04 (); jssc.awaitTermination(); jssc.close(); } }  Scala代码: package com.bjsxt.sparkstreaming

    1.3K20发布于 2018-09-13
  • 来自专栏大数据成神之路

    实时数仓链路分享:kafka =>SparkStreaming=>kudu集成kerberos

    spark.sparkContext(), Durations.milliseconds(5000)); JavaStreamingContext jssc JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils .createDirectStream(jssc <String, String> Subscribe(topics, confMap)); jssc.sparkContext().setLogLevel tables[0]); }); jssc.start (); jssc.awaitTermination(); logger.info("完成!")

    78131发布于 2020-08-12
领券