From d826a868b9e09877e121b06167888f95bb816788 Mon Sep 17 00:00:00 2001 From: Denis Zheleztsov Date: Thu, 7 Dec 2017 17:07:46 +0300 Subject: [PATCH] Switch to go-idec Added Tags, Repto and address fields. Fix content-type issues --- i2es/elastic.go | 10 +++++++ idec/parser.go | 35 +++++++++-------------- main.go | 75 ++++++++++++++++++------------------------------- 3 files changed, 52 insertions(+), 68 deletions(-) diff --git a/i2es/elastic.go b/i2es/elastic.go index 358da80..ceb0d6d 100644 --- a/i2es/elastic.go +++ b/i2es/elastic.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io/ioutil" + "log" "net/http" "strings" ) @@ -18,6 +19,9 @@ type ESDoc struct { Message string `json:"message"` Date string `json:"date"` MsgID string `json:"msgid"` + Tags string `json:"tags"` + Repto string `json:"repto"` + Address string `json:"address"` } // ESConf ES connection settings @@ -30,13 +34,17 @@ type ESConf struct { // PutToIndex ... func (es ESConf) PutToIndex(msg ESDoc) error { putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") + log.Print(putURI) doc, err := json.Marshal(msg) if err != nil { return err } + // log.Print(string(doc)) req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) + req.Header.Add("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { @@ -69,6 +77,8 @@ func (es ESConf) CheckID(id string) (bool, error) { searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) + req.Header.Add("Content-Type", "application/json") + if err != nil { return false, err } diff --git a/idec/parser.go b/idec/parser.go index db78ef0..ba95eb5 100644 --- a/idec/parser.go +++ b/idec/parser.go @@ -1,38 +1,31 @@ package idec import ( - "encoding/base64" "strconv" "strings" + + idec "github.com/Difrex/go-idec" ) // ParseMessage ... func ParseMessage(message string) (Message, error) { var m Message - plainMessage, err := base64.StdEncoding.DecodeString(message) + + msg, err := idec.ParseMessage(message) if err != nil { return m, err } - txtMessage := strings.Split(string(plainMessage), "\n") - - var body string - for i := 8; i < len(txtMessage); i++ { - body = strings.Join([]string{body, txtMessage[i]}, "\n") - } - - ts, err := strconv.Atoi(txtMessage[2]) - if err != nil { - return m, err - } - m.Tags = txtMessage[0] - m.Echo = txtMessage[1] - m.Timestamp = ts - m.From = txtMessage[3] - m.Address = txtMessage[4] - m.To = txtMessage[5] - m.Subg = txtMessage[6] - m.Body = body + m.Tags = msg.Tags.II + m.Echo = msg.Echo + m.Timestamp = msg.Timestamp + m.From = msg.From + m.Address = msg.Address + m.To = msg.To + m.Subg = msg.Subg + m.Body = msg.Body + m.Repto = msg.Tags.Repto + m.ID = msg.ID return m, err } diff --git a/main.go b/main.go index b469c16..b8f60ed 100644 --- a/main.go +++ b/main.go @@ -7,8 +7,6 @@ import ( log "github.com/Sirupsen/logrus" - "sync" - "gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/idec" ) @@ -95,54 +93,37 @@ func main() { j = 0 } } - var wg sync.WaitGroup - counter := 0 - all := len(messages) - 1 + for _, m := range messages { - wg.Add(1) - go func(m idec.MSG) { - msg, err := idec.ParseMessage(m.Message) - if err != nil { - log.Error(err.Error()) - wg.Done() - return - } - - var esd i2es.ESDoc - esd.Echo = msg.Echo - esd.Subg = msg.Subg - esd.Author = msg.From - esd.To = msg.To - ts := strconv.Itoa(msg.Timestamp) - esd.Date = ts - esd.Message = msg.Body - esd.MsgID = m.ID - - log.Debug("Check message ", m.ID) - _, err = elastic.CheckID(m.ID) - if err != nil { - log.Warn(err.Error()) - wg.Done() - return - } - err = elastic.PutToIndex(esd) - if err != nil { - log.Error(err.Error()) - wg.Done() - return - } - log.Warn("Message ", m.ID, " added to index") - wg.Done() - }(m) - counter++ - all-- - if counter == 2 { - wg.Wait() + msg, err := idec.ParseMessage(m.Message) + if err != nil { + log.Error(err.Error()) + return } - if all == 0 { - break + + var esd i2es.ESDoc + esd.Echo = msg.Echo + esd.Subg = msg.Subg + esd.Author = msg.From + esd.To = msg.To + ts := strconv.Itoa(msg.Timestamp) + esd.Date = ts + esd.Message = msg.Body + esd.MsgID = m.ID + esd.Repto = msg.Repto + esd.Address = msg.Address + + log.Debug("Check message ", m.ID) + _, err = elastic.CheckID(m.ID) + if err != nil { + log.Warn(err.Error()) + continue } + err = elastic.PutToIndex(esd) + if err != nil { + log.Error(err.Error()) + } + log.Warn("Message ", m.ID, " added to index") } - wg.Wait() }