首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Python将数据从AWS lambda推送到Kin产消防软管

使用Python将数据从AWS lambda推送到Kin产消防软管
EN

Stack Overflow用户
提问于 2022-06-22 11:05:45
回答 1查看 523关注 0票数 0

我试图发送数据从RDS到消防软管使用Lambda功能。我能够使用lambda函数从RDS检索数据。现在我想把这些数据从Lambda函数发送到动力消防软管。

我能够使用代码片段中给出的粗体代码从RDS检索数据,来自RDS的输入存储在变量'rows‘中。但是当我试图将RDS中的数据插入到Kinesis中时,我会得到这个错误。

"errorMessage":“需要一个类似字节的对象,而不是'tuple'",

"errorType":"TypeError“

代码语言:javascript
复制
connection = pymysql.connect(host = endpoint, user = username, passwd = password, db = database_name)

FIREHOSE_STREAM = 'DEMOLAMBDAFIREHOSE'
client = boto3.client('firehose')

def lambda_handler(event, context):
        cursor = connection.cursor()
        cursor.execute('SELECT * from inventory.report_product')
        rows = cursor.fetchall()
        
        for row in rows:
          data = base64.b64encode(row)
          response = client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM,
                Records=[
                   {
                   'Data': json.dumps(data)
                    },
                     ]
                  )
        print (response)
EN

回答 1

Stack Overflow用户

发布于 2022-06-23 14:56:29

有两件事值得尝试:

  1. 删除对datajson.dumps()调用。put_record_batch()方法期望Data字段有一个base64 64编码的二进制数据对象。json.dumps()返回一组为500的字符串.
  2. 批处理行。put_record_batch()方法支持多达500个记录的批处理。

示例:

代码语言:javascript
复制
connection = pymysql.connect(host = endpoint, user = username, passwd = password, db = database_name)

FIREHOSE_STREAM = 'DEMOLAMBDAFIREHOSE'
client = boto3.client('firehose')

def lambda_handler(event, context):
    cursor = connection.cursor()
    cursor.execute('SELECT * from inventory.report_product')
    rows = cursor.fetchall()

    records = []
    for row in rows:
        if len(records) < 500:
            records.append({
                'Data': base64.b64encode(row)
            })
        else:
            # call put_record_batch on previous 500 rows
            response = client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM,
                Records=records
            )
            print (response)

            # clear records and add current row
            records = []
            records.append({
                'Data': base64.b64encode(row)
            })

    if len(records) > 0:
        # send the final batch, call put_batch_record()

警告:未测试此代码示例。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72714388

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档