This commit is contained in:
Denis Zheleztsov 2017-01-12 19:12:46 +03:00
parent 1a57dca7b0
commit 785abf6b7e
10 changed files with 81 additions and 90 deletions

10
main.go
View File

@ -8,12 +8,12 @@ import (
)
var (
zk string
zk string
zkPath string
tick int
tick int
listen string
ws string
v bool
ws string
v bool
)
func init() {
@ -31,7 +31,7 @@ func main() {
if v {
rbmd.VersionShow()
}
config := rbmd.Zk{
strings.Split(zk, ","),
zkPath,

View File

@ -6,4 +6,3 @@ type Zk struct {
Path string
Tick int
}

View File

@ -16,4 +16,3 @@ func (conf Zk) InitConnection() (*zk.Conn, error) {
return conn, err
}

View File

@ -1,23 +1,23 @@
package rbmd
import (
"net/http"
"log"
"encoding/json"
"log"
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
)
//ServerConf configuration of http api server
type ServerConf struct {
Addr string
Ws string
Ws string
}
//MountHandler /mount location
func (wr wrr) MountHandler (w http.ResponseWriter, r *http.Request) {
func (wr wrr) MountHandler(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var m RBDDevice
err := decoder.Decode(&m)
@ -49,7 +49,7 @@ func (wr wrr) MountHandler (w http.ResponseWriter, r *http.Request) {
}
//UmountHandler /umount location
func (wr wrr) UmountHandler (w http.ResponseWriter, r *http.Request) {
func (wr wrr) UmountHandler(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var m RBDDevice
err := decoder.Decode(&m)
@ -80,7 +80,6 @@ func (wr wrr) UmountHandler (w http.ResponseWriter, r *http.Request) {
w.Write(state)
}
//Resolve resolve request
type Resolve struct {
Node string `json:"node"`
@ -108,7 +107,7 @@ func (wr wrr) ResoleHandler(w http.ResponseWriter, r *http.Request) {
//wrr API
type wrr struct {
Fqdn string
z ZooNode
z ZooNode
}
//ServeHTTP start http server
@ -138,7 +137,7 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
fqdn,
z,
}
// Mount volume. Accept JSON. Return JSON.
http.HandleFunc("/mount", wr.MountHandler)
@ -157,7 +156,7 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
//Writer ws
type Writer struct {
Upgrader websocket.Upgrader
z ZooNode
z ZooNode
}
//WriteStatusWs wrtite quorum status to websockets client
@ -174,7 +173,7 @@ func (wr Writer) WriteStatusWs(w http.ResponseWriter, r *http.Request) {
// break
return
}
go func() {
for {
err = c.WriteMessage(mt, wr.z.GetState())

View File

@ -1,9 +1,9 @@
package rbmd
import (
"strings"
"log"
"encoding/json"
"log"
"strings"
"time"
)
@ -65,7 +65,7 @@ func (z ZooNode) CheckAndSetHealth(childrens []string) {
z.SetQuorumHealth("deadly.")
z.SetDeadlyReason(childNode)
return
}
}
}
}
@ -144,7 +144,7 @@ func (z ZooNode) FindLeader(fqdn string) {
if err != nil {
log.Print("[ERROR] ", err)
}
var state bool
myState, _, _ := z.Conn.Get(strings.Join([]string{z.Path, "/cluster/", fqdn, "/state"}, ""))
@ -193,7 +193,7 @@ func (z ZooNode) CompareChilds(node Node) (bool, []string) {
z.UpdateQuorum(childrens)
continue
}
// Compare updated time
if node.Updated < childNode.Updated {
childrens, _, err = z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
@ -225,7 +225,7 @@ func (z ZooNode) DestroyNode(fqdn string) ([]string, string) {
z.RMR(childPath)
z.SetQuorumHealth(strings.Join([]string{"resizing. node ", fqdn}, ""))
}
childrens, _, err := z.Conn.Children(strings.Join([]string{z.Path, "/cluster"}, ""))
if err != nil {
log.Print("[zk ERROR] ", err)
@ -235,7 +235,6 @@ func (z ZooNode) DestroyNode(fqdn string) ([]string, string) {
return childrens, strings.Join(message, "")
}
//CheckMounts ...
// Check mounts on down node
func CheckMounts(nodeStat []byte) (bool, []string) {
@ -252,7 +251,7 @@ func CheckMounts(nodeStat []byte) (bool, []string) {
var message []string
if len(node.Mounts) > 0 {
message = append(message, "deadly. ", "Reason: ", " NODE: ", node.Node)
for _, mount := range node.Mounts {
for _, mount := range node.Mounts {
message = append(message, ", mountpoint: ", mount.Mountpoint, ", block: ", mount.Block, ", pool: ", mount.Pool)
}
return false, message

View File

@ -1,9 +1,9 @@
package rbmd
import (
"encoding/json"
"log"
"strings"
"encoding/json"
// "bytes"
"github.com/samuel/go-zookeeper/zk"
@ -55,7 +55,7 @@ func (z ZooNode) RequestWatch(fqdn string) {
break
}
z.Answer(fqdn, child, std, "OK")
// 1) Unmount FS 2) Unmap RBD
// 1) Unmount FS 2) Unmap RBD
} else if child == "umount" {
err := r.UnmountFS()
if err != nil {
@ -95,7 +95,7 @@ func (z ZooNode) Resolve(fqdn string) error {
if err != nil {
return err
}
var res Resolve
if err := json.Unmarshal(r, &res); err != nil {
return err
@ -126,7 +126,7 @@ func (z ZooNode) ResolveRequest(r Resolve) error {
return err
}
}
return nil
}
@ -201,7 +201,7 @@ func (z ZooNode) WatchAnswer(fqdn string, t string) MountState {
var ms MountState
var p string
for {
ans := <-ch
log.Print("[DEBUG] ch answer path ", ans.Path)

View File

@ -21,7 +21,7 @@ func Run(zoo Zk, s ServerConf) {
z := ZooNode{zoo.Path, connection, zoo}
z.CreateZkTree(fqdn)
go func () {
go func() {
for {
z.RequestWatch(fqdn)
}
@ -77,13 +77,12 @@ func (z ZooNode) UpdateState(zkPath string, fqdn string) {
}
}
//jsonState HTTP API quorum status
type jsonState struct {
Quorum map[string]Node `json:"quorum"`
Health string `json:"health"`
DeadlyReason Node `json:"deadlyreason"`
Leader string `json:"leader"`
Quorum map[string]Node `json:"quorum"`
Health string `json:"health"`
DeadlyReason Node `json:"deadlyreason"`
Leader string `json:"leader"`
}
//GetState return cluster status

View File

@ -1,16 +1,15 @@
package rbmd
import (
"syscall"
"io/ioutil"
"net"
"log"
"strings"
"regexp"
"time"
"os/exec"
"bytes"
"io/ioutil"
"log"
"net"
"os/exec"
"regexp"
"strings"
"syscall"
"time"
// "fmt"
)
@ -18,25 +17,25 @@ import (
type ClusterStatus struct {
Quorum []Node `json:"quorum"`
Health string `json:"health"`
Zk string `json:"zk"`
Zk string `json:"zk"`
}
//Node Node status struct
type Node struct {
Node string `json:"node"`
IP IPs `json:"ip"`
Node string `json:"node"`
IP IPs `json:"ip"`
Updated int64 `json:"updated"`
Mounts []Mount `json:"mounts"`
Mounts []Mount `json:"mounts"`
}
// Mount struct
type Mount struct {
Mountpoint string `json:"mountpoint"`
Mountopts string `json:"mountopts"`
Fstype string `json:"fstype"`
Pool string `json:"pool"`
Image string `json:"image"`
Block string `json:"block"`
Mountopts string `json:"mountopts"`
Fstype string `json:"fstype"`
Pool string `json:"pool"`
Image string `json:"image"`
Block string `json:"block"`
}
//IPs IP addresses
@ -61,14 +60,14 @@ func GetMounts() []Mount {
}
if match {
p := strings.Split(mount[0], "/")
pool, image := GetRBDPool(p[len(p) - 1])
pool, image := GetRBDPool(p[len(p)-1])
mounts = append(mounts, Mount{
mount[1],
mount[3],
mount[2],
pool,
image,
p[len(p) - 1],
p[len(p)-1],
})
}
}
@ -96,7 +95,7 @@ func GetRBDPool(device string) (string, string) {
return string(pool), string(image)
}
//GetMyIPs Exclude 127.0.0.1
//GetMyIPs Exclude 127.0.0.1
func GetMyIPs() IPs {
ifaces, err := net.Interfaces()
if err != nil {
@ -114,9 +113,9 @@ func GetMyIPs() IPs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
ip = v.IP
case *net.IPAddr:
ip = v.IP
ip = v.IP
}
if ip.String() != "127.0.0.1" && ip.String() != "::1" {
match, err := regexp.MatchString("^.*:.*$", ip.String())
@ -138,7 +137,6 @@ func GetMyIPs() IPs {
}
}
//GetNodeState Return Node struct
func GetNodeState(fqdn string) Node {
var n Node
@ -151,22 +149,21 @@ func GetNodeState(fqdn string) Node {
return n
}
//MountState status of mount/umount
type MountState struct {
State string `json:"state"`
State string `json:"state"`
Message string `json:"message"`
}
//RBDDevice rbd block device struct
type RBDDevice struct {
Node string `json:"node"`
Pool string `json:"pool"`
Image string `json:"image"`
Block string `json:"block"`
Node string `json:"node"`
Pool string `json:"pool"`
Image string `json:"image"`
Block string `json:"block"`
Mountpoint string `json:"mountpoint"`
Mountopts string `json:"mountopts"`
Fstype string `json:"fstype"`
Mountopts string `json:"mountopts"`
Fstype string `json:"fstype"`
}
//MapDevice map rbd block device
@ -176,7 +173,7 @@ func (r RBDDevice) MapDevice() ([]byte, error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("rbd", "map", image)
cmd.Stdout = &stdout
@ -184,15 +181,15 @@ func (r RBDDevice) MapDevice() ([]byte, error) {
err := cmd.Run()
if err != nil {
return []byte(stderr.String()) , err
return []byte(stderr.String()), err
}
o := stdout.String()
if strings.HasSuffix(o, "\n") {
o = o[ :len(o) - 1]
}
o = o[:len(o)-1]
}
return []byte(o), nil
}
@ -202,7 +199,7 @@ func (r RBDDevice) UnmapDevice() ([]byte, error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("rbd", "unmap", strings.Join([]string{"/dev/", r.Block}, ""))
cmd.Stdout = &stdout
@ -210,22 +207,21 @@ func (r RBDDevice) UnmapDevice() ([]byte, error) {
err := cmd.Run()
if err != nil {
return []byte(stderr.String()) , err
return []byte(stderr.String()), err
}
o := stdout.String()
if strings.HasSuffix(o, "\n") {
o = o[ :len(o) - 1]
}
o = o[:len(o)-1]
}
return []byte(o), nil
}
//MountFS mount file system
func (r RBDDevice) MountFS(device string) error {
err := syscall.Mount(device, r.Mountpoint, r.Fstype, ParseMountOpts(r.Mountopts), "")
log.Print("[DEBUG] RBDDevice: ", r)
if err != nil {
log.Print("[DEBUG] sys 207 ", err)
return err
@ -239,8 +235,8 @@ func ParseMountOpts(mountopts string) uintptr {
// Mount options map
opts := make(map[string]uintptr)
opts["ro"] = syscall.MS_RDONLY
opts["noatime"] = syscall.MS_NOATIME
opts["relatime"] = syscall.MS_RELATIME
opts["noatime"] = syscall.MS_NOATIME
opts["nosuid"] = syscall.MS_NOSUID
opts["noexec"] = syscall.MS_NOEXEC
opts["nodiratime"] = syscall.MS_NODIRATIME
@ -248,12 +244,12 @@ func ParseMountOpts(mountopts string) uintptr {
var msOpts uintptr
if mountopts != "" {
for _, o := range strings.Split(mountopts, ",") {
msOpts = uintptr(msOpts|opts[o])
msOpts = uintptr(msOpts | opts[o])
}
return msOpts
}
return 0
return 0
}
//UnmountFS unmount file system

View File

@ -1,9 +1,9 @@
package rbmd
import (
"fmt"
"os"
"runtime"
"fmt"
)
//VersionShow show version and exit

View File

@ -2,8 +2,8 @@ package rbmd
import (
"github.com/samuel/go-zookeeper/zk"
"strings"
"log"
"strings"
// "encoding/json"
)
@ -11,10 +11,10 @@ import (
type ZooNode struct {
Path string
Conn *zk.Conn
Zoo Zk
Zoo Zk
}
//EnsureZooPath create zookeeper path
//EnsureZooPath create zookeeper path
func (z ZooNode) EnsureZooPath(node string) (string, error) {
flag := int32(0)
acl := zk.WorldACL(zk.PermAll)
@ -24,16 +24,16 @@ func (z ZooNode) EnsureZooPath(node string) (string, error) {
var p []string
var fullnodepath string
for i := 1; i < len(s); i++ {
p = append(p, strings.Join([]string{"/", s[i]}, ""))
}
for i := 0; i < len(p); i++ {
fullnodepath = strings.Join([]string{fullnodepath, p[i]}, "")
z.Conn.Create(fullnodepath, []byte(""), flag, acl)
}
return fullnodepath, nil
}