[utils/reassign topics]: Assign latest messages

This commit is contained in:
Denis Zheleztsov 2018-11-12 15:39:24 +03:00
parent e728433edd
commit 703c4ed1f7
Signed by: Difrex
GPG Key ID: B047A0E62A285621
5 changed files with 52 additions and 11 deletions

View File

@ -137,8 +137,8 @@ func main() {
newMessages = append(newMessages, esd) newMessages = append(newMessages, esd)
} }
// err = elastic.AssignTopics(newMessages) err = elastic.AssignTopics(newMessages)
// if err != nil { if err != nil {
// log.Error(err.Error()) log.Error(err.Error())
// } }
} }

View File

@ -9,9 +9,9 @@ import "strings"
import "io/ioutil" import "io/ioutil"
type Conf struct { type Conf struct {
ES i2es.ESConf ES i2es.ESConf
Step, From int Step, From, Limit int
ScrollID string ScrollID string
} }
type Stats struct { type Stats struct {
@ -125,3 +125,14 @@ func (c *Conf) GetDocs(from int) i2es.ESRes {
} }
return res return res
} }
func (c *Conf) GetLatests() i2es.ESRes {
var reqURL, query string
reqURL = fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
query = fmt.Sprintf(`{"sort": {"date": {"order": "desc"}}, "size": %d}`, c.Limit)
res, err := c.doSearchRequest(reqURL, query)
if err != nil {
log.Fatal(err)
}
return res
}

View File

@ -8,7 +8,8 @@ import (
var ( var (
es, index, esType string es, index, esType string
step, from int step, from, limit int
latest bool
) )
func init() { func init() {
@ -17,6 +18,8 @@ func init() {
flag.StringVar(&esType, "esType", "post", "Elasticsearch document type") flag.StringVar(&esType, "esType", "post", "Elasticsearch document type")
flag.IntVar(&step, "step", 100, "Scroll step") flag.IntVar(&step, "step", 100, "Scroll step")
flag.IntVar(&from, "from", 0, "Scroll from") flag.IntVar(&from, "from", 0, "Scroll from")
flag.IntVar(&limit, "limit", 2500, "Limit for latest posts")
flag.BoolVar(&latest, "latest", false, "Processing only latest posts")
flag.Parse() flag.Parse()
} }
@ -27,10 +30,15 @@ func main() {
Index: index, Index: index,
Type: esType, Type: esType,
}, },
Step: step, Step: step,
From: from, From: from,
Limit: limit,
} }
con := newContainer() con := newContainer()
conf.reassignTopic(&con) if latest {
conf.assignLatests(&con)
} else {
conf.reassignTopic(&con)
}
conf.UpdateDocs(&con) conf.UpdateDocs(&con)
} }

View File

@ -117,6 +117,28 @@ func (c *Conf) reassignTopic(con *Container) {
log.Infof("\n%+v\n", con.ToUpdate.Size()) log.Infof("\n%+v\n", con.ToUpdate.Size())
} }
func (c *Conf) assignLatests(con *Container) {
// Populate containers
msgs := c.GetLatests()
log.Infof("Processing %d documents", len(msgs.Hits.Hits))
for _, msg := range msgs.Hits.Hits {
if msg.Source.Repto == "" && msg.Source.TopicID == "" {
m := msg.Source
m.TopicID = strings.Split(uuid.New().URN(), ":")[2]
con.Comments.Put(m.MsgID, m)
} else {
m := msg.Source
con.Comments.Put(m.MsgID, m)
con.ToUpdate.Put(m.MsgID, m)
}
}
con.processComments()
log.Info("Top posts size ", con.TopPosts.Size())
log.Info("Comments size ", con.Comments.Size())
log.Info("To update size ", con.ToUpdate.Size())
log.Infof("\n%+v\n", con.ToUpdate.Size())
}
func (con *Container) processComments() { func (con *Container) processComments() {
for _, m := range con.Comments.Values() { for _, m := range con.Comments.Values() {
if m.TopicID != "" { if m.TopicID != "" {