1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
#! /usr/bin/env python2
from os import environ as env
# Runtime configuration, read from environment variables of the same name.
# Every path setting falls back to /dev/null when its variable is unset.

# Private host key, and the matching public key stored next to it as "<key>.pub".
host_key_file = env.get('host_key_file', '/dev/null')
host_key_pub_file = '%s.pub' % host_key_file

# File handed to the public-key credentials checker.
authorized_keys_file = env.get('authorized_keys_file', '/dev/null')

# Text file whose contents are sent to a client that opens a shell.
services_file = env.get('services_file', '/dev/null')

# Logging switch; compared against the exact string 'true' further down.
debug_log = env.get('debug_log', 'false')
from checkers import PublicKeyChecker
from twisted.conch.avatar import ConchUser
from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.factory import SSHFactory
from twisted.conch.ssh.keys import Key
from twisted.conch.ssh.session import SSHSession, ISession, wrapProtocol
from twisted.conch.ssh.userauth import SSHUserAuthServer
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.portal import IRealm, Portal
from twisted.internet.protocol import Protocol
from twisted.internet.reactor import listenTCP, run
from twisted.internet.error import ProcessTerminated
from twisted.python.components import registerAdapter
from twisted.python.failure import Failure
from zope.interface import implements
# Optional diagnostics: send Twisted's log output to stderr, but only when
# the debug_log setting is exactly the string 'true'.
if debug_log == 'true':
    from sys import stderr
    from twisted.python.log import startLogging

    startLogging(stderr)
class MyRealm:
    """Realm that gives every login request a fresh ``MyUser`` avatar."""
    implements(IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return ``(interface, avatar, logout)`` for a login request.

        ``avatarId`` and ``mind`` are ignored.  The first requested
        interface is echoed back, the avatar is a new ``MyUser`` and the
        logout callable does nothing.
        """
        requested = interfaces[0]
        avatar = MyUser()

        def logout():
            # Nothing to clean up when the user logs out.
            return None

        return requested, avatar, logout
class MyUser(ConchUser):
    """Conch avatar that registers the ``session`` channel type."""

    def __init__(self):
        """Initialise the base avatar, then map 'session' to ``SSHSession``."""
        ConchUser.__init__(self)
        self.channelLookup['session'] = SSHSession
class MySession:
    """Session handler that supports opening a shell and nothing else.

    PTY requests, window-size changes, EOF and close notifications are
    accepted and ignored; command execution is refused.
    """

    def __init__(self, avatar):
        """Accept the avatar being adapted; it is neither stored nor used."""
        pass

    def getPty(self, term, windowSize, attrs):
        """Accept a pseudo-terminal request without acting on it."""
        pass

    def windowChanged(self, newWindowSize):
        """Accept a window-size change without acting on it.

        Provided so that a window-change request from the client finds a
        handler instead of failing on a missing attribute.
        """
        pass

    def execCommand(self, proto, cmd):
        """Refuse every command execution request.

        Raises:
            Exception: always, with the message "no executing commands".
        """
        raise Exception("no executing commands")

    def openShell(self, trans):
        """Attach a new ``MyProtocol`` to the session transport ``trans``.

        The protocol is connected to ``trans`` first, then ``trans`` is
        connected to the wrapped protocol.
        """
        protocol = MyProtocol()
        protocol.makeConnection(trans)
        trans.makeConnection(wrapProtocol(protocol))

    def eofReceived(self):
        """Ignore end-of-file from the client."""
        pass

    def closed(self):
        """Ignore the session being closed."""
        pass
# Register MySession as the adapter from MyUser to ISession, so that asking
# for the ISession of a MyUser avatar yields a MySession instance.
registerAdapter(MySession, MyUser, ISession)
def slurpTextfile(filename):
    """Return the entire contents of the text file at *filename*.

    The file is opened in text read mode and is always closed again, even
    if reading fails.  Errors raised while opening or reading (for example
    a missing file) propagate to the caller.
    """
    # A context manager replaces the manual try/finally, and the local name
    # no longer shadows the Python 2 builtin ``file``.
    with open(filename, 'r') as handle:
        return handle.read()
class MyProtocol(Protocol):
    """Protocol that sends the services file to the peer and disconnects."""

    def connectionMade(self):
        """Write the services file to the transport, then end the session.

        The file is re-read on every connection.  Bare newlines are
        replaced with CR LF before the text is written.
        """
        listing = slurpTextfile(services_file).replace('\n', '\r\n')
        channel = self.transport
        channel.write(listing)

        # Tell the transport the "process" ended (exit code argument 0)
        # before dropping the connection.
        finished = Failure(ProcessTerminated(0, None, None))
        channel.processEnded(finished)
        channel.loseConnection()

    # Disabled input handler, kept for reference only:
    #def dataReceived(self, data):
    #    if data == '\r':
    #        data = '\r\n'
    #    elif data == '\x03': #^C
    #        self.transport.loseConnection()
    #        return
    #    self.transport.write(data)
class MyFactory(SSHFactory):
    """SSH factory offering user authentication and connection services.

    The RSA host key pair is loaded from ``host_key_file`` and
    ``host_key_pub_file`` when this class body executes, so both files
    must be readable at that point.
    """

    # Key files are read once, at class-definition time.
    privateKeys = {'ssh-rsa': Key.fromFile(filename=host_key_file)}
    publicKeys = {'ssh-rsa': Key.fromFile(filename=host_key_pub_file)}

    # SSH service name -> class implementing that service.
    services = {
        'ssh-userauth': SSHUserAuthServer,
        'ssh-connection': SSHConnection
    }
if __name__ == '__main__':
    # Build the authentication portal: MyRealm supplies avatars, and the
    # public-key checker validates users against authorized_keys_file.
    realm = MyRealm()
    portal = Portal(realm)
    checker = PublicKeyChecker(authorized_keys_file)
    portal.registerChecker(checker)

    # The portal is attached to the factory class itself, not an instance.
    MyFactory.portal = portal

    # Listen on TCP port 1337 and run the reactor.
    factory = MyFactory()
    listenTCP(1337, factory)
    run()
|