package main import ( "bytes" "context" "crypto/hmac" "crypto/sha512" "encoding/hex" "encoding/json" "errors" "flag" "fmt" "io/ioutil" "net/http" "net/url" "os" "os/signal" "strconv" "sync" "time" "github.com/Knetic/govaluate" "github.com/go-redis/redis/v8" "github.com/go-telegram-bot-api/telegram-bot-api" "github.com/gorilla/mux" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) var flagPort = flag.String("port", "8008", "determined the port the sercice runs on") var alertsCheckInterval = flag.Int64("alertinterval", 600., "in seconds, the amount of time between alert checks") var redisAddress = flag.String("redisaddress", "redis:6379", "determines the address of the redis instance") var redisPassword = flag.String("redispassword", "", "determines the password of the redis db") var redisDB = flag.Int64("redisdb", 0, "determines the db number") var botChannelID = flag.Int64("botchannelid", 146328407, "determines the channel id the telgram bot should send messages to") const cryptocomparePriceURL = "https://min-api.cryptocompare.com/data/price?" const changellyURL = "https://api.changelly.com" const TELEGRAM_BOT_TOKEN_ENV_VAR = "TELEGRAM_BOT_TOKEN" const CHANGELLY_API_KEY_ENV_VAR = "CHANGELLY_API_KEY" const CHANGELLY_API_SECRET_ENV_VAR = "CHANGELLY_API_SECRET" var rdb *redis.Client func runTgBot() { // bot := getTgBot() token := os.Getenv(TELEGRAM_BOT_TOKEN_ENV_VAR) bot, err := tgbotapi.NewBotAPI(token[1 : len(token)-1]) if err != nil { log.Error().Err(err) } log.Debug().Msg("authorized on account bot_bloodstalker") update := tgbotapi.NewUpdate(0) update.Timeout = 60 updates, err := bot.GetUpdatesChan(update) if err != nil { log.Error().Err(err) } for update := range updates { if update.Message == nil { continue } log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text) msg := tgbotapi.NewMessage(update.Message.Chat.ID, update.Message.Text) msg.ReplyToMessageID = update.Message.MessageID bot.Send(msg) } } type priceChanStruct struct { name string price float64 } type errorChanStruct struct { hasError bool err error } func sendGetToCryptoCompare( name, unit string, wg *sync.WaitGroup, priceChan chan<- priceChanStruct, errChan chan<- errorChanStruct) { defer wg.Done() params := "fsym=" + url.QueryEscape(name) + "&" + "tsyms=" + url.QueryEscape(unit) path := cryptocomparePriceURL + params resp, err := http.Get(path) if err != nil { priceChan <- priceChanStruct{name: name, price: 0.} errChan <- errorChanStruct{hasError: true, err: err} log.Error().Err(err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { priceChan <- priceChanStruct{name: name, price: 0.} errChan <- errorChanStruct{hasError: true, err: err} log.Error().Err(err) } jsonBody := make(map[string]float64) err = json.Unmarshal(body, &jsonBody) if err != nil { priceChan <- priceChanStruct{name: name, price: 0.} errChan <- errorChanStruct{hasError: true, err: err} log.Error().Err(err) } log.Info().Msg(string(body)) priceChan <- priceChanStruct{name: name, price: jsonBody[unit]} errChan <- errorChanStruct{hasError: false, err: nil} } func priceHandler(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") if r.Method != "GET" { http.Error(w, "Method is not supported.", http.StatusNotFound) } var name string var unit string params := r.URL.Query() for key, value := range params { switch key { case "name": name = value[0] case "unit": unit = value[0] default: log.Error().Err(errors.New("bad parameters for the crypto endpoint.")) } } if name == "" || unit == "" { json.NewEncoder(w).Encode(map[string]interface{}{ "err": "query parameters must include name and unit", "isSuccessful": false}) log.Error().Err(errors.New("query parameters must include name and unit.")) return } var wg sync.WaitGroup priceChan := make(chan priceChanStruct, 1) errChan := make(chan errorChanStruct, 1) defer close(errChan) defer close(priceChan) wg.Add(1) go sendGetToCryptoCompare(name, unit, &wg, priceChan, errChan) wg.Wait() select { case err := <-errChan: if err.hasError != false { log.Error().Err(err.err) } default: log.Error().Err(errors.New("this shouldn't have happened'")) } var price priceChanStruct select { case priceCh := <-priceChan: price = priceCh default: log.Fatal().Err(errors.New("this shouldnt have happened")) } json.NewEncoder(w).Encode(map[string]interface{}{ "name": price.name, "price": price.price, "unit": unit, "err": "", "isSuccessful": true}) } func pairHandler(w http.ResponseWriter, r *http.Request) { var err error w.Header().Add("Content-Type", "application/json") if r.Method != "GET" { http.Error(w, "Method is not supported.", http.StatusNotFound) } var one string var two string var multiplier float64 params := r.URL.Query() for key, value := range params { switch key { case "one": one = value[0] case "two": two = value[0] case "multiplier": multiplier, err = strconv.ParseFloat(value[0], 64) if err != nil { log.Fatal().Err(err) } default: log.Fatal().Err(errors.New("unknown parameters for the pair endpoint.")) } } if one == "" || two == "" || multiplier == 0. { log.Error().Err(errors.New("the query must include one()),two and multiplier")) } var wg sync.WaitGroup priceChan := make(chan priceChanStruct, 2) errChan := make(chan errorChanStruct, 2) defer close(priceChan) defer close(errChan) wg.Add(2) go sendGetToCryptoCompare(one, "USD", &wg, priceChan, errChan) go sendGetToCryptoCompare(two, "USD", &wg, priceChan, errChan) wg.Wait() for i := 0; i < 2; i++ { select { case err := <-errChan: if err.hasError != false { log.Error().Err(err.err) } default: log.Fatal().Err(errors.New("this shouldnt have happened")) } } var priceOne float64 var priceTwo float64 for i := 0; i < 2; i++ { select { case price := <-priceChan: if price.name == one { priceOne = price.price } if price.name == two { priceTwo = price.price } default: log.Fatal().Err(errors.New("this shouldnt have happened")) } } ratio := priceOne * multiplier / priceTwo log.Info().Msg(fmt.Sprintf("%v", ratio)) json.NewEncoder(w).Encode(map[string]interface{}{"ratio": ratio}) } type alertType struct { Name string `json:"name"` Expr string `json:"expr"` } type alertsType struct { Alerts []alertType `json:"alerts"` } func getAlerts() (alertsType, error) { var alerts alertsType ctx := context.Background() keys := rdb.SMembersMap(ctx, "alertkeys") alerts.Alerts = make([]alertType, len(keys.Val())) vals := keys.Val() i := 0 for key := range vals { alert := rdb.Get(ctx, key[6:]) expr, _ := alert.Result() alerts.Alerts[i].Name = key alerts.Alerts[i].Expr = expr i++ } return alerts, nil } func alertManager() { for { alerts, err := getAlerts() if err != nil { log.Error().Err(err) return } log.Info().Msg(fmt.Sprintf("%v", alerts)) for i := range alerts.Alerts { expression, err := govaluate.NewEvaluableExpression(alerts.Alerts[i].Expr) if err != nil { log.Error().Err(err) continue } vars := expression.Vars() parameters := make(map[string]interface{}, len(vars)) var wg sync.WaitGroup priceChan := make(chan priceChanStruct, len(vars)) errChan := make(chan errorChanStruct, len(vars)) defer close(priceChan) defer close(errChan) wg.Add(len(vars)) for i := range vars { go sendGetToCryptoCompare(vars[i], "USD", &wg, priceChan, errChan) } wg.Wait() for i := 0; i < len(vars); i++ { select { case err := <-errChan: if err.hasError != false { log.Printf(err.err.Error()) } default: log.Error().Err(errors.New("this shouldnt have happened")) } } for i := 0; i < len(vars); i++ { select { case price := <-priceChan: parameters[price.name] = price.price default: log.Error().Err(errors.New("this shouldnt have happened")) } } log.Info().Msg(fmt.Sprintf("parameters: %v", parameters)) result, err := expression.Evaluate(parameters) if err != nil { log.Error().Err(err) } var resultBool bool log.Info().Msg(fmt.Sprintf("result: %v", result)) resultBool = result.(bool) if resultBool == true { // bot := getTgBot() token := os.Getenv(TELEGRAM_BOT_TOKEN_ENV_VAR) bot, err := tgbotapi.NewBotAPI(token[1 : len(token)-1]) if err != nil { log.Error().Err(err) } msgText := "notification " + alerts.Alerts[i].Expr + " has been triggered" msg := tgbotapi.NewMessage(*botChannelID, msgText) bot.Send(msg) } } time.Sleep(time.Second * time.Duration(*alertsCheckInterval)) } } type addAlertJSONType struct { Name string `json:"name"` Expr string `json:"expr"` } func handleAlertPost(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") bodyBytes, err := ioutil.ReadAll(r.Body) if err != nil { log.Printf(err.Error()) } var bodyJSON addAlertJSONType json.Unmarshal(bodyBytes, &bodyJSON) if bodyJSON.Name == "" || bodyJSON.Expr == "" { json.NewEncoder(w).Encode(map[string]interface{}{ "isSuccessful": false, "error": "not all parameters are valid."}) log.Fatal().Err(errors.New("not all parameters are valid.")) return } ctx := context.Background() key := "alert:" + bodyJSON.Name rdb.Set(ctx, bodyJSON.Name, bodyJSON.Expr, 0) rdb.SAdd(ctx, "alertkeys", key) json.NewEncoder(w).Encode(map[string]interface{}{ "isSuccessful": true, "error": ""}) } func handleAlertDelete(w http.ResponseWriter, r *http.Request) { var Id string w.Header().Add("Content-Type", "application/json") params := r.URL.Query() for key, value := range params { switch key { case "key": Id = value[0] default: log.Error().Err(errors.New("bad parameters for the crypto endpoint.")) } } if Id == "" { json.NewEncoder(w).Encode(map[string]interface{}{ "isSuccessful": false, "error": "Id parameter is not valid."}) log.Fatal().Err(errors.New("not all parameters are valid.")) return } ctx := context.Background() rdb.Del(ctx, Id) setKey := "alert:" + Id rdb.SRem(ctx, "alertkeys", setKey) log.Printf(setKey) json.NewEncoder(w).Encode(struct { IsSuccessful bool `json:"isSuccessful"` Err string `json:"err"` }{IsSuccessful: true, Err: ""}) } func handleAlertGet(w http.ResponseWriter, r *http.Request) { var Id string w.Header().Add("Content-Type", "application/json") params := r.URL.Query() for key, value := range params { switch key { case "key": Id = value[0] default: log.Error().Err(errors.New("bad parameters for the crypto endpoint.")) } } if Id == "" { json.NewEncoder(w).Encode(map[string]interface{}{ "isSuccessful": false, "error": "Id parameter is not valid."}) log.Fatal().Err(errors.New("not all parameters are valid.")) return } ctx := context.Background() redisResult := rdb.Get(ctx, Id) redisResultString, err := redisResult.Result() if err != nil { log.Err(err) } var ErrorString string if err == nil { ErrorString = "" } else { ErrorString = err.Error() } w.Header().Add("Content-Type", "application/json") json.NewEncoder(w).Encode(struct { IsSuccessful bool `json:"isSuccessful"` Error string `json:"error"` Key string `json:"key"` Expr string `json:"expr"` }{IsSuccessful: true, Error: ErrorString, Key: Id, Expr: redisResultString}) } func alertHandler(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" || r.Method == "PUT" || r.Method == "PATCH" { handleAlertPost(w, r) } else if r.Method == "DELETE" { handleAlertDelete(w, r) } else if r.Method == "GET" { handleAlertGet(w, r) } else { http.Error(w, "Method is not supported.", http.StatusNotFound) } } func exHandler(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") if r.Method != "GET" { http.Error(w, "Method is not supported.", http.StatusNotFound) } apiKey := os.Getenv(CHANGELLY_API_KEY_ENV_VAR) apiSecret := os.Getenv(CHANGELLY_API_SECRET_ENV_VAR) body := struct { Jsonrpc string `json:"jsonrpc"` Id string `json:"id"` Method string `json:"method"` Params []string `json:"params"` }{ Jsonrpc: "2.0", Id: "test", Method: "getCurrencies", Params: nil} bodyJSON, err := json.Marshal(body) if err != nil { log.Error().Err(err) } secretBytes := []byte(apiSecret[1 : len(apiSecret)-1]) mac := hmac.New(sha512.New, secretBytes) mac.Write(bodyJSON) client := &http.Client{} req, err := http.NewRequest("POST", changellyURL, bytes.NewReader(bodyJSON)) if err != nil { log.Error().Err(err) } macDigest := hex.EncodeToString(mac.Sum(nil)) req.Header.Add("Content-Type", "application/json") req.Header.Add("api-key", apiKey[1:len(apiKey)-1]) req.Header.Add("sign", macDigest) resp, err := client.Do(req) if err != nil { log.Error().Err(err) } defer resp.Body.Close() responseBody, err := ioutil.ReadAll(resp.Body) log.Printf(string(responseBody)) responseUnmarshalled := struct { Jsonrpc string `json:"jsonrpc"` Id string `json:"id"` Result []string `json:"result"` }{} err = json.Unmarshal(responseBody, &responseUnmarshalled) if err != nil { log.Error().Err(err) } json.NewEncoder(w).Encode(responseUnmarshalled) } func healthHandler(w http.ResponseWriter, r *http.Request) { var RedisError string var HivedError string IsHivedOk := true var IsRedisOk bool w.Header().Add("Content-Type", "application/json") if r.Method != "GET" { http.Error(w, "Method is not supported.", http.StatusNotFound) } pingCtx := context.Background() pingResponse := rdb.Ping(pingCtx) pingResponseResult, err := pingResponse.Result() if err != nil { log.Err(err) IsRedisOk = false RedisError = err.Error() } else { if pingResponseResult == "PONG" { IsRedisOk = true RedisError = "" } else { IsRedisOk = false RedisError = "redis did not respond PONG to ping" } } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(struct { IsHivedOk bool `json:"isHivedOk"` HivedError string `json:"hivedError"` IsRedisOk bool `json:"isRedisOk"` RedisError string `json:"redisError"` }{IsHivedOk: IsHivedOk, HivedError: HivedError, IsRedisOk: IsRedisOk, RedisError: RedisError}) } func startServer(gracefulWait time.Duration) { r := mux.NewRouter() srv := &http.Server{ Addr: "0.0.0.0:" + *flagPort, WriteTimeout: time.Second * 15, ReadTimeout: time.Second * 15, Handler: r, } r.HandleFunc("/health", healthHandler) r.HandleFunc("/price", priceHandler) r.HandleFunc("/pair", pairHandler) r.HandleFunc("/alert", alertHandler) r.HandleFunc("/ex", exHandler) go func() { if err := srv.ListenAndServe(); err != nil { log.Fatal().Err(err) } }() c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c ctx, cancel := context.WithTimeout(context.Background(), gracefulWait) defer cancel() srv.Shutdown(ctx) log.Info().Msg("gracefully shut down the server") } func setupLogging() { zerolog.TimeFieldFormat = zerolog.TimeFormatUnix } func main() { var gracefulWait time.Duration flag.DurationVar(&gracefulWait, "gracefulwait", time.Second*15, "the duration to wait during the graceful shutdown") flag.Parse() rdb = redis.NewClient(&redis.Options{ Addr: *redisAddress, Password: *redisPassword, DB: int(*redisDB), }) defer rdb.Close() setupLogging() // go runTgBot() go alertManager() startServer(gracefulWait) }