首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >只有在我使用异步数据库操作时才会将数据发回。

只有在我使用异步数据库操作时才会将数据发回。
EN

Stack Overflow用户
提问于 2015-09-22 06:08:01
回答 1查看 88关注 0票数 0

在与inlineCallbacks和扭曲/txredisapi的产量作斗争之后,我可以将数据保存到redis中。多亏了txredisapi的作者。现在我遇到了一个新问题,socket服务器在保存到DB之前/之后不会发送回客户端。

扭曲提供了以下简单的套接字服务器:

代码语言:javascript
复制
from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data): 
        self.transport.write(data) ### write back 

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

reactor.listenTCP(8000, EchoFactory)
recctor.run()

我的代码是相似的,只有附加的DB操作。

代码语言:javascript
复制
#!/usr/bin/env python

import time
import binascii
import txredisapi

from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log

from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf

dm = Dmpack()
conf = DmConf().loadConf()
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])
dbpool = adbapi.ConnectionPool("MySQLdb",db=conf['DbName'],user=conf['DbAccount'],\
    passwd=conf['DbPassword'],host=conf['DbHost'],\
    use_unicode=True,charset=conf['DbCharset'])

def getDataParsed(data):
    realtime = None
    period = None
    self.snrCode = dm.snrToAscii(data[2:7])    
    realtime = data[7:167] # save it into redis
    period = data[167:-2] # save it into SQL
    return (snrCode, realtime, period)

class PlainTCP(protocol.Protocol):
    def __init__(self, factory):
        self.factory = factory
        self.factory.numConnections = 0
        self.snrCode = None 
        self.rData = None
        self.pData = None
        self.err = None

    def connectionMade(self):
        self.factory.numConnections += 1
        print "Nr. of connections: %d\n" %(self.factory.numConnections)
        self.transport.write("Hello remote\r\n") # it only prints very 5 connections.

    def connectionLost(self, reason):
        self.factory.numConnections -= 1
        print "Nr. of connections: %d\n" %(self.factory.numConnections)

    @defer.inlineCallbacks
    def dataReceived(self, data):
        global dbpool, rcs
        (self.snrCode,rDat,pDat) = getDataParsed(data)

        if self.snrCode == None or rDat == None or pDat == None:
            err = "Bad format"
        else:
            err = "OK"
        print "err:%s"%(err) # debug print to show flow control
        self.err = err 

        self.transport.write(self.snrCode)
        self.transport.write(self.err)
        self.transport.write(rDat)
        self.transport.write(pDat) 
        self.transport.loseConnection()

        if self.snrCode != None and rDat != None and pDat != None:    
            res = yield self.saveRealTimeData(rcs, rDat)        
            res = yield self.savePeriodData(dbpool, pDat, conf)

        print "err2:%s"%(err)  # debug print to show flow control


    @defer.inlineCallbacks
    def saveRealTimeData(self, rc, dat):
        key = "somekey"
        val = "somedata"
        yield rc.set(key,val)
        yield rc.expire(key,30)

    @defer.inlineCallbacks
    def savePeriodData(self,rc,dat,conf):
        query = "some SQL statement"
        yield rc.runQuery(query)

class PlainTCPFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return PlainTCP(self)

def main():
    dmdb = Dmdb()
    if not dmdb.detectDb():
        print "Please run MySQL RDBS first."
        sys.exit()

    log.startLogging(sys.stdout)

    reactor.listenTCP(8080, PlainTCPFactory())
    reactor.run()

if __name__ == "__main__":
    main()

和我的客户的剪辑,这是一个简单的客户端:

代码语言:javascript
复制
def connectSend(host="127.0.0.1",port=8080):
    global packet
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))
        s.sendall(''.join(packet))
        data = s.recv(1024)
        s.close()
        print 'Received', repr(data)
    except socket.error, err:
        print "Remote socket is not available: %s"%str(err)
        sys.exit(1)

目前的状况是:

  • 如果禁用@defer.inlineCallbacks并产生dataReceived()的操作,connectionMode()和dataReceived()内部的self.transport.write()都可以将数据输出到客户端。
  • 如果我们启用了@defer.inlineCallbacks和两个SQL/Redis的DB,那么connectionMode()内部的self.transport.write()将每5个连接打印一次,dataReceived()不会将任何数据输出到客户端。
  • 无论@defer.inlineCallbacks如何,调试打印语句都将在日志上打印。

有人告诉我,dataReceived()不应该是@defer.inlineCallbacks。但如果我去掉那个装饰也不会有任何改变。

我在想,gevent是否能帮助我摆脱这种无法预料的行为。我被扭曲成一个无尽的龙卷风,旋风.

任何有类似经验的人,请帮助我。谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-09-22 14:16:17

通过如下更改函数,代码可以工作。

代码语言:javascript
复制
#COMMENT OUT decorator of @defer.inlineCallbacks

def dataReceived(self, data):
    global dbpool, rcs
    (self.snrCode,rDat,pDat) = getDataParsed(data)

    if self.snrCode == None or rDat == None or pDat == None:
        err = "Bad format"
    else:
        err = "OK"
    print "err:%s"%(err) # debug print to show flow control
    self.err = err 

    self.transport.write(self.snrCode)
    self.transport.write(self.err)
    self.transport.write(rDat)
    self.transport.write(pDat) 
    self.transport.loseConnection()

    if self.snrCode != None and rDat != None and pDat != None:    
        self.saveRealTimeData(rcs, rDat)        
        self.savePeriodData(dbpool, pDat, conf)
        # Removing yield before DB ops

    print "err2:%s"%(err)  # debug print to show flow control


@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
    print "saveRedis"
    key = "somekey"
    val = "somedata"
    yield rc.set(key,val)
    yield rc.expire(key,30)

@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
    print "save SQL"
    query = "some SQL statement"
    yield rc.runQuery(query)

如果我们保持@defer.inlineCallbacks,并在dataReceived中屈服。连接在第二个DB op之前关闭。因此,没有数据输出到连接。可能是由inlineCallbacks装饰师引起的。

通过删除它,流控制是简单而直接的。

然而,如果有两个延迟的操作,我仍然可以理解为什么不能添加inlineCallbacks。这一次他们不需要推迟?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32709539

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档