Point messages buggy working

This commit is contained in:
Denis Zheleztsov 2018-11-05 18:24:46 +03:00
parent 915fc24cd4
commit 18c239fa52
7 changed files with 486 additions and 14 deletions

20
main.go
View File

@ -2,7 +2,10 @@ package main
import (
"flag"
"os"
"gitea.difrex.ru/Umbrella/lessmore/node"
log "github.com/Sirupsen/logrus"
)
var (
@ -10,6 +13,8 @@ var (
es string
esMessagesIndex string
esMessagesType string
add string
email string
)
// init ...
@ -18,6 +23,8 @@ func init() {
flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host")
flag.StringVar(&esMessagesIndex, "esindex", "idec3", "ES index")
flag.StringVar(&esMessagesType, "estype", "post", "ES index type")
flag.StringVar(&add, "add", "", "User to add")
flag.StringVar(&email, "email", "", "User email address")
flag.Parse()
}
@ -27,5 +34,18 @@ func main() {
esconf.Host = es
esconf.Index = esMessagesIndex
esconf.Type = esMessagesType
if add != "" {
addUser(add, esconf)
}
node.Serve(listen, esconf)
}
func addUser(name string, esconf node.ESConf) {
user, err := esconf.AddNewUser(add, email)
if err != nil {
log.Fatal(err)
os.Exit(2)
}
log.Infof("Created: %+v", user)
os.Exit(0)
}

View File

@ -151,19 +151,23 @@ func (es ESConf) UPointHandler(w http.ResponseWriter, r *http.Request) {
}
// Authorization check
if !es.checkAuth(req) {
user, ok := es.checkAuth(req)
if !ok {
w.WriteHeader(403)
w.Write([]byte("Permission denied"))
return
}
// Proccess point message
err = es.PointMessage(req)
err = es.PointMessage(req, user)
if err != nil {
log.Error(err.Error())
w.WriteHeader(500)
return
}
w.WriteHeader(200)
w.Write([]byte("ok: added"))
}
// Serve ...

View File

@ -1,7 +1,289 @@
package node
// checkAuth token in point request
// TODO: implement logic
func (es ESConf) checkAuth(req PointRequest) bool {
return true
import (
"fmt"
"net/http"
"strings"
"io/ioutil"
"encoding/json"
"errors"
"bytes"
"time"
log "github.com/Sirupsen/logrus"
)
const (
USERS_INDEX = ".lessmore_points"
USERS_DOC_TYPE = "points"
NODE_ADDRESS = "dynamic"
AUTH_STRING_LENGTH = 16
LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
SALT_BYTES = 32
HASH_BYTES = 64
)
// User document structure
type User struct {
// Will be added to the address:
// i.e. dynamic,1
UserID int64 `json:"user_id"`
// Will be added to the bundled message
Name string `json:"name"`
// Email address needs for password restore
Email string `json:"email"`
AuthString string `json:"auth_string"`
Address string `json:"address"`
// Created time
Created int64 `json:"created"`
}
// checkAuth token in point request
// do a search by the auth_string field
func (es ESConf) checkAuth(r PointRequest) (User, bool) {
reqURL := fmt.Sprintf("%s/%s/_search", es.Host, USERS_INDEX)
query := `{"query": {"term": { "auth_string": "%s" }}}`
query = fmt.Sprintf(query, r.Pauth)
req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
if err != nil {
log.Error(err)
return User{}, false
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err)
return User{}, false
}
defer resp.Body.Close()
var esr MaxIdAggregation
err = json.NewDecoder(resp.Body).Decode(&esr)
if err != nil {
log.Error(err)
return User{}, false
}
if len(esr.Hits.Hits) == 1 && esr.Hits.Hits[0].Source.AuthString == r.Pauth {
return esr.Hits.Hits[0].Source, true
}
return User{}, false
}
// DeleteUser from users index
func DeleteUser(name string) error {
return nil
}
// AddNewUser to the .lessmore_users index
func (es ESConf) AddNewUser(name, email string) (User, error) {
var user User
if err := es.checkUser(name); err != nil {
return user, err
}
max, err := es.getMaxUser()
if err != nil {
log.Fatal(err)
}
user.Name = name
user.UserID = max + 1
user.Address = NODE_ADDRESS
user.AuthString = string(genAuthString())
user.Created = time.Now().Unix()
err = es.IndexUser(user)
if err != nil {
return user, err
}
return user, nil
}
// IndexUser in `USERS_INDEX` index
func (es ESConf) IndexUser(user User) error {
reqURL := fmt.Sprintf("%s/%s/%s/%d", es.Host, USERS_INDEX, USERS_DOC_TYPE, user.UserID)
js, err := json.Marshal(user)
if err != nil {
return err
}
client := &http.Client{}
req, err := http.NewRequest("PUT", reqURL, bytes.NewReader(js))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}
func (es ESConf) checkUser(name string) error {
reqURL := es.Host + "/" + USERS_INDEX + "/_search"
reqName := `{"query": {"term": { "name": "%s" }}}`
reqName = fmt.Sprintf(reqName, name)
req, err := http.NewRequest("POST", reqURL, strings.NewReader(reqName))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
var esr MaxIdAggregation
err = json.NewDecoder(resp.Body).Decode(&esr)
if err != nil {
return err
}
if len(esr.Hits.Hits) > 0 {
return errors.New(fmt.Sprintf("User %s alredy exists", name))
}
return nil
}
func (es ESConf) getMaxUser() (int64, error) {
ok, err := es.checkIndex()
if err != nil {
return -1, err
}
if !ok {
if err := es.createIndex(); err != nil {
return -1, err
}
return 0, nil
}
usersSearchURL := es.Host + "/" + USERS_INDEX + "/_search"
usersSearchReq := `
{
"aggs": {
"max_id": { "max": { "field": "user_id" } }
},
"size": 0
}
`
client := http.Client{}
req, err := http.NewRequest("POST", usersSearchURL, strings.NewReader(usersSearchReq))
if err != nil {
return -1, err
}
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
log.Error(err)
return -1, err
}
defer resp.Body.Close()
content, _ := ioutil.ReadAll(resp.Body)
var esr MaxIdAggregation
err = json.NewDecoder(strings.NewReader(string(content))).Decode(&esr)
if err != nil {
log.Error("Cant parse JSON")
return -1, err
}
return int64(esr.MaxID["max_id"].Value), nil
}
func (es ESConf) checkIndex() (bool, error) {
indexListURL := es.Host + "/_cat/indices"
// Initialize http client
client := http.Client{}
indicesReq, err := http.NewRequest("GET", indexListURL, strings.NewReader(""))
if err != nil {
log.Error(err)
return false, err
}
indicesResp, err := client.Do(indicesReq)
if err != nil {
log.Error(err)
return false, err
}
defer indicesResp.Body.Close()
list, err := ioutil.ReadAll(indicesResp.Body)
if err != nil {
return false, err
}
if strings.Contains(string(list), USERS_INDEX) {
return true, nil
}
return false, nil
}
func (es ESConf) createIndex() error {
mapping := `
{
"mappings": {
"%s": {
"properties": {
"user_id": { "type": "integer" },
"name": { "type": "keyword" },
"email": { "type": "keyword" },
"address": { "type": "keyword" },
"auth_string": { "type": "keyword" },
"created": {
"type": "date",
"format": "strict_date_optional_time||epoch_second"
}
}
}
}
}
`
mapping = fmt.Sprintf(mapping, USERS_DOC_TYPE)
reqURL := fmt.Sprintf("%s/%s", es.Host, USERS_INDEX)
req, err := http.NewRequest("PUT", reqURL, strings.NewReader(mapping))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
log.Warn("Creating new users index")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
content, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
log.Warn("Created new index mapping")
fmt.Println(string(content))
return nil
}

View File

@ -53,10 +53,18 @@ func (es ESConf) GetPlainTextMessage(msgid string) []byte {
`{"query": {"match": {"_id": "`, msgid, `"}}}`}, ""))
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ))
if err != nil {
log.Error(err.Error())
return []byte("")
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err.Error())
return []byte("")
}
defer resp.Body.Close()
@ -87,9 +95,15 @@ func (es ESConf) GetEchoMessageHashes(echo string) []string {
searchQ := []byte(strings.Join([]string{
`{"sort": [
{"date":{ "order": "desc" }},{ "_score":{ "order": "desc" }}],
"query": {"query_string" : {"fields": ["msgid", "echo"], "query":"`, echo, `"}}, "size": 500}`}, ""))
"query": {"query_string" : {"fields": ["echo"], "query":"`, echo, `"}}, "size": 500}`}, ""))
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ))
if err != nil {
log.Error(err.Error())
return hashes
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
@ -99,6 +113,9 @@ func (es ESConf) GetEchoMessageHashes(echo string) []string {
defer resp.Body.Close()
content, _ := ioutil.ReadAll(resp.Body)
log.Info(string(content))
var esr ESSearchResp
err = json.NewDecoder(resp.Body).Decode(&esr)
if err != nil {
@ -142,6 +159,12 @@ func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int)
"query": {"query_string" : {"fields": ["msgid", "echo"], "query":"`, echo, `"}}, "size":`, l, `}`}, ""))
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ))
if err != nil {
log.Error(err.Error())
return hashes
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
@ -188,6 +211,12 @@ func (es ESConf) GetUMMessages(msgs string) []string {
]
}`)
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query))
if err != nil {
log.Error(err.Error())
return encodedMessages
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
@ -315,6 +344,12 @@ func (es ESConf) GetXC(echoes string) []string {
}
`)
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query))
if err != nil {
log.Error(err.Error())
return counts
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
@ -342,7 +377,7 @@ func (es ESConf) GetXC(echoes string) []string {
func (es ESConf) GetListTXT() []byte {
var searchURI string
if es.Index != "" && es.Type != "" {
searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
searchURI = strings.Join([]string{es.Host, es.Index, "_search"}, "/")
} else {
searchURI = strings.Join([]string{es.Host, "search"}, "/")
}
@ -365,10 +400,17 @@ func (es ESConf) GetListTXT() []byte {
log.Print("Search URI: ", searchURI)
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ))
if err != nil {
log.Error(err.Error())
return []byte("")
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err.Error())
return []byte("")
}
defer resp.Body.Close()
@ -377,6 +419,7 @@ func (es ESConf) GetListTXT() []byte {
err = json.NewDecoder(resp.Body).Decode(&esr)
if err != nil {
log.Error(err.Error())
return []byte("")
}
log.Infof("%+v", esr)

30
node/helpers.go Normal file
View File

@ -0,0 +1,30 @@
package node
import (
"log"
"time"
mrand "math/rand"
"golang.org/x/crypto/bcrypt"
)
func hashAndSalt(authString []byte) string {
hash, err := bcrypt.GenerateFromPassword(authString, bcrypt.MinCost)
if err != nil {
log.Fatal(err.Error())
}
return string(hash)
}
// genAuthString random auth string
func genAuthString() []byte {
authString := make([]byte, AUTH_STRING_LENGTH)
mrand.Seed(time.Now().UnixNano())
for i := range authString {
authString[i] = LETTERS[mrand.Intn(len(LETTERS))]
}
return authString
}

View File

@ -1,17 +1,89 @@
package node
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"bytes"
log "github.com/Sirupsen/logrus"
"github.com/idec-net/go-idec"
idec "github.com/idec-net/go-idec"
)
type ESDoc struct {
Echo string `json:"echo"`
Subg string `json:"subg"`
To string `json:"to"`
Author string `json:"author"`
Message string `json:"message"`
Date string `json:"date"`
MsgID string `json:"msgid"`
Tags string `json:"tags"`
Repto string `json:"repto"`
Address string `json:"address"`
}
// PointMessage add point message into DB
func (es ESConf) PointMessage(req PointRequest) error {
msg, err := idec.ParsePointMessage(req.Tmsg)
func (es ESConf) PointMessage(req PointRequest, user User) error {
pmsg, err := idec.ParsePointMessage(req.Tmsg)
if err != nil {
return err
}
log.Infof("%+v", msg)
bmsg, err := idec.MakeBundledMessage(pmsg)
if err != nil {
return err
}
// Make bundle ID
// Prevent collission via adding Timestamp
id := idec.MakeMsgID(fmt.Sprintf("%s\n%d", pmsg.String(), bmsg.Timestamp))
bmsg.ID = id
bmsg.From = user.Name
bmsg.Address = fmt.Sprintf("%s,%d", user.Address, user.UserID)
if err := es.IndexMessage(bmsg); err != nil {
return err
}
return nil
}
func (es ESConf) IndexMessage(msg idec.Message) error {
tags, _ := msg.Tags.CollectTags()
doc := ESDoc{
Tags: tags,
Echo: msg.Echo,
Subg: msg.Subg,
To: msg.To,
Author: msg.From,
Message: msg.Body,
Date: fmt.Sprintf("%d", msg.Timestamp),
Repto: msg.Repto,
Address: msg.Address,
MsgID: msg.ID,
}
reqURL := fmt.Sprintf("%s/%s/%s/%s", es.Host, es.Index, es.Type, msg.ID)
bdoc, err := json.Marshal(doc)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", reqURL, bytes.NewReader(bdoc))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
content, _ := ioutil.ReadAll(resp.Body)
log.Info("Message added, response: ", string(content))
return nil
}

View File

@ -33,18 +33,39 @@ type Hits struct {
// {"_index":"idec5","_type":"post","_id":"aAjSbXS5XeNF6lVaPh5A","_score":1.0,"_source"
type Hit struct {
Index string `json:"index"`
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Source i2es.ESDoc `json:"_source"`
}
type UserHits struct {
Total int64 `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []UserHit `json:"hits"`
}
// { "_index":".lessmore_users","_type":"user","_id":"1","_score":1.0,
// "_source":{"id": 1, "address": "dynamic", "name": "name", "authString": "thisIsAtest"}}
type UserHit struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Score float64 `json:"_score"`
Source User `json:"_source"`
}
// "aggregations":{"echo":{"doc_count_error_upper_bound":2406,"sum_other_doc_count":76555,"buckets":[{"key":"bash.rss","doc_count":12779}]},"uniqueEcho":{"value":121}}}
type EchoAggregations struct {
EchoAgg map[string]EchoAgg `json:"aggregations"`
UniqEcho map[string]Uniq `json:"uniqueEcho"`
}
type MaxIdAggregation struct {
Hits UserHits `json:"hits"`
MaxID map[string]Uniq `json:"aggregations"`
}
type EchoAgg struct {
DocCountErrorUpperBound int64 `json:"doc_count_error_upper_bound"`
SumOtherDocCount int64 `json:"sum_other_doc_count"`
@ -57,7 +78,7 @@ type EchoBucket struct {
}
type Uniq struct {
Value int64 `json:"value"`
Value float64 `json:"value"`
}
type ESSearchShards struct {