This commit is contained in:
Васильев Евгений Владимирович 2017-02-07 01:58:13 +03:00
parent 1f09875aa3
commit 62cb5c24ba
6 changed files with 174 additions and 144 deletions

View File

@ -1,15 +1,20 @@
{
"services": [
{"name":"zzy0",
"ports":["tname_aa","tname_ab","tname_ba","tname_bb","tname_d"]
{
"name": "zzy0",
"ports": ["tname_aa", "tname_ab", "tname_ba", "tname_bb", "tname_d"]
},
{"name":"zzy1",
"ports":["tname_aa","tname_ab","tname_ba","tname_bb"]
{
"name": "zzy1",
"ports": ["tname_aa", "tname_ab", "tname_ba", "tname_bb"]
},
{"name":"zzz0",
"ports":["tname_aa","tname_bb"]
{
"name": "zzz0",
"ports": ["tname_aa", "tname_bb"]
},
{"name":"zzz1"}
{
"name": "zzz1"
}
],
"template": "templates/selfcheck.jj2",
"dest": "selfcheck",

View File

@ -1,12 +1,12 @@
{
"version":"0.8",
"version": "0.8",
"marathon": {
"enabled": false,
"restart": false,
"force": true,
"host": "http://marathon.mesos:8080"
},
"mesos":{
"mesos": {
"enabled": false,
"domain": "marathon.mesos"
},

View File

@ -12,7 +12,7 @@ from surok.system import reload_conf
from surok.logger import Logger
from surok.config import Config
logger=Logger()
logger = Logger()
# Command line arguments
parser = argparse.ArgumentParser()
@ -20,12 +20,12 @@ parser.add_argument('-c', '--config', help='surok.json path')
args = parser.parse_args()
# Load base configurations
config=Config(args.config if args.config else '/etc/surok/conf/surok.json')
config = Config(args.config if args.config else '/etc/surok/conf/surok.json')
# Main loop
###########
#
discovery=Discovery()
discovery = Discovery()
while 1:
# Update config from discovery object
@ -38,7 +38,7 @@ while 1:
my = {"services": app_hosts,
"conf_name": app['conf_name']}
logger.debug('my=',my)
logger.debug('my=', my)
# Generate config from template
service_conf = gen(my, app['template'])

View File

@ -23,7 +23,6 @@ type_par - additional parameters for test
'''
'''
Public Config object
==================================================
@ -36,16 +35,18 @@ Public Config object
.apps - Apps object. List of AppConfig oblects
'''
class _ConfigTemplate(dict):
_conf={}
def _init_conf(self,params):
conf={}
_conf = {}
def _init_conf(self, params):
conf = {}
for k in params.keys():
if params[k].get('params'):
conf[k]=self._init_conf(params[k].get('params'))
conf[k] = self._init_conf(params[k].get('params'))
else:
if params[k].get('value') is not None:
conf[k]=params[k].get('value')
conf[k] = params[k].get('value')
return conf
def __init__(self, *conf_data):
@ -55,49 +56,56 @@ class _ConfigTemplate(dict):
for c in conf_data:
self.set_config(c)
def _set_conf_params(self,oldconf,testconf,params):
def _set_conf_params(self, oldconf, testconf, params):
conf = oldconf if oldconf else {}
for key in testconf.keys():
resvalue=None
param=params.get(key)
oldvalue=conf.get(key)
testvalue=testconf.get(key)
resvalue = None
param = params.get(key)
oldvalue = conf.get(key)
testvalue = testconf.get(key)
if param is None:
self._logger.error('Parameter "', key, '" value "', testvalue, '" type is "', type(testvalue).__name__, '" not found')
self._logger.error('Parameter "', key, '" value "', testvalue,
'" type is "', type(testvalue).__name__, '" not found')
else:
type_param=param.get('type')
resvalue=[]
type_param = param.get('type')
resvalue = []
if type(testvalue).__name__ != 'list':
testvalue=[testvalue]
testvalue = [testvalue]
for testitem in testvalue:
if self._test_value(key, testitem, param):
if 'dict' in type_param:
if param.get('params'):
res=self._set_conf_params(oldvalue,testitem,param.get('params'))
res = self._set_conf_params(
oldvalue, testitem, param.get('params'))
if res is not None:
resvalue.append(res)
else:
resvalue.append(testitem)
if 'list' not in type_param:
resvalue=list([None]+resvalue).pop()
resvalue = list([None] + resvalue).pop()
if resvalue is not None and 'do' in type_param:
if not self._do_type_set(key, resvalue, param):
self._logger.warning('Parameter "', key, '" current "', resvalue, '" type is "', type(resvalue).__name__, '" testing failed')
self._logger.warning(
'Parameter "', key, '" current "', resvalue, '" type is "', type(resvalue).__name__, '" testing failed')
resvalue = None
if resvalue is not None:
conf[key]=resvalue
conf[key] = resvalue
return conf
def _test_value(self, key, value, param):
type_param=param.get('type')
type_value=[x for x in type_param if x in ['str', 'int', 'bool', 'dict']]
type_param = param.get('type')
type_value = [
x for x in type_param if x in ['str', 'int', 'bool', 'dict']]
if type_value:
if type(value).__name__ not in type_value:
self._logger.error('Parameter "', key, '" must be ', type_value,' types, current "', value, '" (',type(value).__name__,')')
self._logger.error(
'Parameter "', key, '" must be ', type_value,
' types, current "', value, '" (', type(value).__name__, ')')
return False
if 'value' in type_param:
if value not in param.get('values',[]):
self._logger.error('Value "', value, '" of key "', key, '" unknown')
if value not in param.get('values', []):
self._logger.error(
'Value "', value, '" of key "', key, '" unknown')
return False
if 'dir' in type_param:
if not os.path.isdir(value):
@ -109,7 +117,8 @@ class _ConfigTemplate(dict):
return False
return True
else:
self._logger.error('Type for testing "{}" unknown'.format(type_value))
self._logger.error(
'Type for testing "{}" unknown'.format(type_value))
return False
def set_config(self, conf_data):
@ -118,7 +127,7 @@ class _ConfigTemplate(dict):
try:
self._logger.debug('Open file ', conf_data)
f = open(conf_data, 'r')
json_data=f.read()
json_data = f.read()
f.close()
conf = json.loads(json_data)
except OSError as err:
@ -134,7 +143,7 @@ class _ConfigTemplate(dict):
conf = conf_data
else:
return False
self._conf=self._set_conf_params(self._conf,conf,self._params)
self._conf = self._set_conf_params(self._conf, conf, self._params)
self._logger.debug('Conf=', self._conf)
def keys(self):
@ -151,7 +160,7 @@ class _ConfigTemplate(dict):
return hashlib.sha1(json.dumps(self._conf, sort_keys=True).encode()).hexdigest()
def set(self, key, value):
self._conf[key]=value
self._conf[key] = value
def __setitem__(self, key, value):
self.set(key, value)
@ -196,7 +205,7 @@ class Config(_ConfigTemplate):
'type': ['bool']
}
},
'type':['dict']
'type': ['dict']
},
'mesos': {
'params': {
@ -209,7 +218,7 @@ class Config(_ConfigTemplate):
'type': ['bool']
}
},
'type':['dict']
'type': ['dict']
},
'memcached': {
'params': {
@ -232,23 +241,23 @@ class Config(_ConfigTemplate):
'type': ['str']
}
},
'type':['dict']
'type': ['dict']
},
'hosts': {
'value': ['localhost:11211'],
'type': ['list','str']
'type': ['list', 'str']
}
},
'type':['dict']
'type': ['dict']
},
'version': {
'value': '0.7',
'type': ['str','value'],
'type': ['str', 'value'],
'values': ['0.7', '0.8']
},
'confd': {
'value': '/etc/surok/conf.d',
'type': ['str','dir']
'type': ['str', 'dir']
},
'wait_time': {
'value': 20,
@ -256,7 +265,7 @@ class Config(_ConfigTemplate):
},
'lock_dir': {
'value': '/var/tmp',
'type': ['str','dir']
'type': ['str', 'dir']
},
'default_discovery': {
'value': 'mesos_dns',
@ -265,8 +274,8 @@ class Config(_ConfigTemplate):
},
'loglevel': {
'value': 'info',
'type': ['str','do'],
'do':'set_loglevel'
'type': ['str', 'do'],
'do': 'set_loglevel'
}
}
@ -277,7 +286,7 @@ class Config(_ConfigTemplate):
return _config_singleton
def __init__(self, *conf_data):
super().__init__( *conf_data)
super().__init__(*conf_data)
self.apps = _Apps()
def set_config(self, conf_data):
@ -317,7 +326,7 @@ class _Apps:
self._apps[app.get('conf_name')] = app
def reset(self):
keys=[]+list(self.keys())
keys = [] + list(self.keys())
for k in keys:
del self._apps[k]
@ -361,7 +370,7 @@ class AppConfig(_ConfigTemplate):
'type': ['str']
},
'ports': {
'type': ['list','str']
'type': ['list', 'str']
},
'discovery': {
'type': ['str']
@ -370,10 +379,10 @@ class AppConfig(_ConfigTemplate):
'type': ['str']
}
},
'type':['list','dict']
'type': ['list', 'dict']
},
'template': {
'type': ['str','file']
'type': ['str', 'file']
},
'dest': {
'type': ['str']
@ -395,13 +404,14 @@ class AppConfig(_ConfigTemplate):
def set_config(self, conf_data):
super().set_config(conf_data)
self._conf.setdefault('discovery', self._config.get('default_discovery'))
self._conf.setdefault(
'discovery', self._config.get('default_discovery'))
self._conf.setdefault('group', self._get_default_group())
if type(conf_data).__name__ == 'str':
self._conf.setdefault('conf_name', os.path.basename(conf_data))
def _get_default_group(self):
env=self._config.get('env',dict(os.environ))
env = self._config.get('env', dict(os.environ))
# Check environment variable
if env.get('SUROK_DISCOVERY_GROUP'):
return env['SUROK_DISCOVERY_GROUP']

View File

@ -1,5 +1,5 @@
#Public names
__all__=['Discovery','DiscoveryMesos','DiscoveryMarathon']
# Public names
__all__ = ['Discovery', 'DiscoveryMesos', 'DiscoveryMarathon']
import dns.resolver
import dns.query
import os
@ -10,15 +10,17 @@ from .config import *
from .logger import *
# Discovery object
_discovery_singleton=None
_discovery_singleton = None
class DiscoveryTemplate:
def __init__(self):
self._config=Config()
self._logger=Logger()
self._config = Config()
self._logger = Logger()
def enabled(self):
return self._config[self._config_section].get('enabled',False)
return self._config[self._config_section].get('enabled', False)
def update_data(self):
pass
@ -26,20 +28,20 @@ class DiscoveryTemplate:
# Do DNS queries
# Return array:
# ["10.10.10.1", "10.10.10.2"]
def do_query_a(self,fqdn):
def do_query_a(self, fqdn):
servers = []
try:
resolver = dns.resolver.Resolver()
for a_rdata in resolver.query(fqdn, 'A'):
servers.append(a_rdata.address)
except DNSException as e:
self._logger.error("Could not resolve ",fqdn)
self._logger.error("Could not resolve ", fqdn)
return servers
# Do DNS queries
# Return array:
# [{"name": "f.q.d.n", "port": 8876, "ip": ["10.10.10.1", "10.10.10.2"]}]
def do_query_srv(self,fqdn):
def do_query_srv(self, fqdn):
servers = []
try:
resolver = dns.resolver.Resolver()
@ -50,38 +52,39 @@ class DiscoveryTemplate:
info = str(rdata).split()
servers.append({'name': info[3][:-1], 'port': info[2]})
except DNSException as e:
self._logger.warning("Could not resolve ",fqdn)
self._logger.warning("Could not resolve ", fqdn)
return servers
class Discovery:
_discoveries={}
_discoveries = {}
def __new__(cls):
global _discovery_singleton
if _discovery_singleton is None:
_discovery_singleton=super(Discovery, cls).__new__(cls)
_discovery_singleton = super(Discovery, cls).__new__(cls)
return _discovery_singleton
def __init__(self):
self._config=Config()
self._logger=Logger()
self._config = Config()
self._logger = Logger()
if not self._discoveries.get('mesos_dns'):
self._discoveries['mesos_dns']=DiscoveryMesos()
self._discoveries['mesos_dns'] = DiscoveryMesos()
if not self._discoveries.get('marathon_api'):
self._discoveries['marathon_api']=DiscoveryMarathon()
self._discoveries['marathon_api'] = DiscoveryMarathon()
def keys(self):
return self._discoveries.keys()
def resolve(self,app):
discovery=app.get('discovery',self._config.get('default_discovery'))
def resolve(self, app):
discovery = app.get('discovery', self._config.get('default_discovery'))
if discovery not in self.keys():
self._logger.warning('Discovery "',discovery,'" is not present')
self._logger.warning('Discovery "', discovery, '" is not present')
return {}
if self._discoveries[discovery].enabled():
return self.compatible(self._discoveries[discovery].resolve(app))
else:
self._logger.error('Discovery "',discovery,'" is disabled')
self._logger.error('Discovery "', discovery, '" is disabled')
return {}
def update_data(self):
@ -90,128 +93,140 @@ class Discovery:
if self._discoveries[d].enabled():
self._discoveries[d].update_data()
def compatible(self,hosts):
compatible_hosts={}
def compatible(self, hosts):
compatible_hosts = {}
if self._config.get('version') == '0.7':
for service in hosts.keys():
for host in hosts[service]:
ports=host.get('tcp',[])
ports = host.get('tcp', [])
if type(ports).__name__ == 'list':
compatible_hosts[service]=[]
compatible_hosts[service] = []
for port in ports:
compatible_hosts[service].append({'name':host['name'],
'ip':host['ip'],
'port':str(port)})
compatible_hosts[service].append(
{'name': host['name'],
'ip': host['ip'],
'port': str(port)})
else:
compatible_hosts[service]={}
compatible_hosts[service] = {}
for port in ports.keys():
compatible_host=compatible_hosts[service].setdefault(port,[])
compatible_host.append({'name':host['name'],
'ip':host['ip'],
'port':ports[port]})
compatible_host = compatible_hosts[
service].setdefault(port, [])
compatible_host.append({'name': host['name'],
'ip': host['ip'],
'port': ports[port]})
return compatible_hosts
return hosts
class DiscoveryMesos(DiscoveryTemplate):
_config_section='mesos'
def resolve(self,app):
_config_section = 'mesos'
def resolve(self, app):
hosts = {}
services = app.get('services')
domain = self._config[self._config_section].get('domain')
for service in services:
group = service.get('group',app.get('group'))
group = service.get('group', app.get('group'))
if group is None:
self._logger.error('Group for service "{}" of config "{}" not found'.format(service['name'],app.get('conf_name')))
self._logger.error('Group for service "{}" of config "{}" not found'.format(
service['name'], app.get('conf_name')))
continue
ports = service.get('ports')
name = service['name']
hosts[name] = {}
serv = hosts[name]
self._logger.debug('group=',group,' ports=',ports,' name=',name,' serv=',serv)
for prot in ['tcp','udp']:
self._logger.debug(
'group=', group, ' ports=', ports, ' name=', name, ' serv=', serv)
for prot in ['tcp', 'udp']:
if ports is not None:
for port_name in ports:
for d in self.do_query_srv('_'+port_name+'._'+name+'.'+group+'._'+prot+'.'+domain):
hostname=d['name']
serv.setdefault(hostname,{'name':hostname,
'ip':self.do_query_a(hostname)})
serv[hostname].setdefault(prot,{})
serv[hostname][prot][port_name]=d['port']
for d in self.do_query_srv('_' + port_name + '._' + name + '.' + group + '._' + prot + '.' + domain):
hostname = d['name']
serv.setdefault(hostname, {'name': hostname,
'ip': self.do_query_a(hostname)})
serv[hostname].setdefault(prot, {})
serv[hostname][prot][port_name] = d['port']
else:
for d in self.do_query_srv('_'+name+'.'+group+'._'+prot+'.'+domain):
hostname=d['name']
for d in self.do_query_srv('_' + name + '.' + group + '._' + prot + '.' + domain):
hostname = d['name']
if serv.get(hostname) is None:
serv[hostname]={'name':hostname,
'ip':self.do_query_a(hostname)}
serv[hostname] = {'name': hostname,
'ip': self.do_query_a(hostname)}
if serv[hostname].get(prot) is None:
serv[hostname][prot]=[]
serv[hostname][prot] = []
serv[hostname][prot].extend([d['port']])
hosts[name]=list(serv.values())
hosts[name] = list(serv.values())
return hosts
class DiscoveryMarathon(DiscoveryTemplate):
_config_section='marathon'
_config_section = 'marathon'
_tasks = []
_ports = {}
def update_data(self):
hostname=self._config[self._config_section].get('host')
hostname = self._config[self._config_section].get('host')
try:
ports = {}
for app in requests.get(hostname+'/v2/apps').json()['apps']:
for app in requests.get(hostname + '/v2/apps').json()['apps']:
ports[app['id']] = {}
if app.get('container') is not None and app['container'].get('type') == 'DOCKER':
ports[app['id']] = app['container']['docker'].get('portMappings',[])
self._ports=ports
ports[app['id']] = app['container'][
'docker'].get('portMappings', [])
self._ports = ports
except:
self._logger.warning('Apps (',hostname,'/v2/apps) request from Marathon API is failed')
self._logger.warning(
'Apps (', hostname, '/v2/apps) request from Marathon API is failed')
pass
try:
self._tasks = requests.get(hostname + '/v2/tasks').json()['tasks']
except:
self._logger.warning('Tasks (',hostname,'/v2/tasks) request from Marathon API is failed')
self._logger.warning(
'Tasks (', hostname, '/v2/tasks) request from Marathon API is failed')
pass
def _test_mask(self, mask, value):
return (mask.endswith('*') and value.startswith(mask[:-1])) or mask == value
def resolve(self, app):
hosts={}
hosts = {}
services = app.get('services')
if not services:
services = [{'name':'*','ports':['*']}]
services = [{'name': '*', 'ports': ['*']}]
for service in services:
# Convert xxx.yyy.zzz to /zzz/yyy/xxx/ format
group=service.get('group',app.get('group'))
group = service.get('group', app.get('group'))
if group is None:
self._logger.error('Group for service "{}" of config "{}" not found'.format(service['name'],app.get('conf_name')))
self._logger.error('Group for service "{}" of config "{}" not found'.format(
service['name'], app.get('conf_name')))
continue
group = '/'+'/'.join(group.split('.')[::-1])+'/'
group = '/' + '/'.join(group.split('.')[::-1]) + '/'
service_mask = group + service['name']
for task in self._tasks:
if self._test_mask(service_mask,task['appId']):
name='.'.join(task['appId'][len(group):].split('/')[::-1])
hosts[name]={}
if self._test_mask(service_mask, task['appId']):
name = '.'.join(
task['appId'][len(group):].split('/')[::-1])
hosts[name] = {}
serv = hosts[name]
hostname=task['host']
hostname = task['host']
for task_port in self._ports[task['appId']]:
prot=task_port['protocol']
port_name=task_port['name']
port=task['ports'][task['servicePorts'].index(task_port['servicePort'])]
prot = task_port['protocol']
port_name = task_port['name']
port = task['ports'][
task['servicePorts'].index(task_port['servicePort'])]
if 'ports' in service:
for port_mask in service['ports']:
if self._test_mask(port_mask,port_name):
serv.setdefault(hostname,{'name':hostname,
'ip':self.do_query_a(hostname)})
serv[hostname].setdefault(prot,{})
serv[hostname][prot][port_name]=port
if self._test_mask(port_mask, port_name):
serv.setdefault(
hostname, {'name': hostname,
'ip': self.do_query_a(hostname)})
serv[hostname].setdefault(prot, {})
serv[hostname][prot][port_name] = port
else:
serv.setdefault(hostname,{'name':hostname,
'ip':self.do_query_a(hostname)})
serv[hostname].setdefault(prot,[])
serv.setdefault(hostname, {'name': hostname,
'ip': self.do_query_a(hostname)})
serv[hostname].setdefault(prot, [])
serv[hostname][prot].extend([port])
hosts[name]=list(serv.values())
hosts[name] = list(serv.values())
return hosts

View File

@ -28,7 +28,7 @@ Public Logger oblect
class Logger:
_loglevel = 'info'
_msg_level = {
'debug': 'DEBUG',
'debug': 'DEBUG',
'info': 'INFO',
'warning': 'WARNING',
'error': 'ERROR'
@ -66,21 +66,21 @@ class Logger:
def debug(self, *message):
if self.get_level() in ['debug']:
self._log2err(self._make_message('debug',message))
self._log2err(self._make_message('debug', message))
def info(self, *message):
if self.get_level() in ['debug', 'info']:
self._log2out(self._make_message('info',message))
self._log2out(self._make_message('info', message))
def warning(self, *message):
if self.get_level() in ['debug', 'info', 'warning']:
self._log2out(self._make_message('warning',message))
self._log2out(self._make_message('warning', message))
def error(self, *message):
self._log2err(self._make_message('error',message))
self._log2err(self._make_message('error', message))
def _log2err(self,out):
def _log2err(self, out):
sys.stderr.write(out)
def _log2out(self,out):
def _log2out(self, out):
sys.stdout.write(out)