我的问题类似于"根据同一字段中的排序值合并两个排序文件“,但将其扩展到命名管道。
假设我有两个带有排序整数的文本文件,我想要合并它们。我可以使用sort -nm file1.txt file2.txt > merged.txt进行一次不阻塞的合并.
现在,假设这些文件实际上是我正在制作的管道(FIFO),然后在python中填充。只要我交替写一个管道,然后再写一个管道,我就可以做到这一点。此代码用于生成两个有序的整数列表,将它们写入由sort子进程读取的命名管道,该子进程将合并的结果输出到单个文件中:
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()我得到了一个排序结果。但是--现在让我们通过将i % 2改为i % 3来实现列表的不平衡。在最初的版本中,这只会打印到fifo1,然后是fifo2,然后是fifo1,然后是fifo2等等。在修改后的版本中,它将两条管道中的一条打印两倍多的行。
使用i % 3运行这个程序,我得到以下输出:
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398总是停在同一个地方。使用strace,我可以看到:
python进程在write调用spot 4:write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100时挂起。
但是sort进程在read调用spot 3:read(3,时挂起。
查看lsof -n -p进程的sort输出显示,它正在等待一个值到达fifo1,而write进程正在等待将一个值写入fifo2:
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2因此,由于某种原因,sort进程*停止了侦听fifo2,导致进程挂起。
现在,如果我把一个单独的侦听器放在fifo2上,只需发出cat fifo2.该过程再次开始,并持续数千次迭代,直到.它现在停止在另一个随机点(迭代53733)。
我想一定有一些事情我不明白,在缓冲管道中,以及sort是如何从一个流到另一个流读取的。对我来说奇怪的是,它是确定性的,在完全相同的位置上失败,只有当列表不平衡时才会失败。
有什么办法能解决这个问题吗?
发布于 2019-06-06 17:03:56
显然,当您将不同数量的数据写入两个命名管道时,程序会造成死锁。您的程序在write上为一个fifo2块(缓冲区已满),而sort进程在read上为fifo1阻塞(缓冲区为空)。
您不知道sort是如何实现的。它可能希望读取较大块中的文件,然后在内存中处理数据以提高效率。如果sort使用来自stdio.h的函数读取数据,缓冲甚至可能会自动发生。
命名(和未命名)管道使用数据缓冲区。
如果缓冲区已满,则写入过程将被阻塞,直到读取进程读取某些数据或关闭其结束为止。
如果缓冲区为空,则读取进程将被阻塞,直到写入过程写入某些数据或关闭其结束为止。
如果您在每个周期中向fifo1写一行,将两行写到fifo2,则将填充fifo2's缓冲区,而fifo1's缓冲区只填充一半。
取决于您的程序写入fifo2的数据和要读取的sort的数量,这显然是在这样的情况下结束的:sort希望从只有一个空缓冲区的fifo1中读取一些东西,而您的程序希望用一个完整的缓冲区写入fifo2。
结果是确定性的,因为管道缓冲区的大小是固定的,而且您的程序和sort也可能使用固定的缓冲区大小来读取或写入数据。
您可以查看GNU sort的源代码
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
在开始时,它尝试使用函数fillbuf来填充循环中所有输入文件的输入缓冲区。
稍后,在某些条件下,它再次调用fillbuf作为输入文件。
在函数fillbuf中有一个注释
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */显然,sort选择一个输入文件并需要一定数量的数据。如果读取块,它不会切换输入文件。
该实现在普通文件中运行良好,因为read操作将在一段时间后返回某些数据或EOF,因此不会永久阻塞。
如果有不止一个可以阻塞两个进程/线程的东西,就很难避免死锁。在您的情况下,您应该只使用一个管道。使用非阻塞操作可能会有所帮助,如果您总是有数据写入fifo1,如果fifo2会阻塞,反之亦然。
如果您使用两个单独的线程/进程向管道写入,但只有当线程/进程彼此独立工作时,使用两个管道可能有效。如果线程A(应该写入pipe1 )以某种方式等待线程B(只在写入pipe2时阻塞),那就没有帮助了。
https://unix.stackexchange.com/questions/523347
复制相似问题