首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >C++与Python之间的数据损坏管道

C++与Python之间的数据损坏管道
EN

Stack Overflow用户
提问于 2016-05-02 16:44:34
回答 2查看 1.2K关注 0票数 9

我正在编写一些代码,从Python获取二进制数据,将其导入C++,对数据进行一些处理(在本例中是计算互信息度量),然后将结果传送回python。在测试过程中,我发现如果我发送的数据是一组尺寸小于1500×1500的2个数组,那么一切都可以正常工作,但是如果我发送2个2KX2K数组,我就会得到许多损坏的胡说八道。

目前,我认为代码的算法部分很好,因为它在使用小数组(<=1500 X1500)进行测试时提供了预期的答案。这使我相信,这是一个问题,无论是stdin或stdout管道。也许我在某个地方通过了一些固有的限制。

Python和C++代码在下面。

Python代码:

代码语言:javascript
复制
import subprocess
import struct
import sys
import numpy as np

#set up the variables needed 
bytesPerDouble = 8
sizeX = 2000
sizeY = 2000
offset = sizeX*sizeY
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2                   #the 2 is because we pass 2 different versions of the 2D array

#setup the testing data array 
a = np.zeros(sizeX*sizeY*2, dtype='d')
for i in range(sizeX):
    for j in range(sizeY):
        a[j+i*sizeY] = i
        a[j+i*sizeY+offset] = i
        if i % 10 == 0:
            a[j+i*sizeY+offset] = j

data = a.tobytes('C')      

strTotalBytes = str(totalBytes)
strLineBytes  = str(sizeY*bytesPerDouble)

#communicate with c++ code
print("starting C++ code")     
command =   "C:\Python27\PythonPipes.exe"
proc = subprocess.Popen([command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)], stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE)

ByteBuffer = (data)
proc.stdin.write(ByteBuffer)

print("Reading results back from C++")
for i in range(sizeX):
    returnvalues = proc.stdout.read(sizeY*bytesPerDouble)
    a = buffer(returnvalues)
    b = struct.unpack_from(str(sizeY)+'d', a)
    print str(b) + " " + str(i)

print('done')

C++代码:主要功能:

代码语言:javascript
复制
int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       //bytes being transfered
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); //bytes being transfered at a time
    long height = stoi(argv[argc-2], nullptr, 10);  //bytes being transfered at a time
    long width  = stoi(argv[argc-1], nullptr, 10);  //bytes being transfered at a time
    long offset = totalbytes / sizeof(double) / 2;


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    //read in data from pipe
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;
    //*
    //write out data to pipe
    columnindex = 0;
    while (count<totalbytes/2) {

        fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
        fflush(stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    //*/
    delete [] data;

    return 0;
}

如果您需要它,则需要实际的处理代码:

代码语言:javascript
复制
double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, map<int, double>> lHistXY = map<int, map<int, double>>();
    std::map<int, double> lHistX = map<int, double>();
    std::map<int, double> lHistY = map<int, double>();
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    //calculate Entropys and MI
    double MI = 0.0;
    double Hx = 0.0;
    double Hy = 0.0;
    double Px = 0.0;
    double Py = 0.0;
    double Pxy = 0.0;

    //scan through the image
    int ip = 0;
    int jp = 0;
    int chipsize = 3;

    //setup zero array
    double * zeros = new double[this->mHeight];
    for (int j = 0; j < this->mHeight; j++){
        zeros[j] = 0.0;
    }

    //zero out Output array
    for (int i = 0; i < this->mWidth; i++){
        memcpy(&(this->mOutput[i*this->mHeight]), zeros, this->mHeight*8);
    }


    double index = 0.0;
    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){
        //write out processing status
        //index = (double)ioutter;
        //fwrite(&index, 8, 1, stdout);
        //fflush(stdout);
        //*
        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();
            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    ip = ioutter + k;
                    jp = j + l;
                    //update X histogram
                    if (lHistX.count(int(this->mData[ip*this->mHeight + jp]))){
                        lHistX[int(this->mData[ip*this->mHeight + jp])] += 1.0;
                    }else{
                        lHistX[int(this->mData[ip*this->mHeight + jp])] = 1.0;

                    }
                    //update Y histogram
                    if (lHistY.count(int(this->mData[ip*this->mHeight + jp+this->mOffset]))){
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] += 1.0;
                    }
                    else{
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] = 1.0;

                    }

                    //update X and Y Histogram
                    if (lHistXY.count(int(this->mData[ip*this->mHeight + jp]))){ 
                        //X Key exists check if Y key exists
                        if (lHistXY[int(this->mData[ip*this->mHeight + jp])].count(int(this->mData[ip*this->mHeight + jp + this->mOffset]))){
                            //X & Y keys exist
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] += 1;
                        }else{
                            //X exist but Y doesn't
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                        }
                    }else{
                        //X Key Didn't exist
                        lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                    };
                }
            }

            //calculate PMI, Hx, Hy
            // iterator->first = key
            // iterator->second = value

             MI = 0.0;
             Hx = 0.0;
             Hy = 0.0;

            for (HistXY_iter Hist2D_iter = lHistXY.begin(); Hist2D_iter != lHistXY.end(); Hist2D_iter++) {

                Px = lHistX[Hist2D_iter->first] / ((double) this->mOffset);
                Hx -= Px*log(Px);

                for (HistY_iter HistY_iter = Hist2D_iter->second.begin(); HistY_iter != Hist2D_iter->second.end(); HistY_iter++) {
                    Py = lHistY[HistY_iter->first] / ((double) this->mOffset);
                    Hy -= Py*log(Py);
                    Pxy = HistY_iter->second / ((double) this->mOffset);
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }
            else{
                MI = 0.0;
            }

            //write PMI to data output array
            if (MI < 1.1){
                this->mOutput[ioutter*this->mHeight + j] = MI;
            }
            else{
                this->mOutput[ioutter*this->mHeight + j] = 0.0;

            }

        }



    }

    return rvalue;
}

对于返回有意义的内容的数组,输出限制在0到1之间,如下所示:

(0.0,0.0,0.0,0.7160627908692593,0.6376472316395495,0.5728801401524277,…

对于2Kx2K或更高的数组,我得到了如下所示的非感知(即使代码将值夹在0到1之间):

(-2.2491400820412374e+228,-2.2491400820412374e+228,-2.2491400820412374e+228,-2.2491400820412374e+228,-2.2491400820412374e+228,

我想知道为什么这段代码会在0.0到1之间分配数据集之后破坏数据集,以及它是否是管道问题、stdin/stdout问题、某种类型的缓冲区问题,还是我根本没有看到的编码问题。

更新--我尝试使用Chris建议的代码以较小的块传递数据,但没有结果。另外值得注意的是,我在stdout上添加了一个ferror的捕获,并且它从未被绊倒,所以我非常肯定字节至少使它成为stdout。有没有可能别的东西在以某种方式写出来呢?也许在我的程序运行的时候,会有一个额外的字节进入stdout?我认为这是值得怀疑的,因为在第10条的第4页,错误总是出现在第4页。

按照克雷格的请求,这里是完整的C++代码(完整的Python已经发布):它位于3个文件中:

main.cpp

代码语言:javascript
复制
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <iostream>
#include "./MutualInformation.h"

double * data;
using namespace std;

void
xxwrite(unsigned char *buf, size_t wlen, FILE *fo)
{
    size_t xlen;

    for (; wlen > 0; wlen -= xlen, buf += xlen) {
        xlen = wlen;
        if (xlen > 1024)
            xlen = 1024;
        xlen = fwrite(buf, 1, xlen, fo);
        fflush(fo);
    }
}

int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       //bytes being transfered
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); //bytes being transfered at a time
    long height = stoi(argv[argc-2], nullptr, 10);  //bytes being transfered at a time
    long width  = stoi(argv[argc-1], nullptr, 10);  //bytes being transfered at a time
    long offset = totalbytes / sizeof(double) / 2;


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    //read in data from pipe
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;

    columnindex = 0;
    while (count<totalbytes/2) {

        xxwrite((unsigned char*)&(MI.getOutput()[columnindex]),  bytechunk, stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    delete [] data;

    return 0;
}

MutualInformation.h

代码语言:javascript
复制
#include <map>

using namespace std;

class MutualInformation
{
private:
    double * mData;
    double * mOutput;
    long mHeight;
    long mWidth;
    long mOffset;

public:
    MutualInformation();
    ~MutualInformation();
    bool Initialize(double * data, long Height, long Width, long Offset);
    const double * getOutput();

    double calcMI();

};

MutualInformation.cpp

代码语言:javascript
复制
#include "MutualInformation.h"


MutualInformation::MutualInformation()
{
    this->mData = nullptr;
    this->mOutput = nullptr;
    this->mHeight = 0;
    this->mWidth = 0;

}


MutualInformation::~MutualInformation()
{
    delete[] this->mOutput;
}

bool MutualInformation::Initialize(double * data, long Height, long Width, long Offset){
    bool rvalue = false;
    this->mData = data;
    this->mHeight = Height;
    this->mWidth = Width;
    this->mOffset = Offset;


    //allocate output data
    this->mOutput = new double[this->mHeight*this->mWidth];

    return rvalue;
}

const double * MutualInformation::getOutput(){
    return this->mOutput;
}


double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, map<int, double>> lHistXY = map<int, map<int, double>>();
    std::map<int, double> lHistX = map<int, double>();
    std::map<int, double> lHistY = map<int, double>();
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    //calculate Entropys and MI
    double MI = 0.0;
    double Hx = 0.0;
    double Hy = 0.0;
    double Px = 0.0;
    double Py = 0.0;
    double Pxy = 0.0;

    //scan through the image
    int ip = 0;
    int jp = 0;
    int chipsize = 3;

    //setup zero array
    double * zeros = new double[this->mHeight];
    for (int j = 0; j < this->mHeight; j++){
        zeros[j] = 0.0;
    }

    //zero out Output array
    for (int i = 0; i < this->mWidth; i++){
        memcpy(&(this->mOutput[i*this->mHeight]), zeros, this->mHeight*8);
    }


    double index = 0.0;
    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){

        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();
            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    ip = ioutter + k;
                    jp = j + l;
                    //update X histogram
                    if (lHistX.count(int(this->mData[ip*this->mHeight + jp]))){
                        lHistX[int(this->mData[ip*this->mHeight + jp])] += 1.0;
                    }else{
                        lHistX[int(this->mData[ip*this->mHeight + jp])] = 1.0;

                    }
                    //update Y histogram
                    if (lHistY.count(int(this->mData[ip*this->mHeight + jp+this->mOffset]))){
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] += 1.0;
                    }
                    else{
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] = 1.0;

                    }

                    //update X and Y Histogram
                    if (lHistXY.count(int(this->mData[ip*this->mHeight + jp]))){ 
                        //X Key exists check if Y key exists
                        if (lHistXY[int(this->mData[ip*this->mHeight + jp])].count(int(this->mData[ip*this->mHeight + jp + this->mOffset]))){
                            //X & Y keys exist
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] += 1;
                        }else{
                            //X exist but Y doesn't
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                        }
                    }else{
                        //X Key Didn't exist
                        lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                    };
                }
            }

            //calculate PMI, Hx, Hy
            // iterator->first = key
            // iterator->second = value

             MI = 0.0;
             Hx = 0.0;
             Hy = 0.0;

            for (HistXY_iter Hist2D_iter = lHistXY.begin(); Hist2D_iter != lHistXY.end(); Hist2D_iter++) {

                Px = lHistX[Hist2D_iter->first] / ((double) this->mOffset);
                Hx -= Px*log(Px);

                for (HistY_iter HistY_iter = Hist2D_iter->second.begin(); HistY_iter != Hist2D_iter->second.end(); HistY_iter++) {
                    Py = lHistY[HistY_iter->first] / ((double) this->mOffset);
                    Hy -= Py*log(Py);
                    Pxy = HistY_iter->second / ((double) this->mOffset);
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }
            else{
                MI = 0.0;
            }

            //write PMI to data output array
            if (MI < 1.1){
                this->mOutput[ioutter*this->mHeight + j] = MI;
            }
            else{
                this->mOutput[ioutter*this->mHeight + j] = 0.0;
                //cout << "problem with output";
            }

        }



    }



    //*/
    return rvalue;
}

由6502求解

下面的答案解决了我的问题。我需要显式地告诉Windows对stdin / stdout使用二进制模式。为此,我必须在我的主cpp文件中包含2个新的头文件。

代码语言:javascript
复制
#include <fcntl.h>
#include <io.h>

在我的主函数开头添加以下代码行(由于Visual的抱怨,从6502的POSIX版本中修改)

代码语言:javascript
复制
_setmode(_fileno(stdout), O_BINARY);
_setmode(_fileno(stdin), O_BINARY);

然后将这些行添加到Python代码中:

代码语言:javascript
复制
import os, msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-05-02 21:20:56

问题是windows中的stdin/stdout是在文本模式下打开的,而不是在二进制模式下打开的,因此在发送字符13 (\r)时会出现混乱。

例如,您可以在Python中使用

代码语言:javascript
复制
import os, msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)

在C++中

代码语言:javascript
复制
_setmode(fileno(stdout), O_BINARY);
_setmode(fileno(stdin), O_BINARY);

请参阅https://msdn.microsoft.com/en-us/library/tw4k6df8.aspx

票数 6
EN

Stack Overflow用户

发布于 2016-05-02 17:29:52

您的C++ fwrite代码不考虑获得“短”传输。

这里有一个小小的改动:

代码语言:javascript
复制
//write out data to pipe
columnindex = 0;
while (count < totalbytes / 2) {
    wlen = fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
    fflush(stdout);
    count += wlen;
    columnindex += wlen / sizeof(double);
}

注意:您仍然需要小心,因为如果wlen返回并且它不是sizeof(double)的倍数,这仍然会有问题。例如,如果bytechunk为16,而wlen为14,则在继续循环之前,需要一个长度为2的额外fwrite。对此的一个概括就是将整个数据矩阵作为一个巨大的字节缓冲区并对其进行循环。

实际上,许多小得多的传输被固定的(例如,“已知的安全数量”)限制为1024字节,您将获得同样的效率。这是因为输出是字节流。

以下是我经常使用的一个稍微通用的解决方案:

代码语言:javascript
复制
void
xxwrite(void *buf,size_t wlen,FILE *fo)
{
    size_t xlen;

    for (;  wlen > 0;  wlen -= xlen, buf += xlen) {
        xlen = wlen;
        if (xlen > 1024)
            xlen = 1024;
        xlen = fwrite(buf,1,xlen,fo);
        fflush(fo);
    }
}

//write out data to pipe
columnindex = 0;
while (count < totalbytes / 2) {
    xxwrite(&(MI.getOutput()[columnindex]), bytechunk, stdout);
    count += bytechunk;
    columnindex += bytechunk / sizeof(double);
}

更新:

我下载了你所有的代码并运行了它。我有好消息和坏消息:代码在这里运行良好,即使矩阵大小超过3000。我使用xxwrite和both运行它,结果是一样的。

使用我有限的python技能,我在python脚本中添加了一些漂亮的打印(例如,一些行包装),并让它检查每个值的范围,并注释任何坏的值。剧本里没有找到任何东西。另外,对这些值的视觉检查没有发现--这在漂亮的打印之前是正确的,所以它没有引入任何东西。只是大量的零,然后在0.9范围内阻止。

我唯一能看到的区别是,我在linux上使用的是gcc,当然还有python。但是,从您的脚本来看,您似乎使用了Windows (基于C:\...路径的C++可执行文件)。这对这个应用程序来说不重要,但我还是要提一提。

管道在这里工作。您可以尝试的一件事是将C++输出定向到文件。然后,让脚本从文件中读取回来(即没有管道),看看这是否有区别。我倾向于不这么认为,但是.

另外,我不知道您在Windows下使用的编译器和python实现。每当我需要这样做的时候,我通常都会安装Cygwin,因为它提供了类似linux/Unix环境的最接近的实现之一(也就是说,管道更有可能像广告中所说的那样工作)。

总之,这是修改过的脚本。还请注意,我添加了os.getenv以获取备用矩阵大小和C++可执行文件的备用位置,这样我们两人都能以最小的痛苦工作。

代码语言:javascript
复制
#!/usr/bin/python

import subprocess
import struct
import sys
import os
import numpy as np

val = os.getenv("MTX","2000")
sizeX = int(val)
sizeY = sizeX
print "sizeX=%d sizeY=%d" % (sizeX,sizeY)

#set up the variables needed
bytesPerDouble = 8
offset = sizeX*sizeY
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2                   #the 2 is because we pass 2 different versions of the 2D array

#setup the testing data array
a = np.zeros(sizeX*sizeY*2, dtype='d')
for i in range(sizeX):
    for j in range(sizeY):
        a[j+i*sizeY] = i
        a[j+i*sizeY+offset] = i
        if i % 10 == 0:
            a[j+i*sizeY+offset] = j

data = a.tobytes('C')

strTotalBytes = str(totalBytes)
strLineBytes  = str(sizeY*bytesPerDouble)

#communicate with c++ code
print("starting C++ code")

command = os.getenv("CPGM",None);
if command is None:
    command =   "C:\Python27\PythonPipes.exe"

proc = subprocess.Popen([command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)], stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE)

ByteBuffer = (data)
proc.stdin.write(ByteBuffer)

def prt(i,b):

    hangflg = 0
    per = 8

    for j in range(0,len(b)):
        if ((j % per) == 0):
            print("[%d,%d]" % (i,j)),

        q = b[j]
        print(q),
        hangflg = 1

        if (q < 0.0) or (q > 1.0):
            print("=WTF"),

        if ((j % per) == (per - 1)):
            print("")
            hangflg = 0

    if (hangflg):
        print("")

print("Reading results back from C++")
for i in range(sizeX):
    returnvalues = proc.stdout.read(sizeY*bytesPerDouble)
    a = buffer(returnvalues)
    b = struct.unpack_from(str(sizeY)+'d', a)
    prt(i,b)
    ###print str(b) + " " + str(i)
    ###print str(i) + ": " + str(b)

print('done')
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36987437

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档