首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法从云函数触发当云存储发生更改时触发的composer/airflow dag

无法从云函数触发当云存储发生更改时触发的composer/airflow dag
EN

Stack Overflow用户
提问于 2020-11-15 17:21:16
回答 3查看 4.8K关注 0票数 4

我在(dlkpipelinesv1:composer-1.13.0-airflow-1.10.12)环境上创建并运行了一个google-cloud-composer环境。我能够手动触发这些dags,并使用调度程序,但当涉及到通过cloud-functions检测google-cloud-storage桶中的更改时,我会陷入困境。

请注意,我有另一个GC-Composer环境(pipelines:composer-1.7.5-airflow-1.10.2),它使用相同的google云功能来触发相关的dags,而则在运行

我遵循本指南来创建触发dags的函数。因此,我检索了以下变量:

代码语言:javascript
复制
PROJECT_ID = <project_id>
CLIENT_ID = <client_id_retrieved_by_running_the_code_in_the_guide_within_my_gcp_console>
WEBSERVER_ID = <airflow_webserver_id>
DAG_NAME = <dag_to_trigger>
WEBSERVER_URL = f"https://{WEBSERVER_ID}.appspot.com/api/experimental/dags/{DAG_NAME}/dag_runs"


def file_listener(event, context):
    """Entry point of the cloud function: Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    logging.info("Running the file listener process")
    logging.info(f"event : {event}")
    logging.info(f"context : {context}")
    file = event
    if file["size"] == "0" or "DTM_DATALAKE_AUDIT_COMPTAGE" not in file["name"] or ".filepart" in file["name"].lower():
        logging.info("no matching file")
        exit(0)

    logging.info(f"File listener detected the presence of : {file['name']}.")

    # id_token = authorize_iap()
    # make_iap_request({"file_name": file["name"]}, id_token)
    make_iap_request(url=WEBSERVER_URL, client_id=CLIENT_ID, method="POST")


def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.

    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.

    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service account.
    open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
    logging.info(f"Retrieved open id connect (bearer) token {open_id_connect_token}")

    # Fetch the Identity-Aware Proxy-protected URL, including an authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(method, url, headers={"Authorization": f"Bearer {open_id_connect_token}"}, **kwargs)

    if resp.status_code == 403:
        raise Exception("Service account does not have permission to access the IAP-protected application.")
    elif resp.status_code != 200:
        raise Exception(f"Bad response from application: {resp.status_code} / {resp.headers} / {resp.text}")
    else:
        logging.info(f"Response status - {resp.status_code}")
        return resp.json

这是在GC-函数中运行的代码,我使用以下代码分别检查了dlkpipelinesv1样条中的环境细节:

代码语言:javascript
复制
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()

两者使用相同的服务帐户来运行,即相同的IAM角色。虽然我注意到了以下不同的细节:

在旧环境中:

代码语言:javascript
复制
"airflowUri": "https://p5<hidden_value>-tp.appspot.com",
    "privateEnvironmentConfig": { "privateClusterConfig": {} },

在新的环境中:

代码语言:javascript
复制
"airflowUri": "https://da<hidden_value>-tp.appspot.com",
    "privateEnvironmentConfig": {
      "privateClusterConfig": {},
      "webServerIpv4CidrBlock": "<hidden_value>",
      "cloudSqlIpv4CidrBlock": "<hidden_value>"
    }

用于发出post请求的服务帐户具有以下角色:

代码语言:javascript
复制
Cloud Functions Service Agent 
Composer Administrator 
Composer User
Service Account Token Creator 
Service Account User

运行my环境的服务帐户具有以下角色:

代码语言:javascript
复制
BigQuery Admin
Composer Worker
Service Account Token Creator
Storage Object Admin

但是,当向气流API发出403 - Forbidden请求时,我仍然在Log Explorer中接收post

编辑2020-11-16:

我更新了最新的make_iap_request代码。我在安全服务中修改了IAP,但是我找不到将接受来自云功能的HTTP: post请求的IAP服务器.请参见下面的图像,无论如何,我将服务帐户添加到默认和CRM IAP资源中,以防万一,但我仍然会收到以下错误:

代码语言:javascript
复制
Exception: Service account does not have permission to access the IAP-protected application.

主要的问题是: IAP在这里有什么利害关系?以及如何将我的服务帐户添加为此IAP的用户。

我遗漏了什么?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-12-01 11:12:35

有一个配置参数会导致对API的所有请求都被拒绝.

文档中,我们需要覆盖以下气流配置:

代码语言:javascript
复制
[api]
auth_backend = airflow.api.auth.backend.deny_all

转到

代码语言:javascript
复制
[api]
auth_backend = airflow.api.auth.backend.default

这个细节是非常重要的,而且谷歌的文档中没有提到.

有用的链接:

  1. 用GCS触发DAGS (工作流)
  2. request.py存储库
票数 4
EN

Stack Overflow用户

发布于 2020-11-16 02:25:20

抛出403的代码就是它过去的工作方式。在2020年中期发生了巨大的变化。与其使用requests对令牌进行HTTP调用,不如使用谷歌的OAuth2库

代码语言:javascript
复制
from google.oauth2 import id_token
from google.auth.transport.requests import Request
open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

请参阅这个例子

票数 1
EN

Stack Overflow用户

发布于 2020-11-18 18:20:09

我遵循了触发DAGs中的步骤,并在我的env中工作过,请参阅下面的建议。

Componser环境的启动和运行是一个很好的开始。通过这个过程,您只需要上传新的DAG (trigger_response_dag.py),并在第一次打开气流UI时使用python脚本或从登录页面获取clientID (以.apps.googleusercontent.com结束)。

在Cloud方面,我注意到Node.js和Python的指令组合在一起,例如,USER_AGENT只适用于Node.js。而常规的make_iap_request只适用于python。我希望以下几点有助于解决你的问题:

  1. 服务帐户(SA)。Node.js代码使用默认服务帐户${projectId}@appspot.gserviceaccount.com,其默认角色是Editor,这意味着它可以广泛访问GCP服务,包括Composer。在python中,我认为身份验证是由client_id管理的,因为令牌是用id检索的。请确保SA具有此编辑器角色,并且不要忘记指定指南中指定的serviceAccountTokenCreator
  2. 我使用了Node.js 8运行时,注意到您所关心的用户代理应该是‘gcf-事件触发器’,因为它是硬编码的;USER_AGENT = 'gcf-event-trigger'。在python中,这似乎不是必要的。
  3. 默认情况下,在GCS触发器中,GCS事件类型设置为Archive,需要将其更改为Finalize/Create。如果设置为Archive,则当您上传对象时,触发器将无法工作,DAG也不会启动。
  4. 如果您认为您的云功能配置正确,并且错误仍然存在,您可以在控制台中的云函数的LOGS选项卡中找到其原因。它可以给你更多的细节。

基本上,根据指南,我只需在Node.js中更改以下值:

代码语言:javascript
复制
// The project that holds your function. Replace <YOUR-PROJECT-ID>
const PROJECT_ID = '<YOUR-PROJECT-ID>';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = '<ALPHANUMERIC>.apps.googleusercontent.com';
// This should be part of your webserver's URL in the Env's detail page: {tenant-project-id}.appspot.com. 
const WEBSERVER_ID = 'v90eaaaa11113fp-tp';
// The name of the DAG you wish to trigger. It's DAG's name in the script trigger_response_dag.py you uploaded to your Env.
const DAG_NAME = 'composer_sample_trigger_response_dag';

对于Python,我只更改了这些参数:

代码语言:javascript
复制
client_id = '<ALPHANUMERIC>.apps.googleusercontent.com'
# This should be part of your webserver's URL:
# {tenant-project-id}.appspot.com
webserver_id = 'v90eaaaa11113fp-tp'
# Change dag_name only if you are not using the example
dag_name = 'composer_sample_trigger_response_dag'
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64847488

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档