package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"text/template"

	"gitea.difrex.ru/Umbrella/fetcher/i2es"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
)

// maxThreadDepth bounds the recursion in processSingle.
const maxThreadDepth = 100

// bulkBatchSize is the number of bulk entries accumulated by UpdateDocs
// before they are sent as a single request.
const bulkBatchSize = 4999

// bulkEntryFormat is the text/template source for one bulk entry. The two
// %s verbs are filled with the index and type names before parsing. The
// action line and the document line are each terminated by a newline, as the
// Elasticsearch Bulk API requires newline-delimited JSON.
const bulkEntryFormat = `{ "index": { "_index": "%s", "_type": "%s", "_id": "{{ .ID }}" }}` + "\n" +
	`{{ .Marshal }}` + "\n"

// Container groups the working sets used while assigning topic IDs.
type Container struct {
	TopPosts *HashMap
	Comments *HashMap
	ToUpdate *HashMap
	Thread   *HashMap
}

// HashMap is a mutex-guarded map of documents keyed by string.
//
// Map is exported, but callers should go through the methods so that access
// is serialized by the mutex.
type HashMap struct {
	mux sync.Mutex
	Map map[string]i2es.ESDoc
}

// Put stores val under key, replacing any previous value.
func (h *HashMap) Put(key string, val i2es.ESDoc) {
	h.mux.Lock()
	defer h.mux.Unlock()
	h.Map[key] = val
}

// Get returns the document stored under key and whether it was present.
func (h *HashMap) Get(key string) (i2es.ESDoc, bool) {
	h.mux.Lock()
	defer h.mux.Unlock()
	val, ok := h.Map[key]
	return val, ok
}

// Remove deletes key from the map; it is a no-op if key is absent.
func (h *HashMap) Remove(key string) {
	h.mux.Lock()
	defer h.mux.Unlock()
	delete(h.Map, key)
}

// Clear removes every entry from the map, keeping the map itself allocated.
func (h *HashMap) Clear() {
	h.mux.Lock()
	defer h.mux.Unlock()
	for k := range h.Map {
		delete(h.Map, k)
	}
}

// Values returns a snapshot of the stored documents in unspecified order.
// It returns nil when the map is empty.
func (h *HashMap) Values() []i2es.ESDoc {
	h.mux.Lock()
	defer h.mux.Unlock()

	if len(h.Map) == 0 {
		return nil
	}
	values := make([]i2es.ESDoc, 0, len(h.Map))
	for _, v := range h.Map {
		values = append(values, v)
	}
	return values
}

// Size returns the number of stored documents.
func (h *HashMap) Size() int {
	// Take the lock like every other accessor; reading len() while another
	// goroutine writes to the map is a data race.
	h.mux.Lock()
	defer h.mux.Unlock()
	return len(h.Map)
}

// snapshot returns a copy of the underlying map taken under the lock, so the
// caller can iterate over keys and values without holding the mutex.
func (h *HashMap) snapshot() map[string]i2es.ESDoc {
	h.mux.Lock()
	defer h.mux.Unlock()

	cp := make(map[string]i2es.ESDoc, len(h.Map))
	for k, v := range h.Map {
		cp[k] = v
	}
	return cp
}

// newContainer returns a Container whose four maps are allocated and empty.
func newContainer() Container {
	newMap := func() *HashMap {
		return &HashMap{Map: make(map[string]i2es.ESDoc)}
	}
	return Container{
		TopPosts: newMap(),
		Comments: newMap(),
		ToUpdate: newMap(),
		Thread:   newMap(),
	}
}

// classify files a single fetched document into the container.
//
// A document with neither Repto nor TopicID gets a freshly generated UUID as
// its TopicID and is stored in Comments only. Every other document is stored
// unchanged in both Comments and ToUpdate.
func (con *Container) classify(m i2es.ESDoc) {
	if m.Repto == "" && m.TopicID == "" {
		m.TopicID = uuid.New().String()
		con.Comments.Put(m.MsgID, m)
		return
	}
	con.Comments.Put(m.MsgID, m)
	con.ToUpdate.Put(m.MsgID, m)
}

// logSizes logs the sizes of the container's working sets.
func (con *Container) logSizes() {
	log.Info("Top posts size ", con.TopPosts.Size())
	log.Info("Comments size ", con.Comments.Size())
	log.Info("To update size ", con.ToUpdate.Size())
	log.Infof("\n%+v\n", con.ToUpdate.Size())
}

// reassignTopic pages through the index from c.From in increments of c.Step,
// classifies every returned document into con, and then resolves topic IDs
// via processComments.
func (c *Conf) reassignTopic(con *Container) {
	totalDocs := c.getDocsCount()
	log.Info("Total documents in the index ", totalDocs)

	// A non-positive step would never advance the offset and loop forever.
	if c.Step <= 0 {
		log.Errorf("invalid step %v: must be positive", c.Step)
		return
	}

	// Populate containers
	for i := c.From; i < int(totalDocs); i += c.Step {
		msgs := c.GetDocs(i)
		log.Infof("Processing %d documents", len(msgs.Hits.Hits))
		for _, msg := range msgs.Hits.Hits {
			con.classify(msg.Source)
		}
	}

	con.processComments()
	con.logSizes()
}

// assignLatests classifies the documents returned by c.GetLatests into con
// and then resolves topic IDs via processComments.
func (c *Conf) assignLatests(con *Container) {
	// Populate containers
	msgs := c.GetLatests()
	log.Infof("Processing %d documents", len(msgs.Hits.Hits))
	for _, msg := range msgs.Hits.Hits {
		con.classify(msg.Source)
	}

	con.processComments()
	con.logSizes()
}

// processComments walks a snapshot of Comments. Documents that already carry
// a TopicID are queued in ToUpdate; the rest are handed to processSingle.
func (con *Container) processComments() {
	for _, m := range con.Comments.Values() {
		if m.TopicID == "" {
			con.processSingle(1, m)
			continue
		}
		con.ToUpdate.Put(m.MsgID, m)
		log.Infof("Message %s with topicid %s added to the updates list", m.MsgID, m.TopicID)
	}
}

// processSingle tries to resolve the TopicID of m.
//
// If m already has a TopicID, m is added to Thread and the whole Thread is
// flushed to ToUpdate with that ID. Otherwise, if m is a reply, its parent is
// looked up in Comments: a parent with a TopicID donates it to m, which is
// then queued in ToUpdate and removed from Comments. A parent without one is
// added to Thread and resolution continues from the grandparent, provided it
// is found in TopPosts. Thread is cleared when the parent is missing or the
// depth limit is reached.
func (con *Container) processSingle(depth int, m i2es.ESDoc) {
	if depth >= maxThreadDepth {
		log.Warn("Max depth is reached!")
		con.Thread.Clear()
		return
	}

	switch {
	case m.TopicID != "":
		log.Info("Found topicid")
		con.Thread.Put(m.MsgID, m)
		con.processThread(m.TopicID)
		return
	case m.Repto == "":
		// Neither a reply nor tagged with a topic: nothing to resolve.
		return
	}

	parent, ok := con.Comments.Get(m.Repto)
	if !ok {
		log.Warnf("Message %s not found!", m.Repto)
		con.Thread.Clear()
		return
	}

	if parent.TopicID != "" {
		log.Info("Found topic id ", parent.TopicID)
		m.TopicID = parent.TopicID
		con.ToUpdate.Put(m.MsgID, m)
		con.Comments.Remove(m.MsgID)
		return
	}

	// From here on the parent has no TopicID.
	if parent.Repto == "" {
		return
	}

	// NOTE(review): nothing in this file ever writes to TopPosts, so unless
	// it is populated elsewhere this lookup never succeeds — confirm whether
	// Comments was intended here.
	if grand, ok := con.TopPosts.Get(parent.Repto); ok {
		con.Thread.Put(parent.MsgID, parent)
		con.processSingle(depth+1, grand)
	}
}

// processThread assigns id as the TopicID of every document currently in
// Thread, queues each one in ToUpdate, and removes it from Thread.
func (con *Container) processThread(id string) {
	log.Infof("Processing thread with topicid %s\n", id)
	for _, v := range con.Thread.Values() {
		v.TopicID = id
		con.ToUpdate.Put(v.MsgID, v)
		// con.Comments.Remove(v.MsgID)
		con.Thread.Remove(v.MsgID)
	}
}

// Plain is the data passed to the bulk entry template: the document ID and
// the document body already encoded as JSON.
type Plain struct {
	ID      string
	Marshal string
}

// UpdateDocs indexes every document in con.ToUpdate through the "_bulk"
// endpoint of c.ES.Host, sending at most bulkBatchSize entries per request.
//
// Errors are logged rather than returned. A document that cannot be encoded
// or rendered is skipped so that it cannot corrupt the rest of its batch.
func (c *Conf) UpdateDocs(con *Container) {
	// The template only depends on the configuration, so parse it once.
	tmpl, err := template.New("bulk").Parse(fmt.Sprintf(bulkEntryFormat, c.ES.Index, c.ES.Type))
	if err != nil {
		log.Error(err)
		return
	}

	url := fmt.Sprintf("%s/_bulk", c.ES.Host)
	flush := func(batch []string) {
		// Every entry already ends with a newline, so plain concatenation
		// yields newline-delimited JSON with the required trailing newline.
		if _, err := c.doRequest("POST", url, strings.Join(batch, "")); err != nil {
			log.Error(err)
		}
	}

	var (
		buf   bytes.Buffer
		batch []string
	)
	// Iterate over a copy taken under the lock instead of the live map.
	for id, doc := range con.ToUpdate.snapshot() {
		data, err := json.Marshal(doc)
		if err != nil {
			log.Error(err)
			continue
		}

		buf.Reset()
		if err := tmpl.Execute(&buf, Plain{ID: id, Marshal: string(data)}); err != nil {
			log.Error(err)
			continue
		}

		batch = append(batch, buf.String())
		if len(batch) >= bulkBatchSize {
			flush(batch)
			// Safe to reuse the backing array: flush is synchronous.
			batch = batch[:0]
		}
	}

	// Do not POST an empty body when nothing is pending.
	if len(batch) > 0 {
		flush(batch)
	}
}