From 0dc16c253e6ed7785491e95877b140eb28c482f4 Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Mon, 8 Jan 2018 15:57:56 +0300 Subject: [PATCH] Working topics assignement --- i2es/elastic.go | 145 ++---------------------------------------------- i2es/topics.go | 134 ++++++++++++++++++++++++++++++++++++++++++-- main.go | 19 ++++--- 3 files changed, 146 insertions(+), 152 deletions(-) diff --git a/i2es/elastic.go b/i2es/elastic.go index 7a7a683..8a94241 100644 --- a/i2es/elastic.go +++ b/i2es/elastic.go @@ -9,7 +9,6 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/google/uuid" ) type topicid struct { @@ -40,128 +39,19 @@ func doRequest(uri, data, method string) ([]byte, error) { return body, nil } -// findTopicID for msgid -func (t topicid) findTopicID(msgid string, enter int) (string, error) { - searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/") - searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}, "size":' + show_rows + ', "from":' + from + '}`}, "") - - body, err := doRequest(searchURI, searchQ, "POST") - if err != nil { - return "", err - } - - var res ESRes - err = json.Unmarshal(body, &res) - if err != nil { - return "", err - } - - if res.Hits.Total == 0 { - e := errors.New("MSGID " + msgid + " not found ") - return "", e - } - - if res.Hits.Hits[0].Source.TopicID != "" { - topicid := res.Hits.Hits[0].Source.TopicID - return topicid, nil - } - - if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" { - if enter > 25 { - topicid := uuid.New().String() - log.Error("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid) - return topicid, nil - } else { - t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1) - } - } else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" { - return res.Hits.Hits[0].Source.TopicID, nil - } - - if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" { - // This is a old message without topicid => generate new topicid and update a top document - topicid := uuid.New().String() - log.Warn(topicid) - log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID") - log.Warn("Generated new topicID " + topicid) - log.Warn("Update top message") - - updateDoc := res.Hits.Hits[0].Source - updateDoc.TopicID = topicid - - updateDocPlain, err := json.Marshal(updateDoc) - if err != nil { - e := errors.New("Cant serialize document: " + err.Error()) - return "", e - } - - postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/") - _, err = doRequest(postURI, string(updateDocPlain), "PUT") - if err != nil { - e := errors.New("Cant update document: " + err.Error()) - return "", e - } - - log.Info(string(updateDocPlain)) - return topicid, nil - } - return "", errors.New("Something went wrong") -} - -// check if -func (t topicid) checkTopicID(msg *ESDoc) func() string { - if msg.TopicID != "" { - return func() string { return msg.TopicID } - } - - if msg.Repto == "" { - var genTopicID = func() string { - topicid := uuid.New().String() - log.Warn("New start message for topic " + topicid + " found") - return topicid - } - return genTopicID - } - - if msg.Repto != "" { - var findAndSetTopicID = func() string { - topicid, err := t.findTopicID(msg.Repto, 0) - if err != nil { - log.Error(err.Error()) - return "" - } - log.Info("Topic ID " + topicid + " found for message " + msg.MsgID) - return topicid - } - return findAndSetTopicID - } - - return func() string { return "" } -} - -// getorcreate topicID. Generate new unique topicID if message is start message -// Find top message and get it topicid if ESDoc.Repto is not empty -func (t topicid) getOrCreate(msg *ESDoc) string { - f := t.checkTopicID(msg) - return f() -} - // PutToIndex ... -func (es ESConf) PutToIndex(msg ESDoc) error { +func (es ESConf) PutToIndex(msg ESDoc) (ESDoc, error) { putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") log.Print(putURI) - // Assign topicID + // Assign topicID for top message var t topicid t.es = es - topic := t.getOrCreate(&msg) - if msg.TopicID == "" && topic != "" { - msg.TopicID = topic - } + t.getOrCreate(&msg) doc, err := json.Marshal(msg) if err != nil { - return err + return msg, err } req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) @@ -170,35 +60,12 @@ func (es ESConf) PutToIndex(msg ESDoc) error { client := &http.Client{} resp, err := client.Do(req) if err != nil { - return err + return msg, err } defer resp.Body.Close() - return err -} - -// ESRes ES response structure -type ESRes struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Hits Hits `json:"hits"` -} - -// Hits Founded documents -type Hits struct { - Total int `json:"total"` - MaxScore float32 `json:"max_score"` - Hits []Hit `json:"hits"` -} - -// ES Document -type Hit struct { - Index string `json:"_index"` - Type string `json:"_type"` - ID string `json:"_id"` - Score float32 `json:"_score"` - Source ESDoc `json:"_source"` + return msg, err } // CheckID ... diff --git a/i2es/topics.go b/i2es/topics.go index 153781c..82dc99f 100644 --- a/i2es/topics.go +++ b/i2es/topics.go @@ -1,35 +1,56 @@ package i2es import ( + "encoding/json" + "errors" + "strings" + log "github.com/Sirupsen/logrus" "github.com/google/uuid" ) +// getorcreate topicID. Generate new unique topicID if message is start message +func (t topicid) getOrCreate(msg *ESDoc) { + if msg.TopicID == "" && msg.Repto == "" { + log.Warn("New topic assign for message " + msg.MsgID) + msg.TopicID = uuid.New().String() + } +} + // AssignTopics ... func (es ESConf) AssignTopics(messages []ESDoc) error { + log.Info("====\nTrying assign topics\n====") + for _, message := range messages { + log.Info("Working for " + message.MsgID) // generate new topicid if message.Repto == "" && message.TopicID == "" { - message.TopicID = uuid.New().String() - err := es.PutToIndex(message) + topic := uuid.New().String() + message.TopicID = topic + _, err := es.PutToIndex(message) if err != nil { log.Error(err.Error()) continue } + log.Info("New topic " + topic + " for message " + message.MsgID + " assigned") } // skip message with assigned topicid if message.TopicID != "" { + log.Info("message " + message.MsgID + " has already topicid " + message.TopicID) continue } // find top message of the thread and get they topicid if message.Repto != "" { + log.Info("Repto " + message.Repto + " found in message " + message.MsgID) t := topicid{es} - topicid, err := t.findTopicID(message.MsgID, 0) + topicid, err := t.findTopicID(message.Repto, 0) if err != nil { + log.Error(err.Error()) message.Misplaced = "yes" - err := es.PutToIndex(message) + log.Error("Message " + message.MsgID + " misplaced") + _, err := es.PutToIndex(message) if err != nil { log.Error(err.Error()) continue @@ -38,12 +59,115 @@ func (es ESConf) AssignTopics(messages []ESDoc) error { } message.TopicID = topicid - err = es.PutToIndex(message) + _, err = es.PutToIndex(message) if err != nil { log.Error(err.Error()) continue } + log.Info("Topic " + topicid + "found for message " + message.MsgID) } } return nil } + +// findTopicID for msgid +func (t topicid) findTopicID(msgid string, enter int) (string, error) { + searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/") + searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}}`}, "") + log.Info(searchQ) + + body, err := doRequest(searchURI, searchQ, "POST") + if err != nil { + return "", err + } + + var res ESRes + err = json.Unmarshal(body, &res) + if err != nil { + return "", err + } + + if res.Hits.Total == 0 { + e := errors.New("MSGID " + msgid + " not found ") + log.Error(string(body)) + return "", e + } + + if res.Hits.Hits[0].Source.TopicID != "" { + topicid := res.Hits.Hits[0].Source.TopicID + return topicid, nil + } + + if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" { + if enter > 25 { + topicid := uuid.New().String() + e := errors.New("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid) + log.Error(e.Error()) + return topicid, err + } else { + t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1) + } + } else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" { + return res.Hits.Hits[0].Source.TopicID, nil + } + + if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" { + // This is a old message without topicid => generate new topicid and update a top document + topicid := uuid.New().String() + log.Warn(topicid) + log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID") + log.Warn("Generated new topicID " + topicid) + log.Warn("Update top message") + + updateDoc := res.Hits.Hits[0].Source + updateDoc.TopicID = topicid + + updateDocPlain, err := json.Marshal(updateDoc) + if err != nil { + e := errors.New("Cant serialize document: " + err.Error()) + return "", e + } + + postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/") + _, err = doRequest(postURI, string(updateDocPlain), "PUT") + if err != nil { + e := errors.New("Cant update document: " + err.Error()) + return "", e + } + + log.Info(string(updateDocPlain)) + return topicid, nil + } + return "", errors.New("Something went wrong") +} + +// check if +func (t topicid) checkTopicID(msg *ESDoc) func() string { + if msg.TopicID != "" { + return func() string { return msg.TopicID } + } + + if msg.Repto == "" { + var genTopicID = func() string { + topicid := uuid.New().String() + log.Warn("New start message for topic " + topicid + " found") + return topicid + } + return genTopicID + } + + if msg.Repto != "" { + var findAndSetTopicID = func() string { + topicid, err := t.findTopicID(msg.Repto, 0) + if err != nil { + log.Error(err.Error()) + return "" + } + log.Info("Topic ID " + topicid + " found for message " + msg.MsgID) + return topicid + } + return findAndSetTopicID + } + + return func() string { return "" } +} diff --git a/main.go b/main.go index 40dd44c..c0c22f2 100644 --- a/main.go +++ b/main.go @@ -76,6 +76,7 @@ func main() { } } + var newMessages []i2es.ESDoc var messages []idec.MSG var stash []idec.ID j := 0 @@ -114,25 +115,27 @@ func main() { esd.MsgID = m.ID esd.Repto = msg.Repto esd.Address = msg.Address - - if !reindex { - log.Warn(msg.Repto) - continue - } + esd.Misplaced = "no" log.Debug("Check message ", m.ID) _, err = elastic.CheckID(m.ID) if err != nil && !reindex { if reindex { - log.Warn("New thread message " + m.ID) + log.Warn("Reindexing " + m.ID) } continue } - err = elastic.PutToIndex(esd) + esd, err = elastic.PutToIndex(esd) if err != nil { log.Error(err.Error()) continue } - log.Warn("Message ", m.ID, " added to index") + log.Warn("Message ", m.ID, " added to index and to assignates list") + newMessages = append(newMessages, esd) + } + + err = elastic.AssignTopics(newMessages) + if err != nil { + log.Error(err.Error()) } }