有人能建议如何使用函数elasticsearch.helpers.streaming_bulk代替elasticsearch.helpers.bulk来索引数据到elasticsearch中吗?
如果我只是简单地更改streaming_bulk而不是bulk,那么任何内容都不会被索引,所以我猜它需要以不同的形式使用。
下面的代码创建索引,类型和索引数据从CSV文件在块500个元素到elasticsearch。它工作正常,但我在想,是否有可能提高性能。这就是我想尝试streaming_bulk函数的原因。
目前,我需要10分钟为200MB的CSV文档建立100万行的索引。我使用两台机器,CentOS6.6,8cpu-s,x86_64,CPU MHz: 2499.902,内存: 15.574G。不确定它还能走得更快。
es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file
es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)
with open(file_to_index, 'rb') as csvfile:
reader = csv.reader(csvfile) #read documents for indexing from CSV file, more than million rows
content = {"_index": index_name, "_type": type_name}
batch_chunks = []
iterator = 0
for row in reader:
var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
id_increment = id_increment + 1
#var = transform_row_for_indexing(row,fields, index_name, type_name)
batch_chunks.append(var)
if iterator % 500 == 0:
helpers.bulk(es,batch_chunks)
del batch_chunks[:]
print "ispucalo batch"
iterator = iterator + 1
# indexing of last batch_chunk
if len(batch_chunks) != 0:
helpers.bulk(es,batch_chunks)发布于 2016-03-28 09:20:54
因此,批量流式传输会返回一个插入器。这意味着在你开始迭代它之前什么都不会发生。“bulk”函数的代码如下所示:
success, failed = 0, 0
# list of errors to be collected is not stats_only
errors = []
for ok, item in streaming_bulk(client, actions, **kwargs):
# go through request-reponse pairs and detect failures
if not ok:
if not stats_only:
errors.append(item)
failed += 1
else:
success += 1
return success, failed if stats_only else errors因此,基本上只调用streaming_bulk(客户端,操作,**kwargs)实际上不会做任何事情。直到你像这个for循环一样遍历它,索引才真正开始发生。
所以在你的代码中。欢迎您将' bulk‘更改为'streaming_bulk’,但是您需要遍历批量流的结果,以便对任何内容进行索引。
发布于 2018-03-31 12:20:04
streaming_bulk使用actions的迭代器,并为每个操作生成一个响应。因此,您首先需要在您的文档上编写一个简单的迭代器,如下所示:
def document_stream(file_to_index):
with open(file_to_index, "rb") as csvfile:
for row in csv.reader(csvfile):
yield {"_index": index_name,
"_type": type_name,
"_source": transform_row(row)
}然后执行流式大容量插入
stream = document_stream(file_to_index)
for ok, response in streaming_bulk(es, actions = stream):
if not ok:
# failure inserting
print responsehttps://stackoverflow.com/questions/34659198
复制相似问题