[utils/reassign topics]: Working reassign
This commit is contained in:
parent
4b032d4951
commit
e728433edd
1
go.mod
1
go.mod
@ -3,5 +3,6 @@ module gitea.difrex.ru/Umbrella/fetcher
|
||||
require (
|
||||
github.com/emirpasic/gods v1.12.0
|
||||
github.com/google/uuid v1.0.0
|
||||
github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343
|
||||
github.com/sirupsen/logrus v1.2.0
|
||||
)
|
||||
|
2
go.sum
2
go.sum
@ -3,6 +3,8 @@ github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg
|
||||
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
|
||||
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343 h1:ABIlopLGU081SkX2KmXjho9vmm1MgPs38hxXCXC2BrM=
|
||||
github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343/go.mod h1:XUvr43ZLN/4bTZT7TEhJA/rsfFLQxnggX6iU5TGXgIY=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
|
||||
|
@ -25,6 +25,7 @@ type ESConf struct {
|
||||
|
||||
// ESRes ES response structure
|
||||
type ESRes struct {
|
||||
ScrollID string `json:"_scroll_id"`
|
||||
Took int `json:"took"`
|
||||
TimedOut bool `json:"timed_out"`
|
||||
Hits Hits `json:"hits"`
|
||||
|
10
main.go
10
main.go
@ -6,7 +6,7 @@ import (
|
||||
|
||||
"strconv"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||
"gitea.difrex.ru/Umbrella/fetcher/idec"
|
||||
@ -137,8 +137,8 @@ func main() {
|
||||
newMessages = append(newMessages, esd)
|
||||
}
|
||||
|
||||
err = elastic.AssignTopics(newMessages)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
// err = elastic.AssignTopics(newMessages)
|
||||
// if err != nil {
|
||||
// log.Error(err.Error())
|
||||
// }
|
||||
}
|
||||
|
@ -6,10 +6,12 @@ import "net/http"
|
||||
import log "github.com/sirupsen/logrus"
|
||||
import "encoding/json"
|
||||
import "strings"
|
||||
import "io/ioutil"
|
||||
|
||||
type Conf struct {
|
||||
ES i2es.ESConf
|
||||
Step int
|
||||
Step, From int
|
||||
ScrollID string
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
@ -54,6 +56,29 @@ func (c *Conf) getDocsCount() int64 {
|
||||
return int64(stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})["docs"].(map[string]interface{})["count"].(float64))
|
||||
}
|
||||
|
||||
func (c *Conf) doRequest(method, reqURL, data string) ([]byte, error) {
|
||||
var content []byte
|
||||
req, err := http.NewRequest(method, reqURL, strings.NewReader(data))
|
||||
if err != nil {
|
||||
return content, err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return content, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
content, err = ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return content, err
|
||||
}
|
||||
|
||||
return content, nil
|
||||
}
|
||||
|
||||
func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) {
|
||||
var res i2es.ESRes
|
||||
req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
|
||||
@ -62,6 +87,7 @@ func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) {
|
||||
return res, err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
log.Info(query)
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
@ -82,12 +108,20 @@ func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) {
|
||||
}
|
||||
|
||||
func (c *Conf) GetDocs(from int) i2es.ESRes {
|
||||
reqURL := fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
|
||||
query := fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, 0)
|
||||
log.Infof("Do query `%s`\n", query)
|
||||
var reqURL, query string
|
||||
if c.ScrollID == "" {
|
||||
reqURL = fmt.Sprintf("%s/%s/_search?scroll=5m", c.ES.Host, c.ES.Index)
|
||||
query = fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from)
|
||||
} else {
|
||||
reqURL = fmt.Sprintf("%s/_search/scroll", c.ES.Host)
|
||||
query = fmt.Sprintf(`{"scroll": "5m", "scroll_id": "%s"}`, c.ScrollID)
|
||||
}
|
||||
res, err := c.doSearchRequest(reqURL, query)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if c.ScrollID == "" {
|
||||
c.ScrollID = res.ScrollID
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
@ -4,12 +4,11 @@ import (
|
||||
"flag"
|
||||
|
||||
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
es, index, esType string
|
||||
step int
|
||||
step, from int
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -17,6 +16,7 @@ func init() {
|
||||
flag.StringVar(&index, "index", "idec", "Elasticsearch index")
|
||||
flag.StringVar(&esType, "esType", "post", "Elasticsearch document type")
|
||||
flag.IntVar(&step, "step", 100, "Scroll step")
|
||||
flag.IntVar(&from, "from", 0, "Scroll from")
|
||||
flag.Parse()
|
||||
}
|
||||
|
||||
@ -28,9 +28,9 @@ func main() {
|
||||
Type: esType,
|
||||
},
|
||||
Step: step,
|
||||
From: from,
|
||||
}
|
||||
con := newContainer()
|
||||
conf.reassignTopic(&con)
|
||||
|
||||
log.Infof("\n%+v\n", con.ToUpdate)
|
||||
conf.UpdateDocs(&con)
|
||||
}
|
||||
|
@ -1,67 +1,221 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"sync"
|
||||
|
||||
"text/template"
|
||||
|
||||
"bytes"
|
||||
|
||||
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||
"github.com/emirpasic/gods/maps/hashmap"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Container struct {
|
||||
TopPosts *hashmap.Map
|
||||
Comments *hashmap.Map
|
||||
ToUpdate *hashmap.Map
|
||||
TopPosts *HashMap
|
||||
Comments *HashMap
|
||||
ToUpdate *HashMap
|
||||
Thread *HashMap
|
||||
}
|
||||
|
||||
type HashMap struct {
|
||||
mux sync.Mutex
|
||||
Map map[string]i2es.ESDoc
|
||||
}
|
||||
|
||||
func (h *HashMap) Put(key string, val i2es.ESDoc) {
|
||||
h.mux.Lock()
|
||||
h.Map[key] = val
|
||||
h.mux.Unlock()
|
||||
}
|
||||
|
||||
func (h *HashMap) Get(key string) (i2es.ESDoc, bool) {
|
||||
h.mux.Lock()
|
||||
val, ok := h.Map[key]
|
||||
h.mux.Unlock()
|
||||
return val, ok
|
||||
}
|
||||
|
||||
func (h *HashMap) Remove(key string) {
|
||||
h.mux.Lock()
|
||||
delete(h.Map, key)
|
||||
h.mux.Unlock()
|
||||
}
|
||||
|
||||
func (h *HashMap) Clear() {
|
||||
h.mux.Lock()
|
||||
for k, _ := range h.Map {
|
||||
delete(h.Map, k)
|
||||
}
|
||||
h.mux.Unlock()
|
||||
}
|
||||
|
||||
func (h *HashMap) Values() []i2es.ESDoc {
|
||||
h.mux.Lock()
|
||||
var values []i2es.ESDoc
|
||||
for _, v := range h.Map {
|
||||
values = append(values, v)
|
||||
}
|
||||
h.mux.Unlock()
|
||||
return values
|
||||
}
|
||||
|
||||
func (h *HashMap) Size() int {
|
||||
return len(h.Map)
|
||||
}
|
||||
|
||||
func newContainer() Container {
|
||||
top := &HashMap{
|
||||
Map: make(map[string]i2es.ESDoc),
|
||||
}
|
||||
comments := &HashMap{
|
||||
Map: make(map[string]i2es.ESDoc),
|
||||
}
|
||||
toUpdate := &HashMap{
|
||||
Map: make(map[string]i2es.ESDoc),
|
||||
}
|
||||
thread := &HashMap{
|
||||
Map: make(map[string]i2es.ESDoc),
|
||||
}
|
||||
return Container{
|
||||
hashmap.New(),
|
||||
hashmap.New(),
|
||||
hashmap.New(),
|
||||
top,
|
||||
comments,
|
||||
toUpdate,
|
||||
thread,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conf) reassignTopic(con *Container) {
|
||||
totalDocs := c.getDocsCount()
|
||||
log.Info("Total documents in the index ", totalDocs)
|
||||
|
||||
// Populate containers
|
||||
for i := 0; i < int(totalDocs); i += c.Step {
|
||||
for i := c.From; i < int(totalDocs); i += c.Step {
|
||||
msgs := c.GetDocs(i)
|
||||
log.Infof("Processing %d documents", len(msgs.Hits.Hits))
|
||||
for _, msg := range msgs.Hits.Hits {
|
||||
if msg.Source.TopicID == "" {
|
||||
if msg.Source.Repto == "" && msg.Source.TopicID == "" {
|
||||
m := msg.Source
|
||||
m.TopicID = uuid.New().URN()
|
||||
log.Infof("Assign new topic id `%s` for message `%s`\n", m.TopicID, m.MsgID)
|
||||
con.TopPosts.Put(m.MsgID, m)
|
||||
m.TopicID = strings.Split(uuid.New().URN(), ":")[2]
|
||||
con.Comments.Put(m.MsgID, m)
|
||||
} else {
|
||||
m := msg.Source
|
||||
con.Comments.Put(m.MsgID, m)
|
||||
con.ToUpdate.Put(m.MsgID, m)
|
||||
}
|
||||
}
|
||||
c.processComments(con)
|
||||
}
|
||||
con.processComments()
|
||||
log.Info("Top posts size ", con.TopPosts.Size())
|
||||
log.Info("Comments size ", con.Comments.Size())
|
||||
}
|
||||
log.Info("To update size ", con.ToUpdate.Size())
|
||||
log.Infof("\n%+v\n", con.ToUpdate.Size())
|
||||
}
|
||||
|
||||
func (c *Conf) processComments(con *Container) {
|
||||
for _, msg := range con.Comments.Values() {
|
||||
m := msg.(i2es.ESDoc)
|
||||
if top, ok := con.TopPosts.Get(m.Repto); ok {
|
||||
log.Info("Found topic id ", top.(i2es.ESDoc).TopicID)
|
||||
m.TopicID = top.(i2es.ESDoc).TopicID
|
||||
func (con *Container) processComments() {
|
||||
for _, m := range con.Comments.Values() {
|
||||
if m.TopicID != "" {
|
||||
con.ToUpdate.Put(m.MsgID, m)
|
||||
con.Comments.Remove(m.MsgID)
|
||||
continue
|
||||
}
|
||||
if comment, ok := con.ToUpdate.Get(m.Repto); ok {
|
||||
if comment.(i2es.ESDoc).TopicID != "" {
|
||||
log.Info("Found topic id ", comment.(i2es.ESDoc).TopicID)
|
||||
m.TopicID = comment.(i2es.ESDoc).TopicID
|
||||
con.ToUpdate.Put(m.MsgID, m)
|
||||
con.Comments.Remove(m.MsgID)
|
||||
}
|
||||
log.Infof("Message %s with topicid %s added to the updates list", m.MsgID, m.TopicID)
|
||||
} else {
|
||||
con.processSingle(1, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (con *Container) processSingle(depth int, m i2es.ESDoc) {
|
||||
maxDepth := 100
|
||||
if depth == maxDepth {
|
||||
log.Warn("Max depth is reached!")
|
||||
con.Thread.Clear()
|
||||
return
|
||||
}
|
||||
if m.Repto != "" && m.TopicID == "" {
|
||||
if comment, ok := con.Comments.Get(m.Repto); ok {
|
||||
if comment.TopicID != "" {
|
||||
log.Info("Found topic id ", comment.TopicID)
|
||||
m.TopicID = comment.TopicID
|
||||
con.ToUpdate.Put(m.MsgID, m)
|
||||
con.Comments.Remove(m.MsgID)
|
||||
return
|
||||
}
|
||||
if comment.Repto != "" && comment.TopicID == "" {
|
||||
if c, ok := con.TopPosts.Get(comment.Repto); ok {
|
||||
con.Thread.Put(comment.MsgID, comment)
|
||||
con.processSingle(depth+1, c)
|
||||
}
|
||||
} else if comment.TopicID != "" {
|
||||
con.Thread.Put(comment.MsgID, comment)
|
||||
con.processThread(comment.TopicID)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
log.Warnf("Message %s not found!", m.Repto)
|
||||
con.Thread.Clear()
|
||||
}
|
||||
} else if m.TopicID != "" {
|
||||
log.Info("Found topicid")
|
||||
con.Thread.Put(m.MsgID, m)
|
||||
con.processThread(m.TopicID)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (con *Container) processThread(id string) {
|
||||
log.Infof("Processing thread with topicid %s\n", id)
|
||||
for _, v := range con.Thread.Values() {
|
||||
v.TopicID = id
|
||||
con.ToUpdate.Put(v.MsgID, v)
|
||||
// con.Comments.Remove(v.MsgID)
|
||||
con.Thread.Remove(v.MsgID)
|
||||
}
|
||||
}
|
||||
|
||||
type Plain struct {
|
||||
ID string
|
||||
Marshal string
|
||||
}
|
||||
|
||||
func (c *Conf) UpdateDocs(con *Container) {
|
||||
var out []string
|
||||
for k, v := range con.ToUpdate.Map {
|
||||
var plain Plain
|
||||
data, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
plain.ID = k
|
||||
plain.Marshal = string(data)
|
||||
t := `{ "index": { "_index": "%s", "_type": "%s", "_id": "{{ .ID }}" }}
|
||||
{{ .Marshal }}
|
||||
`
|
||||
tmpl, err := template.New("bulk").Parse(fmt.Sprintf(t, c.ES.Index, c.ES.Type))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
bulk := []byte("")
|
||||
s := bytes.NewBuffer(bulk)
|
||||
err = tmpl.Execute(s, plain)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
out = append(out, s.String())
|
||||
if len(out) >= 4999 {
|
||||
_, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n"))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
out = []string{}
|
||||
}
|
||||
}
|
||||
_, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n"))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
Binary file not shown.
Loading…
Reference in New Issue
Block a user