fetcher/main.go

192 lines
3.8 KiB
Go
Raw Normal View History

2017-02-11 19:53:20 +03:00
package main
import (
"flag"
2020-07-23 15:28:26 +03:00
"os"
2018-11-08 14:50:15 +03:00
"strings"
2017-09-27 14:49:50 +03:00
"strconv"
log "github.com/sirupsen/logrus"
2017-09-27 14:49:50 +03:00
2020-07-23 15:28:26 +03:00
"io/ioutil"
2017-02-11 19:53:20 +03:00
"gitea.difrex.ru/Umbrella/fetcher/i2es"
"gitea.difrex.ru/Umbrella/fetcher/idec"
)
2018-11-08 14:50:15 +03:00
var (
	// Values bound to command-line flags in init.
	inode, es, esIndex, esType string
	addEchoes                  string
	blacklistFile              string
	offset, limit              int
	all, reindex               bool

	// Message IDs read from blacklistFile by loadBlacklist (set in init).
	denyMessages []string
)
2017-02-11 19:53:20 +03:00
// init ...
func init() {
flag.IntVar(&offset, "offset", 0, "<int>")
flag.IntVar(&limit, "limit", -1, "<int>")
flag.BoolVar(&all, "all", false, "Get all messages")
2018-01-05 17:05:45 +03:00
flag.BoolVar(&reindex, "reindex", false, "Force update an a message")
2017-02-11 19:53:20 +03:00
flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI")
flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host")
flag.StringVar(&esIndex, "esindex", "idec", "ES index")
flag.StringVar(&esType, "estype", "post", "ES document type")
2018-11-08 14:50:15 +03:00
flag.StringVar(&addEchoes, "echoes", "", "Additional echoes to fetch. Comma separated")
2020-07-23 15:28:26 +03:00
flag.StringVar(&blacklistFile, "blacklist", "/blacklist.txt", "Path to the blacklist file")
2017-02-11 19:53:20 +03:00
flag.Parse()
2020-07-23 15:28:26 +03:00
denyMessages = loadBlacklist()
2017-02-11 19:53:20 +03:00
}
func main() {
// extensions := idec.NewExtensions()
2020-07-23 15:28:26 +03:00
2017-02-11 19:53:20 +03:00
var fc idec.FetchConfig
fc.Node = inode
log.Print("Working for node: ", inode)
// trying get echo list
2018-11-08 14:50:15 +03:00
var echoes []string
2017-02-11 19:53:20 +03:00
e, err := fc.GetEchoList()
if err != nil {
2017-09-27 14:49:50 +03:00
log.Error(err.Error())
2018-11-08 14:50:15 +03:00
echoes = []string{"ii.14", "linux.14"}
2017-02-11 19:53:20 +03:00
} else {
for _, echo := range e {
echoes = append(echoes, echo.Name)
}
}
2018-11-08 14:50:15 +03:00
if addEchoes != "" {
echoes = append(echoes, strings.Split(addEchoes, ",")...)
}
fc.Echoes = echoes
2017-02-11 19:53:20 +03:00
log.Print("Receive echoelist: ", fc.Echoes)
fc.Num = 10
fc.Limit = limit
fc.Offset = offset
var elastic i2es.ESConf
elastic.Host = es
elastic.Index = esIndex
elastic.Type = esType
var ids []idec.ID
if all {
ids, err = fc.GetAllMessagesIDS()
if err != nil {
panic(err)
}
} else {
ids, err = fc.GetMessagesIDS()
if err != nil {
panic(err)
}
}
2018-01-08 15:57:56 +03:00
var newMessages []i2es.ESDoc
2017-02-11 19:53:20 +03:00
var messages []idec.MSG
var stash []idec.ID
j := 0
for _, id := range ids {
stash = append(stash, id)
j++
2018-01-05 17:05:45 +03:00
if j == 50 {
2017-02-11 19:53:20 +03:00
m, err := fc.GetRawMessages(stash)
if err != nil {
log.Print(err)
}
for _, m := range m {
messages = append(messages, m)
}
stash = nil
j = 0
}
}
2017-02-11 19:53:20 +03:00
for _, m := range messages {
msg, err := idec.ParseMessage(m.Message)
if err != nil {
log.Error(err.Error())
2018-01-05 17:05:45 +03:00
log.Error(m.ID)
continue
}
2017-02-11 19:53:20 +03:00
var esd i2es.ESDoc
esd.Echo = msg.Echo
esd.Subg = msg.Subg
esd.Author = msg.From
esd.To = msg.To
ts := strconv.Itoa(msg.Timestamp)
esd.Date = ts
esd.Message = msg.Body
esd.MsgID = m.ID
esd.Repto = msg.Repto
esd.Address = msg.Address
2018-01-08 15:57:56 +03:00
esd.Misplaced = "no"
2018-01-05 17:05:45 +03:00
log.Debug("Check message ", m.ID)
_, err = elastic.CheckID(m.ID)
2018-01-05 17:05:45 +03:00
if err != nil && !reindex {
2018-01-05 17:39:37 +03:00
if reindex {
2018-01-08 15:57:56 +03:00
log.Warn("Reindexing " + m.ID)
2018-01-05 17:39:37 +03:00
}
continue
2017-02-11 19:53:20 +03:00
}
2020-07-23 15:28:26 +03:00
if !isMessageBlacklisted(esd.MsgID) {
esd, err = elastic.PutToIndex(esd)
if err != nil {
log.Error(err.Error())
continue
}
} else {
log.Warnf("Skipping blacklisted message %s", esd.MsgID)
2018-01-05 17:05:45 +03:00
continue
2017-02-11 19:53:20 +03:00
}
2020-07-23 15:28:26 +03:00
2018-01-08 15:57:56 +03:00
log.Warn("Message ", m.ID, " added to index and to assignates list")
newMessages = append(newMessages, esd)
}
err = elastic.AssignTopics(newMessages)
if err != nil {
log.Error(err.Error())
}
2017-02-11 19:53:20 +03:00
}
2020-07-23 15:28:26 +03:00
// isMessageBlacklisted reports whether id appears in denyMessages. An empty
// or nil denyMessages never matches.
func isMessageBlacklisted(id string) bool {
	for _, denied := range denyMessages {
		if denied == id {
			return true
		}
	}
	return false
}
// loadBlacklist reads blacklistFile and returns the message IDs it lists, one
// per line. Each line is trimmed of surrounding whitespace, which also drops
// the "\r" left by CRLF line endings, and blank lines are ignored.
//
// A missing file silently yields nil. Any other read error is logged and also
// yields nil, so fetching proceeds without a blacklist instead of failing.
func loadBlacklist() []string {
	data, err := ioutil.ReadFile(blacklistFile)
	if err != nil {
		if !os.IsNotExist(err) {
			log.Printf("Can't read blacklist file %s: %v", blacklistFile, err)
		}
		return nil
	}
	var ids []string
	for _, line := range strings.Split(string(data), "\n") {
		if id := strings.TrimSpace(line); id != "" {
			ids = append(ids, id)
		}
	}
	return ids
}