I have 3 huge CSV files containing climate data, each about 5GB. The first cell in every row is the weather station's ID (ranging from 0 to about 100,000), and each station has between 1 and 800 rows in each file, not necessarily the same number across files. For example, station 11 has 600, 500, and 200 rows in file1, file2, and file3 respectively. I want to read all the rows for each station, perform some operations on them, write the results to another file, then move on to the next station, and so on. The files are too big to load into memory at once, so I have tried some solutions to read them with a minimal memory load, such as this post and this post, which include this approach:
with open(...) as f:
    for line in f:
        <do something with line>

The problem with this approach is that it reads the file from the beginning every time, whereas I want to read the files like this:
for station in range(100798):
    with open(file1) as f1, open(file2) as f2, open(file3) as f3:
        for line in f1:
            st = line.split(",")[0]
            if st == station:
                <store this line for some analysis>
            else:
                break  # break the for loop and go to read the next file
        for line in f2:
            ...
            <similar code to f1>
            ...
        for line in f3:
            ...
            <similar code to f1>
            ...
        <do the analysis for this station, then go to the next station>

The problem is that every time I start over for the next station, the for loop begins from the start of the file, whereas I want it to resume from the line where it last "broke", i.e. to continue reading the file from the nth line.
How can I do that?
Thanks in advance.
A note on the solutions below: as I mention in the answer I posted below, I implemented @DerFaizio's answer but found it very slow to process.
After I tried the generator-based answer submitted by @PM_2Ring, I found it very fast, presumably because it relies on generators.
The difference between the two solutions, measured in stations processed per minute, is 2500 st/min for the generator solution versus 45 st/min for the Pandas solution; in other words, the generator-based solution is about 55 times faster.
I will keep both implementations below for reference. Thanks to all contributors, especially @PM_2Ring.
Posted on 2017-02-06 12:52:56
The code below steps through the files line by line, grabbing the lines for each station from each file in turn and appending them to a list for further processing.
The heart of this code is the generator file_buff, which yields the lines of a file but allows us to push a line back for later reading. When we read a line belonging to the next station, we can send it back to file_buff so that we can re-read it when it's time to process that station's lines.
To test this code, I created some simple fake station data using create_data.
from random import seed, randrange

seed(123)

station_hi = 5

def create_data():
    ''' Fill 3 files with fake station data '''
    fbase = 'datafile_'
    for fnum in range(1, 4):
        with open(fbase + str(fnum), 'w') as f:
            for snum in range(station_hi):
                for i in range(randrange(1, 4)):
                    s = '{1} data{0}{1}{2}'.format(fnum, snum, i)
                    print(s)
                    f.write(s + '\n')
        print()

create_data()
# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

# An infinite counter that yields numbers converted to strings
def str_count(start=0):
    n = start
    while True:
        yield str(n)
        n += 1
# Extract station data from all 3 files
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3:
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

    for snum_str in str_count():
        station_lines = []
        for fb in (fb1, fb2, fb3):
            for line in fb:
                # Extract station number string & station data
                sid, sdata = line.split()
                if sid != snum_str:
                    # This line contains data for the next station,
                    # so push it back to the buffer
                    rc = fb.send(line)
                    # and go to the next file
                    break
                # Otherwise, append this data
                station_lines.append(sdata)

        # Process all the data lines for this station
        if not station_lines:
            # There's no more data to process
            break
        print('Station', snum_str)
        print(station_lines)

Output:
0 data100
1 data110
1 data111
2 data120
3 data130
3 data131
4 data140
4 data141
0 data200
1 data210
2 data220
2 data221
3 data230
3 data231
3 data232
4 data240
4 data241
4 data242
0 data300
0 data301
1 data310
1 data311
2 data320
3 data330
4 data340
Station 0
['data100', 'data200', 'data300', 'data301']
Station 1
['data110', 'data111', 'data210', 'data310', 'data311']
Station 2
['data120', 'data220', 'data221', 'data320']
Station 3
['data130', 'data131', 'data230', 'data231', 'data232', 'data330']
Station 4
['data140', 'data141', 'data240', 'data241', 'data242', 'data340']

This code can cope if station data is missing from one or two of the files, but not if it's missing from all three, because it breaks out of the main processing loop when the station_lines list is empty; that shouldn't be a problem for your data, though.
For details on generators and the generator.send method, please see 6.2.9. Yield expressions in the documentation.
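To make the push-back mechanism concrete, here is a minimal, self-contained sketch (not part of the original answer) that drives the file_buff generator above by hand; the io.StringIO sample data is purely illustrative.

import io

# file_buff as defined in the answer above: it yields the lines of fh,
# and a line passed back in via send() is re-yielded before reading resumes
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

fh = io.StringIO('0 a\n0 b\n1 c\n')   # stand-in for a real data file
fb = file_buff(fh)

print(next(fb))     # '0 a' : first line of the file
line = next(fb)     # '0 b' : second line
rc = fb.send(line)  # push '0 b' back into the buffer
print(next(fb))     # '0 b' : the pushed-back line comes out again
print(next(fb))     # '1 c' : reading then resumes where it left off

Note that each pushed-back line is yielded twice: once as the return value of send() itself (the discarded rc in the main loop) and once to the following next() call. Also, under Python 3.7+ (PEP 479) the next(fh) raises RuntimeError at end-of-file instead of quietly finishing the generator, so on modern Python file_buff would need a try/except StopIteration around that last line.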
This code was developed using Python 3, but it will also run on Python 2.6+ (you just need to include from __future__ import print_function at the top of the script).
If station IDs can be missing from all 3 files, we can easily handle that too: just use a simple range loop instead of the infinite str_count generator.
from random import seed, randrange

seed(123)

station_hi = 7

def create_data():
    ''' Fill 3 files with fake station data '''
    fbase = 'datafile_'
    for fnum in range(1, 4):
        with open(fbase + str(fnum), 'w') as f:
            for snum in range(station_hi):
                for i in range(randrange(0, 2)):
                    s = '{1} data{0}{1}{2}'.format(fnum, snum, i)
                    print(s)
                    f.write(s + '\n')
        print()

create_data()

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

station_start = 0
station_stop = station_hi
# Extract station data from all 3 files
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3:
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

    for i in range(station_start, station_stop):
        snum_str = str(i)
        station_lines = []
        for fb in (fb1, fb2, fb3):
            for line in fb:
                # Extract station number string & station data
                sid, sdata = line.split()
                if sid != snum_str:
                    # This line contains data for the next station,
                    # so push it back to the buffer
                    rc = fb.send(line)
                    # and go to the next file
                    break
                # Otherwise, append this data
                station_lines.append(sdata)

        if not station_lines:
            continue
        print('Station', snum_str)
        print(station_lines)

Output:
1 data110
3 data130
4 data140
0 data200
1 data210
2 data220
6 data260
0 data300
4 data340
6 data360
Station 0
['data200', 'data300']
Station 1
['data110', 'data210']
Station 2
['data220']
Station 3
['data130']
Station 4
['data140', 'data340']
Station 6
['data260', 'data360']

Posted on 2017-02-06 08:40:32
I would suggest using pandas.read_csv. You can specify the rows to skip with skiprows, and load a reasonable number of rows at a time (depending on your file size) with nrows. Here is a link to the documentation: csv.html
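For illustration, here is a minimal sketch (not from this answer) of paging through a large CSV with pandas; chunksize is the stock read_csv parameter for this, and the file name, chunk size, and column choices are placeholders.

import pandas as pd

# Read the file in fixed-size chunks instead of re-reading it from the
# top for every station. 'Tmax.csv' and 126000 are placeholder values.
reader = pd.read_csv('Tmax.csv', header=0, usecols=[0, 1, 3],
                     chunksize=126000)

for chunk in reader:
    # chunk is a DataFrame holding up to 126000 rows
    station_col = chunk.columns[0]
    for station_id, group in chunk.groupby(station_col, sort=False):
        # ... process this station's rows here ...
        # Caveat: a station's rows can straddle two chunks, so real code
        # must carry each chunk's final group over into the next chunk.
        pass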
Posted on 2017-02-06 12:42:12
I posted the code below before @PM_2Ring posted his solution. I hope both working solutions prove useful:

Solution #1, relying on the Pandas library (suggested by @DerFaizio):

This solution processed 5450 stations in 120 minutes (about 45 stations/minute).
import pandas as pd

skips = [1, 1, 1]  # to skip the header row forever
for station_number in range(100798):
    storage = {}
    tmax = pd.read_csv(full_paths[0], skiprows=skips[0], header=None, nrows=126000, usecols=[0, 1, 3])
    tmin = pd.read_csv(full_paths[1], skiprows=skips[1], header=None, nrows=126000, usecols=[0, 1, 3])
    tavg = pd.read_csv(full_paths[2], skiprows=skips[2], header=None, nrows=126000, usecols=[0, 1, 3])

    # tmax is at position 0
    for idx, station in enumerate(tmax[0]):
        if station == station_number:
            date_val = tmax[1][idx]
            t_val = float(tmax[3][idx]) / 10.
            storage[date_val] = [t_val, None, None]
            skips[0] += 1
        else:
            break

    # tmin is at position 1
    for idx, station in enumerate(tmin[0]):
        # station, date_val, _, val = lne.split(",")
        if station == station_number:
            date_val = tmin[1][idx]
            t_val = float(tmin[3][idx]) / 10.
            if date_val in storage:
                storage[date_val][1] = t_val
            else:
                storage[date_val] = [None, t_val, None]
            skips[1] += 1
        else:
            break

    # tavg is at position 2
    for idx, station in enumerate(tavg[0]):
        ...
        # similar to Tmin
        ...
        pass

    station_info = []
    for key in storage.keys():
        # do some analysis
        # Fill the list station_info
        pass
    data_out.writerows(station_info)

Solution #2 below is the generator-based solution (by @PM_2Ring).
This solution processed 30,000 stations in 12 minutes (about 2500 stations/minute).
import csv
import os
import time

# Note: source, get__proper_data_type, and from_excel_ordinal are helpers
# defined elsewhere in my code.

files = ['Tmax', 'Tmin', 'Tavg']
headers = ['Nesr_Id', 'r_Year', 'r_Month', 'r_Day', 'Tmax', 'Tmin', 'Tavg']

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

# An infinite counter that yields numbers converted to strings
def str_count(start=0):
    n = start
    while True:
        yield str(n)
        n += 1

# NULL = -999.99

print "Time started: {}".format(time.strftime('%Y-%m-%d %H:%M:%S'))
with open('Results\\GHCN_Daily\\Important\\Temp_All_out_gen.csv', 'wb+') as out_file:
    data_out = csv.writer(out_file, quoting=csv.QUOTE_NONE, quotechar='', delimiter=',',
                          escapechar='\\', lineterminator='\n')
    data_out.writerow(headers)
    full_paths = [os.path.join(source, '{}.csv'.format(file_name)) for file_name in files]
    # Extract station data from all 3 files
    with open(full_paths[0]) as f1, open(full_paths[1]) as f2, open(full_paths[2]) as f3:
        fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)
        for snum_str in str_count():
            # station_lines = []
            storage = {}
            count = [0, 0, 0]
            for file_id, fb in enumerate((fb1, fb2, fb3)):
                for line in fb:
                    if not isinstance(get__proper_data_type(line.split(",")[0]), str):
                        # Extract station number string & station data
                        sid, date_val, _dummy, sdata = line.split(",")
                        if sid != snum_str:
                            # This line contains data for the next station,
                            # so push it back to the buffer
                            rc = fb.send(line)
                            # and go to the next file
                            break
                        # Otherwise, append this data
                        sdata = float(sdata) / 10.
                        count[file_id] += 1
                        if date_val in storage:
                            storage[date_val][file_id] = sdata
                        else:
                            storage[date_val] = [sdata, None, None]
                        # station_lines.append(sdata)

            # # Process all the data lines for this station
            # if not station_lines:
            #     # There's no more data to process
            #     break

            print "St# {:6d}/100797. Time: {}. Tx({}), Tn({}), Ta({}) ".\
                format(int(snum_str), time.strftime('%H:%M:%S'), count[0], count[1], count[2])
            # print(station_lines)

            station_info = []
            for key in storage.keys():
                # key_val = storage[key]
                tx, tn, ta = storage[key]
                if ta is None:
                    if tx is not None and tn is not None:
                        ta = round((tx + tn) / 2., 1)
                if tx is None:
                    if tn is not None and ta is not None:
                        tx = round(2. * ta - tn, 1)
                if tn is None:
                    if tx is not None and ta is not None:
                        tn = round(2. * ta - tx, 1)
                # print key,
                py_date = from_excel_ordinal(int(key))
                # print py_date
                station_info.append([snum_str, py_date.year, py_date.month, py_date.day, tx, tn, ta])
            data_out.writerows(station_info)
            del station_info

Thanks everybody.
https://stackoverflow.com/questions/42063281