我试图用moto来模拟AWS SQS,下面是我的代码
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs
@mock_sqs
def test_get_all_msg_from_queue():
#from myClass import get_msg_from_sqs
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
resp = get_msg_from_sqs(queue["QueueUrl"])
assert resp is not None在执行此操作时,我将得到以下错误
> queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'如果我尝试另一种方式在SQS中发送消息(请参阅注释掉的代码#也尝试过了),那么在我的方法get_msg_from_sqs中实际调用SQS时,我将得到以下错误
E botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.我在win10上使用PyCharm运行它,moto版本设置为
moto = "^2.2.6"下面给出了我的代码
sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)我在这漏掉了什么?
发布于 2021-09-24 10:36:26
您的queue变量是由create_queue返回的dict:
queue = conn.create_queue(QueueName='Test')它不是一个队列,因此不能在它上调用sendMessage。
为此,您需要创建一个队列对象:
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)
queue.send_message()发布于 2021-09-22 10:08:06
如per @gshpychka所示,您需要查看create_queue是如何工作的。具体来说,它返回此表单的一个dict:
响应结构(dict) -- QueueUrl (字符串) --
创建的Amazon队列的URL。
因此,使用此api您可以:
import boto3
from time import sleep
conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
QueueUrl=queue,
MessageBody='string',
...)我同意医生们很困惑。这种混乱可能是由于sqs资源api产生的,它的工作方式不同:
import boto3
from time import sleep
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)这是因为这个api返回一个Queue对象,这可能是您所期望的。
请注意,@gshpychka已经在一条评论中给出了答案,我只是把它写了出来。
https://stackoverflow.com/questions/69213098
复制相似问题