为了测试django中的一些异步函数,我在testdriven.io上跟踪testdriven.io,我需要添加som装饰器,以使异步测试能够访问DB。但是,我收到以下错误消息:
Error
Traceback (most recent call last):
File "/Users/apple/.local/share/virtualenvs/backend-KucV-wUh/lib/python3.9/site-packages/django/db/backends/base/base.py", line 237, in _cursor
return self._prepare_cursor(self.create_cursor(name))
File "/Users/apple/.local/share/virtualenvs/backend-KucV-wUh/lib/python3.9/site-packages/django/db/backends/sqlite3/base.py", line 274, in create_cursor
return self.connection.cursor(factory=SQLiteCursorWrapper)
sqlite3.ProgrammingError: Cannot operate on a closed database.@database_sync_to_async
def create_user():
user = User.objects.create(
username='username',
)
user.set_password('password')
user.save()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
class WebsocketTests(TestCase):
async def test_not_authenticated(self):
await create_user()
..... other async functions发布于 2022-05-12 09:10:10
@pytest.mark.asyncio和@pytest.mark.django_db(transaction=True)from asgiref.sync import sync_to_async
from channels.db import database_sync_to_async
@database_sync_to_async
def MakeMessage():
Message.objects.create()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
class TestWebSocket:
async def test_connection(self, settings):
communicator = WebsocketCommunicator( application=application, path=path )
connected, _ = await communicator.connect()
assert connected
m = await sync_to_async(Message.objects.count)()
assert m == 0
await MakeMessage()
m = await sync_to_async(Message.objects.count)()
assert m == 1#settings.py
WSGI_APPLICATION = 'frosty_breeze_25125.wsgi.application'
ASGI_APPLICATION = 'frosty_breeze_25125.asgi.application'
host = os.environ.get('REDIS_URL', 'redis://localhost:6379') if IS_DEPLOYED else ('0.0.0.0', 6379)
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [host],
},
},
}
REDIS_URL = [host]https://stackoverflow.com/questions/68758257
复制相似问题