首页
学习
活动
专区
圈层
工具
发布
    • 综合排序
    • 最热优先
    • 最新优先
    时间不限
  • 来自专栏大数据生态

    Spark读写ES最佳实践

    本文介绍了Spark local模式下读写ES的2种方式Spark RDD读写ESSpark Streaming写入ES环境准备Elaticsearch-7.14.2Spark-3.2.1jdk-1.8maven 读写ES还支持JSON格式//直接读JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);//ES嵌套数据格式{test .set("es.nodes.wan.only", "true") .set("es.resource", "spark_write/_doc") ", "43.139.24.126")//指定es地址 .set("spark.es.port", "9200") .set("spark.es.nodes.wan.only 自动创建index开关es.resource指定要读写的index和typees.mapping.names表字段与Elasticsearch的索引字段名映射es.input.use.sliced.partitions

    1.3K20编辑于 2023-11-14
  • 来自专栏暴走大数据

    Spark SQL读写 ES7.x 及问题总结

    本文主要介绍 spark SQL 读写 ES,参数的配置以及问题总结。 ES官方提供了对spark的支持,可以直接通过spark读写es,具体可以参考ES Spark Support文档(https://www.elastic.co/guide/en/elasticsearch >7.3.1</version> </dependency> Spark SQL to ES 主要提供了两种读写方式: 一种是通过DataFrameReader/Writer传入ES Source 实现 另一种是直接读写DataFrame实现 在实现前,还要列一些相关的配置: ? ", map) Spark RDD to ES SparkRDD方式写 ES,以下是源码截图。

    3.9K40发布于 2021-01-26
  • 来自专栏summerking的专栏

    ES读写原理

    # ElasticSearch 原理 # 1.1 ES写数据过程: 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node(协调节点)。 # 1.3 ES写数据底层原理 先写入内存 buffer,在 buffer 里的时候数据是搜索不到的;同时将数据写入 translog 日志文件。 为什么叫 es 是准实时的? NRT,全称 near real-time。默认是每隔 1 秒 refresh 一次的,所以 es 是准实时的,因为写入的数据 1 秒之后才能被看到。 flush 操作就对应着 commit 的全过程,我们可以通过 es api,手动执行 flush 操作,手动将 os cache 中的数据 fsync 强刷到磁盘上去。 实际上你在这里,如果面试官没有问你 es 丢数据的问题,你可以在这里给面试官炫一把,你说,其实 es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。

    49920编辑于 2022-10-27
  • 来自专栏每天学Java

    Spark读写MySQL数据

    导入依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql 具体原因未知:信息如下 javax.servlet.FilterRegistration"'s signer information does not match signer information 将 spark SparkSession sparkSession = SparkSession .builder() .appName("Java Spark 执行Jar 使用IDEA可以直接在控制台查看查询的数据,我们也可以将Java打包成Jar,通过spark-submit执行 这里要带上驱动路径,不然会报错找不到MySQL的驱动 . /spark-submit --class 'package.SparkMySQL' --jar /mysql-connection.jar /SparkMySQL.jar 2>&1 写入MySQL 和读取数据库有很大的不同

    3.2K20发布于 2020-06-02
  • 来自专栏大数据进阶

    spark submit读写hudi

    localhost:9092 -t stock_tick 元数据查看:kafkacat -b localhost:9092 -L -J | jq schema准备:hudi官方自带的schema.avsc spark 这里我们用的是spark-2.4.8-bin-hadoop2.7 执行命令: 1. 非自动同步 bin/spark-submit \ --master yarn \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer warehouse/stock_ticks_cow \ --database hudi_stock \ --table stock_ticks_cow 执行完去hive中查看 2.自动同步 bin/spark-submit -enable-hive-sync 执行完上述命令hive中就能看到期望中的表 上诉都是针对的copy on write 下面我们同样的步骤描述一下merge on read 1.非自动同步 bin/spark-submit

    1.4K20发布于 2021-09-10
  • 来自专栏友弟技术工作室

    spark加载数据到ES

    在日常开发中一定会遇到,spark将计算好的数据load到es中,供后端同学查询使用。下面介绍一下sparkes的方式。 使用scala进行演示,对应的java自己google了。 sparkes需要使用到 对应的包es包。 org.bigdata.es; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import scala.collection.Seq , arrival: String) 使用字符串json方式 package org.bigdata.es import org.apache.spark. /json-trips") } } 动态index package org.bigdata.es import org.apache.spark.

    1.2K10发布于 2021-03-02
  • 来自专栏大数据-BigData

    Flink和Spark读写avro文件

    前面文章基于Java实现Avro文件读写功能我们说到如何使用java读写avro文件,本文基于上述文章进行扩展,展示flink和spark如何读取avro文件。 Flink读写avro文件 flink支持avro文件格式,内置如下依赖: <dependency> <groupId>org.apache.flink</groupId> <artifactId 56,TIMESTAMP '1970-01-01 00:00:08','par4'); 查看本地文件: image.png 数据读取: select * from t1; 得到: image.png Spark 读写avro文件 在文章基于Java实现Avro文件读写功能中我们使用java写了一个users.avro文件,现在使用spark读取该文件并重新将其写入新文件中: SparkConf sparkConf = new SparkConf() .setMaster("local") .setAppName("Java Spark

    1.5K20编辑于 2022-02-24
  • 来自专栏个人分享

    Spark读写Hbase中的数据

    Array[String]) { val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.kryo.registrator ", classOf[HBaseConfiguration].getName) .set("spark.executor.memory", "4g") val sc: SparkContext user=root&password=yangsiyi" val rows = sqlContext.jdbc(mySQLUrl, "person") val tableName = "spark

    2K10发布于 2018-09-06
  • 来自专栏我是攻城师

    如何使用scala+spark读写hbase?

    最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好了,言归正传,下面进入今天的主题: 如何使用scala+spark读写Hbase 软件版本如下: scala2.11.8 spark2.1.0 hbase1.2.0 公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为 关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala +spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd /spark-hbase-connector https://github.com/hortonworks-spark/shc

    1.9K70发布于 2018-05-14
  • 来自专栏YG小书屋

    ES-Spark连接ES后,ES Client节点流量打满分析

    问题描述 前段时间用es-spark读取es数遇到了client节点流量打满的现象。es-spark配置的es.nodes是es的域名。 解决方法 临时解决方案:降低es-spark的并发,并重启主节点。 最终解决方案:设置es.nodes.wan.only为false,即不用域名访问。将es.nodes配置为client节点的IP。 配置es.nodes为client节点的IP后,spark只通过data节点访问ESes.nodes.data.only (default true) Whether to use Elasticsearch 源码角度分析 1、es-spark 读 其架构图如下所示: ? 3、shard-partition 对应关系 es-spark写的话就是就是一个partition对应一个shard,这里从上述的es-spark写代码中可以看出,不再过多介绍。

    3.5K30发布于 2018-05-23
  • 来自专栏腾讯云Elasticsearch Service

    HadoopSpark读写ES之性能调优

    腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇 腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇 通过前面几篇文章的介绍,相信大家已经基本了解了大数据组件结合 比如很多开发者测试Hive,Spark等数据导出写入到ES性能非常慢,百万级别的数据导出需要数小时之久。 ES-Hadoop的参数非常多,包含了索引setting,mapping,网络,读写等等,其中的一些Advance高级参数非常重要,和我们的读写性能息息相关。 我们以Spark RDD写入ES为例: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.spark_project.guava.collect.ImmutableMap; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark

    6K44发布于 2020-04-09
  • 来自专栏我是攻城师

    使用ES-Hadoop插件结合sparkes插入数据

    上篇文章简单介绍了ES-Hadoop插件的功能和使用场景,本篇就来看下如何使用ES-Hadoop里面的ES-Spark插件,来完成使用sparkes里面大批量插入数据。 那么就可以单独引入es-spark的包,而不需要使用fat包因为它会附加引入其他的包依赖,如果使用Hive那就单独引入es-hive的包即可。 en/elasticsearch/hadoop/current/install.html 下面看下如何使用es-spark读写es的数据: spark版本:2.1.0 Scala版本:2.11.8 es 从上面的代码量我们可以看到非常少,这是由于es-spark底层已经帮我们封装好了相关的代码,所以用起来非常简单,围绕的核心还是rdd,无论是写入es,还是从es读取数据都是通过spark的rdd做中转的 上面的代码使用spark的core来完成的,此外我门还可以使用spark sql或者spark streaming来与es对接,这个以后用到的时候再总结分享,最后使用spark操作es的时候我门还可以有非常多的配置参数设置

    2.4K50发布于 2018-05-14
  • 来自专栏大数据-BigData

    Flink与Spark读写parquet文件全解析

    Spark读写parquet文件 Spark SQL 支持读取和写入 Parquet 文件,自动捕获原始数据的模式,它还平均减少了 75% 的数据存储。 Spark 默认在其库中支持 Parquet,因此我们不需要添加任何依赖库。下面展示如何通过spark读写parquet文件。 本文使用spark版本为3.0.3,运行如下命令进入本地模式: bin/spark-shell 数据写入 首先通过Seq创建DataFrame,列名为“firstname”, “middlename”, ("/tmp/output/people2.parquet/gender=M") parqDF3.show() 得到如下结果 image.png Flink读写parquet文件 默认情况下,Flink bin/start-cluster.sh 执行如下命令进入Flink SQL Client bin/sql-client.sh 读取spark写入的parquet文件 在上一节中,我们通过spark写入了

    6.9K74编辑于 2022-01-27
  • 来自专栏java开发的那点事

    09-Elasticsearch-ES集群文档读写原理

    ES集群的文档读写原理 文档写原理 文档读原理

    18940编辑于 2022-10-04
  • 来自专栏大数据生态

    hive读写ES集群及Role权限控制

    hive读写ES1.下载elasticsearch-hadoop-hive-xxx.jar包,版本要与ES集群对应add jar hdfs:///user/es/jars/elasticsearch-hadoop-hive /hive_test' TBLPROPERTIES('es.nodes' = '172.16.48.53', 'es.port'='9200', 'es.index.auto.create ' = 'true', 'es.resource' ='hive/_doc', 'es.net.http.auth.user'='elastic', 'es.net.http.auth.pass'='passwd', 'es.read.metadata' = 'true', 'es.mapping.names es.resource指定关联的ES索引名称'es.resource' ='hive/_doc' 在ES源端创建hive index即成功关联ES Roles权限控制hive读写Roles是ES中具有若干种权限的角色

    51820编辑于 2023-11-15
  • 来自专栏Spark学习技巧

    Spark读写XML文件及注意事项

    最近有粉丝问浪尖spark 如何读写xml格式的文件,尤其是嵌套型的,spark本身是不支持xml格式文件读取的,但是databricks开源了一个jar,支持xml文件的读写,浪尖这里给大家介绍一下用法 </description> </book> </catalog> 读取XML 浪尖以前讲过关于spark sql自定义数据源的加载方式吧? package com.vivo.study.xml import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema ") .getOrCreate() val df = spark.sqlContext.read .format("com.databricks.spark.xml") package com.vivo.study.xml import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

    2K20发布于 2021-03-05
  • 来自专栏不温卜火

    Spark SQL 快速入门系列(8) | | Hive与Spark SQL的读写操作

    若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。 2.2 启动 spark-sql   在spark-shell执行 hive 方面的查询比较麻烦.spark.sql("").show   Spark 专门给我们提供了书写 HiveQL 的工具: spark-sql // 一般用于测试学习 [bigdata@hadoop002 spark]$ bin/spark-sql spark-sql> select count(*) from emp; ? ._ spark.sql("show databases") spark.sql("select * from emp").show spark.close() } // 先创建一个数据库 // 创建一次就行否则会报错 spark.sql("create database spark0806").show spark.sql("use spark0806

    4.9K10发布于 2020-10-28
  • 来自专栏大数据生态

    Spark写入ES报错403|Forbidden问题处理

    环境配置Spark 版本:2.3.1Elasticsearch :7.14.2问题spark连接es写入报错[HEAD] on [yuqing_info1] failed; server[https:/ /es-8gp5f0ej.public.tencentelasticsearch.com:9200] returned [403|Forbidden:]图片问题原因问题产生原因是用户在向es中写入数据的时候 .set("es.nodes", "129.204.98.76") .set("es.port", "9200") .set("es.nodes.wan.only -- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId

    41710编辑于 2025-05-30
  • 来自专栏Spark学习技巧

    Spark Core读取ES的分区问题分析

    1.Spark Core读取ES ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持如下: hadoop2Version = 2.7.1 hadoop22Version = 2.2.0 spark13Version = 1.6.2 spark20Version = 2.3.0 浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是 要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。 这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。 Core读取ES数据的时候分片和RDD分区的对应关系分析,默认情况下是一个es 索引分片对应Spark RDD的一个分区。

    1.8K40发布于 2019-06-20
  • 来自专栏大数据及人工智能

    线上ES集群提高读写能力的大致方案

    问题背景: 业务在使用ES集群读取ES数据,如果同时向ES集群写任务时,会遇到RT涨的情况,会出现一些抖动,尤其是在计算框架大量增加并发度像ES集群写的情况下会出现抖动,目前的话是大数据计算集群减少并发写 以后还是期望增加并发度,加快写入速度,预期会对ES集群读性能带来挑战 目前现状: 目前线上是采用的 5台 64C 128G 1THDD,机器配置比较高,使用比较稳定,在集群同时大量读写时出现一些抖动 ES集群部署: 基础知识 通常ES集群中有以下角色的节点类型 Master / Data / Ingest / Coordinating / Machine Learning 各个角色的作用如下 读写数据都会找到相应的 Data Node 节点。 Coordinating Node 节点:协调节点主要负责协调客户端的请求,将接收到的请求分发给合适的节点,并把结果汇集到一起。 总结 随着业务量和数据量上升, 当前ES集群使用的是默认配置,没有对ES节点角色进行区分的使用方式,在未来估计会受到一定的压力和挑战,当前根据监控数据和ES集群监控,暂时满足业务需求,后续集群架构需要进行一定的调整

    1.7K40发布于 2021-10-12
领券