* Fix deadly. resolve

* Most logs moved to Sirupsen/logrus package

* Rewrite EnsureZooPath function
This commit is contained in:
Denis Zheleztsov 2017-09-01 18:04:06 +03:00
parent a2a84945d0
commit 5a29b42802
3 changed files with 53 additions and 36 deletions

View File

@ -2,11 +2,11 @@ package rbmd
import (
"encoding/json"
"log"
"net/http"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
)
@ -80,18 +80,14 @@ func (wr wrr) UmountHandler(w http.ResponseWriter, r *http.Request) {
w.Write(state)
}
//Resolve resolve request
type Resolve struct {
Node string `json:"node"`
}
//ResolveHandler resolve `deadly.` state. /resolve location
func (wr wrr) ResoleHandler(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
func (wr wrr) ResolveHandler(w http.ResponseWriter, r *http.Request) {
var res Resolve
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&res)
var msE []byte
if err != nil {
var msE []byte
msE, _ = json.Marshal(MountState{"FAIL", "JSON parse failure"})
w.WriteHeader(500)
w.Write(msE)
@ -99,7 +95,9 @@ func (wr wrr) ResoleHandler(w http.ResponseWriter, r *http.Request) {
}
if err := wr.z.ResolveRequest(res); err != nil {
log.Error(err.Error())
w.WriteHeader(500)
return
}
w.WriteHeader(200)
@ -158,7 +156,7 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
http.HandleFunc("/v1/umount", wr.UmountHandler)
// Umount volume. Accept JSON. Return JSON.
http.HandleFunc("/v1/resolve", wr.ResoleHandler)
http.HandleFunc("/v1/resolve", wr.ResolveHandler)
server := &http.Server{
Addr: s.Addr,
@ -178,26 +176,29 @@ func (wr Writer) WriteStatusWs(w http.ResponseWriter, r *http.Request) {
c, err := wr.Upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("[Ws ERROR] Upgrade: ", err)
defer c.Close()
return
}
mt, _, err := c.ReadMessage()
if err != nil {
log.Print("[Ws ERROR] Read error: ", err)
// break
defer c.Close()
return
}
go func() {
for {
err = c.WriteMessage(mt, wr.z.GetState())
if err != nil {
log.Print("[Ws ERROR] Write err: ", err)
defer c.Close()
break
}
time.Sleep(time.Duration(1) * time.Second)
// go func(c *websocket.Conn, mt int) {
for {
err = c.WriteMessage(mt, wr.z.GetState())
if err != nil {
log.Print("[Ws ERROR] Write err: ", err)
defer c.Close()
return
}
}()
time.Sleep(time.Duration(1) * time.Second)
}
// }(c, mt)
}
//ServeWebSockets start websockets server

View File

@ -14,30 +14,31 @@ func (z ZooNode) RequestWatch(fqdn string) {
requestsPath := strings.Join([]string{z.Path, "cluster", fqdn, "requests"}, "/")
_, _, ch, err := z.Conn.ChildrenW(requestsPath)
if err != nil {
log.Print("[zk ERROR] ", err)
log.Error("[zk ERROR] ", err.Error())
}
for {
req := <-ch
log.Print("[DEBUG] ch path ", req.Path)
log.Info("ch path ", req.Path)
childrens, _, err := z.Conn.Children(requestsPath)
if err != nil {
log.Error(err.Error())
break
}
for _, child := range childrens {
p := strings.Join([]string{req.Path, child}, "/")
request, _, err := z.Conn.Get(p)
if err != nil {
log.Print("[zk ERROR] ", err)
log.Error("[zk] ", err.Error())
}
var r RBDDevice
err = json.Unmarshal(request, &r)
if err != nil {
log.Error("[ERROR] ", err.Error())
log.Error("", err.Error())
}
if z.GetQuorumHealth() != "alive." {
if z.GetQuorumHealth() != "alive." && child != "resolve" {
z.RMR(p)
z.Answer(fqdn, child, []byte(""), "FAIL: cluster not alive")
break
@ -92,12 +93,13 @@ func (z ZooNode) RequestWatch(fqdn string) {
}
z.Answer(fqdn, child, std, "OK")
} else if child == "resolve" && z.GetLeader() == fqdn {
log.Warn("Got resolve request ", r.Node)
if err := z.Resolve(fqdn); err != nil {
log.Print("[ERROR] ", err)
log.Error(err.Error())
z.RMR(p)
}
} else {
log.Print("[ERROR] Unknown request: ", child)
log.Error("Unknown request: ", child)
z.RMR(p)
}
z.RMR(p)
@ -106,21 +108,29 @@ func (z ZooNode) RequestWatch(fqdn string) {
}
}
//Resolve resolve request
type Resolve struct {
Node string `json:"node"`
}
//Resolve delete node from quorum
func (z ZooNode) Resolve(fqdn string) error {
resolvePath := strings.Join([]string{z.Path, "cluster", fqdn, "requests", "resolve"}, "/")
r, _, err := z.Conn.Get(resolvePath)
if err != nil {
log.Error(err.Error())
return err
}
var res Resolve
if err := json.Unmarshal(r, &res); err != nil {
log.Error(err.Error())
return err
}
deadlyNodePath := strings.Join([]string{z.Path, "cluster", res.Node}, "/")
log.Warn("Trying resolve. Remove ", res.Node, " from quorum")
z.RMR(resolvePath)
z.RMR(deadlyNodePath)
@ -134,14 +144,16 @@ func (z ZooNode) ResolveRequest(r Resolve) error {
jsReq, err := json.Marshal(r)
if err != nil {
log.Error(err.Error())
return err
}
_, err = z.Conn.Create(resolvePath, jsReq, 0, zk.WorldACL(zk.PermAll))
z.EnsureZooPath(resolvePath)
_, err = z.Conn.Set(resolvePath, jsReq, -1)
if err != nil {
_, err := z.Conn.Set(resolvePath, jsReq, -1)
_, err := z.Conn.Create(resolvePath, jsReq, 0, zk.WorldACL(zk.PermAll))
if err != nil {
log.Print("[zk ERROR] ", err)
log.Error("Cant create resolve request node ", err.Error())
return err
}
}

View File

@ -1,9 +1,10 @@
package rbmd
import (
"github.com/samuel/go-zookeeper/zk"
"log"
"strings"
"github.com/samuel/go-zookeeper/zk"
// "encoding/json"
)
@ -15,13 +16,13 @@ type ZooNode struct {
}
//EnsureZooPath create zookeeper path
func (z ZooNode) EnsureZooPath(node string) (string, error) {
func (z ZooNode) EnsureZooPath(path string) (string, error) {
path = strings.Join([]string{z.Path, path}, "/")
flag := int32(0)
acl := zk.WorldACL(zk.PermAll)
zoopath := strings.Join([]string{z.Path, "/", node}, "")
s := strings.Split(zoopath, "/")
s := strings.Split(path, "/")
var p []string
var fullnodepath string
@ -31,7 +32,10 @@ func (z ZooNode) EnsureZooPath(node string) (string, error) {
for i := 0; i < len(p); i++ {
fullnodepath = strings.Join([]string{fullnodepath, p[i]}, "")
z.Conn.Create(fullnodepath, []byte(""), flag, acl)
exists, _, _ := z.Conn.Exists(fullnodepath)
if !exists {
z.Conn.Create(fullnodepath, []byte(""), flag, acl)
}
}
return fullnodepath, nil