Progress of http api

This commit is contained in:
Denis Zheleztsov 2017-01-09 22:33:05 +03:00
parent d543c95632
commit bf96ba7891
4 changed files with 186 additions and 64 deletions

View File

@ -4,6 +4,7 @@ import (
"net/http"
"log"
"encoding/json"
"strings"
"time"
"github.com/gorilla/websocket"
@ -15,6 +16,44 @@ type ServerConf struct {
Ws string
}
//MountHandler /mount location
func (wr wrr) MountHandler (w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var m RBDDevice
err := decoder.Decode(&m)
log.Print("[DEBUG] ", m)
var msE []byte
if err != nil {
msE, _ = json.Marshal(MountState{"FAIL", "JSON parse failure"})
w.Write(msE)
return
}
// var wCh chan MountState
wCh := make(chan MountState, 1)
go func() { wCh <- wr.z.WatchAnswer(m.Node, "mount") }()
err = wr.z.MountRequest(m)
if err != nil {
w.Write(msE)
}
answerState := <-wCh
log.Print(answerState)
wr.z.RMR(strings.Join([]string{wr.z.Path, "cluster", wr.Fqdn, "answers", "mount"}, "/"))
state, err := json.Marshal(answerState)
if err != nil {
log.Print("[ERROR] ", err)
w.Write(msE)
}
w.Write(state)
}
//wrr API
type wrr struct {
Fqdn string
z ZooNode
}
//ServeHTTP start http server
func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
@ -38,14 +77,13 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
w.Write(state)
})
wr := wrr{
fqdn,
z,
}
// Mount volume. Accept JSON. Return JSON.
http.HandleFunc("/mount", func(w http.ResponseWriter, r *http.Request) {
state, err := json.Marshal(MountState{"FAIL", "Not implemented yet"})
if err != nil {
log.Fatal(err)
}
w.Write(state)
})
http.HandleFunc("/mount", wr.MountHandler)
// Umount volume. Accept JSON. Return JSON.
http.HandleFunc("/umount", func(w http.ResponseWriter, r *http.Request) {
@ -58,7 +96,6 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
log.Fatal(server.ListenAndServe())
}
//Writer ws
type Writer struct {
Upgrader websocket.Upgrader

View File

@ -9,9 +9,10 @@ import (
//Quorum quorum information
type Quorum struct {
Quorum []Node `json:"quorum"`
Leader string `json:"leader"`
Health string `json:"health"`
Quorum []Node `json:"quorum"`
Leader string `json:"leader"`
Health string `json:"health"`
DeadlyReason Node `json:"deadlyreason"`
}
//GetQuorumHealth return health check of cluster state
@ -34,6 +35,20 @@ func (z ZooNode) SetQuorumHealth(health string) {
z.Conn.Set(helthPath, []byte(health), zoStat.Version)
}
//SetDeadlyReason default null
func (z ZooNode) SetDeadlyReason(node Node) {
deadlyReasonPath := strings.Join([]string{z.Path, "log/deadlyreason"}, "/")
z.EnsureZooPath("log/deadlyreason")
deadlyNode, err := json.Marshal(node)
if err != nil {
log.Print("[ERROR] Marshal json failed: ", err)
}
_, zoStat, _ := z.Conn.Get(deadlyReasonPath)
z.Conn.Set(deadlyReasonPath, deadlyNode, zoStat.Version)
}
//CheckAndSetHealth ...
func (z ZooNode) CheckAndSetHealth(childrens []string) {
for _, child := range childrens {
@ -44,14 +59,13 @@ func (z ZooNode) CheckAndSetHealth(childrens []string) {
log.Print("[ERROR] ", err)
}
json.Unmarshal(childState, &childNode)
state, message := CheckMounts(childState)
state, _ := CheckMounts(childState)
if !state {
if childNode.Updated < (time.Now().Unix() - 9) {
z.SetQuorumHealth(strings.Join(message, "\n"))
z.SetQuorumHealth("deadly.")
z.SetDeadlyReason(childNode)
return
}
z.SetQuorumHealth("alive.")
return
}
}
@ -60,10 +74,13 @@ func (z ZooNode) CheckAndSetHealth(childrens []string) {
for _, child := range childrens {
if child == currentHealth[2] {
z.SetQuorumHealth("alive.")
z.SetDeadlyReason(Node{})
return
}
}
}
z.SetQuorumHealth("alive.")
z.SetDeadlyReason(Node{})
}
//UpdateQuorum set current cluster state
@ -90,7 +107,7 @@ func (z ZooNode) UpdateQuorum(childrens []string) {
}
// Update
log.Print("[DEBUG] Updating quorum")
// log.Print("[DEBUG] Updating quorum")
z.Conn.Set(quorumStatePath, q, zoStat.Version)
}
@ -122,7 +139,8 @@ func (z ZooNode) SetLeader(fqdn string) {
//FindLeader return f.q.d.n of current leader
func (z ZooNode) FindLeader(fqdn string) {
childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, ""))
// childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, ""))
childrens, _, err := z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
if err != nil {
log.Print("[ERROR] ", err)
}
@ -133,7 +151,7 @@ func (z ZooNode) FindLeader(fqdn string) {
var node Node
json.Unmarshal(myState, &node)
state = z.CompareChilds(childrens, node)
state, childrens = z.CompareChilds(node)
if state {
z.SetLeader(fqdn)
}
@ -143,40 +161,53 @@ func (z ZooNode) FindLeader(fqdn string) {
}
//CompareChilds return bool
// Compare childrens
func (z ZooNode) CompareChilds(childrens []string, node Node) (bool) {
// Needs rewrite
func (z ZooNode) CompareChilds(node Node) (bool, []string) {
childrens, _, err := z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
if err != nil {
log.Fatal("[zk ERROR] ", err)
}
if len(childrens) == 1 && childrens[0] == node.Node {
return true, childrens
}
currentLeader := z.GetLeader()
for _, child := range childrens {
var childNode Node
childStatePath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "")
childState, _, err := z.Conn.Get(childStatePath)
if err != nil {
log.Print("[ERROR] ", err)
}
json.Unmarshal(childState, &childNode)
if childNode.Updated < (time.Now().Unix() - 9) {
leader := z.GetLeader()
if leader == child {
z.SetLeader(node.Node)
if child != node.Node {
var childNode Node
childStatePath := strings.Join([]string{z.Path, "/cluster/", child, "/state"}, "")
childState, _, err := z.Conn.Get(childStatePath)
if err != nil {
log.Print("[zk ERROR] ", err)
}
log.Print("[DEBUG] ", childNode)
childrens, _ := z.DestroyNode(child)
z.UpdateQuorum(childrens)
continue
}
// Compare updated time
if node.Updated < childNode.Updated {
return false
}
json.Unmarshal(childState, &childNode)
// if (time.Now().Unix() - 5) < childNode.Updated &&
if childNode.Node == currentLeader {
return false
// log.Print("[DEBUG] child ", child, " updated ", childNode.Updated, " I'm updated ", node.Updated)
if childNode.Updated < (time.Now().Unix() - 9) {
log.Print("[DEBUG] child down ", child)
leader := z.GetLeader()
if leader == child {
z.SetLeader(node.Node)
}
childrens, _ := z.DestroyNode(child)
z.UpdateQuorum(childrens)
continue
}
// Compare updated time
if node.Updated < childNode.Updated {
childrens, _, err = z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
return false, childrens
}
if childNode.Node == currentLeader {
childrens, _, _ = z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
return false, childrens
}
}
}
return true
childrens, _, _ = z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
return true, childrens
}
//DestroyNode ...
@ -186,24 +217,22 @@ func (z ZooNode) DestroyNode(fqdn string) ([]string, string) {
childStatePath := strings.Join([]string{z.Path, "/cluster/", fqdn, "/state"}, "")
childPath := strings.Join([]string{z.Path, "/cluster/", fqdn}, "")
nodeStat, stateVersion, _ := z.Conn.Get(childStatePath)
nodeStat, _, _ := z.Conn.Get(childStatePath)
// Check node mounts
mountStat, message := CheckMounts(nodeStat)
if mountStat {
_, childVersion, _ := z.Conn.Get(childPath)
z.Conn.Delete(childStatePath, stateVersion.Version)
z.Conn.Delete(childPath, childVersion.Version)
z.RMR(childPath)
z.SetQuorumHealth(strings.Join([]string{"resizing. node ", fqdn}, ""))
}
childrens, _, _, err := z.Conn.ChildrenW(strings.Join([]string{z.Path, "/cluster"}, ""))
childrens, _, err := z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
if err != nil {
log.Print("[ERROR] ", err)
log.Print("[zk ERROR] ", err)
}
log.Print("[DEBUG] After destroy childs ", childrens)
return childrens, strings.Join(message, "\n")
return childrens, strings.Join(message, "")
}
@ -212,6 +241,9 @@ func (z ZooNode) DestroyNode(fqdn string) ([]string, string) {
func CheckMounts(nodeStat []byte) (bool, []string) {
var node Node
if string(nodeStat) == "" {
return true, []string{}
}
err := json.Unmarshal(nodeStat, &node)
if err != nil {
log.Print("[ERROR] ", err)
@ -221,10 +253,21 @@ func CheckMounts(nodeStat []byte) (bool, []string) {
if len(node.Mounts) > 0 {
message = append(message, "deadly. ", "Reason: ", " NODE: ", node.Node)
for _, mount := range node.Mounts {
message = append(message, " mountpoint: ", mount.Mountpoint, " block: ", mount.Block, " pool: ", mount.Pool)
message = append(message, ", mountpoint: ", mount.Mountpoint, ", block: ", mount.Block, ", pool: ", mount.Pool)
}
return false, message
}
return true, message
}
//Reconnect reconnect to Zk
func (z ZooNode) Reconnect() {
log.Print("[WARNING] Reconnect to Zk")
z.Conn.Close()
connection, err := z.Zoo.InitConnection()
if err != nil {
log.Panic(err)
}
z.Conn = connection
}

View File

@ -18,32 +18,46 @@ func Run(zoo Zk, s ServerConf) {
}
fqdn, err := os.Hostname()
z := ZooNode{zoo.Path, connection}
z := ZooNode{zoo.Path, connection, zoo}
// Create Zk nodes tree
z.EnsureZooPath("log/quorum")
z.EnsureZooPath("log/health")
z.EnsureZooPath("log/leader")
z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
z.CreateZkTree(fqdn)
go func () {
for {
z.RequestWatch(fqdn)
}
}()
// Serve HTTP API
go s.ServeHTTP(z, fqdn)
go s.ServeWebSockets(z)
for {
node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
node, err := z.EnsureZooPath(strings.Join([]string{"cluster", fqdn, "state"}, "/"))
if err != nil {
log.Panic("[ERROR] ", err)
}
z.UpdateState(node, fqdn)
go z.FindLeader(fqdn)
time.Sleep(time.Duration(zoo.Tick) * time.Second)
// z.Reconnect()
}
}
//CreateZkTree create Zk nodes tree
func (z ZooNode) CreateZkTree(fqdn string) {
z.EnsureZooPath("log/quorum")
z.EnsureZooPath("log/health")
z.EnsureZooPath("log/leader")
z.EnsureZooPath(strings.Join([]string{"cluster", fqdn, "state"}, "/"))
requestsPath := strings.Join([]string{"cluster", fqdn, "requests"}, "/")
answersPath := strings.Join([]string{"cluster", fqdn, "answers"}, "/")
z.EnsureZooPath(requestsPath)
z.EnsureZooPath(answersPath)
}
//UpdateState -- update node status
func (z ZooNode) UpdateState(zkPath string, fqdn string) {
z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
z.EnsureZooPath(strings.Join([]string{"cluster", fqdn, "state"}, "/"))
state := GetNodeState(fqdn)
stateJSON, err := json.Marshal(state)
@ -56,7 +70,7 @@ func (z ZooNode) UpdateState(zkPath string, fqdn string) {
log.Print("[ERROR] ", err)
}
log.Print("[DEBUG] ", "Updating state")
// log.Print("[DEBUG] ", "Updating state")
zoStat, err = z.Conn.Set(zkPath, stateJSON, zoStat.Version)
if err != nil {
log.Print("[ERROR] ", err)
@ -68,12 +82,14 @@ func (z ZooNode) UpdateState(zkPath string, fqdn string) {
type jsonState struct {
Quorum map[string]Node `json:"quorum"`
Health string `json:"health"`
DeadlyReason Node `json:"deadlyreason"`
Leader string `json:"leader"`
}
//GetState return cluster status
func (z ZooNode) GetState() []byte {
quorumStatePath := strings.Join([]string{z.Path, "/log/quorum"}, "")
deadlyReasonPath := strings.Join([]string{z.Path, "log/deadlyreason"}, "/")
stateJSON, _, err := z.Conn.Get(quorumStatePath)
if err != nil {
@ -89,7 +105,11 @@ func (z ZooNode) GetState() []byte {
node[n.Node] = n
}
retState := jsonState{node, state.Health, state.Leader}
var deadlyReason Node
deadlyJSON, _, err := z.Conn.Get(deadlyReasonPath)
err = json.Unmarshal(deadlyJSON, &deadlyReason)
retState := jsonState{node, state.Health, deadlyReason, state.Leader}
js, err := json.Marshal(retState)
if err != nil {

View File

@ -3,12 +3,15 @@ package rbmd
import (
"github.com/samuel/go-zookeeper/zk"
"strings"
"log"
// "encoding/json"
)
//ZooNode zookeeper node
type ZooNode struct {
Path string
Conn *zk.Conn
Zoo Zk
}
//EnsureZooPath create zookeeper path
@ -34,3 +37,22 @@ func (z ZooNode) EnsureZooPath(node string) (string, error) {
return fullnodepath, nil
}
//RMR remove Zk node recursive
func (z ZooNode) RMR(path string) {
c, _, err := z.Conn.Children(path)
if err != nil {
log.Print("[zk ERROR] ", err)
}
log.Print("[WARNING] Trying delete ", path)
if len(c) > 0 {
for _, child := range c {
childPath := strings.Join([]string{path, child}, "/")
z.RMR(childPath)
}
}
err = z.Conn.Delete(path, -1)
if err != nil {
log.Print("[zk ERROR] ", err)
}
log.Print("[WARNING] ", path, " deleted")
}