diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..122b1d8
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,7 @@
+module gitea.difrex.ru/Umbrella/fetcher
+
+require (
+	github.com/emirpasic/gods v1.12.0
+	github.com/google/uuid v1.0.0
+	github.com/sirupsen/logrus v1.2.0
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..ec6cdb0
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,15 @@
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
+github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/i2es/elastic.go b/i2es/elastic.go
index 8a94241..6605821 100644
--- a/i2es/elastic.go
+++ b/i2es/elastic.go
@@ -8,7 +8,7 @@ import (
 	"net/http"
 	"strings"
 
-	log "github.com/Sirupsen/logrus"
+	log "github.com/sirupsen/logrus"
 )
 
 type topicid struct {
diff --git a/i2es/topics.go b/i2es/topics.go
index 82dc99f..37b2474 100644
--- a/i2es/topics.go
+++ b/i2es/topics.go
@@ -5,8 +5,8 @@ import (
 	"errors"
 	"strings"
 
-	log "github.com/Sirupsen/logrus"
 	"github.com/google/uuid"
+	log "github.com/sirupsen/logrus"
 )
 
 // getorcreate topicID. Generate new unique topicID if message is start message
diff --git a/idec/parser.go b/idec/parser.go
index ba95eb5..d22094c 100644
--- a/idec/parser.go
+++ b/idec/parser.go
@@ -4,7 +4,7 @@ import (
 	"strconv"
 	"strings"
 
-	idec "github.com/Difrex/go-idec"
+	idec "github.com/idec-net/go-idec"
 )
 
 // ParseMessage ...
diff --git a/idec/proto.go b/idec/proto.go
index bcdc1ab..4b59907 100644
--- a/idec/proto.go
+++ b/idec/proto.go
@@ -9,7 +9,7 @@ import (
 	"strconv"
 	"strings"
 
-	log "github.com/Sirupsen/logrus"
+	log "github.com/sirupsen/logrus"
 )
 
 // IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions
diff --git a/utils/reassign_topics/elastic.go b/utils/reassign_topics/elastic.go
index 2111a6a..d1e4718 100644
--- a/utils/reassign_topics/elastic.go
+++ b/utils/reassign_topics/elastic.go
@@ -3,7 +3,7 @@ package main
 import "gitea.difrex.ru/Umbrella/fetcher/i2es"
 import "fmt"
 import "net/http"
-import log "github.com/Sirupsen/logrus"
+import log "github.com/sirupsen/logrus"
 import "encoding/json"
 import "strings"
 
@@ -54,14 +54,14 @@ func (c *Conf) getDocsCount() int64 {
 	return int64(stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})["docs"].(map[string]interface{})["count"].(float64))
 }
 
-func (c *Conf) getDocs(from int) i2es.ESRes {
+// doSearchRequest POSTs query as JSON to reqURL and decodes the response
+// body into an i2es.ESRes. Any error is logged and returned to the caller.
+func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) {
 	var res i2es.ESRes
-	reqURL := fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
-	query := fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, 0)
 	req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
 	if err != nil {
 		log.Error(err)
-		return res
+		return res, err
 	}
 
 	req.Header.Add("Content-Type", "application/json")
@@ -69,7 +69,7 @@ func (c *Conf) getDocs(from int) i2es.ESRes {
 	resp, err := client.Do(req)
 	if err != nil {
 		log.Error(err)
-		return res
+		return res, err
 	}
 
 	defer resp.Body.Close()
@@ -77,8 +77,21 @@ func (c *Conf) getDocs(from int) i2es.ESRes {
 	err = json.NewDecoder(resp.Body).Decode(&res)
 	if err != nil {
 		log.Error(err)
-		return res
+		return res, err
 	}
+	return res, nil
+}
+
+// GetDocs fetches one page of c.Step documents, sorted by date ascending,
+// starting at offset from. It terminates the program if the search fails.
+func (c *Conf) GetDocs(from int) i2es.ESRes {
+	reqURL := fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
+	query := fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from)
+	log.Infof("Do query `%s`\n", query)
+	res, err := c.doSearchRequest(reqURL, query)
+	if err != nil {
+		log.Fatal(err)
+	}
 
 	return res
 }
diff --git a/utils/reassign_topics/main.go b/utils/reassign_topics/main.go
index 559ce76..493adf4 100644
--- a/utils/reassign_topics/main.go
+++ b/utils/reassign_topics/main.go
@@ -3,9 +3,8 @@ package main
 
 import (
 	"flag"
-	"fmt"
-
 	"gitea.difrex.ru/Umbrella/fetcher/i2es"
+	log "github.com/sirupsen/logrus"
 )
 
 var (
@@ -30,12 +29,8 @@ func main() {
 		},
 		Step: step,
 	}
+	con := newContainer()
+	conf.reassignTopic(&con)
 
-	count := conf.getDocsCount()
-	fmt.Println(count)
-
-	for i := 0; int64(i) < count; i += step {
-		docs := conf.getDocs(i)
-		fmt.Printf("%d\n", len(docs.Hits.Hits))
-	}
+	log.Infof("\n%+v\n", con.ToUpdate)
 }
diff --git a/utils/reassign_topics/processing.go b/utils/reassign_topics/processing.go
new file mode 100644
index 0000000..d9b2c5e
--- /dev/null
+++ b/utils/reassign_topics/processing.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+	"gitea.difrex.ru/Umbrella/fetcher/i2es"
+	"github.com/emirpasic/gods/maps/hashmap"
+	"github.com/google/uuid"
+	log "github.com/sirupsen/logrus"
+)
+
+// Container holds the working sets used while reassigning topic IDs, each
+// keyed by message ID.
+type Container struct {
+	TopPosts *hashmap.Map // messages without a topic ID, after a fresh one is assigned
+	Comments *hashmap.Map // messages that already carry a topic ID, pending resolution
+	ToUpdate *hashmap.Map // comments whose topic ID was replaced with their parent's
+}
+
+// newContainer returns a Container with three empty hash maps.
+func newContainer() Container {
+	return Container{
+		hashmap.New(),
+		hashmap.New(),
+		hashmap.New(),
+	}
+}
+
+// reassignTopic pages through the index c.Step documents at a time, placing
+// each message into con.TopPosts (no topic ID yet; a fresh UUID URN is
+// assigned) or con.Comments, and resolves comments after every page.
+func (c *Conf) reassignTopic(con *Container) {
+	totalDocs := c.getDocsCount()
+
+	// Populate containers
+	for i := 0; i < int(totalDocs); i += c.Step {
+		msgs := c.GetDocs(i)
+		for _, msg := range msgs.Hits.Hits {
+			if msg.Source.TopicID == "" {
+				m := msg.Source
+				m.TopicID = uuid.New().URN()
+				log.Infof("Assign new topic id `%s` for message `%s`\n", m.TopicID, m.MsgID)
+				con.TopPosts.Put(m.MsgID, m)
+			} else {
+				m := msg.Source
+				con.Comments.Put(m.MsgID, m)
+			}
+		}
+		c.processComments(con)
+		log.Info("Top posts size ", con.TopPosts.Size())
+		log.Info("Comments size ", con.Comments.Size())
+	}
+	log.Infof("\n%+v\n", con.ToUpdate.Size())
+}
+
+// processComments moves every comment whose Repto matches a known top post,
+// or an already-resolved comment, into con.ToUpdate with the inherited topic
+// ID. Comments that cannot be resolved yet stay in con.Comments.
+func (c *Conf) processComments(con *Container) {
+	for _, msg := range con.Comments.Values() {
+		m := msg.(i2es.ESDoc)
+		if top, ok := con.TopPosts.Get(m.Repto); ok {
+			log.Info("Found topic id ", top.(i2es.ESDoc).TopicID)
+			m.TopicID = top.(i2es.ESDoc).TopicID
+			con.ToUpdate.Put(m.MsgID, m)
+			con.Comments.Remove(m.MsgID)
+			continue
+		}
+		if comment, ok := con.ToUpdate.Get(m.Repto); ok {
+			if comment.(i2es.ESDoc).TopicID != "" {
+				log.Info("Found topic id ", comment.(i2es.ESDoc).TopicID)
+				m.TopicID = comment.(i2es.ESDoc).TopicID
+				con.ToUpdate.Put(m.MsgID, m)
+				con.Comments.Remove(m.MsgID)
+			}
+		}
+	}
+}
diff --git a/utils/reassign_topics/reassign_topics b/utils/reassign_topics/reassign_topics
new file mode 100755
index 0000000..970447e
Binary files /dev/null and b/utils/reassign_topics/reassign_topics differ