在graphene-python中测试订阅的惯用方法是什么?看来,client.execute选项在graphene.test中只适用于Query测试。
在文档中有一个订阅执行示例,但它似乎不是测试库(https://docs.graphene-python.org/en/latest/execution/subscriptions/)的一部分。
发布于 2020-12-21 19:24:39
graphene (3)的预发布版本以这种方式支持订阅:
import asyncio
from datetime import datetime
from graphene import ObjectType, String, Schema, Field
class Query(ObjectType):
hello = String()
def resolve_hello(root, info):
return 'Hello, world!'
class Subscription(ObjectType):
time_of_day = Field(String)
async def resolve_time_of_day(root, info):
while True:
yield datetime.now().isoformat()
await asyncio.sleep(1)
schema = Schema(query=Query, subscription=Subscription)
async def main():
subscription = 'subscription { timeOfDay }'
result = await schema.execute_async(subscription)
async for item in result:
print(item)
asyncio.run(main())发布于 2022-08-10 16:22:51
这是你在2022年用石墨烯3所做的:
pip安装pytest pytest-异步
import pytest
from main.schema import schema
@pytest.mark.asyncio
async def test_realtime_updates():
q = 'subscription { count }'
result = await schema.subscribe(q)
async for item in result:
print(item)
pytest.fail(pytrace=False)如果您没有任何应用程序,下面是一个关于订阅的最小示例:
pip安装石墨烯graphene_starlette3
# Graphene Subscriptions demo
import graphene
from graphene import ResolveInfo
from starlette_graphene3 import GraphQLApp, make_playground_handler
class Query(graphene.ObjectType):
hello = graphene.String()
class Subscription(graphene.ObjectType):
count = graphene.Float()
async def subscribe_count(_, info: ResolveInfo):
for i in range(10):
yield i
# GraphQL Schema
schema = graphene.Schema(
query=Query,
subscription=Subscription,
)
# ASGI application
# You'll probably want to mount it to a Starlette application
app = GraphQLApp(
schema=schema,
on_get=make_playground_handler(),
)运行它:
$ uvicorn run main:app --reloadhttps://stackoverflow.com/questions/65206482
复制相似问题