首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >需要boto3和SWF示例

需要boto3和SWF示例
EN

Stack Overflow用户
提问于 2015-09-23 00:15:58
回答 2查看 3.7K关注 0票数 7

亚马逊正在为未来的开发推广boto3,但没有为新的boto3提供足够的文档。

有没有人愿意分享一下在boto3中使用SWF的示例代码?

EN

回答 2

Stack Overflow用户

发布于 2016-03-23 02:25:21

这是我到目前为止发现的唯一一个例子:

https://github.com/jhludwig/aws-swf-boto3

因此,流程概述看起来像这样(注意,这是直接从上面的链接中提取的,但添加了一些额外的注释和更多的流)。

应该注意的是,SWF对事物的名称进行操作。这取决于您的代码是否为这些名称赋予执行意义。例如,您的Decider将轮询并使用任务名称decide what next。

有些事情我不是很确定。我认为TASKLIST引用是一种名称空间。这并不是一个真正的东西列表,它更多的是通过名称来隔离东西。现在我可能完全错了,从我的基本理解来看,这就是我认为它在说的。

你可以在任何地方运行Decider和Worker。由于它们可以连接到AWS,因此如果您的防火墙允许0.0.0.0/0出口,您将可以访问。

AWS文档还提到您可以运行lambda,但我还没有找到如何触发它。

创建boto3 swf客户端:

代码语言:javascript
复制
import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

创建域名

代码语言:javascript
复制
try:
  swf.register_domain(
    name=<DOMAIN>,
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this long
  )
except ClientError as e:
    print "Domain already exists: ", e.response.get("Error", {}).get("Code")

创建域后,我们现在注册工作流:

注册工作流

代码语言:javascript
复制
try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print "Test workflow created!"
except ClientError as e:
  print "Workflow already exists: ", e.response.get("Error", {}).get("Code")

注册了工作流之后,我们现在可以开始分配任务了。

将任务分配给工作流。

您可以分配N任务。记住,这些主要是字符串,你的代码将赋予它们执行的意义。

代码语言:javascript
复制
try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print "Worker created!"
except ClientError as e:
  print "Activity already exists: ", e.response.get("Error", {}).get("Code")

发送启动工作流

创建域、工作流和任务后,我们现在可以开始工作流了。

代码语言:javascript
复制
import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN # string,
  workflowId='test-1001',
  workflowType={
    "name": WORKFLOW,# string
    "version": VERSION # string
  },
  taskList={
      'name': TASKLIST
  },
  input=''
)

print "Workflow requested: ", response

请注意workflowId,这是一个自定义标识符,例如str(uuid.uuid4())。从文档中:

与工作流执行关联的用户定义标识符。您可以使用它将自定义标识符与工作流执行相关联。如果工作流执行在逻辑上是对上一次执行的重新启动,则可以指定相同的标识符。您不能同时使用相同的workflowId执行两个打开的工作流。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution

此时,不会发生任何事情,因为我们既没有运行Decider,也没有运行任何Workers。让我们看看这些是什么样子的。

决定者

我们的决策者将进行投票,以获得一个决定任务,以做出以下决定:

代码语言:javascript
复制
import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

请注意上面的超时设置。您可以参考此PR以了解其背后的基本原理:

https://github.com/boto/botocore/pull/634

从Boto3 SWF文档:

工作者应该将其客户端套接字超时设置为至少70秒(比服务可以容纳轮询请求的最大时间长10秒)。

正是这种公关使boto3能够做到这一点。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

代码语言:javascript
复制
print "Listening for Decision Tasks"

while True:

  newTask = swf.poll_for_decision_task(
    domain=DOMAIN ,
    taskList={'name': TASKLIST }, # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  if 'taskToken' not in newTask:
    print "Poll timed out, no new task.  Repoll"

  elif 'events' in newTask:

    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': TASKNAME, # string
                    'version': VERSION # string
                    },
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': TASKLIST}, # TASKLIST is a string
            }
          }
        ]
      )
      print "Task Dispatched:", newTask['taskToken']

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print "Task Completed!"

请注意,在这段代码的末尾,我们检查是否有ActivityTaskCompleted,并以决定CompleteWorkflowExecution作为响应,让SWF知道我们完成了操作。

这是我们的决定,工人是什么样子的?

工人

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

再次注意,我们设置了read_timeout

代码语言:javascript
复制
import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

现在我们开始我们的工人投票:

代码语言:javascript
复制
print "Listening for Worker Tasks"

while True:

  task = swf.poll_for_activity_task(
    domain=DOMAIN,# string
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  if 'taskToken' not in task:
    print "Poll timed out, no new task.  Repoll"

  else:
    print "New task arrived"

    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print "Task Done"

我们再一次通知SWF我们已经完成了我们的工作。

票数 16
EN

Stack Overflow用户

发布于 2020-07-06 19:25:58

官方文档的链接在这里。

有很多代码示例,只需跟随链接或此链接。在available service部分下,它列出了boto3现在支持的所有服务以及详细的示例。

其中一些示例是: boto3和获取SWF的执行计数

代码语言:javascript
复制
import boto3
import datetime
import time
import dateutil.tz

def lambda_handler(event,context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(1)

    fullTextPreloadResponse = swfClient.count_open_workflow_executions(
         domain=domainName,
         startTimeFilter={
             'oldestDate': oldestDate,
             'latestDate': latestDate
         },
         typeFilter={
             'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
             'version': 'VERSION_NUMBER'
         }
     )
     print("the count is " + str(fullTextResponse['count']))
     print(fullTextResponse)

这就是我在我的案例中用来获取正在运行的SWF Workflow类型的计数。我使用的格式在上面提到的文档中有很好的定义。

要简单地同时使用boto3和SWF,首先要在python lambda函数中导入boto3。然后添加python DateTime。然后,boto3.client设置客户端,我们可以在其中使用|与SWF交互。

其他示例包括:

代码语言:javascript
复制
history = swf.get_workflow_execution_history(
            domain= domainName,
            execution={
                'workflowId': workflowId,
                'runId': runId
            },
        )

希望这篇文章能帮到你!1:https://boto3.amazonaws.com/v1/documentation/api/latest/index.html 2:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32721847

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档