diff options
author | terminaldweller <devi@terminaldweller.com> | 2024-09-01 02:13:40 +0000 |
---|---|---|
committer | terminaldweller <devi@terminaldweller.com> | 2024-09-01 02:13:40 +0000 |
commit | 7c8e0160eefa9f90b74b729086dc44f108539083 (patch) | |
tree | ecbb9760ead142557ba6cbb4f46d6a0420f7ee28 | |
parent | fixed a bug where we would still get previously seen feeds, updated the readme (diff) | |
download | milla-7c8e0160eefa9f90b74b729086dc44f108539083.tar.gz milla-7c8e0160eefa9f90b74b729086dc44f108539083.zip |
added the url to the rss output. fixed a bug with the rss feeds where different rss feeds were not being handled correctly. added new logging functions
-rw-r--r-- | Dockerfile | 2 | ||||
-rw-r--r-- | Dockerfile_distroless | 2 | ||||
-rw-r--r-- | Dockerfile_distroless_vendored | 2 | ||||
-rw-r--r-- | main.go | 66 | ||||
-rw-r--r-- | rss.go | 158 | ||||
-rw-r--r-- | types.go | 20 |
6 files changed, 132 insertions, 118 deletions
@@ -1,4 +1,4 @@ -FROM golang:1.22-alpine3.20 as builder +FROM golang:1.22-alpine3.20 AS builder WORKDIR /milla COPY go.sum go.mod /milla/ RUN go mod download diff --git a/Dockerfile_distroless b/Dockerfile_distroless index c27c80a..a5a7c47 100644 --- a/Dockerfile_distroless +++ b/Dockerfile_distroless @@ -1,4 +1,4 @@ -FROM golang:1.22-alpine3.20 as builder +FROM golang:1.22-alpine3.20 AS builder WORKDIR /milla COPY go.sum go.mod /milla/ RUN go mod download diff --git a/Dockerfile_distroless_vendored b/Dockerfile_distroless_vendored index 0ccc497..8d8f3ab 100644 --- a/Dockerfile_distroless_vendored +++ b/Dockerfile_distroless_vendored @@ -1,4 +1,4 @@ -FROM golang:1.22-alpine3.20 as builder +FROM golang:1.22-alpine3.20 AS builder WORKDIR /milla COPY go.sum go.mod /milla/ COPY vendor /milla/vendor @@ -1067,32 +1067,30 @@ func ChatGPTHandler( } func connectToDB(appConfig *TomlConfig, ctx *context.Context, poolChan chan *pgxpool.Pool) { - for { - dbURL := fmt.Sprintf( - "postgres://%s:%s@%s/%s", - appConfig.DatabaseUser, - appConfig.DatabasePassword, - appConfig.DatabaseAddress, - appConfig.DatabaseName) - - log.Println("dbURL:", dbURL) + dbURL := fmt.Sprintf( + "postgres://%s:%s@%s/%s", + appConfig.DatabaseUser, + appConfig.DatabasePassword, + appConfig.DatabaseAddress, + appConfig.DatabaseName) - poolConfig, err := pgxpool.ParseConfig(dbURL) - if err != nil { - log.Println(err) - } + log.Println("dbURL:", dbURL) - pool, err := pgxpool.NewWithConfig(*ctx, poolConfig) - if err != nil { - log.Println(err) - time.Sleep(time.Duration(appConfig.MillaReconnectDelay) * time.Second) - } else { - log.Printf("%s connected to database", appConfig.IRCDName) + poolConfig, err := pgxpool.ParseConfig(dbURL) + if err != nil { + LogErrorFatal(err) + } - for _, channel := range appConfig.ScrapeChannels { - tableName := getTableFromChanName(channel, appConfig.IRCDName) - query := fmt.Sprintf( - `create table if not exists %s ( + pool, err := pgxpool.NewWithConfig(*ctx, poolConfig) + if 
err != nil { + LogErrorFatal(err) + } else { + log.Printf("%s connected to database", appConfig.IRCDName) + + for _, channel := range appConfig.ScrapeChannels { + tableName := getTableFromChanName(channel, appConfig.IRCDName) + query := fmt.Sprintf( + `create table if not exists %s ( id serial primary key, channel text not null, log text not null, @@ -1100,16 +1098,14 @@ func connectToDB(appConfig *TomlConfig, ctx *context.Context, poolChan chan *pgx dateadded timestamp default current_timestamp )`, tableName) - _, err = pool.Exec(*ctx, query) - if err != nil { - log.Println(err.Error()) - time.Sleep(time.Duration(appConfig.MillaReconnectDelay) * time.Second) - } + _, err = pool.Exec(*ctx, query) + if err != nil { + LogErrorFatal(err) } - - appConfig.pool = pool - poolChan <- pool } + + appConfig.pool = pool + poolChan <- pool } } @@ -1121,13 +1117,13 @@ func scrapeChannel(irc *girc.Client, poolChan chan *pgxpool.Pool, appConfig Toml "insert into %s (channel,log,nick) values ('%s','%s','%s')", tableName, sanitizeLog(event.Params[0]), - stripColorCodes(event.Last()), + sanitizeLog(stripColorCodes(event.Last())), event.Source.Name, ) _, err := pool.Exec(context.Background(), query) if err != nil { - log.Println(err.Error()) + LogError(err) } }) } @@ -1137,7 +1133,7 @@ func populateWatchListWords(appConfig *TomlConfig) { for _, filepath := range watchlist.WatchFiles { filebytes, err := os.ReadFile(filepath) if err != nil { - log.Println(err.Error()) + LogError(err) continue } @@ -1151,7 +1147,7 @@ func populateWatchListWords(appConfig *TomlConfig) { } } - log.Print(appConfig.WatchLists["security"].Words) + // log.Print(appConfig.WatchLists["security"].Words) } func WatchListHandler(irc *girc.Client, appConfig TomlConfig) { @@ -19,7 +19,6 @@ import ( ) func GetFeed(feed FeedConfig, - feedChanel chan<- *gofeed.Feed, client *girc.Client, pool *pgxpool.Pool, channel, groupName string, @@ -28,54 +27,52 @@ func GetFeed(feed FeedConfig, parsedFeed, err := 
feed.FeedParser.ParseURL(feed.URL) if err != nil { - log.Print(err) + LogError(err) } else { - query := fmt.Sprintf("select newest_unix_time from rss where name = '%s'", rowName) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) - defer cancel() + if len(parsedFeed.Items) > 0 { + query := fmt.Sprintf("select newest_unix_time from rss where name = '%s'", rowName) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) + defer cancel() - newestFromDB := int64(0) + newestFromDB := int64(0) - err := pool.QueryRow(ctx, query).Scan(&newestFromDB) - if err != nil { - pool.Exec(ctx, fmt.Sprintf("insert into rss (name, newest_unix_time) values ('%s',0)", rowName)) - } + err := pool.QueryRow(ctx, query).Scan(&newestFromDB) + if err != nil { + pool.Exec(ctx, fmt.Sprintf("insert into rss (name, newest_unix_time) values ('%s',0)", rowName)) + } - log.Print("Newset from DB: ", newestFromDB) + log.Print("Newset from DB: ", newestFromDB) - sortFunc := func(a, b *gofeed.Item) int { - if a.PublishedParsed.Before(*b.PublishedParsed) { - return -1 - } else if a.PublishedParsed.After(*b.PublishedParsed) { - return 1 - } + sortFunc := func(a, b *gofeed.Item) int { + if a.PublishedParsed.Before(*b.PublishedParsed) { + return -1 + } else if a.PublishedParsed.After(*b.PublishedParsed) { + return 1 + } - return 0 - } + return 0 + } - slices.SortFunc(parsedFeed.Items, sortFunc) + slices.SortFunc(parsedFeed.Items, sortFunc) - for _, item := range parsedFeed.Items { - if item.PublishedParsed.Unix() > newestFromDB { - client.Cmd.Message(channel, parsedFeed.Title+": "+item.Title) + for _, item := range parsedFeed.Items { + if item.PublishedParsed.Unix() > newestFromDB { + client.Cmd.Message(channel, parsedFeed.Title+": "+item.Title+">>>"+item.Link) + } } - } - log.Print(parsedFeed.Items[0].PublishedParsed.Unix()) - log.Print(parsedFeed.Items[len(parsedFeed.Items)-1].PublishedParsed.Unix()) + query = fmt.Sprintf("update rss 
set newest_unix_time = %d where name = '%s'", parsedFeed.Items[len(parsedFeed.Items)-1].PublishedParsed.Unix(), rowName) - query = fmt.Sprintf("update rss set newest_unix_time = %d where name = '%s'", parsedFeed.Items[len(parsedFeed.Items)-1].PublishedParsed.Unix(), rowName) + ctx2, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) + defer cancel() - ctx2, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) - defer cancel() + _, err = pool.Exec(ctx2, query) + if err != nil { + LogError(err) + } - _, err = pool.Exec(ctx2, query) - if err != nil { - log.Print(err) } } - - feedChanel <- parsedFeed } func feedDispatcher( @@ -83,49 +80,51 @@ func feedDispatcher( client *girc.Client, pool *pgxpool.Pool, channel, groupName string, + period int, ) { - feedChanel := make(chan *gofeed.Feed) + for { + for i := range len(config.Feeds) { + config.Feeds[i].FeedParser = gofeed.NewParser() - for i := range len(config.Feeds) { - config.Feeds[i].FeedParser = gofeed.NewParser() + config.Feeds[i].FeedParser.UserAgent = config.Feeds[i].UserAgent - config.Feeds[i].FeedParser.UserAgent = config.Feeds[i].UserAgent + if config.Feeds[i].Proxy != "" { + proxyURL, err := url.Parse(config.Feeds[i].Proxy) + if err != nil { + LogError(err) - if config.Feeds[i].Proxy != "" { - proxyURL, err := url.Parse(config.Feeds[i].Proxy) - if err != nil { - log.Print(err) - continue - } + continue + } - dialer, err := proxy.FromURL(proxyURL, &net.Dialer{Timeout: time.Duration(config.Feeds[i].Timeout) * time.Second}) - if err != nil { - log.Print(err) + dialer, err := proxy.FromURL(proxyURL, &net.Dialer{Timeout: time.Duration(config.Feeds[i].Timeout) * time.Second}) + if err != nil { + LogError(err) - continue - } + continue + } - httpClient := http.Client{ - Transport: &http.Transport{ - Dial: dialer.Dial, - }, + httpClient := http.Client{ + Transport: &http.Transport{ + Dial: dialer.Dial, + }, + } + + config.Feeds[i].FeedParser.Client = 
&httpClient } + } - config.Feeds[i].FeedParser.Client = &httpClient + for _, feed := range config.Feeds { + go GetFeed(feed, client, pool, channel, groupName) } - } - for _, feed := range config.Feeds { - go GetFeed(feed, feedChanel, client, pool, channel, groupName) + time.Sleep(time.Duration(period) * time.Second) } - - // <-feedChanel } func ParseRSSConfig(rssConfFilePath string) *RSSConfig { file, err := os.Open(rssConfFilePath) if err != nil { - log.Print(err) + LogError(err) return nil } @@ -136,7 +135,7 @@ func ParseRSSConfig(rssConfFilePath string) *RSSConfig { err = decoder.Decode(&config) if err != nil { - log.Print(err) + LogError(err) return nil } @@ -145,35 +144,34 @@ func ParseRSSConfig(rssConfFilePath string) *RSSConfig { } func runRSS(appConfig *TomlConfig, client *girc.Client) { - for { - query := fmt.Sprintf( - `create table if not exists rss ( + query := fmt.Sprintf( + `create table if not exists rss ( id serial primary key, name text not null unique, newest_unix_time bigint not null )`) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(appConfig.RequestTimeout)*time.Second) + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) defer cancel() _, err := appConfig.pool.Exec(ctx, query) if err != nil { - log.Print(err) - time.Sleep(time.Duration(appConfig.MillaReconnectDelay) * time.Second) + LogError(err) + time.Sleep(time.Duration(60) * time.Second) } else { - for groupName, rss := range appConfig.Rss { - log.Print("RSS: joining ", rss.Channel) - client.Cmd.Join(rss.Channel) - rssConfig := ParseRSSConfig(rss.RssFile) - if rssConfig == nil { - log.Print("Could not parse RSS config file " + rss.RssFile + ". 
Exiting.") - } else { - for { - feedDispatcher(*rssConfig, client, appConfig.pool, rss.Channel, groupName) - time.Sleep(time.Duration(rssConfig.Period) * time.Second) - } - } - } + break + } + } + + for groupName, rss := range appConfig.Rss { + log.Print("RSS: joining ", rss.Channel) + client.Cmd.Join(rss.Channel) + rssConfig := ParseRSSConfig(rss.RssFile) + if rssConfig == nil { + log.Print("Could not parse RSS config file " + rss.RssFile + ". Exiting.") + } else { + go feedDispatcher(*rssConfig, client, appConfig.pool, rss.Channel, groupName, rssConfig.Period) } } } @@ -2,6 +2,8 @@ package main import ( "context" + "log" + "runtime" "time" "github.com/jackc/pgx/v5/pgxpool" @@ -191,3 +193,21 @@ type RSSConfig struct { Feeds []FeedConfig `json:"feeds"` Period int `json:"period"` } + +func LogError(err error) { + fn, file, line, ok := runtime.Caller(1) + if ok { + log.Printf("%s: %s-%d >>> %v", runtime.FuncForPC(fn).Name(), file, line, err) + } else { + log.Print(err) + } +} + +func LogErrorFatal(err error) { + fn, file, line, ok := runtime.Caller(1) + if ok { + log.Fatalf("%s: %s-%d >>> %v", runtime.FuncForPC(fn).Name(), file, line, err) + } else { + log.Fatal(err) + } +} |