Websockets

This commit is contained in:
Denis Zheleztsov 2017-01-03 10:32:04 +03:00
parent 2b6a2af5fd
commit eb2ef376d3
3 changed files with 91 additions and 3 deletions

View File

@ -12,12 +12,14 @@ var (
zkPath string
tick int
listen string
ws string
)
func init() {
flag.StringVar(&zk, "zk", "127.0.0.1:2181", "Zookeeper servers comma separated")
flag.StringVar(&zkPath, "zkPath", "/rbmd", "Zookeeper path")
flag.StringVar(&listen, "listen", "0.0.0.0:9076", "HTTP API listen address")
flag.StringVar(&ws, "ws", "0.0.0.0:7690", "Websockets listen address")
flag.IntVar(&tick, "tick", 5, "Tick time loop")
flag.Parse()
}
@ -28,6 +30,6 @@ func main() {
zkPath,
tick,
}
s := rbmd.ServerConf{listen}
s := rbmd.ServerConf{listen, ws}
rbmd.Run(config, s)
}

View File

@ -4,11 +4,15 @@ import (
"net/http"
"log"
"encoding/json"
"time"
"github.com/gorilla/websocket"
)
//ServerConf configuration of http api server
type ServerConf struct {
Addr string
Ws string
}
//ServeHTTP start http server
@ -49,3 +53,59 @@ func (s ServerConf) ServeHTTP(z ZooNode, fqdn string) {
}
log.Fatal(server.ListenAndServe())
}
//Writer ws
type Writer struct {
Message []byte
Upgrader websocket.Upgrader
z ZooNode
}
//WriteStatusWs wrtite quorum status to websockets client
func (wr Writer) WriteStatusWs(w http.ResponseWriter, r *http.Request) {
c, err := wr.Upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("[Ws ERROR] Upgrade: ", err)
}
defer c.Close()
mt, _, err := c.ReadMessage()
if err != nil {
log.Print("[Ws ERROR] Read error: ", err)
// break
return
}
for {
err = c.WriteMessage(mt, wr.z.GetState())
if err != nil {
log.Print("[Ws ERROR] Write err: ", err)
break
}
time.Sleep(time.Duration(1) * time.Second)
}
}
//ServeWebSockets start websockets server
func (s ServerConf) ServeWebSockets(z ZooNode) {
var wsUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
go http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Writer{z.GetState(), wsUpgrader, z}.WriteStatusWs(w, r)
})
server := &http.Server{
Addr: s.Ws,
}
log.Fatal(server.ListenAndServe())
}

View File

@ -24,16 +24,18 @@ func Run(zoo Zk, s ServerConf) {
z.EnsureZooPath("log/quorum")
z.EnsureZooPath("log/health")
z.EnsureZooPath("log/leader")
z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
// Serve HTTP API
go s.ServeHTTP(z, fqdn)
go s.ServeWebSockets(z)
for {
node, err := z.EnsureZooPath(strings.Join([]string{"cluster/", fqdn, "/state"}, ""))
if err != nil {
log.Panic("[ERROR] ", err)
}
go z.UpdateState(node, fqdn)
z.UpdateState(node, fqdn)
go z.FindLeader(fqdn)
time.Sleep(time.Duration(zoo.Tick) * time.Second)
}
@ -61,6 +63,14 @@ func (z ZooNode) UpdateState(zkPath string, fqdn string) {
}
}
//jsonState HTTP API quorum status
type jsonState struct {
Quorum map[string]Node `json:"quorum"`
Health string `json:"health"`
Leader string `json:"leader"`
}
//GetState return cluster status
func (z ZooNode) GetState() []byte {
quorumStatePath := strings.Join([]string{z.Path, "/log/quorum"}, "")
@ -70,5 +80,21 @@ func (z ZooNode) GetState() []byte {
log.Fatal(err)
}
return stateJSON
var state Quorum
json.Unmarshal(stateJSON, &state)
node := make(map[string]Node)
for _, n := range state.Quorum {
node[n.Node] = n
}
retState := jsonState{node, state.Health, state.Leader}
js, err := json.Marshal(retState)
if err != nil {
log.Fatal(err)
}
return js
}