fetcher/i2es/elastic.go

233 lines
5.5 KiB
Go
Raw Permalink Normal View History

2017-02-11 19:53:20 +03:00
package i2es
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"strings"
2018-01-04 22:59:09 +03:00
log "github.com/Sirupsen/logrus"
"github.com/google/uuid"
2017-02-11 19:53:20 +03:00
)
2018-01-05 17:05:45 +03:00
// topicid bundles the topic ID lookup helpers with the Elasticsearch
// settings they need to build request URIs.
type topicid struct {
	// es supplies the Host, Index and Type path segments used for requests.
	es ESConf
}
// doRequest sends data as the body of an HTTP request with the given method
// to uri, marked as "application/json", and returns the raw response body.
//
// The response status code is not inspected; callers receive the body of
// error responses as well. A non-nil error is returned when the request
// cannot be built (invalid method or URI), cannot be sent, or when the
// response body cannot be read.
func doRequest(uri, data, method string) ([]byte, error) {
	req, err := http.NewRequest(method, uri, bytes.NewBufferString(data))
	if err != nil {
		// req is nil when NewRequest fails, so it must not be used before
		// this check.
		return nil, err
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}
2018-01-04 22:59:09 +03:00
// findTopicID for msgid
2018-01-05 17:05:45 +03:00
func (t topicid) findTopicID(msgid string, enter int) (string, error) {
searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}, "size":' + show_rows + ', "from":' + from + '}`}, "")
body, err := doRequest(searchURI, searchQ, "POST")
if err != nil {
return "", err
}
var res ESRes
err = json.Unmarshal(body, &res)
if err != nil {
return "", err
}
if res.Hits.Total == 0 {
e := errors.New("MSGID " + msgid + " not found ")
return "", e
}
if res.Hits.Hits[0].Source.TopicID != "" {
2018-01-05 17:57:38 +03:00
topicid := res.Hits.Hits[0].Source.TopicID
2018-01-05 17:39:37 +03:00
return topicid, nil
2018-01-05 17:05:45 +03:00
}
if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" {
if enter > 25 {
2018-01-05 17:57:38 +03:00
topicid := uuid.New().String()
2018-01-05 17:05:45 +03:00
log.Error("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid)
return topicid, nil
} else {
t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1)
}
} else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" {
return res.Hits.Hits[0].Source.TopicID, nil
}
if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" {
// This is a old message without topicid => generate new topicid and update a top document
2018-01-05 17:57:38 +03:00
topicid := uuid.New().String()
2018-01-05 17:05:45 +03:00
log.Warn(topicid)
log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID")
log.Warn("Generated new topicID " + topicid)
log.Warn("Update top message")
updateDoc := res.Hits.Hits[0].Source
updateDoc.TopicID = topicid
updateDocPlain, err := json.Marshal(updateDoc)
if err != nil {
e := errors.New("Cant serialize document: " + err.Error())
return "", e
}
postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/")
_, err = doRequest(postURI, string(updateDocPlain), "PUT")
if err != nil {
e := errors.New("Cant update document: " + err.Error())
return "", e
}
log.Info(string(updateDocPlain))
2018-01-05 17:57:38 +03:00
return topicid, nil
2018-01-05 17:05:45 +03:00
}
2018-01-05 17:57:38 +03:00
return "", errors.New("Something went wrong")
2018-01-04 22:59:09 +03:00
}
// check if
2018-01-05 17:05:45 +03:00
func (t topicid) checkTopicID(msg *ESDoc) func() string {
2018-01-04 22:59:09 +03:00
if msg.TopicID != "" {
2018-01-05 17:05:45 +03:00
return func() string { return msg.TopicID }
2018-01-04 22:59:09 +03:00
}
if msg.Repto == "" {
2018-01-05 17:05:45 +03:00
var genTopicID = func() string {
topicid := uuid.New().String()
log.Warn("New start message for topic " + topicid + " found")
return topicid
2018-01-04 22:59:09 +03:00
}
2018-01-05 17:05:45 +03:00
return genTopicID
2018-01-04 22:59:09 +03:00
}
2018-01-05 17:05:45 +03:00
if msg.Repto != "" {
var findAndSetTopicID = func() string {
2018-01-05 17:57:38 +03:00
topicid, err := t.findTopicID(msg.Repto, 0)
2018-01-05 17:05:45 +03:00
if err != nil {
log.Error(err.Error())
2018-01-05 17:57:38 +03:00
return ""
2018-01-05 17:05:45 +03:00
}
log.Info("Topic ID " + topicid + " found for message " + msg.MsgID)
return topicid
2018-01-04 22:59:09 +03:00
}
2018-01-05 17:05:45 +03:00
return findAndSetTopicID
2018-01-04 22:59:09 +03:00
}
2018-01-05 17:05:45 +03:00
return func() string { return "" }
2018-01-04 22:59:09 +03:00
}
// getorcreate topicID. Generate new unique topicID if message is start message
// Find top message and get it topicid if ESDoc.Repto is not empty
2018-01-05 17:05:45 +03:00
func (t topicid) getOrCreate(msg *ESDoc) string {
f := t.checkTopicID(msg)
return f()
2018-01-04 22:59:09 +03:00
}
2017-02-11 19:53:20 +03:00
// PutToIndex ...
func (es ESConf) PutToIndex(msg ESDoc) error {
putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/")
log.Print(putURI)
2017-02-11 19:53:20 +03:00
2018-01-05 17:05:45 +03:00
// Assign topicID
var t topicid
t.es = es
topic := t.getOrCreate(&msg)
2018-01-05 17:57:38 +03:00
if msg.TopicID == "" && topic != "" {
2018-01-05 17:05:45 +03:00
msg.TopicID = topic
}
2017-02-11 19:53:20 +03:00
doc, err := json.Marshal(msg)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
req.Header.Add("Content-Type", "application/json")
2017-02-11 19:53:20 +03:00
client := &http.Client{}
resp, err := client.Do(req)
2017-09-27 14:49:50 +03:00
if err != nil {
return err
}
2017-02-11 19:53:20 +03:00
defer resp.Body.Close()
return err
}
2018-01-05 17:05:45 +03:00
// ESRes ES response structure
2017-02-11 19:53:20 +03:00
type ESRes struct {
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Hits Hits `json:"hits"`
}
2018-01-05 17:05:45 +03:00
// Hits Founded documents
2017-02-11 19:53:20 +03:00
type Hits struct {
2018-01-05 17:05:45 +03:00
Total int `json:"total"`
MaxScore float32 `json:"max_score"`
Hits []Hit `json:"hits"`
}
// ES Document
type Hit struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Score float32 `json:"_score"`
Source ESDoc `json:"_source"`
2017-02-11 19:53:20 +03:00
}
// CheckID ...
func (es ESConf) CheckID(id string) (bool, error) {
searchURI := strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "")
2018-01-05 17:05:45 +03:00
body, err := doRequest(searchURI, searchQ, "POST")
2017-02-11 19:53:20 +03:00
if err != nil {
return false, err
}
var e ESRes
err = json.Unmarshal(body, &e)
if err != nil {
return false, err
}
if e.Hits.Total > 0 {
err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, ""))
return false, err
}
2018-01-05 17:05:45 +03:00
2017-02-11 19:53:20 +03:00
if e.TimedOut {
err = errors.New("Request time out")
return false, err
}
return false, nil
}