首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有python的aws应用程序同步graphql

带有python的aws应用程序同步graphql
EN

Stack Overflow用户
提问于 2020-07-22 13:56:29
回答 1查看 1.6K关注 0票数 1

我想使用AppSync代码访问AWS,并将其与请求库混淆。

8月模式是认知用户池。我的问题是:

  1. 如何从认知用户池中获取访问令牌?
  2. 如何进行查询、突变和处理订阅?

我尝试用auth模式API键来实现它。但是我得到了以下错误。

代码语言:javascript
复制
import requests
import json

URL = "https://vtcarmq7zzeadnkwzcgfr24irm.appsync-api.us-east-1.amazonaws.com/graphql"

headers = {"x-api-key":"da2-bwuyzqchhfgyxemcmdinjegb7e"}

data = json.dumps({
    "query": '''
    
  listTodos(filter:{
    title:{
      contains:"g"
    }     
  }   )     {
    items{
      id title duedate     
    }   
  }
'''

} )

r = requests.request("POST", URL , data = data , headers = headers)

print(r.text)

{“错误”:{“消息”:“无法解析GraphQL查询”,"errorType“:"MalformedHttpRequestException”}}

我在这段视频中看到了这个视频https://www.youtube.com/watch?v=2U4RsbFO4bA&t=1172s,为了使用认知用户池进行身份验证,他说要调用认知用户池并获取令牌,并将其传递给头文件中的appsync。

我是aws和python请求模块的新手,我试图为这个视频编写python代码。

EN

回答 1

Stack Overflow用户

发布于 2021-12-10 09:43:33

graphql-python/gql支持自版本3.0.0rc0以来的AWS AppSync。

它支持实时端点上的查询、突变甚至订阅。

它支持IAM、api密钥和JWT认证方法。

这些文档可获得这里

下面是一个使用API密钥身份验证的突变示例:

代码语言:javascript
复制
import asyncio
import os
import sys
from urllib.parse import urlparse

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.appsync_auth import AppSyncApiKeyAuthentication

# Uncomment the following lines to enable debug output
# import logging
# logging.basicConfig(level=logging.DEBUG)


async def main():

    # Should look like:
    # https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
    url = os.environ.get("AWS_GRAPHQL_API_ENDPOINT")
    api_key = os.environ.get("AWS_GRAPHQL_API_KEY")

    if url is None or api_key is None:
        print("Missing environment variables")
        sys.exit()

    # Extract host from url
    host = str(urlparse(url).netloc)

    auth = AppSyncApiKeyAuthentication(host=host, api_key=api_key)

    transport = AIOHTTPTransport(url=url, auth=auth)

    async with Client(
        transport=transport, fetch_schema_from_transport=False,
    ) as session:

        query = gql(
            """
mutation createMessage($message: String!) {
  createMessage(input: {message: $message}) {
    id
    message
    createdAt
  }
}"""
        )

        variable_values = {"message": "Hello world!"}

        result = await session.execute(query, variable_values=variable_values)
        print(result)


asyncio.run(main())
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63035922

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档