diff --git a/go.mod b/go.mod index 122b1d8..cb03c70 100644 --- a/go.mod +++ b/go.mod @@ -3,5 +3,6 @@ module gitea.difrex.ru/Umbrella/fetcher require ( github.com/emirpasic/gods v1.12.0 github.com/google/uuid v1.0.0 + github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343 github.com/sirupsen/logrus v1.2.0 ) diff --git a/go.sum b/go.sum index ec6cdb0..8d1e233 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343 h1:ABIlopLGU081SkX2KmXjho9vmm1MgPs38hxXCXC2BrM= +github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343/go.mod h1:XUvr43ZLN/4bTZT7TEhJA/rsfFLQxnggX6iU5TGXgIY= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= diff --git a/i2es/documents.go b/i2es/documents.go index 107f094..eda16bd 100644 --- a/i2es/documents.go +++ b/i2es/documents.go @@ -25,9 +25,10 @@ type ESConf struct { // ESRes ES response structure type ESRes struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Hits Hits `json:"hits"` + ScrollID string `json:"_scroll_id"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Hits Hits `json:"hits"` } // Hits Founded documents diff --git a/main.go b/main.go index 54c2dfc..f62209e 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "strconv" - log "github.com/Sirupsen/logrus" + log "github.com/sirupsen/logrus" "gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/idec" @@ -137,8 +137,8 @@ func main() { newMessages = append(newMessages, esd) } - err = elastic.AssignTopics(newMessages) - if err != nil { - log.Error(err.Error()) - } + // err = elastic.AssignTopics(newMessages) + // if err != nil { + // log.Error(err.Error()) + // } } diff --git a/utils/reassign_topics/elastic.go b/utils/reassign_topics/elastic.go index d1e4718..bf7dd35 100644 --- a/utils/reassign_topics/elastic.go +++ b/utils/reassign_topics/elastic.go @@ -6,10 +6,12 @@ import "net/http" import log "github.com/sirupsen/logrus" import "encoding/json" import "strings" +import "io/ioutil" type Conf struct { - ES i2es.ESConf - Step int + ES i2es.ESConf + Step, From int + ScrollID string } type Stats struct { @@ -54,6 +56,29 @@ func (c *Conf) getDocsCount() int64 { return int64(stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})["docs"].(map[string]interface{})["count"].(float64)) } +func (c *Conf) doRequest(method, reqURL, data string) ([]byte, error) { + var content []byte + req, err := http.NewRequest(method, reqURL, strings.NewReader(data)) + if err != nil { + return content, err + } + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return content, err + } + + defer resp.Body.Close() + content, err = ioutil.ReadAll(resp.Body) + if err != nil { + return content, err + } + + return content, nil +} + func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) { var res i2es.ESRes req, err := http.NewRequest("POST", reqURL, strings.NewReader(query)) @@ -62,6 +87,7 @@ func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) { return res, err } req.Header.Add("Content-Type", "application/json") + log.Info(query) client := &http.Client{} resp, err := client.Do(req) @@ -82,12 +108,20 @@ func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) { } func (c *Conf) GetDocs(from int) i2es.ESRes { - reqURL := fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index) - query := fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, 0) - log.Infof("Do query `%s`\n", query) + var reqURL, query string + if c.ScrollID == "" { + reqURL = fmt.Sprintf("%s/%s/_search?scroll=5m", c.ES.Host, c.ES.Index) + query = fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from) + } else { + reqURL = fmt.Sprintf("%s/_search/scroll", c.ES.Host) + query = fmt.Sprintf(`{"scroll": "5m", "scroll_id": "%s"}`, c.ScrollID) + } res, err := c.doSearchRequest(reqURL, query) if err != nil { log.Fatal(err) } + if c.ScrollID == "" { + c.ScrollID = res.ScrollID + } return res } diff --git a/utils/reassign_topics/main.go b/utils/reassign_topics/main.go index 493adf4..4a5049f 100644 --- a/utils/reassign_topics/main.go +++ b/utils/reassign_topics/main.go @@ -4,12 +4,11 @@ import ( "flag" "gitea.difrex.ru/Umbrella/fetcher/i2es" - log "github.com/sirupsen/logrus" ) var ( es, index, esType string - step int + step, from int ) func init() { @@ -17,6 +16,7 @@ func init() { flag.StringVar(&index, "index", "idec", "Elasticsearch index") flag.StringVar(&esType, "esType", "post", "Elasticsearch document type") flag.IntVar(&step, "step", 100, "Scroll step") + flag.IntVar(&from, "from", 0, "Scroll from") flag.Parse() } @@ -28,9 +28,9 @@ func main() { Type: esType, }, Step: step, + From: from, } con := newContainer() conf.reassignTopic(&con) - - log.Infof("\n%+v\n", con.ToUpdate) + conf.UpdateDocs(&con) } diff --git a/utils/reassign_topics/processing.go b/utils/reassign_topics/processing.go index d9b2c5e..4d53b67 100644 --- a/utils/reassign_topics/processing.go +++ b/utils/reassign_topics/processing.go @@ -1,67 +1,221 @@ package main import ( + "encoding/json" + "fmt" + "strings" + + "sync" + + "text/template" + + "bytes" + "gitea.difrex.ru/Umbrella/fetcher/i2es" - "github.com/emirpasic/gods/maps/hashmap" "github.com/google/uuid" log "github.com/sirupsen/logrus" ) type Container struct { - TopPosts *hashmap.Map - Comments *hashmap.Map - ToUpdate *hashmap.Map + TopPosts *HashMap + Comments *HashMap + ToUpdate *HashMap + Thread *HashMap +} + +type HashMap struct { + mux sync.Mutex + Map map[string]i2es.ESDoc +} + +func (h *HashMap) Put(key string, val i2es.ESDoc) { + h.mux.Lock() + h.Map[key] = val + h.mux.Unlock() +} + +func (h *HashMap) Get(key string) (i2es.ESDoc, bool) { + h.mux.Lock() + val, ok := h.Map[key] + h.mux.Unlock() + return val, ok +} + +func (h *HashMap) Remove(key string) { + h.mux.Lock() + delete(h.Map, key) + h.mux.Unlock() +} + +func (h *HashMap) Clear() { + h.mux.Lock() + for k, _ := range h.Map { + delete(h.Map, k) + } + h.mux.Unlock() +} + +func (h *HashMap) Values() []i2es.ESDoc { + h.mux.Lock() + var values []i2es.ESDoc + for _, v := range h.Map { + values = append(values, v) + } + h.mux.Unlock() + return values +} + +func (h *HashMap) Size() int { + return len(h.Map) } func newContainer() Container { + top := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + comments := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + toUpdate := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + thread := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } return Container{ - hashmap.New(), - hashmap.New(), - hashmap.New(), + top, + comments, + toUpdate, + thread, } } func (c *Conf) reassignTopic(con *Container) { totalDocs := c.getDocsCount() + log.Info("Total documents in the index ", totalDocs) // Populate containers - for i := 0; i < int(totalDocs); i += c.Step { + for i := c.From; i < int(totalDocs); i += c.Step { msgs := c.GetDocs(i) + log.Infof("Processing %d documents", len(msgs.Hits.Hits)) for _, msg := range msgs.Hits.Hits { - if msg.Source.TopicID == "" { + if msg.Source.Repto == "" && msg.Source.TopicID == "" { m := msg.Source - m.TopicID = uuid.New().URN() - log.Infof("Assign new topic id `%s` for message `%s`\n", m.TopicID, m.MsgID) - con.TopPosts.Put(m.MsgID, m) + m.TopicID = strings.Split(uuid.New().URN(), ":")[2] + con.Comments.Put(m.MsgID, m) } else { m := msg.Source con.Comments.Put(m.MsgID, m) + con.ToUpdate.Put(m.MsgID, m) } } - c.processComments(con) - log.Info("Top posts size ", con.TopPosts.Size()) - log.Info("Comments size ", con.Comments.Size()) } + con.processComments() + log.Info("Top posts size ", con.TopPosts.Size()) + log.Info("Comments size ", con.Comments.Size()) + log.Info("To update size ", con.ToUpdate.Size()) log.Infof("\n%+v\n", con.ToUpdate.Size()) } -func (c *Conf) processComments(con *Container) { - for _, msg := range con.Comments.Values() { - m := msg.(i2es.ESDoc) - if top, ok := con.TopPosts.Get(m.Repto); ok { - log.Info("Found topic id ", top.(i2es.ESDoc).TopicID) - m.TopicID = top.(i2es.ESDoc).TopicID +func (con *Container) processComments() { + for _, m := range con.Comments.Values() { + if m.TopicID != "" { con.ToUpdate.Put(m.MsgID, m) - con.Comments.Remove(m.MsgID) - continue - } - if comment, ok := con.ToUpdate.Get(m.Repto); ok { - if comment.(i2es.ESDoc).TopicID != "" { - log.Info("Found topic id ", comment.(i2es.ESDoc).TopicID) - m.TopicID = comment.(i2es.ESDoc).TopicID - con.ToUpdate.Put(m.MsgID, m) - con.Comments.Remove(m.MsgID) - } + log.Infof("Message %s with topicid %s added to the updates list", m.MsgID, m.TopicID) + } else { + con.processSingle(1, m) } } } + +func (con *Container) processSingle(depth int, m i2es.ESDoc) { + maxDepth := 100 + if depth == maxDepth { + log.Warn("Max depth is reached!") + con.Thread.Clear() + return + } + if m.Repto != "" && m.TopicID == "" { + if comment, ok := con.Comments.Get(m.Repto); ok { + if comment.TopicID != "" { + log.Info("Found topic id ", comment.TopicID) + m.TopicID = comment.TopicID + con.ToUpdate.Put(m.MsgID, m) + con.Comments.Remove(m.MsgID) + return + } + if comment.Repto != "" && comment.TopicID == "" { + if c, ok := con.TopPosts.Get(comment.Repto); ok { + con.Thread.Put(comment.MsgID, comment) + con.processSingle(depth+1, c) + } + } else if comment.TopicID != "" { + con.Thread.Put(comment.MsgID, comment) + con.processThread(comment.TopicID) + return + } + } else { + log.Warnf("Message %s not found!", m.Repto) + con.Thread.Clear() + } + } else if m.TopicID != "" { + log.Info("Found topicid") + con.Thread.Put(m.MsgID, m) + con.processThread(m.TopicID) + return + } +} + +func (con *Container) processThread(id string) { + log.Infof("Processing thread with topicid %s\n", id) + for _, v := range con.Thread.Values() { + v.TopicID = id + con.ToUpdate.Put(v.MsgID, v) + // con.Comments.Remove(v.MsgID) + con.Thread.Remove(v.MsgID) + } +} + +type Plain struct { + ID string + Marshal string +} + +func (c *Conf) UpdateDocs(con *Container) { + var out []string + for k, v := range con.ToUpdate.Map { + var plain Plain + data, err := json.Marshal(v) + if err != nil { + log.Error(err) + } + plain.ID = k + plain.Marshal = string(data) + t := `{ "index": { "_index": "%s", "_type": "%s", "_id": "{{ .ID }}" }} +{{ .Marshal }} +` + tmpl, err := template.New("bulk").Parse(fmt.Sprintf(t, c.ES.Index, c.ES.Type)) + if err != nil { + log.Error(err) + } + + bulk := []byte("") + s := bytes.NewBuffer(bulk) + err = tmpl.Execute(s, plain) + if err != nil { + log.Error(err) + } + out = append(out, s.String()) + if len(out) >= 4999 { + _, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n")) + if err != nil { + log.Error(err) + } + out = []string{} + } + } + _, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n")) + if err != nil { + log.Error(err) + } +} diff --git a/utils/reassign_topics/reassign_topics b/utils/reassign_topics/reassign_topics index 970447e..3ba1d44 100755 Binary files a/utils/reassign_topics/reassign_topics and b/utils/reassign_topics/reassign_topics differ