fetcher/utils/reassign_topics/elastic.go

85 lines
1.7 KiB
Go
Raw Normal View History

2018-11-11 11:59:58 +03:00
package main
import "gitea.difrex.ru/Umbrella/fetcher/i2es"
import "fmt"
import "net/http"
import log "github.com/Sirupsen/logrus"
import "encoding/json"
import "strings"
2018-11-11 11:59:58 +03:00
// Conf bundles the settings used by the Elasticsearch helpers in this file.
type Conf struct {
	// ES carries the Elasticsearch connection settings; the helpers below
	// read its Host and Index fields to build request URLs.
	ES i2es.ESConf

	// Step is sent as the "size" of each _search request, i.e. the number
	// of documents requested per call to getDocs.
	Step int
}
type (
	// IndexStats maps an index name to its statistics object. The inner
	// values are left untyped because only a few nested fields are read.
	IndexStats map[string]map[string]interface{}

	// Stats is the part of the _stats response that this tool decodes:
	// the top-level "indices" object.
	Stats struct {
		Indices IndexStats `json:"indices"`
	}
)
2018-11-11 11:59:58 +03:00
// getDocsCount requests the statistics of the configured index
// (GET <host>/<index>/_stats) and returns the document count found under
// primaries.docs.count for that index.
//
// It returns -1, after logging the reason, when the request cannot be built
// or sent, when the server answers with a non-200 status, when the body is
// not valid JSON, or when the expected fields are missing from the response.
//
// The relevant part of the response looks like this:
//
//	"indices": {
//	  "idec": {
//	    "uuid": "_dO3GVkoSA665CdyV5LLlQ",
//	    "primaries": {
//	      "docs": {
//	        "count": 5600,
//	        "deleted": 435
//	      }
//	    }
//	  }
//	}
func (c *Conf) getDocsCount() int64 {
	reqURL := fmt.Sprintf("%s/%s/_stats", c.ES.Host, c.ES.Index)
	req, err := http.NewRequest("GET", reqURL, nil)
	if err != nil {
		log.Error(err)
		return -1
	}

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err)
		return -1
	}
	defer resp.Body.Close()

	// An error response (e.g. unknown index) is still valid JSON but has no
	// "indices" object, so reject it before trying to interpret the body.
	if resp.StatusCode != http.StatusOK {
		log.Errorf("unexpected status %q from %s", resp.Status, reqURL)
		return -1
	}

	var stats Stats
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		log.Error(err)
		return -1
	}

	// Indexing a nil or incomplete map is safe; the helper reports whether
	// the full primaries.docs.count path was present.
	count, ok := primaryDocsCount(stats.Indices[c.ES.Index])
	if !ok {
		log.Errorf("no primaries.docs.count for index %q in response from %s", c.ES.Index, reqURL)
		return -1
	}
	return count
}

// primaryDocsCount walks primaries -> docs -> count inside one index's stats
// object. It reports false instead of panicking when any level is missing or
// has an unexpected type.
func primaryDocsCount(index map[string]interface{}) (int64, bool) {
	primaries, ok := index["primaries"].(map[string]interface{})
	if !ok {
		return 0, false
	}
	docs, ok := primaries["docs"].(map[string]interface{})
	if !ok {
		return 0, false
	}
	// encoding/json decodes JSON numbers into float64 when the target is
	// an interface{} value.
	count, ok := docs["count"].(float64)
	if !ok {
		return 0, false
	}
	return int64(count), true
}
func (c *Conf) getDocs(from int) i2es.ESRes {
var res i2es.ESRes
reqURL := fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
query := fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, 0)
req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
if err != nil {
log.Error(err)
return res
}
req.Header.Add("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err)
return res
}
defer resp.Body.Close()
err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
log.Error(err)
return res
}
2018-11-11 11:59:58 +03:00
return res
2018-11-11 11:59:58 +03:00
}