diff options
author | Narayan Desai <desai@mcs.anl.gov> | 2007-07-25 04:00:11 +0000 |
---|---|---|
committer | Narayan Desai <desai@mcs.anl.gov> | 2007-07-25 04:00:11 +0000 |
commit | 73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4 (patch) | |
tree | 5ebd17953e19e2a54634352aa2450bdd4942eeea /src | |
parent | 31f6577cabe54e6e2a5c733572bfe75a0e5eff39 (diff) | |
download | bcfg2-73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4.tar.gz bcfg2-73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4.tar.bz2 bcfg2-73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4.zip |
Implementing selective forking server, which runs read-only requests in child processes. Should dramatically improve scalability
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@3561 ce84e21b-d406-0410-9b95-82705330c041
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/Component.py | 49 | ||||
-rw-r--r-- | src/lib/Server/Plugins/SSHbase.py | 57 | ||||
-rwxr-xr-x | src/sbin/bcfg2-server | 1 |
3 files changed, 74 insertions, 33 deletions
diff --git a/src/lib/Component.py b/src/lib/Component.py index 7ebdf8f86..5bf61452c 100644 --- a/src/lib/Component.py +++ b/src/lib/Component.py @@ -1,7 +1,7 @@ '''Cobalt component base classes''' __revision__ = '$Revision$' -import atexit, logging, select, signal, socket, sys, time, urlparse, xmlrpclib, cPickle, ConfigParser +import atexit, logging, select, signal, socket, sys, time, urlparse, xmlrpclib, cPickle, ConfigParser, os from base64 import decodestring import BaseHTTPServer, SimpleXMLRPCServer import Bcfg2.tlslite.errors @@ -20,15 +20,17 @@ class ComponentKeyError(Exception): '''raised in case of key parse fails''' pass +class ForkedChild(Exception): + '''raised after child has been forked''' + pass + class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): '''CobaltXMLRPCRequestHandler takes care of ssl xmlrpc requests''' - def finish(self): - '''Finish HTTPS connections properly''' - self.request.close() def do_POST(self): '''Overload do_POST to pass through client address information''' try: + self.cleanup = True # get arguments data = self.rfile.read(int(self.headers["content-length"])) @@ -48,6 +50,9 @@ class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): authenticated = True response = self.server._cobalt_marshalled_dispatch(data, self.client_address, authenticated) + except ForkedChild: + self.cleanup = False + return except: # This should only happen if the module is buggy # internal error, report as HTTP server error log.error("Unexcepted handler failure in do_POST", exc_info=1) @@ -81,6 +86,7 @@ class TLSServer(Bcfg2.tlslite.api.TLSSocketServerMixIn, reqCert=False): self.sc = Bcfg2.tlslite.api.SessionCache() self.rc = reqCert + self.master = os.getpid() x509 = Bcfg2.tlslite.api.X509() s = open(keyfile).read() x509.parse(s) @@ -92,12 +98,15 @@ class TLSServer(Bcfg2.tlslite.api.TLSSocketServerMixIn, self.chain = Bcfg2.tlslite.api.X509CertChain([x509]) BaseHTTPServer.HTTPServer.__init__(self, address, handler) - def finish_request(self, sock, client_address): + def finish_request(self, sock, address): sock.settimeout(90) tlsConnection = TLSConnection(sock) if self.handshake(tlsConnection) == True: - self.RequestHandlerClass(tlsConnection, client_address, self) - tlsConnection.close() + req = self.RequestHandlerClass(tlsConnection, address, self) + if req.cleanup: + tlsConnection.close() + if os.getpid() != self.master: + os._exit(0) def handshake(self, tlsConnection): try: @@ -125,6 +134,8 @@ class Component(TLSServer, __implementation__ = 'Generic' __statefields__ = [] async_funcs = ['assert_location'] + fork_funcs = [] + child_limit = 32 def __init__(self, setup): # need to get addr @@ -134,6 +145,7 @@ class Component(TLSServer, signal.signal(signal.SIGTERM, self.start_shutdown) self.logger = logging.getLogger('Component') self.cfile = ConfigParser.ConfigParser() + self.children = [] if setup['configfile']: cfilename = setup['configfile'] else: @@ -211,6 +223,13 @@ class Component(TLSServer, params = rawparams[0:] # generate response try: + # need to add waitpid code here to enforce maxchild + if method in self.fork_funcs: + self.clean_up_children() + pid = os.fork() + if pid: + self.children.append(pid) + raise ForkedChild # all handlers must take address as the first argument response = self._dispatch(method, (address, ) + params) # wrap response in a singleton tuple @@ -222,6 +241,8 @@ class Component(TLSServer, self.logger.error("Client %s called function %s with wrong argument count" % (address[0], method), exc_info=1) response = xmlrpclib.dumps(xmlrpclib.Fault(4, terror.args[0])) + except ForkedChild: + raise except: self.logger.error("Unexpected handler failure", exc_info=1) # report exception back to server @@ -229,6 +250,20 @@ class Component(TLSServer, "%s:%s" % (sys.exc_type, sys.exc_value))) return response + def clean_up_children(self): + while True: + try: + pid = os.waitpid(0, os.WNOHANG)[0] + self.children.remove(pid) + self.logger.debug("process %d exited" % pid) + except OSError: + break + if len(self.children) >= self.child_limit: + self.logger.info("Reached child_limit; waiting for child exit") + pid = os.waitpid(0, 0)[0] + self.children.remove(pid) + self.logger.debug("process %d exited" % pid) + def _authenticate_connection(self, method, user, password, address): '''Authenticate new connection''' (user, address, method) diff --git a/src/lib/Server/Plugins/SSHbase.py b/src/lib/Server/Plugins/SSHbase.py index 0e473e29b..4d3f18957 100644 --- a/src/lib/Server/Plugins/SSHbase.py +++ b/src/lib/Server/Plugins/SSHbase.py @@ -10,7 +10,7 @@ def update_file(path, diff): print "writing file, %s" % path open(path, 'w').write(newdata) -class SSHbase(Bcfg2.Server.Plugin.Plugin): +class SSHbase(Bcfg2.Server.Plugin.Plugin, Bcfg2.Server.Plugin.DirectoryBacked): '''The sshbase generator manages ssh host keys (both v1 and v2) for hosts. It also manages the ssh_known_hosts file. It can integrate host keys from other management domains and similarly @@ -42,26 +42,31 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin): def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) try: - self.repository = Bcfg2.Server.Plugin.DirectoryBacked(self.data, self.core.fam) + Bcfg2.Server.Plugin.DirectoryBacked.__init__(self, self.data, self.core.fam) except OSError, ioerr: self.logger.error("Failed to load SSHbase repository from %s" % (self.data)) self.logger.error(ioerr) raise Bcfg2.Server.Plugin.PluginInitError - try: - prefix = open("%s/prefix" % (self.data)).read().strip() - except IOError: - prefix = '' self.Entries = {'ConfigFile': - {prefix + '/etc/ssh/ssh_known_hosts':self.build_skn, - prefix + '/etc/ssh/ssh_host_dsa_key':self.build_hk, - prefix + '/etc/ssh/ssh_host_rsa_key':self.build_hk, - prefix + '/etc/ssh/ssh_host_dsa_key.pub':self.build_hk, - prefix + '/etc/ssh/ssh_host_rsa_key.pub':self.build_hk, - prefix + '/etc/ssh/ssh_host_key':self.build_hk, - prefix + '/etc/ssh/ssh_host_key.pub':self.build_hk}} + {'/etc/ssh/ssh_known_hosts':self.build_skn, + '/etc/ssh/ssh_host_dsa_key':self.build_hk, + '/etc/ssh/ssh_host_rsa_key':self.build_hk, + '/etc/ssh/ssh_host_dsa_key.pub':self.build_hk, + '/etc/ssh/ssh_host_rsa_key.pub':self.build_hk, + '/etc/ssh/ssh_host_key':self.build_hk, + '/etc/ssh/ssh_host_key.pub':self.build_hk}} self.ipcache = {} self.__rmi__ = ['GetPubKeys'] + def HandleEvent(self, event=None): + '''Local event handler that does skn regen on pubkey change''' + Bcfg2.Server.Plugin.DirectoryBacked.HandleEvent(self, event) + if (len(self.entries.keys())) > (0.90 * len(os.listdir(self.data))) and \ + event and '_key.pub.H_' in event.filename: + self.cache_skn() + elif (len(self.entries.keys())) > (0.90 * len(os.listdir(self.data))) and \ + not hasattr(self, 'static_skn'): + self.cache_skn() def HandlesEntry(self, entry): '''Handle key entries dynamically''' @@ -81,7 +86,10 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin): def get_ipcache_entry(self, client): '''build a cache of dns results''' if self.ipcache.has_key(client): - return self.ipcache[client] + if self.ipcache[client]: + return self.ipcache[client] + else: + raise socket.gaierror else: # need to add entry try: @@ -93,13 +101,14 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin): if ipaddr: self.ipcache[client] = (ipaddr, client) return (ipaddr, client) + self.ipcache[client] = False self.logger.error("Failed to find IP address for %s" % client) raise socket.gaierror def cache_skn(self): '''build memory cache of the ssh known hosts file''' self.static_skn = '' - pubkeys = [pubk for pubk in self.repository.entries.keys() if pubk.find('.pub.H_') != -1] + pubkeys = [pubk for pubk in self.entries.keys() if pubk.find('.pub.H_') != -1] pubkeys.sort() for pubkey in pubkeys: hostname = pubkey.split('H_')[1] @@ -109,20 +118,18 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin): continue shortname = hostname.split('.')[0] self.static_skn += "%s,%s,%s %s" % (shortname, fqdn, ipaddr, - self.repository.entries[pubkey].data) + self.entries[pubkey].data) def build_skn(self, entry, metadata): '''This function builds builds a host specific known_hosts file''' client = metadata.hostname - if not hasattr(self, 'static_skn'): - self.cache_skn() entry.text = self.static_skn hostkeys = [keytmpl % client for keytmpl in self.pubkeys \ - if self.repository.entries.has_key(keytmpl % client)] + if self.entries.has_key(keytmpl % client)] hostkeys.sort() for hostkey in hostkeys: entry.text += "localhost,localhost.localdomain,127.0.0.1 %s" % ( - self.repository.entries[hostkey].data) + self.entries[hostkey].data) permdata = {'owner':'root', 'group':'0', 'perms':'0644'} [entry.attrib.__setitem__(key, permdata[key]) for key in permdata] @@ -130,14 +137,12 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin): '''This binds host key data into entries''' client = metadata.hostname filename = "%s.H_%s" % (entry.get('name').split('/')[-1], client) - if filename not in self.repository.entries.keys(): + if filename not in self.entries.keys(): self.GenerateHostKeys(client) - if hasattr(self, 'static_skn'): - del self.static_skn - if not self.repository.entries.has_key(filename): + if not self.entries.has_key(filename): self.logger.error("%s still not registered" % filename) raise Bcfg2.Server.Plugin.PluginExecutionError - keydata = self.repository.entries[filename].data + keydata = self.entries[filename].data permdata = {'owner':'root', 'group':'0'} permdata['perms'] = '0600' if entry.get('name')[-4:] == '.pub': @@ -160,7 +165,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin): else: keytype = 'rsa1' - if hostkey not in self.repository.entries.keys(): + if hostkey not in self.entries.keys(): fileloc = "%s/%s" % (self.data, hostkey) publoc = self.data + '/' + ".".join([hostkey.split('.')[0]]+['pub', "H_%s" % client]) temploc = "/tmp/%s" % hostkey diff --git a/src/sbin/bcfg2-server b/src/sbin/bcfg2-server index 46d937c39..431732896 100755 --- a/src/sbin/bcfg2-server +++ b/src/sbin/bcfg2-server @@ -67,6 +67,7 @@ class Bcfg2Serv(Bcfg2.Component.Component): """The Bcfg2 Server component providing XML-RPC access to Bcfg methods""" __name__ = 'bcfg2' __implementation__ = 'bcfg2' + fork_funcs = ['GetConfig', 'GetProbes'] request_queue_size = 15 |