fetcher/utils/reassign_topics/elastic.go

139 lines
3.0 KiB
Go
Raw Normal View History

2018-11-11 11:59:58 +03:00
package main
import "gitea.difrex.ru/Umbrella/fetcher/i2es"
import "fmt"
import "net/http"
2018-11-11 20:54:49 +03:00
import log "github.com/sirupsen/logrus"
2018-11-11 11:59:58 +03:00
import "encoding/json"
import "strings"
import "io/ioutil"
2018-11-11 11:59:58 +03:00
type Conf struct {
ES i2es.ESConf
Step, From, Limit int
ScrollID string
2018-11-11 11:59:58 +03:00
}
type (
	// Stats is the subset of an index _stats response that this tool reads.
	Stats struct {
		// Indices holds the per-index statistics found under the "indices" key.
		Indices IndexStats `json:"indices"`
	}

	// IndexStats maps an index name to its loosely typed statistics
	// sections (for example "uuid" or "primaries").
	IndexStats map[string]map[string]interface{}
)
2018-11-11 11:59:58 +03:00
// getDocsCount returns the number of documents in the primary shards of the
// configured index as reported by the Elasticsearch _stats endpoint. It logs
// the problem and returns -1 when the request fails, the server answers with
// a non-2xx status, or the response does not have the expected shape.
//
// The part of the response that is read looks like:
//
//	"indices": {
//	  "idec": {
//	    "uuid": "_dO3GVkoSA665CdyV5LLlQ",
//	    "primaries": {
//	      "docs": {
//	        "count": 5600,
//	        "deleted": 435
//	      }
//	    }
//	  }
//	}
func (c *Conf) getDocsCount() int64 {
	reqURL := fmt.Sprintf("%s/%s/_stats", c.ES.Host, c.ES.Index)

	resp, err := http.Get(reqURL)
	if err != nil {
		log.Error(err)
		return -1
	}
	defer resp.Body.Close()

	// A non-2xx reply is not an error for net/http, so check it explicitly.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Errorf("stats request to %s: unexpected status %s", reqURL, resp.Status)
		return -1
	}

	var stats Stats
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		log.Error(err)
		return -1
	}

	// Every level is loosely typed, so use checked assertions: a missing
	// index or section must yield -1 rather than a panic.
	primaries, ok := stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})
	if !ok {
		log.Errorf("stats response has no primaries section for index %v", c.ES.Index)
		return -1
	}
	docs, ok := primaries["docs"].(map[string]interface{})
	if !ok {
		log.Errorf("stats response has no docs section for index %v", c.ES.Index)
		return -1
	}
	count, ok := docs["count"].(float64)
	if !ok {
		log.Errorf("stats response has no numeric docs count for index %v", c.ES.Index)
		return -1
	}
	return int64(count)
}
// doRequest sends data as a JSON request body to reqURL using the given HTTP
// method and returns the raw response body. The response status code is not
// inspected; only transport and read errors are reported.
func (c *Conf) doRequest(method, reqURL, data string) ([]byte, error) {
	request, err := http.NewRequest(method, reqURL, strings.NewReader(data))
	if err != nil {
		return nil, err
	}
	request.Header.Add("Content-Type", "application/json")

	response, err := (&http.Client{}).Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	// ReadAll already returns whatever was read together with the error.
	return ioutil.ReadAll(response.Body)
}
2018-11-11 20:54:49 +03:00
func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) {
var res i2es.ESRes
req, err := http.NewRequest("POST", reqURL, strings.NewReader(query))
if err != nil {
log.Error(err)
2018-11-11 20:54:49 +03:00
return res, err
}
req.Header.Add("Content-Type", "application/json")
log.Info(query)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err)
2018-11-11 20:54:49 +03:00
return res, err
}
defer resp.Body.Close()
err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
log.Error(err)
2018-11-11 20:54:49 +03:00
return res, err
}
2018-11-11 11:59:58 +03:00
2018-11-11 20:54:49 +03:00
return res, nil
}
func (c *Conf) GetDocs(from int) i2es.ESRes {
var reqURL, query string
if c.ScrollID == "" {
reqURL = fmt.Sprintf("%s/%s/_search?scroll=5m", c.ES.Host, c.ES.Index)
query = fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from)
} else {
reqURL = fmt.Sprintf("%s/_search/scroll", c.ES.Host)
query = fmt.Sprintf(`{"scroll": "5m", "scroll_id": "%s"}`, c.ScrollID)
}
2018-11-11 20:54:49 +03:00
res, err := c.doSearchRequest(reqURL, query)
if err != nil {
log.Fatal(err)
}
if c.ScrollID == "" {
c.ScrollID = res.ScrollID
}
return res
2018-11-11 11:59:58 +03:00
}
// GetLatests runs a single search against the configured index, sorted by
// date descending and limited to c.Limit documents, and returns the decoded
// response. A failed request is reported through log.Fatal.
func (c *Conf) GetLatests() i2es.ESRes {
	searchURL := fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index)
	body := fmt.Sprintf(`{"sort": {"date": {"order": "desc"}}, "size": %d}`, c.Limit)

	latest, err := c.doSearchRequest(searchURL, body)
	if err != nil {
		log.Fatal(err)
	}
	return latest
}