我正在创建一个脑-计算机接口,其中一项任务是将脑电图仪(EEG)数据流到Python脚本中进行实时分析。该问题包括两个Python脚本:(1)生成和流假脑电数据(32个随机浮点数用于测试)和(2)接收脑电数据并将其转换为形状的numpy数组(n_channels,n_samples)。第二个脚本提取最后X秒数据的滑动窗口。
# file is named "stream_eeg_realtime.py"
import random, threading, time
nominal_srate = 500. # sampling rate in Hz. Must be float.
sleep_time = 1/nominal_srate
# Define all of the channel names.
channels = ['Fp1','Fp2','AF3','AF4','F3','F4','F7','F8','FC5','FC6','T7','T8',
'FC1','FC2','C3','C4','CP5','CP6','P7','P8','CP1','CP2','P3','P4','01','02',
'PO3','PO4','Oz','Pz','Cz','Fz']
one_sample = [round(random.uniform(-4.,4.), 5) for _ in channels]
data = []
datalock = threading.RLock()
def synthesize_data():
while True:
with datalock:
data.append(one_sample)
time.sleep(sleep_time)
t1 = threading.Thread(target=synthesize_data, name="EEG")
t1.daemon = True
t1.start()
print("Now streaming synthetic data...")In [1]:
import numpy as np
import stream_eeg_realtime as ser
DATA_DURATION = 20
sfreq = float(ser.nominal_srate)
last_rows_index = int(DATA_DURATION*sfreq)
thread_lock = ser.datalock
def get_data(raw_data, index, thread_lock, full_data=False):
# Data comes in as array-like with shape (n_samples, n_channels).
with thread_lock:
if full_data==False:
d = [row[:] for row in raw_data[-index:]]
# If we want all of the data, instead of the X-second window.
elif full_data==True:
d = [row[:] for row in raw_data]
data = np.array(d, dtype="float64").T
return data
In [2]:
raw_array = get_data(ser.data, last_rows_index, thread_lock, full_data=False)
print raw_array.shape这段代码必须尽可能快。作为一个高级初学者,我尽可能多地优化了代码,但是我能让它更快吗?在这种情况下使用线程可以吗,还是multiprocessing更好呢?(我不知道如何使用multiprocessing)
在我发布代码之前,这里有一些我尝试过的东西。首先,我尝试将“列”添加到嵌套列表中,并进行列表理解。这是快速的,但事实证明,更快的附加“行”(不需要理解列表),然后转接。我还试图在第二个脚本中deepcopy列表,然后使用嵌套列表理解来复制。在列表理解中使用切片语法是复制列表的最快方法。
发布于 2016-10-08 09:08:35
datalock似乎是不需要的,因为data.append是线程安全和迭代的,而附加则是安全的。
multiprocessing可能更快,但可能更慢。这取决于你的瓶颈在哪里。特别是,多个进程的数据传输速度较慢。
这里的慢点主要是对numpy.ndarray对象的重复转换。这可以通过只执行一次翻译来避免,这在生产者线程中是最简单的:
data.append(np.array(one_sample, dtype="float64"))然后,get_data变得更简单,因为
def get_data(raw_data, number_values=None):
if number_values == 0:
return []
return np.array(raw_data[-number_values or 0:]).T如果您想要的块大小有固定的限制,那么最好将其写入一个形状正确的圆形numpy缓冲区中。这样,您就可以只进行一次级联,而不是对大量数组进行连接。
https://codereview.stackexchange.com/questions/143451
复制相似问题