Switch to go-idec

Added Tags, Repto and address fields.
Fix content-type issues
This commit is contained in:
Denis Zheleztsov 2017-12-07 17:07:46 +03:00
parent f2640e0919
commit d826a868b9
3 changed files with 52 additions and 68 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
"strings" "strings"
) )
@ -18,6 +19,9 @@ type ESDoc struct {
Message string `json:"message"` Message string `json:"message"`
Date string `json:"date"` Date string `json:"date"`
MsgID string `json:"msgid"` MsgID string `json:"msgid"`
Tags string `json:"tags"`
Repto string `json:"repto"`
Address string `json:"address"`
} }
// ESConf ES connection settings // ESConf ES connection settings
@ -30,13 +34,17 @@ type ESConf struct {
// PutToIndex ... // PutToIndex ...
func (es ESConf) PutToIndex(msg ESDoc) error { func (es ESConf) PutToIndex(msg ESDoc) error {
putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/") putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/")
log.Print(putURI)
doc, err := json.Marshal(msg) doc, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
// log.Print(string(doc))
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc)) req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
req.Header.Add("Content-Type", "application/json")
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
@ -69,6 +77,8 @@ func (es ESConf) CheckID(id string) (bool, error) {
searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "") searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "")
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ))) req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ)))
req.Header.Add("Content-Type", "application/json")
if err != nil { if err != nil {
return false, err return false, err
} }

View File

@ -1,38 +1,31 @@
package idec package idec
import ( import (
"encoding/base64"
"strconv" "strconv"
"strings" "strings"
idec "github.com/Difrex/go-idec"
) )
// ParseMessage ... // ParseMessage ...
func ParseMessage(message string) (Message, error) { func ParseMessage(message string) (Message, error) {
var m Message var m Message
plainMessage, err := base64.StdEncoding.DecodeString(message)
msg, err := idec.ParseMessage(message)
if err != nil { if err != nil {
return m, err return m, err
} }
txtMessage := strings.Split(string(plainMessage), "\n") m.Tags = msg.Tags.II
m.Echo = msg.Echo
var body string m.Timestamp = msg.Timestamp
for i := 8; i < len(txtMessage); i++ { m.From = msg.From
body = strings.Join([]string{body, txtMessage[i]}, "\n") m.Address = msg.Address
} m.To = msg.To
m.Subg = msg.Subg
ts, err := strconv.Atoi(txtMessage[2]) m.Body = msg.Body
if err != nil { m.Repto = msg.Tags.Repto
return m, err m.ID = msg.ID
}
m.Tags = txtMessage[0]
m.Echo = txtMessage[1]
m.Timestamp = ts
m.From = txtMessage[3]
m.Address = txtMessage[4]
m.To = txtMessage[5]
m.Subg = txtMessage[6]
m.Body = body
return m, err return m, err
} }

27
main.go
View File

@ -7,8 +7,6 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"sync"
"gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/i2es"
"gitea.difrex.ru/Umbrella/fetcher/idec" "gitea.difrex.ru/Umbrella/fetcher/idec"
) )
@ -95,16 +93,11 @@ func main() {
j = 0 j = 0
} }
} }
var wg sync.WaitGroup
counter := 0
all := len(messages) - 1
for _, m := range messages { for _, m := range messages {
wg.Add(1)
go func(m idec.MSG) {
msg, err := idec.ParseMessage(m.Message) msg, err := idec.ParseMessage(m.Message)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
wg.Done()
return return
} }
@ -117,32 +110,20 @@ func main() {
esd.Date = ts esd.Date = ts
esd.Message = msg.Body esd.Message = msg.Body
esd.MsgID = m.ID esd.MsgID = m.ID
esd.Repto = msg.Repto
esd.Address = msg.Address
log.Debug("Check message ", m.ID) log.Debug("Check message ", m.ID)
_, err = elastic.CheckID(m.ID) _, err = elastic.CheckID(m.ID)
if err != nil { if err != nil {
log.Warn(err.Error()) log.Warn(err.Error())
wg.Done() continue
return
} }
err = elastic.PutToIndex(esd) err = elastic.PutToIndex(esd)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
wg.Done()
return
} }
log.Warn("Message ", m.ID, " added to index") log.Warn("Message ", m.ID, " added to index")
wg.Done()
}(m)
counter++
all--
if counter == 2 {
wg.Wait()
}
if all == 0 {
break
}
} }
wg.Wait()
} }