首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有多个事件的Python kqueue

具有多个事件的Python kqueue
EN

Stack Overflow用户
提问于 2013-06-16 20:31:55
回答 1查看 2.5K关注 0票数 1

我试图编写一个小程序来发送和接收UDP通信,并通过HTTP接口接收命令。HTTP驻留在一个multiprocessing.Process中,UDP服务器驻留在另一个中。这两个进程通过python multiprocessing.Pipe进行通信。我已经把完整的代码附在下面。

我有两个相关的问题:

  • 如何处理python中带有kqueue的多个文件描述符/kevents(套接字文件描述符工作,管道文件描述符似乎不--不确定我使用的管道是否等同于文件)?
  • 如何区分这些kevents,以便在要读取管道和套接字时应用不同的函数?

我希望我的UDP服务器所做的工作的伪代码:

代码语言:javascript
复制
kq = new kqueue
udpEvent = kevent when socket read
pipeEvent = kevent when pipe read
while:
    for event in kq.conrol([udpEvent, pipeEvent]):
        if event == udpEvent:
             # do something
        elif event == pipeEvent:
             print "HTTP command via pipe:", pipe.recv()

现在,UDP服务器识别套接字事件并正确读取套接字。但是,当我将管道事件添加到kqueue中时,程序会不间断地发出管道事件。我将过滤器设置为编写了管道,但我假设这是错误的,更具体地说,python multiprocessing.Pipe就像一个普通的unix管道,需要进行不同的处理。

代码语言:javascript
复制
.....
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 ^C<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>

main.py

代码语言:javascript
复制
import sys
from multiprocessing import Process, Pipe
# from userinterface import OSXstatusbaritem # use like so: OSXstatusbaritem.start(pipe)
from server import Server
import handler # UI thingy

# For UI, use simple HTTP server with various endpoints
# open a connection: localhost:[PORT]/open/[TARGET_IP]

def startServer(pipe):
    UDP_IP = "127.0.0.1"
    UDP_PORT = 9000

    print "starting server"
    s = Server(pipe)
    s.listen(UDP_IP, UDP_PORT)
    print "finishing server"

import BaseHTTPServer
def startUI(pipe):
    HTTP_PORT = 4567
    server_class = BaseHTTPServer.HTTPServer
    myHandler = handler.handleRequestsUsing(pipe)
    httpd = server_class(('localhost', 4567), myHandler)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
          pass
    httpd.server_close()

def main():
    # Named full duplex pipe for communicating between server process and UI
    pipeUI, pipeServer = Pipe()

    # Start subprocesses
    pServer = Process(target=startServer, args=(pipeServer,))
    pServer.start()
    startUI(pipeUI)
    pServer.join()

if __name__ == "__main__": sys.exit(main())

server.py (UDP)

代码语言:javascript
复制
import sys
import select # for kqueue
from socket import socket, AF_INET, SOCK_DGRAM
from multiprocessing import Process, Pipe

class Server:
    def __init__(self, pipe):
        self.pipe = pipe

    def listen (self, ipaddress, port):
        print "starting!"

        # Initialize listening UDP socket
        sock = socket(AF_INET, SOCK_DGRAM)
        sock.bind((ipaddress, port))

        # Configure kqueue
        kq = select.kqueue()
        # Event for UDP socket data available
        kevent0 = select.kevent( sock.fileno(),
                                 filter=select.KQ_FILTER_READ,
                                 flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR)
        # Event for message queue from other processes (ui)
        kevent1 = select.kevent( self.pipe.fileno(),
                                 filter=select.KQ_FILTER_WRITE,
                                 flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE)                        

        # TODO: Figure out how to handle multiple kevents on kqueue
        # TODO: Need an event for TUN data

        # Start kqueue      
        while True:
            revents = kq.control([kevent0, kevent1], 1, None)
            for event in revents:
                print event
        kq.close()
        # close file descriptors (os.close(fd))

handler.py (HTTP接口)

代码语言:javascript
复制
import BaseHTTPServer

# Simple HTTP endpoints for controlling prototype Phantom implementation.
# The following commands are supported:
# 1. Open a connection via /open/[IP]:[PORT]
# 2. ????

class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    pipe = None

    def __init__(self, pipe, *args):
        RequestHandler.pipe = pipe
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)

    def do_HEAD(s):
        s.send_response(200)
        s.send_header("Content-type", "application/json")
        s.end_headers()
    def do_GET(s):
        s.send_response(200)
        s.send_header("Content-type", "application/json")
        s.end_headers()

        # Open connection command
        if s.path.startswith('/open/'):
            addrStr = s.path[6:len(s.path)]
            (address, port) = tuple(filter(None, addrStr.split(':')))
            port = int(port)
            print "opening address: ", address, "port:", port
            RequestHandler.pipe.send(['open', address, port])

def handleRequestsUsing(logic):
    return lambda *args: RequestHandler(logic, *args)

更新:

我用select重写了服务器侦听方法。对于一个使用不超过3或4个fds的慢速巨蟒原型来说,速度并不重要。克奎尔将成为下一天的话题。

def侦听(self,ipaddress,port):打印“启动!”

代码语言:javascript
复制
# Initialize listening non-blocking UDP socket
sock = socket(AF_INET, SOCK_DGRAM)
sock.setblocking(0)
sock.bind((ipaddress, port))

inputs = [sock, self.pipe] # stuff we read
outputs = [] # stuff we expect to write
while inputs:
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

    for event in readable:
        if event is sock:
            self.handleUDPData( sock.recvfrom(1024) )
        if event is self.pipe:
            print "pipe event", self.pipe.recv()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-03-20 18:08:59

我知道这是一个老问题,但我可以给出一个用于多线程HTTP服务器的kqueue套接字轮询的例子,我在阅读了C源代码和kqueue的手册页后才发现了这个问题。

代码语言:javascript
复制
#bsd socket polling
#I make all the relevant flags more C like to match the kqueue man pages
from select import kevent, kqueue
from select import KQ_EV_ADD as EV_ADD, KQ_EV_ONESHOT as EV_ONESHOT   
from select import KQ_EV_EOF as EV_EOF

from .common import Client_Thread #a parent class who's implementation is irrelevant to the question, lol

class BSD_Client(Client_Thread):
    def __init__(self, *args):
        Client_Thread.__init__(self, *args)
        #Make a kqueue object for the thread 
        kq = kqueue()
        #Make a one-shot kev for this kqueue for when the kill socket is
        #connected to. The connection is only made once, so why not tell
        #that to our kqueue? The default filter is EVFILT_READ, so we don't
        #need to specify that. The default flag is just EV_ADD.
        kill_kev = kevent(self.kill_fd, flags=EV_ADD|EV_ONESHOT)
        #using defaults for the client socket.
        client_kev = kevent(self.client_sock)
        #we only need to keep track of the kqueue's control func.
        #This also makes things prettier in the run func.
        self.control = kq.control
        #now, we add thel list of events we just made to our kqueue.
        #The first 0 means we want a list of at most 0 length in return.
        #the second 0 means we want no timeout (i.e. do this in a 
        #non-blocking way.) 
        self.control([client_kev, kill_kev], 0, 0)

    def run(self):
        while True:
            #Here we poll the kqueue object.
            #The empty list means we are adding no new events to the kqueue.
            #The one means we want a list of at most 1 element. Then None
            #Means we want block until an event is triggered.
            events = self.control([], 1, None)
            #If we have an event, and the event is for the kill socket 
            #(meaning somebody made a connection to it), then we break the 
            #loop and die.
            if events and events[0].ident == self.kill_fd:
                self.die()
                break
            #If all that is left is an EOF in our socket, then we break
            #the loop and die. Kqueues will keep returning a kevent
            #that has been read once, even when they are empty.
            if events and events[0].flags & EV_EOF:
                self.die()
                break
            #Finally, if we have an event that isn't for the kill socket and
            #does not have the EOF flag set, then there is work to do. If
            #the handle client function (defined in the parent class) returns
            #1, then we are done serving a page and we can die.
            if events and self.handle_client():
                self.die()
                break
        client.close()

self.die所做的就是将put的客户端ip:port放到用于消息传递的队列上。与队列中的字符串不同的线程get输出一条消息,而join则打印相关的线程对象。当然,我不会用管道来做这件事,只有插座。不过,我确实在kqueue的在线手册上找到了这个。

Fifos,管道 当有要读取的数据时返回;数据包含 字节可用。当最后一个写入器断开连接时,过滤器将在 旗子。这可以通过传入EV_CLEAR来清除,此时 过滤器将继续等待数据可用,然后再重新- 转弯

因此,也许在您的udp服务器中,您应该按照手册页所述的方式循环遍历revents列表?实际上,您甚至不需要遍历最长为1的列表。也许你的收听功能应该像这样.

代码语言:javascript
复制
def listen(self, ip, port):
    print "Starting!"
    sock = socket.socket(AF_INET, SOCK_DGRAM)
    sock.bind((ip, port))
    kq = select.kqueue()
    kev0 = select.kevent(sock)
    kev1 = select.kevent(self.pipe)
    kq.control([kev0, kev1], 0, 0)
    while True: #this loop never breaks! so this whole function blocks forever like this
        revents = kq.control([], 1, None)
        if revents:
            event = revents[0]
            if event.flags & select.KQ_EV_EOF:
                new_event = select.kevent(event.ident, flags=select.KQ_EV_CLEAR)
                kq.control([new_event], 0, 0)
            else:
                print event

我真的建议以我的方式导入标志和函数,这使得它更类似于您需要比较的基于C的手册,我认为它看起来更漂亮。我还想指出,我的类与您所拥有的有一点不同,因为每个新客户端都将获得一个实例,并且每个客户机都将在自己的线程中运行。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17137437

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档