package main import ( "flag" "os" "strings" "strconv" log "github.com/sirupsen/logrus" "io/ioutil" "gitea.difrex.ru/Umbrella/fetcher/i2es" "gitea.difrex.ru/Umbrella/fetcher/idec" ) var ( inode, es, esIndex, esType, addEchoes, blacklistFile string offset, limit int all, reindex bool denyMessages []string ) // init ... func init() { flag.IntVar(&offset, "offset", 0, "") flag.IntVar(&limit, "limit", -1, "") flag.BoolVar(&all, "all", false, "Get all messages") flag.BoolVar(&reindex, "reindex", false, "Force update an a message") flag.StringVar(&inode, "inode", "https://ii-net.tk/ii/ii-point.php?q=/", "IDEC node URI") flag.StringVar(&es, "es", "http://127.0.0.1:9200", "ES host") flag.StringVar(&esIndex, "esindex", "idec", "ES index") flag.StringVar(&esType, "estype", "post", "ES document type") flag.StringVar(&addEchoes, "echoes", "", "Additional echoes to fetch. Comma separated") flag.StringVar(&blacklistFile, "blacklist", "/blacklist.txt", "Path to the blacklist file") flag.Parse() denyMessages = loadBlacklist() } func main() { // extensions := idec.NewExtensions() var fc idec.FetchConfig fc.Node = inode log.Print("Working for node: ", inode) // trying get echo list var echoes []string e, err := fc.GetEchoList() if err != nil { log.Error(err.Error()) echoes = []string{"ii.14", "linux.14"} } else { for _, echo := range e { echoes = append(echoes, echo.Name) } } if addEchoes != "" { echoes = append(echoes, strings.Split(addEchoes, ",")...) } fc.Echoes = echoes log.Print("Receive echoelist: ", fc.Echoes) fc.Num = 10 fc.Limit = limit fc.Offset = offset var elastic i2es.ESConf elastic.Host = es elastic.Index = esIndex elastic.Type = esType var ids []idec.ID if all { ids, err = fc.GetAllMessagesIDS() if err != nil { panic(err) } } else { ids, err = fc.GetMessagesIDS() if err != nil { panic(err) } } var newMessages []i2es.ESDoc var messages []idec.MSG var stash []idec.ID j := 0 for _, id := range ids { stash = append(stash, id) j++ if j == 50 { m, err := fc.GetRawMessages(stash) if err != nil { log.Print(err) } for _, m := range m { messages = append(messages, m) } stash = nil j = 0 } } for _, m := range messages { msg, err := idec.ParseMessage(m.Message) if err != nil { log.Error(err.Error()) log.Error(m.ID) continue } var esd i2es.ESDoc esd.Echo = msg.Echo esd.Subg = msg.Subg esd.Author = msg.From esd.To = msg.To ts := strconv.Itoa(msg.Timestamp) esd.Date = ts esd.Message = msg.Body esd.MsgID = m.ID esd.Repto = msg.Repto esd.Address = msg.Address esd.Misplaced = "no" log.Debug("Check message ", m.ID) _, err = elastic.CheckID(m.ID) if err != nil && !reindex { if reindex { log.Warn("Reindexing " + m.ID) } continue } if !isMessageBlacklisted(esd.MsgID) { esd, err = elastic.PutToIndex(esd) if err != nil { log.Error(err.Error()) continue } } else { log.Warnf("Skipping blacklisted message %s", esd.MsgID) continue } log.Warn("Message ", m.ID, " added to index and to assignates list") newMessages = append(newMessages, esd) } err = elastic.AssignTopics(newMessages) if err != nil { log.Error(err.Error()) } } func isMessageBlacklisted(id string) bool { if denyMessages == nil { return false } for i := range denyMessages { if id == denyMessages[i] { return true } } return false } func loadBlacklist() []string { if _, err := os.Stat(blacklistFile); os.IsNotExist(err) { return nil } file, err := os.Open(blacklistFile) if err != nil { return nil } defer file.Close() data, err := ioutil.ReadAll(file) if err != nil { return nil } return strings.Split(string(data), "\n") }