我试图发送数据从RDS到消防软管使用Lambda功能。我能够使用lambda函数从RDS检索数据。现在我想把这些数据从Lambda函数发送到动力消防软管。
我能够使用代码片段中给出的粗体代码从RDS检索数据,来自RDS的输入存储在变量'rows‘中。但是当我试图将RDS中的数据插入到Kinesis中时,我会得到这个错误。
"errorMessage":“需要一个类似字节的对象,而不是'tuple'",
"errorType":"TypeError“
connection = pymysql.connect(host = endpoint, user = username, passwd = password, db = database_name)
FIREHOSE_STREAM = 'DEMOLAMBDAFIREHOSE'
client = boto3.client('firehose')
def lambda_handler(event, context):
cursor = connection.cursor()
cursor.execute('SELECT * from inventory.report_product')
rows = cursor.fetchall()
for row in rows:
data = base64.b64encode(row)
response = client.put_record_batch(
DeliveryStreamName=FIREHOSE_STREAM,
Records=[
{
'Data': json.dumps(data)
},
]
)
print (response)发布于 2022-06-23 14:56:29
有两件事值得尝试:
data的json.dumps()调用。put_record_batch()方法期望Data字段有一个base64 64编码的二进制数据对象。json.dumps()返回一组为500的字符串.put_record_batch()方法支持多达500个记录的批处理。示例:
connection = pymysql.connect(host = endpoint, user = username, passwd = password, db = database_name)
FIREHOSE_STREAM = 'DEMOLAMBDAFIREHOSE'
client = boto3.client('firehose')
def lambda_handler(event, context):
cursor = connection.cursor()
cursor.execute('SELECT * from inventory.report_product')
rows = cursor.fetchall()
records = []
for row in rows:
if len(records) < 500:
records.append({
'Data': base64.b64encode(row)
})
else:
# call put_record_batch on previous 500 rows
response = client.put_record_batch(
DeliveryStreamName=FIREHOSE_STREAM,
Records=records
)
print (response)
# clear records and add current row
records = []
records.append({
'Data': base64.b64encode(row)
})
if len(records) > 0:
# send the final batch, call put_batch_record()警告:未测试此代码示例。
https://stackoverflow.com/questions/72714388
复制相似问题