首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python服务器(twisted.conch)命令筛选和端口转发

Python服务器(twisted.conch)命令筛选和端口转发
EN

Stack Overflow用户
提问于 2016-01-23 22:21:13
回答 2查看 588关注 0票数 2

我需要创建一个SSH服务器(已为该作业选择了twisted.conch),它将执行以下操作:

  1. 执行端口转发(附加的代码不会这样做,我也不知道该修改什么)
  2. 在执行命令之前(或至少在执行之前或之后对其进行记录)筛选它们。

下面所附的代码创建了一个完美的SSH和SFTP服务器,但它缺少一个主要组件--端口转发(以及命令筛选,但这并不像端口转发那么重要)。

我找了尽可能多的地方,却找不到这两个人。请帮帮我,-这是拼图的最后一片平静

代码语言:javascript
复制
#!/usr/bin/env python
from twisted.conch.unix import UnixSSHRealm
from twisted.cred.portal import Portal
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.conch.ssh.factory import SSHFactory
from twisted.internet import reactor, defer
from twisted.conch.ssh.transport import SSHServerTransport
from twisted.conch.ssh.userauth import SSHUserAuthServer
from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.keys import Key
from zope.interface import implements
from subprocess import Popen,PIPE
from crypt import crypt

publicKey = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJSkbh/C+BR3utDS555mV'
privateKey = """-----BEGIN RSA PRIVATE KEY-----
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
-----END RSA PRIVATE KEY-----"""

# check if username/password is valid
def checkPassword(username,password):
    try:
        ret=False
        if username and password:
            output=Popen(["grep",username,"/etc/shadow"],stdout=PIPE,stderr=PIPE).communicate()[0]
            hash=""
            if output:
                tmp=output.split(":")
                if tmp>=2:
                    hash=tmp[1]
                del tmp
            ret=crypt(password,hash)==hash
            del output,hash
    except Exception,e:
        ret=False
    return ret

# authorization methods
class XSSHAuth(object):
    credentialInterfaces=IUsernamePassword,implements(ICredentialsChecker)
    def requestAvatarId(self, credentials):
        #print "Credentials:",credentials.username,credentials.password
        if credentials.username=="root" and credentials.password and checkPassword(credentials.username,credentials.password):
            # successful authorization
            return defer.succeed(credentials.username)
        # failed authorization
        return defer.fail(UnauthorizedLogin("invalid password"))
class XSSHUserAuthServer(SSHUserAuthServer):
    def _ebPassword(self, reason):
        addr = self.transport.getPeer().address
        if addr.host!="3.22.116.85" and addr.host!="127.0.0.1":
            p1 = Popen(["iptables","-I","INPUT","-s",addr.host,"-j","DROP"], stdout=PIPE, stderr=PIPE)
            p1.communicate()
        print(addr.host, addr.port, self.user, self.method)
        self.transport.loseConnection()
        return defer.fail(UnauthorizedLogin("invalid password"))

# the transport class - we use it to log MOST OF THE ACTIONS executed thru the server
class XSSHTransport(SSHServerTransport):
    ourVersionString="SSH-2.0-X"
    logCommand=""
    def connectionMade(self):
        print "Connection made",self.getPeer()
        SSHServerTransport.connectionMade(self)
        #self.transport.loseConnection()
    def connectionLost(self,reason):
        print "Connection closed",self.getPeer()
        SSHServerTransport.connectionLost(self,reason)
    def dataReceived(self, data):
        SSHServerTransport.dataReceived(self,data)
    def dispatchMessage(self, messageNum, payload):
        SSHServerTransport.dispatchMessage(self,messageNum,payload)

# start the server
class XSSHFactory(SSHFactory):
    protocol=XSSHTransport
factory = XSSHFactory()
factory.publicKeys = {'ssh-rsa': Key.fromString(data=publicKey)}
factory.privateKeys = {'ssh-rsa': Key.fromString(data=privateKey)}
factory.services = {
    'ssh-userauth': XSSHUserAuthServer,
    'ssh-connection': SSHConnection
}
portal=Portal(UnixSSHRealm())
portal.registerChecker(XSSHAuth())
factory.portal=portal
reactor.listenTCP(22, factory)
reactor.run()
EN

回答 2

Stack Overflow用户

发布于 2016-01-27 09:55:30

由于您使用的是实现UnixConchUserglobal_tcpip_forward,所以它实际上是起作用的。当我运行您的示例并使用ssh -L4321:remote.host:1234 root@localhost -p 2222telnet localhost 4321连接到它时,我会被隧道化到remote.host 1234。你必须更详细地陈述你的问题。

票数 0
EN

Stack Overflow用户

发布于 2016-04-20 09:47:52

命令日志可以在dataReceived(self, data)中完成

代码语言:javascript
复制
def dataReceived(self, data):  
        SSHServerTransport.dataReceived(self,data)   
        self.buf += data  
        if data == '\r':  
           cmd = self.buf  
           self.buf = ''

但是它不能很好地处理删除键、选项卡、箭头向上、箭头向下和其他特殊字符。我想知道你是怎么得到最后命令的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34969623

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档