Merge pull request #15 from VEvgeniyV/marathon-discovery
Add Marathon discovery. Remade discovery and logger module
This commit is contained in:
commit
f8460d301a
19
README.ru.md
19
README.ru.md
@ -1,5 +1,5 @@
|
||||
# Surok
|
||||
[![Build Status](https://travis-ci.org/Difrex/surok.svg?branch=master)](https://travis-ci.org/Difrex/surok)
|
||||
[![Build Status](https://travis-ci.org/Surkoveds/surok.svg?branch=master)](https://travis-ci.org/Surkoveds/surok)
|
||||
|
||||
Обнаружение сервисов для Apache Mesos.
|
||||
|
||||
@ -17,10 +17,27 @@ cd build
|
||||
deb-пакет будет лежать в build/out
|
||||
|
||||
Сборка базового docker-образа surok
|
||||
Ubuntu Xenial
|
||||
```
|
||||
cd build
|
||||
./build.sh surok_image
|
||||
```
|
||||
Alpine image
|
||||
```
|
||||
cd build
|
||||
./build.sh alpine
|
||||
```
|
||||
CentOS image
|
||||
```
|
||||
cd build
|
||||
./build.sh centos
|
||||
```
|
||||
|
||||
ENTRYPOINT : ```cd /opt/surok && pytho3 surok.py -c /etc/surok/conf/surok.json```
|
||||
|
||||
## Документация
|
||||
|
||||
[Wiki](https://github.com/Surkoveds/surok/wiki)
|
||||
|
||||
## Известные проблемы
|
||||
|
||||
|
@ -3,7 +3,6 @@ import json
|
||||
import os
|
||||
import re
|
||||
|
||||
|
||||
class TestLoadConfig(unittest.TestCase):
|
||||
|
||||
def test_main_conf(self):
|
||||
@ -16,7 +15,6 @@ class TestLoadConfig(unittest.TestCase):
|
||||
|
||||
self.assertIn('confd', conf)
|
||||
self.assertTrue(os.path.isdir(conf['confd']))
|
||||
self.assertIn('domain', conf)
|
||||
self.assertIn('wait_time', conf)
|
||||
self.assertIn('lock_dir', conf)
|
||||
self.assertTrue(os.path.isdir(conf['lock_dir']))
|
||||
@ -24,47 +22,58 @@ class TestLoadConfig(unittest.TestCase):
|
||||
|
||||
class TestLogger(unittest.TestCase):
|
||||
|
||||
def test_debug(self):
|
||||
from surok.logger import Logger
|
||||
m = Logger()
|
||||
self.assertIn('DEBUG', m.testing('debug','log message'))
|
||||
|
||||
def test_info(self):
|
||||
from surok.logger import make_message
|
||||
m = make_message
|
||||
self.assertIn('INFO', m({'level': 'INFO', 'raw': 'log message'}))
|
||||
from surok.logger import Logger
|
||||
m = Logger()
|
||||
self.assertIn('INFO', m.testing('info','log message'))
|
||||
|
||||
def test_warning(self):
|
||||
from surok.logger import make_message
|
||||
m = make_message
|
||||
self.assertIn('WARNING', m({'level': 'WARNING', 'raw': 'log message'}))
|
||||
from surok.logger import Logger
|
||||
m = Logger()
|
||||
self.assertIn('WARNING', m.testing('warning','log message'))
|
||||
|
||||
def test_error(self):
|
||||
from surok.logger import make_message
|
||||
m = make_message
|
||||
self.assertIn('ERROR', m({'level': 'ERROR', 'raw': 'log message'}))
|
||||
|
||||
def test_info(self):
|
||||
from surok.logger import make_message
|
||||
m = make_message
|
||||
self.assertIn('DEBUG', m({'level': 'DEBUG', 'raw': 'log message'}))
|
||||
from surok.logger import Logger
|
||||
m = Logger()
|
||||
self.assertIn('ERROR', m.testing('error','log message'))
|
||||
|
||||
|
||||
class TestMemcachedDiscovery(unittest.TestCase):
|
||||
|
||||
def test_discovery_memcache(self):
|
||||
from surok.system import discovery_memcached
|
||||
|
||||
from surok.discovery import Discovery
|
||||
# Load base configurations
|
||||
surok_conf = '/etc/surok/conf/surok.json'
|
||||
# Read config file
|
||||
f = open(surok_conf, 'r')
|
||||
conf = json.loads(f.read())
|
||||
f.close()
|
||||
|
||||
d=Discovery(conf)
|
||||
self.assertEqual(discovery_memcached(conf), [])
|
||||
|
||||
|
||||
class TestGetGroup(unittest.TestCase):
|
||||
|
||||
def test_get_group(self):
|
||||
from surok.discovery import get_group
|
||||
self.assertFalse(get_group({}, {'env': os.environ}))
|
||||
def test_get_group_from_service(self):
|
||||
from surok.discovery import DiscoveryTemplate
|
||||
d=DiscoveryTemplate({})
|
||||
self.assertEqual('xxx.yyy.zzz',d.get_group({'group':'xxx.yyy.zzz'}, {}))
|
||||
|
||||
def test_get_group_from_env(self):
|
||||
from surok.discovery import DiscoveryTemplate
|
||||
d=DiscoveryTemplate({})
|
||||
self.assertEqual('xxx.yyy.zzz',d.get_group({}, {'env':{'SUROK_DISCOVERY_GROUP':'xxx.yyy.zzz'}}))
|
||||
|
||||
def test_get_group_from_marathon_id(self):
|
||||
from surok.discovery import DiscoveryTemplate
|
||||
d=DiscoveryTemplate({})
|
||||
self.assertEqual('xxx.yyy.zzz',d.get_group({}, {'env':{'MARATHON_APP_ID':'/zzz/yyy/xxx/www'}}))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -1,15 +1,20 @@
|
||||
{
|
||||
"marathon": {
|
||||
"enabled": false,
|
||||
"restart": false,
|
||||
"force": true,
|
||||
"host": "http://marathon.mesos:8080",
|
||||
"enabled": false
|
||||
"host": "http://marathon.mesos:8080"
|
||||
},
|
||||
"consul": {
|
||||
"enabled": false,
|
||||
"domain": "service.dc1.consul"
|
||||
},
|
||||
"mesos":{
|
||||
"enabled": true,
|
||||
"domain": "marathon.mesos"
|
||||
},
|
||||
"default_discovery": "mesos_dns",
|
||||
"confd": "/etc/surok/conf.d",
|
||||
"domain": "marathon.mesos",
|
||||
"wait_time": 20,
|
||||
"lock_dir": "/var/tmp",
|
||||
"loglevel": "info",
|
||||
|
@ -34,9 +34,9 @@
|
||||
}
|
||||
</code>
|
||||
* <strong>services</strong>. List of hashes with required services for app.
|
||||
1. <em>name</em> - string. App name in Marathon.
|
||||
1. <em>name</em> - string. App name in Marathon. If you use a Marathon discovery, you can use the "*" at the end of the string to indicate any character.
|
||||
2. <em>group</em> - string. App group in Marathon. Optional. Discovery policy: 1) config 2) SUROK<em>DISCOVERY</em>GROUP environment variable 3) Marathon API
|
||||
3. <em>ports</em> - list. Name of opened port. In marathon of course. Optional.
|
||||
3. <em>ports</em> - list. Name of opened port. In marathon of course. If you use a Marathon discovery, you can use the "*" at the end of the string to indicate any character. Optional.
|
||||
* <strong>conf<em>name</strong>. Unique app config name.
|
||||
* <strong>template</strong>. Jinja2 template location.
|
||||
* <strong>dest</strong>. Destination config path.
|
||||
|
@ -21,9 +21,9 @@ conf.d/myapp.json
|
||||
}
|
||||
```
|
||||
* **services**. List of hashes with required services for app.
|
||||
1. _name_ - string. App name in Marathon.
|
||||
1. _name_ - string. App name in Marathon. If you use a Marathon discovery, you can use the "*" at the end of the string to indicate any character.
|
||||
2. _group_ - string. App group in Marathon. Optional. Discovery policy: 1) config 2) SUROK_DISCOVERY_GROUP environment variable 3) Marathon API
|
||||
3. _ports_ - list. Name of opened port. In marathon of course. Optional.
|
||||
3. _ports_ - list. Name of opened port. In marathon of course. If you use a Marathon discovery, you can use the "*" at the end of the string to indicate any character. Optional.
|
||||
* **conf_name**. Unique app config name.
|
||||
* **template**. Jinja2 template location.
|
||||
* **dest**. Destination config path.
|
||||
|
@ -19,12 +19,21 @@
|
||||
<code>
|
||||
{
|
||||
"marathon": {
|
||||
"enabled": false,
|
||||
"restart": false,
|
||||
"force": true,
|
||||
"host": "http://marathon.mesos:8080",
|
||||
"enabled": false
|
||||
"host": "http://marathon.mesos:8080"
|
||||
},
|
||||
"consul": {
|
||||
"enabled": false,
|
||||
"domain": "service.dc1.consul"
|
||||
},
|
||||
"mesos":{
|
||||
"enabled": true,
|
||||
"domain": "marathon.mesos"
|
||||
},
|
||||
"default_discovery": "mesos_dns",
|
||||
"confd": "/etc/surok/conf.d",
|
||||
"domain": "marathon.mesos",
|
||||
"wait_time": 20,
|
||||
"lock_dir": "/var/tmp",
|
||||
"loglevel": "info",
|
||||
|
@ -6,12 +6,21 @@ conf/surok.json
|
||||
```
|
||||
{
|
||||
"marathon": {
|
||||
"enabled": false,
|
||||
"restart": false,
|
||||
"force": true,
|
||||
"host": "http://marathon.mesos:8080",
|
||||
"enabled": false
|
||||
"host": "http://marathon.mesos:8080"
|
||||
},
|
||||
"consul": {
|
||||
"enabled": false,
|
||||
"domain": "service.dc1.consul"
|
||||
},
|
||||
"mesos":{
|
||||
"enabled": true,
|
||||
"domain": "marathon.mesos"
|
||||
},
|
||||
"default_discovery": "mesos_dns",
|
||||
"confd": "/etc/surok/conf.d",
|
||||
"domain": "marathon.mesos",
|
||||
"wait_time": 20,
|
||||
"lock_dir": "/var/tmp",
|
||||
"loglevel": "info",
|
||||
|
25
surok.py
25
surok.py
@ -7,10 +7,11 @@ from os.path import isfile, join
|
||||
import json
|
||||
import argparse
|
||||
from surok.templates import gen
|
||||
from surok.discovery import resolve
|
||||
from surok.discovery import Discovery
|
||||
from surok.system import reload_conf
|
||||
from surok.logger import Logger
|
||||
|
||||
|
||||
logger=Logger()
|
||||
# Load base configurations
|
||||
surok_conf = '/etc/surok/conf/surok.json'
|
||||
|
||||
@ -47,28 +48,34 @@ def load_app_conf(app):
|
||||
|
||||
return c
|
||||
|
||||
logger.set_level(conf.get('loglevel','info'))
|
||||
|
||||
# Main loop
|
||||
###########
|
||||
|
||||
discovery=Discovery()
|
||||
|
||||
while 1:
|
||||
confs = get_configs()
|
||||
|
||||
# Update config from discovery object
|
||||
discovery.set_config(conf)
|
||||
|
||||
# Update discovery data
|
||||
discovery.update_data()
|
||||
|
||||
for app in confs:
|
||||
app_conf = load_app_conf(app)
|
||||
|
||||
# Will be removed later
|
||||
# For old configs
|
||||
loglevel = 'info'
|
||||
if 'loglevel' in conf:
|
||||
loglevel = conf['loglevel']
|
||||
|
||||
# Resolve services
|
||||
app_hosts = resolve(app_conf, conf)
|
||||
app_hosts = discovery.resolve(app_conf)
|
||||
|
||||
# Populate my dictionary
|
||||
my = {"services": app_hosts,
|
||||
"conf_name": app_conf['conf_name']}
|
||||
|
||||
logger.debug('my=',my)
|
||||
|
||||
# Generate config from template
|
||||
service_conf = gen(my, app_conf['template'])
|
||||
|
||||
|
@ -1,78 +1,270 @@
|
||||
import dns.resolver
|
||||
import dns.query
|
||||
from dns.exception import DNSException
|
||||
from .logger import info, warning, error, debug
|
||||
from .logger import Logger
|
||||
import sys
|
||||
import requests
|
||||
|
||||
# Default config for Discovery class
|
||||
_config={
|
||||
'default_discovery':'mesos_dns' # Default discovery system
|
||||
}
|
||||
|
||||
# Resolve service from mesos-dns SRV record
|
||||
# return dict {"servicename": [{"name": "service.f.q.d.n.", "port": 9999}]}
|
||||
def resolve(app, conf):
|
||||
hosts = {}
|
||||
services = app['services']
|
||||
domain = conf['domain']
|
||||
# Discoveries objects
|
||||
_discoveries={}
|
||||
|
||||
for service in services:
|
||||
hosts[service['name']] = []
|
||||
#Logger
|
||||
logger=Logger()
|
||||
|
||||
group = get_group(service, app)
|
||||
if group is False:
|
||||
error('Group is not defined in config, SUROK_DISCOVERY_GROUP and MARATHON_APP_ID')
|
||||
error('Not in Mesos launch?')
|
||||
class DiscoveryTemplate:
|
||||
# Default config values for discovery template
|
||||
_config={}
|
||||
_defconfig={'enabled':False}
|
||||
|
||||
def __init__(self,conf):
|
||||
for key in self._defconfig.keys():
|
||||
if key not in self._config.keys():
|
||||
self._config[key]=self._defconfig[key]
|
||||
self.set_config(conf)
|
||||
|
||||
def set_config(self,conf):
|
||||
pass
|
||||
|
||||
def enabled(self):
|
||||
return self._config['enabled']
|
||||
|
||||
def update_data(self):
|
||||
pass
|
||||
|
||||
def get_group(self,service, app):
|
||||
# Check group in app conf
|
||||
if 'group' in service:
|
||||
return service['group']
|
||||
|
||||
# Check environment variable
|
||||
elif app['env'].get('SUROK_DISCOVERY_GROUP'):
|
||||
return app['env']['SUROK_DISCOVERY_GROUP']
|
||||
|
||||
# Check marathon environment variable
|
||||
elif app['env'].get('MARATHON_APP_ID'):
|
||||
return ".".join(app['env']['MARATHON_APP_ID'].split('/')[-2:0:-1])
|
||||
|
||||
else:
|
||||
logger.error('Group is not defined in config, SUROK_DISCOVERY_GROUP and MARATHON_APP_ID')
|
||||
logger.error('Not in Mesos launch?')
|
||||
sys.exit(2)
|
||||
|
||||
# Port name from app config
|
||||
ports = None
|
||||
if 'ports' in service:
|
||||
ports = service['ports']
|
||||
|
||||
# "service-with-defined-ports":
|
||||
# [
|
||||
# {
|
||||
# "name": "example1.com",
|
||||
# "ip": ["10.10.10.1"],
|
||||
# "ports": {
|
||||
# "rpc": 12342,
|
||||
# "web": 12341
|
||||
# }
|
||||
# },
|
||||
# {
|
||||
# "name": "example2.com",
|
||||
# "ports": {
|
||||
# "rpc": 12344,
|
||||
# "web": 12343
|
||||
# }
|
||||
# }
|
||||
# ]
|
||||
fqdn = ''
|
||||
class Discovery:
|
||||
def __init__(self,*conf):
|
||||
for __conf in conf:
|
||||
self.set_config(__conf)
|
||||
|
||||
# Discovery over Consul DNS
|
||||
if 'consul' in conf and conf['consul']['enabled']:
|
||||
fqdn = '_' + service['name'] + '._tcp.' + conf['consul']['domain']
|
||||
hosts[service['name']].append(do_query(fqdn, conf['loglevel']))
|
||||
continue
|
||||
|
||||
if ports is not None:
|
||||
for port_name in ports:
|
||||
fqdn = '_' + port_name + '.' + '_' + service['name'] + '.' + group + '._tcp.' + domain # Need support for udp ports. See #16
|
||||
discovered = do_query(fqdn, conf['loglevel'])
|
||||
for d in discovered:
|
||||
to_append = {}
|
||||
to_append['name'] = d['name']
|
||||
to_append['ip'] = d['ip']
|
||||
to_append['ports'][port_name] = d['port']
|
||||
hosts[service['name']].append(to_append)
|
||||
def set_config(self,conf):
|
||||
global _discoveries
|
||||
#Get discoveries objects
|
||||
if not _discoveries.get('mesos_dns'):
|
||||
_discoveries['mesos_dns']=DiscoveryMesos(conf)
|
||||
else:
|
||||
fqdn = '_' + service['name'] + '.' + group + '._tcp.' + domain
|
||||
hosts[service['name']] = do_query(fqdn, conf['loglevel'])
|
||||
_discoveries['mesos_dns'].set_config(conf)
|
||||
|
||||
if not _discoveries.get('marathon_api'):
|
||||
_discoveries['marathon_api']=DiscoveryMarathon(conf)
|
||||
else:
|
||||
_discoveries['marathon_api'].set_config(conf)
|
||||
|
||||
if not _discoveries.get('consul_dns'):
|
||||
_discoveries['consul_dns']=DiscoveryConsul(conf)
|
||||
else:
|
||||
_discoveries['consul_dns'].set_config(conf)
|
||||
|
||||
global _config
|
||||
if conf.get('default_discovery'):
|
||||
discovery=conf.get('default_discovery')
|
||||
if discovery in list(_discoveries.keys()):
|
||||
_config['default_discovery']=discovery
|
||||
else:
|
||||
logger.error('Default discovery "'+discovery+'" is not present')
|
||||
logger.debug('Conf=',conf)
|
||||
|
||||
def resolve(self,app):
|
||||
__discovery=_config.get('default_discovery')
|
||||
if app.get('discovery'):
|
||||
discovery=app.get('discovery')
|
||||
if discovery in list(_discoveries.keys()):
|
||||
__discovery=discovery
|
||||
else:
|
||||
logger.warning('Discovery "'+discovery+'" is not present')
|
||||
logger.debug('App=',app)
|
||||
return {}
|
||||
if _discoveries[__discovery].enabled():
|
||||
return _discoveries[__discovery].resolve(app)
|
||||
else:
|
||||
logger.error('Discovery "'+__discovery+'" is disabled')
|
||||
return {}
|
||||
|
||||
def update_data(self):
|
||||
global _discoveries
|
||||
for d in list(_discoveries.keys()):
|
||||
if _discoveries[d].enabled():
|
||||
_discoveries[d].update_data()
|
||||
|
||||
|
||||
class DiscoveryMesos(DiscoveryTemplate):
|
||||
_config={
|
||||
'domain':'marathon.mesos' # Default domain
|
||||
}
|
||||
|
||||
def set_config(self,conf):
|
||||
# For old version config
|
||||
if conf.get('domain'):
|
||||
self._config['domain']=conf.get('domain')
|
||||
self._config['enabled']=True
|
||||
# For current version config
|
||||
if conf.get('mesos'):
|
||||
_conf=conf['mesos']
|
||||
for p in ['domain','enabled']:
|
||||
if _conf.get(p):
|
||||
self._config[p]=_conf.get(p)
|
||||
|
||||
def resolve(self,app):
|
||||
hosts = {}
|
||||
services = app['services']
|
||||
domain = self._config['domain']
|
||||
for service in services:
|
||||
group = self.get_group(service, app)
|
||||
ports = service.get('ports')
|
||||
name = service['name']
|
||||
hosts[name] = {}
|
||||
serv = hosts[name]
|
||||
if ports is not None:
|
||||
hosts[name] = {}
|
||||
serv = hosts[name]
|
||||
for prot in ['tcp','udp']:
|
||||
for port_name in ports:
|
||||
for d in do_query('_'+port_name+'._'+name+'.'+group+'._'+prot+'.'+domain):
|
||||
hostname=d['name']
|
||||
if serv.get(hostname) is None:
|
||||
serv[hostname]={"name":hostname,"ip":d['ip']}
|
||||
if serv[hostname].get(prot) is None:
|
||||
serv[hostname][prot]={}
|
||||
serv[hostname][prot][port_name]=d['port']
|
||||
hosts[name]=list(hosts[name].values())
|
||||
else:
|
||||
hosts[name]=do_query('_'+name+'.'+group+'._tcp.'+domain)
|
||||
|
||||
return hosts
|
||||
|
||||
|
||||
class DiscoveryMarathon(DiscoveryTemplate):
|
||||
_config={
|
||||
'host':'http://marathon.mesos:8080',
|
||||
'force':True
|
||||
}
|
||||
__tasks = []
|
||||
__ports = {}
|
||||
def set_config(self,conf):
|
||||
# For current version config
|
||||
if conf.get('marathon'):
|
||||
_conf=conf['marathon']
|
||||
for p in ['host','enabled','force']:
|
||||
if _conf.get(p):
|
||||
self._config[p]=_conf.get(p)
|
||||
|
||||
def update_data(self):
|
||||
try:
|
||||
apps = requests.get(self._config['host']+'/v2/apps').json()['apps']
|
||||
ports = {}
|
||||
for app in apps:
|
||||
ports[app['id']] = {}
|
||||
if app.get('container') is not None and app['container']['type'] == 'DOCKER':
|
||||
ports[app['id']] = app['container']['docker'].get('portMappings',[])
|
||||
self.__ports=ports
|
||||
except:
|
||||
logger.warning('Apps ('+self._config['host']+'/v2/apps) request from Marathon API is failed')
|
||||
pass
|
||||
try:
|
||||
self.__tasks = requests.get(self._config['host']+'/v2/tasks').json()['tasks']
|
||||
except:
|
||||
logger.warning('Tasks ('+self._config['host']+'/v2/tasks) request from Marathon API is failed')
|
||||
pass
|
||||
|
||||
def resolve(self, app):
|
||||
hosts={}
|
||||
serv_conf = app['services']
|
||||
if not serv_conf:
|
||||
serv_conf = [{'name':'*','ports':['*']}]
|
||||
for serv in serv_conf:
|
||||
# Convert xxx.yyy.zzz to /zzz/yyy/xxx/ format
|
||||
group = '/'.join(['']+self.get_group(serv, app).split('.')[::-1]+[''])
|
||||
mask = group+serv['name']
|
||||
for task in self.__tasks:
|
||||
if (mask.endswith('*') and task['appId'].startswith(mask[:-1])) or task['appId'] == mask:
|
||||
name='.'.join(task['appId'][len(group):].split('/')[::-1])
|
||||
if 'ports' in serv:
|
||||
hosts[name]={}
|
||||
for port in self.__ports[task['appId']]:
|
||||
for pp in serv['ports']:
|
||||
if (pp.endswith('*') and port['name'].startswith(pp[:-1])) or port['name'] == pp:
|
||||
if hosts[name].get(task['host']) is None:
|
||||
hosts[name][task['host']]={'name':task['host'],
|
||||
'ip':do_query_a(task['host'])}
|
||||
if hosts[name][task['host']].get(port['protocol']) is None:
|
||||
hosts[name][task['host']][port['protocol']]={}
|
||||
hosts[name][task['host']][port['protocol']][port['name']]=task['ports'][task['servicePorts'].index(port['servicePort'])]
|
||||
hosts[name]=list(hosts[name].values())
|
||||
else:
|
||||
hosts[name]=[]
|
||||
for port in self.__ports[task['appId']]:
|
||||
hosts[name].append({'name':task['host'],
|
||||
'port':task['ports'][task['servicePorts'].index(port['servicePort'])],
|
||||
'ip':do_query_a(task['host'])})
|
||||
|
||||
return hosts
|
||||
|
||||
|
||||
class DiscoveryConsul(DiscoveryTemplate):
|
||||
_config={
|
||||
'enabled':False,
|
||||
'domain':None
|
||||
}
|
||||
def set_config(self,conf):
|
||||
# For current version config
|
||||
if conf.get('consul'):
|
||||
_conf=conf['consul']
|
||||
for p in ['domain','enabled']:
|
||||
if _conf.get(p):
|
||||
self._config[p]=_conf.get(p)
|
||||
|
||||
def resolve(self,app):
|
||||
hosts = {}
|
||||
services = app['services']
|
||||
domain = self._config['domain']
|
||||
for service in services:
|
||||
name = service['name']
|
||||
hosts[name]=do_query('_'+name+'._tcp.'+domain)
|
||||
return hosts
|
||||
|
||||
|
||||
# Do DNS queries
|
||||
# Return array:
|
||||
# ["10.10.10.1", "10.10.10.2"]
|
||||
def do_query_a(fqdn):
|
||||
servers = []
|
||||
try:
|
||||
resolver = dns.resolver.Resolver()
|
||||
for a_rdata in resolver.query(fqdn, 'A'):
|
||||
servers.append(a_rdata.address)
|
||||
except DNSException as e:
|
||||
logger.error("Could not resolve "+fqdn)
|
||||
|
||||
return servers
|
||||
|
||||
|
||||
# Do DNS queries
|
||||
# Return array:
|
||||
# [{"name": "f.q.d.n", "port": 8876, "ip": ["10.10.10.1", "10.10.10.2"]}]
|
||||
def do_query(fqdn, loglevel):
|
||||
def do_query(fqdn):
|
||||
servers = []
|
||||
try:
|
||||
resolver = dns.resolver.Resolver()
|
||||
@ -83,40 +275,8 @@ def do_query(fqdn, loglevel):
|
||||
info = str(rdata).split()
|
||||
name = info[3][:-1]
|
||||
port = info[2]
|
||||
server = {'name': name, 'port': port, 'ip': []}
|
||||
a_query = resolver.query(name, 'A')
|
||||
for a_rdata in a_query:
|
||||
server['ip'].append(a_rdata.address)
|
||||
servers.append(server)
|
||||
servers.append({'name': name, 'port': port, 'ip': do_query_a(name)})
|
||||
except DNSException as e:
|
||||
if loglevel != 'info':
|
||||
error("Could not resolve " + fqdn)
|
||||
logger.error("Could not resolve " + fqdn)
|
||||
|
||||
return servers
|
||||
|
||||
|
||||
# Groups switch
|
||||
# Priority: config, environment, marathon environment
|
||||
def get_group(service, app):
|
||||
# Check group in app conf
|
||||
if 'group' in service:
|
||||
return service['group']
|
||||
# Check environment variable
|
||||
elif app['env'].get('SUROK_DISCOVERY_GROUP'):
|
||||
return app['env']['SUROK_DISCOVERY_GROUP']
|
||||
# Check marathon environment variable
|
||||
elif app['env'].get('MARATHON_APP_ID'):
|
||||
group = parse_marathon_app_id(app['env']['MARATHON_APP_ID'])
|
||||
return group
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
# Parse MARATHON_APP_ID
|
||||
# Return marathon.group
|
||||
def parse_marathon_app_id(marathon_app_id):
|
||||
marathon_app_id = marathon_app_id.split('/')
|
||||
del(marathon_app_id[-1])
|
||||
marathon_app_id.reverse()
|
||||
group = ".".join(marathon_app_id)[:-1]
|
||||
return(group)
|
||||
|
@ -1,36 +1,51 @@
|
||||
import sys
|
||||
import json
|
||||
from time import time
|
||||
_loglevel='info'
|
||||
msg_level={'debug':'DEBUG',
|
||||
'info':'INFO',
|
||||
'warning':'WARNING',
|
||||
'error':'ERROR'}
|
||||
|
||||
class Logger:
|
||||
def __init__(self,*args):
|
||||
if args:
|
||||
self.set_level(args[0])
|
||||
|
||||
def make_message(message):
|
||||
cur_time = str(time())
|
||||
m = '[' + cur_time + '] ' + message['level'] + ': ' + message['raw'] + "\n"
|
||||
return m
|
||||
def set_level(self,level):
|
||||
if level in ['debug','info','warning','error']:
|
||||
global _loglevel
|
||||
_loglevel=level
|
||||
|
||||
def get_level(self):
|
||||
return _loglevel
|
||||
|
||||
def info(message):
|
||||
req = {'level': 'INFO', 'raw': message}
|
||||
m = make_message(req)
|
||||
def __make_message(self,message):
|
||||
r=[]
|
||||
l=self.get_level()
|
||||
for m in message:
|
||||
if type(m).__name__=='str':
|
||||
r.append(m)
|
||||
else:
|
||||
r.append(json.dumps(m,sort_keys=True,indent=2))
|
||||
return '[' + str(time()) + '] ' + msg_level[l] + ': ' + ''.join(r) + "\n"
|
||||
|
||||
sys.stdout.write(m)
|
||||
def debug(self,*message):
|
||||
if self.get_level() in ['debug']:
|
||||
sys.stderr.write(self.__make_message(message))
|
||||
|
||||
def info(self,*message):
|
||||
if self.get_level() in ['debug','info']:
|
||||
sys.stdout.write(self.__make_message(message))
|
||||
|
||||
def warning(message):
|
||||
req = {'level': 'WARNING', 'raw': message}
|
||||
m = make_message(req)
|
||||
def warning(self,*message):
|
||||
if self.get_level() in ['debug','info','warning']:
|
||||
sys.stderr.write(self.__make_message(message))
|
||||
|
||||
sys.stderr.write(m)
|
||||
def error(self,*message):
|
||||
sys.stderr.write(self.__make_message(message))
|
||||
|
||||
def testing(self,level,message):
|
||||
self.set_level(level)
|
||||
return self.__make_message(message)
|
||||
|
||||
def error(message):
|
||||
req = {'level': 'ERROR', 'raw': message}
|
||||
m = make_message(req)
|
||||
|
||||
sys.stderr.write(m)
|
||||
|
||||
|
||||
def debug(message):
|
||||
req = {'level': 'DEBUG', 'raw': message}
|
||||
m = make_message(req)
|
||||
|
||||
sys.stderr.write(m)
|
||||
|
@ -1,20 +1,19 @@
|
||||
import os
|
||||
import sys
|
||||
import requests
|
||||
from .discovery import resolve
|
||||
from .logger import info, warning, error, debug
|
||||
|
||||
from .discovery import Discovery
|
||||
from .logger import Logger
|
||||
logger=Logger()
|
||||
|
||||
# Get old configuration
|
||||
def get_old(name, service_conf):
|
||||
|
||||
try:
|
||||
path = '/var/tmp/surok.' + name
|
||||
f = open(path, 'r')
|
||||
old = f.read()
|
||||
f.close()
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
logger.error(str(e))
|
||||
return 0
|
||||
|
||||
if old == service_conf:
|
||||
@ -53,7 +52,7 @@ def write_lock(name, service_conf):
|
||||
|
||||
|
||||
def do_reload(service_conf, app_conf):
|
||||
warning('Write new configuration of ' + app_conf['conf_name'])
|
||||
logger.warning('Write new configuration of ' + app_conf['conf_name'])
|
||||
|
||||
f = open(app_conf['dest'], 'w')
|
||||
f.write(service_conf)
|
||||
@ -68,6 +67,7 @@ def do_reload(service_conf, app_conf):
|
||||
|
||||
# Discovery memcached servers
|
||||
def discovery_memcached(conf):
|
||||
discovery=Discovery()
|
||||
memcache = conf['memcached']
|
||||
app_conf = {
|
||||
"services": [
|
||||
@ -78,7 +78,7 @@ def discovery_memcached(conf):
|
||||
]
|
||||
}
|
||||
|
||||
hosts = resolve(app_conf, conf)
|
||||
hosts = discovery.resolve(app_conf)
|
||||
mc_servers = []
|
||||
|
||||
for server in hosts[memcache['discovery']['service']]:
|
||||
@ -91,7 +91,7 @@ def discovery_memcached(conf):
|
||||
# !!! NEED REFACTORING !!!
|
||||
def reload_conf(service_conf, app_conf, conf, app_hosts):
|
||||
# Check marathon enabled in configuration
|
||||
if conf['marathon']['enabled'] is True:
|
||||
if conf['marathon'].get('restart',False):
|
||||
if get_old(app_conf['conf_name'], service_conf) != 1:
|
||||
restart_self_in_marathon(conf['marathon'])
|
||||
|
||||
@ -105,47 +105,38 @@ def reload_conf(service_conf, app_conf, conf, app_hosts):
|
||||
mc_hosts = None
|
||||
if conf['memcached']['discovery']['enabled'] is True:
|
||||
mc_hosts = discovery_memcached(conf)
|
||||
info('Discovered memcached hosts: ' + str(mc_hosts))
|
||||
logger.info('Discovered memcached hosts: ' + str(mc_hosts))
|
||||
else:
|
||||
mc_hosts = conf['memcached']['hosts']
|
||||
try:
|
||||
mc = memcache.Client(mc_hosts)
|
||||
if get_old_from_memcache(mc, app_conf['conf_name'], app_hosts) != 1:
|
||||
stdout = do_reload(service_conf, app_conf)
|
||||
info(stdout)
|
||||
logger.info(stdout)
|
||||
return True
|
||||
except Exception as e:
|
||||
error('Cannot connect to memcached: ' + str(e))
|
||||
logger.error('Cannot connect to memcached: ' + str(e))
|
||||
|
||||
else:
|
||||
warning('DEPRECATED main conf file. Please use new syntax!')
|
||||
logger.warning('DEPRECATED main conf file. Please use new syntax!')
|
||||
# End of memcache block
|
||||
#######################
|
||||
|
||||
if get_old(app_conf['conf_name'], service_conf) != 1:
|
||||
stdout = do_reload(service_conf, app_conf)
|
||||
info(stdout)
|
||||
logger.info(stdout)
|
||||
return True
|
||||
else:
|
||||
if conf['loglevel'] == 'debug':
|
||||
debug('Same config ' + app_conf['conf_name'] + ' Skip reload')
|
||||
logger.debug('Same config ' + app_conf['conf_name'] + ' Skip reload')
|
||||
return False
|
||||
|
||||
|
||||
# Do POST request to marathon API
|
||||
# /v2/apps//app/name/restart
|
||||
def restart_self_in_marathon(marathon):
|
||||
host = marathon['host']
|
||||
|
||||
# Check MARATHON_APP_ID environment varible
|
||||
if os.environ.get('MARATHON_APP_ID') is not True:
|
||||
error('Cannot find MARATHON_APP_ID. Not in Mesos?')
|
||||
if not os.environ.get('MARATHON_APP_ID',False):
|
||||
logger.error('Cannot find MARATHON_APP_ID. Not in Mesos?')
|
||||
sys.exit(2)
|
||||
app_id = os.environ['MARATHON_APP_ID']
|
||||
uri = 'http://' + host + '/v2/apps/' + app_id + '/restart'
|
||||
|
||||
# Ok. In this step we made restart request to Marathon
|
||||
if marathon['force'] is True:
|
||||
r = requests.post(uri, data = {'force': 'true'})
|
||||
else:
|
||||
r = requests.post(uri, data = {'force': 'false'})
|
||||
r = requests.post('http://'+marathon['host']+'/v2/apps/'+os.environ['MARATHON_APP_ID']+'/restart',
|
||||
data={'force': marathon.get('force',False)})
|
||||
|
Loading…
Reference in New Issue
Block a user