summaryrefslogtreecommitdiffstats
path: root/services/test-server.py
blob: 176373cf159e7f6b909abcc1fd5f07c37b416335 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#! /usr/bin/env python2

from os import environ as env

# Runtime settings, each read from an environment variable of the same name
# and falling back to the literal default given here when it is unset.

# Logging switch; compared against the string 'true' further down.
debug_log = env.get('debug_log', 'false')

# Input files used by the server.
authorized_keys_file = env.get('authorized_keys_file', '/dev/null')
services_file = env.get('services_file', '/dev/null')

# Host key pair: the public key path is the private key path plus '.pub'.
host_key_file = env.get('host_key_file', '/dev/null')
host_key_pub_file = host_key_file + '.pub'


from checkers import PublicKeyChecker
from twisted.conch.avatar import ConchUser
from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.factory import SSHFactory
from twisted.conch.ssh.keys import Key
from twisted.conch.ssh.session import SSHSession, ISession, wrapProtocol
from twisted.conch.ssh.userauth import SSHUserAuthServer
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.portal import IRealm, Portal
from twisted.internet.protocol import Protocol
from twisted.internet.reactor import listenTCP, run
from twisted.internet.error import ProcessTerminated
from twisted.python.components import registerAdapter
from twisted.python.failure import Failure
from zope.interface import implements

# Optional diagnostics: only when the debug_log setting is exactly the string
# 'true' is Twisted's logging started, with stderr as its output stream.
# The imports are kept inside the branch so they happen only in that case.
if debug_log == 'true':
    from twisted.python.log import startLogging
    from sys import stderr
    startLogging(stderr)


class MyRealm:
    """Realm that answers every avatar request with a new MyUser."""
    implements(IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Build the (interface, avatar, logout) triple for a login.

        avatarId and mind are ignored. The first of the requested
        interfaces is returned as the one provided, paired with a freshly
        constructed MyUser and a logout callable that does nothing.
        """
        def logout():
            # There is no per-login state to tear down.
            return None

        # Pick the interface before building the avatar, so a call without
        # any interfaces fails on the lookup first.
        granted = interfaces[0]
        return granted, MyUser(), logout


class MyUser(ConchUser):
    """Avatar that maps the 'session' channel type to SSHSession."""

    def __init__(self):
        ConchUser.__init__(self)
        # Register the channel class looked up for 'session' channel requests.
        self.channelLookup['session'] = SSHSession


class MySession:
    """Session behaviour for MyUser avatars.

    Shell requests are served by MyProtocol; command execution is refused.
    All other session callbacks are accepted and ignored.
    """

    def __init__(self, avatar):
        # The avatar being adapted is accepted but not needed.
        pass

    def getPty(self, term, windowSize, attrs):
        """Accept a pseudo-terminal request without allocating anything."""
        pass

    def windowChanged(self, newWindowSize):
        """Accept a window-size change and ignore it.

        ISession declares this method; without it a client resizing its
        terminal triggers an AttributeError on this object.
        """
        pass

    def execCommand(self, proto, cmd):
        """Refuse every command execution request by raising Exception."""
        raise Exception("no executing commands")

    def openShell(self, trans):
        """Attach a new MyProtocol to the session transport `trans`."""
        ep = MyProtocol()
        # Connect the protocol to the transport first, then hand the
        # transport the wrapped protocol as its peer.
        ep.makeConnection(trans)
        trans.makeConnection(wrapProtocol(ep))

    def eofReceived(self):
        """Ignore end-of-file from the client."""
        pass

    def closed(self):
        """Nothing to clean up when the session closes."""
        pass


# Adapting a MyUser to ISession yields a MySession built from that avatar.
registerAdapter(MySession, MyUser, ISession)


def slurpTextfile(filename):
    """Return the whole content of the text file at `filename`.

    The file is opened in read mode and closed again before returning,
    also when reading fails. Errors from open() or read() propagate to
    the caller.
    """
    # `with` closes the handle on every path; the name avoids shadowing
    # the Python 2 builtin `file`.
    with open(filename, 'r') as handle:
        return handle.read()

class MyProtocol(Protocol):
    """Protocol that sends the services file and then ends the session."""

    def connectionMade(self):
        """Write the services file to the transport and close it.

        The file named by services_file is read on every connection.
        Each '\\n' is replaced with '\\r\\n' before writing.
        """
        listing = slurpTextfile(services_file)
        self.transport.write(listing.replace('\n', '\r\n'))
        # Report process termination with a status of 0 before closing.
        self.transport.processEnded(
            Failure(ProcessTerminated(0, None, None)))
        self.transport.loseConnection()

    #def dataReceived(self, data):
    #    if data == '\r':
    #        data = '\r\n'
    #    elif data == '\x03': #^C
    #        self.transport.loseConnection()
    #        return
    #    self.transport.write(data)


class MyFactory(SSHFactory):
    """SSH factory using the configured host key pair.

    Both key files are read when this class body executes, i.e. when the
    module is loaded, not when a factory is instantiated.
    """

    # Host keys by algorithm name; the private key is loaded first.
    privateKeys = {'ssh-rsa': Key.fromFile(filename=host_key_file)}
    publicKeys = {'ssh-rsa': Key.fromFile(filename=host_key_pub_file)}

    # SSH services offered to clients, by service name.
    services = {
        'ssh-userauth': SSHUserAuthServer,
        'ssh-connection': SSHConnection,
    }

# Script entry point: wire realm, checker and factory together, then serve.
if __name__ == '__main__':
    portal = Portal(MyRealm())
    # Logins are checked by PublicKeyChecker, built from the configured
    # authorized keys file.
    checker = PublicKeyChecker(authorized_keys_file)
    portal.registerChecker(checker)
    # The portal is set on the class, so every MyFactory instance shares it.
    MyFactory.portal = portal
    # Listen on TCP port 1337 and run the reactor until it is stopped.
    listenTCP(1337, MyFactory())
    run()