290 lines
6.0 KiB
Go
290 lines
6.0 KiB
Go
package node
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"net/http"
|
|
|
|
"strings"
|
|
|
|
"io/ioutil"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"bytes"
|
|
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
|
|
USERS_INDEX = ".lessmore_points"
|
|
USERS_DOC_TYPE = "points"
|
|
NODE_ADDRESS = "dynamic"
|
|
AUTH_STRING_LENGTH = 16
|
|
LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
|
SALT_BYTES = 32
|
|
HASH_BYTES = 64
|
|
)
|
|
|
|
// User document structure
|
|
type User struct {
|
|
// Will be added to the address:
|
|
// i.e. dynamic,1
|
|
UserID int64 `json:"user_id"`
|
|
// Will be added to the bundled message
|
|
Name string `json:"name"`
|
|
// Email address needs for password restore
|
|
Email string `json:"email"`
|
|
AuthString string `json:"auth_string"`
|
|
Address string `json:"address"`
|
|
// Created time
|
|
Created int64 `json:"created"`
|
|
}
|
|
|
|
// checkAuth token in point request
|
|
// do a search by the auth_string field
|
|
func (es ESConf) checkAuth(r PointRequest) (User, bool) {
|
|
reqURL := fmt.Sprintf("%s/%s/_search", es.Host, USERS_INDEX)
|
|
query := `{"query": {"term": { "auth_string": "%s" }}}`
|
|
query = fmt.Sprintf(query, r.Pauth)
|
|
|
|
req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
|
|
if err != nil {
|
|
log.Error("Can't prepare a request: ", err)
|
|
return User{}, false
|
|
}
|
|
req.Header.Add("Content-Type", "application/json")
|
|
|
|
client := &http.Client{}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
log.Error("Can't make a request: ", err)
|
|
return User{}, false
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
var esr MaxIdAggregation
|
|
err = json.NewDecoder(resp.Body).Decode(&esr)
|
|
if err != nil {
|
|
log.Error("Can't decode a response from ES: ", err)
|
|
return User{}, false
|
|
}
|
|
|
|
if len(esr.Hits.Hits) == 1 && esr.Hits.Hits[0].Source.AuthString == r.Pauth {
|
|
return esr.Hits.Hits[0].Source, true
|
|
}
|
|
return User{}, false
|
|
}
|
|
|
|
// DeleteUser from users index
|
|
func DeleteUser(name string) error {
|
|
return nil
|
|
}
|
|
|
|
// AddNewUser to the .lessmore_users index
|
|
func (es ESConf) AddNewUser(name, email string) (User, error) {
|
|
var user User
|
|
if err := es.checkUser(name); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
max, err := es.getMaxUser()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
user.Name = name
|
|
user.UserID = max + 1
|
|
user.Address = NODE_ADDRESS
|
|
user.AuthString = string(genAuthString())
|
|
user.Created = time.Now().Unix()
|
|
|
|
err = es.IndexUser(user)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
// IndexUser in `USERS_INDEX` index
|
|
func (es ESConf) IndexUser(user User) error {
|
|
reqURL := fmt.Sprintf("%s/%s/%s/%d", es.Host, USERS_INDEX, USERS_DOC_TYPE, user.UserID)
|
|
js, err := json.Marshal(user)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client := &http.Client{}
|
|
req, err := http.NewRequest("PUT", reqURL, bytes.NewReader(js))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Add("Content-Type", "application/json")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es ESConf) checkUser(name string) error {
|
|
reqURL := es.Host + "/" + USERS_INDEX + "/_search"
|
|
reqName := `{"query": {"term": { "name": "%s" }}}`
|
|
reqName = fmt.Sprintf(reqName, name)
|
|
req, err := http.NewRequest("POST", reqURL, strings.NewReader(reqName))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Add("Content-Type", "application/json")
|
|
|
|
client := &http.Client{}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
var esr MaxIdAggregation
|
|
err = json.NewDecoder(resp.Body).Decode(&esr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(esr.Hits.Hits) > 0 {
|
|
return errors.New(fmt.Sprintf("User %s alredy exists", name))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es ESConf) getMaxUser() (int64, error) {
|
|
ok, err := es.checkIndex()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
if !ok {
|
|
if err := es.createIndex(); err != nil {
|
|
return -1, err
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
usersSearchURL := es.Host + "/" + USERS_INDEX + "/_search"
|
|
usersSearchReq := `
|
|
{
|
|
"aggs": {
|
|
"max_id": { "max": { "field": "user_id" } }
|
|
},
|
|
"size": 0
|
|
}
|
|
`
|
|
client := http.Client{}
|
|
req, err := http.NewRequest("POST", usersSearchURL, strings.NewReader(usersSearchReq))
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
req.Header.Add("Content-Type", "application/json")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
log.Error(err)
|
|
return -1, err
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
content, _ := ioutil.ReadAll(resp.Body)
|
|
var esr MaxIdAggregation
|
|
err = json.NewDecoder(strings.NewReader(string(content))).Decode(&esr)
|
|
if err != nil {
|
|
log.Error("Cant parse JSON")
|
|
return -1, err
|
|
}
|
|
|
|
return int64(esr.MaxID["max_id"].Value), nil
|
|
}
|
|
|
|
func (es ESConf) checkIndex() (bool, error) {
|
|
indexListURL := es.Host + "/_cat/indices"
|
|
// Initialize http client
|
|
client := http.Client{}
|
|
indicesReq, err := http.NewRequest("GET", indexListURL, strings.NewReader(""))
|
|
if err != nil {
|
|
log.Error(err)
|
|
return false, err
|
|
}
|
|
indicesResp, err := client.Do(indicesReq)
|
|
if err != nil {
|
|
log.Error(err)
|
|
return false, err
|
|
}
|
|
|
|
defer indicesResp.Body.Close()
|
|
|
|
list, err := ioutil.ReadAll(indicesResp.Body)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if strings.Contains(string(list), USERS_INDEX) {
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (es ESConf) createIndex() error {
|
|
mapping := `
|
|
{
|
|
"mappings": {
|
|
"%s": {
|
|
"properties": {
|
|
"user_id": { "type": "integer" },
|
|
"name": { "type": "keyword" },
|
|
"email": { "type": "keyword" },
|
|
"address": { "type": "keyword" },
|
|
"auth_string": { "type": "keyword" },
|
|
"created": {
|
|
"type": "date",
|
|
"format": "strict_date_optional_time||epoch_second"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`
|
|
mapping = fmt.Sprintf(mapping, USERS_DOC_TYPE)
|
|
|
|
reqURL := fmt.Sprintf("%s/%s", es.Host, USERS_INDEX)
|
|
req, err := http.NewRequest("PUT", reqURL, strings.NewReader(mapping))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Add("Content-Type", "application/json")
|
|
|
|
log.Warn("Creating new users index")
|
|
|
|
client := &http.Client{}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
content, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Warn("Created new index mapping")
|
|
fmt.Println(string(content))
|
|
|
|
return nil
|
|
}
|