diff --git a/src/rbmd/config.go b/src/rbmd/config.go index 5db495c..860989d 100644 --- a/src/rbmd/config.go +++ b/src/rbmd/config.go @@ -1,7 +1,9 @@ package rbmd +//Zk Zookeeper connection settings type Zk struct { Hosts []string - Path string - Tick int + Path string + Tick int } + diff --git a/src/rbmd/connection.go b/src/rbmd/connection.go index 68cf6ff..2bf65a3 100644 --- a/src/rbmd/connection.go +++ b/src/rbmd/connection.go @@ -1,12 +1,13 @@ package rbmd import ( - "github.com/samuel/go-zookeeper/zk" - "time" "log" + "time" + + "github.com/samuel/go-zookeeper/zk" ) - +//InitConnection Initialize Zookeeper connection func (conf Zk) InitConnection() (*zk.Conn, error) { conn, _, err := zk.Connect(conf.Hosts, time.Second) if err != nil { @@ -15,3 +16,4 @@ func (conf Zk) InitConnection() (*zk.Conn, error) { return conn, err } + diff --git a/src/rbmd/leader.go b/src/rbmd/leader.go new file mode 100644 index 0000000..78e6a34 --- /dev/null +++ b/src/rbmd/leader.go @@ -0,0 +1,230 @@ +package rbmd + +import ( + "strings" + "log" + "encoding/json" + "time" +) + +//Quorum quorum information +type Quorum struct { + Quorum []Node `json:"quorum"` + Leader string `json:"leader"` + Health string `json:"health"` +} + +//GetQuorumHealth return health check of cluster state +func (z ZooNode) GetQuorumHealth() string { + helthPath := strings.Join([]string{z.Path, "/log/health"}, "") + health, _, err := z.Conn.Get(helthPath) + if err != nil { + log.Print("[ERROR] ", err) + } + + return string(health) +} + +//SetQuorumHealth set current cluster health +func (z ZooNode) SetQuorumHealth(health string) { + helthPath := strings.Join([]string{z.Path, "/log/health"}, "") + z.EnsureZooPath("log/health") + _, zoStat, _ := z.Conn.Get(helthPath) + + z.Conn.Set(helthPath, []byte(health), zoStat.Version) +} + +//CheckAndSetHealth ... +func (z ZooNode) CheckAndSetHealth(childrens []string) { + for _, child := range childrens { + var childNode Node + childStatePath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "") + childState, _, err := z.Conn.Get(childStatePath) + if err != nil { + log.Print("[ERROR] ", err) + } + json.Unmarshal(childState, &childNode) + state, message := CheckMounts(childState) + if !state { + if childNode.Updated < (time.Now().Unix() - 9) { + z.SetQuorumHealth(strings.Join(message, "\n")) + return + } + z.SetQuorumHealth("alive.") + return + } + } + + currentHealth := strings.Split(z.GetQuorumHealth(), " ") + if currentHealth[0] == "resizing." { + for _, child := range childrens { + if child == currentHealth[2] { + z.SetQuorumHealth("alive.") + return + } + } + } +} + +//UpdateQuorum set current cluster state +func (z ZooNode) UpdateQuorum(childrens []string) { + quorumStatePath := strings.Join([]string{z.Path, "/log/quorum"}, "") + z.EnsureZooPath("log/quorum") + + // Get nodes statuses + var quorum Quorum + for _, child := range childrens { + var node Node + childPath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "") + data, _, _ := z.Conn.Get(childPath) + json.Unmarshal(data, &node) + quorum.Quorum = append(quorum.Quorum, node) + } + + quorum.Health = z.GetQuorumHealth() + quorum.Leader = z.GetLeader() + _, zoStat, _ := z.Conn.Get(quorumStatePath) + q, err := json.Marshal(quorum) + if err != nil { + log.Print("[ERROR] ", err) + } + + // Update + log.Print("[DEBUG] Updating quorum") + z.Conn.Set(quorumStatePath, q, zoStat.Version) +} + +//GetLeader get current leader +func (z ZooNode) GetLeader() string { + leaderPath := strings.Join([]string{z.Path, "/log/leader"}, "") + leader, _, err := z.Conn.Get(leaderPath) + if err != nil { + log.Fatal("[ERROR] ", err) + } + + return string(leader) +} + +//SetLeader set current leader +func (z ZooNode) SetLeader(fqdn string) { + leaderPath := strings.Join([]string{z.Path, "/log/leader"}, "") + z.EnsureZooPath("log/leader") + _, zoStat, err := z.Conn.Get(leaderPath) + if err != nil { + log.Fatal("[ERROR] ", err) + } + + if z.GetLeader() != fqdn { + log.Print("[DEBUG] I'm leader") + z.Conn.Set(leaderPath, []byte(fqdn), zoStat.Version) + } +} + +//FindLeader return f.q.d.n of current leader +func (z ZooNode) FindLeader(fqdn string) { + childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, "")) + if err != nil { + log.Print("[ERROR] ", err) + } + + var state bool + + myState, _, _ := z.Conn.Get(strings.Join([]string{z.Path, "/cluster/", fqdn, "/state"}, "")) + var node Node + json.Unmarshal(myState, &node) + + state = z.CompareChilds(childrens, node) + if state { + z.SetLeader(fqdn) + } + + z.CheckAndSetHealth(childrens) + z.UpdateQuorum(childrens) +} + +//CompareChilds return bool +// Compare childrens +func (z ZooNode) CompareChilds(childrens []string, node Node) (bool) { + currentLeader := z.GetLeader() + for _, child := range childrens { + var childNode Node + childStatePath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "") + childState, _, err := z.Conn.Get(childStatePath) + if err != nil { + log.Print("[ERROR] ", err) + } + json.Unmarshal(childState, &childNode) + + if childNode.Updated < (time.Now().Unix() - 9) { + leader := z.GetLeader() + if leader == child { + z.SetLeader(node.Node) + } + log.Print("[DEBUG] ", childNode) + childrens, _ := z.DestroyNode(child) + z.UpdateQuorum(childrens) + continue + } + + // Compare updated time + if node.Updated < childNode.Updated { + return false + } + + // if (time.Now().Unix() - 5) < childNode.Updated && + if childNode.Node == currentLeader { + return false + } + } + return true +} + +//DestroyNode ... +// Delete node from quorum +func (z ZooNode) DestroyNode(fqdn string) ([]string, string) { + log.Print("[WARNING] Deleting node ", fqdn, " from quorum!") + + childStatePath := strings.Join([]string{z.Path, "/cluster/", fqdn, "/state"}, "") + childPath := strings.Join([]string{z.Path, "/cluster/", fqdn}, "") + nodeStat, stateVersion, _ := z.Conn.Get(childStatePath) + + // Check node mounts + mountStat, message := CheckMounts(nodeStat) + if mountStat { + _, childVersion, _ := z.Conn.Get(childPath) + z.Conn.Delete(childStatePath, stateVersion.Version) + z.Conn.Delete(childPath, childVersion.Version) + z.SetQuorumHealth(strings.Join([]string{"resizing. node ", fqdn}, "")) + } + + childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, "")) + if err != nil { + log.Print("[ERROR] ", err) + } + log.Print("[DEBUG] After destroy childs ", childrens) + + return childrens, strings.Join(message, "\n") +} + + +//CheckMounts ... +// Check mounts on down node +func CheckMounts(nodeStat []byte) (bool, []string) { + var node Node + + err := json.Unmarshal(nodeStat, &node) + if err != nil { + log.Print("[ERROR] ", err) + } + + var message []string + if len(node.Mounts) > 0 { + message = append(message, "deadly. ", "Reason: ", " NODE: ", node.Node) + for _, mount := range node.Mounts { + message = append(message, " mountpoint: ", mount.Mountpoint, " block: ", mount.Block, " pool: ", mount.Pool) + } + return false, message + } + + return true, message +} diff --git a/src/rbmd/state.go b/src/rbmd/state.go index 62e0b99..e94e15b 100644 --- a/src/rbmd/state.go +++ b/src/rbmd/state.go @@ -6,10 +6,11 @@ import ( "os" "strings" "time" - _ "syscall" + // "syscall" "encoding/json" ) +//Run -- start main loop func Run(zoo Zk) { connection, err := zoo.InitConnection() if err != nil { @@ -18,50 +19,37 @@ func Run(zoo Zk) { fqdn, err := os.Hostname() z := ZooNode{zoo.Path, connection} - node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, "")) for { + node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, "")) + if err != nil { + log.Print("[ERROR] ", err) + } go z.UpdateState(node, fqdn) - go z.UpdateLeader() + go z.FindLeader(fqdn) time.Sleep(time.Duration(zoo.Tick) * time.Second) } } - -func (z ZooNode) UpdateState(node string, fqdn string) { +//UpdateState -- update node status +func (z ZooNode) UpdateState(zkPath string, fqdn string) { state := GetNodeState(fqdn) - state_json, err := json.Marshal(state) + stateJSON, err := json.Marshal(state) if err != nil { log.Print("[ERROR] Failed encoding json ", err) } - _, zo_stat, err := z.Conn.Get(node) + _, zoStat, err := z.Conn.Get(zkPath) if err != nil { log.Print("[ERROR] ", err) } log.Print("[DEBUG] ", "Updating state") - zo_stat, err = z.Conn.Set(node, state_json, zo_stat.Version) -} - - -func (z ZooNode) UpdateLeader() { - z.EnsureZooPath("log/leader") -} - - -func (z ZooNode) FindLeader() { - children, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, "")) + zoStat, err = z.Conn.Set(zkPath, stateJSON, zoStat.Version) if err != nil { log.Print("[ERROR] ", err) } - - for { - for _, child := range children { - log.Print("[DEBUG] ", child) - } - } - } + diff --git a/src/rbmd/sys.go b/src/rbmd/sys.go index b245317..cbccdd6 100644 --- a/src/rbmd/sys.go +++ b/src/rbmd/sys.go @@ -11,37 +11,37 @@ import ( ) -// Cluster status struct +//ClusterStatus Quorum status struct type ClusterStatus struct { - Quorum []Node - Health string - Zk string + Quorum []Node `json:"quorum"` + Health string `json:"health"` + Zk string `json:"zk"` } -// Node status struct +//Node Node status struct type Node struct { - Node string - Ip IPs - Updated int - Mounts []Mount + Node string `json:"node"` + IP IPs `json:"ip"` + Updated int64 `json:"updated"` + Mounts []Mount `json:"mounts"` } // Mount struct type Mount struct { - Mountpoint string - Mountopts string - Fstype string - Pool string - Block string + Mountpoint string `json:"mountpoint"` + Mountopts string `json:"mountopts"` + Fstype string `json:"fstype"` + Pool string `json:"pool"` + Block string `json:"block"` } -// IP addresses +//IPs IP addresses type IPs struct { - V4 []string - V6 []string + V4 []string `json:"v4"` + V6 []string `json:"v6"` } -// Parse /proc/mounts and get RBD mounts +//GetMounts Parse /proc/mounts and get RBD mounts func GetMounts() []Mount { var mounts []Mount m, err := ioutil.ReadFile("/proc/mounts") @@ -51,7 +51,7 @@ func GetMounts() []Mount { for _, line := range strings.Split(string(m), "\n") { mount := strings.Split(line, " ") - match, err := regexp.MatchString("^(/dev/rbd).*$", mount[0]) + match, err := regexp.MatchString("^(/dev/sd).*$", mount[0]) if err != nil { log.Print("[ERROR] ", err) } @@ -71,7 +71,7 @@ func GetMounts() []Mount { return mounts } -// Exclude 127.0.0.1 +//GetMyIPs Exclude 127.0.0.1 func GetMyIPs() IPs { ifaces, err := net.Interfaces() if err != nil { @@ -94,7 +94,7 @@ func GetMyIPs() IPs { ip = v.IP } if ip.String() != "127.0.0.1" && ip.String() != "::1" { - match, err := regexp.MatchString("^.*::.*$", ip.String()) + match, err := regexp.MatchString("^.*:.*$", ip.String()) if err != nil { log.Print("[ERROR] ", err) } @@ -114,13 +114,13 @@ func GetMyIPs() IPs { } -// Return Node struct +//GetNodeState Return Node struct func GetNodeState(fqdn string) Node { var n Node n.Node = fqdn - n.Ip = GetMyIPs() - n.Updated = int(time.Now().Unix()) + n.IP = GetMyIPs() + n.Updated = time.Now().Unix() n.Mounts = GetMounts() return n diff --git a/src/rbmd/zoo.go b/src/rbmd/zoo.go index 3f3e849..0f04f4e 100644 --- a/src/rbmd/zoo.go +++ b/src/rbmd/zoo.go @@ -5,11 +5,13 @@ import ( "strings" ) +//ZooNode zookeeper node type ZooNode struct { Path string Conn *zk.Conn } +//EnsureZooPath create zookeeper path func (z ZooNode) EnsureZooPath(node string) (string, error) { flag := int32(0) acl := zk.WorldACL(zk.PermAll)