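I am exploring SNS FIFO topics together with SQS queues, and this is my first attempt. I created an SNS FIFO topic and an SQS FIFO queue, and subscribed the FIFO queue to the FIFO topic. According to the documentation, with this setup every message published to the SNS FIFO topic should be fanned out to the SQS queue, but that does not happen. I do get a value from PublishResult#getMessageId(), which means the publish call succeeds, yet no messages arrive in the queue. Since SNS FIFO topics do not support email protocol subscriptions, the only way I can assert on this pub/sub architecture is to poll the queue for messages. Because no fan-out occurs, the queue always appears empty.
Complete code block: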
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.UUID;
class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);

        // Create the SNS client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        "https://sns.us-west-1.amazonaws.com", "us-west-1"))
                .build();

        // Create the SQS client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        "https://sqs.us-west-1.amazonaws.com", "us-west-1"))
                .build();

        // Create the SNS FIFO topic
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic", "true")
                .addAttributesEntry("ContentBasedDeduplication", "false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        // Create the dead-letter SQS FIFO queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        // Get the ARN of the dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        // Create the SQS FIFO queue with a redrive policy pointing at the dead-letter queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue", "true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deadLetterQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        // Get the ARN of the queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        // Subscribe the FIFO queue to the FIFO topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        // Publish 4 sample messages to the FIFO SNS topic
        for (int i = 0; i < 4; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        // Read ApproximateNumberOfMessages from the FIFO queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                .get("ApproximateNumberOfMessages");

        // My expectation is that the SNS FIFO topic has fanned out the 4 published messages to the SQS FIFO queue
        Assertions.assertEquals(4, Integer.parseInt(approximateNumberOfMessages));
    }
}
SNS access policy (permissions):
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}
SQS access policy (permissions):
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
What am I missing? Why do the messages not appear in the SQS queue? Should I change the SQS queue permissions to something like the following?
{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}
Posted 2021-01-29 13:25:33
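Sharing my answer for posterity. As suspected, the actual problem was the access policy. When we create a FIFO queue with the AWS SDK v1 and subscribe it to the topic, the queue's default access policy looks like this: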
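{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
Even when I created the SQS queue with the AWS SDK v2, the access policy was the same. Once I manually changed the access policy as shown below, the problem was resolved and the FIFO SNS topic fanned out messages as expected: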
{
  "Statement": [
    {
      "Action": [
        "sqs:*"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}
Code block that adds the access policy above to each FIFO queue:
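import com.amazonaws.auth.policy.Policy;
import com.amazonaws.auth.policy.Principal;
import com.amazonaws.auth.policy.Resource;
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
import java.util.HashMap;
import java.util.Map;

Policy policy = new Policy().withStatements(
        new Statement(Statement.Effect.Allow).withPrincipals(Principal.AllUsers)
                .withResources(new Resource(queueArn))
                .withActions(SQSActions.AllSQSActions));
Map<String, String> policyQueueAttributes = new HashMap<>();
policyQueueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, policyQueueAttributes));
Adding this code block right after creating the SQS queue finally resolved the problem.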
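Even with the policy in place, a test like the one in the question may read the count before the fanned-out messages are visible, because ApproximateNumberOfMessages is only an approximate value. The following is a minimal sketch of a retry helper, using only the Java standard library; the class name QueueDepthPoller and the method awaitCount are hypothetical names introduced for illustration and are not part of the AWS SDK.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical test helper: re-reads a message count until it reaches an expected value. */
final class QueueDepthPoller {

    private QueueDepthPoller() {}

    /**
     * Calls the counter repeatedly until it reports at least `expected`
     * or the timeout elapses, and returns the last observed count.
     */
    static int awaitCount(IntSupplier counter, int expected, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int observed = counter.getAsInt();
        while (observed < expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            observed = counter.getAsInt();
        }
        return observed;
    }
}
```

In the question's test, the counter would be a lambda that calls amazonSQS.getQueueAttributes for queueUrl and parses the ApproximateNumberOfMessages attribute; the assertion would then compare the returned value with 4. The timeout and interval values are a judgment call and are not taken from the original post.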
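Posted 2021-11-24 23:18:11
If you are looking for a non-programmatic solution and want to use the AWS console, see the "How to use SNS FIFO Topics" section of the official AWS documentation (link below). It mentions that you need to add a statement to the access policy of the FIFO SQS queue. This added statement grants the FIFO topic permission to send messages to the queue.
Using their example: if you have a FIFO topic named updates.fifo and want to fan out messages to two queues, customer.fifo and loyalty.fifo, then after subscribing the queues to the topic you navigate to the customer.fifo queue in the console and edit its access policy by adding the following statement: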
{
  "Effect": "Allow",
  "Principal": {
    "Service": "sns.amazonaws.com"
  },
  "Action": "SQS:SendMessage",
  "Resource": "arn:aws:sqs:us-east-2:123412341234:customer.fifo",
  "Condition": {
    "ArnLike": {
      "aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
    }
  }
}
The same applies to the loyalty.fifo queue:
{
  "Effect": "Allow",
  "Principal": {
    "Service": "sns.amazonaws.com"
  },
  "Action": "SQS:SendMessage",
  "Resource": "arn:aws:sqs:us-east-2:123412341234:loyalty.fifo",
  "Condition": {
    "ArnLike": {
      "aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
    }
  }
}
Resource: Introducing Amazon
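For readers who would rather apply this statement from code than from the console, here is a minimal sketch that renders it as a complete policy document for any queue and topic pair. It uses only the Java standard library. The class name SnsToSqsPolicy and the method forQueue are hypothetical, the enclosing "Version" and "Statement" wrapper is an assumption about how the single statement would be embedded, and the ARNs are assumed to contain no characters that need JSON escaping. Compared with granting sqs:* to all principals, this limits access to SQS:SendMessage from the one topic.

```java
/** Hypothetical helper that renders an SQS access policy allowing one SNS topic to send messages. */
final class SnsToSqsPolicy {

    private SnsToSqsPolicy() {}

    /** Builds the policy JSON; assumes both ARNs need no JSON escaping. */
    static String forQueue(String queueArn, String topicArn) {
        return String.join("\n",
            "{",
            "  \"Version\": \"2012-10-17\",",
            "  \"Statement\": [",
            "    {",
            "      \"Effect\": \"Allow\",",
            "      \"Principal\": { \"Service\": \"sns.amazonaws.com\" },",
            "      \"Action\": \"SQS:SendMessage\",",
            "      \"Resource\": \"" + queueArn + "\",",
            "      \"Condition\": {",
            "        \"ArnLike\": { \"aws:SourceArn\": \"" + topicArn + "\" }",
            "      }",
            "    }",
            "  ]",
            "}");
    }
}
```

The resulting string could be stored under the queue's Policy attribute with setQueueAttributes, in the same way the first answer stores policy.toJson().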
https://stackoverflow.com/questions/65925207