首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何并发运行databricks笔记本

如何并发运行databricks笔记本
EN

Stack Overflow用户
提问于 2022-01-27 07:46:43
回答 1查看 1.1K关注 0票数 0

我是Databricks和Scala的新手。我试图在主笔记本上并行运行多本笔记本。

我得到了下面的代码从数据库网站运行笔记本并行。

平行

代码语言:javascript
复制
import scala.concurrent. [Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal


case class NotebookData (path: String, timeout: Int, parameters: Map [String, String] = Map.empty [String, String])

def parallelNotebooks (notebooks: Seq[NotebookData]): Future[Seq[String]] = {

import scala.concurrent.{Future, blocking, Await}

import java.util.concurrent.Executors
import scala.concurrent. ExecutionContext
import com.databricks.WorkflowException

val numNotebooksInParallel = 4

// If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once. 
// This code limits the number of parallel notebooks.

implicit val ec = ExecutionContext. fromExecutor (Executors.newFixed ThreadPool (numNotebooksInParallel)) 

val ctx = dbutils.notebook.getContext() I

Future.sequence (

notebooks.map { notebook =>

Future {

dbutils.notebook.setContext(ctx)

if (notebook.parameters.nonEmpty)

    dbutils.notebook.run(notebook.path, notebook. timeout, notebook.parameters)

else

    dbutils.notebook.run(notebook.path, notebook. timeout)

}

.recover {

case NonFatal(e) => s'ERROR: ${e.getMessage}"

}

}
)

}


def parallelNotebook (notebook: NotebookData): Future [String] = {

import scala.concurrent. {Future, blocking, Await}
import java.util.concurrent. Executors
import scala.concurrent. ExecutionContext. Implicits.global
import com.databricks.WorkflowException

val ctx = dbutils. notebook.getContext ()
// The simplest interface we can have but doesn't
// have protection for submitting to many notebooks in parallel at once
Future {
    dbutils. notebook.setContext(ctx)
    if (notebook.parameters.nonEmpty)

        dbutils.notebook.run(notebook.path, notebook. timeout, notebook.parameters)

    else

        dbutils.notebook.run (notebook.path, notebook. timeout)

}
.recover{
case NonFatal(e) => s'ERROR: ${e.getMessage}"
}


}

并发性

代码语言:javascript
复制
import scala.concurrent. Await
import scala.concurrent.duration..
import scala.language.postfix0ps


var notebooks = Seq(
NotebookData("testing", 15),
NotebookData("testing-2", 15, Map ("Hello" > "yes")),
NotebookData("testing-2", 15, Map ("Hello" -> "else")),
NotebookData("testing-2", 15, Map ("Hello" -> "lots of notebooks")
)

va res = parallelNotebooks (notebooks)
Await.result (res, 30 seconds) // this is a blocking call.
res.value

在上面,他们给出所需的笔记本路径和参数直接进入一个序列。但是我想给出基于IF条件的笔记本路径。

例如:

val notebook1="Y“

val notebook2="Y“

val notebook2="N“

我想提供的笔记本,标志为"Y“,只有序列。

if(notebook1=="Y")

然后,这个笔记本应该添加到顺序。

EN

回答 1

Stack Overflow用户

发布于 2022-06-23 11:32:05

实现这一点的非常简单的方法是使用dbutils.notebook实用程序。从笔记本上调用dbutils.notebook.run(),你就可以运行了。如果从同一个单元格多次调用并将完成此工作。

代码语言:javascript
复制
dbutils.notebook.run( "/path/mynotebook", timeout_seconds = 60,
  arguments = {"arg1": "value1", "arg2": "value2"})

如果我们必须同时运行,那么参数化笔记本的实例,

  1. 创建并发所需的资源池。这可以根据一次运行需要存在多少实例来计算。

代码语言:javascript
复制
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)

然后,

  1. 为并发执行使用此池.

代码语言:javascript
复制
pool.map(
  lambda invalue: dbutils.notebook.run(
    timeout_seconds=60, "/User/Path/notebook", arguments= {"argument": invalue}),
    ["parameter1", "parameter2"])
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70874935

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档