package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"text/template"

	"gitea.difrex.ru/Umbrella/fetcher/i2es"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
)

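// Container groups the working sets of documents used by the topic-reassignment passes.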
type Container struct {
	TopPosts *HashMap
	Comments *HashMap
	ToUpdate *HashMap
	Thread   *HashMap
}

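// HashMap is a mutex-guarded map of documents keyed by message ID.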
type HashMap struct {
	mux sync.Mutex
	Map map[string]i2es.ESDoc
}

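// Put stores val under key.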
func (h *HashMap) Put(key string, val i2es.ESDoc) {
	h.mux.Lock()
	h.Map[key] = val
	h.mux.Unlock()
}

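// Get returns the document stored under key and whether it exists.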
func (h *HashMap) Get(key string) (i2es.ESDoc, bool) {
	h.mux.Lock()
	val, ok := h.Map[key]
	h.mux.Unlock()
	return val, ok
}

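// Remove deletes the document stored under key.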
func (h *HashMap) Remove(key string) {
	h.mux.Lock()
	delete(h.Map, key)
	h.mux.Unlock()
}

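// Clear removes all documents from the map.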
func (h *HashMap) Clear() {
	h.mux.Lock()
	for k := range h.Map {
		delete(h.Map, k)
	}
	h.mux.Unlock()
}

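// Values returns a snapshot slice of all stored documents.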
func (h *HashMap) Values() []i2es.ESDoc {
	h.mux.Lock()
	var values []i2es.ESDoc
	for _, v := range h.Map {
		values = append(values, v)
	}
	h.mux.Unlock()
	return values
}

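// Size returns the number of stored documents.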
func (h *HashMap) Size() int {
	h.mux.Lock()
	defer h.mux.Unlock()
	return len(h.Map)
}

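// newContainer returns a Container with all four maps initialized.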
func newContainer() Container {
	top := &HashMap{
		Map: make(map[string]i2es.ESDoc),
	}
	comments := &HashMap{
		Map: make(map[string]i2es.ESDoc),
	}
	toUpdate := &HashMap{
		Map: make(map[string]i2es.ESDoc),
	}
	thread := &HashMap{
		Map: make(map[string]i2es.ESDoc),
	}
	return Container{
		top,
		comments,
		toUpdate,
		thread,
	}
}

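// reassignTopic pages through the whole index in steps of c.Step, fills the
// container and then resolves topic IDs for the collected messages.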
func (c *Conf) reassignTopic(con *Container) {
	totalDocs := c.getDocsCount()
	log.Info("Total documents in the index ", totalDocs)

	// Populate containers
	for i := c.From; i < int(totalDocs); i += c.Step {
		msgs := c.GetDocs(i)
		log.Infof("Processing %d documents", len(msgs.Hits.Hits))
		for _, msg := range msgs.Hits.Hits {
			if msg.Source.Repto == "" && msg.Source.TopicID == "" {
				m := msg.Source
				m.TopicID = strings.Split(uuid.New().URN(), ":")[2]
				con.Comments.Put(m.MsgID, m)
			} else {
				m := msg.Source
				con.Comments.Put(m.MsgID, m)
				con.ToUpdate.Put(m.MsgID, m)
			}
		}
	}
	con.processComments()
	log.Info("Top posts size ", con.TopPosts.Size())
	log.Info("Comments size ", con.Comments.Size())
	log.Info("To update size ", con.ToUpdate.Size())
	log.Infof("\n%+v\n", con.ToUpdate.Size())
}

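// assignLatests does the same as reassignTopic but only for the latest
// documents returned by c.GetLatests.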
func (c *Conf) assignLatests(con *Container) {
	// Populate containers
	msgs := c.GetLatests()
	log.Infof("Processing %d documents", len(msgs.Hits.Hits))
	for _, msg := range msgs.Hits.Hits {
		if msg.Source.Repto == "" && msg.Source.TopicID == "" {
			m := msg.Source
			m.TopicID = strings.Split(uuid.New().URN(), ":")[2]
			con.Comments.Put(m.MsgID, m)
		} else {
			m := msg.Source
			con.Comments.Put(m.MsgID, m)
			con.ToUpdate.Put(m.MsgID, m)
		}
	}
	con.processComments()
	log.Info("Top posts size ", con.TopPosts.Size())
	log.Info("Comments size ", con.Comments.Size())
	log.Info("To update size ", con.ToUpdate.Size())
	log.Infof("\n%+v\n", con.ToUpdate.Size())
}

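// processComments queues messages that already carry a topic ID and resolves
// the rest through processSingle.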
func (con *Container) processComments() {
	for _, m := range con.Comments.Values() {
		if m.TopicID != "" {
			con.ToUpdate.Put(m.MsgID, m)
			log.Infof("Message %s with topicid %s added to the updates list", m.MsgID, m.TopicID)
		} else {
			con.processSingle(1, m)
		}
	}
}

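// processSingle follows the repto chain of a message (up to maxDepth hops)
// until a topic ID is found, collecting the visited messages in con.Thread.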
func (con *Container) processSingle(depth int, m i2es.ESDoc) {
	maxDepth := 100
	if depth == maxDepth {
		log.Warn("Max depth is reached!")
		con.Thread.Clear()
		return
	}
	if m.Repto != "" && m.TopicID == "" {
		if comment, ok := con.Comments.Get(m.Repto); ok {
			if comment.TopicID != "" {
				log.Info("Found topic id ", comment.TopicID)
				m.TopicID = comment.TopicID
				con.ToUpdate.Put(m.MsgID, m)
				con.Comments.Remove(m.MsgID)
				return
			}
			if comment.Repto != "" && comment.TopicID == "" {
				if c, ok := con.TopPosts.Get(comment.Repto); ok {
					con.Thread.Put(comment.MsgID, comment)
					con.processSingle(depth+1, c)
				}
			} else if comment.TopicID != "" {
				con.Thread.Put(comment.MsgID, comment)
				con.processThread(comment.TopicID)
				return
			}
		} else {
			log.Warnf("Message %s not found!", m.Repto)
			con.Thread.Clear()
		}
	} else if m.TopicID != "" {
		log.Info("Found topicid")
		con.Thread.Put(m.MsgID, m)
		con.processThread(m.TopicID)
		return
	}
}

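// processThread assigns the given topic ID to every message collected in
// con.Thread and moves them to the update queue.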
func (con *Container) processThread(id string) {
	log.Infof("Processing thread with topicid %s\n", id)
	for _, v := range con.Thread.Values() {
		v.TopicID = id
		con.ToUpdate.Put(v.MsgID, v)
		// con.Comments.Remove(v.MsgID)
		con.Thread.Remove(v.MsgID)
	}
}

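// Plain carries a document ID and its JSON encoding for the bulk template.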
type Plain struct {
	ID      string
	Marshal string
}

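// UpdateDocs renders the queued documents into Elasticsearch bulk requests
// and POSTs them to c.ES.Host in batches.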
func (c *Conf) UpdateDocs(con *Container) {
	var out []string
	for k, v := range con.ToUpdate.Map {
		var plain Plain
		data, err := json.Marshal(v)
		if err != nil {
			log.Error(err)
		}
		plain.ID = k
		plain.Marshal = string(data)
		t := `{ "index": { "_index": "%s", "_type": "%s", "_id": "{{ .ID }}" }}
{{ .Marshal }}
`
		tmpl, err := template.New("bulk").Parse(fmt.Sprintf(t, c.ES.Index, c.ES.Type))
		if err != nil {
			log.Error(err)
		}

		bulk := []byte("")
		s := bytes.NewBuffer(bulk)
		err = tmpl.Execute(s, plain)
		if err != nil {
			log.Error(err)
		}
		out = append(out, s.String())

		// Flush the batch before it grows past the bulk size limit.
		if len(out) >= 4999 {
			_, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n"))
			if err != nil {
				log.Error(err)
			}
			out = []string{}
		}
	}

	// Send whatever is left in the last, partially filled batch.
	_, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n"))
	if err != nil {
		log.Error(err)
	}
}