Fetcher on Go first iteration

This commit is contained in:
Denis Zheleztsov 2017-02-11 19:53:20 +03:00
parent 5971ff5530
commit f18258c593
3 changed files with 270 additions and 5 deletions

95
i2es/elastic.go Normal file
View File

@ -0,0 +1,95 @@
package i2es
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"strings"
)
// ESDoc Elasticsearch document structure
type ESDoc struct {
Echo string `json:"echo"`
Subg string `json:"subj"`
To string `json:"to"`
Author string `json:"author"`
Message string `json:"message"`
Date string `json:"date"`
MsgID string `json:"msgid"`
}
// ESConf ES connection settings
type ESConf struct {
Host string
Index string
Type string
}
// PutToIndex ...
func (es ESConf) PutToIndex(msg ESDoc) error {
putURI := strings.Join([]string{es.Host, es.Index, es.Type, msg.MsgID}, "/")
doc, err := json.Marshal(msg)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", putURI, bytes.NewBuffer(doc))
client := &http.Client{}
resp, err := client.Do(req)
defer resp.Body.Close()
return err
}
// ESRes ES response minimal structure
type ESRes struct {
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Hits Hits `json:"hits"`
}
// Hits ...
type Hits struct {
Total int `json:"total"`
MaxScore float32 `json:"max_score"`
Hits []interface{} `json:"hits"`
}
// CheckID ...
func (es ESConf) CheckID(id string) (bool, error) {
searchURI := strings.Join([]string{es.Host, es.Index, es.Type, "_search"}, "/")
searchQ := strings.Join([]string{`{"query": {"match": {"_id": "`, id, `"}}}`}, "")
req, err := http.NewRequest("POST", searchURI, bytes.NewBuffer([]byte(searchQ)))
client := &http.Client{}
resp, err := client.Do(req)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, err
}
var e ESRes
err = json.Unmarshal(body, &e)
if err != nil {
return false, err
}
if e.Hits.Total > 0 {
err = errors.New(strings.Join([]string{"Message ", id, " already in index"}, ""))
return false, err
}
if e.TimedOut {
err = errors.New("Request time out")
return false, err
}
return false, nil
}

View File

@ -101,9 +101,58 @@ func (f FetchConfig) GetMessagesIDS() ([]ID, error) {
return ids, nil
}
// GetAllMessagesIDS get all message ids from node
func (f FetchConfig) GetAllMessagesIDS() ([]ID, error) {
var ids []ID
var getURI string
getEchoes := strings.Join(f.Echoes, "/")
getURI = strings.Join([]string{f.Node, echoSchema, getEchoes}, "")
// Get messages ids
response, err := http.Get(getURI)
if err != nil {
return ids, err
}
defer response.Body.Close()
c, err := ioutil.ReadAll(response.Body)
if err != nil {
return ids, err
}
var i ID
var curEcho string
rawIDS := strings.Split(string(c), "\n")
for _, line := range rawIDS {
// Match echoarea
if strings.Contains(line, ".") {
curEcho = line
continue
}
// Match message ID
if !strings.Contains(line, ".") && !strings.Contains(line, ":") && line != "" {
i.Echo = curEcho
i.MsgID = line
ids = append(ids, i)
}
}
return ids, nil
}
// MSG ...
type MSG struct {
Message string `json:"message"`
ID string `json:"id"`
}
// GetRawMessages get messages from node
func (f FetchConfig) GetRawMessages(ids []ID) ([]string, error) {
var messages []string
func (f FetchConfig) GetRawMessages(ids []ID) ([]MSG, error) {
var messages []MSG
var messagesIDS []string
for _, id := range ids {
@ -116,7 +165,8 @@ func (f FetchConfig) GetRawMessages(ids []ID) ([]string, error) {
// Get messages ids
response, err := http.Get(getURI)
if err != nil {
return messages, err
e := errors.New("Failed to get " + getURI + ". ")
return messages, e
}
defer response.Body.Close()
@ -130,8 +180,8 @@ func (f FetchConfig) GetRawMessages(ids []ID) ([]string, error) {
break
}
message := strings.Split(m, ":")
if len(message) > 0 {
messages = append(messages, message[1])
if len(message) > 1 {
messages = append(messages, MSG{message[1], message[0]})
}
}

120
main.go Normal file
View File

@ -0,0 +1,120 @@
package main
import (
"flag"
"gitea.difrex.ru/Umbrella/fetcher/i2es"
"gitea.difrex.ru/Umbrella/fetcher/idec"
"log"
"strconv"
)
var inode string
var offset int
var limit int
var all bool
var es string
var esIndex string
var esType string
// init ...
func init() {
flag.IntVar(&offset, "offset", 0, "<int>")
flag.IntVar(&limit, "limit", -1, "<int>")
flag.BoolVar(&all, "all", false, "Get all messages")
flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI")
flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host")
flag.StringVar(&esIndex, "esindex", "idec", "ES index")
flag.StringVar(&esType, "estype", "post", "ES document type")
flag.Parse()
}
func main() {
// extensions := idec.NewExtensions()
var fc idec.FetchConfig
fc.Node = inode
log.Print("Working for node: ", inode)
// trying get echo list
e, err := fc.GetEchoList()
if err != nil {
fc.Echoes = []string{"ii.14", "linux.14"}
} else {
var echoes []string
for _, echo := range e {
echoes = append(echoes, echo.Name)
}
fc.Echoes = echoes
}
log.Print("Receive echoelist: ", fc.Echoes)
fc.Num = 10
fc.Limit = limit
fc.Offset = offset
var elastic i2es.ESConf
elastic.Host = es
elastic.Index = esIndex
elastic.Type = esType
var ids []idec.ID
if all {
ids, err = fc.GetAllMessagesIDS()
if err != nil {
panic(err)
}
} else {
ids, err = fc.GetMessagesIDS()
if err != nil {
panic(err)
}
}
var messages []idec.MSG
var stash []idec.ID
j := 0
for _, id := range ids {
stash = append(stash, id)
j++
if j == 20 {
m, err := fc.GetRawMessages(stash)
if err != nil {
log.Print(err)
}
for _, m := range m {
messages = append(messages, m)
}
stash = nil
j = 0
}
}
for _, m := range messages {
msg, err := idec.ParseMessage(m.Message)
if err != nil {
panic(err)
}
var esd i2es.ESDoc
esd.Echo = msg.Echo
esd.Subg = msg.Subg
esd.Author = msg.From
esd.To = msg.To
ts := strconv.Itoa(msg.Timestamp)
esd.Date = ts
esd.Message = msg.Body
esd.MsgID = m.ID
log.Print("Check message ", m.ID)
_, err = elastic.CheckID(m.ID)
if err != nil {
log.Print("Alredy in index ", err)
continue
}
err = elastic.PutToIndex(esd)
if err != nil {
log.Print(err)
}
log.Print("Message ", msg.ID, " added to index")
}
}