首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用MOTO模拟Python AWS SQS

用MOTO模拟Python AWS SQS
EN

Stack Overflow用户
提问于 2021-09-16 17:55:29
回答 2查看 4.4K关注 0票数 3

我试图用moto来模拟AWS SQS,下面是我的代码

代码语言:javascript
复制
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs


@mock_sqs
def test_get_all_msg_from_queue():
    
    #from myClass import get_msg_from_sqs
    
    conn = boto3.client('sqs', region_name='us-east-1')
    
    queue = conn.create_queue(QueueName='Test')
    
    os.environ["SQS_URL"] = queue["QueueUrl"]
    
    queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    
    
    #Tried this as well
    #conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))

    resp = get_msg_from_sqs(queue["QueueUrl"]) 

    assert resp is not None

在执行此操作时,我将得到以下错误

代码语言:javascript
复制
>       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E       AttributeError: 'dict' object has no attribute 'send_message'

如果我尝试另一种方式在SQS中发送消息(请参阅注释掉的代码#也尝试过了),那么在我的方法get_msg_from_sqs中实际调用SQS时,我将得到以下错误

代码语言:javascript
复制
E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation: 
The address https://queue.amazonaws.com/ is not valid for this endpoint.

我在win10上使用PyCharm运行它,moto版本设置为

代码语言:javascript
复制
moto = "^2.2.6"

下面给出了我的代码

代码语言:javascript
复制
sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
    return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
               MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)

我在这漏掉了什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-09-24 10:36:26

您的queue变量是由create_queue返回的dict:

代码语言:javascript
复制
queue = conn.create_queue(QueueName='Test')

它不是一个队列,因此不能在它上调用sendMessage

为此,您需要创建一个队列对象:

代码语言:javascript
复制
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)

queue.send_message()
票数 2
EN

Stack Overflow用户

发布于 2021-09-22 10:08:06

如per @gshpychka所示,您需要查看create_queue是如何工作的。具体来说,它返回此表单的一个dict:

响应结构(dict) -- QueueUrl (字符串) --

创建的Amazon队列的URL。

因此,使用此api您可以:

代码语言:javascript
复制
import boto3
from time import sleep

conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
    QueueUrl=queue,
    MessageBody='string',
    ...)

我同意医生们很困惑。这种混乱可能是由于sqs资源api产生的,它的工作方式不同:

代码语言:javascript
复制
import boto3
from time import sleep

sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)

这是因为这个api返回一个Queue对象,这可能是您所期望的。

请注意,@gshpychka已经在一条评论中给出了答案,我只是把它写了出来。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69213098

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档