首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python Prefect上的MySQL连接

Python Prefect上的MySQL连接
EN

Stack Overflow用户
提问于 2021-06-12 20:33:52
回答 1查看 169关注 0票数 1

我正在尝试创建一个Prefect任务,它接收PyMySQL connection的一个实例作为输入,例如:

代码语言:javascript
复制
@task
def connect_db():
    connection = pymysql.connect(user=user,
                                 password=password,
                                 host=host,
                                 port=port,
                                 db=db,
                                 connect_timeout=5,
                                 cursorclass=pymysql.cursors.DictCursor,
                                 local_infile=True)
    return connection


@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows


@task
def get_df(rows) -> Any:
    return pd.DataFrame(rows, dtype=str)


@task
def save_csv(df):
    path = 'mypath'
    df.to_csv(path, sep=';', index=False)


with Flow(FLOW_NAME) as f:
    con = connect_db()
    rows = query_db(con)
    df = get_df(rows)
    save_csv(df)

但是,当我尝试注册结果流时,它会引发"TypeError: cannot pickle 'socket‘object“。浏览一下Prefect的文档,我发现了内置的MySQL任务( https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute),但它们每次被调用时都会打开和关闭连接。有没有办法将之前打开的连接传递给Prefect任务(或者实现连接管理器之类的东西)?

EN

回答 1

Stack Overflow用户

发布于 2021-11-11 20:35:09

我试图复制您的示例,但它注册良好。像这样的错误最常见的弹出方式是,如果在流使用的全局名称空间中有一个客户端。Prefect将在注册时尝试将其序列化。例如,如果您尝试注册以下代码片段,它将出错:

代码语言:javascript
复制
import pymysql
connection = pymysql.connect(user=user,
                             password=password,
                             host=host,
                             port=port,
                             db=db,
                             connect_timeout=5,
                             cursorclass=pymysql.cursors.DictCursor,
                             local_infile=True)

@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows

with Flow(FLOW_NAME) as f:
    rows = query_db(connection)

此错误是因为connection变量与流对象一起序列化。您可以通过将流存储为脚本来解决此问题。有关详细信息,请参阅此链接:

https://docs.prefect.io/core/idioms/script-based.html#using-script-based-flow-storage

这将避免流对象的序列化,并在运行时创建该连接。

如果在运行时期间发生这种情况,则为

如果你在运行时遇到这个错误,有两个可能的原因。第一个是Dask序列化它,第二个是来自Prefect检查点。

Dask使用cloudpickle通过网络将数据发送给工作人员。因此,如果您使用具有DaskExecutor的Prefect,它将使用cloudpickle发送要执行的任务。因此,任务输入和输出需要是可序列化的。在这个场景中,您应该实例化客户端并在任务中执行查询(就像您在当前的MySQL任务实现中看到的那样)

如果使用LocalExecutor,则默认情况下将序列化任务输出,因为默认情况下检查点处于打开状态。在定义任务时,您可以通过执行checkpoint=False来切换。

如果您需要进一步的帮助,请随时加入Prefect Slack频道,网址为pretect.io/slack。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67948893

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档