我正在尝试创建一个Prefect任务,它接收PyMySQL connection的一个实例作为输入,例如:
@task
def connect_db():
connection = pymysql.connect(user=user,
password=password,
host=host,
port=port,
db=db,
connect_timeout=5,
cursorclass=pymysql.cursors.DictCursor,
local_infile=True)
return connection
@task
def query_db(connection) -> Any:
query = 'SELECT * FROM myschema.mytable;'
with connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
return rows
@task
def get_df(rows) -> Any:
return pd.DataFrame(rows, dtype=str)
@task
def save_csv(df):
path = 'mypath'
df.to_csv(path, sep=';', index=False)
with Flow(FLOW_NAME) as f:
con = connect_db()
rows = query_db(con)
df = get_df(rows)
save_csv(df)但是,当我尝试注册结果流时,它会引发"TypeError: cannot pickle 'socket‘object“。浏览一下Prefect的文档,我发现了内置的MySQL任务( https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute),但它们每次被调用时都会打开和关闭连接。有没有办法将之前打开的连接传递给Prefect任务(或者实现连接管理器之类的东西)?
发布于 2021-11-11 20:35:09
我试图复制您的示例,但它注册良好。像这样的错误最常见的弹出方式是,如果在流使用的全局名称空间中有一个客户端。Prefect将在注册时尝试将其序列化。例如,如果您尝试注册以下代码片段,它将出错:
import pymysql
connection = pymysql.connect(user=user,
password=password,
host=host,
port=port,
db=db,
connect_timeout=5,
cursorclass=pymysql.cursors.DictCursor,
local_infile=True)
@task
def query_db(connection) -> Any:
query = 'SELECT * FROM myschema.mytable;'
with connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
return rows
with Flow(FLOW_NAME) as f:
rows = query_db(connection)此错误是因为connection变量与流对象一起序列化。您可以通过将流存储为脚本来解决此问题。有关详细信息,请参阅此链接:
https://docs.prefect.io/core/idioms/script-based.html#using-script-based-flow-storage
这将避免流对象的序列化,并在运行时创建该连接。
如果在运行时期间发生这种情况,则为
如果你在运行时遇到这个错误,有两个可能的原因。第一个是Dask序列化它,第二个是来自Prefect检查点。
Dask使用cloudpickle通过网络将数据发送给工作人员。因此,如果您使用具有DaskExecutor的Prefect,它将使用cloudpickle发送要执行的任务。因此,任务输入和输出需要是可序列化的。在这个场景中,您应该实例化客户端并在任务中执行查询(就像您在当前的MySQL任务实现中看到的那样)
如果使用LocalExecutor,则默认情况下将序列化任务输出,因为默认情况下检查点处于打开状态。在定义任务时,您可以通过执行checkpoint=False来切换。
如果您需要进一步的帮助,请随时加入Prefect Slack频道,网址为pretect.io/slack。
https://stackoverflow.com/questions/67948893
复制相似问题