package main import "gitea.difrex.ru/Umbrella/fetcher/i2es" import "fmt" import "net/http" import log "github.com/sirupsen/logrus" import "encoding/json" import "strings" import "io/ioutil" type Conf struct { ES i2es.ESConf Step, From, Limit int ScrollID string } type Stats struct { Indices IndexStats `json:"indices"` } type IndexStats map[string]map[string]interface{} // "indices": { // "idec": { // "uuid": "_dO3GVkoSA665CdyV5LLlQ", // "primaries": { // "docs": { // "count": 5600, // "deleted": 435 // } func (c *Conf) getDocsCount() int64 { reqURL := fmt.Sprintf("%s/%s/_stats", c.ES.Host, c.ES.Index) req, err := http.NewRequest("GET", reqURL, nil) if err != nil { log.Error(err) return -1 } client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err) return -1 } defer resp.Body.Close() var stats Stats err = json.NewDecoder(resp.Body).Decode(&stats) if err != nil { log.Error(err) return -1 } return int64(stats.Indices[c.ES.Index]["primaries"].(map[string]interface{})["docs"].(map[string]interface{})["count"].(float64)) } func (c *Conf) doRequest(method, reqURL, data string) ([]byte, error) { var content []byte req, err := http.NewRequest(method, reqURL, strings.NewReader(data)) if err != nil { return content, err } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { return content, err } defer resp.Body.Close() content, err = ioutil.ReadAll(resp.Body) if err != nil { return content, err } return content, nil } func (c *Conf) doSearchRequest(reqURL, query string) (i2es.ESRes, error) { var res i2es.ESRes req, err := http.NewRequest("POST", reqURL, strings.NewReader(query)) if err != nil { log.Error(err) return res, err } req.Header.Add("Content-Type", "application/json") log.Info(query) client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err) return res, err } defer resp.Body.Close() err = json.NewDecoder(resp.Body).Decode(&res) if err != nil { log.Error(err) return res, err } return res, nil } func (c *Conf) GetDocs(from int) i2es.ESRes { var reqURL, query string if c.ScrollID == "" { reqURL = fmt.Sprintf("%s/%s/_search?scroll=5m", c.ES.Host, c.ES.Index) query = fmt.Sprintf(`{"sort": {"date": {"order": "asc"}}, "size": %d, "from": %d}`, c.Step, from) } else { reqURL = fmt.Sprintf("%s/_search/scroll", c.ES.Host) query = fmt.Sprintf(`{"scroll": "5m", "scroll_id": "%s"}`, c.ScrollID) } res, err := c.doSearchRequest(reqURL, query) if err != nil { log.Fatal(err) } if c.ScrollID == "" { c.ScrollID = res.ScrollID } return res } func (c *Conf) GetLatests() i2es.ESRes { var reqURL, query string reqURL = fmt.Sprintf("%s/%s/_search", c.ES.Host, c.ES.Index) query = fmt.Sprintf(`{"sort": {"date": {"order": "desc"}}, "size": %d}`, c.Limit) res, err := c.doSearchRequest(reqURL, query) if err != nil { log.Fatal(err) } return res }