diff --git a/create_index.sh b/create_index.sh deleted file mode 100755 index 08eacb8..0000000 --- a/create_index.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/sh -curl -XDELETE 'http://localhost:9200/iinetwork' && echo -curl -XPUT 'http://localhost:9200/iinetwork' -d '{ - "settings": { - "analysis": { - "analyzer": { - "my_analyzer": { - "type": "custom", - "tokenizer": "standard", - "filter": ["lowercase", "russian_morphology", "english_morphology", "my_stopwords"] - } - }, - "filter": { - "my_stopwords": { - "type": "stop", - "stopwords": "а,без,более,бы,был,была,были,было,быть,в,вам,вас,весь,во,вот,все,всего,всех,вы,где,да,даже,для,до,его,ее,если,есть,еще,же,за,здесь,и,из,или,им,их,к,как,ко,когда,кто,ли,либо,мне,может,мы,на,надо,наш,не,него,нее,нет,ни,них,но,ну,о,об,однако,он,она,они,оно,от,очень,по,под,при,с,со,так,также,такой,там,те,тем,то,того,тоже,той,только,том,ты,у,уже,хотя,чего,чей,чем,что,чтобы,чье,чья,эта,эти,это,я,a,an,and,are,as,at,be,but,by,for,if,in,into,is,it,no,not,of,on,or,such,that,the,their,then,there,these,they,this,to,was,will,with" - } - } - } - } -}' && echo -curl -XPUT 'http://localhost:9200/iinetwork/post/_mapping' -d '{ - "post": { - "_all" : {"analyzer" : "russian_morphology"}, - "properties" : { - "post" : { "type" : "string", "analyzer" : "russian_morphology" } - } - } -}' && echo - -curl -XPOST 'http://localhost:9200/iinetwork/_refresh' && echo diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cb03c70 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module gitea.difrex.ru/Umbrella/fetcher + +require ( + github.com/emirpasic/gods v1.12.0 + github.com/google/uuid v1.0.0 + github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343 + github.com/sirupsen/logrus v1.2.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8d1e233 --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= +github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343 h1:ABIlopLGU081SkX2KmXjho9vmm1MgPs38hxXCXC2BrM= +github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343/go.mod h1:XUvr43ZLN/4bTZT7TEhJA/rsfFLQxnggX6iU5TGXgIY= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/i2es/documents.go b/i2es/documents.go index 107f094..eda16bd 100644 --- a/i2es/documents.go +++ b/i2es/documents.go @@ -25,9 +25,10 @@ type ESConf struct { // ESRes ES response structure type ESRes struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Hits Hits `json:"hits"` + ScrollID string `json:"_scroll_id"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Hits Hits `json:"hits"` } // Hits Founded documents diff --git a/i2es/elastic.go b/i2es/elastic.go index 8a94241..6605821 100644 --- a/i2es/elastic.go +++ b/i2es/elastic.go @@ -8,7 +8,7 @@ import ( "net/http" "strings" - log "github.com/Sirupsen/logrus" + log "github.com/sirupsen/logrus" ) type topicid struct { diff --git a/i2es/topics.go b/i2es/topics.go index 82dc99f..37b2474 100644 --- a/i2es/topics.go +++ b/i2es/topics.go @@ -5,8 +5,8 @@ import ( "errors" "strings" - log "github.com/Sirupsen/logrus" "github.com/google/uuid" + log "github.com/sirupsen/logrus" ) // getorcreate topicID. Generate new unique topicID if message is start message diff --git a/idec/parser.go b/idec/parser.go index ba95eb5..d22094c 100644 --- a/idec/parser.go +++ b/idec/parser.go @@ -4,7 +4,7 @@ import ( "strconv" "strings" - idec "github.com/Difrex/go-idec" + idec "github.com/idec-net/go-idec" ) // ParseMessage ... diff --git a/idec/proto.go b/idec/proto.go index bcdc1ab..4b59907 100644 --- a/idec/proto.go +++ b/idec/proto.go @@ -9,7 +9,7 @@ import ( "strconv" "strings" - log "github.com/Sirupsen/logrus" + log "github.com/sirupsen/logrus" ) // IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions diff --git a/main.go b/main.go index 54c2dfc..9b758cf 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "strconv" - log "github.com/Sirupsen/logrus" + log "github.com/sirupsen/logrus" "gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/idec" diff --git a/mapping.json b/mapping.json new file mode 100644 index 0000000..53d788c --- /dev/null +++ b/mapping.json @@ -0,0 +1,84 @@ +{ + "mappings": { + "post": { + "properties": { + "address": { + "type": "keyword" + }, + "author": { + "type": "keyword" + }, + "date": { + "type": "date", + "format": "epoch_second" + }, + "echo": { + "type": "keyword" + + }, + "message": { + "type": "text", + "fields": { + "russian": { + "type": "text", + "analyzer": "russian" + } + }, + "analyzer": "standard" + }, + "misplaced": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + + } + + } + + }, + "msgid": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + + } + + } + + }, + "repto": { + "type": "keyword" + }, + "subg": { + "type": "text", + "fields": { + "russian": { + "type": "text", + "analyzer": "russian" + + } + + }, + "analyzer": "standard" + + }, + "tags": { + "type": "keyword" + + }, + "to": { + "type": "keyword" + }, + "topicid": { + "type": "keyword" + } + + } + + } + } +} diff --git a/utils/reassign_topics/elastic.go b/utils/reassign_topics/elastic.go index 21bfa59..0d3df21 100644 --- a/utils/reassign_topics/elastic.go +++ b/utils/reassign_topics/elastic.go @@ -3,19 +3,22 @@ package main import "gitea.difrex.ru/Umbrella/fetcher/i2es" import "fmt" import "net/http" -import log "github.com/Sirupsen/logrus" +import log "github.com/sirupsen/logrus" import "encoding/json" +import "strings" +import "io/ioutil" type Conf struct { - ES i2es.ESConf - Step int + ES i2es.ESConf + Step, From, Limit int + ScrollID string } type Stats struct { Indices IndexStats `json:"indices"` } -type IndexStats map[string]interface{} +type IndexStats map[string]map[string]interface{} // "indices": { // "idec": { @@ -45,7 +48,91 @@ func (c *Conf) getDocsCount() int64 { var stats Stats err = json.NewDecoder(resp.Body).Decode(&stats) - log.Infof("%+v", stats) + if err != nil { + log.Error(err) + return -1 + } - return -1 + return int64(stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})["docs"].(map[string]interface{})["count"].(float64)) +} + +func (c *Conf) doRequest(method, reqURL, data string) ([]byte, error) { + var content []byte + req, err := http.NewRequest(method, reqURL, strings.NewReader(data)) + if err != nil { + return content, err + } + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return content, err + } + + defer resp.Body.Close() + content, err = ioutil.ReadAll(resp.Body) + if err != nil { + return content, err + } + + return content, nil +} + +func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) { + var res i2es.ESRes + req, err := http.NewRequest("POST", reqURL, strings.NewReader(query)) + if err != nil { + log.Error(err) + return res, err + } + req.Header.Add("Content-Type", "application/json") + log.Info(query) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Error(err) + return res, err + } + + defer resp.Body.Close() + + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + log.Error(err) + return res, err + } + + return res, nil +} + +func (c *Conf) GetDocs(from int) i2es.ESRes { + var reqURL, query string + if c.ScrollID == "" { + reqURL = fmt.Sprintf("%s/%s/_search?scroll=5m", c.ES.Host, c.ES.Index) + query = fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from) + } else { + reqURL = fmt.Sprintf("%s/_search/scroll", c.ES.Host) + query = fmt.Sprintf(`{"scroll": "5m", "scroll_id": "%s"}`, c.ScrollID) + } + res, err := c.doSearchRequest(reqURL, query) + if err != nil { + log.Fatal(err) + } + if c.ScrollID == "" { + c.ScrollID = res.ScrollID + } + return res +} + +func (c *Conf) GetLatests() i2es.ESRes { + var reqURL, query string + reqURL = fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index) + query = fmt.Sprintf(`{"sort": {"date": {"order": "desc"}}, "size": %d}`, c.Limit) + res, err := c.doSearchRequest(reqURL, query) + if err != nil { + log.Fatal(err) + } + return res } diff --git a/utils/reassign_topics/main.go b/utils/reassign_topics/main.go index 875a74b..9466a90 100644 --- a/utils/reassign_topics/main.go +++ b/utils/reassign_topics/main.go @@ -3,14 +3,13 @@ package main import ( "flag" - "fmt" - "gitea.difrex.ru/Umbrella/fetcher/i2es" ) var ( es, index, esType string - step int + step, from, limit int + latest bool ) func init() { @@ -18,6 +17,9 @@ func init() { flag.StringVar(&index, "index", "idec", "Elasticsearch index") flag.StringVar(&esType, "esType", "post", "Elasticsearch document type") flag.IntVar(&step, "step", 100, "Scroll step") + flag.IntVar(&from, "from", 0, "Scroll from") + flag.IntVar(&limit, "limit", 2500, "Limit for latest posts") + flag.BoolVar(&latest, "latest", false, "Processing only latest posts") flag.Parse() } @@ -28,9 +30,15 @@ func main() { Index: index, Type: esType, }, - Step: step, + Step: step, + From: from, + Limit: limit, } - - count := conf.getDocsCount() - fmt.Println(count) + con := newContainer() + if latest { + conf.assignLatests(&con) + } else { + conf.reassignTopic(&con) + } + conf.UpdateDocs(&con) } diff --git a/utils/reassign_topics/processing.go b/utils/reassign_topics/processing.go new file mode 100644 index 0000000..ae82f34 --- /dev/null +++ b/utils/reassign_topics/processing.go @@ -0,0 +1,243 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + + "sync" + + "text/template" + + "bytes" + + "gitea.difrex.ru/Umbrella/fetcher/i2es" + "github.com/google/uuid" + log "github.com/sirupsen/logrus" +) + +type Container struct { + TopPosts *HashMap + Comments *HashMap + ToUpdate *HashMap + Thread *HashMap +} + +type HashMap struct { + mux sync.Mutex + Map map[string]i2es.ESDoc +} + +func (h *HashMap) Put(key string, val i2es.ESDoc) { + h.mux.Lock() + h.Map[key] = val + h.mux.Unlock() +} + +func (h *HashMap) Get(key string) (i2es.ESDoc, bool) { + h.mux.Lock() + val, ok := h.Map[key] + h.mux.Unlock() + return val, ok +} + +func (h *HashMap) Remove(key string) { + h.mux.Lock() + delete(h.Map, key) + h.mux.Unlock() +} + +func (h *HashMap) Clear() { + h.mux.Lock() + for k, _ := range h.Map { + delete(h.Map, k) + } + h.mux.Unlock() +} + +func (h *HashMap) Values() []i2es.ESDoc { + h.mux.Lock() + var values []i2es.ESDoc + for _, v := range h.Map { + values = append(values, v) + } + h.mux.Unlock() + return values +} + +func (h *HashMap) Size() int { + return len(h.Map) +} + +func newContainer() Container { + top := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + comments := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + toUpdate := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + thread := &HashMap{ + Map: make(map[string]i2es.ESDoc), + } + return Container{ + top, + comments, + toUpdate, + thread, + } +} + +func (c *Conf) reassignTopic(con *Container) { + totalDocs := c.getDocsCount() + log.Info("Total documents in the index ", totalDocs) + + // Populate containers + for i := c.From; i < int(totalDocs); i += c.Step { + msgs := c.GetDocs(i) + log.Infof("Processing %d documents", len(msgs.Hits.Hits)) + for _, msg := range msgs.Hits.Hits { + if msg.Source.Repto == "" && msg.Source.TopicID == "" { + m := msg.Source + m.TopicID = strings.Split(uuid.New().URN(), ":")[2] + con.Comments.Put(m.MsgID, m) + } else { + m := msg.Source + con.Comments.Put(m.MsgID, m) + con.ToUpdate.Put(m.MsgID, m) + } + } + } + con.processComments() + log.Info("Top posts size ", con.TopPosts.Size()) + log.Info("Comments size ", con.Comments.Size()) + log.Info("To update size ", con.ToUpdate.Size()) + log.Infof("\n%+v\n", con.ToUpdate.Size()) +} + +func (c *Conf) assignLatests(con *Container) { + // Populate containers + msgs := c.GetLatests() + log.Infof("Processing %d documents", len(msgs.Hits.Hits)) + for _, msg := range msgs.Hits.Hits { + if msg.Source.Repto == "" && msg.Source.TopicID == "" { + m := msg.Source + m.TopicID = strings.Split(uuid.New().URN(), ":")[2] + con.Comments.Put(m.MsgID, m) + } else { + m := msg.Source + con.Comments.Put(m.MsgID, m) + con.ToUpdate.Put(m.MsgID, m) + } + } + con.processComments() + log.Info("Top posts size ", con.TopPosts.Size()) + log.Info("Comments size ", con.Comments.Size()) + log.Info("To update size ", con.ToUpdate.Size()) + log.Infof("\n%+v\n", con.ToUpdate.Size()) +} + +func (con *Container) processComments() { + for _, m := range con.Comments.Values() { + if m.TopicID != "" { + con.ToUpdate.Put(m.MsgID, m) + log.Infof("Message %s with topicid %s added to the updates list", m.MsgID, m.TopicID) + } else { + con.processSingle(1, m) + } + } +} + +func (con *Container) processSingle(depth int, m i2es.ESDoc) { + maxDepth := 100 + if depth == maxDepth { + log.Warn("Max depth is reached!") + con.Thread.Clear() + return + } + if m.Repto != "" && m.TopicID == "" { + if comment, ok := con.Comments.Get(m.Repto); ok { + if comment.TopicID != "" { + log.Info("Found topic id ", comment.TopicID) + m.TopicID = comment.TopicID + con.ToUpdate.Put(m.MsgID, m) + con.Comments.Remove(m.MsgID) + return + } + if comment.Repto != "" && comment.TopicID == "" { + if c, ok := con.TopPosts.Get(comment.Repto); ok { + con.Thread.Put(comment.MsgID, comment) + con.processSingle(depth+1, c) + } + } else if comment.TopicID != "" { + con.Thread.Put(comment.MsgID, comment) + con.processThread(comment.TopicID) + return + } + } else { + log.Warnf("Message %s not found!", m.Repto) + con.Thread.Clear() + } + } else if m.TopicID != "" { + log.Info("Found topicid") + con.Thread.Put(m.MsgID, m) + con.processThread(m.TopicID) + return + } +} + +func (con *Container) processThread(id string) { + log.Infof("Processing thread with topicid %s\n", id) + for _, v := range con.Thread.Values() { + v.TopicID = id + con.ToUpdate.Put(v.MsgID, v) + // con.Comments.Remove(v.MsgID) + con.Thread.Remove(v.MsgID) + } +} + +type Plain struct { + ID string + Marshal string +} + +func (c *Conf) UpdateDocs(con *Container) { + var out []string + for k, v := range con.ToUpdate.Map { + var plain Plain + data, err := json.Marshal(v) + if err != nil { + log.Error(err) + } + plain.ID = k + plain.Marshal = string(data) + t := `{ "index": { "_index": "%s", "_type": "%s", "_id": "{{ .ID }}" }} +{{ .Marshal }} +` + tmpl, err := template.New("bulk").Parse(fmt.Sprintf(t, c.ES.Index, c.ES.Type)) + if err != nil { + log.Error(err) + } + + bulk := []byte("") + s := bytes.NewBuffer(bulk) + err = tmpl.Execute(s, plain) + if err != nil { + log.Error(err) + } + out = append(out, s.String()) + if len(out) >= 4999 { + _, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n")) + if err != nil { + log.Error(err) + } + out = []string{} + } + } + _, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n")) + if err != nil { + log.Error(err) + } +}