From 0b6cc2e09d030fe6485766dad00d22006ba7258c Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Fri, 5 Jan 2018 17:05:45 +0300 Subject: [PATCH] TopicID realization --- i2es/elastic.go | 175 ++++++++++++++++++++++++++++++++++++------------ main.go | 19 ++++-- 2 files changed, 147 insertions(+), 47 deletions(-) diff --git a/i2es/elastic.go b/i2es/elastic.go index 30daa7e..1d29cc4 100644 --- a/i2es/elastic.go +++ b/i2es/elastic.go @@ -34,44 +34,135 @@ type ESConf struct { Type string } -type topicid struct{} +type topicid struct { + es ESConf +} + +// doPostRequest ... +func doRequest(uri, data, method string) ([]byte, error) { + req, err := http.NewRequest(method, uri, bytes.NewBuffer([]byte(data))) + req.Header.Add("Content-Type", "application/json") + if err != nil { + return []byte(""), err + } + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return []byte(""), err + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return []byte(""), err + } + + return body, nil +} // findTopicID for msgid -func (t topicid) findTopicID(msgid string) (string, error) { - var topicid string +func (t topicid) findTopicID(msgid string, enter int) (string, error) { + topicid := uuid.New().String() + searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/") + searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}, "size":' + show_rows + ', "from":' + from + '}`}, "") + + body, err := doRequest(searchURI, searchQ, "POST") + if err != nil { + return "", err + } + + var res ESRes + err = json.Unmarshal(body, &res) + if err != nil { + return "", err + } + + if res.Hits.Total == 0 { + e := errors.New("MSGID " + msgid + " not found ") + return "", e + } + + if res.Hits.Hits[0].Source.TopicID != "" { + topicid = res.Hits.Hits[0].Source.TopicID + } + + if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" { + if enter > 25 { + log.Error("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid) + return topicid, nil + } else { + t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1) + } + } else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" { + return res.Hits.Hits[0].Source.TopicID, nil + } + + if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" { + // This is a old message without topicid => generate new topicid and update a top document + log.Warn(topicid) + log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID") + log.Warn("Generated new topicID " + topicid) + log.Warn("Update top message") + + updateDoc := res.Hits.Hits[0].Source + updateDoc.TopicID = topicid + + updateDocPlain, err := json.Marshal(updateDoc) + if err != nil { + e := errors.New("Cant serialize document: " + err.Error()) + return "", e + } + + postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/") + _, err = doRequest(postURI, string(updateDocPlain), "PUT") + if err != nil { + e := errors.New("Cant update document: " + err.Error()) + return "", e + } + + log.Info(string(updateDocPlain)) + } + return topicid, nil } // check if -func (t topicid) checkID(msg *ESDoc) (bool, func()) { +func (t topicid) checkTopicID(msg *ESDoc) func() string { if msg.TopicID != "" { - return false, func() {} + return func() string { return msg.TopicID } } if msg.Repto == "" { - var genTopicID = func() { - u := uuid.New() - log.Debug("New UUID " + u.String() + " for message " + msg.MsgID) - msg.TopicID = u.String() + var genTopicID = func() string { + topicid := uuid.New().String() + log.Warn("New start message for topic " + topicid + " found") + return topicid } - return true, genTopicID + return genTopicID } - var findAndSetTopicID = func() { - topicid, err := t.findTopicID(msg.MsgID) - if err != nil { - log.Error(err.Error()) + if msg.Repto != "" { + var findAndSetTopicID = func() string { + topicid, err := t.findTopicID(msg.MsgID, 0) + if err != nil { + log.Error(err.Error()) + } + log.Info("Topic ID " + topicid + " found for message " + msg.MsgID) + return topicid } - log.Debug("Topic ID " + topicid + " found for message " + msg.MsgID) + return findAndSetTopicID } - return true, findAndSetTopicID + + return func() string { return "" } } // getorcreate topicID. Generate new unique topicID if message is start message // Find top message and get it topicid if ESDoc.Repto is not empty -func (t topicid) getOrCreate(msg *ESDoc) error { - // TODO: write the code - return nil +func (t topicid) getOrCreate(msg *ESDoc) string { + f := t.checkTopicID(msg) + return f() } // PutToIndex ... @@ -79,11 +170,18 @@ func (es ESConf) PutToIndex(msg ESDoc) error { putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") log.Print(putURI) + // Assign topicID + var t topicid + t.es = es + topic := t.getOrCreate(&msg) + if msg.TopicID != topic { + msg.TopicID = topic + } + doc, err := json.Marshal(msg) if err != nil { return err } - // log.Print(string(doc)) req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) req.Header.Add("Content-Type", "application/json") @@ -99,18 +197,27 @@ func (es ESConf) PutToIndex(msg ESDoc) error { return err } -// ESRes ES response minimal structure +// ESRes ES response structure type ESRes struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` Hits Hits `json:"hits"` } -// Hits ... +// Hits Founded documents type Hits struct { - Total int `json:"total"` - MaxScore float32 `json:"max_score"` - Hits []interface{} `json:"hits"` + Total int `json:"total"` + MaxScore float32 `json:"max_score"` + Hits []Hit `json:"hits"` +} + +// ES Document +type Hit struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Score float32 `json:"_score"` + Source ESDoc `json:"_source"` } // CheckID ... @@ -119,22 +226,7 @@ func (es ESConf) CheckID(id string) (bool, error) { searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") - req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) - req.Header.Add("Content-Type", "application/json") - - if err != nil { - return false, err - } - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return false, err - } - - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) + body, err := doRequest(searchURI, searchQ, "POST") if err != nil { return false, err } @@ -149,6 +241,7 @@ func (es ESConf) CheckID(id string) (bool, error) { err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, "")) return false, err } + if e.TimedOut { err = errors.New("Request time out") return false, err diff --git a/main.go b/main.go index b8f60ed..a2a7bf3 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( var inode string var offset int var limit int -var all bool +var all, reindex bool var es string var esIndex string var esType string @@ -24,6 +24,7 @@ func init() { flag.IntVar(&offset, "offset", 0, "") flag.IntVar(&limit, "limit", -1, "") flag.BoolVar(&all, "all", false, "Get all messages") + flag.BoolVar(&reindex, "reindex", false, "Force update an a message") flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI") flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host") flag.StringVar(&esIndex, "esindex", "idec", "ES index") @@ -81,7 +82,7 @@ func main() { for _, id := range ids { stash = append(stash, id) j++ - if j == 20 { + if j == 50 { m, err := fc.GetRawMessages(stash) if err != nil { log.Print(err) @@ -98,7 +99,8 @@ func main() { msg, err := idec.ParseMessage(m.Message) if err != nil { log.Error(err.Error()) - return + log.Error(m.ID) + continue } var esd i2es.ESDoc @@ -113,17 +115,22 @@ func main() { esd.Repto = msg.Repto esd.Address = msg.Address + if reindex && msg.Repto != "" { + log.Warn(msg.Repto) + continue + } + log.Debug("Check message ", m.ID) _, err = elastic.CheckID(m.ID) - if err != nil { - log.Warn(err.Error()) + if err != nil && !reindex { + log.Warn("New thread message " + m.ID) continue } err = elastic.PutToIndex(esd) if err != nil { log.Error(err.Error()) + continue } log.Warn("Message ", m.ID, " added to index") - } }