leader election and simple cluster health check

This commit is contained in:
Denis Zheleztsov 2016-12-29 01:01:15 +03:00
parent 90d91a2c80
commit 2acb818296
6 changed files with 278 additions and 54 deletions

View File

@ -1,7 +1,9 @@
package rbmd
//Zk Zookeeper connection settings
type Zk struct {
Hosts []string
Path string
Tick int
Path string
Tick int
}

View File

@ -1,12 +1,13 @@
package rbmd
import (
"github.com/samuel/go-zookeeper/zk"
"time"
"log"
"time"
"github.com/samuel/go-zookeeper/zk"
)
//InitConnection Initialize Zookeeper connection
func (conf Zk) InitConnection() (*zk.Conn, error) {
conn, _, err := zk.Connect(conf.Hosts, time.Second)
if err != nil {
@ -15,3 +16,4 @@ func (conf Zk) InitConnection() (*zk.Conn, error) {
return conn, err
}

230
src/rbmd/leader.go Normal file
View File

@ -0,0 +1,230 @@
package rbmd
import (
"strings"
"log"
"encoding/json"
"time"
)
//Quorum quorum information
type Quorum struct {
Quorum []Node `json:"quorum"`
Leader string `json:"leader"`
Health string `json:"health"`
}
//GetQuorumHealth return health check of cluster state
func (z ZooNode) GetQuorumHealth() string {
helthPath := strings.Join([]string{z.Path, "/log/health"}, "")
health, _, err := z.Conn.Get(helthPath)
if err != nil {
log.Print("[ERROR] ", err)
}
return string(health)
}
//SetQuorumHealth set current cluster health
func (z ZooNode) SetQuorumHealth(health string) {
helthPath := strings.Join([]string{z.Path, "/log/health"}, "")
z.EnsureZooPath("log/health")
_, zoStat, _ := z.Conn.Get(helthPath)
z.Conn.Set(helthPath, []byte(health), zoStat.Version)
}
//CheckAndSetHealth ...
func (z ZooNode) CheckAndSetHealth(childrens []string) {
for _, child := range childrens {
var childNode Node
childStatePath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "")
childState, _, err := z.Conn.Get(childStatePath)
if err != nil {
log.Print("[ERROR] ", err)
}
json.Unmarshal(childState, &childNode)
state, message := CheckMounts(childState)
if !state {
if childNode.Updated < (time.Now().Unix() - 9) {
z.SetQuorumHealth(strings.Join(message, "\n"))
return
}
z.SetQuorumHealth("alive.")
return
}
}
currentHealth := strings.Split(z.GetQuorumHealth(), " ")
if currentHealth[0] == "resizing." {
for _, child := range childrens {
if child == currentHealth[2] {
z.SetQuorumHealth("alive.")
return
}
}
}
}
//UpdateQuorum set current cluster state
func (z ZooNode) UpdateQuorum(childrens []string) {
quorumStatePath := strings.Join([]string{z.Path, "/log/quorum"}, "")
z.EnsureZooPath("log/quorum")
// Get nodes statuses
var quorum Quorum
for _, child := range childrens {
var node Node
childPath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "")
data, _, _ := z.Conn.Get(childPath)
json.Unmarshal(data, &node)
quorum.Quorum = append(quorum.Quorum, node)
}
quorum.Health = z.GetQuorumHealth()
quorum.Leader = z.GetLeader()
_, zoStat, _ := z.Conn.Get(quorumStatePath)
q, err := json.Marshal(quorum)
if err != nil {
log.Print("[ERROR] ", err)
}
// Update
log.Print("[DEBUG] Updating quorum")
z.Conn.Set(quorumStatePath, q, zoStat.Version)
}
//GetLeader get current leader
func (z ZooNode) GetLeader() string {
leaderPath := strings.Join([]string{z.Path, "/log/leader"}, "")
leader, _, err := z.Conn.Get(leaderPath)
if err != nil {
log.Fatal("[ERROR] ", err)
}
return string(leader)
}
//SetLeader set current leader
func (z ZooNode) SetLeader(fqdn string) {
leaderPath := strings.Join([]string{z.Path, "/log/leader"}, "")
z.EnsureZooPath("log/leader")
_, zoStat, err := z.Conn.Get(leaderPath)
if err != nil {
log.Fatal("[ERROR] ", err)
}
if z.GetLeader() != fqdn {
log.Print("[DEBUG] I'm leader")
z.Conn.Set(leaderPath, []byte(fqdn), zoStat.Version)
}
}
//FindLeader return f.q.d.n of current leader
func (z ZooNode) FindLeader(fqdn string) {
childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, ""))
if err != nil {
log.Print("[ERROR] ", err)
}
var state bool
myState, _, _ := z.Conn.Get(strings.Join([]string{z.Path, "/cluster/", fqdn, "/state"}, ""))
var node Node
json.Unmarshal(myState, &node)
state = z.CompareChilds(childrens, node)
if state {
z.SetLeader(fqdn)
}
z.CheckAndSetHealth(childrens)
z.UpdateQuorum(childrens)
}
//CompareChilds return bool
// Compare childrens
func (z ZooNode) CompareChilds(childrens []string, node Node) (bool) {
currentLeader := z.GetLeader()
for _, child := range childrens {
var childNode Node
childStatePath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "")
childState, _, err := z.Conn.Get(childStatePath)
if err != nil {
log.Print("[ERROR] ", err)
}
json.Unmarshal(childState, &childNode)
if childNode.Updated < (time.Now().Unix() - 9) {
leader := z.GetLeader()
if leader == child {
z.SetLeader(node.Node)
}
log.Print("[DEBUG] ", childNode)
childrens, _ := z.DestroyNode(child)
z.UpdateQuorum(childrens)
continue
}
// Compare updated time
if node.Updated < childNode.Updated {
return false
}
// if (time.Now().Unix() - 5) < childNode.Updated &&
if childNode.Node == currentLeader {
return false
}
}
return true
}
//DestroyNode ...
// Delete node from quorum
func (z ZooNode) DestroyNode(fqdn string) ([]string, string) {
log.Print("[WARNING] Deleting node ", fqdn, " from quorum!")
childStatePath := strings.Join([]string{z.Path, "/cluster/", fqdn, "/state"}, "")
childPath := strings.Join([]string{z.Path, "/cluster/", fqdn}, "")
nodeStat, stateVersion, _ := z.Conn.Get(childStatePath)
// Check node mounts
mountStat, message := CheckMounts(nodeStat)
if mountStat {
_, childVersion, _ := z.Conn.Get(childPath)
z.Conn.Delete(childStatePath, stateVersion.Version)
z.Conn.Delete(childPath, childVersion.Version)
z.SetQuorumHealth(strings.Join([]string{"resizing. node ", fqdn}, ""))
}
childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, ""))
if err != nil {
log.Print("[ERROR] ", err)
}
log.Print("[DEBUG] After destroy childs ", childrens)
return childrens, strings.Join(message, "\n")
}
//CheckMounts ...
// Check mounts on down node
func CheckMounts(nodeStat []byte) (bool, []string) {
var node Node
err := json.Unmarshal(nodeStat, &node)
if err != nil {
log.Print("[ERROR] ", err)
}
var message []string
if len(node.Mounts) > 0 {
message = append(message, "deadly. ", "Reason: ", " NODE: ", node.Node)
for _, mount := range node.Mounts {
message = append(message, " mountpoint: ", mount.Mountpoint, " block: ", mount.Block, " pool: ", mount.Pool)
}
return false, message
}
return true, message
}

View File

@ -6,10 +6,11 @@ import (
"os"
"strings"
"time"
_ "syscall"
// "syscall"
"encoding/json"
)
//Run -- start main loop
func Run(zoo Zk) {
connection, err := zoo.InitConnection()
if err != nil {
@ -18,50 +19,37 @@ func Run(zoo Zk) {
fqdn, err := os.Hostname()
z := ZooNode{zoo.Path, connection}
node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
for {
node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
if err != nil {
log.Print("[ERROR] ", err)
}
go z.UpdateState(node, fqdn)
go z.UpdateLeader()
go z.FindLeader(fqdn)
time.Sleep(time.Duration(zoo.Tick) * time.Second)
}
}
func (z ZooNode) UpdateState(node string, fqdn string) {
//UpdateState -- update node status
func (z ZooNode) UpdateState(zkPath string, fqdn string) {
state := GetNodeState(fqdn)
state_json, err := json.Marshal(state)
stateJSON, err := json.Marshal(state)
if err != nil {
log.Print("[ERROR] Failed encoding json ", err)
}
_, zo_stat, err := z.Conn.Get(node)
_, zoStat, err := z.Conn.Get(zkPath)
if err != nil {
log.Print("[ERROR] ", err)
}
log.Print("[DEBUG] ", "Updating state")
zo_stat, err = z.Conn.Set(node, state_json, zo_stat.Version)
}
func (z ZooNode) UpdateLeader() {
z.EnsureZooPath("log/leader")
}
func (z ZooNode) FindLeader() {
children, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, ""))
zoStat, err = z.Conn.Set(zkPath, stateJSON, zoStat.Version)
if err != nil {
log.Print("[ERROR] ", err)
}
for {
for _, child := range children {
log.Print("[DEBUG] ", child)
}
}
}

View File

@ -11,37 +11,37 @@ import (
)
// Cluster status struct
//ClusterStatus Quorum status struct
type ClusterStatus struct {
Quorum []Node
Health string
Zk string
Quorum []Node `json:"quorum"`
Health string `json:"health"`
Zk string `json:"zk"`
}
// Node status struct
//Node Node status struct
type Node struct {
Node string
Ip IPs
Updated int
Mounts []Mount
Node string `json:"node"`
IP IPs `json:"ip"`
Updated int64 `json:"updated"`
Mounts []Mount `json:"mounts"`
}
// Mount struct
type Mount struct {
Mountpoint string
Mountopts string
Fstype string
Pool string
Block string
Mountpoint string `json:"mountpoint"`
Mountopts string `json:"mountopts"`
Fstype string `json:"fstype"`
Pool string `json:"pool"`
Block string `json:"block"`
}
// IP addresses
//IPs IP addresses
type IPs struct {
V4 []string
V6 []string
V4 []string `json:"v4"`
V6 []string `json:"v6"`
}
// Parse /proc/mounts and get RBD mounts
//GetMounts Parse /proc/mounts and get RBD mounts
func GetMounts() []Mount {
var mounts []Mount
m, err := ioutil.ReadFile("/proc/mounts")
@ -51,7 +51,7 @@ func GetMounts() []Mount {
for _, line := range strings.Split(string(m), "\n") {
mount := strings.Split(line, " ")
match, err := regexp.MatchString("^(/dev/rbd).*$", mount[0])
match, err := regexp.MatchString("^(/dev/sd).*$", mount[0])
if err != nil {
log.Print("[ERROR] ", err)
}
@ -71,7 +71,7 @@ func GetMounts() []Mount {
return mounts
}
// Exclude 127.0.0.1
//GetMyIPs Exclude 127.0.0.1
func GetMyIPs() IPs {
ifaces, err := net.Interfaces()
if err != nil {
@ -94,7 +94,7 @@ func GetMyIPs() IPs {
ip = v.IP
}
if ip.String() != "127.0.0.1" && ip.String() != "::1" {
match, err := regexp.MatchString("^.*::.*$", ip.String())
match, err := regexp.MatchString("^.*:.*$", ip.String())
if err != nil {
log.Print("[ERROR] ", err)
}
@ -114,13 +114,13 @@ func GetMyIPs() IPs {
}
// Return Node struct
//GetNodeState Return Node struct
func GetNodeState(fqdn string) Node {
var n Node
n.Node = fqdn
n.Ip = GetMyIPs()
n.Updated = int(time.Now().Unix())
n.IP = GetMyIPs()
n.Updated = time.Now().Unix()
n.Mounts = GetMounts()
return n

View File

@ -5,11 +5,13 @@ import (
"strings"
)
//ZooNode zookeeper node
type ZooNode struct {
Path string
Conn *zk.Conn
}
//EnsureZooPath create zookeeper path
func (z ZooNode) EnsureZooPath(node string) (string, error) {
flag := int32(0)
acl := zk.WorldACL(zk.PermAll)