在与 Twisted/txredisapi 的 inlineCallbacks 和 yield 纠缠了一番之后,我终于可以把数据保存到 Redis 中了,这要感谢 txredisapi 的作者。现在我遇到了一个新问题:无论是在保存到数据库之前还是之后,socket 服务器都不会把数据发送回客户端。
Twisted 提供了如下的简单套接字服务器示例:
from twisted.internet import protocol, reactor
class Echo(protocol.Protocol):
    """Protocol that sends every received chunk back to the peer."""

    def dataReceived(self, data):
        """Write ``data`` back on the same transport it arrived on."""
        self.transport.write(data)  # write back
class EchoFactory(protocol.Factory):
    """Factory that hands out a fresh ``Echo`` protocol per connection."""

    def buildProtocol(self, addr):
        """Return a new ``Echo`` instance; ``addr`` is not used."""
        return Echo()
reactor.listenTCP(8000, EchoFactory)
recctor.run()我的代码是相似的,只有附加的DB操作。
#!/usr/bin/env python
import time
import binascii
import txredisapi
from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log
from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf
# Module-level objects shared by the protocol code below: the Dmpack helper,
# the loaded configuration mapping, a lazily established txredisapi
# connection and an adbapi connection pool using the MySQLdb driver.
dm = Dmpack()
conf = DmConf().loadConf()
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])
dbpool = adbapi.ConnectionPool(
    "MySQLdb",
    db=conf['DbName'],
    user=conf['DbAccount'],
    passwd=conf['DbPassword'],
    host=conf['DbHost'],
    use_unicode=True,
    charset=conf['DbCharset']
)
def getDataParsed(data):
    """Split a raw packet into its code and its two payload sections.

    Returns a ``(snrCode, realtime, period)`` tuple where ``snrCode`` is
    ``dm.snrToAscii(data[2:7])``, ``realtime`` is ``data[7:167]`` and
    ``period`` is ``data[167:-2]``.
    """
    # This is a module-level function, so there is no ``self`` to assign
    # to; the code is kept in a local and handed back to the caller.
    snrCode = dm.snrToAscii(data[2:7])
    realtime = data[7:167]  # save it into redis
    period = data[167:-2]  # save it into SQL
    return (snrCode, realtime, period)
class PlainTCP(protocol.Protocol):
    """Protocol that parses one packet, answers the client and stores it.

    Each received chunk is parsed with ``getDataParsed``; the parsed parts
    are written back to the client, the connection is closed, and the two
    payload sections are then saved through ``saveRealTimeData`` (Redis
    connection ``rcs``) and ``savePeriodData`` (pool ``dbpool``).
    """

    def __init__(self, factory):
        """Remember the owning ``factory`` and reset per-connection state."""
        self.factory = factory
        # The counter lives on the factory and is shared by every
        # connection, so it is created only when missing instead of being
        # reset to 0 each time a new protocol instance is built.
        if not hasattr(factory, "numConnections"):
            factory.numConnections = 0
        self.snrCode = None
        self.rData = None
        self.pData = None
        self.err = None

    def connectionMade(self):
        """Count the new connection and greet the client."""
        self.factory.numConnections += 1
        print("Nr. of connections: %d\n" % (self.factory.numConnections))
        # Original author's note: it only prints every 5 connections.
        self.transport.write("Hello remote\r\n")

    def connectionLost(self, reason):
        """Remove the closed connection from the factory's counter."""
        self.factory.numConnections -= 1
        print("Nr. of connections: %d\n" % (self.factory.numConnections))

    def dataReceived(self, data):
        """Start handling ``data`` and return ``None``.

        The generator-based work lives in ``_handleData``. Its Deferred is
        deliberately not returned, because Twisted expects ``None`` from
        ``dataReceived``; an errback is attached so that a failure while
        parsing or saving is logged instead of being silently dropped.
        """
        self._handleData(data).addErrback(log.err)

    @defer.inlineCallbacks
    def _handleData(self, data):
        """Parse ``data``, reply to the client, then save both sections."""
        (self.snrCode, rDat, pDat) = getDataParsed(data)
        parsed = (self.snrCode is not None and rDat is not None
                  and pDat is not None)
        err = "OK" if parsed else "Bad format"
        print("err:%s" % (err))  # debug print to show flow control
        self.err = err
        if parsed:
            self.transport.write(self.snrCode)
            self.transport.write(self.err)
            self.transport.write(rDat)
            self.transport.write(pDat)
        else:
            # None cannot be written to the transport; send the error only.
            self.transport.write(self.err)
        self.transport.loseConnection()
        if parsed:
            # Sequential on purpose: the SQL save starts after the Redis
            # save has finished.
            yield self.saveRealTimeData(rcs, rDat)
            yield self.savePeriodData(dbpool, pDat, conf)
        print("err2:%s" % (err))  # debug print to show flow control

    @defer.inlineCallbacks
    def saveRealTimeData(self, rc, dat):
        """Set a placeholder key through ``rc`` and give it an expiry of 30.

        ``dat`` is accepted but not used by the current body.
        """
        key = "somekey"
        val = "somedata"
        yield rc.set(key, val)
        yield rc.expire(key, 30)

    @defer.inlineCallbacks
    def savePeriodData(self, rc, dat, conf):
        """Run a placeholder SQL statement through ``rc.runQuery``.

        ``dat`` and ``conf`` are accepted but not used by the current body.
        """
        query = "some SQL statement"
        yield rc.runQuery(query)
class PlainTCPFactory(protocol.Factory):
    """Factory that builds a ``PlainTCP`` protocol for each connection."""

    # Connection counter that PlainTCP reads and updates through its
    # ``factory`` reference; declared here so it exists before the first
    # connection is made.
    numConnections = 0

    def buildProtocol(self, addr):
        """Return a ``PlainTCP`` bound to this factory; ``addr`` is unused."""
        return PlainTCP(self)
def main():
    """Check the database, start logging and serve PlainTCP on port 8080.

    Exits the process with status 1 when ``Dmdb().detectDb()`` is false.
    """
    # ``sys`` is not among the imports visible at the top of this file,
    # so it is imported here to keep the function self-contained.
    import sys

    dmdb = Dmdb()
    if not dmdb.detectDb():
        print("Please run MySQL RDBS first.")
        # Non-zero status so callers can tell start-up failed.
        sys.exit(1)
    log.startLogging(sys.stdout)
    reactor.listenTCP(8080, PlainTCPFactory())
    reactor.run()
if __name__ == "__main__":
main()和我的客户的剪辑,这是一个简单的客户端:
def connectSend(host="127.0.0.1",port=8080):
global packet
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
s.sendall(''.join(packet))
data = s.recv(1024)
s.close()
print 'Received', repr(data)
except socket.error, err:
print "Remote socket is not available: %s"%str(err)
sys.exit(1)目前的状况是:
有人告诉我,dataReceived() 不应该使用 @defer.inlineCallbacks 装饰。但即使我去掉这个装饰器,也没有任何变化。
我在想,gevent 是否能帮我摆脱这种难以预料的行为。我被 Twisted 卷进了一场无休止的龙卷风(Tornado)、旋风之中。
任何有类似经验的人,请帮助我。谢谢。
发布于 2015-09-22 14:16:17
按如下方式修改函数后,代码就可以正常工作了。
#COMMENT OUT decorator of @defer.inlineCallbacks
def dataReceived(self, data):
    """Parse ``data``, reply to the client, then start both saves.

    Not decorated with ``inlineCallbacks``: the method returns ``None``
    and the two save operations are started without ``yield``.
    """
    (self.snrCode, rDat, pDat) = getDataParsed(data)
    parsed = (self.snrCode is not None and rDat is not None
              and pDat is not None)
    err = "OK" if parsed else "Bad format"
    print("err:%s" % (err))  # debug print to show flow control
    self.err = err
    if parsed:
        self.transport.write(self.snrCode)
        self.transport.write(self.err)
        self.transport.write(rDat)
        self.transport.write(pDat)
    else:
        # None cannot be written to the transport; send the error only.
        self.transport.write(self.err)
    self.transport.loseConnection()
    if parsed:
        # No yield before the DB ops: both are started here and finish
        # later. The errbacks make a failed save show up in the log
        # instead of being lost with the discarded Deferred.
        self.saveRealTimeData(rcs, rDat).addErrback(log.err)
        self.savePeriodData(dbpool, pDat, conf).addErrback(log.err)
    print("err2:%s" % (err))  # debug print to show flow control
@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
    """Set a placeholder key through ``rc`` and give it an expiry of 30.

    ``dat`` is accepted but not used by the current body.
    """
    print("saveRedis")
    key = "somekey"
    val = "somedata"
    yield rc.set(key, val)
    yield rc.expire(key, 30)
@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
print "save SQL"
query = "some SQL statement"
yield rc.runQuery(query)如果我们保持@defer.inlineCallbacks,并在dataReceived中屈服。连接在第二个DB op之前关闭。因此,没有数据输出到连接。可能是由inlineCallbacks装饰师引起的。
删除它之后,流程控制就变得简单而直接了。
然而,我仍然无法理解:既然有两个延迟(Deferred)操作,为什么不能加上 inlineCallbacks?难道这一次它们不需要被延迟执行吗?
https://stackoverflow.com/questions/32709539
复制相似问题