首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将spark-streaming链接到HBase

将spark-streaming链接到HBase
EN

Stack Overflow用户
提问于 2016-03-27 21:30:26
回答 3查看 258关注 0票数 0

我是Spark和HBase的新手,但我需要将两者链接在一起,我尝试了spark-hbase-connector库,但使用spark-submit时,即使没有显示错误,它也不起作用。我在这里和其他地方搜索了类似的问题或教程,但都找不到,所以谁能解释一下如何从Spark streaming中写入HBase,或者推荐一个教程或一本书?提前谢谢你

EN

回答 3

Stack Overflow用户

发布于 2016-04-06 16:32:32

最终起作用的是:

代码语言:javascript
复制
val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(row))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value)
hTable.put(thePut)
票数 1
EN

Stack Overflow用户

发布于 2016-08-04 04:51:44

下面是一些示例代码,使用Splice Machine (开源)通过Spark Streaming和Kafka将数据存储到HBase中。

https://github.com/splicemachine/splice-community-sample-code/tree/master/tutorial-kafka-spark-streaming

我们也经历了这一切,我们知道这可能有点令人生畏。

票数 0
EN

Stack Overflow用户

发布于 2016-08-04 23:48:51

下面是相关的代码。

代码语言:javascript
复制
        LOG.info("************ SparkStreamingKafka.processKafka start");

   // Create the spark application and set the name to MQTT
    SparkConf sparkConf = new SparkConf().setAppName("KAFKA");

    // Create the spark streaming context with a 'numSeconds' second batch size
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds));
    jssc.checkpoint(checkpointDirectory);

    LOG.info("zookeeper:" + zookeeper);
    LOG.info("group:" + group);
    LOG.info("numThreads:" + numThreads);
    LOG.info("numSeconds:" + numSeconds);


    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic: topics) {
        LOG.info("topic:" + topic);
      topicMap.put(topic, numThreads);
    }

    LOG.info("************ SparkStreamingKafka.processKafka about to read the MQTTUtils.createStream");
    //2. KafkaUtils to collect Kafka messages
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);

    //Convert each tuple into a single string.  We want the second tuple
    JavaDStream<String> lines = messages.map(new TupleFunction());

    LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD");
    //process the messages on the queue and save them to the database
    lines.foreachRDD(new SaveRDDWithVTI());


    LOG.info("************ SparkStreamingKafka.processKafka prior to context.strt");
    // Start the context
    jssc.start();
    jssc.awaitTermination();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36247889

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档