我在Google上部署了一个gRPC服务,我想从Composer那里调用它。
我已经将roles/iam.serviceAccountTokenCreator角色分配给我的composer工作节点正在运行的服务帐户,并且我不会挂载任何自定义服务密钥文件或设置GOOGLE_APPLICATION_CREDENTIALS环境变量。
使用气流认证选项 gRPC钩子,我得到以下错误:
[2022-05-31 14:20:16,082] {grpc.py:90} INFO - Calling gRPC service
[2022-05-31 14:20:16,097] {taskinstance.py:1152} ERROR - 'Credentials' object has no attribute 'signer_email'
Traceback (most recent call last):
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/airflow/airflow/providers/grpc/operators/grpc.py", line 95, in execute
for response in responses:
File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 136, in run
with self.get_conn() as channel:
File "/usr/local/lib/airflow/airflow/providers/grpc/hooks/grpc.py", line 104, in get_conn
jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials(credentials)
File "/opt/python3.6/lib/python3.6/site-packages/google/auth/jwt.py", line 695, in from_signing_credentials
kwargs.setdefault("issuer", credentials.signer_email)
AttributeError: 'Credentials' object has no attribute 'signer_email'
[2022-05-31 14:20:16,100] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task, execution_date=20220531T135709, start_date=20220531T142015, end_date=20220531T142016
[2022-05-31 14:20:23,826] {local_task_job.py:102} INFO - Task exited with return code 1有人知道我的凭据怎么/为什么不包括我需要的领域吗?
发布于 2022-07-04 13:23:58
在与Google讨论之后找到了解决方案--本质上,JWT_GOOGLE身份验证方法似乎不是为GCE服务帐户设置的,所以我选择了CUSTOM身份验证路径:
import google.auth.transport.grpc
import google.auth.transport.requests
import google.oauth2.credentials
import google.oauth2.id_token
from airflow.providers.grpc.operators.grpc import GrpcOperator
def connection_func(conn):
"""Custom connection function for gRPC authentication.
Args:
conn: Airflow Connection object
Returns:
An instantiated gRPC channel for making calls to our remote service.
"""
request = google.auth.transport.requests.Request()
if not str(conn.host).startswith("https://"):
audience = f"https://{conn.host}"
else:
audience = conn.host
token = google.oauth2.id_token.fetch_id_token(request, audience)
creds = google.oauth2.credentials.Credentials(token)
base_url = conn.host
if conn.port:
base_url = f"{base_url}:{conn.port}"
channel = google.auth.transport.grpc.secure_authorized_channel(
creds, None, base_url
)
return channel
return GrpcOperator(
...
custom_connection_func=connection_func,
)这使用看到的这里方法为给定的受众获取ID令牌,然后从那里创建一组凭据,最后实例化gRPC安全通道,以便在操作符中使用。
https://stackoverflow.com/questions/72516608
复制相似问题