Spark Executor如何执行代码?它是否有多个线程在运行?如果是,它是否会打开多个JDBC连接来读取/写入RDBMS中的数据?
发布于 2017-09-18 14:59:56
Spark Executor如何执行代码?
开源(包括Apache Spark项目)的美妙之处在于,您可以自己查看代码并找到答案。这并不是说这是找到答案的最佳且唯一的方法,但我的方法可能不像代码本身那样清晰(相反的情况也可能是正确的:)
话虽如此,您可以自己查看Executor的代码。
是否有多个线程在运行?
是。参见this line,其中Executor创建了一个新的TaskRunner,它是一个Runnable (一个单独的线程)。这个Runnable将会是executed on the thread pool。
引用Spark用于线程池的Java Executors.newCachedThreadPool:
创建了一个根据需要创建新线程的线程池,但是当以前构造的线程可用时,它将重用这些线程,并在需要时使用提供的ThreadFactory创建新线程。
如果是,它是否会打开多个JDBC连接来从RDBMS读取/写入数据?
我相信你已经知道答案了。是的,它将打开多个连接,这就是为什么您应该使用foreachPartition操作来_“将函数f应用于此数据集的每个分区”。(同样适用于RDDs)和某种类型的连接池。
发布于 2017-09-18 14:55:31
是的,如果您将spark.executor.cores设置为大于1,那么您的执行器将有多个并行线程,是的,我猜将打开多个JDBC连接。
发布于 2017-09-19 00:24:51
您可以通过在本地运行spark来轻松地测试这一点。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("JDBCTest")
val sc = new SparkContext(conf)在上面的代码片段中,local2表示两个线程。现在,如果您在处理RDD时打开JDBC连接,spark将为每个任务执行此操作。
转换和动作在spark中并行运行,根据设计,spark在运行内存中的任务时更有效,所以首先我们应该避免编写需要为每个RDD打开JDBC连接的代码,相反,您可以将其加载到内存中进行处理,请参见下面的代码片段。
Dataset<Row> jdbcDF = spark.read().format("jdbc").option("url", mySQLConnectionURL)
.option("driver", MYSQL_DRIVER).option("dbtable", sql).option("user", userId)
.option("password", dbpassword).load();干杯!
https://stackoverflow.com/questions/46271961
复制相似问题