218 lines
8.7 KiB
Python
218 lines
8.7 KiB
Python
#Public names
|
|
__all__=['Discovery','DiscoveryMesos','DiscoveryMarathon']
|
|
import dns.resolver
|
|
import dns.query
|
|
import os
|
|
import sys
|
|
import requests
|
|
from dns.exception import DNSException
|
|
from .config import *
|
|
from .logger import *
|
|
|
|
# Discovery object
|
|
_discovery_singleton=None
|
|
|
|
class DiscoveryTemplate:
|
|
def __init__(self):
|
|
self._config=Config()
|
|
self._logger=Logger()
|
|
|
|
def enabled(self):
|
|
return self._config[self._config_section].get('enabled',False)
|
|
|
|
def update_data(self):
|
|
pass
|
|
|
|
# Do DNS queries
|
|
# Return array:
|
|
# ["10.10.10.1", "10.10.10.2"]
|
|
def do_query_a(self,fqdn):
|
|
servers = []
|
|
try:
|
|
resolver = dns.resolver.Resolver()
|
|
for a_rdata in resolver.query(fqdn, 'A'):
|
|
servers.append(a_rdata.address)
|
|
except DNSException as e:
|
|
self._logger.error("Could not resolve ",fqdn)
|
|
return servers
|
|
|
|
# Do DNS queries
|
|
# Return array:
|
|
# [{"name": "f.q.d.n", "port": 8876, "ip": ["10.10.10.1", "10.10.10.2"]}]
|
|
def do_query_srv(self,fqdn):
|
|
servers = []
|
|
try:
|
|
resolver = dns.resolver.Resolver()
|
|
resolver.lifetime = 1
|
|
resolver.timeout = 1
|
|
query = resolver.query(fqdn, 'SRV')
|
|
for rdata in query:
|
|
info = str(rdata).split()
|
|
servers.append({'name': info[3][:-1], 'port': info[2]})
|
|
except DNSException as e:
|
|
self._logger.warning("Could not resolve ",fqdn)
|
|
return servers
|
|
|
|
|
|
class Discovery:
|
|
_discoveries={}
|
|
def __new__(cls):
|
|
global _discovery_singleton
|
|
if _discovery_singleton is None:
|
|
_discovery_singleton=super(Discovery, cls).__new__(cls)
|
|
return _discovery_singleton
|
|
|
|
def __init__(self):
|
|
self._config=Config()
|
|
self._logger=Logger()
|
|
if not self._discoveries.get('mesos_dns'):
|
|
self._discoveries['mesos_dns']=DiscoveryMesos()
|
|
if not self._discoveries.get('marathon_api'):
|
|
self._discoveries['marathon_api']=DiscoveryMarathon()
|
|
|
|
def keys(self):
|
|
return self._discoveries.keys()
|
|
|
|
def resolve(self,app):
|
|
discovery=app.get('discovery',self._config.get('default_discovery'))
|
|
if discovery not in self.keys():
|
|
self._logger.warning('Discovery "',discovery,'" is not present')
|
|
return {}
|
|
if self._discoveries[discovery].enabled():
|
|
return self.compatible(self._discoveries[discovery].resolve(app))
|
|
else:
|
|
self._logger.error('Discovery "',discovery,'" is disabled')
|
|
return {}
|
|
|
|
def update_data(self):
|
|
self._config.update_apps()
|
|
for d in self.keys():
|
|
if self._discoveries[d].enabled():
|
|
self._discoveries[d].update_data()
|
|
|
|
def compatible(self,hosts):
|
|
compatible_hosts={}
|
|
if self._config.get('version') == '0.7':
|
|
for service in hosts.keys():
|
|
for host in hosts[service]:
|
|
ports=host.get('tcp',[])
|
|
if type(ports).__name__ == 'list':
|
|
compatible_hosts[service]=[]
|
|
for port in ports:
|
|
compatible_hosts[service].append({'name':host['name'],
|
|
'ip':host['ip'],
|
|
'port':str(port)})
|
|
else:
|
|
compatible_hosts[service]={}
|
|
for port in ports.keys():
|
|
compatible_host=compatible_hosts[service].setdefault(port,[])
|
|
compatible_host.append({'name':host['name'],
|
|
'ip':host['ip'],
|
|
'port':ports[port]})
|
|
|
|
return compatible_hosts
|
|
return hosts
|
|
|
|
|
|
class DiscoveryMesos(DiscoveryTemplate):
|
|
_config_section='mesos'
|
|
def resolve(self,app):
|
|
hosts = {}
|
|
services = app.get('services')
|
|
domain = self._config[self._config_section].get('domain')
|
|
for service in services:
|
|
group = service.get('group',app.get('group'))
|
|
if group is None:
|
|
self._logger.error('Group for service "{}" of config "{}" not found'.format(service['name'],app.get('conf_name')))
|
|
continue
|
|
ports = service.get('ports')
|
|
name = service['name']
|
|
hosts[name] = {}
|
|
serv = hosts[name]
|
|
self._logger.debug('group=',group,' ports=',ports,' name=',name,' serv=',serv)
|
|
for prot in ['tcp','udp']:
|
|
if ports is not None:
|
|
for port_name in ports:
|
|
for d in self.do_query_srv('_'+port_name+'._'+name+'.'+group+'._'+prot+'.'+domain):
|
|
hostname=d['name']
|
|
serv.setdefault(hostname,{'name':hostname,
|
|
'ip':self.do_query_a(hostname)})
|
|
serv[hostname].setdefault(prot,{})
|
|
serv[hostname][prot][port_name]=d['port']
|
|
else:
|
|
for d in self.do_query_srv('_'+name+'.'+group+'._'+prot+'.'+domain):
|
|
hostname=d['name']
|
|
if serv.get(hostname) is None:
|
|
serv[hostname]={'name':hostname,
|
|
'ip':self.do_query_a(hostname)}
|
|
if serv[hostname].get(prot) is None:
|
|
serv[hostname][prot]=[]
|
|
serv[hostname][prot].extend([d['port']])
|
|
hosts[name]=list(serv.values())
|
|
return hosts
|
|
|
|
|
|
class DiscoveryMarathon(DiscoveryTemplate):
|
|
_config_section='marathon'
|
|
_tasks = []
|
|
_ports = {}
|
|
|
|
def update_data(self):
|
|
hostname=self._config[self._config_section].get('host')
|
|
try:
|
|
ports = {}
|
|
for app in requests.get(hostname+'/v2/apps').json()['apps']:
|
|
ports[app['id']] = {}
|
|
if app.get('container') is not None and app['container'].get('type') == 'DOCKER':
|
|
ports[app['id']] = app['container']['docker'].get('portMappings',[])
|
|
self._ports=ports
|
|
except:
|
|
self._logger.warning('Apps (',hostname,'/v2/apps) request from Marathon API is failed')
|
|
pass
|
|
try:
|
|
self._tasks = requests.get(hostname + '/v2/tasks').json()['tasks']
|
|
except:
|
|
self._logger.warning('Tasks (',hostname,'/v2/tasks) request from Marathon API is failed')
|
|
pass
|
|
|
|
def _test_mask(self, mask, value):
|
|
return (mask.endswith('*') and value.startswith(mask[:-1])) or mask == value
|
|
|
|
def resolve(self, app):
|
|
hosts={}
|
|
services = app.get('services')
|
|
if not services:
|
|
services = [{'name':'*','ports':['*']}]
|
|
for service in services:
|
|
# Convert xxx.yyy.zzz to /zzz/yyy/xxx/ format
|
|
group=service.get('group',app.get('group'))
|
|
if group is None:
|
|
self._logger.error('Group for service "{}" of config "{}" not found'.format(service['name'],app.get('conf_name')))
|
|
continue
|
|
group = '/'+'/'.join(group.split('.')[::-1])+'/'
|
|
service_mask = group + service['name']
|
|
for task in self._tasks:
|
|
if self._test_mask(service_mask,task['appId']):
|
|
name='.'.join(task['appId'][len(group):].split('/')[::-1])
|
|
hosts[name]={}
|
|
serv = hosts[name]
|
|
hostname=task['host']
|
|
for task_port in self._ports[task['appId']]:
|
|
prot=task_port['protocol']
|
|
port_name=task_port['name']
|
|
port=task['ports'][task['servicePorts'].index(task_port['servicePort'])]
|
|
if 'ports' in service:
|
|
for port_mask in service['ports']:
|
|
if self._test_mask(port_mask,port_name):
|
|
serv.setdefault(hostname,{'name':hostname,
|
|
'ip':self.do_query_a(hostname)})
|
|
serv[hostname].setdefault(prot,{})
|
|
serv[hostname][prot][port_name]=port
|
|
else:
|
|
serv.setdefault(hostname,{'name':hostname,
|
|
'ip':self.do_query_a(hostname)})
|
|
serv[hostname].setdefault(prot,[])
|
|
serv[hostname][prot].extend([port])
|
|
hosts[name]=list(serv.values())
|
|
return hosts
|