From f18258c593509dd6819327d92af38e5d27d2f8f4 Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Sat, 11 Feb 2017 19:53:20 +0300 Subject: [PATCH] Fetcher on Go first iteration --- i2es/elastic.go | 95 ++++++++++++++++++++++++++++++++++++++ idec/proto.go | 60 ++++++++++++++++++++++-- main.go | 120 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 270 insertions(+), 5 deletions(-) create mode 100644 i2es/elastic.go create mode 100644 main.go diff --git a/i2es/elastic.go b/i2es/elastic.go new file mode 100644 index 0000000..7be320f --- /dev/null +++ b/i2es/elastic.go @@ -0,0 +1,95 @@ +package i2es + +import ( + "bytes" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + "strings" +) + +// ESDoc Elasticsearch document structure +type ESDoc struct { + Echo string `json:"echo"` + Subg string `json:"subj"` + To string `json:"to"` + Author string `json:"author"` + Message string `json:"message"` + Date string `json:"date"` + MsgID string `json:"msgid"` +} + +// ESConf ES connection settings +type ESConf struct { + Host string + Index string + Type string +} + +// PutToIndex ... +func (es ESConf) PutToIndex(msg ESDoc) error { + putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") + + doc, err := json.Marshal(msg) + if err != nil { + return err + } + + req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) + client := &http.Client{} + resp, err := client.Do(req) + + defer resp.Body.Close() + + return err +} + +// ESRes ES response minimal structure +type ESRes struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Hits Hits `json:"hits"` +} + +// Hits ... +type Hits struct { + Total int `json:"total"` + MaxScore float32 `json:"max_score"` + Hits []interface{} `json:"hits"` +} + +// CheckID ... +func (es ESConf) CheckID(id string) (bool, error) { + searchURI := strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/") + + searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") + + req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) + client := &http.Client{} + resp, err := client.Do(req) + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, err + } + + var e ESRes + err = json.Unmarshal(body, &e) + if err != nil { + return false, err + } + + if e.Hits.Total > 0 { + err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, "")) + return false, err + } + if e.TimedOut { + err = errors.New("Request time out") + return false, err + } + + return false, nil +} diff --git a/idec/proto.go b/idec/proto.go index 32fb8ba..44989dd 100644 --- a/idec/proto.go +++ b/idec/proto.go @@ -101,9 +101,58 @@ func (f FetchConfig) GetMessagesIDS() ([]ID, error) { return ids, nil } +// GetAllMessagesIDS get all message ids from node +func (f FetchConfig) GetAllMessagesIDS() ([]ID, error) { + var ids []ID + + var getURI string + getEchoes := strings.Join(f.Echoes, "/") + + getURI = strings.Join([]string{f.Node, echoSchema, getEchoes}, "") + + // Get messages ids + response, err := http.Get(getURI) + if err != nil { + return ids, err + } + + defer response.Body.Close() + c, err := ioutil.ReadAll(response.Body) + if err != nil { + return ids, err + } + + var i ID + var curEcho string + rawIDS := strings.Split(string(c), "\n") + for _, line := range rawIDS { + + // Match echoarea + if strings.Contains(line, ".") { + curEcho = line + continue + } + + // Match message ID + if !strings.Contains(line, ".") && !strings.Contains(line, ":") && line != "" { + i.Echo = curEcho + i.MsgID = line + ids = append(ids, i) + } + } + + return ids, nil +} + +// MSG ... +type MSG struct { + Message string `json:"message"` + ID string `json:"id"` +} + // GetRawMessages get messages from node -func (f FetchConfig) GetRawMessages(ids []ID) ([]string, error) { - var messages []string +func (f FetchConfig) GetRawMessages(ids []ID) ([]MSG, error) { + var messages []MSG var messagesIDS []string for _, id := range ids { @@ -116,7 +165,8 @@ func (f FetchConfig) GetRawMessages(ids []ID) ([]string, error) { // Get messages ids response, err := http.Get(getURI) if err != nil { - return messages, err + e := errors.New("Failed to get " + getURI + ". ") + return messages, e } defer response.Body.Close() @@ -130,8 +180,8 @@ func (f FetchConfig) GetRawMessages(ids []ID) ([]string, error) { break } message := strings.Split(m, ":") - if len(message) > 0 { - messages = append(messages, message[1]) + if len(message) > 1 { + messages = append(messages, MSG{message[1], message[0]}) } } diff --git a/main.go b/main.go new file mode 100644 index 0000000..f23402b --- /dev/null +++ b/main.go @@ -0,0 +1,120 @@ +package main + +import ( + "flag" + "gitea.difrex.ru/Umbrella/fetcher/i2es" + "gitea.difrex.ru/Umbrella/fetcher/idec" + "log" + "strconv" +) + +var inode string +var offset int +var limit int +var all bool +var es string +var esIndex string +var esType string + +// init ... +func init() { + flag.IntVar(&offset, "offset", 0, "") + flag.IntVar(&limit, "limit", -1, "") + flag.BoolVar(&all, "all", false, "Get all messages") + flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI") + flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host") + flag.StringVar(&esIndex, "esindex", "idec", "ES index") + flag.StringVar(&esType, "estype", "post", "ES document type") + flag.Parse() +} + +func main() { + // extensions := idec.NewExtensions() + var fc idec.FetchConfig + + fc.Node = inode + + log.Print("Working for node: ", inode) + // trying get echo list + e, err := fc.GetEchoList() + if err != nil { + fc.Echoes = []string{"ii.14", "linux.14"} + } else { + var echoes []string + for _, echo := range e { + echoes = append(echoes, echo.Name) + } + fc.Echoes = echoes + } + + log.Print("Receive echoelist: ", fc.Echoes) + + fc.Num = 10 + fc.Limit = limit + fc.Offset = offset + + var elastic i2es.ESConf + elastic.Host = es + elastic.Index = esIndex + elastic.Type = esType + + var ids []idec.ID + if all { + ids, err = fc.GetAllMessagesIDS() + if err != nil { + panic(err) + } + } else { + ids, err = fc.GetMessagesIDS() + if err != nil { + panic(err) + } + } + + var messages []idec.MSG + var stash []idec.ID + j := 0 + for _, id := range ids { + stash = append(stash, id) + j++ + if j == 20 { + m, err := fc.GetRawMessages(stash) + if err != nil { + log.Print(err) + } + for _, m := range m { + messages = append(messages, m) + } + stash = nil + j = 0 + } + } + for _, m := range messages { + msg, err := idec.ParseMessage(m.Message) + if err != nil { + panic(err) + } + + var esd i2es.ESDoc + esd.Echo = msg.Echo + esd.Subg = msg.Subg + esd.Author = msg.From + esd.To = msg.To + ts := strconv.Itoa(msg.Timestamp) + esd.Date = ts + esd.Message = msg.Body + esd.MsgID = m.ID + + log.Print("Check message ", m.ID) + _, err = elastic.CheckID(m.ID) + if err != nil { + log.Print("Alredy in index ", err) + continue + } + err = elastic.PutToIndex(esd) + if err != nil { + log.Print(err) + } + log.Print("Message ", msg.ID, " added to index") + } +}