conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount"); JavaStreamingContext jssc )); // 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流 JavaDStream<String> lines = jssc.textFileStream Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start (); jssc.awaitTermination(); jssc.close(); } } 验证: Hadoop fs –mkdir /wordCount_dir
NetworkWordCount").set("spark.testing.memory", "2147480000"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); System.out.println(jssc); / ("master", 9999); //JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream = new JavaStreamingContext(conf, Durations.seconds(1)); System.out.println(jssc); / ("master", 9999); JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream"
new SparkConf().setMaster("local").setAppName("streaming word count"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaDStream<String> lines = jssc.socketTextStream , String>>(); for (int i = 0; i < part; i++) { list.add(KafkaUtils.createStream(jssc Exception { return x + y; } }); counts.print(); jssc.start (); } finally { jssc.close(); } } } 执行方法 $ spark-submit --queue=root.XXX
final SparkConf conf = new SparkConf().setAppName("scan"); final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60)); final HiveContext sqlContext = new HiveContext(jssc.sc...); // automatic restart after a checkpoint failure can cause data loss //sqlContext.setConf("hive.optimize.ppd", "false"); //jssc.checkpoint(...) ... Long.parseLong(offset)); } KafkaUtils.createDirectStream(jssc, ... ("test1", 1), 6600L); JavaInputDStream<String> inputDStream = KafkaUtils.createDirectStream(jssc, ...)
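A hedged sketch of the resume-from-saved-offsets pattern this fragment points at, assuming spark-streaming-kafka 0.8. The topic "test1", partition 1, and offset 6600L follow the fragment; the broker address and everything else (including leaving out the HiveContext part) are assumptions.

import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class ScanFromOffsets {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("scan").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "broker1:9092"); // assumed broker

        // Resume topic "test1", partition 1, from offset 6600 (values from the fragment)
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
        fromOffsets.put(new TopicAndPartition("test1", 1), 6600L);

        JavaInputDStream<String> inputDStream = KafkaUtils.createDirectStream(
            jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            String.class, kafkaParams, fromOffsets,
            new Function<MessageAndMetadata<String, String>, String>() {
                @Override
                public String call(MessageAndMetadata<String, String> mmd) {
                    return mmd.message(); // keep only the payload
                }
            });

        inputDStream.print();
        jssc.start();
        jssc.awaitTermination();
    }
}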
SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount"); JavaStreamingContext jssc 那么spark streaming是要求必须用checkpoint的,以便于在 // 内存数据丢失的时候,可以从checkpoint中恢复数据 // 开启checkpoint机制,很简单,只要调用jssc 的checkpoint()方法,设置一个hdfs目录即可 jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint"); // 然后先实现基础的 wordcount逻辑 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999); JavaDStream (); jssc.awaitTermination(); jssc.close(); } }
Question update: one more find, a powerful JS syntax-highlighting library discovered on javaeye: JSSC. JSSC 3 rc (JS syntax highlighter ver3 rc) && JSSC 2.2 released: http://www.iteye.com/topic/189502 After a long gap, the JSSC 4 beta has finally appeared: http://www.iteye.com/topic/291314 jssc ver5.0 alpha: http://www.iteye.com/topic/459788 For the jssc project homepage, the older versions, or if you want to use it, see: http://code.google.com/p/jssc/ http://jssc.googlecode.com/ Question update: kjah wrote: found an even better one: http://www.open-open.com/ajax/ajax20080713173520
The calling code that consumes the stream data is as follows: JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, ...) /** @param jssc JavaStreamingContext object @param zkQuorum Zookeeper quorum (hostname:port,...) @return DStream of (Kafka message key, Kafka message value) */ def createStream(jssc, ...) /** @param jssc JavaStreamingContext object @param keyTypeClass Key type of DStream */
conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount"); JavaStreamingContext jssc topicThreadMap.put("WordCount", 1); JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start (); jssc.awaitTermination(); jssc.close(); } }
bin-hadoop2.6"); // JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaStreamingContext jssc Durations.seconds(3)); Logger.getRootLogger().setLevel(Level.OFF); JavaReceiverInputDStream<String> lines=jssc.socketTextStream Integer arg1) throws Exception { return arg0+arg1; } }); javaPairDStream.print(); jssc.start (); jssc.awaitTermination(); } } 4.截图 ?
SparkStreamingExample").setMaster("local[2]"); // 创建JavaStreamingContext对象 JavaStreamingContext jssc "topic2"); JavaDStream<String> kafkaStream = KafkaUtils.createDirectStream( jssc word.startsWith("a")); // 输出处理结果 processedStream.print(); // 启动StreamingContext jssc.start (); // 等待StreamingContext停止 jssc.awaitTermination(); } } 在这个示例中,我们首先创建了一个SparkConf
An example program that reads serial-port communication using the jssc (Java Simple Serial Connector) library: import jssc.SerialPort; import jssc.SerialPortException; import jssc.SerialPortList;
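A minimal sketch of reading from a serial port with jssc, assuming a device on the first detected port at 9600 8N1; adjust the parameters to the actual hardware.

import jssc.SerialPort;
import jssc.SerialPortEvent;
import jssc.SerialPortEventListener;
import jssc.SerialPortException;
import jssc.SerialPortList;

public class SerialReadExample {
    public static void main(String[] args) throws Exception {
        String[] ports = SerialPortList.getPortNames();
        if (ports.length == 0) {
            System.err.println("No serial ports found");
            return;
        }
        final SerialPort port = new SerialPort(ports[0]);
        port.openPort();
        port.setParams(SerialPort.BAUDRATE_9600, SerialPort.DATABITS_8,
                       SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
        // Receive bytes asynchronously as they arrive (default mask is RXCHAR)
        port.addEventListener(new SerialPortEventListener() {
            @Override
            public void serialEvent(SerialPortEvent event) {
                if (event.isRXCHAR() && event.getEventValue() > 0) {
                    try {
                        byte[] data = port.readBytes(event.getEventValue());
                        System.out.println("Received " + data.length + " bytes");
                    } catch (SerialPortException e) {
                        System.err.println("Data read error: " + e.getMessage());
                    }
                }
            }
        });
        Thread.sleep(10000); // keep the program alive while data arrives
        port.closePort();
    }
}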
new SparkConf().setMaster("local[2]").setAppName("WindowHotWord"); JavaStreamingContext jssc leo hello // tom world // hello // worldss JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream searchWordCountsRDD; } }); // 这个无关紧要,只是为了触发job的执行,所以必须有output操作 finalDStream.print(); jssc.start (); jssc.awaitTermination(); jssc.close(); } }
set("spark.es.nodes.wan.only","true");//指定es端口 //指定5秒获取一次kafka数据 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); jssc.sparkContext().setLogLevel("WARN rddstream JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc Elasticsearch JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc"); //启动streaming jssc.start (); // 等待生产者发送数据 jssc.awaitTermination(); jssc.stop(); }}也可以直接写入ES或者带上指定了数据结构的
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, ...)
new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount"); JavaStreamingContext jssc ); // 创建输入DStream JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jssc Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start (); jssc.awaitTermination(); jssc.close(); } }
// Besides the SparkConf object, the constructor must also receive a batch interval parameter: how long data is collected before being divided into one batch for processing // here it is set to one second JavaStreamingContext jssc = ... // socketTextStream() takes two basic parameters: first, which host to listen to, and second, which port JavaReceiverInputDStream<String> lines = jssc.socketTextStream(...) ... for further processing // JavaStreamingContext's start() method must be called for the whole Spark Streaming application to start executing // otherwise nothing runs jssc.start(); jssc.awaitTermination(); jssc.close(); } } Scala version: import org.apache.spark.SparkConf import
The user's earlier RFID driver example used the jssc library for serial communication, so the motor driver will likely need similar serial-port operations. The motor controller's communication protocol must be determined first: command format, baud rate, data bits, and other parameters. Below is Java code for the motor driver module of a smart firefighting-gear cabinet, implementing automatic cabinet rotation per the technical spec's Leadshine stepper motor + planetary reducer control requirements: import jssc.SerialPort; import jssc.SerialPortException; import jssc.SerialPortList; /** * Smart single-officer equipment cabinet RFID reader driver * Supports batch reading, filtering, and reporting of multiple tags */ public ... catch (SerialPortException e) { System.err.println("Data read error: " + e.g... Key functionality: Communication layer: uses the JSSC library for serial communication (requires jssc-2.8.0.jar); supports auto-detection of available serial ports; implements command sending and asynchronous data reception. Protocol parsing: the example parses a common UHF RFID frame structure, extracting the EPC code and signal strength (RSSI); real development must adapt the parsing logic to the device documentation. Data model
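A hedged sketch of sending a rotation command to a motor controller over jssc, as this passage describes. The frame bytes below are purely hypothetical placeholders; the real command layout, baud rate, and any checksum must come from the Leadshine controller manual.

import jssc.SerialPort;
import jssc.SerialPortException;

public class MotorDriver {
    private final SerialPort port;

    public MotorDriver(String portName) {
        this.port = new SerialPort(portName);
    }

    public void open() throws SerialPortException {
        port.openPort();
        // 9600 8N1 is an assumption; use the controller's documented settings
        port.setParams(SerialPort.BAUDRATE_9600, SerialPort.DATABITS_8,
                       SerialPort.STOPBITS_1, SerialPort.PARITY_NONE);
    }

    /** Rotate the cabinet to the given position (hypothetical frame layout). */
    public void rotateTo(int position) throws SerialPortException {
        byte[] frame = new byte[] {
            (byte) 0xAA,              // hypothetical frame header
            0x01,                     // hypothetical "rotate" command id
            (byte) (position & 0xFF), // hypothetical position byte
            (byte) 0x55               // hypothetical frame tail
        };
        port.writeBytes(frame);
    }

    public void close() throws SerialPortException {
        port.closePort();
    }
}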
setMaster("local[2]") .setAppName("WindowHotWord"); JavaStreamingContext jssc * 没有优化的窗口函数可以不设置checkpoint目录 * 优化的窗口函数必须设置checkpoint目录 */ // jssc.checkpoint ("hdfs://node1:9000/spark/checkpoint"); jssc.checkpoint(". /checkpoint"); JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node04 (); jssc.awaitTermination(); jssc.close(); } } Scala代码: package com.bjsxt.sparkstreaming
spark.sparkContext(), Durations.milliseconds(5000)); JavaStreamingContext jssc ... JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, ... <String, String>Subscribe(topics, confMap)); jssc.sparkContext().setLogLevel(...) ... tables[0]); }); jssc.start(); jssc.awaitTermination(); logger.info("Done!")
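A sketch of the 0.10-style direct stream above: build the streaming context from an existing SparkSession (5000 ms batches, per the fragment) and subscribe with ConsumerStrategies.Subscribe. Topic names, broker address, group id, and the per-record action are assumptions.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SubscribeExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .master("local[2]").appName("SubscribeExample").getOrCreate();
        JavaStreamingContext jssc = new JavaStreamingContext(
            JavaSparkContext.fromSparkContext(spark.sparkContext()),
            Durations.milliseconds(5000));
        jssc.sparkContext().setLogLevel("WARN");

        Map<String, Object> confMap = new HashMap<>();
        confMap.put("bootstrap.servers", "kafka-host:9092");
        confMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        confMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        confMap.put("group.id", "scan-group");

        Collection<String> topics = Arrays.asList("events");
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
            jssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, confMap));

        // Print each record; replace with the real per-batch processing
        stream.foreachRDD(rdd -> rdd.foreach(record ->
            System.out.println(record.key() + " -> " + record.value())));

        jssc.start();
        jssc.awaitTermination();
    }
}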