Assign topics util #1
@ -1,31 +0,0 @@
|
|||||||
#!/bin/sh
|
|
||||||
curl -XDELETE 'http://localhost:9200/iinetwork' && echo
|
|
||||||
curl -XPUT 'http://localhost:9200/iinetwork' -d '{
|
|
||||||
"settings": {
|
|
||||||
"analysis": {
|
|
||||||
"analyzer": {
|
|
||||||
"my_analyzer": {
|
|
||||||
"type": "custom",
|
|
||||||
"tokenizer": "standard",
|
|
||||||
"filter": ["lowercase", "russian_morphology", "english_morphology", "my_stopwords"]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"filter": {
|
|
||||||
"my_stopwords": {
|
|
||||||
"type": "stop",
|
|
||||||
"stopwords": "а,без,более,бы,был,была,были,было,быть,в,вам,вас,весь,во,вот,все,всего,всех,вы,где,да,даже,для,до,его,ее,если,есть,еще,же,за,здесь,и,из,или,им,их,к,как,ко,когда,кто,ли,либо,мне,может,мы,на,надо,наш,не,него,нее,нет,ни,них,но,ну,о,об,однако,он,она,они,оно,от,очень,по,под,при,с,со,так,также,такой,там,те,тем,то,того,тоже,той,только,том,ты,у,уже,хотя,чего,чей,чем,что,чтобы,чье,чья,эта,эти,это,я,a,an,and,are,as,at,be,but,by,for,if,in,into,is,it,no,not,of,on,or,such,that,the,their,then,there,these,they,this,to,was,will,with"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}' && echo
|
|
||||||
curl -XPUT 'http://localhost:9200/iinetwork/post/_mapping' -d '{
|
|
||||||
"post": {
|
|
||||||
"_all" : {"analyzer" : "russian_morphology"},
|
|
||||||
"properties" : {
|
|
||||||
"post" : { "type" : "string", "analyzer" : "russian_morphology" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}' && echo
|
|
||||||
|
|
||||||
curl -XPOST 'http://localhost:9200/iinetwork/_refresh' && echo
|
|
8
go.mod
Normal file
8
go.mod
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
module gitea.difrex.ru/Umbrella/fetcher
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/emirpasic/gods v1.12.0
|
||||||
|
github.com/google/uuid v1.0.0
|
||||||
|
github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343
|
||||||
|
github.com/sirupsen/logrus v1.2.0
|
||||||
|
)
|
17
go.sum
Normal file
17
go.sum
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
|
||||||
|
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
|
||||||
|
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
||||||
|
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343 h1:ABIlopLGU081SkX2KmXjho9vmm1MgPs38hxXCXC2BrM=
|
||||||
|
github.com/idec-net/go-idec v0.0.0-20181106151523-61a006246343/go.mod h1:XUvr43ZLN/4bTZT7TEhJA/rsfFLQxnggX6iU5TGXgIY=
|
||||||
|
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
|
||||||
|
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||||
|
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||||
|
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
|
||||||
|
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||||
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
|
||||||
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
@ -25,6 +25,7 @@ type ESConf struct {
|
|||||||
|
|
||||||
// ESRes ES response structure
|
// ESRes ES response structure
|
||||||
type ESRes struct {
|
type ESRes struct {
|
||||||
|
ScrollID string `json:"_scroll_id"`
|
||||||
Took int `json:"took"`
|
Took int `json:"took"`
|
||||||
TimedOut bool `json:"timed_out"`
|
TimedOut bool `json:"timed_out"`
|
||||||
Hits Hits `json:"hits"`
|
Hits Hits `json:"hits"`
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type topicid struct {
|
type topicid struct {
|
||||||
|
@ -5,8 +5,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// getorcreate topicID. Generate new unique topicID if message is start message
|
// getorcreate topicID. Generate new unique topicID if message is start message
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
idec "github.com/Difrex/go-idec"
|
idec "github.com/idec-net/go-idec"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ParseMessage ...
|
// ParseMessage ...
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions
|
// IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions
|
||||||
|
2
main.go
2
main.go
@ -6,7 +6,7 @@ import (
|
|||||||
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||||
"gitea.difrex.ru/Umbrella/fetcher/idec"
|
"gitea.difrex.ru/Umbrella/fetcher/idec"
|
||||||
|
84
mapping.json
Normal file
84
mapping.json
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
{
|
||||||
|
"mappings": {
|
||||||
|
"post": {
|
||||||
|
"properties": {
|
||||||
|
"address": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"author": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"date": {
|
||||||
|
"type": "date",
|
||||||
|
"format": "epoch_second"
|
||||||
|
},
|
||||||
|
"echo": {
|
||||||
|
"type": "keyword"
|
||||||
|
|
||||||
|
},
|
||||||
|
"message": {
|
||||||
|
"type": "text",
|
||||||
|
"fields": {
|
||||||
|
"russian": {
|
||||||
|
"type": "text",
|
||||||
|
"analyzer": "russian"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"analyzer": "standard"
|
||||||
|
},
|
||||||
|
"misplaced": {
|
||||||
|
"type": "text",
|
||||||
|
"fields": {
|
||||||
|
"keyword": {
|
||||||
|
"type": "keyword",
|
||||||
|
"ignore_above": 256
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
},
|
||||||
|
"msgid": {
|
||||||
|
"type": "text",
|
||||||
|
"fields": {
|
||||||
|
"keyword": {
|
||||||
|
"type": "keyword",
|
||||||
|
"ignore_above": 256
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
},
|
||||||
|
"repto": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"subg": {
|
||||||
|
"type": "text",
|
||||||
|
"fields": {
|
||||||
|
"russian": {
|
||||||
|
"type": "text",
|
||||||
|
"analyzer": "russian"
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
},
|
||||||
|
"analyzer": "standard"
|
||||||
|
|
||||||
|
},
|
||||||
|
"tags": {
|
||||||
|
"type": "keyword"
|
||||||
|
|
||||||
|
},
|
||||||
|
"to": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"topicid": {
|
||||||
|
"type": "keyword"
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -3,19 +3,22 @@ package main
|
|||||||
import "gitea.difrex.ru/Umbrella/fetcher/i2es"
|
import "gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||||
import "fmt"
|
import "fmt"
|
||||||
import "net/http"
|
import "net/http"
|
||||||
import log "github.com/Sirupsen/logrus"
|
import log "github.com/sirupsen/logrus"
|
||||||
import "encoding/json"
|
import "encoding/json"
|
||||||
|
import "strings"
|
||||||
|
import "io/ioutil"
|
||||||
|
|
||||||
type Conf struct {
|
type Conf struct {
|
||||||
ES i2es.ESConf
|
ES i2es.ESConf
|
||||||
Step int
|
Step, From, Limit int
|
||||||
|
ScrollID string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
Indices IndexStats `json:"indices"`
|
Indices IndexStats `json:"indices"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type IndexStats map[string]interface{}
|
type IndexStats map[string]map[string]interface{}
|
||||||
|
|
||||||
// "indices": {
|
// "indices": {
|
||||||
// "idec": {
|
// "idec": {
|
||||||
@ -45,7 +48,91 @@ func (c *Conf) getDocsCount() int64 {
|
|||||||
|
|
||||||
var stats Stats
|
var stats Stats
|
||||||
err = json.NewDecoder(resp.Body).Decode(&stats)
|
err = json.NewDecoder(resp.Body).Decode(&stats)
|
||||||
log.Infof("%+v", stats)
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
return -1
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
return int64(stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})["docs"].(map[string]interface{})["count"].(float64))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) doRequest(method, reqURL, data string) ([]byte, error) {
|
||||||
|
var content []byte
|
||||||
|
req, err := http.NewRequest(method, reqURL, strings.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
return content, err
|
||||||
|
}
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
client := &http.Client{}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return content, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
content, err = ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return content, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return content, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) {
|
||||||
|
var res i2es.ESRes
|
||||||
|
req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
log.Info(query)
|
||||||
|
|
||||||
|
client := &http.Client{}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
err = json.NewDecoder(resp.Body).Decode(&res)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) GetDocs(from int) i2es.ESRes {
|
||||||
|
var reqURL, query string
|
||||||
|
if c.ScrollID == "" {
|
||||||
|
reqURL = fmt.Sprintf("%s/%s/_search?scroll=5m", c.ES.Host, c.ES.Index)
|
||||||
|
query = fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from)
|
||||||
|
} else {
|
||||||
|
reqURL = fmt.Sprintf("%s/_search/scroll", c.ES.Host)
|
||||||
|
query = fmt.Sprintf(`{"scroll": "5m", "scroll_id": "%s"}`, c.ScrollID)
|
||||||
|
}
|
||||||
|
res, err := c.doSearchRequest(reqURL, query)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
if c.ScrollID == "" {
|
||||||
|
c.ScrollID = res.ScrollID
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) GetLatests() i2es.ESRes {
|
||||||
|
var reqURL, query string
|
||||||
|
reqURL = fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
|
||||||
|
query = fmt.Sprintf(`{"sort": {"date": {"order": "desc"}}, "size": %d}`, c.Limit)
|
||||||
|
res, err := c.doSearchRequest(reqURL, query)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
return res
|
||||||
}
|
}
|
||||||
|
@ -3,14 +3,13 @@ package main
|
|||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
es, index, esType string
|
es, index, esType string
|
||||||
step int
|
step, from, limit int
|
||||||
|
latest bool
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -18,6 +17,9 @@ func init() {
|
|||||||
flag.StringVar(&index, "index", "idec", "Elasticsearch index")
|
flag.StringVar(&index, "index", "idec", "Elasticsearch index")
|
||||||
flag.StringVar(&esType, "esType", "post", "Elasticsearch document type")
|
flag.StringVar(&esType, "esType", "post", "Elasticsearch document type")
|
||||||
flag.IntVar(&step, "step", 100, "Scroll step")
|
flag.IntVar(&step, "step", 100, "Scroll step")
|
||||||
|
flag.IntVar(&from, "from", 0, "Scroll from")
|
||||||
|
flag.IntVar(&limit, "limit", 2500, "Limit for latest posts")
|
||||||
|
flag.BoolVar(&latest, "latest", false, "Processing only latest posts")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,8 +31,14 @@ func main() {
|
|||||||
Type: esType,
|
Type: esType,
|
||||||
},
|
},
|
||||||
Step: step,
|
Step: step,
|
||||||
|
From: from,
|
||||||
|
Limit: limit,
|
||||||
}
|
}
|
||||||
|
con := newContainer()
|
||||||
count := conf.getDocsCount()
|
if latest {
|
||||||
fmt.Println(count)
|
conf.assignLatests(&con)
|
||||||
|
} else {
|
||||||
|
conf.reassignTopic(&con)
|
||||||
|
}
|
||||||
|
conf.UpdateDocs(&con)
|
||||||
}
|
}
|
||||||
|
243
utils/reassign_topics/processing.go
Normal file
243
utils/reassign_topics/processing.go
Normal file
@ -0,0 +1,243 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"text/template"
|
||||||
|
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
"gitea.difrex.ru/Umbrella/fetcher/i2es"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Container struct {
|
||||||
|
TopPosts *HashMap
|
||||||
|
Comments *HashMap
|
||||||
|
ToUpdate *HashMap
|
||||||
|
Thread *HashMap
|
||||||
|
}
|
||||||
|
|
||||||
|
type HashMap struct {
|
||||||
|
mux sync.Mutex
|
||||||
|
Map map[string]i2es.ESDoc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HashMap) Put(key string, val i2es.ESDoc) {
|
||||||
|
h.mux.Lock()
|
||||||
|
h.Map[key] = val
|
||||||
|
h.mux.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HashMap) Get(key string) (i2es.ESDoc, bool) {
|
||||||
|
h.mux.Lock()
|
||||||
|
val, ok := h.Map[key]
|
||||||
|
h.mux.Unlock()
|
||||||
|
return val, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HashMap) Remove(key string) {
|
||||||
|
h.mux.Lock()
|
||||||
|
delete(h.Map, key)
|
||||||
|
h.mux.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HashMap) Clear() {
|
||||||
|
h.mux.Lock()
|
||||||
|
for k, _ := range h.Map {
|
||||||
|
delete(h.Map, k)
|
||||||
|
}
|
||||||
|
h.mux.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HashMap) Values() []i2es.ESDoc {
|
||||||
|
h.mux.Lock()
|
||||||
|
var values []i2es.ESDoc
|
||||||
|
for _, v := range h.Map {
|
||||||
|
values = append(values, v)
|
||||||
|
}
|
||||||
|
h.mux.Unlock()
|
||||||
|
return values
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HashMap) Size() int {
|
||||||
|
return len(h.Map)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newContainer() Container {
|
||||||
|
top := &HashMap{
|
||||||
|
Map: make(map[string]i2es.ESDoc),
|
||||||
|
}
|
||||||
|
comments := &HashMap{
|
||||||
|
Map: make(map[string]i2es.ESDoc),
|
||||||
|
}
|
||||||
|
toUpdate := &HashMap{
|
||||||
|
Map: make(map[string]i2es.ESDoc),
|
||||||
|
}
|
||||||
|
thread := &HashMap{
|
||||||
|
Map: make(map[string]i2es.ESDoc),
|
||||||
|
}
|
||||||
|
return Container{
|
||||||
|
top,
|
||||||
|
comments,
|
||||||
|
toUpdate,
|
||||||
|
thread,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) reassignTopic(con *Container) {
|
||||||
|
totalDocs := c.getDocsCount()
|
||||||
|
log.Info("Total documents in the index ", totalDocs)
|
||||||
|
|
||||||
|
// Populate containers
|
||||||
|
for i := c.From; i < int(totalDocs); i += c.Step {
|
||||||
|
msgs := c.GetDocs(i)
|
||||||
|
log.Infof("Processing %d documents", len(msgs.Hits.Hits))
|
||||||
|
for _, msg := range msgs.Hits.Hits {
|
||||||
|
if msg.Source.Repto == "" && msg.Source.TopicID == "" {
|
||||||
|
m := msg.Source
|
||||||
|
m.TopicID = strings.Split(uuid.New().URN(), ":")[2]
|
||||||
|
con.Comments.Put(m.MsgID, m)
|
||||||
|
} else {
|
||||||
|
m := msg.Source
|
||||||
|
con.Comments.Put(m.MsgID, m)
|
||||||
|
con.ToUpdate.Put(m.MsgID, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
con.processComments()
|
||||||
|
log.Info("Top posts size ", con.TopPosts.Size())
|
||||||
|
log.Info("Comments size ", con.Comments.Size())
|
||||||
|
log.Info("To update size ", con.ToUpdate.Size())
|
||||||
|
log.Infof("\n%+v\n", con.ToUpdate.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) assignLatests(con *Container) {
|
||||||
|
// Populate containers
|
||||||
|
msgs := c.GetLatests()
|
||||||
|
log.Infof("Processing %d documents", len(msgs.Hits.Hits))
|
||||||
|
for _, msg := range msgs.Hits.Hits {
|
||||||
|
if msg.Source.Repto == "" && msg.Source.TopicID == "" {
|
||||||
|
m := msg.Source
|
||||||
|
m.TopicID = strings.Split(uuid.New().URN(), ":")[2]
|
||||||
|
con.Comments.Put(m.MsgID, m)
|
||||||
|
} else {
|
||||||
|
m := msg.Source
|
||||||
|
con.Comments.Put(m.MsgID, m)
|
||||||
|
con.ToUpdate.Put(m.MsgID, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
con.processComments()
|
||||||
|
log.Info("Top posts size ", con.TopPosts.Size())
|
||||||
|
log.Info("Comments size ", con.Comments.Size())
|
||||||
|
log.Info("To update size ", con.ToUpdate.Size())
|
||||||
|
log.Infof("\n%+v\n", con.ToUpdate.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (con *Container) processComments() {
|
||||||
|
for _, m := range con.Comments.Values() {
|
||||||
|
if m.TopicID != "" {
|
||||||
|
con.ToUpdate.Put(m.MsgID, m)
|
||||||
|
log.Infof("Message %s with topicid %s added to the updates list", m.MsgID, m.TopicID)
|
||||||
|
} else {
|
||||||
|
con.processSingle(1, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (con *Container) processSingle(depth int, m i2es.ESDoc) {
|
||||||
|
maxDepth := 100
|
||||||
|
if depth == maxDepth {
|
||||||
|
log.Warn("Max depth is reached!")
|
||||||
|
con.Thread.Clear()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.Repto != "" && m.TopicID == "" {
|
||||||
|
if comment, ok := con.Comments.Get(m.Repto); ok {
|
||||||
|
if comment.TopicID != "" {
|
||||||
|
log.Info("Found topic id ", comment.TopicID)
|
||||||
|
m.TopicID = comment.TopicID
|
||||||
|
con.ToUpdate.Put(m.MsgID, m)
|
||||||
|
con.Comments.Remove(m.MsgID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if comment.Repto != "" && comment.TopicID == "" {
|
||||||
|
if c, ok := con.TopPosts.Get(comment.Repto); ok {
|
||||||
|
con.Thread.Put(comment.MsgID, comment)
|
||||||
|
con.processSingle(depth+1, c)
|
||||||
|
}
|
||||||
|
} else if comment.TopicID != "" {
|
||||||
|
con.Thread.Put(comment.MsgID, comment)
|
||||||
|
con.processThread(comment.TopicID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Warnf("Message %s not found!", m.Repto)
|
||||||
|
con.Thread.Clear()
|
||||||
|
}
|
||||||
|
} else if m.TopicID != "" {
|
||||||
|
log.Info("Found topicid")
|
||||||
|
con.Thread.Put(m.MsgID, m)
|
||||||
|
con.processThread(m.TopicID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (con *Container) processThread(id string) {
|
||||||
|
log.Infof("Processing thread with topicid %s\n", id)
|
||||||
|
for _, v := range con.Thread.Values() {
|
||||||
|
v.TopicID = id
|
||||||
|
con.ToUpdate.Put(v.MsgID, v)
|
||||||
|
// con.Comments.Remove(v.MsgID)
|
||||||
|
con.Thread.Remove(v.MsgID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Plain struct {
|
||||||
|
ID string
|
||||||
|
Marshal string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conf) UpdateDocs(con *Container) {
|
||||||
|
var out []string
|
||||||
|
for k, v := range con.ToUpdate.Map {
|
||||||
|
var plain Plain
|
||||||
|
data, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
plain.ID = k
|
||||||
|
plain.Marshal = string(data)
|
||||||
|
t := `{ "index": { "_index": "%s", "_type": "%s", "_id": "{{ .ID }}" }}
|
||||||
|
{{ .Marshal }}
|
||||||
|
`
|
||||||
|
tmpl, err := template.New("bulk").Parse(fmt.Sprintf(t, c.ES.Index, c.ES.Type))
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bulk := []byte("")
|
||||||
|
s := bytes.NewBuffer(bulk)
|
||||||
|
err = tmpl.Execute(s, plain)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
out = append(out, s.String())
|
||||||
|
if len(out) >= 4999 {
|
||||||
|
_, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n"))
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
out = []string{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err := c.doRequest("POST", fmt.Sprintf("%s/_bulk", c.ES.Host), strings.Join(out, "\n"))
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user