我目前正在尝试使用Python数据验证包“远大期望”。
我目前正在使用GreatExpectationsOperator来调用特定数据源( PostgreSQL数据源)上的期望套件。
my_ge_task = GreatExpectationsOperator(
task_id='my_task',
expectation_suite_name='suite.error',
batch_kwargs={
'table': 'data_quality',
'datasource': 'data_quality_datasource',
'query': "SELECT * FROM data_qualityWHERE batch='abc';"
},
data_context_root_dir=ge_root_dir
)我想弄清楚的是如何存储和获取我的数据源凭据。对于使用PostgreSQL的其他操作,我一直使用PostgreSQL连接来存储数据库凭据,并使用PostgreSQL钩子与数据库进行交互。但是,由于期望很高,postgreSQL连接细节存储在config_variables.yaml中的“远大期望”上下文中。我尝试在创建我的dockerfile时使用ENV变量,并将这些变量用作凭据,但我试图找到一种更干净的方法,可能会使用我现有的PostgreSQL连接细节来使用数据源。
网上似乎没有太多关于如何完成这一任务的细节,因此任何帮助都将是非常感谢的。
谢谢,
发布于 2022-02-04 19:45:10
可能的解决方法之一是在GreatExpectationOperator中使用PythonOperator,这样在运行GE之前,脚本可以从气流连接中提取连接数据,并将其保存为环境变量。
就像这样:
import os
from airflow.hooks.base import BaseHook
from great_expectations_provider.operators.great_expectations import (
GreatExpectationsOperator,
)
def get_ge_runner(task_id, checkpoint_name, connection_name):
def run_ge(ds, **kwargs):
connection = BaseHook.get_connection(connection_name)
os.environ[
"my_db_creds"
] = f"postgresql+psycopg2://{connection.login}:{connection.password}@{connection.host}:{connection.port}/{connection.schema}"
op = GreatExpectationsOperator(
task_id=task_id,
data_context_root_dir=ge_root_dir,
run_name=task_id,
checkpoint_name=checkpoint_name,
)
op.execute(kwargs)
return run_great_expectationshttps://stackoverflow.com/questions/68828016
复制相似问题