From 9c23daf783ec35958671d80eefa6e9bfbdbcca41 Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Sun, 4 Nov 2018 12:05:23 +0300 Subject: [PATCH] /x/c method --- node/api.go | 57 +++++++++++++++++++++---- node/elastic.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 7 deletions(-) diff --git a/node/api.go b/node/api.go index 4b8e770..e2de149 100644 --- a/node/api.go +++ b/node/api.go @@ -1,11 +1,12 @@ package node import ( - "github.com/gorilla/mux" "log" "net/http" "strings" "time" + + "github.com/gorilla/mux" ) // ListTXTHandler ... @@ -27,7 +28,7 @@ func (es ESConf) ListTXTHandler(w http.ResponseWriter, r *http.Request) { // XFeaturesHandler list supported features func XFeaturesHandler(w http.ResponseWriter, r *http.Request) { - features := []string{"list.txt", "u/e"} + features := []string{"list.txt", "u/e", "u/m"} LogRequest(r) @@ -75,6 +76,27 @@ func (es ESConf) UEHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte(strings.Join(messages, "\n"))) } +// UMHandler /u/m/ schema +func (es ESConf) UMHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + e := vars["ids"] + + log.Print("/u/e/ vars: ", e) + + LogRequest(r) + + ch := make(chan []string) + // Get echolist + go func() { + ch <- es.GetUMMessages(e) + }() + + messages := <-ch + + w.WriteHeader(200) + w.Write([]byte(strings.Join(messages, "\n"))) +} + // MHandler /m/ schema func (es ESConf) MHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -94,18 +116,39 @@ func (es ESConf) MHandler(w http.ResponseWriter, r *http.Request) { w.Write(message) } +// XCHandler /x/c schema +func (es ESConf) XCHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + echoes := vars["echoes"] + + LogRequest(r) + + ch := make(chan []string) + // Get echolist + go func() { + ch <- es.GetXC(echoes) + }() + + counts := <-ch + + w.WriteHeader(200) + w.Write([]byte(strings.Join(counts, "\n"))) +} + // Serve ... func Serve(listen string, es ESConf) { r := mux.NewRouter() - r.HandleFunc("/list.txt", es.ListTXTHandler) - r.HandleFunc("/x/features", XFeaturesHandler) + r.HandleFunc("/list.txt", es.ListTXTHandler).Methods("GET") + r.HandleFunc("/x/features", XFeaturesHandler).Methods("GET") // Standart schemas - r.HandleFunc("/e/{echo}", es.EHandler) - r.HandleFunc("/m/{msgid}", es.MHandler) + r.HandleFunc("/e/{echo}", es.EHandler).Methods("GET") + r.HandleFunc("/m/{msgid}", es.MHandler).Methods("GET") // Extensions - r.HandleFunc("/u/e/{echoes:[a-z0-9-_/.:]+}", es.UEHandler) + r.HandleFunc("/u/e/{echoes:[a-z0-9-_/.:]+}", es.UEHandler).Methods("GET") + r.HandleFunc("/u/m/{ids:[a-zA-Z0-9-_/.:]+}", es.UMHandler).Methods("GET") + r.HandleFunc("/x/c/{echoes:[a-zA-Z0-9-_/.:]+}", es.XCHandler).Methods("GET") http.Handle("/", r) diff --git a/node/elastic.go b/node/elastic.go index 2ee85f6..6a23460 100644 --- a/node/elastic.go +++ b/node/elastic.go @@ -10,6 +10,8 @@ import ( "fmt" + "encoding/base64" + "gitea.difrex.ru/Umbrella/fetcher/i2es" log "github.com/Sirupsen/logrus" ) @@ -162,6 +164,54 @@ func (es ESConf) GetLimitedEchoMessageHashes(echo string, offset int, limit int) return hashes } +func (es ESConf) GetUMMessages(msgs string) []string { + var encodedMessages []string + + // First get messages list + messages := strings.Split(msgs, "/") + var searchURI string + if es.Index != "" && es.Type != "" { + searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") + } else { + searchURI = strings.Join([]string{es.Host, "search"}, "/") + } + query := []byte(` +{ + "query": { + "query_string" : { + "fields": ["msgid"], + "query":"` + strings.Join(messages, " OR ") + `" + } + }, + "sort": [{"date":{ "order": "desc" }}, + { "_score":{ "order": "desc" }} + ] +}`) + req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query)) + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Error(err.Error()) + return encodedMessages + } + + defer resp.Body.Close() + + var esr ESSearchResp + err = json.NewDecoder(resp.Body).Decode(&esr) + if err != nil { + log.Error(err.Error()) + return encodedMessages + } + + for _, hit := range esr.Hits.Hits { + m := fmt.Sprintf("%s:%s", hit.Source.MsgID, base64.StdEncoding.EncodeToString(MakePlainTextMessage(hit.Source))) + encodedMessages = append(encodedMessages, m) + } + + return encodedMessages +} + // GetUEchoMessageHashes ... func (es ESConf) GetUEchoMessageHashes(echoes string) []string { var echohashes []string @@ -230,6 +280,64 @@ func (es ESConf) GetUEchoMessageHashes(echoes string) []string { return echohashes } +// GetXC implements /x/c +func (es ESConf) GetXC(echoes string) []string { + var searchURI string + var counts []string + if es.Index != "" && es.Type != "" { + searchURI = strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") + } else { + searchURI = strings.Join([]string{es.Host, "search"}, "/") + } + + query := []byte(` +{ + "query": { + "query_string" : { + "fields": ["echo"], + "query": "` + strings.Join(strings.Split(echoes, "/"), " OR ") + `" + } + }, + "size": 0, + "aggs": { + "uniqueEcho": { + "cardinality": { + "field": "echo" + } + }, + "echo": { + "terms": { + "field": "echo", + "size": 1000 + } + } + } +} +`) + req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer(query)) + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Error(err.Error()) + return counts + } + + defer resp.Body.Close() + + var esr EchoAggregations + err = json.NewDecoder(resp.Body).Decode(&esr) + if err != nil { + log.Error(err.Error()) + return counts + } + log.Infof("%+v", esr) + + for _, hit := range esr.EchoAgg["echo"].Buckets { + counts = append(counts, fmt.Sprintf("%s:%d", hit.Key, hit.DocCount)) + } + return counts +} + // GetListTXT ... func (es ESConf) GetListTXT() []byte { var searchURI string