Working topics assignement

This commit is contained in:
Denis Zheleztsov 2018-01-08 15:57:56 +03:00
parent e302295e6c
commit 0dc16c253e
3 changed files with 146 additions and 152 deletions

View File

@ -9,7 +9,6 @@ import (
"strings" "strings"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/google/uuid"
) )
type topicid struct { type topicid struct {
@ -40,128 +39,19 @@ func doRequest(uri, data, method string) ([]byte, error) {
return body, nil return body, nil
} }
// findTopicID for msgid
func (t topicid) findTopicID(msgid string, enter int) (string, error) {
searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}, "size":' + show_rows + ', "from":' + from + '}`}, "")
body, err := doRequest(searchURI, searchQ, "POST")
if err != nil {
return "", err
}
var res ESRes
err = json.Unmarshal(body, &res)
if err != nil {
return "", err
}
if res.Hits.Total == 0 {
e := errors.New("MSGID " + msgid + " not found ")
return "", e
}
if res.Hits.Hits[0].Source.TopicID != "" {
topicid := res.Hits.Hits[0].Source.TopicID
return topicid, nil
}
if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" {
if enter > 25 {
topicid := uuid.New().String()
log.Error("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid)
return topicid, nil
} else {
t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1)
}
} else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" {
return res.Hits.Hits[0].Source.TopicID, nil
}
if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" {
// This is a old message without topicid => generate new topicid and update a top document
topicid := uuid.New().String()
log.Warn(topicid)
log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID")
log.Warn("Generated new topicID " + topicid)
log.Warn("Update top message")
updateDoc := res.Hits.Hits[0].Source
updateDoc.TopicID = topicid
updateDocPlain, err := json.Marshal(updateDoc)
if err != nil {
e := errors.New("Cant serialize document: " + err.Error())
return "", e
}
postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/")
_, err = doRequest(postURI, string(updateDocPlain), "PUT")
if err != nil {
e := errors.New("Cant update document: " + err.Error())
return "", e
}
log.Info(string(updateDocPlain))
return topicid, nil
}
return "", errors.New("Something went wrong")
}
// check if
func (t topicid) checkTopicID(msg *ESDoc) func() string {
if msg.TopicID != "" {
return func() string { return msg.TopicID }
}
if msg.Repto == "" {
var genTopicID = func() string {
topicid := uuid.New().String()
log.Warn("New start message for topic " + topicid + " found")
return topicid
}
return genTopicID
}
if msg.Repto != "" {
var findAndSetTopicID = func() string {
topicid, err := t.findTopicID(msg.Repto, 0)
if err != nil {
log.Error(err.Error())
return ""
}
log.Info("Topic ID " + topicid + " found for message " + msg.MsgID)
return topicid
}
return findAndSetTopicID
}
return func() string { return "" }
}
// getorcreate topicID. Generate new unique topicID if message is start message
// Find top message and get it topicid if ESDoc.Repto is not empty
func (t topicid) getOrCreate(msg *ESDoc) string {
f := t.checkTopicID(msg)
return f()
}
// PutToIndex ... // PutToIndex ...
func (es ESConf) PutToIndex(msg ESDoc) error { func (es ESConf) PutToIndex(msg ESDoc) (ESDoc, error) {
putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/")
log.Print(putURI) log.Print(putURI)
// Assign topicID // Assign topicID for top message
var t topicid var t topicid
t.es = es t.es = es
topic := t.getOrCreate(&msg) t.getOrCreate(&msg)
if msg.TopicID == "" && topic != "" {
msg.TopicID = topic
}
doc, err := json.Marshal(msg) doc, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return msg, err
} }
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
@ -170,35 +60,12 @@ func (es ESConf) PutToIndex(msg ESDoc) error {
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return err return msg, err
} }
defer resp.Body.Close() defer resp.Body.Close()
return err return msg, err
}
// ESRes ES response structure
type ESRes struct {
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Hits Hits `json:"hits"`
}
// Hits Founded documents
type Hits struct {
Total int `json:"total"`
MaxScore float32 `json:"max_score"`
Hits []Hit `json:"hits"`
}
// ES Document
type Hit struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Score float32 `json:"_score"`
Source ESDoc `json:"_source"`
} }
// CheckID ... // CheckID ...

View File

@ -1,35 +1,56 @@
package i2es package i2es
import ( import (
"encoding/json"
"errors"
"strings"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/google/uuid" "github.com/google/uuid"
) )
// getorcreate topicID. Generate new unique topicID if message is start message
func (t topicid) getOrCreate(msg *ESDoc) {
if msg.TopicID == "" && msg.Repto == "" {
log.Warn("New topic assign for message " + msg.MsgID)
msg.TopicID = uuid.New().String()
}
}
// AssignTopics ... // AssignTopics ...
func (es ESConf) AssignTopics(messages []ESDoc) error { func (es ESConf) AssignTopics(messages []ESDoc) error {
log.Info("====\nTrying assign topics\n====")
for _, message := range messages { for _, message := range messages {
log.Info("Working for " + message.MsgID)
// generate new topicid // generate new topicid
if message.Repto == "" && message.TopicID == "" { if message.Repto == "" && message.TopicID == "" {
message.TopicID = uuid.New().String() topic := uuid.New().String()
err := es.PutToIndex(message) message.TopicID = topic
_, err := es.PutToIndex(message)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
continue continue
} }
log.Info("New topic " + topic + " for message " + message.MsgID + " assigned")
} }
// skip message with assigned topicid // skip message with assigned topicid
if message.TopicID != "" { if message.TopicID != "" {
log.Info("message " + message.MsgID + " has already topicid " + message.TopicID)
continue continue
} }
// find top message of the thread and get they topicid // find top message of the thread and get they topicid
if message.Repto != "" { if message.Repto != "" {
log.Info("Repto " + message.Repto + " found in message " + message.MsgID)
t := topicid{es} t := topicid{es}
topicid, err := t.findTopicID(message.MsgID, 0) topicid, err := t.findTopicID(message.Repto, 0)
if err != nil { if err != nil {
log.Error(err.Error())
message.Misplaced = "yes" message.Misplaced = "yes"
err := es.PutToIndex(message) log.Error("Message " + message.MsgID + " misplaced")
_, err := es.PutToIndex(message)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
continue continue
@ -38,12 +59,115 @@ func (es ESConf) AssignTopics(messages []ESDoc) error {
} }
message.TopicID = topicid message.TopicID = topicid
err = es.PutToIndex(message) _, err = es.PutToIndex(message)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
continue continue
} }
log.Info("Topic " + topicid + "found for message " + message.MsgID)
} }
} }
return nil return nil
} }
// findTopicID for msgid
func (t topicid) findTopicID(msgid string, enter int) (string, error) {
searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}}`}, "")
log.Info(searchQ)
body, err := doRequest(searchURI, searchQ, "POST")
if err != nil {
return "", err
}
var res ESRes
err = json.Unmarshal(body, &res)
if err != nil {
return "", err
}
if res.Hits.Total == 0 {
e := errors.New("MSGID " + msgid + " not found ")
log.Error(string(body))
return "", e
}
if res.Hits.Hits[0].Source.TopicID != "" {
topicid := res.Hits.Hits[0].Source.TopicID
return topicid, nil
}
if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" {
if enter > 25 {
topicid := uuid.New().String()
e := errors.New("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid)
log.Error(e.Error())
return topicid, err
} else {
t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1)
}
} else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" {
return res.Hits.Hits[0].Source.TopicID, nil
}
if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" {
// This is a old message without topicid => generate new topicid and update a top document
topicid := uuid.New().String()
log.Warn(topicid)
log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID")
log.Warn("Generated new topicID " + topicid)
log.Warn("Update top message")
updateDoc := res.Hits.Hits[0].Source
updateDoc.TopicID = topicid
updateDocPlain, err := json.Marshal(updateDoc)
if err != nil {
e := errors.New("Cant serialize document: " + err.Error())
return "", e
}
postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/")
_, err = doRequest(postURI, string(updateDocPlain), "PUT")
if err != nil {
e := errors.New("Cant update document: " + err.Error())
return "", e
}
log.Info(string(updateDocPlain))
return topicid, nil
}
return "", errors.New("Something went wrong")
}
// check if
func (t topicid) checkTopicID(msg *ESDoc) func() string {
if msg.TopicID != "" {
return func() string { return msg.TopicID }
}
if msg.Repto == "" {
var genTopicID = func() string {
topicid := uuid.New().String()
log.Warn("New start message for topic " + topicid + " found")
return topicid
}
return genTopicID
}
if msg.Repto != "" {
var findAndSetTopicID = func() string {
topicid, err := t.findTopicID(msg.Repto, 0)
if err != nil {
log.Error(err.Error())
return ""
}
log.Info("Topic ID " + topicid + " found for message " + msg.MsgID)
return topicid
}
return findAndSetTopicID
}
return func() string { return "" }
}

19
main.go
View File

@ -76,6 +76,7 @@ func main() {
} }
} }
var newMessages []i2es.ESDoc
var messages []idec.MSG var messages []idec.MSG
var stash []idec.ID var stash []idec.ID
j := 0 j := 0
@ -114,25 +115,27 @@ func main() {
esd.MsgID = m.ID esd.MsgID = m.ID
esd.Repto = msg.Repto esd.Repto = msg.Repto
esd.Address = msg.Address esd.Address = msg.Address
esd.Misplaced = "no"
if !reindex {
log.Warn(msg.Repto)
continue
}
log.Debug("Check message ", m.ID) log.Debug("Check message ", m.ID)
_, err = elastic.CheckID(m.ID) _, err = elastic.CheckID(m.ID)
if err != nil && !reindex { if err != nil && !reindex {
if reindex { if reindex {
log.Warn("New thread message " + m.ID) log.Warn("Reindexing " + m.ID)
} }
continue continue
} }
err = elastic.PutToIndex(esd) esd, err = elastic.PutToIndex(esd)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
continue continue
} }
log.Warn("Message ", m.ID, " added to index") log.Warn("Message ", m.ID, " added to index and to assignates list")
newMessages = append(newMessages, esd)
}
err = elastic.AssignTopics(newMessages)
if err != nil {
log.Error(err.Error())
} }
} }