Logging imporovement and fix list.txt

This commit is contained in:
Denis Zheleztsov 2017-09-27 14:49:50 +03:00
parent 635056c7c6
commit f2640e0919
3 changed files with 70 additions and 29 deletions

View File

@ -39,6 +39,9 @@ func (es ESConf) PutToIndex(msg ESDoc) error {
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close() defer resp.Body.Close()
@ -66,8 +69,15 @@ func (es ESConf) CheckID(id string) (bool, error) {
searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "")
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ)))
if err != nil {
return false, err
}
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close() defer resp.Body.Close()

View File

@ -8,6 +8,8 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
log "github.com/Sirupsen/logrus"
) )
// IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions // IDEC Extensions. see: https://ii-net.tk/idec-doc/?p=extensions
@ -66,6 +68,7 @@ func (f FetchConfig) GetMessagesIDS() ([]ID, error) {
limit := strconv.Itoa(f.Limit) limit := strconv.Itoa(f.Limit)
getURI = strings.Join([]string{f.Node, echoSchema, getEchoes, "/", offset, ":", limit}, "") getURI = strings.Join([]string{f.Node, echoSchema, getEchoes, "/", offset, ":", limit}, "")
log.Info(getURI)
// Get messages ids // Get messages ids
response, err := http.Get(getURI) response, err := http.Get(getURI)
@ -200,7 +203,7 @@ func (f FetchConfig) GetEchoList() ([]Echo, error) {
var echoes []Echo var echoes []Echo
// Check node features support // Check node features support
fres, err := http.Get(strings.Join([]string{f.Node, features}, "/")) fres, err := http.Get(strings.Join([]string{f.Node, features}, ""))
if err != nil { if err != nil {
return echoes, err return echoes, err
} }
@ -212,11 +215,11 @@ func (f FetchConfig) GetEchoList() ([]Echo, error) {
} }
if !strings.Contains(string(c), listTXT) { if !strings.Contains(string(c), listTXT) {
err = errors.New("Node does not support echoes list") err = errors.New(strings.Join([]string{f.Node, features}, "") + " Node does not support echoes list " + string(c))
return echoes, err return echoes, err
} }
lres, err := http.Get(strings.Join([]string{f.Node, listTXT}, "/")) lres, err := http.Get(strings.Join([]string{f.Node, listTXT}, ""))
if err != nil { if err != nil {
return echoes, err return echoes, err
} }

80
main.go
View File

@ -2,10 +2,15 @@ package main
import ( import (
"flag" "flag"
"strconv"
log "github.com/Sirupsen/logrus"
"sync"
"gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/i2es"
"gitea.difrex.ru/Umbrella/fetcher/idec" "gitea.difrex.ru/Umbrella/fetcher/idec"
"log"
"strconv"
) )
var inode string var inode string
@ -38,6 +43,7 @@ func main() {
// trying get echo list // trying get echo list
e, err := fc.GetEchoList() e, err := fc.GetEchoList()
if err != nil { if err != nil {
log.Error(err.Error())
fc.Echoes = []string{"ii.14", "linux.14"} fc.Echoes = []string{"ii.14", "linux.14"}
} else { } else {
var echoes []string var echoes []string
@ -89,32 +95,54 @@ func main() {
j = 0 j = 0
} }
} }
var wg sync.WaitGroup
counter := 0
all := len(messages) - 1
for _, m := range messages { for _, m := range messages {
msg, err := idec.ParseMessage(m.Message) wg.Add(1)
if err != nil { go func(m idec.MSG) {
panic(err) msg, err := idec.ParseMessage(m.Message)
if err != nil {
log.Error(err.Error())
wg.Done()
return
}
var esd i2es.ESDoc
esd.Echo = msg.Echo
esd.Subg = msg.Subg
esd.Author = msg.From
esd.To = msg.To
ts := strconv.Itoa(msg.Timestamp)
esd.Date = ts
esd.Message = msg.Body
esd.MsgID = m.ID
log.Debug("Check message ", m.ID)
_, err = elastic.CheckID(m.ID)
if err != nil {
log.Warn(err.Error())
wg.Done()
return
}
err = elastic.PutToIndex(esd)
if err != nil {
log.Error(err.Error())
wg.Done()
return
}
log.Warn("Message ", m.ID, " added to index")
wg.Done()
}(m)
counter++
all--
if counter == 2 {
wg.Wait()
}
if all == 0 {
break
} }
var esd i2es.ESDoc
esd.Echo = msg.Echo
esd.Subg = msg.Subg
esd.Author = msg.From
esd.To = msg.To
ts := strconv.Itoa(msg.Timestamp)
esd.Date = ts
esd.Message = msg.Body
esd.MsgID = m.ID
log.Print("Check message ", m.ID)
_, err = elastic.CheckID(m.ID)
if err != nil {
log.Print(err)
continue
}
err = elastic.PutToIndex(esd)
if err != nil {
log.Print(err)
}
log.Print("Message ", msg.ID, " added to index")
} }
wg.Wait()
} }