/x/c method

This commit is contained in:
Denis Zheleztsov 2018-11-04 12:05:23 +03:00
parent b388acb8c5
commit 9c23daf783
2 changed files with 158 additions and 7 deletions

View File

@ -1,11 +1,12 @@
package node
import (
"github.com/gorilla/mux"
"log"
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
)
// ListTXTHandler ...
@ -27,7 +28,7 @@ func (es ESConf) ListTXTHandler(w http.ResponseWriter, r *http.Request) {
// XFeaturesHandler list supported features
func XFeaturesHandler(w http.ResponseWriter, r *http.Request) {
features := []string{"list.txt", "u/e"}
features := []string{"list.txt", "u/e", "u/m"}
LogRequest(r)
@ -75,6 +76,27 @@ func (es ESConf) UEHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(strings.Join(messages, "\n")))
}
// UMHandler /u/m/ schema
func (es ESConf) UMHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
e := vars["ids"]
log.Print("/u/e/ vars: ", e)
LogRequest(r)
ch := make(chan []string)
// Get echolist
go func() {
ch <- es.GetUMMessages(e)
}()
messages := <-ch
w.WriteHeader(200)
w.Write([]byte(strings.Join(messages, "\n")))
}
// MHandler /m/ schema
func (es ESConf) MHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
@ -94,18 +116,39 @@ func (es ESConf) MHandler(w http.ResponseWriter, r *http.Request) {
w.Write(message)
}
// XCHandler /x/c schema
func (es ESConf) XCHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
echoes := vars["echoes"]
LogRequest(r)
ch := make(chan []string)
// Get echolist
go func() {
ch <- es.GetXC(echoes)
}()
counts := <-ch
w.WriteHeader(200)
w.Write([]byte(strings.Join(counts, "\n")))
}
// Serve ...
func Serve(listen string, es ESConf) {
r := mux.NewRouter()
r.HandleFunc("/list.txt", es.ListTXTHandler)
r.HandleFunc("/x/features", XFeaturesHandler)
r.HandleFunc("/list.txt", es.ListTXTHandler).Methods("GET")
r.HandleFunc("/x/features", XFeaturesHandler).Methods("GET")
// Standart schemas
r.HandleFunc("/e/{echo}", es.EHandler)
r.HandleFunc("/m/{msgid}", es.MHandler)
r.HandleFunc("/e/{echo}", es.EHandler).Methods("GET")
r.HandleFunc("/m/{msgid}", es.MHandler).Methods("GET")
// Extensions
r.HandleFunc("/u/e/{echoes:[a-z0-9-_/.:]+}", es.UEHandler)
r.HandleFunc("/u/e/{echoes:[a-z0-9-_/.:]+}", es.UEHandler).Methods("GET")
r.HandleFunc("/u/m/{ids:[a-zA-Z0-9-_/.:]+}", es.UMHandler).Methods("GET")
r.HandleFunc("/x/c/{echoes:[a-zA-Z0-9-_/.:]+}", es.XCHandler).Methods("GET")
http.Handle("/", r)

View File

@ -10,6 +10,8 @@ import (
"fmt"
"encoding/base64"
"gitea.difrex.ru/Umbrella/fetcher/i2es"
log "github.com/Sirupsen/logrus"
)
@ -162,6 +164,54 @@ func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int)
return hashes
}
func (es ESConf) GetUMMessages(msgs string) []string {
var encodedMessages []string
// First get messages list
messages := strings.Split(msgs, "/")
var searchURI string
if es.Index != "" && es.Type != "" {
searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
} else {
searchURI = strings.Join([]string{es.Host, "search"}, "/")
}
query := []byte(`
{
"query": {
"query_string" : {
"fields": ["msgid"],
"query":"` + strings.Join(messages, " OR ") + `"
}
},
"sort": [{"date":{ "order": "desc" }},
{ "_score":{ "order": "desc" }}
]
}`)
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err.Error())
return encodedMessages
}
defer resp.Body.Close()
var esr ESSearchResp
err = json.NewDecoder(resp.Body).Decode(&esr)
if err != nil {
log.Error(err.Error())
return encodedMessages
}
for _, hit := range esr.Hits.Hits {
m := fmt.Sprintf("%s:%s", hit.Source.MsgID, base64.StdEncoding.EncodeToString(MakePlainTextMessage(hit.Source)))
encodedMessages = append(encodedMessages, m)
}
return encodedMessages
}
// GetUEchoMessageHashes ...
func (es ESConf) GetUEchoMessageHashes(echoes string) []string {
var echohashes []string
@ -230,6 +280,64 @@ func (es ESConf) GetUEchoMessageHashes(echoes string) []string {
return echohashes
}
// GetXC implements /x/c
func (es ESConf) GetXC(echoes string) []string {
var searchURI string
var counts []string
if es.Index != "" && es.Type != "" {
searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
} else {
searchURI = strings.Join([]string{es.Host, "search"}, "/")
}
query := []byte(`
{
"query": {
"query_string" : {
"fields": ["echo"],
"query": "` + strings.Join(strings.Split(echoes, "/"), " OR ") + `"
}
},
"size": 0,
"aggs": {
"uniqueEcho": {
"cardinality": {
"field": "echo"
}
},
"echo": {
"terms": {
"field": "echo",
"size": 1000
}
}
}
}
`)
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Error(err.Error())
return counts
}
defer resp.Body.Close()
var esr EchoAggregations
err = json.NewDecoder(resp.Body).Decode(&esr)
if err != nil {
log.Error(err.Error())
return counts
}
log.Infof("%+v", esr)
for _, hit := range esr.EchoAgg["echo"].Buckets {
counts = append(counts, fmt.Sprintf("%s:%d", hit.Key, hit.DocCount))
}
return counts
}
// GetListTXT ...
func (es ESConf) GetListTXT() []byte {
var searchURI string