我有一个大约10 GB的文本文件,我需要对文件中的文本数据进行一些处理。读取、访问和处理如此庞大的文件的最佳方式是什么?
我正在考虑将文件分成块,然后通过处理较小的文件来处理它(或者可以在buffer中-更好),然后合并结果。更像是map-reduce范式,但不会使用大数据技术。
发布于 2016-09-29 09:59:15
处理数据的方式选项:
RAM将所有内容加载到中并一次性处理它--如果它一次只适合
根据我的经验,我有一些建议:
vector这样的“线性”数据结构在这方面比链表更好。使用排序数组和二进制搜索,而不是字典/映射和键查找-更小的内存占用,更小的内存碎片,更大的CPU缓存命中机会。。
现在,如何做到这一点:
你可以在“本地主机模式”中使用Hadoop或类似的工具--不需要使用YARN、Zookeeper和其他工具部署完整的堆栈。只需安装Hadoop (或类似的东西),用数据处理逻辑编写一些.java文件,编译成.jar,在hadoop中执行,就完成了。不需要使用HDFS (如果您不想使用),只需使用普通文件。
或者从头开始写一些东西。在这里,我推荐Python,因为它可以很好地处理所有可以想象到的东西(文件格式、数据库、数学库),并且它的multiprocessing模块提供了很棒的工具(如进程、进程池、队列、锁、并行映射、类似redis的数据服务器)来使您的程序具有一定的分布性。如果您发现Python很慢,只需将该较慢部分重写为C/C++并在Python中使用它(使用cffi或Cython)。
大多数Python多处理功能仅限于单个主机/计算机。我认为这是最好的,因为今天的硬件通常有许多CPU核心。如果没有,只需以每小时几美分的价格启动一些具有任意多个内核的亚马逊网络服务EC2实例即可。
让我们来做一些例子--字数统计,“大数据问候世界”。使用Python。我将使用618MB压缩和2.35G未压缩的cswiki.xml.bz2维基百科转储。这是一个XML文件,但为了简单起见,我们将其作为文本文件使用:)
首先,处理一个文件很繁琐。最好将其拆分成更多更小的文件,这样可以更容易地将输入数据分发给多个工作进程:
$ bzcat cswiki-20160920-pages-articles-multistream.xml.bz2 | \
split \
--filter='xz -1 > $FILE' \
--additional-suffix=.xz \
--lines=5000000 \
- cswiki-splitted.结果:
$ ls -1hs cswiki*
618M cswiki-20160920-pages-articles-multistream.xml.bz2
94M cswiki-splitted.aa.xz
77M cswiki-splitted.ab.xz
74M cswiki-splitted.ac.xz
64M cswiki-splitted.ad.xz
62M cswiki-splitted.ae.xz
56M cswiki-splitted.af.xz
54M cswiki-splitted.ag.xz
58M cswiki-splitted.ah.xz
59M cswiki-splitted.ai.xz
15M cswiki-splitted.aj.xz下面是一个使用multiprocessing.Pool的简单的单词计数实现:
#!/usr/bin/env python3
import lzma
import multiprocessing
from os import getpid
from pathlib import Path
import re
def main():
input_dir = Path('.')
input_files = [p for p in input_dir.iterdir() if p.name.startswith('cswiki-splitted.')]
pool = multiprocessing.Pool()
partial_results = pool.map(process_file, input_files)
aggregated_results = {}
for pr in partial_results:
for word, count in pr.items():
aggregated_results[word] = aggregated_results.get(word, 0) + count
words_and_counts = aggregated_results.items()
counts_and_words = [(c, w) for w, c in words_and_counts]
counts_and_words.sort(reverse=True)
print('Top 100:', counts_and_words[:100])
def process_file(path):
print('Process {} reading file {}'.format(getpid(), path))
f = lzma.open(str(path), 'rt')
counts = {}
for line in f:
words = re.split(r'\W+', line)
for word in words:
if word != '':
word = word.lower()
counts[word] = counts.get(word, 0) + 1
return counts
if __name__ == '__main__':
main()输出:
$ ./wordcount.py
Process 2480 reading file cswiki-splitted.ab.xz
Process 2481 reading file cswiki-splitted.ah.xz
Process 2482 reading file cswiki-splitted.aj.xz
Process 2483 reading file cswiki-splitted.aa.xz
Process 2484 reading file cswiki-splitted.af.xz
Process 2485 reading file cswiki-splitted.ac.xz
Process 2486 reading file cswiki-splitted.ai.xz
Process 2487 reading file cswiki-splitted.ae.xz
Process 2482 reading file cswiki-splitted.ad.xz
Process 2481 reading file cswiki-splitted.ag.xz
Top 100: [(4890109, 'quot'), (4774018, 'gt'), (4765677, 'lt'), (4468312, 'id'), (4433742, 'v'), (4377363, 'a'), (2767007, 'na'), (2459957, 'text'), (2278791, 'amp'), (2114275, 'se'), (1971423, 'ref'), (1968093, 'kategorie'), (1799812, 'align'), (1795733, 'nbsp'), (1779981, 'title'), (1662895, '0'), (1592622, '1'), (1489233, 'page'), (1485505, 'je'), (1483416, 'model'), (1476711, 'format'), (1473507, '2'), (1470963, 'ns'), (1468018, 'revision'), (1467530, 'contributor'), (1467479, 'timestamp'), (1467453, 'sha1'), (1429859, 'comment'), (1414549, 'username'), (1261194, 's'), (1177526, '3'), (1159601, 'z'), (1115378, 'http'), (1040230, 'parentid'), (1012821, 'flagicon'), (949947, 'do'), (920863, 'right'), (887196, 'br'), (828466, 'x'), (797722, 've'), (795342, '4'), (783019, 'www'), (778643, '6'), (762929, 'name'), (762220, 'wiki'), (757659, 'i'), (752524, 'space'), (742525, 'xml'), (740244, 'center'), (733809, 'preserve'), (733752, 'wikitext'), (730781, 'o'), (725646, 'cz'), (679842, '5'), (672394, 'datum'), (599607, 'u'), (580936, 'byl'), (563301, 'k'), (550669, 'roce'), (546944, '10'), (536135, 'pro'), (531257, 'jako'), (527321, 'rd1'), (519607, '7'), (515398, 'roku'), (512456, 'od'), (509483, 'style'), (488923, 'za'), (485546, 'titul'), (467147, 'jméno'), (451536, '14'), (448649, '2016'), (447374, 'po'), (444325, 'citace'), (442389, 'jpg'), (424982, '12'), (423842, 'že'), (416419, 'název'), (408796, 'redirect'), (405058, 'minor'), (402733, 'to'), (400355, 'soubor'), (398188, '8'), (395652, 'the'), (393122, '11'), (389370, 'místo'), (368283, '15'), (359019, 'url'), (355302, 'monografie'), (354336, 'odkazy'), (352414, 'jsou'), (348138, 'of'), (344892, 'narození'), (340021, 'vydavatel'), (339462, '2014'), (339219, '20'), (339063, 'jeho'), (336257, '9'), (332598, 'praha'), (328268, 'byla')]我们可以看到,XML标记和属性中有很多杂音。这就是在XML文件上运行wordcount得到的结果:)
所有的文件读取和字数统计都是并行完成的。仅在主进程中执行最终聚合。
发布于 2016-09-28 22:09:41
如果您将所有10 GB都加载到内存中,那么一切都很简单。
如果你负担不起,那么你一次只能将大文件的一个范围加载到你的缓冲区中。
当你完成了一个部分,你滑动窗口(改变范围),加载新的数据范围到你的缓冲区中,这样缓冲区中以前的数据范围就会被丢弃(覆盖)。
您可能会查找所需的位置,并且可能需要来回查找以加载数据。它可能相对较慢,但这是你为使用更少的内存而付出的代价(时空权衡)。
--
你可能想要阅读可以处理大文件的程序的源代码。例如文件存档器。
发布于 2016-09-28 22:25:18
我会使用线程将文件的每一块加载到缓冲区中,然后处理缓冲区并执行所需的操作。然后,为了加载更多的缓冲区,从内存中删除之前的缓冲区,然后继续。看看音频是如何加载的,就像你想要的那样加载到缓冲区中。
https://stackoverflow.com/questions/39748920
复制相似问题