fetcher/i2es/elastic.go

100 lines
1.8 KiB
Go
Raw Normal View History

2017-02-11 19:53:20 +03:00
package i2es
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"strings"
2018-01-04 22:59:09 +03:00
2018-11-11 20:54:49 +03:00
log "github.com/sirupsen/logrus"
2017-02-11 19:53:20 +03:00
)
2018-01-05 17:05:45 +03:00
type topicid struct {
es ESConf
}
// doPostRequest ...
func doRequest(uri, data, method string) ([]byte, error) {
req, err := http.NewRequest(method, uri, bytes.NewBuffer([]byte(data)))
req.Header.Add("Content-Type", "application/json")
if err != nil {
return []byte(""), err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return []byte(""), err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return []byte(""), err
}
return body, nil
}
2018-01-04 22:59:09 +03:00
2017-02-11 19:53:20 +03:00
// PutToIndex ...
2018-01-08 15:57:56 +03:00
func (es ESConf) PutToIndex(msg ESDoc) (ESDoc, error) {
2017-02-11 19:53:20 +03:00
putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/")
log.Print(putURI)
2017-02-11 19:53:20 +03:00
2018-01-08 15:57:56 +03:00
// Assign topicID for top message
2018-01-05 17:05:45 +03:00
var t topicid
t.es = es
2018-01-08 15:57:56 +03:00
t.getOrCreate(&msg)
2018-01-05 17:05:45 +03:00
2017-02-11 19:53:20 +03:00
doc, err := json.Marshal(msg)
if err != nil {
2018-01-08 15:57:56 +03:00
return msg, err
2017-02-11 19:53:20 +03:00
}
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
req.Header.Add("Content-Type", "application/json")
2017-02-11 19:53:20 +03:00
client := &http.Client{}
resp, err := client.Do(req)
2017-09-27 14:49:50 +03:00
if err != nil {
2018-01-08 15:57:56 +03:00
return msg, err
2017-09-27 14:49:50 +03:00
}
2017-02-11 19:53:20 +03:00
defer resp.Body.Close()
2018-01-08 15:57:56 +03:00
return msg, err
2017-02-11 19:53:20 +03:00
}
// CheckID ...
func (es ESConf) CheckID(id string) (bool, error) {
searchURI := strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "")
2018-01-05 17:05:45 +03:00
body, err := doRequest(searchURI, searchQ, "POST")
2017-02-11 19:53:20 +03:00
if err != nil {
return false, err
}
var e ESRes
err = json.Unmarshal(body, &e)
if err != nil {
return false, err
}
if e.Hits.Total > 0 {
err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, ""))
return false, err
}
2018-01-05 17:05:45 +03:00
2017-02-11 19:53:20 +03:00
if e.TimedOut {
err = errors.New("Request time out")
return false, err
}
return false, nil
}