diff options
Diffstat (limited to 'arbiter/arbiter.go')
-rw-r--r-- | arbiter/arbiter.go | 377 |
1 files changed, 263 insertions, 114 deletions
diff --git a/arbiter/arbiter.go b/arbiter/arbiter.go index 5168b3a..9edf68a 100644 --- a/arbiter/arbiter.go +++ b/arbiter/arbiter.go @@ -7,7 +7,7 @@ import ( "errors" "flag" "fmt" - "io/ioutil" + "io" "net" "net/http" "net/url" @@ -25,23 +25,25 @@ import ( ) var ( - flagPort = flag.String("port", "8009", "determines the port the server will listen on") - flagInterval = flag.Float64("interval", 10, "In seconds, the delay between checking prices") - redisDB = flag.Int64("redisdb", 1, "determines the db number") - rdb *redis.Client - redisAddress = flag.String("redisaddress", "redis:6379", "determines the address of the redis instance") - redisPassword = flag.String("redispassword", "", "determines the password of the redis db") + errBadLogic = errors.New("we should not be here") + errUnexpectedParam = errors.New("got unexpected parameter") + errUnknownDeployment = errors.New("unknown deployment kind") ) const ( - SERVER_DEPLOYMENT_TYPE = "SERVER_DEPLOYMENT_TYPE" - coingeckoAPIURLv3 = "https://api.coingecko.com/api/v3" - coincapAPIURLv2 = "https://api.coincap.io/v2" + serverDeploymentType = "SERVER_DEPLOYMENT_TYPE" + coingeckoAPIURLv3 = "https://api.coingecko.com/api/v3" + coincapAPIURLv2 = "https://api.coincap.io/v2" + getTimeout = 5 + httpClientTimeout = 5 + serverTLSReadTimeout = 15 + serverTLSWriteTimeout = 15 + defaultGracefulShutdown = 15 ) // https://docs.coincap.io/ type CoinCapAssetGetResponseData struct { - Id string `json:"id"` + ID string `json:"id"` Rank string `json:"rank"` Symbol string `json:"symbol"` Name string `json:"name"` @@ -54,16 +56,24 @@ type CoinCapAssetGetResponseData struct { Vwap24Hr string `json:"vwap24Hr"` } +type priceResponseData struct { + Name string `json:"name"` + Price float64 `json:"price"` + Unit string `json:"unit"` + Err string `json:"err"` + IsSuccessful bool `json:"isSuccessful"` +} + type CoinCapAssetGetResponse struct { Data CoinCapAssetGetResponseData `json:"data"` TimeStamp int64 `json:"timestamp"` } -type HttpHandlerFunc func(http.ResponseWriter, *http.Request) +type HTTPHandlerFunc func(http.ResponseWriter, *http.Request) -type HttpHandler struct { +type HTTPHandler struct { name string - function HttpHandlerFunc + function HTTPHandlerFunc } type priceChanStruct struct { @@ -81,171 +91,241 @@ func GetProxiedClient() (*http.Client, error) { if proxyURL == "" { proxyURL = os.Getenv("HTTPS_PROXY") } + dialer, err := proxy.SOCKS5("tcp", proxyURL, nil, proxy.Direct) if err != nil { - return nil, err + return nil, fmt.Errorf("[GetProxiedClient] : %w", err) } + dialContext := func(ctx context.Context, network, address string) (net.Conn, error) { - return dialer.Dial(network, address) + netConn, err := dialer.Dial(network, address) + if err == nil { + return netConn, nil + } + + return netConn, fmt.Errorf("[dialContext] : %w", err) } transport := &http.Transport{ DialContext: dialContext, DisableKeepAlives: true, } - client := &http.Client{Transport: transport} + client := &http.Client{ + Transport: transport, + Timeout: httpClientTimeout * time.Second, + CheckRedirect: nil, + Jar: nil, + } return client, nil } // OWASP: https://cheatsheetseries.owasp.org/cheatsheets/REST_Security_Cheat_Sheet.html -func addSecureHeaders(w *http.ResponseWriter) { - (*w).Header().Set("Cache-Control", "no-store") - (*w).Header().Set("Content-Security-Policy", "default-src https;") - (*w).Header().Set("Strict-Transport-Security", "max-age=63072000;") - (*w).Header().Set("X-Content-Type-Options", "nosniff") - (*w).Header().Set("X-Frame-Options", "DENY") - (*w).Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS") +func addSecureHeaders(writer *http.ResponseWriter) { + (*writer).Header().Set("Cache-Control", "no-store") + (*writer).Header().Set("Content-Security-Policy", "default-src https;") + (*writer).Header().Set("Strict-Transport-Security", "max-age=63072000;") + (*writer).Header().Set("X-Content-Type-Options", "nosniff") + (*writer).Header().Set("X-Frame-Options", "DENY") + (*writer).Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS") } -// binance -func getPriceFromBinance(name, unit string, - wg *sync.WaitGroup, - priceChan chan<- priceChanStruct, - errChan chan<- errorChanStruct) { +// get price from binance. +// func getPriceFromBinance(name, unit string, +// wg *sync.WaitGroup, +// priceChan chan<- priceChanStruct, +// errChan chan<- errorChanStruct) { -} +// } -// kucoin -func getPriceFromKu(name, uni string, - wg *sync.WaitGroup, - priceChan chan<- priceChanStruct, - errChan chan<- errorChanStruct) { +// get price from kucoin. +// func getPriceFromKu(name, uni string, +// wg *sync.WaitGroup, +// priceChan chan<- priceChanStruct, +// errChan chan<- errorChanStruct) { -} +// } func getPriceFromCoinGecko( + ctx context.Context, name, unit string, wg *sync.WaitGroup, priceChan chan<- priceChanStruct, - errChan chan<- errorChanStruct) { + errChan chan<- errorChanStruct, +) { defer wg.Done() + priceFloat := 0. + params := "/simple/price?ids=" + url.QueryEscape(name) + "&" + "vs_currencies=" + url.QueryEscape(unit) path := coingeckoAPIURLv3 + params - fmt.Println(path) - // resp, err := http.Get(path) + client, err := GetProxiedClient() if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} + errChan <- errorChanStruct{hasError: true, err: err} + + log.Error().Err(err) + + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + if err != nil { + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) + return } - resp, err := client.Get(path) + resp, err := client.Do(req) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) - fmt.Println(err) + return } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) } jsonBody := make(map[string]interface{}) + err = json.Unmarshal(body, &jsonBody) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) } - price := jsonBody[name].(map[string]interface{})[unit].(float64) + price, isOk := jsonBody[name].(map[string]interface{}) + if !isOk { + priceChan <- priceChanStruct{name: name, price: priceFloat} + errChan <- errorChanStruct{hasError: true, err: err} + + log.Error().Err(err) + + return + } log.Info().Msg(string(body)) - priceChan <- priceChanStruct{name: name, price: price} + priceFloat, isOk = price[unit].(float64) + if !isOk { + priceChan <- priceChanStruct{name: name, price: priceFloat} + errChan <- errorChanStruct{hasError: true, err: err} + + log.Error().Err(err) + + return + } + + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: false, err: nil} } func getPriceFromCoinCap( + ctx context.Context, name, unit string, wg *sync.WaitGroup, priceChan chan<- priceChanStruct, - errChan chan<- errorChanStruct) { + errChan chan<- errorChanStruct, +) { defer wg.Done() + priceFloat := 0. + params := "/assets/" + url.QueryEscape(name) path := coincapAPIURLv2 + params - fmt.Println(path) + client, err := GetProxiedClient() if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} + errChan <- errorChanStruct{hasError: true, err: err} + + log.Error().Err(err) + + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + if err != nil { + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) + return } - // resp, err := http.Get(path) - resp, err := client.Get(path) + + resp, err := client.Do(req) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) + return } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) } - fmt.Println(string(body)) var coinCapAssetGetResponse CoinCapAssetGetResponse - // jsonBody := make(map[string]interface{}) - // err = json.Unmarshal(body, &jsonBody) + err = json.Unmarshal(body, &coinCapAssetGetResponse) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) } - // price := jsonBody[name].(map[string]interface{})[unit].(float64) - price, err := strconv.ParseFloat(coinCapAssetGetResponse.Data.PriceUsd, 64) + priceFloat, err = strconv.ParseFloat(coinCapAssetGetResponse.Data.PriceUsd, 64) if err != nil { - priceChan <- priceChanStruct{name: name, price: 0.} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: true, err: err} + log.Error().Err(err) } - fmt.Println(price) log.Info().Msg(string(body)) - priceChan <- priceChanStruct{name: name, price: price} + priceChan <- priceChanStruct{name: name, price: priceFloat} errChan <- errorChanStruct{hasError: false, err: nil} } func arbHandler(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") - if r.Method != "GET" { + + if r.Method != http.MethodGet { http.Error(w, "Method is not supported.", http.StatusNotFound) } + addSecureHeaders(&w) var name string + var unit string + params := r.URL.Query() for key, value := range params { switch key { @@ -254,24 +334,31 @@ func arbHandler(w http.ResponseWriter, r *http.Request) { case "unit": unit = value[0] default: - log.Error().Err(errors.New("Got unexpected parameter.")) + log.Error().Err(errUnexpectedParam) } } priceChan := make(chan priceChanStruct, 1) errChan := make(chan errorChanStruct, 1) - var wg sync.WaitGroup - wg.Add(1) - getPriceFromCoinGecko(name, unit, &wg, priceChan, errChan) - wg.Wait() + + var waitGroup sync.WaitGroup + + ctx, cancel := context.WithTimeout(context.Background(), getTimeout*time.Second) + defer cancel() + + waitGroup.Add(1) + + //nolint:contextcheck + getPriceFromCoinGecko(ctx, name, unit, &waitGroup, priceChan, errChan) + waitGroup.Wait() select { case err := <-errChan: - if err.hasError != false { + if err.hasError { log.Error().Err(err.err) } default: - log.Error().Err(errors.New("We shouldnt be here")) + log.Error().Err(errBadLogic) } var price priceChanStruct @@ -279,27 +366,44 @@ func arbHandler(w http.ResponseWriter, r *http.Request) { case priceCh := <-priceChan: price = priceCh default: - log.Fatal().Err(errors.New("We shouldnt be here")) + log.Error().Err(errBadLogic) } - json.NewEncoder(w).Encode(map[string]interface{}{ - "name": price.name, - "price": price.price, - "unit": unit, - "err": "", - "isSuccessful": true, - }) + responseData := priceResponseData{ + Name: price.name, + Price: price.price, + Unit: "USD", + Err: "", + IsSuccessful: true, + } + + jsonResp, err := json.Marshal(responseData) + if err != nil { + cancel() + //nolint:gocritic + log.Fatal().Err(err) + } + + _, err = w.Write(jsonResp) + if err != nil { + cancel() + log.Fatal().Err(err) + } } func coincapHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/json") - if r.Method != "GET" { + if r.Method != http.MethodGet { http.Error(w, "Method is not supported.", http.StatusNotFound) } + + w.Header().Add("Content-Type", "application/json") + addSecureHeaders(&w) var name string + var unit string + params := r.URL.Query() for key, value := range params { switch key { @@ -308,24 +412,31 @@ func coincapHandler(w http.ResponseWriter, r *http.Request) { case "unit": unit = value[0] default: - log.Error().Err(errors.New("Got unexpected parameter.")) + log.Error().Err(errUnexpectedParam) } } priceChan := make(chan priceChanStruct, 1) errChan := make(chan errorChanStruct, 1) - var wg sync.WaitGroup - wg.Add(1) - getPriceFromCoinCap(name, unit, &wg, priceChan, errChan) - wg.Wait() + + var waitGroup sync.WaitGroup + + waitGroup.Add(1) + + ctx, cancel := context.WithTimeout(context.Background(), getTimeout*time.Second) + defer cancel() + + //nolint:contextcheck + getPriceFromCoinCap(ctx, name, unit, &waitGroup, priceChan, errChan) + waitGroup.Wait() select { case err := <-errChan: - if err.hasError != false { + if err.hasError { log.Error().Err(err.err) } default: - log.Error().Err(errors.New("We shouldnt be here")) + log.Error().Err(errBadLogic) } var price priceChanStruct @@ -333,16 +444,29 @@ func coincapHandler(w http.ResponseWriter, r *http.Request) { case priceCh := <-priceChan: price = priceCh default: - log.Fatal().Err(errors.New("We shouldnt be here")) + log.Error().Err(errBadLogic) } - json.NewEncoder(w).Encode(map[string]interface{}{ - "name": price.name, - "price": price.price, - "unit": "USD", - "err": "", - "isSuccessful": true, - }) + responseData := priceResponseData{ + Name: price.name, + Price: price.price, + Unit: "USD", + Err: "", + IsSuccessful: true, + } + + jsonResp, err := json.Marshal(responseData) + if err != nil { + cancel() + //nolint:gocritic + log.Fatal().Err(err) + } + + _, err = w.Write(jsonResp) + if err != nil { + cancel() + log.Fatal().Err(err) + } } func setupLogging() { @@ -350,37 +474,42 @@ func setupLogging() { } func startServer(gracefulWait time.Duration, - handlers []HttpHandler, - serverDeploymentType string, port string) { - r := mux.NewRouter() + handlers []HTTPHandler, + serverDeploymentType string, port string, +) { + route := mux.NewRouter() cfg := &tls.Config{ MinVersion: tls.VersionTLS13, } + srv := &http.Server{ Addr: "0.0.0.0:" + port, - WriteTimeout: time.Second * 15, - ReadTimeout: time.Second * 15, - Handler: r, + WriteTimeout: time.Second * serverTLSWriteTimeout, + ReadTimeout: time.Second * serverTLSReadTimeout, + Handler: route, TLSConfig: cfg, } for i := 0; i < len(handlers); i++ { - r.HandleFunc(handlers[i].name, handlers[i].function) + route.HandleFunc(handlers[i].name, handlers[i].function) } go func() { var certPath, keyPath string - if os.Getenv(serverDeploymentType) == "deployment" { + + switch os.Getenv(serverDeploymentType) { + case "deployment": certPath = "/certs/fullchain1.pem" keyPath = "/certs/privkey1.pem" - } else if os.Getenv(serverDeploymentType) == "test" { + case "test": certPath = "/certs/server.cert" keyPath = "/certs/server.key" - } else { - log.Fatal().Err(errors.New(fmt.Sprintf("unknown deployment kind: %s", serverDeploymentType))) + default: + log.Error().Err(errUnknownDeployment) } + if err := srv.ListenAndServeTLS(certPath, keyPath); err != nil { - log.Fatal().Err(err) + log.Error().Err(err) } }() @@ -388,16 +517,35 @@ func startServer(gracefulWait time.Duration, signal.Notify(c, os.Interrupt) <-c + ctx, cancel := context.WithTimeout(context.Background(), gracefulWait) defer cancel() - srv.Shutdown(ctx) + + if err := srv.Shutdown(ctx); err != nil { + log.Error().Err(err) + } + log.Info().Msg("gracefully shut down the server") } func main() { var gracefulWait time.Duration - flag.DurationVar(&gracefulWait, "gracefulwait", time.Second*15, "the duration to wait during the graceful shutdown") + + var rdb *redis.Client + + flag.DurationVar( + &gracefulWait, + "gracefulwait", + time.Second*defaultGracefulShutdown, + "the duration to wait during the graceful shutdown", + ) + + flagPort := flag.String("port", "8009", "determines the port the server will listen on") + redisDB := flag.Int64("redisdb", 1, "determines the db number") + redisAddress := flag.String("redisaddress", "redis:6379", "determines the address of the redis instance") + redisPassword := flag.String("redispassword", "", "determines the password of the redis db") flag.Parse() + rdb = redis.NewClient(&redis.Options{ Addr: *redisAddress, Password: *redisPassword, @@ -406,10 +554,11 @@ func main() { defer rdb.Close() setupLogging() - var handlerFuncs = []HttpHandler{ + + handlerFuncs := []HTTPHandler{ {name: "/crypto/v1/arb/gecko", function: arbHandler}, {name: "/crypto/v1/arb/coincap", function: coincapHandler}, } - startServer(gracefulWait, handlerFuncs, SERVER_DEPLOYMENT_TYPE, *flagPort) + startServer(gracefulWait, handlerFuncs, serverDeploymentType, *flagPort) } |