Storage
This commit is contained in:
parent
c71af3e046
commit
630d746602
54
cmd/main.go
54
cmd/main.go
|
@ -3,19 +3,13 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"text/template"
|
||||
|
||||
"github.com/google/go-github/github"
|
||||
"github.com/idec-net/github2idec/g2i"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
config := loadConfig(filePath)
|
||||
gClient := g2i.NewGithubClient(config, ctx)
|
||||
eventsURL, err := gClient.EventsURL()
|
||||
client := config.NewClient(ctx)
|
||||
eventsURL, err := client.GHClient.EventsURL()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -26,31 +20,27 @@ func main() {
|
|||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// helloMessage := struct {
|
||||
// Sources, Name, Owner, Repo string
|
||||
// Issues []*github.Issue
|
||||
// }{
|
||||
// Sources: "https://github.com/idec-net/github2idec",
|
||||
// Name: "Gdec",
|
||||
// Owner: config.Github.RepoOwner,
|
||||
// Repo: config.Github.Repo,
|
||||
// Issues: issues,
|
||||
// }
|
||||
// t, err := template.New("hello_message.tpl").
|
||||
// ParseFiles("../templates/hello_message.tpl")
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// err = t.Execute(os.Stdout, helloMessage)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
|
||||
issues, err := gClient.GetIssues()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
helloMessage := struct {
|
||||
Sources, Name, Owner, Repo string
|
||||
Issues []*github.Issue
|
||||
}{
|
||||
Sources: "https://github.com/idec-net/github2idec",
|
||||
Name: "Gdec",
|
||||
Owner: config.Github.RepoOwner,
|
||||
Repo: config.Github.Repo,
|
||||
Issues: issues,
|
||||
}
|
||||
t, err := template.New("hello_message.tpl").
|
||||
ParseFiles("../templates/hello_message.tpl")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = t.Execute(os.Stdout, helloMessage)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
client.Run()
|
||||
// for _, event := range events {
|
||||
// if config.IsEventProcessable(*event.Type) {
|
||||
// b, err := event.RawPayload.MarshalJSON()
|
||||
|
|
110
g2i/client.go
110
g2i/client.go
|
@ -2,15 +2,121 @@ package g2i
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"time"
|
||||
|
||||
"github.com/google/go-github/github"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
EVENTS_UPDATER_SLEEP = 60
|
||||
ISSUES_UPDATER_SLEEP = 60
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
GHClient *GithubClient
|
||||
IDECClient *IDECClient
|
||||
config *Config
|
||||
}
|
||||
|
||||
func (c *Config) NewClient(ctx context.Context) (*Client, error) {
|
||||
func (c *Config) NewClient(ctx context.Context) *Client {
|
||||
client := &Client{}
|
||||
ghc := NewGithubClient(c, ctx)
|
||||
return client, nil
|
||||
ic := NewIDECClient(c)
|
||||
client.GHClient = ghc
|
||||
client.IDECClient = ic
|
||||
client.config = c
|
||||
return client
|
||||
}
|
||||
|
||||
func (c *Client) Run() {
|
||||
// open db first
|
||||
err := c.config.openDB()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer c.config.Data.db.Close()
|
||||
|
||||
evetsCH := make(chan github.Event)
|
||||
// Run issues updater
|
||||
go c.issuesUpdater()
|
||||
// Run events updater
|
||||
go c.eventsUpdater(evetsCH)
|
||||
// Run IDEC messager
|
||||
go c.idecEventMessager(evetsCH)
|
||||
|
||||
// Main loop
|
||||
for {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) idecEventMessager(ch chan github.Event) {
|
||||
for {
|
||||
event := <-ch
|
||||
log.Info(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) eventsUpdater(ch chan github.Event) {
|
||||
log.Info("Events updater is running")
|
||||
eventsURL, err := c.GHClient.EventsURL()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
for {
|
||||
events, err := c.GHClient.GetEvents(eventsURL)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
time.Sleep(time.Second * EVENTS_UPDATER_SLEEP)
|
||||
}
|
||||
|
||||
log.Info(events)
|
||||
|
||||
var newEvents []github.Event
|
||||
for i, _ := range events {
|
||||
if c.config.IsEventProcessable(*events[i].Type) {
|
||||
newEvents = append(newEvents, events[i])
|
||||
}
|
||||
}
|
||||
|
||||
log.Info(newEvents)
|
||||
prevEvents, err := c.config.getEvents()
|
||||
for i, _ := range newEvents {
|
||||
if err == nil {
|
||||
if idNotInEvents(newEvents[i].GetID(), prevEvents) {
|
||||
ch <- newEvents[i]
|
||||
}
|
||||
} else {
|
||||
ch <- newEvents[i]
|
||||
}
|
||||
}
|
||||
if err := c.config.storeEvents(newEvents); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
time.Sleep(time.Second * 60 * 60)
|
||||
}
|
||||
}
|
||||
|
||||
func idNotInEvents(id string, events []github.Event) bool {
|
||||
for i, _ := range events {
|
||||
if events[i].GetID() == id {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Client) issuesUpdater() {
|
||||
for {
|
||||
issues, err := c.GHClient.GetIssues()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
if err := c.config.storeIssues(issues); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * ISSUES_UPDATER_SLEEP)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
package g2i
|
||||
|
||||
import (
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Data Data `json:"data"`
|
||||
IDEC IDEC `json:"idec"`
|
||||
|
@ -8,6 +12,7 @@ type Config struct {
|
|||
|
||||
type Data struct {
|
||||
Path string `json:"path"`
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
type Fetch struct {
|
||||
|
|
|
@ -2,12 +2,15 @@ package g2i
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/google/go-github/github"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type GithubClient struct {
|
||||
|
@ -53,29 +56,43 @@ func (g *GithubClient) EventsURL() (string, error) {
|
|||
func (g *GithubClient) GetEvents(eventsURL string) ([]github.Event, error) {
|
||||
var events []github.Event
|
||||
|
||||
client := &http.Client{}
|
||||
log.Info("Get repository events")
|
||||
|
||||
client := &http.Client{Timeout: time.Second * 15}
|
||||
req, err := http.NewRequest("GET", eventsURL, nil)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return events, err
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return events, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(&events)
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return events, err
|
||||
}
|
||||
|
||||
log.Info("Data: %s", string(data))
|
||||
|
||||
err = json.Unmarshal(data, &events)
|
||||
if err != nil {
|
||||
return events, err
|
||||
}
|
||||
|
||||
log.Info("Events received")
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (c *Config) IsEventProcessable(eventType string) bool {
|
||||
for i, _ := range c.Github.WatchedEventTypes {
|
||||
if eventType == c.Github.WatchedEventTypes[i] {
|
||||
log.Info("Processing event")
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ go 1.12
|
|||
|
||||
require (
|
||||
github.com/Difrex/go-idec v0.0.0-20170724073226-420b1f419842
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/google/go-github v17.0.0+incompatible
|
||||
github.com/google/go-github/v24 v24.0.1 // indirect
|
||||
github.com/sirupsen/logrus v1.4.0
|
||||
)
|
||||
|
|
16
g2i/go.sum
16
g2i/go.sum
|
@ -1,5 +1,9 @@
|
|||
github.com/Difrex/go-idec v0.0.0-20170724073226-420b1f419842 h1:xrw8RxKPECWc+8vxDaMFwiNGlYngXRcRB3QwTlOIHYg=
|
||||
github.com/Difrex/go-idec v0.0.0-20170724073226-420b1f419842/go.mod h1:Krw/MoqqCqbZg0cRMw2FAU2JLifGYbw+iNK5FRYrCFs=
|
||||
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
|
||||
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
|
||||
|
@ -7,8 +11,20 @@ github.com/google/go-github/v24 v24.0.1 h1:KCt1LjMJEey1qvPXxa9SjaWxwTsCWSq6p2Ju5
|
|||
github.com/google/go-github/v24 v24.0.1/go.mod h1:CRqaW1Uns1TCkP0wqTpxYyRxRjxwvKU/XSS44u6X74M=
|
||||
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
|
||||
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
|
||||
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
|
|
|
@ -12,12 +12,14 @@ import (
|
|||
|
||||
type IDECClient struct {
|
||||
FetchConfig *idec.FetchConfig
|
||||
authstring string
|
||||
config *Config
|
||||
}
|
||||
|
||||
func NewIDECClient(config *Config) *IDECClient {
|
||||
return &IDECClient{
|
||||
config: config,
|
||||
config: config,
|
||||
authstring: config.IDEC.Authstring,
|
||||
FetchConfig: &idec.FetchConfig{
|
||||
Limit: config.IDEC.Fetch.Limit,
|
||||
Offset: config.IDEC.Fetch.Offset,
|
||||
|
|
198
g2i/storage.go
Normal file
198
g2i/storage.go
Normal file
|
@ -0,0 +1,198 @@
|
|||
package g2i
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"strconv"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/google/go-github/github"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
ISSUES_KEY = "issues"
|
||||
ISSUES_BUCKET = "issues_bucket"
|
||||
EVENTS_KEY = "events"
|
||||
EVENTS_BUCKET = "events_bucket"
|
||||
)
|
||||
|
||||
func (c *Config) storeEvents(events []github.Event) error {
|
||||
if err := c.checkDB(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.Marshal(events)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.Data.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket, err := tx.CreateBucketIfNotExists([]byte(EVENTS_BUCKET))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = bucket.Put([]byte(EVENTS_KEY), data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Also store an events list size
|
||||
err = bucket.Put([]byte(EVENTS_KEY+"_count"), []byte(string(len(events))))
|
||||
return err
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Config) getEvents() ([]github.Event, error) {
|
||||
var events []github.Event
|
||||
|
||||
if err := c.checkDB(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := c.Data.db.View(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(EVENTS_BUCKET))
|
||||
if bucket == nil {
|
||||
return fmt.Errorf("Events bucket not created yet!")
|
||||
}
|
||||
|
||||
data := bucket.Get([]byte(EVENTS_KEY))
|
||||
err := json.Unmarshal(data, &events)
|
||||
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (c *Config) getEventsCount() int {
|
||||
if err := c.checkDB(); err != nil {
|
||||
log.Error(err)
|
||||
return -1
|
||||
}
|
||||
|
||||
var count int
|
||||
err := c.Data.db.View(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(EVENTS_BUCKET))
|
||||
if bucket == nil {
|
||||
log.Error("Events bucket not created yet!")
|
||||
return nil
|
||||
}
|
||||
|
||||
data := bucket.Get([]byte(EVENTS_KEY + "_count"))
|
||||
if data != nil {
|
||||
c, err := strconv.Atoi(string(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count = c
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
func (c *Config) storeIssues(issues []*github.Issue) error {
|
||||
if err := c.checkDB(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.Marshal(issues)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.Data.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket, err := tx.CreateBucketIfNotExists([]byte(ISSUES_BUCKET))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = bucket.Put([]byte(ISSUES_KEY), data)
|
||||
return err
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Config) getIssues() ([]*github.Issue, error) {
|
||||
var issues []*github.Issue
|
||||
|
||||
if err := c.checkDB(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := c.Data.db.View(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(ISSUES_BUCKET))
|
||||
if bucket == nil {
|
||||
return fmt.Errorf("Issues bucket not created yet!")
|
||||
}
|
||||
|
||||
data := bucket.Get([]byte(ISSUES_KEY))
|
||||
err := json.Unmarshal(data, &issues)
|
||||
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return issues, nil
|
||||
}
|
||||
|
||||
func (c *Config) checkDB() error {
|
||||
if c.Data.db == nil {
|
||||
err := c.openDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Config) openDB() error {
|
||||
err := c.createDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db, err := bolt.Open(c.Data.Path+"/db.bolt", 0600, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Data.db = db
|
||||
|
||||
log.Info("Database is open")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Config) createDB() error {
|
||||
c.createDataDir()
|
||||
|
||||
db, err := bolt.Open(c.Data.Path+"/db.bolt", 0600, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Config) createDataDir() {
|
||||
_, err := os.Open(c.Data.Path)
|
||||
if !os.IsExist(err) {
|
||||
os.Mkdir(c.Data.Path, 0700)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user