From 18c239fa52e292362844cd0f43d92d7637a0d0a9 Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Mon, 5 Nov 2018 18:24:46 +0300 Subject: [PATCH] Point messages buggy working --- main.go | 20 ++++ node/api.go | 8 +- node/auth.go | 290 +++++++++++++++++++++++++++++++++++++++++++++++- node/elastic.go | 47 +++++++- node/helpers.go | 30 +++++ node/point.go | 80 ++++++++++++- node/structs.go | 25 ++++- 7 files changed, 486 insertions(+), 14 deletions(-) create mode 100644 node/helpers.go diff --git a/main.go b/main.go index a46d4ac..2613cb8 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,10 @@ package main import ( "flag" + "os" + "gitea.difrex.ru/Umbrella/lessmore/node" + log "github.com/Sirupsen/logrus" ) var ( @@ -10,6 +13,8 @@ var ( es string esMessagesIndex string esMessagesType string + add string + email string ) // init ... @@ -18,6 +23,8 @@ func init() { flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host") flag.StringVar(&esMessagesIndex, "esindex", "idec3", "ES index") flag.StringVar(&esMessagesType, "estype", "post", "ES index type") + flag.StringVar(&add, "add", "", "User to add") + flag.StringVar(&email, "email", "", "User email address") flag.Parse() } @@ -27,5 +34,18 @@ func main() { esconf.Host = es esconf.Index = esMessagesIndex esconf.Type = esMessagesType + if add != "" { + addUser(add, esconf) + } node.Serve(listen, esconf) } + +func addUser(name string, esconf node.ESConf) { + user, err := esconf.AddNewUser(add, email) + if err != nil { + log.Fatal(err) + os.Exit(2) + } + log.Infof("Created: %+v", user) + os.Exit(0) +} diff --git a/node/api.go b/node/api.go index b8a8dbf..e3024db 100644 --- a/node/api.go +++ b/node/api.go @@ -151,19 +151,23 @@ func (es ESConf) UPointHandler(w http.ResponseWriter, r *http.Request) { } // Authorization check - if !es.checkAuth(req) { + user, ok := es.checkAuth(req) + if !ok { w.WriteHeader(403) w.Write([]byte("Permission denied")) return } // Proccess point message - err = es.PointMessage(req) + err = es.PointMessage(req, user) if err != nil { log.Error(err.Error()) w.WriteHeader(500) return } + + w.WriteHeader(200) + w.Write([]byte("ok: added")) } // Serve ... diff --git a/node/auth.go b/node/auth.go index f0cc0ef..ee04fab 100644 --- a/node/auth.go +++ b/node/auth.go @@ -1,7 +1,289 @@ package node -// checkAuth token in point request -// TODO: implement logic -func (es ESConf) checkAuth(req PointRequest) bool { - return true +import ( + "fmt" + + "net/http" + + "strings" + + "io/ioutil" + + "encoding/json" + + "errors" + + "bytes" + + "time" + + log "github.com/Sirupsen/logrus" +) + +const ( + USERS_INDEX = ".lessmore_points" + USERS_DOC_TYPE = "points" + NODE_ADDRESS = "dynamic" + AUTH_STRING_LENGTH = 16 + LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + SALT_BYTES = 32 + HASH_BYTES = 64 +) + +// User document structure +type User struct { + // Will be added to the address: + // i.e. dynamic,1 + UserID int64 `json:"user_id"` + // Will be added to the bundled message + Name string `json:"name"` + // Email address needs for password restore + Email string `json:"email"` + AuthString string `json:"auth_string"` + Address string `json:"address"` + // Created time + Created int64 `json:"created"` +} + +// checkAuth token in point request +// do a search by the auth_string field +func (es ESConf) checkAuth(r PointRequest) (User, bool) { + reqURL := fmt.Sprintf("%s/%s/_search", es.Host, USERS_INDEX) + query := `{"query": {"term": { "auth_string": "%s" }}}` + query = fmt.Sprintf(query, r.Pauth) + + req, err := http.NewRequest("POST", reqURL, strings.NewReader(query)) + if err != nil { + log.Error(err) + return User{}, false + } + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Error(err) + return User{}, false + } + defer resp.Body.Close() + + var esr MaxIdAggregation + err = json.NewDecoder(resp.Body).Decode(&esr) + if err != nil { + log.Error(err) + return User{}, false + } + + if len(esr.Hits.Hits) == 1 && esr.Hits.Hits[0].Source.AuthString == r.Pauth { + return esr.Hits.Hits[0].Source, true + } + return User{}, false +} + +// DeleteUser from users index +func DeleteUser(name string) error { + return nil +} + +// AddNewUser to the .lessmore_users index +func (es ESConf) AddNewUser(name, email string) (User, error) { + var user User + if err := es.checkUser(name); err != nil { + return user, err + } + + max, err := es.getMaxUser() + if err != nil { + log.Fatal(err) + } + + user.Name = name + user.UserID = max + 1 + user.Address = NODE_ADDRESS + user.AuthString = string(genAuthString()) + user.Created = time.Now().Unix() + + err = es.IndexUser(user) + if err != nil { + return user, err + } + + return user, nil +} + +// IndexUser in `USERS_INDEX` index +func (es ESConf) IndexUser(user User) error { + reqURL := fmt.Sprintf("%s/%s/%s/%d", es.Host, USERS_INDEX, USERS_DOC_TYPE, user.UserID) + js, err := json.Marshal(user) + if err != nil { + return err + } + + client := &http.Client{} + req, err := http.NewRequest("PUT", reqURL, bytes.NewReader(js)) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + return nil +} + +func (es ESConf) checkUser(name string) error { + reqURL := es.Host + "/" + USERS_INDEX + "/_search" + reqName := `{"query": {"term": { "name": "%s" }}}` + reqName = fmt.Sprintf(reqName, name) + req, err := http.NewRequest("POST", reqURL, strings.NewReader(reqName)) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + var esr MaxIdAggregation + err = json.NewDecoder(resp.Body).Decode(&esr) + if err != nil { + return err + } + if len(esr.Hits.Hits) > 0 { + return errors.New(fmt.Sprintf("User %s alredy exists", name)) + } + + return nil +} + +func (es ESConf) getMaxUser() (int64, error) { + ok, err := es.checkIndex() + if err != nil { + return -1, err + } + if !ok { + if err := es.createIndex(); err != nil { + return -1, err + } + return 0, nil + } + + usersSearchURL := es.Host + "/" + USERS_INDEX + "/_search" + usersSearchReq := ` +{ + "aggs": { + "max_id": { "max": { "field": "user_id" } } + }, + "size": 0 +} +` + client := http.Client{} + req, err := http.NewRequest("POST", usersSearchURL, strings.NewReader(usersSearchReq)) + if err != nil { + return -1, err + } + req.Header.Add("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + log.Error(err) + return -1, err + } + + defer resp.Body.Close() + + content, _ := ioutil.ReadAll(resp.Body) + var esr MaxIdAggregation + err = json.NewDecoder(strings.NewReader(string(content))).Decode(&esr) + if err != nil { + log.Error("Cant parse JSON") + return -1, err + } + + return int64(esr.MaxID["max_id"].Value), nil +} + +func (es ESConf) checkIndex() (bool, error) { + indexListURL := es.Host + "/_cat/indices" + // Initialize http client + client := http.Client{} + indicesReq, err := http.NewRequest("GET", indexListURL, strings.NewReader("")) + if err != nil { + log.Error(err) + return false, err + } + indicesResp, err := client.Do(indicesReq) + if err != nil { + log.Error(err) + return false, err + } + + defer indicesResp.Body.Close() + + list, err := ioutil.ReadAll(indicesResp.Body) + if err != nil { + return false, err + } + + if strings.Contains(string(list), USERS_INDEX) { + return true, nil + } + + return false, nil +} + +func (es ESConf) createIndex() error { + mapping := ` +{ + "mappings": { + "%s": { + "properties": { + "user_id": { "type": "integer" }, + "name": { "type": "keyword" }, + "email": { "type": "keyword" }, + "address": { "type": "keyword" }, + "auth_string": { "type": "keyword" }, + "created": { + "type": "date", + "format": "strict_date_optional_time||epoch_second" + } + } + } + } +} +` + mapping = fmt.Sprintf(mapping, USERS_DOC_TYPE) + + reqURL := fmt.Sprintf("%s/%s", es.Host, USERS_INDEX) + req, err := http.NewRequest("PUT", reqURL, strings.NewReader(mapping)) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/json") + + log.Warn("Creating new users index") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + content, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + log.Warn("Created new index mapping") + fmt.Println(string(content)) + + return nil } diff --git a/node/elastic.go b/node/elastic.go index 6a23460..b24b845 100644 --- a/node/elastic.go +++ b/node/elastic.go @@ -53,10 +53,18 @@ func (es ESConf) GetPlainTextMessage(msgid string) []byte { `{"query": {"match": {"_id": "`, msgid, `"}}}`}, "")) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) + if err != nil { + log.Error(err.Error()) + return []byte("") + } + + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) + return []byte("") } defer resp.Body.Close() @@ -87,9 +95,15 @@ func (es ESConf) GetEchoMessageHashes(echo string) []string { searchQ := []byte(strings.Join([]string{ `{"sort": [ {"date":{ "order": "desc" }},{ "_score":{ "order": "desc" }}], - "query": {"query_string" : {"fields": ["msgid", "echo"], "query":"`, echo, `"}}, "size": 500}`}, "")) + "query": {"query_string" : {"fields": ["echo"], "query":"`, echo, `"}}, "size": 500}`}, "")) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) + if err != nil { + log.Error(err.Error()) + return hashes + } + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -99,6 +113,9 @@ func (es ESConf) GetEchoMessageHashes(echo string) []string { defer resp.Body.Close() + content, _ := ioutil.ReadAll(resp.Body) + log.Info(string(content)) + var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { @@ -142,6 +159,12 @@ func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int) "query": {"query_string" : {"fields": ["msgid", "echo"], "query":"`, echo, `"}}, "size":`, l, `}`}, "")) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) + if err != nil { + log.Error(err.Error()) + return hashes + } + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -188,6 +211,12 @@ func (es ESConf) GetUMMessages(msgs string) []string { ] }`) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query)) + if err != nil { + log.Error(err.Error()) + return encodedMessages + } + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -315,6 +344,12 @@ func (es ESConf) GetXC(echoes string) []string { } `) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query)) + if err != nil { + log.Error(err.Error()) + return counts + } + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -342,7 +377,7 @@ func (es ESConf) GetXC(echoes string) []string { func (es ESConf) GetListTXT() []byte { var searchURI string if es.Index != "" && es.Type != "" { - searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") + searchURI = strings.Join([]string{es.Host, es.Index, "_search"}, "/") } else { searchURI = strings.Join([]string{es.Host, "search"}, "/") } @@ -365,10 +400,17 @@ func (es ESConf) GetListTXT() []byte { log.Print("Search URI: ", searchURI) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) + if err != nil { + log.Error(err.Error()) + return []byte("") + } + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) + return []byte("") } defer resp.Body.Close() @@ -377,6 +419,7 @@ func (es ESConf) GetListTXT() []byte { err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) + return []byte("") } log.Infof("%+v", esr) diff --git a/node/helpers.go b/node/helpers.go new file mode 100644 index 0000000..2830d1b --- /dev/null +++ b/node/helpers.go @@ -0,0 +1,30 @@ +package node + +import ( + "log" + "time" + + mrand "math/rand" + + "golang.org/x/crypto/bcrypt" +) + +func hashAndSalt(authString []byte) string { + hash, err := bcrypt.GenerateFromPassword(authString, bcrypt.MinCost) + if err != nil { + log.Fatal(err.Error()) + } + return string(hash) +} + +// genAuthString random auth string +func genAuthString() []byte { + authString := make([]byte, AUTH_STRING_LENGTH) + mrand.Seed(time.Now().UnixNano()) + + for i := range authString { + authString[i] = LETTERS[mrand.Intn(len(LETTERS))] + } + + return authString +} diff --git a/node/point.go b/node/point.go index 4a0884a..ee5afb8 100644 --- a/node/point.go +++ b/node/point.go @@ -1,17 +1,89 @@ package node import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "bytes" + log "github.com/Sirupsen/logrus" - "github.com/idec-net/go-idec" + idec "github.com/idec-net/go-idec" ) +type ESDoc struct { + Echo string `json:"echo"` + Subg string `json:"subg"` + To string `json:"to"` + Author string `json:"author"` + Message string `json:"message"` + Date string `json:"date"` + MsgID string `json:"msgid"` + Tags string `json:"tags"` + Repto string `json:"repto"` + Address string `json:"address"` +} + // PointMessage add point message into DB -func (es ESConf) PointMessage(req PointRequest) error { - msg, err := idec.ParsePointMessage(req.Tmsg) +func (es ESConf) PointMessage(req PointRequest, user User) error { + pmsg, err := idec.ParsePointMessage(req.Tmsg) if err != nil { return err } - log.Infof("%+v", msg) + bmsg, err := idec.MakeBundledMessage(pmsg) + if err != nil { + return err + } + + // Make bundle ID + // Prevent collission via adding Timestamp + id := idec.MakeMsgID(fmt.Sprintf("%s\n%d", pmsg.String(), bmsg.Timestamp)) + bmsg.ID = id + bmsg.From = user.Name + bmsg.Address = fmt.Sprintf("%s,%d", user.Address, user.UserID) + + if err := es.IndexMessage(bmsg); err != nil { + return err + } + return nil +} + +func (es ESConf) IndexMessage(msg idec.Message) error { + tags, _ := msg.Tags.CollectTags() + doc := ESDoc{ + Tags: tags, + Echo: msg.Echo, + Subg: msg.Subg, + To: msg.To, + Author: msg.From, + Message: msg.Body, + Date: fmt.Sprintf("%d", msg.Timestamp), + Repto: msg.Repto, + Address: msg.Address, + MsgID: msg.ID, + } + reqURL := fmt.Sprintf("%s/%s/%s/%s", es.Host, es.Index, es.Type, msg.ID) + bdoc, err := json.Marshal(doc) + if err != nil { + return err + } + + req, err := http.NewRequest("PUT", reqURL, bytes.NewReader(bdoc)) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + content, _ := ioutil.ReadAll(resp.Body) + log.Info("Message added, response: ", string(content)) + return nil } diff --git a/node/structs.go b/node/structs.go index 54768d7..27a79d6 100644 --- a/node/structs.go +++ b/node/structs.go @@ -33,18 +33,39 @@ type Hits struct { // {"_index":"idec5","_type":"post","_id":"aAjSbXS5XeNF6lVaPh5A","_score":1.0,"_source" type Hit struct { - Index string `json:"index"` + Index string `json:"_index"` Type string `json:"_type"` ID string `json:"_id"` Source i2es.ESDoc `json:"_source"` } +type UserHits struct { + Total int64 `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []UserHit `json:"hits"` +} + +// { "_index":".lessmore_users","_type":"user","_id":"1","_score":1.0, +// "_source":{"id": 1, "address": "dynamic", "name": "name", "authString": "thisIsAtest"}} +type UserHit struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Score float64 `json:"_score"` + Source User `json:"_source"` +} + // "aggregations":{"echo":{"doc_count_error_upper_bound":2406,"sum_other_doc_count":76555,"buckets":[{"key":"bash.rss","doc_count":12779}]},"uniqueEcho":{"value":121}}} type EchoAggregations struct { EchoAgg map[string]EchoAgg `json:"aggregations"` UniqEcho map[string]Uniq `json:"uniqueEcho"` } +type MaxIdAggregation struct { + Hits UserHits `json:"hits"` + MaxID map[string]Uniq `json:"aggregations"` +} + type EchoAgg struct { DocCountErrorUpperBound int64 `json:"doc_count_error_upper_bound"` SumOtherDocCount int64 `json:"sum_other_doc_count"` @@ -57,7 +78,7 @@ type EchoBucket struct { } type Uniq struct { - Value int64 `json:"value"` + Value float64 `json:"value"` } type ESSearchShards struct {