首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Read from Kin产是在使用以前的序列号或时间戳运行时给出空记录。

Read from Kin产是在使用以前的序列号或时间戳运行时给出空记录。
EN

Stack Overflow用户
提问于 2017-10-03 12:34:06
回答 1查看 1K关注 0票数 1

我正试图在以下帮助下读取推送到Kinesis流的消息

get_records()和get_shard_iterator() API。

我的制作人在处理结束时一直在推动记录,消费者每隔30分钟也会继续以cron的身份运行。因此,我尝试将当前消息的序列号存储在我的数据库中,并使用AFTER_SEQUENCE_NUMBER切分迭代器和最后读取的序列号。但是,在新消息被推送后第二次(第一次成功地读取流中的所有消息)同样不能工作。

我还尝试使用AT_TIMESTAMP和消息时间戳,生产者将其作为消息的一部分推进到流中,并存储该消息以供进一步使用。同样,第一次运行将处理所有消息,而在第二次运行中,我将得到空记录。

我真的不知道我哪里出了问题。如果有人能帮我做这件事我会很感激的。

使用时间戳提供下面的代码,但序列号方法也是如此。

代码语言:javascript
复制
def listen_to_kinesis_stream():
kinesis_client = boto3.client('kinesis', region_name=SETTINGS['region_name'])
stream_response = kinesis_client.describe_stream(StreamName=SETTINGS['kinesis_stream'])

for shard_info in stream_response['StreamDescription']['Shards']:
    kinesis_stream_status = mongo_coll.find_one({'_id': "DOC_ID"})
    last_read_ts = kinesis_stream_status.get('state', {}).get(
        shard_info['ShardId'], datetime.datetime.strftime(datetime.date(1970, 01, 01), "%Y-%m-%dT%H:%M:%S.%f"))

    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=SETTINGS['kinesis_stream'],
        ShardId=shard_info['ShardId'],
        ShardIteratorType='AT_TIMESTAMP',
        Timestamp=last_read_ts)

    get_response = kinesis_client.get_records(ShardIterator=shard_iterator['ShardIterator'], Limit=1)
    if len(get_response['Records']) == 0:
        continue

    message = json.loads(get_response['Records'][0]['Data'])
    process_resp = process_message(message)
    if process_resp['success'] is False:
        print process_resp
    generic_config_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
    print "Processed {0}".format(message)

    while 'NextShardIterator' in get_response:
        get_response = kinesis_client.get_records(ShardIterator=get_response['NextShardIterator'], Limit=1)
        if len(get_response['Records']) == 0:
            break

        message = json.loads(get_response['Records'][0]['Data'])
        process_resp = process_message(message)
        if process_resp['success'] is False:
            print process_resp
        mongo_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
        print "Processed {0}".format(message)

logger.debug("Processed all messages from Kinesis stream")
print "Processed all messages from Kinesis stream"
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-10-05 07:48:37

根据我与AWS技术支持人员的讨论,可能会有一些带有空记录的消息,因此,当len(get_response‘==’records‘)为len(get_response'Records')时中断不是一个好主意。

我们建议的更好的方法是-我们可以有一个计数器,指示在运行和退出循环中读取的消息的最大数量。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46544716

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档