From eb2ef376d3b16bbe9dfd06ee1296e99b8731366f Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Tue, 3 Jan 2017 10:32:04 +0300 Subject: [PATCH] Websockets --- main.go | 4 +++- src/rbmd/http.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++ src/rbmd/state.go | 30 ++++++++++++++++++++++-- 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 226b93e..5c9406b 100644 --- a/main.go +++ b/main.go @@ -12,12 +12,14 @@ var ( zkPath string tick int listen string + ws string ) func init() { flag.StringVar(&zk, "zk", "127.0.0.1:2181", "Zookeeper servers comma separated") flag.StringVar(&zkPath, "zkPath", "/rbmd", "Zookeeper path") flag.StringVar(&listen, "listen", "0.0.0.0:9076", "HTTP API listen address") + flag.StringVar(&ws, "ws", "0.0.0.0:7690", "Websockets listen address") flag.IntVar(&tick, "tick", 5, "Tick time loop") flag.Parse() } @@ -28,6 +30,6 @@ func main() { zkPath, tick, } - s := rbmd.ServerConf{listen} + s := rbmd.ServerConf{listen, ws} rbmd.Run(config, s) } diff --git a/src/rbmd/http.go b/src/rbmd/http.go index cda9b12..08f6f6c 100644 --- a/src/rbmd/http.go +++ b/src/rbmd/http.go @@ -4,11 +4,15 @@ import ( "net/http" "log" "encoding/json" + "time" + + "github.com/gorilla/websocket" ) //ServerConf configuration of http api server type ServerConf struct { Addr string + Ws string } //ServeHTTP start http server @@ -49,3 +53,59 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) { } log.Fatal(server.ListenAndServe()) } + + +//Writer ws +type Writer struct { + Message []byte + Upgrader websocket.Upgrader + z ZooNode +} + +//WriteStatusWs wrtite quorum status to websockets client +func (wr Writer) WriteStatusWs(w http.ResponseWriter, r *http.Request) { + + c, err := wr.Upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("[Ws ERROR] Upgrade: ", err) + } + + defer c.Close() + + mt, _, err := c.ReadMessage() + if err != nil { + log.Print("[Ws ERROR] Read error: ", err) + // break + return + } + + for { + err = c.WriteMessage(mt, wr.z.GetState()) + if err != nil { + log.Print("[Ws ERROR] Write err: ", err) + break + } + time.Sleep(time.Duration(1) * time.Second) + } +} + +//ServeWebSockets start websockets server +func (s ServerConf) ServeWebSockets(z ZooNode) { + + var wsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + go http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + Writer{z.GetState(), wsUpgrader, z}.WriteStatusWs(w, r) + }) + + server := &http.Server{ + Addr: s.Ws, + } + log.Fatal(server.ListenAndServe()) +} diff --git a/src/rbmd/state.go b/src/rbmd/state.go index 643fef7..b28e3a5 100644 --- a/src/rbmd/state.go +++ b/src/rbmd/state.go @@ -24,16 +24,18 @@ func Run(zoo Zk, s ServerConf) { z.EnsureZooPath("log/quorum") z.EnsureZooPath("log/health") z.EnsureZooPath("log/leader") + z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, "")) // Serve HTTP API go s.ServeHTTP(z, fqdn) + go s.ServeWebSockets(z) for { node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, "")) if err != nil { log.Panic("[ERROR] ", err) } - go z.UpdateState(node, fqdn) + z.UpdateState(node, fqdn) go z.FindLeader(fqdn) time.Sleep(time.Duration(zoo.Tick) * time.Second) } @@ -61,6 +63,14 @@ func (z ZooNode) UpdateState(zkPath string, fqdn string) { } } + +//jsonState HTTP API quorum status +type jsonState struct { + Quorum map[string]Node `json:"quorum"` + Health string `json:"health"` + Leader string `json:"leader"` +} + //GetState return cluster status func (z ZooNode) GetState() []byte { quorumStatePath := strings.Join([]string{z.Path, "/log/quorum"}, "") @@ -70,5 +80,21 @@ func (z ZooNode) GetState() []byte { log.Fatal(err) } - return stateJSON + var state Quorum + json.Unmarshal(stateJSON, &state) + + node := make(map[string]Node) + + for _, n := range state.Quorum { + node[n.Node] = n + } + + retState := jsonState{node, state.Health, state.Leader} + + js, err := json.Marshal(retState) + if err != nil { + log.Fatal(err) + } + + return js }