lessmore/node/elastic.go

714 lines
16 KiB
Go

package node
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"fmt"
"encoding/base64"
"gitea.difrex.ru/Umbrella/fetcher/i2es"
log "github.com/sirupsen/logrus"
)
const (
// echoAgg holds the string "echo_uniq". Nothing in the visible part of this
// file references it; presumably it names an Elasticsearch aggregation —
// TODO confirm against the rest of the package.
echoAgg = "echo_uniq"
)
// MakePlainTextMessage renders an Elasticsearch document as a plain-text
// message: eight fields separated by newlines, in the order tags, echo, date,
// author, address, recipient, subject and message body.
//
// The tags line is "ii/ok", extended to "ii/ok/repto/<id>" when the document
// carries a non-empty Repto.
func MakePlainTextMessage(hit i2es.ESDoc) []byte {
	header := "ii/ok"
	if len(hit.Repto) > 0 {
		header = fmt.Sprintf("ii/ok/repto/%s", hit.Repto)
	}

	fields := [...]string{
		header,
		hit.Echo,
		hit.Date,
		hit.Author,
		hit.Address,
		hit.To,
		hit.Subg,
		hit.Message,
	}
	return []byte(strings.Join(fields[:], "\n"))
}
// GetPlainTextMessage looks up a single message by ID in Elasticsearch and
// returns it in the plain-text form produced by MakePlainTextMessage.
//
// msgid is matched against the document "_id" field. An empty byte slice is
// returned when the request cannot be built or sent, when the response cannot
// be decoded, or when the search yields no hits; failures are logged.
func (es ESConf) GetPlainTextMessage(msgid string) []byte {
	var searchURI string
	if es.Index != "" && es.Type != "" {
		searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
	} else {
		// NOTE(review): this fallback targets "<host>/search" rather than
		// "<host>/_search" — confirm that is intended.
		searchURI = strings.Join([]string{es.Host, "search"}, "/")
	}

	// JSON-encode the ID instead of pasting it between quotes: a quote or
	// backslash in msgid would otherwise break, or rewrite, the request body.
	quotedID, err := json.Marshal(msgid)
	if err != nil {
		log.Error(err.Error())
		return []byte("")
	}
	searchQ := []byte(`{"query": {"match": {"_id": ` + string(quotedID) + `}}}`)

	req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ))
	if err != nil {
		log.Error(err.Error())
		return []byte("")
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err.Error())
		return []byte("")
	}
	defer resp.Body.Close()

	var esr ESSearchResp
	err = json.NewDecoder(resp.Body).Decode(&esr)
	if err != nil {
		log.Error(err.Error())
		return []byte("")
	}

	if len(esr.Hits.Hits) > 0 {
		return MakePlainTextMessage(esr.Hits.Hits[0].Source)
	}
	return []byte("")
}
// GetEchoMessageHashes returns the message IDs of up to 500 documents whose
// "echo.keyword" field matches echo, sorted by date (descending) and then by
// score (descending).
//
// echo is sent as an Elasticsearch query_string expression. A nil slice is
// returned when the request cannot be built or sent. When the response body
// cannot be read or parsed, the slice holds the single entry
// "error: Internal error". All failures are logged.
func (es ESConf) GetEchoMessageHashes(echo string) []string {
	var hashes []string

	var searchURI string
	if es.Index != "" && es.Type != "" {
		searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
	} else {
		// NOTE(review): this fallback targets "<host>/search" rather than
		// "<host>/_search" — confirm that is intended.
		searchURI = strings.Join([]string{es.Host, "search"}, "/")
	}

	// JSON-encode the echo name instead of pasting it between quotes: a quote
	// or backslash in it would otherwise break, or rewrite, the request body.
	quotedEcho, err := json.Marshal(echo)
	if err != nil {
		log.Error(err.Error())
		return hashes
	}
	searchQ := []byte(strings.Join([]string{
		`{"sort": [
{"date":{ "order": "desc" }},{ "_score":{ "order": "desc" }}],
"query": {"query_string" : {"fields": ["echo.keyword"], "query":`, string(quotedEcho), `}}, "size": 500}`}, ""))

	req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ))
	if err != nil {
		log.Error(err.Error())
		return hashes
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err.Error())
		return hashes
	}
	defer resp.Body.Close()

	content, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error(err.Error())
		hashes = append(hashes, "error: Internal error")
		return hashes
	}

	var esr ESSearchResp
	err = json.Unmarshal(content, &esr)
	if err != nil {
		log.Error(err.Error())
		hashes = append(hashes, "error: Internal error")
		return hashes
	}

	for _, hit := range esr.Hits.Hits {
		hashes = append(hashes, hit.Source.MsgID)
	}
	return hashes
}
// GetLimitedEchoMessageHashes returns the message IDs of at most limit
// documents matching echo in the "msgid.keyword" or "echo.keyword" fields.
//
// offset is only consulted for its sign: a value of zero or less sorts by
// date and score in descending order, a positive value in ascending order.
// echo is sent as an Elasticsearch query_string expression. A nil slice is
// returned, and the error logged, when the request cannot be built or sent or
// the response cannot be decoded.
func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int) []string {
	var hashes []string

	order := "asc"
	if offset <= 0 {
		order = "desc"
	}

	// JSON-encode the echo name instead of pasting it between quotes: a quote
	// or backslash in it would otherwise break, or rewrite, the request body.
	quotedEcho, err := json.Marshal(echo)
	if err != nil {
		log.Error(err.Error())
		return hashes
	}
	searchQ := []byte(strings.Join([]string{
		`{"sort": [
{"date":{ "order": "`, order, `" }},{ "_score":{ "order": "`, order, `" }}],
"query": {"query_string" : {"fields": ["msgid.keyword", "echo.keyword"], "query":`, string(quotedEcho), `}}, "size":`, strconv.Itoa(limit), `}`}, ""))

	req, err := http.NewRequest("POST", es.searchURI(), bytes.NewBuffer(searchQ))
	if err != nil {
		log.Error(err.Error())
		return hashes
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err.Error())
		return hashes
	}
	defer resp.Body.Close()

	var esr ESSearchResp
	err = json.NewDecoder(resp.Body).Decode(&esr)
	if err != nil {
		log.Error(err.Error())
		return hashes
	}

	for _, hit := range esr.Hits.Hits {
		hashes = append(hashes, hit.Source.MsgID)
	}
	return hashes
}
// DoSearch runs a query_string search over the "message" and "subg" fields
// and returns the sources of up to 100 hits, sorted by date and then score,
// both descending.
//
// query is spliced into the request body verbatim as the JSON value of the
// "query" key, so the caller must pass an already JSON-encoded value (for a
// plain search string that includes the surrounding double quotes). nil is
// returned, and the error logged, when the request cannot be built or sent or
// the response cannot be decoded.
func (es ESConf) DoSearch(query string) []i2es.ESDoc {
	const head = `{"sort": [
{"date":{ "order": "desc" }},{ "_score":{ "order": "desc" }}],
"query": {"query_string" : {"fields": ["message", "subg"], "query":`
	const tail = `}}, "size": 100}`

	payload := bytes.NewBufferString(head + query + tail)
	request, err := http.NewRequest(http.MethodPost, es.searchURI(), payload)
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := new(http.Client).Do(request)
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	defer response.Body.Close()

	var result ESSearchResp
	if decodeErr := json.NewDecoder(response.Body).Decode(&result); decodeErr != nil {
		log.Error(decodeErr.Error())
		return nil
	}

	// Stays nil when there are no hits, exactly like a declared-but-unused slice.
	var docs []i2es.ESDoc
	for i := range result.Hits.Hits {
		docs = append(docs, result.Hits.Hits[i].Source)
	}
	return docs
}
// GetUMMessages fetches several messages in one request and returns each as
// "<msgid>:<base64 of the plain-text message>".
//
// msgs is a "/"-separated list of message IDs. The IDs are joined with " OR "
// into a query_string expression on "msgid.keyword"; results are sorted by
// date and then score, both descending. A nil slice is returned, and the
// error logged, when the request cannot be built or sent or the response
// cannot be decoded.
//
// NOTE(review): the request sets no "size", so the number of hits returned is
// whatever the server defaults to — verify this covers long ID lists.
func (es ESConf) GetUMMessages(msgs string) []string {
	var encodedMessages []string

	// First get messages list
	messages := strings.Split(msgs, "/")

	var searchURI string
	if es.Index != "" && es.Type != "" {
		searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
	} else {
		// NOTE(review): this fallback targets "<host>/search" rather than
		// "<host>/_search" — confirm that is intended.
		searchURI = strings.Join([]string{es.Host, "search"}, "/")
	}

	// JSON-encode the expression instead of pasting it between quotes: a quote
	// or backslash in an ID would otherwise break, or rewrite, the request body.
	quotedQuery, err := json.Marshal(strings.Join(messages, " OR "))
	if err != nil {
		log.Error(err.Error())
		return encodedMessages
	}
	query := []byte(`
{
"query": {
"query_string" : {
"fields": ["msgid.keyword"],
"query":` + string(quotedQuery) + `
}
},
"sort": [{"date":{ "order": "desc" }},
{ "_score":{ "order": "desc" }}
]
}`)

	req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query))
	if err != nil {
		log.Error(err.Error())
		return encodedMessages
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err.Error())
		return encodedMessages
	}
	defer resp.Body.Close()

	var esr ESSearchResp
	err = json.NewDecoder(resp.Body).Decode(&esr)
	if err != nil {
		log.Error(err.Error())
		return encodedMessages
	}

	for _, hit := range esr.Hits.Hits {
		m := fmt.Sprintf("%s:%s",
			hit.Source.MsgID,
			base64.StdEncoding.EncodeToString(MakePlainTextMessage(hit.Source)))
		encodedMessages = append(encodedMessages, m)
	}
	return encodedMessages
}
// GetUEchoMessageHashes returns, for every echo named in echoes, the echo
// name followed by its message IDs, all flattened into one slice:
//
//	echo.name
//	Some20SimbolsHash333
//	...
//
// echoes is a "/"-separated list of echo names. If its last element has the
// form "offset:limit" with two integers, each echo is fetched through
// GetLimitedEchoMessageHashes with those values; otherwise
// GetEchoMessageHashes is used. Elements that are empty or contain ":" are
// not treated as echo names. Echoes appear in the order requested, each at
// most once, and the result is passed through addNewLineToLastWord.
func (es ESConf) GetUEchoMessageHashes(echoes string) []string {
	var echohashes []string

	// First get echoes list
	el := strings.Split(echoes, "/")

	// Check offset and limit. Both numbers must parse; previously the error of
	// the first conversion was overwritten by the second and silently lost.
	var offset, limit int
	withOL := false
	if last := el[len(el)-1]; strings.Contains(last, ":") {
		oflim := strings.Split(last, ":")
		o, errOffset := strconv.Atoi(oflim[0])
		l, errLimit := strconv.Atoi(oflim[1])
		switch {
		case errOffset != nil:
			log.Error(errOffset)
		case errLimit != nil:
			log.Error(errLimit)
		default:
			offset, limit, withOL = o, l, true
		}
	}

	seen := make(map[string]struct{}, len(el))
	for _, echo := range el {
		// Skip blanks and the "offset:limit" element: neither is an echo name,
		// and querying for them only costs a request and pollutes the output.
		if echo == "" || echo == "\n" || strings.Contains(echo, ":") {
			continue
		}
		if _, dup := seen[echo]; dup {
			continue
		}
		seen[echo] = struct{}{}

		var recEcho []string
		if withOL {
			recEcho = es.GetLimitedEchoMessageHashes(echo, offset, limit)
		} else {
			recEcho = es.GetEchoMessageHashes(echo)
		}

		echohashes = append(echohashes, echo)
		for _, e := range recEcho {
			if e == "" {
				continue
			}
			echohashes = append(echohashes, e)
		}
	}

	return addNewLineToLastWord(echohashes)
}
// GetXC implements /x/c: it returns one "<echo>:<document count>" entry per
// bucket of a terms aggregation on "echo.keyword".
//
// echoes is a "/"-separated list of echo names; the names are joined with
// " OR " into a query_string expression restricting the documents counted.
// A nil slice is returned, and the error logged, when the request cannot be
// built or sent or the response cannot be decoded.
func (es ESConf) GetXC(echoes string) []string {
	var searchURI string
	var counts []string
	if es.Index != "" && es.Type != "" {
		searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
	} else {
		// NOTE(review): this fallback targets "<host>/search" rather than
		// "<host>/_search" — confirm that is intended.
		searchURI = strings.Join([]string{es.Host, "search"}, "/")
	}

	// JSON-encode the expression instead of pasting it between quotes: a quote
	// or backslash in a name would otherwise break, or rewrite, the request body.
	quotedQuery, err := json.Marshal(strings.Join(strings.Split(echoes, "/"), " OR "))
	if err != nil {
		log.Error(err.Error())
		return counts
	}
	query := []byte(`
{
"query": {
"query_string" : {
"fields": ["echo.keyword"],
"query": ` + string(quotedQuery) + `
}
},
"size": 0,
"aggs": {
"uniqueEcho": {
"cardinality": {
"field": "echo.keyword"
}
},
"echo": {
"terms": {
"field": "echo.keyword",
"size": 1000
}
}
}
}
`)

	req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query))
	if err != nil {
		log.Error(err.Error())
		return counts
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err.Error())
		return counts
	}
	defer resp.Body.Close()

	var esr EchoAggregations
	err = json.NewDecoder(resp.Body).Decode(&esr)
	if err != nil {
		log.Error(err.Error())
		return counts
	}
	log.Infof("%+v", esr)

	for _, hit := range esr.EchoAgg["echo"].Buckets {
		counts = append(counts, fmt.Sprintf("%s:%d", hit.Key, hit.DocCount))
	}
	return counts
}
// ThreadBucket is a single aggregation bucket: a key, the number of documents
// in it, and a nested set of hits.
// NOTE(review): presumably the element type of the "topics" buckets read by
// GetThreads and GetThreadsYear — confirm against the ESAggsResp declaration.
type ThreadBucket struct {
// DocCount is decoded from the bucket's "doc_count" field.
DocCount int64 `json:"doc_count"`
// Key is decoded from the bucket's "key" field.
Key string `json:"key"`
// Post carries no JSON tag, so encoding/json fills it from a key named
// "Post", matched case-insensitively.
Post Hits
}
// defaultEchoes is the echo list GetThreads and GetThreadsYear use when the
// caller names none. Every entry already includes its JSON double quotes,
// because the entries are comma-joined straight into the query's "terms" array.
var defaultEchoes = []string{
	`"idec.talks"`,
	`"pipe.2032"`,
	`"linux.14"`,
	`"develop.16"`,
	`"dynamic.local"`,
	`"std.club"`,
	`"std.hugeping"`,
	`"difrex.blog"`,
	`"ii.test.14"`,
}
// GetTopic returns up to 1000 posts whose "topicid.keyword" equals topicID,
// sorted by date ascending and then by score descending.
//
// For every post the message is stripped of leading and trailing newlines and
// the date is rewritten by parseTime. nil is returned, and the error logged,
// when the request cannot be built or sent or the response cannot be decoded.
func (es ESConf) GetTopic(topicID string) (posts []i2es.ESDoc) {
	// JSON-encode the ID instead of pasting it between quotes: a quote or
	// backslash in it would otherwise break, or rewrite, the request body.
	quotedID, err := json.Marshal(topicID)
	if err != nil {
		log.Error(err)
		return nil
	}
	query := []byte(strings.Join([]string{
		`{"sort": [{"date": {"order": "asc"}},
{"_score": {"order": "desc" }}], "size":1000,"query": {"term": {"topicid.keyword": `, string(quotedID), `}}}`}, ""))

	req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader(query))
	if err != nil {
		log.Error(err)
		return nil
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err)
		return nil
	}
	defer resp.Body.Close()

	var esr ESSearchResp
	err = json.NewDecoder(resp.Body).Decode(&esr)
	if err != nil {
		log.Error(err.Error())
		return nil
	}

	for _, hit := range esr.Hits.Hits {
		hit.Source.Message = strings.Trim(hit.Source.Message, "\n")
		hit.Source.Date = parseTime(hit.Source.Date)
		posts = append(posts, hit.Source)
	}
	return posts
}
// GetMessage returns up to 1000 posts whose "msgid.keyword" equals msgID,
// sorted by date ascending and then by score descending.
//
// For every post the message is stripped of leading and trailing newlines and
// the date is rewritten by parseTime. nil is returned, and the error logged,
// when the request cannot be built or sent or the response cannot be decoded.
func (es ESConf) GetMessage(msgID string) (posts []i2es.ESDoc) {
	// JSON-encode the ID instead of pasting it between quotes: a quote or
	// backslash in it would otherwise break, or rewrite, the request body.
	quotedID, err := json.Marshal(msgID)
	if err != nil {
		log.Error(err)
		return nil
	}
	query := []byte(strings.Join([]string{
		`{"sort": [{"date": {"order": "asc"}},
{"_score": {"order": "desc" }}], "size":1000,"query": {"term": {"msgid.keyword": `, string(quotedID), `}}}`}, ""))

	req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader(query))
	if err != nil {
		log.Error(err)
		return nil
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err)
		return nil
	}
	defer resp.Body.Close()

	var esr ESSearchResp
	err = json.NewDecoder(resp.Body).Decode(&esr)
	if err != nil {
		log.Error(err.Error())
		return nil
	}

	for _, hit := range esr.Hits.Hits {
		hit.Source.Message = strings.Trim(hit.Source.Message, "\n")
		hit.Source.Date = parseTime(hit.Source.Date)
		posts = append(posts, hit.Source)
	}
	return posts
}
// GetThreads returns the most recent post of each topic active within a
// 30-day window, for the given echoes (defaultEchoes when none are given).
//
// pageNum selects the window: values up to 1 cover now-30d..now, and page N
// covers now-(30*N)d..now-(30*N-30)d. Topics come from a terms aggregation on
// "topicid.keyword" (at most 100 buckets); buckets with an empty key are
// skipped. nil is returned, and the error logged, when the request cannot be
// built or sent or the response cannot be decoded.
func (es ESConf) GetThreads(pageNum int, echoes ...string) (posts []i2es.ESDoc) {
	ech := defaultEchoes
	if len(echoes) > 0 {
		ech = make([]string, 0, len(echoes))
		for _, echo := range echoes {
			// JSON-encode each name instead of wrapping it in quotes: a quote
			// or backslash in it would otherwise break the request body.
			quoted, err := json.Marshal(echo)
			if err != nil {
				log.Error(err)
				return nil
			}
			ech = append(ech, string(quoted))
		}
	}

	rangeStr := `"from":"now-30d","to":"now-0d"`
	if pageNum > 1 {
		to := 30*pageNum - 30
		from := 30 * pageNum
		rangeStr = fmt.Sprintf(`"from":"now-%dd","to":"now-%dd"`, from, to)
	}
	log.Debug(rangeStr)

	query := `{"sort":[{"date":{"order":"desc"}}],"aggs":{"topics":{"terms":{"field":"topicid.keyword","size":100},"aggs":{"post":{"top_hits":{"size":1,"sort":[{"date":{"order":"desc"}}],"_source":{"include": ["subg","author","date","echo","topicid","address", "repto"]}}}}}},"query":{"bool":{"must":[{"range":{"date":{` + rangeStr + `}}},{"constant_score":{"filter":{"terms":{"echo.keyword": [` +
		strings.Join(ech, ",") +
		`]}}}}]}}}`
	log.Debug("Run: ", query)

	req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader([]byte(query)))
	if err != nil {
		log.Error(err)
		return nil
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err)
		return nil
	}
	defer resp.Body.Close()

	var data ESAggsResp
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		log.Error(err)
		return nil
	}

	for _, bucket := range data.Aggregations.Topics.Buckets {
		// Empty topicid
		if bucket.Key == "" {
			continue
		}
		for _, post := range bucket.Post.Hits.Hits {
			posts = append(posts, post.Source)
		}
	}
	return posts
}
// GetThreadsYear returns the most recent post of each topic active within the
// last 365 days, for the given echoes (defaultEchoes when none are given).
//
// pageNum is accepted but not used. Topics come from a terms aggregation on
// "topicid.keyword" (at most 500 buckets); buckets with an empty key are
// skipped. nil is returned, and the error logged, when the request cannot be
// built or sent, when the server answers with a status above 200, or when the
// response cannot be decoded.
func (es ESConf) GetThreadsYear(pageNum int, echoes ...string) (posts []i2es.ESDoc) {
	ech := defaultEchoes
	if len(echoes) > 0 {
		ech = make([]string, 0, len(echoes))
		for _, echo := range echoes {
			// JSON-encode each name instead of wrapping it in quotes: a quote
			// or backslash in it would otherwise break the request body.
			quoted, err := json.Marshal(echo)
			if err != nil {
				log.Error(err)
				return nil
			}
			ech = append(ech, string(quoted))
		}
	}

	query := `{"sort":[{"date":{"order":"desc"}}],"aggs":{"topics":{"terms":{"field":"topicid.keyword","size":500},"aggs":{"post":{"top_hits":{"size":1,"sort":[{"date":{"order":"desc"}}],"_source":{"include": ["subg","author","date","echo","topicid","address", "repto"]}}}}}},"query":{"bool":{"must":[{"range":{"date":{"from": "now-365d", "to": "now"}}}, {"constant_score":{"filter":{"terms":{"echo.keyword": [` +
		strings.Join(ech, ",") +
		`]}}}}]}}}`
	log.Debug("Run: ", query)

	req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader([]byte(query)))
	if err != nil {
		log.Error(err)
		return nil
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(err)
		return nil
	}
	defer resp.Body.Close()

	if resp.StatusCode > http.StatusOK {
		// The body can be read only once. Draining it for the log and then
		// decoding it again used to end in a misleading EOF error, so report
		// the real failure and stop here.
		d, err := io.ReadAll(resp.Body)
		if err != nil {
			log.Error(err)
			return nil
		}
		log.Debug(string(d))
		log.Errorf("unexpected response status: %s", resp.Status)
		return nil
	}

	var data ESAggsResp
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		log.Error(err)
		return nil
	}

	for _, bucket := range data.Aggregations.Topics.Buckets {
		// Empty topicid
		if bucket.Key == "" {
			continue
		}
		for _, post := range bucket.Post.Hits.Hits {
			posts = append(posts, post.Source)
		}
	}
	return posts
}
// GetLatestPosts returns the sum newest posts, sorted by date and then score,
// both descending.
//
// For every post the date is rewritten by parseTime and the message is
// stripped of leading and trailing newlines. nil is returned, and the error
// logged, when the request cannot be built or sent or the response cannot be
// decoded.
func (es ESConf) GetLatestPosts(sum int) []i2es.ESDoc {
	log.Debug(sum)

	payload := `{"sort": [{"date": {"order": "desc"}}, {"_score": {"order": "desc" }}], "size": ` + strconv.Itoa(sum) + `}`
	log.Debugf("Do %s request", payload)

	request, err := http.NewRequest(http.MethodPost, es.searchURI(), bytes.NewBufferString(payload))
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := new(http.Client).Do(request)
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	defer response.Body.Close()

	var result ESSearchResp
	if decodeErr := json.NewDecoder(response.Body).Decode(&result); decodeErr != nil {
		log.Error(decodeErr.Error())
		return nil
	}

	// Stays nil when there are no hits.
	var latest []i2es.ESDoc
	for i := range result.Hits.Hits {
		doc := result.Hits.Hits[i].Source
		doc.Date = parseTime(doc.Date)
		doc.Message = strings.Trim(doc.Message, "\n")
		latest = append(latest, doc)
	}
	return latest
}
// parseTime interprets t as a base-10 Unix timestamp in seconds and formats
// it with the time.UnixDate layout in the local time zone. It returns the
// empty string when t is not a valid 64-bit integer.
func parseTime(t string) string {
	seconds, parseErr := strconv.ParseInt(t, 10, 64)
	if parseErr != nil {
		return ""
	}
	return time.Unix(seconds, 0).Format(time.UnixDate)
}
// echo pairs an echo name with a document count. GetEchoesList builds these
// from aggregation buckets and GetListTXT formats them as "<Name>:<Docs>:...".
type echo struct {
// Name is taken from the aggregation bucket's key.
Name string
// Docs is taken from the aggregation bucket's document count.
Docs int64
}
// GetEchoesList returns every echo with its document count, read from the
// "echo" terms aggregation on "echo.keyword" (at most 1000 buckets).
//
// nil is returned, and the error logged, when the request cannot be built or
// sent or the response cannot be decoded.
func (es ESConf) GetEchoesList() []echo {
	const aggQuery = `{
"size": 0,
"aggs": {
"uniqueEcho": {
"cardinality": {
"field": "echo.keyword"
}
},
"echo": {
"terms": {
"field": "echo.keyword",
"size": 1000
}
}
}
}`
	log.Debugf("Do %s request", aggQuery)

	request, err := http.NewRequest(http.MethodPost, es.searchURI(), bytes.NewBufferString(aggQuery))
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := new(http.Client).Do(request)
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	defer response.Body.Close()

	var aggs EchoAggregations
	if decodeErr := json.NewDecoder(response.Body).Decode(&aggs); decodeErr != nil {
		log.Error(decodeErr.Error())
		return nil
	}

	// Stays nil when the aggregation has no buckets.
	var list []echo
	for _, b := range aggs.EchoAgg["echo"].Buckets {
		list = append(list, echo{Name: b.Key, Docs: int64(b.DocCount)})
	}
	return list
}
// GetListTXT renders the echo list as text, one line per echo in the form
// "<name>:<document count>:(TODO) description support", with a newline after
// every line including the last.
//
// An empty result is returned when GetEchoesList yields no echoes, which is
// also what it does on any request or decoding error.
func (es ESConf) GetListTXT() []byte {
	echoes := es.GetEchoesList()
	if len(echoes) == 0 {
		// Indexing the last element of an empty list would panic.
		return []byte("")
	}

	listTXT := make([]string, 0, len(echoes))
	for _, e := range echoes {
		listTXT = append(listTXT, fmt.Sprintf("%s:%d:(TODO) description support", e.Name, e.Docs))
	}

	// Add new line to be more compatible with fetchers
	return []byte(strings.Join(listTXT, "\n") + "\n")
}