aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--arbiter/arbiter.go377
-rw-r--r--hived/go.mod4
-rw-r--r--hived/hived.go825
-rw-r--r--hived/hived_test.go23
4 files changed, 772 insertions, 457 deletions
diff --git a/arbiter/arbiter.go b/arbiter/arbiter.go
index 5168b3a..9edf68a 100644
--- a/arbiter/arbiter.go
+++ b/arbiter/arbiter.go
@@ -7,7 +7,7 @@ import (
"errors"
"flag"
"fmt"
- "io/ioutil"
+ "io"
"net"
"net/http"
"net/url"
@@ -25,23 +25,25 @@ import (
)
var (
- flagPort = flag.String("port", "8009", "determines the port the server will listen on")
- flagInterval = flag.Float64("interval", 10, "In seconds, the delay between checking prices")
- redisDB = flag.Int64("redisdb", 1, "determines the db number")
- rdb *redis.Client
- redisAddress = flag.String("redisaddress", "redis:6379", "determines the address of the redis instance")
- redisPassword = flag.String("redispassword", "", "determines the password of the redis db")
+ errBadLogic = errors.New("we should not be here")
+ errUnexpectedParam = errors.New("got unexpected parameter")
+ errUnknownDeployment = errors.New("unknown deployment kind")
)
const (
- SERVER_DEPLOYMENT_TYPE = "SERVER_DEPLOYMENT_TYPE"
- coingeckoAPIURLv3 = "https://api.coingecko.com/api/v3"
- coincapAPIURLv2 = "https://api.coincap.io/v2"
+ serverDeploymentType = "SERVER_DEPLOYMENT_TYPE"
+ coingeckoAPIURLv3 = "https://api.coingecko.com/api/v3"
+ coincapAPIURLv2 = "https://api.coincap.io/v2"
+ getTimeout = 5
+ httpClientTimeout = 5
+ serverTLSReadTimeout = 15
+ serverTLSWriteTimeout = 15
+ defaultGracefulShutdown = 15
)
// https://docs.coincap.io/
type CoinCapAssetGetResponseData struct {
- Id string `json:"id"`
+ ID string `json:"id"`
Rank string `json:"rank"`
Symbol string `json:"symbol"`
Name string `json:"name"`
@@ -54,16 +56,24 @@ type CoinCapAssetGetResponseData struct {
Vwap24Hr string `json:"vwap24Hr"`
}
+type priceResponseData struct {
+ Name string `json:"name"`
+ Price float64 `json:"price"`
+ Unit string `json:"unit"`
+ Err string `json:"err"`
+ IsSuccessful bool `json:"isSuccessful"`
+}
+
type CoinCapAssetGetResponse struct {
Data CoinCapAssetGetResponseData `json:"data"`
TimeStamp int64 `json:"timestamp"`
}
-type HttpHandlerFunc func(http.ResponseWriter, *http.Request)
+type HTTPHandlerFunc func(http.ResponseWriter, *http.Request)
-type HttpHandler struct {
+type HTTPHandler struct {
name string
- function HttpHandlerFunc
+ function HTTPHandlerFunc
}
type priceChanStruct struct {
@@ -81,171 +91,241 @@ func GetProxiedClient() (*http.Client, error) {
if proxyURL == "" {
proxyURL = os.Getenv("HTTPS_PROXY")
}
+
dialer, err := proxy.SOCKS5("tcp", proxyURL, nil, proxy.Direct)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("[GetProxiedClient] : %w", err)
}
+
dialContext := func(ctx context.Context, network, address string) (net.Conn, error) {
- return dialer.Dial(network, address)
+ netConn, err := dialer.Dial(network, address)
+ if err == nil {
+ return netConn, nil
+ }
+
+ return netConn, fmt.Errorf("[dialContext] : %w", err)
}
transport := &http.Transport{
DialContext: dialContext,
DisableKeepAlives: true,
}
- client := &http.Client{Transport: transport}
+ client := &http.Client{
+ Transport: transport,
+ Timeout: httpClientTimeout * time.Second,
+ CheckRedirect: nil,
+ Jar: nil,
+ }
return client, nil
}
// OWASP: https://cheatsheetseries.owasp.org/cheatsheets/REST_Security_Cheat_Sheet.html
-func addSecureHeaders(w *http.ResponseWriter) {
- (*w).Header().Set("Cache-Control", "no-store")
- (*w).Header().Set("Content-Security-Policy", "default-src https;")
- (*w).Header().Set("Strict-Transport-Security", "max-age=63072000;")
- (*w).Header().Set("X-Content-Type-Options", "nosniff")
- (*w).Header().Set("X-Frame-Options", "DENY")
- (*w).Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
+func addSecureHeaders(writer *http.ResponseWriter) {
+ (*writer).Header().Set("Cache-Control", "no-store")
+ (*writer).Header().Set("Content-Security-Policy", "default-src https;")
+ (*writer).Header().Set("Strict-Transport-Security", "max-age=63072000;")
+ (*writer).Header().Set("X-Content-Type-Options", "nosniff")
+ (*writer).Header().Set("X-Frame-Options", "DENY")
+ (*writer).Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
}
-// binance
-func getPriceFromBinance(name, unit string,
- wg *sync.WaitGroup,
- priceChan chan<- priceChanStruct,
- errChan chan<- errorChanStruct) {
+// get price from binance.
+// func getPriceFromBinance(name, unit string,
+// wg *sync.WaitGroup,
+// priceChan chan<- priceChanStruct,
+// errChan chan<- errorChanStruct) {
-}
+// }
-// kucoin
-func getPriceFromKu(name, uni string,
- wg *sync.WaitGroup,
- priceChan chan<- priceChanStruct,
- errChan chan<- errorChanStruct) {
+// get price from kucoin.
+// func getPriceFromKu(name, uni string,
+// wg *sync.WaitGroup,
+// priceChan chan<- priceChanStruct,
+// errChan chan<- errorChanStruct) {
-}
+// }
func getPriceFromCoinGecko(
+ ctx context.Context,
name, unit string,
wg *sync.WaitGroup,
priceChan chan<- priceChanStruct,
- errChan chan<- errorChanStruct) {
+ errChan chan<- errorChanStruct,
+) {
defer wg.Done()
+ priceFloat := 0.
+
params := "/simple/price?ids=" + url.QueryEscape(name) + "&" +
"vs_currencies=" + url.QueryEscape(unit)
path := coingeckoAPIURLv3 + params
- fmt.Println(path)
- // resp, err := http.Get(path)
+
client, err := GetProxiedClient()
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
+ errChan <- errorChanStruct{hasError: true, err: err}
+
+ log.Error().Err(err)
+
+ return
+ }
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil)
+ if err != nil {
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
+
return
}
- resp, err := client.Get(path)
+ resp, err := client.Do(req)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
- fmt.Println(err)
+
return
}
defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
+ body, err := io.ReadAll(resp.Body)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
}
jsonBody := make(map[string]interface{})
+
err = json.Unmarshal(body, &jsonBody)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
}
- price := jsonBody[name].(map[string]interface{})[unit].(float64)
+ price, isOk := jsonBody[name].(map[string]interface{})
+ if !isOk {
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
+ errChan <- errorChanStruct{hasError: true, err: err}
+
+ log.Error().Err(err)
+
+ return
+ }
log.Info().Msg(string(body))
- priceChan <- priceChanStruct{name: name, price: price}
+ priceFloat, isOk = price[unit].(float64)
+ if !isOk {
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
+ errChan <- errorChanStruct{hasError: true, err: err}
+
+ log.Error().Err(err)
+
+ return
+ }
+
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: false, err: nil}
}
func getPriceFromCoinCap(
+ ctx context.Context,
name, unit string,
wg *sync.WaitGroup,
priceChan chan<- priceChanStruct,
- errChan chan<- errorChanStruct) {
+ errChan chan<- errorChanStruct,
+) {
defer wg.Done()
+ priceFloat := 0.
+
params := "/assets/" + url.QueryEscape(name)
path := coincapAPIURLv2 + params
- fmt.Println(path)
+
client, err := GetProxiedClient()
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
+ errChan <- errorChanStruct{hasError: true, err: err}
+
+ log.Error().Err(err)
+
+ return
+ }
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil)
+ if err != nil {
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
+
return
}
- // resp, err := http.Get(path)
- resp, err := client.Get(path)
+
+ resp, err := client.Do(req)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
+
return
}
defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
+ body, err := io.ReadAll(resp.Body)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
}
- fmt.Println(string(body))
var coinCapAssetGetResponse CoinCapAssetGetResponse
- // jsonBody := make(map[string]interface{})
- // err = json.Unmarshal(body, &jsonBody)
+
err = json.Unmarshal(body, &coinCapAssetGetResponse)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
}
- // price := jsonBody[name].(map[string]interface{})[unit].(float64)
- price, err := strconv.ParseFloat(coinCapAssetGetResponse.Data.PriceUsd, 64)
+ priceFloat, err = strconv.ParseFloat(coinCapAssetGetResponse.Data.PriceUsd, 64)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: true, err: err}
+
log.Error().Err(err)
}
- fmt.Println(price)
log.Info().Msg(string(body))
- priceChan <- priceChanStruct{name: name, price: price}
+ priceChan <- priceChanStruct{name: name, price: priceFloat}
errChan <- errorChanStruct{hasError: false, err: nil}
}
func arbHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
- if r.Method != "GET" {
+
+ if r.Method != http.MethodGet {
http.Error(w, "Method is not supported.", http.StatusNotFound)
}
+
addSecureHeaders(&w)
var name string
+
var unit string
+
params := r.URL.Query()
for key, value := range params {
switch key {
@@ -254,24 +334,31 @@ func arbHandler(w http.ResponseWriter, r *http.Request) {
case "unit":
unit = value[0]
default:
- log.Error().Err(errors.New("Got unexpected parameter."))
+ log.Error().Err(errUnexpectedParam)
}
}
priceChan := make(chan priceChanStruct, 1)
errChan := make(chan errorChanStruct, 1)
- var wg sync.WaitGroup
- wg.Add(1)
- getPriceFromCoinGecko(name, unit, &wg, priceChan, errChan)
- wg.Wait()
+
+ var waitGroup sync.WaitGroup
+
+ ctx, cancel := context.WithTimeout(context.Background(), getTimeout*time.Second)
+ defer cancel()
+
+ waitGroup.Add(1)
+
+ //nolint:contextcheck
+ getPriceFromCoinGecko(ctx, name, unit, &waitGroup, priceChan, errChan)
+ waitGroup.Wait()
select {
case err := <-errChan:
- if err.hasError != false {
+ if err.hasError {
log.Error().Err(err.err)
}
default:
- log.Error().Err(errors.New("We shouldnt be here"))
+ log.Error().Err(errBadLogic)
}
var price priceChanStruct
@@ -279,27 +366,44 @@ func arbHandler(w http.ResponseWriter, r *http.Request) {
case priceCh := <-priceChan:
price = priceCh
default:
- log.Fatal().Err(errors.New("We shouldnt be here"))
+ log.Error().Err(errBadLogic)
}
- json.NewEncoder(w).Encode(map[string]interface{}{
- "name": price.name,
- "price": price.price,
- "unit": unit,
- "err": "",
- "isSuccessful": true,
- })
+ responseData := priceResponseData{
+ Name: price.name,
+ Price: price.price,
+ Unit: "USD",
+ Err: "",
+ IsSuccessful: true,
+ }
+
+ jsonResp, err := json.Marshal(responseData)
+ if err != nil {
+ cancel()
+ //nolint:gocritic
+ log.Fatal().Err(err)
+ }
+
+ _, err = w.Write(jsonResp)
+ if err != nil {
+ cancel()
+ log.Fatal().Err(err)
+ }
}
func coincapHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "application/json")
- if r.Method != "GET" {
+ if r.Method != http.MethodGet {
http.Error(w, "Method is not supported.", http.StatusNotFound)
}
+
+ w.Header().Add("Content-Type", "application/json")
+
addSecureHeaders(&w)
var name string
+
var unit string
+
params := r.URL.Query()
for key, value := range params {
switch key {
@@ -308,24 +412,31 @@ func coincapHandler(w http.ResponseWriter, r *http.Request) {
case "unit":
unit = value[0]
default:
- log.Error().Err(errors.New("Got unexpected parameter."))
+ log.Error().Err(errUnexpectedParam)
}
}
priceChan := make(chan priceChanStruct, 1)
errChan := make(chan errorChanStruct, 1)
- var wg sync.WaitGroup
- wg.Add(1)
- getPriceFromCoinCap(name, unit, &wg, priceChan, errChan)
- wg.Wait()
+
+ var waitGroup sync.WaitGroup
+
+ waitGroup.Add(1)
+
+ ctx, cancel := context.WithTimeout(context.Background(), getTimeout*time.Second)
+ defer cancel()
+
+ //nolint:contextcheck
+ getPriceFromCoinCap(ctx, name, unit, &waitGroup, priceChan, errChan)
+ waitGroup.Wait()
select {
case err := <-errChan:
- if err.hasError != false {
+ if err.hasError {
log.Error().Err(err.err)
}
default:
- log.Error().Err(errors.New("We shouldnt be here"))
+ log.Error().Err(errBadLogic)
}
var price priceChanStruct
@@ -333,16 +444,29 @@ func coincapHandler(w http.ResponseWriter, r *http.Request) {
case priceCh := <-priceChan:
price = priceCh
default:
- log.Fatal().Err(errors.New("We shouldnt be here"))
+ log.Error().Err(errBadLogic)
}
- json.NewEncoder(w).Encode(map[string]interface{}{
- "name": price.name,
- "price": price.price,
- "unit": "USD",
- "err": "",
- "isSuccessful": true,
- })
+ responseData := priceResponseData{
+ Name: price.name,
+ Price: price.price,
+ Unit: "USD",
+ Err: "",
+ IsSuccessful: true,
+ }
+
+ jsonResp, err := json.Marshal(responseData)
+ if err != nil {
+ cancel()
+ //nolint:gocritic
+ log.Fatal().Err(err)
+ }
+
+ _, err = w.Write(jsonResp)
+ if err != nil {
+ cancel()
+ log.Fatal().Err(err)
+ }
}
func setupLogging() {
@@ -350,37 +474,42 @@ func setupLogging() {
}
func startServer(gracefulWait time.Duration,
- handlers []HttpHandler,
- serverDeploymentType string, port string) {
- r := mux.NewRouter()
+ handlers []HTTPHandler,
+ serverDeploymentType string, port string,
+) {
+ route := mux.NewRouter()
cfg := &tls.Config{
MinVersion: tls.VersionTLS13,
}
+
srv := &http.Server{
Addr: "0.0.0.0:" + port,
- WriteTimeout: time.Second * 15,
- ReadTimeout: time.Second * 15,
- Handler: r,
+ WriteTimeout: time.Second * serverTLSWriteTimeout,
+ ReadTimeout: time.Second * serverTLSReadTimeout,
+ Handler: route,
TLSConfig: cfg,
}
for i := 0; i < len(handlers); i++ {
- r.HandleFunc(handlers[i].name, handlers[i].function)
+ route.HandleFunc(handlers[i].name, handlers[i].function)
}
go func() {
var certPath, keyPath string
- if os.Getenv(serverDeploymentType) == "deployment" {
+
+ switch os.Getenv(serverDeploymentType) {
+ case "deployment":
certPath = "/certs/fullchain1.pem"
keyPath = "/certs/privkey1.pem"
- } else if os.Getenv(serverDeploymentType) == "test" {
+ case "test":
certPath = "/certs/server.cert"
keyPath = "/certs/server.key"
- } else {
- log.Fatal().Err(errors.New(fmt.Sprintf("unknown deployment kind: %s", serverDeploymentType)))
+ default:
+ log.Error().Err(errUnknownDeployment)
}
+
if err := srv.ListenAndServeTLS(certPath, keyPath); err != nil {
- log.Fatal().Err(err)
+ log.Error().Err(err)
}
}()
@@ -388,16 +517,35 @@ func startServer(gracefulWait time.Duration,
signal.Notify(c, os.Interrupt)
<-c
+
ctx, cancel := context.WithTimeout(context.Background(), gracefulWait)
defer cancel()
- srv.Shutdown(ctx)
+
+ if err := srv.Shutdown(ctx); err != nil {
+ log.Error().Err(err)
+ }
+
log.Info().Msg("gracefully shut down the server")
}
func main() {
var gracefulWait time.Duration
- flag.DurationVar(&gracefulWait, "gracefulwait", time.Second*15, "the duration to wait during the graceful shutdown")
+
+ var rdb *redis.Client
+
+ flag.DurationVar(
+ &gracefulWait,
+ "gracefulwait",
+ time.Second*defaultGracefulShutdown,
+ "the duration to wait during the graceful shutdown",
+ )
+
+ flagPort := flag.String("port", "8009", "determines the port the server will listen on")
+ redisDB := flag.Int64("redisdb", 1, "determines the db number")
+ redisAddress := flag.String("redisaddress", "redis:6379", "determines the address of the redis instance")
+ redisPassword := flag.String("redispassword", "", "determines the password of the redis db")
flag.Parse()
+
rdb = redis.NewClient(&redis.Options{
Addr: *redisAddress,
Password: *redisPassword,
@@ -406,10 +554,11 @@ func main() {
defer rdb.Close()
setupLogging()
- var handlerFuncs = []HttpHandler{
+
+ handlerFuncs := []HTTPHandler{
{name: "/crypto/v1/arb/gecko", function: arbHandler},
{name: "/crypto/v1/arb/coincap", function: coincapHandler},
}
- startServer(gracefulWait, handlerFuncs, SERVER_DEPLOYMENT_TYPE, *flagPort)
+ startServer(gracefulWait, handlerFuncs, serverDeploymentType, *flagPort)
}
diff --git a/hived/go.mod b/hived/go.mod
index 126eedc..bbd4fa3 100644
--- a/hived/go.mod
+++ b/hived/go.mod
@@ -8,7 +8,9 @@ require (
github.com/gorilla/mux v1.8.0
github.com/rs/zerolog v1.20.0
github.com/terminaldweller/grpc v1.0.3
+ golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
google.golang.org/grpc v1.42.0
+ google.golang.org/protobuf v1.27.1
)
require (
@@ -18,9 +20,7 @@ require (
go.opentelemetry.io/otel v0.17.0 // indirect
go.opentelemetry.io/otel/metric v0.17.0 // indirect
go.opentelemetry.io/otel/trace v0.17.0 // indirect
- golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb // indirect
golang.org/x/sys v0.0.0-20210112080510-489259a85091 // indirect
golang.org/x/text v0.3.3 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
- google.golang.org/protobuf v1.27.1 // indirect
)
diff --git a/hived/hived.go b/hived/hived.go
index 927e633..42fb3b0 100644
--- a/hived/hived.go
+++ b/hived/hived.go
@@ -1,17 +1,13 @@
package main
import (
- "bytes"
"context"
- "crypto/hmac"
- "crypto/sha512"
"crypto/tls"
- "encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
- "io/ioutil"
+ "net"
"net/http"
"net/url"
"os"
@@ -26,41 +22,89 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
pb "github.com/terminaldweller/grpc/telebot/v1"
+ "golang.org/x/net/proxy"
"google.golang.org/grpc"
-)
-
-var (
- flagPort = flag.String("port", "8008", "determined the port the sercice runs on")
- alertsCheckInterval = flag.Int64("alertinterval", 600., "in seconds, the amount of time between alert checks")
- redisAddress = flag.String("redisaddress", "redis:6379", "determines the address of the redis instance")
- redisPassword = flag.String("redispassword", "", "determines the password of the redis db")
- redisDB = flag.Int64("redisdb", 0, "determines the db number")
- botChannelID = flag.Int64("botchannelid", 146328407, "determines the channel id the telgram bot should send messages to")
- cacheDuration = flag.Float64("cacheDuration", 300_000, "determines the price cache validity duration in miliseconds")
- rdb *redis.Client
+ "google.golang.org/protobuf/types/known/timestamppb"
)
const (
cryptocomparePriceURL = "https://min-api.cryptocompare.com/data/price?"
- coingeckoAPIURLv3 = "https://api.coingecko.com/api/v3"
- changellyURL = "https://api.changelly.com"
- TELEGRAM_BOT_TOKEN_ENV_VAR = "TELEGRAM_BOT_TOKEN"
- CHANGELLY_API_KEY_ENV_VAR = "CHANGELLY_API_KEY"
- CHANGELLY_API_SECRET_ENV_VAR = "CHANGELLY_API_SECRET"
- SERVER_DEPLOYMENT_TYPE = "SERVER_DEPLOYMENT_TYPE"
+ telegramBotTokenEnvVar = "TELEGRAM_BOT_TOKEN" //nolint: gosec
+ serverDeploymentType = "SERVER_DEPLOYMENT_TYPE"
+ httpClientTimeout = 5
+ getTimeout = 5
+ serverTLSReadTimeout = 15
+ serverTLSWriteTimeout = 15
+ defaultGracefulShutdown = 15
+ redisContextTimeout = 2
+ pingTimeout = 5
+ alertCheckIntervalDefault = 600
+ redisCacheDurationMultiplier = 1_000_000
+ cacheDurationdefault = 300_000
+ telegramTimeout = 10
+ // coingeckoAPIURLv3 = "https://api.coingecko.com/api/v3"
)
+var (
+ cacheDuration = flag.Float64(
+ "cacheDuration",
+ cacheDurationdefault,
+ "determines the price cache validity duration in miliseconds",
+ )
+ rdb *redis.Client
+ errUnknownParam = errors.New("unknown parameters for endpoint")
+ errIncompParams = errors.New("incomplete set of parameters")
+ errBadLogic = errors.New("bad logic")
+ errFailedTypeAssertion = errors.New("type assertion failed")
+ errFailedUnmarshall = errors.New("failed to unmarshall JSON")
+ errUnknownDeploymentKind = errors.New("unknown deployment kind")
+)
+
+func GetProxiedClient() (*http.Client, error) {
+ proxyURL := os.Getenv("ALL_PROXY")
+ if proxyURL == "" {
+ proxyURL = os.Getenv("HTTPS_PROXY")
+ }
+
+ dialer, err := proxy.SOCKS5("tcp", proxyURL, nil, proxy.Direct)
+ if err != nil {
+ return nil, fmt.Errorf("[GetProxiedClient] : %w", err)
+ }
+
+ dialContext := func(ctx context.Context, network, address string) (net.Conn, error) {
+ netConn, err := dialer.Dial(network, address)
+ if err == nil {
+ return netConn, nil
+ }
+
+ return netConn, fmt.Errorf("[dialContext] : %w", err)
+ }
+
+ transport := &http.Transport{
+ DialContext: dialContext,
+ DisableKeepAlives: true,
+ }
+ client := &http.Client{
+ Transport: transport,
+ Timeout: httpClientTimeout * time.Second,
+ CheckRedirect: nil,
+ Jar: nil,
+ }
+
+ return client, nil
+}
+
// OWASP: https://cheatsheetseries.owasp.org/cheatsheets/REST_Security_Cheat_Sheet.html
-func addSecureHeaders(w *http.ResponseWriter) {
- (*w).Header().Set("Cache-Control", "no-store")
- (*w).Header().Set("Content-Security-Policy", "default-src https;")
- (*w).Header().Set("Strict-Transport-Security", "max-age=63072000;")
- (*w).Header().Set("X-Content-Type-Options", "nosniff")
- (*w).Header().Set("X-Frame-Options", "DENY")
- (*w).Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
+func addSecureHeaders(writer *http.ResponseWriter) {
+ (*writer).Header().Set("Cache-Control", "no-store")
+ (*writer).Header().Set("Content-Security-Policy", "default-src https;")
+ (*writer).Header().Set("Strict-Transport-Security", "max-age=63072000;")
+ (*writer).Header().Set("X-Content-Type-Options", "nosniff")
+ (*writer).Header().Set("X-Frame-Options", "DENY")
+ (*writer).Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
}
-func sendToTg(address, msg string, channelId int64) {
+func sendToTg(address, msg string, channelID int64) {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatal().Err(err)
@@ -69,15 +113,21 @@ func sendToTg(address, msg string, channelId int64) {
c := pb.NewNotificationServiceClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), telegramTimeout*time.Second)
defer cancel()
- r, err := c.Notify(ctx, &pb.NotificationRequest{NotificationText: msg, ChannelId: channelId})
+ response, err := c.Notify(
+ ctx,
+ &pb.NotificationRequest{
+ NotificationText: msg,
+ ChannelId: channelID,
+ RequestTime: timestamppb.Now(),
+ })
if err != nil {
- log.Fatal().Err(err)
+ log.Error().Err(err)
}
- log.Info().Msg(fmt.Sprintf("%v", r))
+ log.Info().Msg(fmt.Sprintf("%v", response))
}
type priceChanStruct struct {
@@ -90,82 +140,99 @@ type errorChanStruct struct {
err error
}
-type APISource int
+// type APISource int
const (
CryptoCompareSource = iota
- CoinGeckoSource
- CoinCapSource
+ // CoinGeckoSource
+ // CoinCapSource
)
-// TODO-add more sources
-// TODO-do a round robin
+// TODO-add more sources.
+// TODO-do a round robin.
func chooseGetPriceSource() int {
return CryptoCompareSource
}
-func getPrice(name, unit string,
- wg *sync.WaitGroup,
+func getPrice(ctx context.Context,
+ name, unit string,
+ waitGroup *sync.WaitGroup,
priceChan chan<- priceChanStruct,
- errChan chan<- errorChanStruct) {
-
- // check price cache
- ctx := context.Background()
+ errChan chan<- errorChanStruct,
+) {
val, err := rdb.Get(ctx, name+"_price").Float64()
+
if err != nil {
- fmt.Println("price cache miss")
source := chooseGetPriceSource()
if source == CryptoCompareSource {
- getPriceFromCryptoCompare(name, unit, wg, priceChan, errChan)
+ getPriceFromCryptoCompare(ctx, name, unit, waitGroup, priceChan, errChan)
}
} else {
- fmt.Println("price cache hit ", val)
priceChan <- priceChanStruct{name: name, price: val}
errChan <- errorChanStruct{hasError: false, err: nil}
- wg.Done()
+ waitGroup.Done()
}
}
+func getPriceFromCryptoCompareErrorHandler(
+ err error,
+ name string,
+ priceChan chan<- priceChanStruct,
+ errChan chan<- errorChanStruct,
+) {
+ defaultPrice := 0.
+ priceChan <- priceChanStruct{name: name, price: defaultPrice}
+ errChan <- errorChanStruct{hasError: true, err: err}
+
+ log.Error().Err(err)
+}
+
func getPriceFromCryptoCompare(
+ ctx context.Context,
name, unit string,
wg *sync.WaitGroup,
priceChan chan<- priceChanStruct,
- errChan chan<- errorChanStruct) {
+ errChan chan<- errorChanStruct,
+) {
defer wg.Done()
params := "fsym=" + url.QueryEscape(name) + "&" +
"tsyms=" + url.QueryEscape(unit)
path := cryptocomparePriceURL + params
- resp, err := http.Get(path)
+
+ client, err := GetProxiedClient()
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
- errChan <- errorChanStruct{hasError: true, err: err}
- log.Error().Err(err)
+ getPriceFromCryptoCompareErrorHandler(err, name, priceChan, errChan)
+
return
}
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
- errChan <- errorChanStruct{hasError: true, err: err}
- log.Error().Err(err)
+ getPriceFromCryptoCompareErrorHandler(err, name, priceChan, errChan)
+
+ return
}
- jsonBody := make(map[string]float64)
- err = json.Unmarshal(body, &jsonBody)
+ resp, err := client.Do(req)
if err != nil {
- priceChan <- priceChanStruct{name: name, price: 0.}
- errChan <- errorChanStruct{hasError: true, err: err}
- log.Error().Err(err)
+ getPriceFromCryptoCompareErrorHandler(err, name, priceChan, errChan)
+
+ return
}
+ defer resp.Body.Close()
- log.Info().Msg(string(body))
+ jsonBody := make(map[string]float64)
+
+ err = json.NewDecoder(resp.Body).Decode(&jsonBody)
+ if err != nil {
+ getPriceFromCryptoCompareErrorHandler(err, name, priceChan, errChan)
+ }
// add a price cache
- ctx := context.Background()
- err = rdb.Set(ctx, name+"_price", jsonBody[unit], time.Duration(*cacheDuration*1000000)).Err()
+ err = rdb.Set(ctx, name+"_price", jsonBody[unit], time.Duration(*cacheDuration*redisCacheDurationMultiplier)).Err()
+
if err != nil {
log.Error().Err(err)
}
@@ -174,16 +241,20 @@ func getPriceFromCryptoCompare(
errChan <- errorChanStruct{hasError: false, err: nil}
}
-func PriceHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "application/json")
- if r.Method != "GET" {
- http.Error(w, "Method is not supported.", http.StatusNotFound)
+func PriceHandler(writer http.ResponseWriter, request *http.Request) {
+ writer.Header().Add("Content-Type", "application/json")
+
+ if request.Method != http.MethodGet {
+ http.Error(writer, "Method is not supported.", http.StatusNotFound)
}
- addSecureHeaders(&w)
+
+ addSecureHeaders(&writer)
var name string
+
var unit string
- params := r.URL.Query()
+
+ params := request.URL.Query()
for key, value := range params {
switch key {
case "name":
@@ -191,35 +262,46 @@ func PriceHandler(w http.ResponseWriter, r *http.Request) {
case "unit":
unit = value[0]
default:
- log.Error().Err(errors.New("bad parameters for the crypto endpoint."))
+ log.Error().Err(errUnknownParam)
}
}
if name == "" || unit == "" {
- json.NewEncoder(w).Encode(map[string]interface{}{
+ err := json.NewEncoder(writer).Encode(map[string]interface{}{
"err": "query parameters must include name and unit",
- "isSuccessful": false})
- log.Error().Err(errors.New("query parameters must include name and unit."))
+ "isSuccessful": false,
+ })
+ if err != nil {
+ log.Error().Err(errIncompParams)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
+
return
}
- var wg sync.WaitGroup
+ var waitGroup sync.WaitGroup
+
priceChan := make(chan priceChanStruct, 1)
errChan := make(chan errorChanStruct, 1)
+
defer close(errChan)
defer close(priceChan)
- wg.Add(1)
- // TODO- check cache
- go getPrice(name, unit, &wg, priceChan, errChan)
- wg.Wait()
+ waitGroup.Add(1)
+
+ ctx, cancel := context.WithTimeout(request.Context(), getTimeout*time.Second)
+ defer cancel()
+
+ go getPrice(ctx, name, unit, &waitGroup, priceChan, errChan)
+
+ waitGroup.Wait()
select {
case err := <-errChan:
- if err.hasError != false {
+ if err.hasError {
log.Error().Err(err.err)
}
default:
- log.Error().Err(errors.New("this shouldn't have happened'"))
+ log.Error().Err(errBadLogic)
}
var price priceChanStruct
@@ -227,28 +309,39 @@ func PriceHandler(w http.ResponseWriter, r *http.Request) {
case priceCh := <-priceChan:
price = priceCh
default:
- log.Fatal().Err(errors.New("this shouldnt have happened"))
+ log.Error().Err(errBadLogic)
}
- json.NewEncoder(w).Encode(map[string]interface{}{
+ err := json.NewEncoder(writer).Encode(map[string]interface{}{
"name": price.name,
"price": price.price,
"unit": unit,
"err": "",
- "isSuccessful": true})
+ "isSuccessful": true,
+ })
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
}
func PairHandler(w http.ResponseWriter, r *http.Request) {
var err error
+
w.Header().Add("Content-Type", "application/json")
- if r.Method != "GET" {
+
+ if r.Method != http.MethodGet {
http.Error(w, "Method is not supported.", http.StatusNotFound)
}
+
addSecureHeaders(&w)
var one string
+
var two string
+
var multiplier float64
+
params := r.URL.Query()
for key, value := range params {
switch key {
@@ -262,55 +355,71 @@ func PairHandler(w http.ResponseWriter, r *http.Request) {
log.Fatal().Err(err)
}
default:
- log.Fatal().Err(errors.New("unknown parameters for the pair endpoint."))
+ log.Fatal().Err(errUnknownParam)
}
}
if one == "" || two == "" || multiplier == 0. {
- log.Error().Err(errors.New("the query must include one()),two and multiplier"))
+ log.Error().Err(errIncompParams)
}
- var wg sync.WaitGroup
- priceChan := make(chan priceChanStruct, 2)
- errChan := make(chan errorChanStruct, 2)
+ var waitGroup sync.WaitGroup
+
+ priceChan := make(chan priceChanStruct, 2) //nolint: gomnd
+ errChan := make(chan errorChanStruct, 2) //nolint: gomnd
+
defer close(priceChan)
defer close(errChan)
- wg.Add(2)
- go getPrice(one, "USD", &wg, priceChan, errChan)
- go getPrice(two, "USD", &wg, priceChan, errChan)
- wg.Wait()
+ ctx, cancel := context.WithTimeout(r.Context(), getTimeout*time.Second)
+ defer cancel()
+
+ waitGroup.Add(2) //nolint: gomnd
+
+ go getPrice(ctx, one, "USD", &waitGroup, priceChan, errChan)
+ go getPrice(ctx, two, "USD", &waitGroup, priceChan, errChan)
+
+ waitGroup.Wait()
for i := 0; i < 2; i++ {
select {
case err := <-errChan:
- if err.hasError != false {
+ if err.hasError {
log.Error().Err(err.err)
}
default:
- log.Fatal().Err(errors.New("this shouldnt have happened"))
+ log.Error().Err(errBadLogic)
}
}
var priceOne float64
+
var priceTwo float64
+
for i := 0; i < 2; i++ {
select {
case price := <-priceChan:
if price.name == one {
priceOne = price.price
}
+
if price.name == two {
priceTwo = price.price
}
default:
- log.Fatal().Err(errors.New("this shouldnt have happened"))
+ log.Error().Err(errBadLogic)
}
}
ratio := priceOne * multiplier / priceTwo
+
log.Info().Msg(fmt.Sprintf("%v", ratio))
- json.NewEncoder(w).Encode(map[string]interface{}{"ratio": ratio})
+
+ err = json.NewEncoder(w).Encode(map[string]interface{}{"ratio": ratio})
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(w, "internal server error", http.StatusInternalServerError)
+ }
}
type alertType struct {
@@ -322,98 +431,120 @@ type alertsType struct {
Alerts []alertType `json:"alerts"`
}
-func getAlerts() (alertsType, error) {
+func getAlerts() alertsType {
var alerts alertsType
+
ctx := context.Background()
keys := rdb.SMembersMap(ctx, "alertkeys")
alerts.Alerts = make([]alertType, len(keys.Val()))
vals := keys.Val()
- i := 0
+ alertIndex := 0
+
for key := range vals {
alert := rdb.Get(ctx, key[6:])
expr, _ := alert.Result()
- alerts.Alerts[i].Name = key
- alerts.Alerts[i].Expr = expr
- i++
+ alerts.Alerts[alertIndex].Name = key
+ alerts.Alerts[alertIndex].Expr = expr
+ alertIndex++
}
- return alerts, nil
+ return alerts
}
-func alertManager() {
- for {
- alerts, err := getAlerts()
- if err != nil {
- log.Error().Err(err)
- return
- }
- log.Info().Msg(fmt.Sprintf("%v", alerts))
+func alertManagerWorker(alert alertType) {
+ expression, err := govaluate.NewEvaluableExpression(alert.Expr)
+ if err != nil {
+ log.Error().Err(err)
+ }
- for i := range alerts.Alerts {
- expression, err := govaluate.NewEvaluableExpression(alerts.Alerts[i].Expr)
- if err != nil {
- log.Error().Err(err)
- continue
- }
+ vars := expression.Vars()
+ parameters := make(map[string]interface{}, len(vars))
- vars := expression.Vars()
- parameters := make(map[string]interface{}, len(vars))
+ var waitGroup sync.WaitGroup
- var wg sync.WaitGroup
- priceChan := make(chan priceChanStruct, len(vars))
- errChan := make(chan errorChanStruct, len(vars))
- defer close(priceChan)
- defer close(errChan)
- wg.Add(len(vars))
+ priceChan := make(chan priceChanStruct, len(vars))
+ defer close(priceChan)
- for i := range vars {
- // TODO-get from cache
- go getPrice(vars[i], "USD", &wg, priceChan, errChan)
- }
- wg.Wait()
-
- for i := 0; i < len(vars); i++ {
- select {
- case err := <-errChan:
- if err.hasError != false {
- log.Printf(err.err.Error())
- }
- default:
- log.Error().Err(errors.New("this shouldnt have happened"))
- }
- }
+ errChan := make(chan errorChanStruct, len(vars))
+ defer close(errChan)
- for i := 0; i < len(vars); i++ {
- select {
- case price := <-priceChan:
- parameters[price.name] = price.price
- default:
- log.Error().Err(errors.New("this shouldnt have happened"))
- }
- }
+ ctx, cancel := context.WithTimeout(context.Background(), getTimeout*time.Second)
+ defer cancel()
- log.Info().Msg(fmt.Sprintf("parameters: %v", parameters))
- result, err := expression.Evaluate(parameters)
- if err != nil {
- log.Error().Err(err)
- }
+ waitGroup.Add(len(vars))
+
+ for i := range vars {
+ go getPrice(ctx, vars[i], "USD", &waitGroup, priceChan, errChan)
+ }
+
+ waitGroup.Wait()
- var resultBool bool
- log.Info().Msg(fmt.Sprintf("result: %v", result))
- resultBool = result.(bool)
- if resultBool == true {
- token := os.Getenv(TELEGRAM_BOT_TOKEN_ENV_VAR)
- msgText := "notification " + alerts.Alerts[i].Expr + " has been triggered"
- tokenInt, err := strconv.ParseInt(token[1:len(token)-1], 10, 64)
- if err != nil {
- log.Fatal().Err(err)
- }
- sendToTg("telebot:8000", msgText, tokenInt)
+ for i := 0; i < len(vars); i++ {
+ select {
+ case err := <-errChan:
+ if err.hasError {
+ log.Printf(err.err.Error())
}
+ default:
+ log.Error().Err(errBadLogic)
+ }
+ }
+
+ for i := 0; i < len(vars); i++ {
+ select {
+ case price := <-priceChan:
+ parameters[price.name] = price.price
+ default:
+ log.Error().Err(errBadLogic)
+ }
+ }
+
+ log.Info().Msg(fmt.Sprintf("parameters: %v", parameters))
+
+ result, err := expression.Evaluate(parameters)
+ if err != nil {
+ log.Error().Err(err)
+ }
+
+ var resultBool bool
+
+ log.Info().Msg(fmt.Sprintf("result: %v", result))
+
+ resultBool, ok := result.(bool)
+ if !ok {
+ log.Error().Err(errFailedTypeAssertion)
+
+ return
+ }
+
+ if !resultBool {
+ return
+ }
+
+ token := os.Getenv(telegramBotTokenEnvVar)
+ msgText := "notification " + alert.Expr + " has been triggered"
+
+ tokenInt, err := strconv.ParseInt(token[1:len(token)-1], 10, 64)
+
+ if err == nil {
+ log.Error().Err(err)
+ }
+
+ sendToTg("telebot:8000", msgText, tokenInt)
+}
+
+func alertManager(alertsCheckInterval int64) {
+ for {
+ alerts := getAlerts()
+
+ log.Info().Msg(fmt.Sprintf("%v", alerts))
+
+ for alertIndex := range alerts.Alerts {
+ go alertManagerWorker(alerts.Alerts[alertIndex])
}
- time.Sleep(time.Second * time.Duration(*alertsCheckInterval))
+ time.Sleep(time.Second * time.Duration(alertsCheckInterval))
}
}
@@ -422,91 +553,136 @@ type addAlertJSONType struct {
Expr string `json:"expr"`
}
-func (this AlertHandler) HandleAlertPost(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "application/json")
- bodyBytes, err := ioutil.ReadAll(r.Body)
+func (alertHandler AlertHandler) HandleAlertPost(writer http.ResponseWriter, request *http.Request) {
+ var bodyJSON addAlertJSONType
+
+ writer.Header().Add("Content-Type", "application/json")
+
+ err := json.NewDecoder(request.Body).Decode(&bodyJSON)
if err != nil {
log.Printf(err.Error())
- }
- var bodyJSON addAlertJSONType
- json.Unmarshal(bodyBytes, &bodyJSON)
+ err := json.NewEncoder(writer).Encode(map[string]interface{}{
+ "isSuccessful": false,
+ "error": "not all parameters are valid.",
+ })
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
+ }
if bodyJSON.Name == "" || bodyJSON.Expr == "" {
- json.NewEncoder(w).Encode(map[string]interface{}{
+ err := json.NewEncoder(writer).Encode(map[string]interface{}{
"isSuccessful": false,
- "error": "not all parameters are valid."})
- log.Fatal().Err(errors.New("not all parameters are valid."))
+ "error": "not all parameters are valid.",
+ })
+ if err != nil {
+ log.Error().Err(errFailedUnmarshall)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
+
return
}
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(request.Context(), redisContextTimeout*time.Second)
+ defer cancel()
+
key := "alert:" + bodyJSON.Name
- this.rdb.Set(ctx, bodyJSON.Name, bodyJSON.Expr, 0)
- this.rdb.SAdd(ctx, "alertkeys", key)
- json.NewEncoder(w).Encode(map[string]interface{}{
+ alertHandler.rdb.Set(ctx, bodyJSON.Name, bodyJSON.Expr, 0)
+ alertHandler.rdb.SAdd(ctx, "alertkeys", key)
+
+ err = json.NewEncoder(writer).Encode(map[string]interface{}{
"isSuccessful": true,
- "error": ""})
+ "error": "",
+ })
+
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
}
-func (this AlertHandler) HandleAlertDelete(w http.ResponseWriter, r *http.Request) {
- var Id string
- w.Header().Add("Content-Type", "application/json")
- params := r.URL.Query()
+func (alertHandler AlertHandler) HandleAlertDelete(writer http.ResponseWriter, request *http.Request) {
+ var identifier string
+
+ writer.Header().Add("Content-Type", "application/json")
+
+ params := request.URL.Query()
+
for key, value := range params {
switch key {
case "key":
- Id = value[0]
+ identifier = value[0]
default:
- log.Error().Err(errors.New("bad parameters for the crypto endpoint."))
+ log.Error().Err(errUnknownParam)
}
}
- if Id == "" {
- json.NewEncoder(w).Encode(map[string]interface{}{
+ if identifier == "" {
+ err := json.NewEncoder(writer).Encode(map[string]interface{}{
"isSuccessful": false,
- "error": "Id parameter is not valid."})
- log.Fatal().Err(errors.New("not all parameters are valid."))
+ "error": "Id parameter is not valid.",
+ })
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
+
return
}
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(request.Context(), redisContextTimeout*time.Second)
+ defer cancel()
- this.rdb.Del(ctx, Id)
- setKey := "alert:" + Id
- this.rdb.SRem(ctx, "alertkeys", setKey)
+ alertHandler.rdb.Del(ctx, identifier)
+ setKey := "alert:" + identifier
+ alertHandler.rdb.SRem(ctx, "alertkeys", setKey)
log.Printf(setKey)
- json.NewEncoder(w).Encode(struct {
+ err := json.NewEncoder(writer).Encode(struct {
IsSuccessful bool `json:"isSuccessful"`
Err string `json:"err"`
}{IsSuccessful: true, Err: ""})
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
}
-func (this AlertHandler) HandleAlertGet(w http.ResponseWriter, r *http.Request) {
- var Id string
- w.Header().Add("Content-Type", "application/json")
- params := r.URL.Query()
+func (alertHandler AlertHandler) HandleAlertGet(writer http.ResponseWriter, request *http.Request) {
+ var identifier string
+
+ writer.Header().Add("Content-Type", "application/json")
+
+ params := request.URL.Query()
for key, value := range params {
switch key {
case "key":
- Id = value[0]
+ identifier = value[0]
default:
- log.Error().Err(errors.New("bad parameters for the crypto endpoint."))
+ log.Error().Err(errUnknownParam)
}
}
- if Id == "" {
- json.NewEncoder(w).Encode(map[string]interface{}{
+ if identifier == "" {
+ err := json.NewEncoder(writer).Encode(map[string]interface{}{
"isSuccessful": false,
- "error": "Id parameter is not valid."})
- log.Fatal().Err(errors.New("not all parameters are valid."))
+ "error": "Id parameter is not valid.",
+ })
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ }
+
return
}
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(request.Context(), redisContextTimeout*time.Second)
+ defer cancel()
+
+ redisResult := alertHandler.rdb.Get(ctx, identifier)
- redisResult := this.rdb.Get(ctx, Id)
redisResultString, err := redisResult.Result()
if err != nil {
log.Err(err)
@@ -519,111 +695,67 @@ func (this AlertHandler) HandleAlertGet(w http.ResponseWriter, r *http.Request)
ErrorString = err.Error()
}
- w.Header().Add("Content-Type", "application/json")
+ writer.Header().Add("Content-Type", "application/json")
- json.NewEncoder(w).Encode(struct {
+ err = json.NewEncoder(writer).Encode(struct {
IsSuccessful bool `json:"isSuccessful"`
Error string `json:"error"`
Key string `json:"key"`
Expr string `json:"expr"`
- }{IsSuccessful: true, Error: ErrorString, Key: Id, Expr: redisResultString})
-}
+ }{IsSuccessful: true, Error: ErrorString, Key: identifier, Expr: redisResultString})
-func alertHandler(w http.ResponseWriter, r *http.Request) {
- addSecureHeaders(&w)
- alertHandler := AlertHandler{rdb: rdb}
- if r.Method == "POST" || r.Method == "PUT" || r.Method == "PATCH" {
- alertHandler.HandleAlertPost(w, r)
- } else if r.Method == "DELETE" {
- alertHandler.HandleAlertDelete(w, r)
- } else if r.Method == "GET" {
- alertHandler.HandleAlertGet(w, r)
- } else {
- http.Error(w, "Method is not supported.", http.StatusNotFound)
+ if err != nil {
+ log.Error().Err(err)
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
}
}
-func exHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "application/json")
- addSecureHeaders(&w)
- if r.Method != "GET" {
- http.Error(w, "Method is not supported.", http.StatusNotFound)
- }
-
- apiKey := os.Getenv(CHANGELLY_API_KEY_ENV_VAR)
- apiSecret := os.Getenv(CHANGELLY_API_SECRET_ENV_VAR)
+func alertHandler(writer http.ResponseWriter, request *http.Request) {
+ addSecureHeaders(&writer)
- body := struct {
- Jsonrpc string `json:"jsonrpc"`
- Id string `json:"id"`
- Method string `json:"method"`
- Params []string `json:"params"`
- }{
- Jsonrpc: "2.0",
- Id: "test",
- Method: "getCurrencies",
- Params: nil}
+ alertHandler := AlertHandler{rdb: rdb}
- bodyJSON, err := json.Marshal(body)
- if err != nil {
- log.Error().Err(err)
+ switch request.Method {
+ case http.MethodPost:
+ alertHandler.HandleAlertPost(writer, request)
+ case http.MethodPut:
+ alertHandler.HandleAlertPost(writer, request)
+ case http.MethodPatch:
+ alertHandler.HandleAlertPost(writer, request)
+ case http.MethodDelete:
+ alertHandler.HandleAlertDelete(writer, request)
+ case http.MethodGet:
+ alertHandler.HandleAlertGet(writer, request)
+ default:
+ http.Error(writer, "Method is not supported.", http.StatusNotFound)
}
+}
- secretBytes := []byte(apiSecret[1 : len(apiSecret)-1])
- mac := hmac.New(sha512.New, secretBytes)
- mac.Write(bodyJSON)
-
- client := &http.Client{}
- req, err := http.NewRequest("POST", changellyURL, bytes.NewReader(bodyJSON))
- if err != nil {
- log.Error().Err(err)
- }
+func healthHandler(writer http.ResponseWriter, request *http.Request) {
+ var RedisError string
- macDigest := hex.EncodeToString(mac.Sum(nil))
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("api-key", apiKey[1:len(apiKey)-1])
- req.Header.Add("sign", macDigest)
+ var HivedError string
- resp, err := client.Do(req)
- if err != nil {
- log.Error().Err(err)
- }
- defer resp.Body.Close()
+ var IsRedisOk bool
- responseBody, err := ioutil.ReadAll(resp.Body)
- log.Printf(string(responseBody))
+ IsHivedOk := true
- responseUnmarshalled := struct {
- Jsonrpc string `json:"jsonrpc"`
- Id string `json:"id"`
- Result []string `json:"result"`
- }{}
+ addSecureHeaders(&writer)
+ writer.Header().Add("Content-Type", "application/json")
- err = json.Unmarshal(responseBody, &responseUnmarshalled)
- if err != nil {
- log.Error().Err(err)
+ if request.Method != http.MethodGet {
+ http.Error(writer, "Method is not supported.", http.StatusNotFound)
}
- json.NewEncoder(w).Encode(responseUnmarshalled)
-}
-
-func healthHandler(w http.ResponseWriter, r *http.Request) {
- var RedisError string
- var HivedError string
- IsHivedOk := true
- var IsRedisOk bool
+ ctx, cancel := context.WithTimeout(request.Context(), pingTimeout*time.Second)
+ defer cancel()
- addSecureHeaders(&w)
- w.Header().Add("Content-Type", "application/json")
- if r.Method != "GET" {
- http.Error(w, "Method is not supported.", http.StatusNotFound)
- }
+ pingResponse := rdb.Ping(ctx)
- pingCtx := context.Background()
- pingResponse := rdb.Ping(pingCtx)
pingResponseResult, err := pingResponse.Result()
if err != nil {
log.Err(err)
+
IsRedisOk = false
RedisError = err.Error()
} else {
@@ -636,63 +768,69 @@ func healthHandler(w http.ResponseWriter, r *http.Request) {
}
}
- w.WriteHeader(http.StatusOK)
+ writer.WriteHeader(http.StatusOK)
+
+ err = json.NewEncoder(writer).Encode(map[string]interface{}{
+ "isHivedOk": IsHivedOk,
+ "hivedError": HivedError,
+ "isRedisOk": IsRedisOk,
+ "redisError": RedisError,
+ })
- json.NewEncoder(w).Encode(struct {
- IsHivedOk bool `json:"isHivedOk"`
- HivedError string `json:"hivedError"`
- IsRedisOk bool `json:"isRedisOk"`
- RedisError string `json:"redisError"`
- }{IsHivedOk: IsHivedOk, HivedError: HivedError, IsRedisOk: IsRedisOk, RedisError: RedisError})
+ if request.Method != http.MethodGet {
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
+ log.Error().Err(err)
+ }
}
-func robotsHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "text/plain")
- addSecureHeaders(&w)
- json.NewEncoder(w).Encode(struct {
- UserAgents string `json:"User-Agents"`
- Disallow string `json:"Disallow"`
- }{"*", "/"})
+func robotsHandler(writer http.ResponseWriter, r *http.Request) {
+ writer.Header().Add("Content-Type", "text/plain")
+ addSecureHeaders(&writer)
+
+ _, err := writer.Write([]byte("User-Agents: *\nDisallow: /\n"))
+ if err != nil {
+ log.Error().Err(err)
+ }
+
+ http.Error(writer, "internal server error", http.StatusInternalServerError)
}
-func startServer(gracefulWait time.Duration) {
- r := mux.NewRouter()
+func startServer(gracefulWait time.Duration, flagPort string) {
+ router := mux.NewRouter()
+
cfg := &tls.Config{
MinVersion: tls.VersionTLS13,
- CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
PreferServerCipherSuites: true,
- CipherSuites: []uint16{
- tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
- tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
- tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
- tls.TLS_RSA_WITH_AES_256_CBC_SHA,
- },
}
+
srv := &http.Server{
- Addr: "0.0.0.0:" + *flagPort,
- WriteTimeout: time.Second * 15,
- ReadTimeout: time.Second * 15,
- Handler: r,
+ Addr: "0.0.0.0:" + flagPort,
+ WriteTimeout: time.Second * serverTLSWriteTimeout,
+ ReadTimeout: time.Second * serverTLSReadTimeout,
+ Handler: router,
TLSConfig: cfg,
}
- r.HandleFunc("/crypto/v1/health", healthHandler)
- r.HandleFunc("/crypto/v1/price", PriceHandler)
- r.HandleFunc("/crypto/v1/pair", PairHandler)
- r.HandleFunc("/crypto/v1/alert", alertHandler)
- r.HandleFunc("/crypto/v1/ex", exHandler)
- r.HandleFunc("/crypto/v1/robots.txt", robotsHandler)
+
+ router.HandleFunc("/crypto/v1/health", healthHandler)
+ router.HandleFunc("/crypto/v1/price", PriceHandler)
+ router.HandleFunc("/crypto/v1/pair", PairHandler)
+ router.HandleFunc("/crypto/v1/alert", alertHandler)
+ router.HandleFunc("/crypto/v1/robots.txt", robotsHandler)
go func() {
var certPath, keyPath string
- if os.Getenv(SERVER_DEPLOYMENT_TYPE) == "deployment" {
+
+ switch os.Getenv(serverDeploymentType) {
+ case "deployment":
certPath = "/certs/fullchain1.pem"
keyPath = "/certs/privkey1.pem"
- } else if os.Getenv(SERVER_DEPLOYMENT_TYPE) == "test" {
+ case "test":
certPath = "/certs/server.cert"
keyPath = "/certs/server.key"
- } else {
- log.Fatal().Err(errors.New(fmt.Sprintf("unknown deployment kind: %s", SERVER_DEPLOYMENT_TYPE)))
+ default:
+ log.Fatal().Err(errUnknownDeploymentKind)
}
+
if err := srv.ListenAndServeTLS(certPath, keyPath); err != nil {
log.Fatal().Err(err)
}
@@ -702,10 +840,15 @@ func startServer(gracefulWait time.Duration) {
signal.Notify(c, os.Interrupt)
<-c
+
ctx, cancel := context.WithTimeout(context.Background(), gracefulWait)
defer cancel()
- srv.Shutdown(ctx)
- log.Info().Msg("gracefully shut down the server")
+
+ if err := srv.Shutdown(ctx); err != nil {
+ log.Error().Err(err)
+ } else {
+ log.Info().Msg("gracefully shut down the server")
+ }
}
func setupLogging() {
@@ -714,7 +857,21 @@ func setupLogging() {
func main() {
var gracefulWait time.Duration
- flag.DurationVar(&gracefulWait, "gracefulwait", time.Second*15, "the duration to wait during the graceful shutdown")
+
+ flagPort := flag.String("port", "8008", "determined the port the sercice runs on")
+ redisAddress := flag.String("redisaddress", "redis:6379", "determines the address of the redis instance")
+ redisPassword := flag.String("redispassword", "", "determines the password of the redis db")
+ redisDB := flag.Int64("redisdb", 0, "determines the db number")
+ alertsCheckInterval := flag.Int64(
+ "alertinterval",
+ alertCheckIntervalDefault,
+ "in seconds, the amount of time between alert checks")
+
+ flag.DurationVar(
+ &gracefulWait, "gracefulwait",
+ time.Second*defaultGracefulShutdown,
+ "the duration to wait during the graceful shutdown")
+
flag.Parse()
rdb = redis.NewClient(&redis.Options{
@@ -725,6 +882,8 @@ func main() {
defer rdb.Close()
setupLogging()
- go alertManager()
- startServer(gracefulWait)
+
+ go alertManager(*alertsCheckInterval)
+
+ startServer(gracefulWait, *flagPort)
}
diff --git a/hived/hived_test.go b/hived/hived_test.go
index 96bdd1e..bef135d 100644
--- a/hived/hived_test.go
+++ b/hived/hived_test.go
@@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/json"
+ "flag"
"fmt"
"io/ioutil"
"net/http"
@@ -16,6 +17,12 @@ const (
endpoint = "https://api.terminaldweller.com/crypto/v1"
)
+var (
+ redisAddress = flag.String("redisaddress", "redis:6379", "determines the address of the redis instance")
+ redisPassword = flag.String("redispassword", "", "determines the password of the redis db")
+ redisDB = flag.Int64("redisdb", 0, "determines the db number")
+)
+
func errorHandler(recorder *httptest.ResponseRecorder, t *testing.T, err error) {
if err != nil {
t.Errorf(err.Error())
@@ -27,7 +34,7 @@ func errorHandler(recorder *httptest.ResponseRecorder, t *testing.T, err error)
}
func TestPriceHandler(t *testing.T) {
- req, err := http.NewRequest("GET", endpoint+"/price?name=BTC&unit=USD", nil)
+ req, err := http.NewRequest(http.MethodGet, endpoint+"/price?name=BTC&unit=USD", nil)
recorder := httptest.NewRecorder()
PriceHandler(recorder, req)
errorHandler(recorder, t, err)
@@ -49,7 +56,7 @@ func TestPriceHandler(t *testing.T) {
}
func TestPairHandler(t *testing.T) {
- req, err := http.NewRequest("GET", endpoint+"/pair?one=ETH&two=CAKE&multiplier=4.0", nil)
+ req, err := http.NewRequest(http.MethodGet, endpoint+"/pair?one=ETH&two=CAKE&multiplier=4.0", nil)
recorder := httptest.NewRecorder()
PairHandler(recorder, req)
errorHandler(recorder, t, err)
@@ -78,7 +85,7 @@ func TestAlertHandlerPhase1(t *testing.T) {
if err != nil {
fmt.Println(err.Error())
}
- req, err := http.NewRequest("POST", endpoint+"/alert", bytes.NewBuffer(postData))
+ req, err := http.NewRequest(http.MethodPost, endpoint+"/alert", bytes.NewBuffer(postData))
req.Header.Set("Content-Type", "application/json")
recorder := httptest.NewRecorder()
alertHandler := AlertHandler{rdb: rdb}
@@ -108,7 +115,7 @@ func TestAlertHandlerPhase2(t *testing.T) {
})
defer rdb.Close()
- req, err := http.NewRequest("GET", endpoint+"/alert?key=alertTest", nil)
+ req, err := http.NewRequest(http.MethodGet, endpoint+"/alert?key=alertTest", nil)
recorder := httptest.NewRecorder()
alertHandler := AlertHandler{rdb: rdb}
alertHandler.HandleAlertGet(recorder, req)
@@ -142,7 +149,7 @@ func TestAlertHandlerPhase3(t *testing.T) {
if err != nil {
fmt.Println(err.Error())
}
- req, err := http.NewRequest("PUT", endpoint+"/alert", bytes.NewBuffer(postData))
+ req, err := http.NewRequest(http.MethodPut, endpoint+"/alert", bytes.NewBuffer(postData))
req.Header.Set("Content-Type", "application/json")
recorder := httptest.NewRecorder()
alertHandler := AlertHandler{rdb: rdb}
@@ -158,7 +165,7 @@ func TestAlertHandlerPhase4(t *testing.T) {
})
defer rdb.Close()
- req, err := http.NewRequest("GET", endpoint+"/alert?key=alertTest", nil)
+ req, err := http.NewRequest(http.MethodGet, endpoint+"/alert?key=alertTest", nil)
recorder := httptest.NewRecorder()
alertHandler := AlertHandler{rdb: rdb}
alertHandler.HandleAlertGet(recorder, req)
@@ -187,7 +194,7 @@ func TestAlertHandlerPhase5(t *testing.T) {
})
defer rdb.Close()
- req, err := http.NewRequest("DELETE", endpoint+"/alert?key=alertTest", nil)
+ req, err := http.NewRequest(http.MethodDelete, endpoint+"/alert?key=alertTest", nil)
recorder := httptest.NewRecorder()
alertHandler := AlertHandler{rdb: rdb}
alertHandler.HandleAlertGet(recorder, req)
@@ -216,7 +223,7 @@ func TestAlertHandlerPhase6(t *testing.T) {
})
defer rdb.Close()
- req, err := http.NewRequest("GET", endpoint+"/alert?key=alertTest", nil)
+ req, err := http.NewRequest(http.MethodGet, endpoint+"/alert?key=alertTest", nil)
recorder := httptest.NewRecorder()
alertHandler := AlertHandler{rdb: rdb}
alertHandler.HandleAlertGet(recorder, req)