TopicID realization

This commit is contained in:
Denis Zheleztsov 2018-01-05 17:05:45 +03:00
parent b041fcd80b
commit 0b6cc2e09d
2 changed files with 147 additions and 47 deletions

View File

@ -34,44 +34,135 @@ type ESConf struct {
Type string Type string
} }
type topicid struct{} type topicid struct {
es ESConf
}
// doPostRequest ...
func doRequest(uri, data, method string) ([]byte, error) {
req, err := http.NewRequest(method, uri, bytes.NewBuffer([]byte(data)))
req.Header.Add("Content-Type", "application/json")
if err != nil {
return []byte(""), err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return []byte(""), err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return []byte(""), err
}
return body, nil
}
// findTopicID for msgid // findTopicID for msgid
func (t topicid) findTopicID(msgid string) (string, error) { func (t topicid) findTopicID(msgid string, enter int) (string, error) {
var topicid string topicid := uuid.New().String()
searchURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"sort": [{"date": {"order": "desc"}}, {"_score": { "order": "desc" }}], "query": { "query_string" : {"fields": ["msgid"], "query": "msgid.keyword:` + msgid + ` "}}, "size":' + show_rows + ', "from":' + from + '}`}, "")
body, err := doRequest(searchURI, searchQ, "POST")
if err != nil {
return "", err
}
var res ESRes
err = json.Unmarshal(body, &res)
if err != nil {
return "", err
}
if res.Hits.Total == 0 {
e := errors.New("MSGID " + msgid + " not found ")
return "", e
}
if res.Hits.Hits[0].Source.TopicID != "" {
topicid = res.Hits.Hits[0].Source.TopicID
}
if res.Hits.Hits[0].Source.Repto != "" && res.Hits.Hits[0].Source.TopicID == "" {
if enter > 25 {
log.Error("Maximum recurse depth. Top message not in index? Forse assign new topicID " + topicid)
return topicid, nil
} else {
t.findTopicID(res.Hits.Hits[0].Source.Repto, enter+1)
}
} else if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID != "" {
return res.Hits.Hits[0].Source.TopicID, nil
}
if res.Hits.Hits[0].Source.Repto == "" && res.Hits.Hits[0].Source.TopicID == "" {
// This is a old message without topicid => generate new topicid and update a top document
log.Warn(topicid)
log.Warn("Top message " + res.Hits.Hits[0].Source.MsgID + " found but it is doesn not have a topicID")
log.Warn("Generated new topicID " + topicid)
log.Warn("Update top message")
updateDoc := res.Hits.Hits[0].Source
updateDoc.TopicID = topicid
updateDocPlain, err := json.Marshal(updateDoc)
if err != nil {
e := errors.New("Cant serialize document: " + err.Error())
return "", e
}
postURI := strings.Join([]string{t.es.Host, t.es.Index, t.es.Type, updateDoc.MsgID}, "/")
_, err = doRequest(postURI, string(updateDocPlain), "PUT")
if err != nil {
e := errors.New("Cant update document: " + err.Error())
return "", e
}
log.Info(string(updateDocPlain))
}
return topicid, nil return topicid, nil
} }
// check if // check if
func (t topicid) checkID(msg *ESDoc) (bool, func()) { func (t topicid) checkTopicID(msg *ESDoc) func() string {
if msg.TopicID != "" { if msg.TopicID != "" {
return false, func() {} return func() string { return msg.TopicID }
} }
if msg.Repto == "" { if msg.Repto == "" {
var genTopicID = func() { var genTopicID = func() string {
u := uuid.New() topicid := uuid.New().String()
log.Debug("New UUID " + u.String() + " for message " + msg.MsgID) log.Warn("New start message for topic " + topicid + " found")
msg.TopicID = u.String() return topicid
} }
return true, genTopicID return genTopicID
} }
var findAndSetTopicID = func() { if msg.Repto != "" {
topicid, err := t.findTopicID(msg.MsgID) var findAndSetTopicID = func() string {
if err != nil { topicid, err := t.findTopicID(msg.MsgID, 0)
log.Error(err.Error()) if err != nil {
log.Error(err.Error())
}
log.Info("Topic ID " + topicid + " found for message " + msg.MsgID)
return topicid
} }
log.Debug("Topic ID " + topicid + " found for message " + msg.MsgID) return findAndSetTopicID
} }
return true, findAndSetTopicID
return func() string { return "" }
} }
// getorcreate topicID. Generate new unique topicID if message is start message // getorcreate topicID. Generate new unique topicID if message is start message
// Find top message and get it topicid if ESDoc.Repto is not empty // Find top message and get it topicid if ESDoc.Repto is not empty
func (t topicid) getOrCreate(msg *ESDoc) error { func (t topicid) getOrCreate(msg *ESDoc) string {
// TODO: write the code f := t.checkTopicID(msg)
return nil return f()
} }
// PutToIndex ... // PutToIndex ...
@ -79,11 +170,18 @@ func (es ESConf) PutToIndex(msg ESDoc) error {
putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/")
log.Print(putURI) log.Print(putURI)
// Assign topicID
var t topicid
t.es = es
topic := t.getOrCreate(&msg)
if msg.TopicID != topic {
msg.TopicID = topic
}
doc, err := json.Marshal(msg) doc, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
// log.Print(string(doc))
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
req.Header.Add("Content-Type", "application/json") req.Header.Add("Content-Type", "application/json")
@ -99,18 +197,27 @@ func (es ESConf) PutToIndex(msg ESDoc) error {
return err return err
} }
// ESRes ES response minimal structure // ESRes ES response structure
type ESRes struct { type ESRes struct {
Took int `json:"took"` Took int `json:"took"`
TimedOut bool `json:"timed_out"` TimedOut bool `json:"timed_out"`
Hits Hits `json:"hits"` Hits Hits `json:"hits"`
} }
// Hits ... // Hits Founded documents
type Hits struct { type Hits struct {
Total int `json:"total"` Total int `json:"total"`
MaxScore float32 `json:"max_score"` MaxScore float32 `json:"max_score"`
Hits []interface{} `json:"hits"` Hits []Hit `json:"hits"`
}
// ES Document
type Hit struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Score float32 `json:"_score"`
Source ESDoc `json:"_source"`
} }
// CheckID ... // CheckID ...
@ -119,22 +226,7 @@ func (es ESConf) CheckID(id string) (bool, error) {
searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "")
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) body, err := doRequest(searchURI, searchQ, "POST")
req.Header.Add("Content-Type", "application/json")
if err != nil {
return false, err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -149,6 +241,7 @@ func (es ESConf) CheckID(id string) (bool, error) {
err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, "")) err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, ""))
return false, err return false, err
} }
if e.TimedOut { if e.TimedOut {
err = errors.New("Request time out") err = errors.New("Request time out")
return false, err return false, err

19
main.go
View File

@ -14,7 +14,7 @@ import (
var inode string var inode string
var offset int var offset int
var limit int var limit int
var all bool var all, reindex bool
var es string var es string
var esIndex string var esIndex string
var esType string var esType string
@ -24,6 +24,7 @@ func init() {
flag.IntVar(&offset, "offset", 0, "<int>") flag.IntVar(&offset, "offset", 0, "<int>")
flag.IntVar(&limit, "limit", -1, "<int>") flag.IntVar(&limit, "limit", -1, "<int>")
flag.BoolVar(&all, "all", false, "Get all messages") flag.BoolVar(&all, "all", false, "Get all messages")
flag.BoolVar(&reindex, "reindex", false, "Force update an a message")
flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI") flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI")
flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host") flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host")
flag.StringVar(&esIndex, "esindex", "idec", "ES index") flag.StringVar(&esIndex, "esindex", "idec", "ES index")
@ -81,7 +82,7 @@ func main() {
for _, id := range ids { for _, id := range ids {
stash = append(stash, id) stash = append(stash, id)
j++ j++
if j == 20 { if j == 50 {
m, err := fc.GetRawMessages(stash) m, err := fc.GetRawMessages(stash)
if err != nil { if err != nil {
log.Print(err) log.Print(err)
@ -98,7 +99,8 @@ func main() {
msg, err := idec.ParseMessage(m.Message) msg, err := idec.ParseMessage(m.Message)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
return log.Error(m.ID)
continue
} }
var esd i2es.ESDoc var esd i2es.ESDoc
@ -113,17 +115,22 @@ func main() {
esd.Repto = msg.Repto esd.Repto = msg.Repto
esd.Address = msg.Address esd.Address = msg.Address
if reindex && msg.Repto != "" {
log.Warn(msg.Repto)
continue
}
log.Debug("Check message ", m.ID) log.Debug("Check message ", m.ID)
_, err = elastic.CheckID(m.ID) _, err = elastic.CheckID(m.ID)
if err != nil { if err != nil && !reindex {
log.Warn(err.Error()) log.Warn("New thread message " + m.ID)
continue continue
} }
err = elastic.PutToIndex(esd) err = elastic.PutToIndex(esd)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
continue
} }
log.Warn("Message ", m.ID, " added to index") log.Warn("Message ", m.ID, " added to index")
} }
} }