
Cannot import name 'BigQueryTableExistenceAsyncSensor' from 'airflow.providers.google.cloud.sensors.bigquery'

Stack Overflow user
Asked on 2022-10-25 03:10:03
1 answer · 54 views · 0 followers · 0 votes

I want to import BigQueryTableExistenceAsyncSensor from airflow.providers.google.cloud.sensors.bigquery.

Here is my code:

```python
from airflow import DAG
from util.dags_hourly import create_dag_write_append  # class I created; no issues with other DAGs
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceAsyncSensor


class TableChecker:  # wrapper class (name hypothetical; the methods were posted without an enclosing class)
    def __init__(self, dataset=None, table_name=None):
        self.dataset = dataset
        self.table_name = table_name

    def check_table_exists(self):
        return BigQueryTableExistenceAsyncSensor(
            task_id="check_table_exists_async",
            project_id='x-staging',
            dataset_id=self.dataset,
            table_id=self.table_name,  # originally self.table, an attribute that is never set
        )


with create_dag_write_append('test') as dag:

    a = TableChecker(
        dataset='data_lake_staging',
        table_name='test_table'
    )
    task1 = a.check_table_exists()

    task1
```

However, Airflow reports a DAG import error:

```
Broken DAG: [/home/airflow/gcs/dags/manatal/test_dag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/airflow/gcs/dags/test/test_dag.py", line 4, in <module>
    from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceAsyncSensor
ImportError: cannot import name 'BigQueryTableExistenceAsyncSensor' from 'airflow.providers.google.cloud.sensors.bigquery' (/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/sensors/bigquery.py)
```

I read the documentation here, but I don't understand why the library is not being imported correctly.

My end goal is to check whether a table exists in my dataset.
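For context (an assumption not stated in the thread): this kind of ImportError usually means the installed apache-airflow-providers-google package predates the release that introduced BigQueryTableExistenceAsyncSensor (around 8.4.0; confirm against the provider changelog). A minimal sketch to check the installed version:

```python
# Sketch: check whether the installed provider package is new enough to ship
# BigQueryTableExistenceAsyncSensor. The 8.4.0 threshold is an assumption;
# verify it against the apache-airflow-providers-google changelog.
from importlib.metadata import version, PackageNotFoundError


def parse_version(text):
    """Turn a version string like '8.4.0' into (8, 4, 0) for tuple comparison."""
    return tuple(int(part) for part in text.split(".")[:3])


def provider_has_async_sensor(pkg="apache-airflow-providers-google",
                              minimum="8.4.0"):
    """Return True if the installed provider is at least `minimum`."""
    try:
        return parse_version(version(pkg)) >= parse_version(minimum)
    except PackageNotFoundError:
        return False  # package not installed at all
```

If the provider is too old, upgrading it (e.g. `pip install 'apache-airflow-providers-google>=8.4.0'`, or the Cloud Composer equivalent) should make the import succeed.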


1 Answer

Stack Overflow user

Accepted answer

Answered on 2022-11-04 02:36:27

I figured out a way to solve this. It turns out I can use the google-cloud clients directly:

```python
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound


class ExistsOperator(GCSToBigQuery, PostgresToGCS):
    # GCSToBigQuery, PostgresToGCS and `constants` come from elsewhere in this project.

    def __init__(self, filenames, table_name, task_id=None, task_id_pgcs=None,
                 task_id_pgcs_init=None, task_id_gb=None):
        self.task_id = task_id or f"check_exists_{filenames}_from_gcs"
        self.task_id_pgcs = task_id_pgcs
        self.task_id_pgcs_init = task_id_pgcs_init
        self.task_id_gb = task_id_gb
        self.bucket = constants.GCS_BUCKET
        self.source_objects = filenames
        self.table_name = table_name

    def is_exists_storage(self):
        # get_blob() returns None for a missing object, so None.exists() raises
        # and control falls through to the except branch.
        client = storage.Client()
        bucket = client.bucket(self.bucket)
        filenames = bucket.get_blob(self.source_objects)
        try:
            filenames.exists(client)
            return self.task_id_gb
        except Exception:
            return 'no_upsert_task'

    def is_exists_table(self):
        client = bigquery.Client()
        try:
            table_name = client.get_table(self.table_name)
            if table_name:
                return self.task_id_pgcs
        except NotFound:
            return self.task_id_pgcs_init
```

I also created an extra function to check whether a file exists in the Cloud Storage bucket. Hope this helps someone in need!
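Because both methods return a task ID, they appear designed to feed a BranchPythonOperator (an assumption; the thread does not show the DAG wiring). A dependency-free sketch of the same branch-selection logic, with a stub standing in for `bigquery.Client.get_table` and hypothetical task names:

```python
# Pure-Python illustration of the decision logic in is_exists_table.
# NotFound stands in for google.cloud.exceptions.NotFound; all names here
# are hypothetical, not taken from the thread.
class NotFound(Exception):
    pass


def choose_next_task(get_table, table_name, exists_task_id, init_task_id):
    """Return the task_id to branch to: the load path if the table exists,
    the init path if the lookup raises NotFound."""
    try:
        get_table(table_name)
        return exists_task_id
    except NotFound:
        return init_task_id


# Stub client: only "known_table" exists.
def fake_get_table(name):
    if name != "known_table":
        raise NotFound(name)
    return name


choose_next_task(fake_get_table, "known_table", "load_task", "init_task")    # → "load_task"
choose_next_task(fake_get_table, "missing_table", "load_task", "init_task")  # → "init_task"
```

In a DAG, the returned task ID would typically become the `python_callable` result of a BranchPythonOperator, which then skips the untaken branch.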

1 vote
Original page content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/74188750