From f48d951722897dbb616ab2bb9521aa8c73d04dac Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Sun, 4 Nov 2018 10:21:49 +0300 Subject: [PATCH] Optimize list.txt aggregation --- node/elastic.go | 79 ++++++++++++++++++++++++++++--------------------- node/structs.go | 61 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 33 deletions(-) create mode 100644 node/structs.go diff --git a/node/elastic.go b/node/elastic.go index a878d5d..122e4a0 100644 --- a/node/elastic.go +++ b/node/elastic.go @@ -2,29 +2,22 @@ package node import ( "bytes" - // "encoding/json" - "gitea.difrex.ru/Umbrella/fetcher/i2es" - "github.com/Jeffail/gabs" + "encoding/json" "io/ioutil" - "log" "net/http" "strconv" "strings" + + "fmt" + + "github.com/Jeffail/gabs" + log "github.com/Sirupsen/logrus" ) const ( echoAgg = "echo_uniq" ) -// ESConf ... -type ESConf i2es.ESConf - -// Bucket ... -type Bucket struct { - Key string `json:"key"` - DocCount int `json:"doc_count"` -} - // MakePlainTextMessage ... 
func MakePlainTextMessage(hit interface{}) string { @@ -49,6 +42,9 @@ func (es ESConf) GetPlainTextMessage(msgid string) []byte { req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) client := &http.Client{} resp, err := client.Do(req) + if err != nil { + log.Error(err.Error()) + } defer resp.Body.Close() @@ -80,6 +76,9 @@ func (es ESConf) GetEchoMessageHashes(echo string) []string { req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) client := &http.Client{} resp, err := client.Do(req) + if err != nil { + log.Error(err.Error()) + } defer resp.Body.Close() @@ -128,6 +127,9 @@ func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) client := &http.Client{} resp, err := client.Do(req) + if err != nil { + log.Error(err.Error()) + } defer resp.Body.Close() @@ -223,39 +225,50 @@ func (es ESConf) GetUEchoMessageHashes(echoes string) []string { // GetListTXT ... 
func (es ESConf) GetListTXT() []byte { - searchURI := strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") - searchQ := []byte(`{"aggs": {"echo_uniq": { "terms": { "field": "echo","size": 1000}}}}`) + var searchURI string + if es.Index != "" && es.Type != "" { + searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") + } else { + searchURI = strings.Join([]string{es.Host, "_search"}, "/") + } + searchQ := []byte(`{ + "size": 0, + "aggs": { + "uniqueEcho": { + "cardinality": { + "field": "echo" + } + }, + "echo": { + "terms": { + "field": "echo", + "size": 1000 + } + } + } +}`) log.Print("Search URI: ", searchURI) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) client := &http.Client{} resp, err := client.Do(req) + if err != nil { + log.Error(err.Error()) + } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + var esr EchoAggregations + err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { - return []byte("") + log.Error(err.Error()) } - - esresp, err := gabs.ParseJSON(body) - if err != nil { - panic(err) - } - - var uniq map[string]interface{} - uniq, _ = esresp.Path(strings.Join([]string{"aggregations", echoAgg}, ".")).Data().(map[string]interface{}) + log.Infof("%+v", esr) var echoes []string - for _, bucket := range uniq["buckets"].([]interface{}) { - b := make(map[string]interface{}) - b = bucket.(map[string]interface{}) - count := int(b["doc_count"].(float64)) - c := strconv.Itoa(count) - echostr := strings.Join([]string{b["key"].(string), ":", c, ":"}, "") - echoes = append(echoes, echostr) + for _, bucket := range esr.EchoAgg["echo"].Buckets { + echoes = append(echoes, fmt.Sprintf("%s:%d:", bucket.Key, bucket.DocCount)) } - log.Print("Getting ", len(echoes), " echoes") return []byte(strings.Join(echoes, "\n")) diff --git a/node/structs.go b/node/structs.go new file mode 100644 index 0000000..e99043c --- /dev/null +++ b/node/structs.go @@ -0,0 +1,61 @@ +package node + +import 
"gitea.difrex.ru/Umbrella/fetcher/i2es" + +// ESConf ... +type ESConf i2es.ESConf + +// Bucket ... +type Bucket struct { + Key string `json:"key"` + DocCount int `json:"doc_count"` +} + +// {"took":467,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":89333,"max_score":0.0,"hits":[]}} +type ESSearchResp struct { + Took int64 `json:"took"` + TimedOut bool `json:"timed_out"` + ESSearchShards `json:"_shards"` +} + +type Hits struct { + Total int64 `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []Hit `json:"hits"` +} + +// {"_index":"idec5","_type":"post","_id":"aAjSbXS5XeNF6lVaPh5A","_score":1.0,"_source" +type Hit struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Source i2es.ESDoc `json:"_source"` +} + +// "aggregations":{"echo":{"doc_count_error_upper_bound":2406,"sum_other_doc_count":76555,"buckets":[{"key":"bash.rss","doc_count":12779}]},"uniqueEcho":{"value":121}}} +type EchoAggregations struct { + EchoAgg map[string]EchoAgg `json:"aggregations"` + UniqEcho map[string]Uniq `json:"uniqueEcho"` +} + +type EchoAgg struct { + DocCountErrorUpperBound int64 `json:"doc_count_error_upper_bound"` + SumOtherDocCount int64 `json:"sum_other_doc_count"` + Buckets []Bucket `json:"buckets"` +} + +type EchoBucket struct { + Key string `json:"key"` + Count int64 `json:"doc_count"` +} + +type Uniq struct { + Value int64 `json:"value"` +} + +type ESSearchShards struct { + Total int64 `json:"total"` + Successful int64 `json:"successful"` + Skipped int64 `json:"skipped"` + Failed int64 `json:"failed"` +}