package i2es import ( "bytes" "encoding/json" "errors" "io/ioutil" "net/http" "strings" log "github.com/Sirupsen/logrus" "github.com/google/uuid" ) type topicid struct { es ESConf } // doPostRequest ... func doRequest(uri, data, method string) ([]byte, error) { req, err := http.NewRequest(method, uri, bytes.NewBuffer([]byte(data))) req.Header.Add("Content-Type", "application/json") if err != nil { return []byte(""), err } client := &http.Client{} resp, err := client.Do(req) if err != nil { return []byte(""), err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return []byte(""), err } return body, nil } // findTopicID for msgid func (t topicid) findTopicID(msgid string, enter int) (string, error) { searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/") searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}, "size":' + show_rows + ', "from":' + from + '}`}, "") body, err := doRequest(searchURI, searchQ, "POST") if err != nil { return "", err } var res ESRes err = json.Unmarshal(body, &res) if err != nil { return "", err } if res.Hits.Total == 0 { e := errors.New("MSGID " + msgid + " not found ") return "", e } if res.Hits.Hits[0].Source.TopicID != "" { topicid := res.Hits.Hits[0].Source.TopicID return topicid, nil } if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" { if enter > 25 { topicid := uuid.New().String() log.Error("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid) return topicid, nil } else { t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1) } } else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" { return res.Hits.Hits[0].Source.TopicID, nil } if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" { // This is a old message without topicid => generate new topicid and update a top document topicid := uuid.New().String() log.Warn(topicid) log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID") log.Warn("Generated new topicID " + topicid) log.Warn("Update top message") updateDoc := res.Hits.Hits[0].Source updateDoc.TopicID = topicid updateDocPlain, err := json.Marshal(updateDoc) if err != nil { e := errors.New("Cant serialize document: " + err.Error()) return "", e } postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/") _, err = doRequest(postURI, string(updateDocPlain), "PUT") if err != nil { e := errors.New("Cant update document: " + err.Error()) return "", e } log.Info(string(updateDocPlain)) return topicid, nil } return "", errors.New("Something went wrong") } // check if func (t topicid) checkTopicID(msg *ESDoc) func() string { if msg.TopicID != "" { return func() string { return msg.TopicID } } if msg.Repto == "" { var genTopicID = func() string { topicid := uuid.New().String() log.Warn("New start message for topic " + topicid + " found") return topicid } return genTopicID } if msg.Repto != "" { var findAndSetTopicID = func() string { topicid, err := t.findTopicID(msg.Repto, 0) if err != nil { log.Error(err.Error()) return "" } log.Info("Topic ID " + topicid + " found for message " + msg.MsgID) return topicid } return findAndSetTopicID } return func() string { return "" } } // getorcreate topicID. Generate new unique topicID if message is start message // Find top message and get it topicid if ESDoc.Repto is not empty func (t topicid) getOrCreate(msg *ESDoc) string { f := t.checkTopicID(msg) return f() } // PutToIndex ... func (es ESConf) PutToIndex(msg ESDoc) error { putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") log.Print(putURI) // Assign topicID var t topicid t.es = es topic := t.getOrCreate(&msg) if msg.TopicID == "" && topic != "" { msg.TopicID = topic } doc, err := json.Marshal(msg) if err != nil { return err } req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() return err } // ESRes ES response structure type ESRes struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` Hits Hits `json:"hits"` } // Hits Founded documents type Hits struct { Total int `json:"total"` MaxScore float32 `json:"max_score"` Hits []Hit `json:"hits"` } // ES Document type Hit struct { Index string `json:"_index"` Type string `json:"_type"` ID string `json:"_id"` Score float32 `json:"_score"` Source ESDoc `json:"_source"` } // CheckID ... func (es ESConf) CheckID(id string) (bool, error) { searchURI := strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") body, err := doRequest(searchURI, searchQ, "POST") if err != nil { return false, err } var e ESRes err = json.Unmarshal(body, &e) if err != nil { return false, err } if e.Hits.Total > 0 { err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, "")) return false, err } if e.TimedOut { err = errors.New("Request time out") return false, err } return false, nil }