diff options
author | Narayan Desai <desai@mcs.anl.gov> | 2008-01-09 21:21:18 +0000 |
---|---|---|
committer | Narayan Desai <desai@mcs.anl.gov> | 2008-01-09 21:21:18 +0000 |
commit | 2848627d82951554359ca9d7e23fa0326c21f2e1 (patch) | |
tree | f5ec15464a97d2b3c4b96a6843a5c0e7bc3c3272 /src/lib | |
parent | 3dca8f84e8f9bd024f64ce8ed3caf61dfeccb915 (diff) | |
download | bcfg2-2848627d82951554359ca9d7e23fa0326c21f2e1.tar.gz bcfg2-2848627d82951554359ca9d7e23fa0326c21f2e1.tar.bz2 bcfg2-2848627d82951554359ca9d7e23fa0326c21f2e1.zip |
Replace bcfg2 proxy code with simplified code from Cobalt
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@4226 ce84e21b-d406-0410-9b95-82705330c041
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/Client/Proxy.py | 231 | ||||
-rw-r--r-- | src/lib/Client/__init__.py | 2 | ||||
-rw-r--r-- | src/lib/Proxy.py | 188 | ||||
-rw-r--r-- | src/lib/__init__.py | 2 | ||||
-rwxr-xr-x | src/lib/tlslite/integration/HTTPTLSConnection.py | 2 |
5 files changed, 191 insertions, 234 deletions
diff --git a/src/lib/Client/Proxy.py b/src/lib/Client/Proxy.py deleted file mode 100644 index d148748e2..000000000 --- a/src/lib/Client/Proxy.py +++ /dev/null @@ -1,231 +0,0 @@ -'''Cobalt proxy provides client access to cobalt components''' -__revision__ = '$Revision$' - -import logging, socket, time, xmlrpclib, ConfigParser, httplib -from Bcfg2.tlslite.integration.XMLRPCTransport import XMLRPCTransport -from Bcfg2.tlslite.integration.HTTPTLSConnection import HTTPTLSConnection -from Bcfg2.tlslite.TLSConnection import TLSConnection -import Bcfg2.tlslite.errors - -#FIXME need to reimplement _binadaddress support for XMLRPCTransport - -class MyHTTPTLSConnection(HTTPTLSConnection): - def connect(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if hasattr(sock, 'settimeout'): - sock.settimeout(90) - sock.connect((self.host, self.port)) - - #Use a TLSConnection to emulate a socket - self.sock = TLSConnection(sock) - - #When httplib closes this, close the socket - self.sock.closeSocket = True - self._handshake(self.sock) - -class MyXMLRPCTransport(XMLRPCTransport): - def make_connection(self, host): - # create a HTTPS connection object from a host descriptor - host, extra_headers, x509 = self.get_host_info(host) - http = MyHTTPTLSConnection(host, None, - self.username, self.password, - self.sharedKey, - self.certChain, self.privateKey, - self.checker.cryptoID, - self.checker.protocol, - self.checker.x509Fingerprint, - self.checker.x509TrustList, - self.checker.x509CommonName, - self.settings) - http2 = httplib.HTTP(host) - http2._setup(http) - return http2 - -class CobaltComponentError(Exception): - pass - -class SafeProxy: - '''Wrapper for proxy''' - - _retries = 4 - _authinfo = () - _components = {} - def __init__(self, component, args={}): - self.component = component - self.log = logging.getLogger(component) - - if args.has_key('server'): - # processing from command line args - self._components[component] = args['server'] - else: - if args.has_key('setup'): - # processing from specified config file - _cfpath = args['setup'] - else: - _cfpath = '/etc/bcfg2.conf' - self._cfile = ConfigParser.ConfigParser() - self._cfile.read([_cfpath]) - try: - self._components = self._cfile._sections['components'] - except: - self.log.error("%s doesn't contain a valid components section" % (_cfpath)) - raise SystemExit, 1 - if args.has_key('password'): - # get passwd from cmdline - password = args['password'] - else: - try: - password = self._cfile.get('communication', 'password') - except: - self.log.error("%s doesn't contain a valid password" % (_cfpath)) - raise SystemExit, 1 - if args.has_key('user'): - user = args['user'] - else: - try: - user = self._cfile.get('communication', 'user') - except: - user = 'root' - - self._authinfo = (user, password) - - if args.has_key('fingerprint'): - self.fingerprint = args['fingerprint'] - else: - self.fingerprint = False - - _bindaddress = "" - try: - _bindaddress = self._cfile.get('communication', 'bindaddress') - except: - pass - - if args.has_key('server'): - address = args['server'] - else: - address = self.__get_location(component) - - try: - # if _bindaddress != "": - # self.log.info("Binding client to address %s" % _bindaddress) - # self.proxy = xmlrpclib.ServerProxy(address, transport=Bcfg2SafeTransport()) - # else: - if self.fingerprint: - transport = MyXMLRPCTransport(x509Fingerprint=self.fingerprint) - else: - transport = MyXMLRPCTransport() - self.proxy = xmlrpclib.ServerProxy(address, transport=transport) - - except IOError, io_error: - self.log.error("Invalid server URL %s: %s" % (address, io_error)) - raise CobaltComponentError - except: - self.log.error("Failed to initialize xml-rpc", exc_info=1) - - def run_method(self, methodName, methodArgs): - ''' Perform an XMLRPC invocation against the server''' - method = getattr(self.proxy, methodName) - for irs in range(self._retries): - try: - ret = apply(method, self._authinfo + methodArgs) - if irs > 0: - self.log.warning("Required %d attempts to contact %s for operation %s" % - (irs, self.component, methodName)) - self.log.debug("%s completed successfully" % (methodName)) - return ret - except xmlrpclib.ProtocolError: - self.log.error("Server failure: Protocol Error") - raise xmlrpclib.Fault(20, "Server Failure") - except xmlrpclib.Fault: - self.log.debug("Operation %s completed with fault" % (methodName)) - raise - except socket.error, serr: - self.log.debug("Attempting %s (%d of %d) failed because %s" % (methodName, (irs+1), - self._retries, serr)) - except Bcfg2.tlslite.errors.TLSFingerprintError, err: - self.log.error("Server fingerprint did not match") - errmsg = err.message.split() - self.log.error("Got %s expected %s" % (errmsg[3], errmsg[4])) - raise SystemExit, 1 - except: - self.log.error("Unknown failure", exc_info=1) - break - time.sleep(0.5) - self.log.error("%s failed:\nCould not connect to %s" % (methodName, self.component)) - raise xmlrpclib.Fault(20, "Server Failure") - - def __get_location(self, name): - '''Perform component location lookups if needed''' - if self._components.has_key(name): - return self._components[name] - slp = SafeProxy('service-location', url=self._cfile.get('components', 'service-location')) - try: - sdata = slp.run_method('LookupService', - ([{'tag':'location', 'name':name, 'url':'*'}],)) - except xmlrpclib.Fault: - raise CobaltComponentError, "No Such Component" - if sdata: - curl = sdata[0]['url'] - self._components[name] = curl - return curl - - def dummy(self): - '''dummy method for pylint''' - return True - -class ComponentProxy(SafeProxy): - '''Component Proxy instantiates a SafeProxy to a component and registers local functions - based on its definition''' - name = 'dummy' - methods = [] - - def __init__(self, url={}): - SafeProxy.__init__(self, self.name, url) - for method in self.methods: - setattr(self, method, eval('lambda *x:self.run_method(method, x)', - {'self':self, 'method':method})) - -class service_location(ComponentProxy): - '''service-location component-specific proxy''' - name = 'service-location' - methods = ['AssertService', 'LookupService', 'DeassertService'] - -class allocation_manager(ComponentProxy): - '''allocation manager specific component proxy''' - name = 'allocation-manager' - methods = ['GetProject'] - -class file_stager(ComponentProxy): - '''File staging component''' - name = 'file-stager' - methods = ['StageInit', 'FinalizeStage'] - -class process_manager(ComponentProxy): - '''process manager specific component proxy''' - name = 'process-manager' - methods = ['CreateProcessGroup', 'GetProcessGroup', 'KillProcessGroup', 'WaitProcessGroup'] - -class queue_manager(ComponentProxy): - '''queue manager proxy''' - name = 'queue-manager' - methods = ['AddJob', 'GetJobs', 'DelJobs', 'RunJobs', 'SetJobs', 'SetJobID'] - -class scheduler(ComponentProxy): - '''scheduler proxy''' - name = 'scheduler' - methods = ['AddReservation', 'DelReservation', 'GetPartition', 'AddPartition', 'DelPartition', 'Set'] - -class bcfg2(ComponentProxy): - '''bcfg2 client code''' - name = 'bcfg2' - methods = ['AssertProfile', 'GetConfig', 'GetProbes', 'RecvProbeData', 'RecvStats'] - -class CommDict(dict): - '''CommDict is a dictionary that automatically instantiates a component proxy upon access''' - commnames = {'pm':process_manager, 'fs':file_stager, 'am':allocation_manager, - 'sched':scheduler, 'qm':queue_manager} - - def __getitem__(self, name): - if not self.has_key(name): - self.__setitem__(name, self.commnames[name]()) - return dict.__getitem__(self, name) diff --git a/src/lib/Client/__init__.py b/src/lib/Client/__init__.py index cc8a25964..7bcef1361 100644 --- a/src/lib/Client/__init__.py +++ b/src/lib/Client/__init__.py @@ -1,4 +1,4 @@ '''This contains all Bcfg2 Client modules''' __revision__ = '$Revision$' -__all__ = ["Frame", "Proxy", "Tools", "XML"] +__all__ = ["Frame", "Tools", "XML"] diff --git a/src/lib/Proxy.py b/src/lib/Proxy.py new file mode 100644 index 000000000..5bcc78ad1 --- /dev/null +++ b/src/lib/Proxy.py @@ -0,0 +1,188 @@ +"""RPC client access to cobalt components. + +Classes: +ComponentProxy -- an RPC client proxy to Cobalt components + +Functions: +load_config -- read configuration files +""" + +__revision__ = '$Revision: $' + +from ConfigParser import SafeConfigParser, NoSectionError +import logging, socket, urlparse, time, Bcfg2.tlslite.errors +from Bcfg2.tlslite.integration.XMLRPCTransport import XMLRPCTransport +import xmlrpclib +from xmlrpclib import _Method + +__all__ = [ + "ComponentProxy", "ComponentLookupError", "RetryMethod", + "register_component", "find_configured_servers", +] + +local_components = dict() +known_servers = dict() + +def register_component (component): + local_components[component.name] = component + + +class ComponentError (Exception): + + """Component error baseclass""" + + +class ComponentLookupError (ComponentError): + + """Unable to locate an address for the given component.""" + + +class ComponentOperationError (ComponentError): + + """Component Failure during operation""" + + +class RetryMethod(_Method): + """Method with error handling and retries built in""" + log = logging.getLogger('xmlrpc') + def __call__(self, *args): + max_retries = 4 + for retry in range(max_retries): + try: + return _Method.__call__(self, *args) + except xmlrpclib.ProtocolError: + self.log.error("Server failure: Protocol Error") + raise xmlrpclib.Fault(20, "Server Failure") + except socket.error: + pass + except Bcfg2.tlslite.errors.TLSFingerprintError, err: + self.log.error("Server fingerprint did not match") + errmsg = err.message.split() + self.log.error("Got %s expected %s" % (errmsg[3], errmsg[4])) + raise SystemExit, 1 + except: + self.log.error("Unknown failure", exc_info=1) + break + time.sleep(0.5) + raise xmlrpclib.Fault(20, "Server Failure") + +# sorry jon +xmlrpclib._Method = RetryMethod + +def ComponentProxy (component_name, defer=False, user=None, password=None, + fingerprint=None): + + """Constructs proxies to components. + + Arguments: + component_name -- name of the component to connect to + + Additional arguments are passed to the ServerProxy constructor. + """ + + if defer: + return DeferredProxy(component_name) + + if component_name in local_components: + return LocalProxy(local_components[component_name]) + elif component_name in known_servers: + url = known_servers[component_name] + elif component_name != "service-location": + try: + slp = ComponentProxy("service-location") + except ComponentLookupError: + raise ComponentLookupError(component_name) + try: + url = slp.locate(component_name) + except: + raise ComponentLookupError(component_name) + if not url: + raise ComponentLookupError(component_name) + else: + raise ComponentLookupError(component_name) + # process url + if user and password: + method, path = urlparse.urlparse(url)[:2] + newurl = "%s://%s:%s@%s" % (method, user, password, path) + else: + newurl = url + return xmlrpclib.ServerProxy(newurl, allow_none=True, + transport=XMLRPCTransport(x509Fingerprint=fingerprint)) + +class LocalProxy (object): + + """Proxy-like filter for inter-component communication. + + Used to access other components stored in local memory, + without having to transport across tcp/http. + + Dispatches method calls through the component's _dispatch + method to keep the interface between this and ServerProxy + consistent. + """ + + def __init__ (self, component): + self._component = component + + def __getattr__ (self, attribute): + return LocalProxyMethod(self, attribute) + + +class LocalProxyMethod (object): + + def __init__ (self, proxy, func_name): + self._proxy = proxy + self._func_name = func_name + + def __call__ (self, *args): + return self._proxy._component._dispatch(self._func_name, args) + + +class DeferredProxy (object): + + """Defering proxy object. + + Gets a new proxy when it can't connect to a component. + """ + + def __init__ (self, component_name): + self._component_name = component_name + + def __getattr__ (self, attribute): + return DeferredProxyMethod(self, attribute) + + +class DeferredProxyMethod (object): + + def __init__ (self, proxy, func_name): + self._proxy = proxy + self._func_name = func_name + + def __call__ (self, *args): + proxy = ComponentProxy(self._proxy._component_name, defer=False) + func = getattr(proxy, self._func_name) + return func(*args) + + +def find_configured_servers (config_files=None): + """Read associated config files into the module. + + Arguments: + config_files -- a list of paths to config files. + """ + if not config_files: + config_files = ['/etc/bcfg2.conf'] + config = SafeConfigParser() + config.read(config_files) + try: + components = config.options("components") + except NoSectionError: + return [] + known_servers.clear() + known_servers.update(dict([ + (component, config.get("components", component)) + for component in components + ])) + return known_servers.copy() + +find_configured_servers() diff --git a/src/lib/__init__.py b/src/lib/__init__.py index 7e9e8a92a..7f771da6f 100644 --- a/src/lib/__init__.py +++ b/src/lib/__init__.py @@ -1,4 +1,4 @@ '''base modules definition''' __revision__ = '$Revision$' -__all__ = ['Server', 'Client', 'Component', 'Logging', 'Options'] +__all__ = ['Server', 'Client', 'Component', 'Logging', 'Options', 'Proxy'] diff --git a/src/lib/tlslite/integration/HTTPTLSConnection.py b/src/lib/tlslite/integration/HTTPTLSConnection.py index a20400893..a50c053c7 100755 --- a/src/lib/tlslite/integration/HTTPTLSConnection.py +++ b/src/lib/tlslite/integration/HTTPTLSConnection.py @@ -22,7 +22,7 @@ class HTTPBaseTLSConnection(httplib.HTTPConnection): def connect(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if hasattr(sock, 'settimeout'): - sock.settimeout(10) + sock.settimeout(90) sock.connect((self.host, self.port)) #Use a TLSConnection to emulate a socket |