package node import ( "bytes" "encoding/json" "io" "io/ioutil" "net/http" "strconv" "strings" "time" "fmt" "encoding/base64" "gitea.difrex.ru/Umbrella/fetcher/i2es" log "github.com/sirupsen/logrus" ) const ( echoAgg = "echo_uniq" ) // MakePlainTextMessage ... func MakePlainTextMessage(hit i2es.ESDoc) []byte { tags := "ii/ok" if hit.Repto != "" { tags += fmt.Sprintf("/repto/%s", hit.Repto) } m := []string{ tags, hit.Echo, hit.Date, hit.Author, hit.Address, hit.To, hit.Subg, hit.Message, } return []byte(strings.Join(m, "\n")) } // GetPlainTextMessage ... func (es ESConf) GetPlainTextMessage(msgid string) []byte { var searchURI string if es.Index != "" && es.Type != "" { searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") } else { searchURI = strings.Join([]string{es.Host, "search"}, "/") } searchQ := []byte(strings.Join([]string{ `{"query": {"match": {"_id": "`, msgid, `"}}}`}, "")) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) if err != nil { log.Error(err.Error()) return []byte("") } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return []byte("") } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return []byte("") } if len(esr.Hits.Hits) > 0 { return MakePlainTextMessage(esr.Hits.Hits[0].Source) } return []byte("") } // GetEchoMessageHashes ... func (es ESConf) GetEchoMessageHashes(echo string) []string { var hashes []string var searchURI string if es.Index != "" && es.Type != "" { searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") } else { searchURI = strings.Join([]string{es.Host, "search"}, "/") } searchQ := []byte(strings.Join([]string{ `{"sort": [ {"date":{ "order": "desc" }},{ "_score":{ "order": "desc" }}], "query": {"query_string" : {"fields": ["echo.keyword"], "query":"`, echo, `"}}, "size": 500}`}, "")) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(searchQ)) if err != nil { log.Error(err.Error()) return hashes } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return hashes } defer resp.Body.Close() content, err := ioutil.ReadAll(resp.Body) if err != nil { log.Error(err.Error()) hashes = append(hashes, "error: Internal error") return hashes } var esr ESSearchResp err = json.Unmarshal(content, &esr) if err != nil { log.Error(err.Error()) hashes = append(hashes, "error: Internal error") return hashes } for _, hit := range esr.Hits.Hits { hashes = append(hashes, hit.Source.MsgID) } return hashes } // GetLimitedEchoMessageHashes ... func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int) []string { var hashes []string // Check offset var order string if offset <= 0 { order = "desc" } else { order = "asc" } l := strconv.Itoa(limit) searchQ := []byte(strings.Join([]string{ `{"sort": [ {"date":{ "order": "`, order, `" }},{ "_score":{ "order": "`, order, `" }}], "query": {"query_string" : {"fields": ["msgid.keyword", "echo.keyword"], "query":"`, echo, `"}}, "size":`, l, `}`}, "")) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewBuffer(searchQ)) if err != nil { log.Error(err.Error()) return hashes } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return hashes } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return hashes } for _, hit := range esr.Hits.Hits { hashes = append(hashes, hit.Source.MsgID) } return hashes } func (es ESConf) DoSearch(query string) []i2es.ESDoc { q := `{"sort": [ {"date":{ "order": "desc" }},{ "_score":{ "order": "desc" }}], "query": {"query_string" : {"fields": ["message", "subg"], "query":` + query + `}}, "size": 100}` req, err := http.NewRequest("POST", es.searchURI(), bytes.NewBuffer([]byte(q))) if err != nil { log.Error(err.Error()) return nil } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return nil } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return nil } var posts []i2es.ESDoc for _, hit := range esr.Hits.Hits { posts = append(posts, hit.Source) } return posts } func (es ESConf) GetUMMessages(msgs string) []string { var encodedMessages []string // First get messages list messages := strings.Split(msgs, "/") var searchURI string if es.Index != "" && es.Type != "" { searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") } else { searchURI = strings.Join([]string{es.Host, "search"}, "/") } query := []byte(` { "query": { "query_string" : { "fields": ["msgid.keyword"], "query":"` + strings.Join(messages, " OR ") + `" } }, "sort": [{"date":{ "order": "desc" }}, { "_score":{ "order": "desc" }} ] }`) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query)) if err != nil { log.Error(err.Error()) return encodedMessages } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return encodedMessages } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return encodedMessages } for _, hit := range esr.Hits.Hits { m := fmt.Sprintf("%s:%s", hit.Source.MsgID, base64.StdEncoding.EncodeToString(MakePlainTextMessage(hit.Source))) encodedMessages = append(encodedMessages, m) } return encodedMessages } // GetUEchoMessageHashes ... func (es ESConf) GetUEchoMessageHashes(echoes string) []string { var echohashes []string // First get echoes list el := strings.Split(echoes, "/") // Check offset and limit var offset int var limit int withOL := false if strings.Contains(el[len(el)-1], ":") { oflim := strings.Split(el[len(el)-1], ":") o, err := strconv.Atoi(oflim[0]) l, err := strconv.Atoi(oflim[1]) if err != nil { log.Error(err) } else { offset = o limit = l withOL = true } } eh := make(map[string][]string) var curEcho string for i, echo := range el { if echo == "" { continue } if !strings.Contains(echo, ":") { curEcho = echo } if withOL { recEcho := es.GetLimitedEchoMessageHashes(curEcho, offset, limit) eh[curEcho] = make([]string, len(curEcho)) eh[curEcho] = append(eh[curEcho], recEcho...) } else { recEcho := es.GetEchoMessageHashes(curEcho) eh[curEcho] = make([]string, len(recEcho)) eh[curEcho] = append(eh[curEcho], recEcho...) } if i == len(el) { break } } // Make standard output: // echo.name // Some20SimbolsHash333 for k, v := range eh { echohashes = append(echohashes, k) if k == "" || k == "\n" { continue } for _, e := range v { if e == "" { continue } echohashes = append(echohashes, e) } } return addNewLineToLastWord(echohashes) } // GetXC implements /x/c func (es ESConf) GetXC(echoes string) []string { var searchURI string var counts []string if es.Index != "" && es.Type != "" { searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") } else { searchURI = strings.Join([]string{es.Host, "search"}, "/") } query := []byte(` { "query": { "query_string" : { "fields": ["echo.keyword"], "query": "` + strings.Join(strings.Split(echoes, "/"), " OR ") + `" } }, "size": 0, "aggs": { "uniqueEcho": { "cardinality": { "field": "echo.keyword" } }, "echo": { "terms": { "field": "echo.keyword", "size": 1000 } } } } `) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query)) if err != nil { log.Error(err.Error()) return counts } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return counts } defer resp.Body.Close() var esr EchoAggregations err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return counts } log.Infof("%+v", esr) for _, hit := range esr.EchoAgg["echo"].Buckets { counts = append(counts, fmt.Sprintf("%s:%d", hit.Key, hit.DocCount)) } return counts } type ThreadBucket struct { DocCount int64 `json:"doc_count"` Key string `json:"key"` Post Hits } var defaultEchoes = []string{`"idec.talks"`, `"pipe.2032"`, `"linux.14"`, `"develop.16"`, `"dynamic.local"`, `"std.club"`, `"std.hugeping"`, `"difrex.blog"`, `"ii.test.14"`} func (es ESConf) GetTopic(topicID string) (posts []i2es.ESDoc) { query := []byte(strings.Join([]string{ `{"sort": [{"date": {"order": "asc"}}, {"_score": {"order": "desc" }}], "size":1000,"query": {"term": {"topicid.keyword": "`, topicID, `"}}}`}, "")) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader([]byte(query))) if err != nil { log.Error(err) return } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err) return } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return nil } for _, hit := range esr.Hits.Hits { hit.Source.Message = strings.Trim(hit.Source.Message, "\n") hit.Source.Date = parseTime(hit.Source.Date) posts = append(posts, hit.Source) } return } func (es ESConf) GetMessage(msgID string) (posts []i2es.ESDoc) { query := []byte(strings.Join([]string{ `{"sort": [{"date": {"order": "asc"}}, {"_score": {"order": "desc" }}], "size":1000,"query": {"term": {"msgid.keyword": "`, msgID, `"}}}`}, "")) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader([]byte(query))) if err != nil { log.Error(err) return } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err) return } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return nil } for _, hit := range esr.Hits.Hits { hit.Source.Message = strings.Trim(hit.Source.Message, "\n") hit.Source.Date = parseTime(hit.Source.Date) posts = append(posts, hit.Source) } return } func (es ESConf) GetThreads(pageNum int, echoes ...string) (posts []i2es.ESDoc) { ech := defaultEchoes if len(echoes) > 0 { ech = []string{} for _, echo := range echoes { ech = append(ech, fmt.Sprintf(`"%s"`, echo)) } } rangeStr := `"from":"now-30d","to":"now-0d"` if pageNum > 1 { to := 30*pageNum - 30 from := 30 * pageNum rangeStr = fmt.Sprintf(`"from":"now-%dd","to":"now-%dd"`, from, to) } log.Debug(rangeStr) query := `{"sort":[{"date":{"order":"desc"}}],"aggs":{"topics":{"terms":{"field":"topicid.keyword","size":100},"aggs":{"post":{"top_hits":{"size":1,"sort":[{"date":{"order":"desc"}}],"_source":{"include": ["subg","author","date","echo","topicid","address", "repto"]}}}}}},"query":{"bool":{"must":[{"range":{"date":{` + rangeStr + `}}},{"constant_score":{"filter":{"terms":{"echo.keyword": [` + strings.Join(ech, ",") + `]}}}}]}}}` log.Debug("Run: ", query) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader([]byte(query))) if err != nil { log.Error(err) return } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err) return } defer resp.Body.Close() var data ESAggsResp if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { log.Error(err) return } for _, bucket := range data.Aggregations.Topics.Buckets { // Empty topicid if bucket.Key == "" { continue } for _, post := range bucket.Post.Hits.Hits { posts = append(posts, post.Source) } } return } func (es ESConf) GetThreadsYear(pageNum int, echoes ...string) (posts []i2es.ESDoc) { ech := defaultEchoes if len(echoes) > 0 { ech = []string{} for _, echo := range echoes { ech = append(ech, fmt.Sprintf(`"%s"`, echo)) } } query := `{"sort":[{"date":{"order":"desc"}}],"aggs":{"topics":{"terms":{"field":"topicid.keyword","size":500},"aggs":{"post":{"top_hits":{"size":1,"sort":[{"date":{"order":"desc"}}],"_source":{"include": ["subg","author","date","echo","topicid","address", "repto"]}}}}}},"query":{"bool":{"must":[{"range":{"date":{"from": "now-365d", "to": "now"}}}, {"constant_score":{"filter":{"terms":{"echo.keyword": [` + strings.Join(ech, ",") + `]}}}}]}}}` log.Debug("Run: ", query) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewReader([]byte(query))) if err != nil { log.Error(err) return } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err) return } defer resp.Body.Close() if resp.StatusCode > 200 { d, _ := io.ReadAll(resp.Body) log.Debug(string(d)) } var data ESAggsResp if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { log.Error(err) return } for _, bucket := range data.Aggregations.Topics.Buckets { // Empty topicid if bucket.Key == "" { continue } for _, post := range bucket.Post.Hits.Hits { posts = append(posts, post.Source) } } return } func (es ESConf) GetLatestPosts(sum int) []i2es.ESDoc { query := fmt.Sprintf(`{"sort": [{"date": {"order": "desc"}}, {"_score": {"order": "desc" }}], "size": %d}`, sum) log.Debugf("Do %s request", query) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewBuffer([]byte(query))) if err != nil { log.Error(err.Error()) return nil } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return nil } defer resp.Body.Close() var esr ESSearchResp err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return nil } var posts []i2es.ESDoc for _, hit := range esr.Hits.Hits { hit.Source.Date = parseTime(hit.Source.Date) hit.Source.Message = strings.Trim(hit.Source.Message, "\n") posts = append(posts, hit.Source) } return posts } func parseTime(t string) string { i, err := strconv.ParseInt(t, 10, 64) if err != nil { return "" } ts := time.Unix(i, 0) return ts.Format(time.UnixDate) } type echo struct { Name string Docs int64 } func (es ESConf) GetEchoesList() []echo { searchQ := []byte(`{ "size": 0, "aggs": { "uniqueEcho": { "cardinality": { "field": "echo.keyword" } }, "echo": { "terms": { "field": "echo.keyword", "size": 1000 } } } }`) log.Debugf("Do %s request", searchQ) req, err := http.NewRequest("POST", es.searchURI(), bytes.NewBuffer(searchQ)) if err != nil { log.Error(err.Error()) return nil } req.Header.Add("Content-Type", "application/json") client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Error(err.Error()) return nil } defer resp.Body.Close() var esr EchoAggregations err = json.NewDecoder(resp.Body).Decode(&esr) if err != nil { log.Error(err.Error()) return nil } var echoes []echo for _, bucket := range esr.EchoAgg["echo"].Buckets { echoes = append(echoes, echo{bucket.Key, int64(bucket.DocCount)}) } return echoes } // GetListTXT ... func (es ESConf) GetListTXT() []byte { var listTXT []string echoes := es.GetEchoesList() for _, echo := range echoes { listTXT = append(listTXT, fmt.Sprintf("%s:%d:(TODO) description support", echo.Name, echo.Docs)) } // Add new line to be more compatible with fetchers listTXT[len(listTXT)-1] = listTXT[len(listTXT)-1] + "\n" return []byte(strings.Join(listTXT, "\n")) }