diff --git a/i2es/elastic.go b/i2es/elastic.go index 44b14c7..358da80 100644 --- a/i2es/elastic.go +++ b/i2es/elastic.go @@ -39,6 +39,9 @@ func (es ESConf) PutToIndex(msg ESDoc) error { req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) client := &http.Client{} resp, err := client.Do(req) + if err != nil { + return err + } defer resp.Body.Close() @@ -66,8 +69,15 @@ func (es ESConf) CheckID(id string) (bool, error) { searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) + if err != nil { + return false, err + } + client := &http.Client{} resp, err := client.Do(req) + if err != nil { + return false, err + } defer resp.Body.Close() diff --git a/idec/proto.go b/idec/proto.go index 44989dd..bcdc1ab 100644 --- a/idec/proto.go +++ b/idec/proto.go @@ -8,6 +8,8 @@ import ( "net/http" "strconv" "strings" + + log "github.com/Sirupsen/logrus" ) // IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions @@ -66,6 +68,7 @@ func (f FetchConfig) GetMessagesIDS() ([]ID, error) { limit := strconv.Itoa(f.Limit) getURI = strings.Join([]string{f.Node, echoSchema, getEchoes, "/", offset, ":", limit}, "") + log.Info(getURI) // Get messages ids response, err := http.Get(getURI) @@ -200,7 +203,7 @@ func (f FetchConfig) GetEchoList() ([]Echo, error) { var echoes []Echo // Check node features support - fres, err := http.Get(strings.Join([]string{f.Node, features}, "/")) + fres, err := http.Get(strings.Join([]string{f.Node, features}, "")) if err != nil { return echoes, err } @@ -212,11 +215,11 @@ func (f FetchConfig) GetEchoList() ([]Echo, error) { } if !strings.Contains(string(c), listTXT) { - err = errors.New("Node does not support echoes list") + err = errors.New(strings.Join([]string{f.Node, features}, "") + " Node does not support echoes list " + string(c)) return echoes, err } - lres, err := http.Get(strings.Join([]string{f.Node, listTXT}, "/")) + lres, err := http.Get(strings.Join([]string{f.Node, listTXT}, "")) if err != nil { return echoes, err } diff --git a/main.go b/main.go index ba3f210..b469c16 100644 --- a/main.go +++ b/main.go @@ -2,10 +2,15 @@ package main import ( "flag" + + "strconv" + + log "github.com/Sirupsen/logrus" + + "sync" + "gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/idec" - "log" - "strconv" ) var inode string @@ -38,6 +43,7 @@ func main() { // trying get echo list e, err := fc.GetEchoList() if err != nil { + log.Error(err.Error()) fc.Echoes = []string{"ii.14", "linux.14"} } else { var echoes []string @@ -89,32 +95,54 @@ func main() { j = 0 } } + var wg sync.WaitGroup + counter := 0 + all := len(messages) - 1 for _, m := range messages { - msg, err := idec.ParseMessage(m.Message) - if err != nil { - panic(err) + wg.Add(1) + go func(m idec.MSG) { + msg, err := idec.ParseMessage(m.Message) + if err != nil { + log.Error(err.Error()) + wg.Done() + return + } + + var esd i2es.ESDoc + esd.Echo = msg.Echo + esd.Subg = msg.Subg + esd.Author = msg.From + esd.To = msg.To + ts := strconv.Itoa(msg.Timestamp) + esd.Date = ts + esd.Message = msg.Body + esd.MsgID = m.ID + + log.Debug("Check message ", m.ID) + _, err = elastic.CheckID(m.ID) + if err != nil { + log.Warn(err.Error()) + wg.Done() + return + } + err = elastic.PutToIndex(esd) + if err != nil { + log.Error(err.Error()) + wg.Done() + return + } + log.Warn("Message ", m.ID, " added to index") + wg.Done() + }(m) + counter++ + all-- + if counter == 2 { + wg.Wait() + } + if all == 0 { + break } - var esd i2es.ESDoc - esd.Echo = msg.Echo - esd.Subg = msg.Subg - esd.Author = msg.From - esd.To = msg.To - ts := strconv.Itoa(msg.Timestamp) - esd.Date = ts - esd.Message = msg.Body - esd.MsgID = m.ID - - log.Print("Check message ", m.ID) - _, err = elastic.CheckID(m.ID) - if err != nil { - log.Print(err) - continue - } - err = elastic.PutToIndex(esd) - if err != nil { - log.Print(err) - } - log.Print("Message ", msg.ID, " added to index") } + wg.Wait() }