aboutsummaryrefslogblamecommitdiffstats
path: root/hived.go
blob: 759c40389c84b5a5ad2628ce5979d4c1ec8dfc3e (plain) (tree)
1
2
3
4
5
6
7
8
9


            
               
                 


                       
                       
                


                   

                  
            
                   

                 
              
 

                                      
                                                                  
                                

                                   


                                                                                     
 



                                                                                                                  
                                                                                                                             

                                                                             
                                                


                                                           
 
                     
 



                                                               
                       
                                    
         
                                                                 
 

                                       
 
                                                  
                       
                                    















                                                                                        
















                                         

                                                         
                                              

                                   

                                                                    
                                    




                                              

                                                                    
                                    




                                             

                                                                    
                                    

         
                                    
 

                                                                       

 
                                                           
                                                          













                                                                              
                                                                                              


                 







                                                                                           











                                                                      
                                                

                 
                                                                            






                                    
                                                                          

         





                                                         



                                                          
                                                          
















                                                                              
                                                    

                         
                                                                                                

                 



                                                                                               










                                                                      




                                                  
                                                        

                         
                                                                                  














                                                      
                                                                                  


                 
                                                 
                                                


                                                                         








                                          

                                      











                                                          

         

                          
 
                     


                                          
                                            

                              
                                                         



                                                                                                  
                                                    
























                                                                                                  
                                                                                                  







                                                                            
                                                                                                  


                                 
                                                                                 

                                                                      
                                                    


                                           
                                                                         

                                                  


                                                                                       
                                               
                                                            
                                 
                                                                                                          
                                                                                  












                                                                             
                                                              
                                                          






                                                
 







                                                                            



                                                     



                                                         

 

                                                                
                                                          


                                        
                           













                                                                                              




                                          
                          






                                                         













































                                                                                              
                                                           
                                                                           


                                        

                                     





                                                                              
                                                        
                                                          


























































                                                                                    

 
                                                            





                                                          



                                                                              


















                                                                         
                                          




                                                                                                      

 






                                                            






                                                     
         




                                              
                                                  














                                                                              

 



                                                        
             



                                                                                                                            






                                             
                      
                        
                         
                                 
 
package main

import (
	"bytes"
	"context"
	"crypto/hmac"
	"crypto/sha512"
	"encoding/hex"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"time"

	"github.com/Knetic/govaluate"
	"github.com/go-redis/redis/v8"
	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api"
	"github.com/gorilla/mux"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

var flagPort = flag.String("port", "8008", "determined the port the sercice runs on")

var alertsCheckInterval = flag.Int64("alertinterval", 600., "in seconds, the amount of time between alert checks")
var redisAddress = flag.String("redisaddress", "redis:6379", "determines the address of the redis instance")
var redisPassword = flag.String("redispassword", "", "determines the password of the redis db")
var redisDB = flag.Int64("redisdb", 0, "determines the db number")
var botChannelID = flag.Int64("botchannelid", 146328407, "determines the channel id the telgram bot should send messages to")

const cryptocomparePriceURL = "https://min-api.cryptocompare.com/data/price?"
const changellyURL = "https://api.changelly.com"
const TELEGRAM_BOT_TOKEN_ENV_VAR = "TELEGRAM_BOT_TOKEN"
const CHANGELLY_API_KEY_ENV_VAR = "CHANGELLY_API_KEY"
const CHANGELLY_API_SECRET_ENV_VAR = "CHANGELLY_API_SECRET"

var rdb *redis.Client

func runTgBot() {
	// bot := getTgBot()
	token := os.Getenv(TELEGRAM_BOT_TOKEN_ENV_VAR)
	bot, err := tgbotapi.NewBotAPI(token[1 : len(token)-1])
	if err != nil {
		log.Error().Err(err)
	}
	log.Debug().Msg("authorized on account bot_bloodstalker")

	update := tgbotapi.NewUpdate(0)
	update.Timeout = 60

	updates, err := bot.GetUpdatesChan(update)
	if err != nil {
		log.Error().Err(err)
	}

	for update := range updates {
		if update.Message == nil {
			continue
		}

		log.Printf("[%s] %s", update.Message.From.UserName, update.Message.Text)

		msg := tgbotapi.NewMessage(update.Message.Chat.ID, update.Message.Text)
		msg.ReplyToMessageID = update.Message.MessageID

		bot.Send(msg)
	}
}

type priceChanStruct struct {
	name  string
	price float64
}

type errorChanStruct struct {
	hasError bool
	err      error
}

func sendGetToCryptoCompare(
	name, unit string,
	wg *sync.WaitGroup,
	priceChan chan<- priceChanStruct,
	errChan chan<- errorChanStruct) {
	defer wg.Done()

	params := "fsym=" + url.QueryEscape(name) + "&" +
		"tsyms=" + url.QueryEscape(unit)
	path := cryptocomparePriceURL + params
	resp, err := http.Get(path)
	if err != nil {
		priceChan <- priceChanStruct{name: name, price: 0.}
		errChan <- errorChanStruct{hasError: true, err: err}
		log.Error().Err(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		priceChan <- priceChanStruct{name: name, price: 0.}
		errChan <- errorChanStruct{hasError: true, err: err}
		log.Error().Err(err)
	}

	jsonBody := make(map[string]float64)
	err = json.Unmarshal(body, &jsonBody)
	if err != nil {
		priceChan <- priceChanStruct{name: name, price: 0.}
		errChan <- errorChanStruct{hasError: true, err: err}
		log.Error().Err(err)
	}

	log.Info().Msg(string(body))

	priceChan <- priceChanStruct{name: name, price: jsonBody[unit]}
	errChan <- errorChanStruct{hasError: false, err: nil}
}

func priceHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Add("Content-Type", "application/json")
	if r.Method != "GET" {
		http.Error(w, "Method is not supported.", http.StatusNotFound)
	}

	var name string
	var unit string
	params := r.URL.Query()
	for key, value := range params {
		switch key {
		case "name":
			name = value[0]
		case "unit":
			unit = value[0]
		default:
			log.Error().Err(errors.New("bad parameters for the crypto endpoint."))
		}
	}

	if name == "" || unit == "" {
		json.NewEncoder(w).Encode(map[string]interface{}{
			"err":          "query parameters must include name and unit",
			"isSuccessful": false})
		log.Error().Err(errors.New("query parameters must include name and unit."))
		return
	}

	var wg sync.WaitGroup
	priceChan := make(chan priceChanStruct, 1)
	errChan := make(chan errorChanStruct, 1)
	defer close(errChan)
	defer close(priceChan)
	wg.Add(1)
	go sendGetToCryptoCompare(name, unit, &wg, priceChan, errChan)
	wg.Wait()

	select {
	case err := <-errChan:
		if err.hasError != false {
			log.Error().Err(err.err)
		}
	default:
		log.Error().Err(errors.New("this shouldn't have happened'"))
	}

	var price priceChanStruct
	select {
	case priceCh := <-priceChan:
		price = priceCh
	default:
		log.Fatal().Err(errors.New("this shouldnt have happened"))
	}

	json.NewEncoder(w).Encode(map[string]interface{}{
		"name":         price.name,
		"price":        price.price,
		"unit":         unit,
		"err":          "",
		"isSuccessful": true})
}

func pairHandler(w http.ResponseWriter, r *http.Request) {
	var err error
	w.Header().Add("Content-Type", "application/json")
	if r.Method != "GET" {
		http.Error(w, "Method is not supported.", http.StatusNotFound)
	}

	var one string
	var two string
	var multiplier float64
	params := r.URL.Query()
	for key, value := range params {
		switch key {
		case "one":
			one = value[0]
		case "two":
			two = value[0]
		case "multiplier":
			multiplier, err = strconv.ParseFloat(value[0], 64)
			if err != nil {
				log.Fatal().Err(err)
			}
		default:
			log.Fatal().Err(errors.New("unknown parameters for the pair endpoint."))
		}
	}

	if one == "" || two == "" || multiplier == 0. {
		log.Error().Err(errors.New("the query must include one()),two and multiplier"))
	}

	var wg sync.WaitGroup
	priceChan := make(chan priceChanStruct, 2)
	errChan := make(chan errorChanStruct, 2)
	defer close(priceChan)
	defer close(errChan)

	wg.Add(2)
	go sendGetToCryptoCompare(one, "USD", &wg, priceChan, errChan)
	go sendGetToCryptoCompare(two, "USD", &wg, priceChan, errChan)
	wg.Wait()

	for i := 0; i < 2; i++ {
		select {
		case err := <-errChan:
			if err.hasError != false {
				log.Error().Err(err.err)
			}
		default:
			log.Fatal().Err(errors.New("this shouldnt have happened"))
		}
	}

	var priceOne float64
	var priceTwo float64
	for i := 0; i < 2; i++ {
		select {
		case price := <-priceChan:
			if price.name == one {
				priceOne = price.price
			}
			if price.name == two {
				priceTwo = price.price
			}
		default:
			log.Fatal().Err(errors.New("this shouldnt have happened"))
		}
	}

	ratio := priceOne * multiplier / priceTwo
	log.Info().Msg(fmt.Sprintf("%v", ratio))
	json.NewEncoder(w).Encode(map[string]interface{}{"ratio": ratio})
}

type alertType struct {
	Name string `json:"name"`
	Expr string `json:"expr"`
}

type alertsType struct {
	Alerts []alertType `json:"alerts"`
}

func getAlerts() (alertsType, error) {
	var alerts alertsType
	ctx := context.Background()
	keys := rdb.SMembersMap(ctx, "alertkeys")
	alerts.Alerts = make([]alertType, len(keys.Val()))
	vals := keys.Val()

	i := 0
	for key := range vals {
		alert := rdb.Get(ctx, key[6:])
		expr, _ := alert.Result()
		alerts.Alerts[i].Name = key
		alerts.Alerts[i].Expr = expr
		i++
	}

	return alerts, nil
}

func alertManager() {
	for {
		alerts, err := getAlerts()
		if err != nil {
			log.Error().Err(err)
			return
		}
		log.Info().Msg(fmt.Sprintf("%v", alerts))

		for i := range alerts.Alerts {
			expression, err := govaluate.NewEvaluableExpression(alerts.Alerts[i].Expr)
			if err != nil {
				log.Error().Err(err)
				continue
			}

			vars := expression.Vars()
			parameters := make(map[string]interface{}, len(vars))

			var wg sync.WaitGroup
			priceChan := make(chan priceChanStruct, len(vars))
			errChan := make(chan errorChanStruct, len(vars))
			defer close(priceChan)
			defer close(errChan)
			wg.Add(len(vars))

			for i := range vars {
				go sendGetToCryptoCompare(vars[i], "USD", &wg, priceChan, errChan)
			}
			wg.Wait()

			for i := 0; i < len(vars); i++ {
				select {
				case err := <-errChan:
					if err.hasError != false {
						log.Printf(err.err.Error())
					}
				default:
					log.Error().Err(errors.New("this shouldnt have happened"))
				}
			}

			for i := 0; i < len(vars); i++ {
				select {
				case price := <-priceChan:
					parameters[price.name] = price.price
				default:
					log.Error().Err(errors.New("this shouldnt have happened"))
				}
			}

			log.Info().Msg(fmt.Sprintf("parameters: %v", parameters))
			result, err := expression.Evaluate(parameters)
			if err != nil {
				log.Error().Err(err)
			}

			var resultBool bool
			log.Info().Msg(fmt.Sprintf("result: %v", result))
			resultBool = result.(bool)
			if resultBool == true {
				// bot := getTgBot()
				token := os.Getenv(TELEGRAM_BOT_TOKEN_ENV_VAR)
				bot, err := tgbotapi.NewBotAPI(token[1 : len(token)-1])
				if err != nil {
					log.Error().Err(err)
				}
				msgText := "notification " + alerts.Alerts[i].Expr + " has been triggered"
				msg := tgbotapi.NewMessage(*botChannelID, msgText)
				bot.Send(msg)
			}
		}

		time.Sleep(time.Second * time.Duration(*alertsCheckInterval))
	}
}

type addAlertJSONType struct {
	Name string `json:"name"`
	Expr string `json:"expr"`
}

func handleAlertPost(w http.ResponseWriter, r *http.Request) {
	w.Header().Add("Content-Type", "application/json")
	bodyBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf(err.Error())
	}

	var bodyJSON addAlertJSONType
	json.Unmarshal(bodyBytes, &bodyJSON)

	if bodyJSON.Name == "" || bodyJSON.Expr == "" {
		json.NewEncoder(w).Encode(map[string]interface{}{
			"isSuccessful": false,
			"error":        "not all parameters are valid."})
		log.Fatal().Err(errors.New("not all parameters are valid."))
		return
	}

	ctx := context.Background()
	key := "alert:" + bodyJSON.Name
	rdb.Set(ctx, bodyJSON.Name, bodyJSON.Expr, 0)
	rdb.SAdd(ctx, "alertkeys", key)
	json.NewEncoder(w).Encode(map[string]interface{}{
		"isSuccessful": true,
		"error":        ""})

}

func handleAlertDelete(w http.ResponseWriter, r *http.Request) {
	var Id string
	w.Header().Add("Content-Type", "application/json")
	params := r.URL.Query()
	for key, value := range params {
		switch key {
		case "key":
			Id = value[0]
		default:
			log.Error().Err(errors.New("bad parameters for the crypto endpoint."))
		}
	}

	if Id == "" {
		json.NewEncoder(w).Encode(map[string]interface{}{
			"isSuccessful": false,
			"error":        "Id parameter is not valid."})
		log.Fatal().Err(errors.New("not all parameters are valid."))
		return
	}

	ctx := context.Background()

	rdb.Del(ctx, Id)
	setKey := "alert:" + Id
	rdb.SRem(ctx, "alertkeys", setKey)
	log.Printf(setKey)

	json.NewEncoder(w).Encode(struct {
		IsSuccessful bool   `json:"isSuccessful"`
		Err          string `json:"err"`
	}{IsSuccessful: true, Err: ""})
}

func handleAlertGet(w http.ResponseWriter, r *http.Request) {
	var Id string
	w.Header().Add("Content-Type", "application/json")
	params := r.URL.Query()
	for key, value := range params {
		switch key {
		case "key":
			Id = value[0]
		default:
			log.Error().Err(errors.New("bad parameters for the crypto endpoint."))
		}
	}

	if Id == "" {
		json.NewEncoder(w).Encode(map[string]interface{}{
			"isSuccessful": false,
			"error":        "Id parameter is not valid."})
		log.Fatal().Err(errors.New("not all parameters are valid."))
		return
	}

	ctx := context.Background()

	redisResult := rdb.Get(ctx, Id)
	redisResultString, err := redisResult.Result()
	if err != nil {
		log.Err(err)
	}

	var ErrorString string
	if err == nil {
		ErrorString = ""
	} else {
		ErrorString = err.Error()
	}

	w.Header().Add("Content-Type", "application/json")

	json.NewEncoder(w).Encode(struct {
		IsSuccessful bool   `json:"isSuccessful"`
		Error        string `json:"error"`
		Key          string `json:"key"`
		Expr         string `json:"expr"`
	}{IsSuccessful: true, Error: ErrorString, Key: Id, Expr: redisResultString})
}

func alertHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method == "POST" || r.Method == "PUT" || r.Method == "PATCH" {
		handleAlertPost(w, r)
	} else if r.Method == "DELETE" {
		handleAlertDelete(w, r)
	} else if r.Method == "GET" {
		handleAlertGet(w, r)
	} else {
		http.Error(w, "Method is not supported.", http.StatusNotFound)
	}

}

func exHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Add("Content-Type", "application/json")
	if r.Method != "GET" {
		http.Error(w, "Method is not supported.", http.StatusNotFound)
	}

	apiKey := os.Getenv(CHANGELLY_API_KEY_ENV_VAR)
	apiSecret := os.Getenv(CHANGELLY_API_SECRET_ENV_VAR)

	body := struct {
		Jsonrpc string   `json:"jsonrpc"`
		Id      string   `json:"id"`
		Method  string   `json:"method"`
		Params  []string `json:"params"`
	}{
		Jsonrpc: "2.0",
		Id:      "test",
		Method:  "getCurrencies",
		Params:  nil}

	bodyJSON, err := json.Marshal(body)
	if err != nil {
		log.Error().Err(err)
	}

	secretBytes := []byte(apiSecret[1 : len(apiSecret)-1])
	mac := hmac.New(sha512.New, secretBytes)
	mac.Write(bodyJSON)

	client := &http.Client{}
	req, err := http.NewRequest("POST", changellyURL, bytes.NewReader(bodyJSON))
	if err != nil {
		log.Error().Err(err)
	}

	macDigest := hex.EncodeToString(mac.Sum(nil))
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("api-key", apiKey[1:len(apiKey)-1])
	req.Header.Add("sign", macDigest)

	resp, err := client.Do(req)
	if err != nil {
		log.Error().Err(err)
	}
	defer resp.Body.Close()

	responseBody, err := ioutil.ReadAll(resp.Body)
	log.Printf(string(responseBody))

	responseUnmarshalled := struct {
		Jsonrpc string   `json:"jsonrpc"`
		Id      string   `json:"id"`
		Result  []string `json:"result"`
	}{}

	err = json.Unmarshal(responseBody, &responseUnmarshalled)
	if err != nil {
		log.Error().Err(err)
	}

	json.NewEncoder(w).Encode(responseUnmarshalled)
}

func healthHandler(w http.ResponseWriter, r *http.Request) {
	var RedisError string
	var HivedError string
	IsHivedOk := true
	var IsRedisOk bool

	w.Header().Add("Content-Type", "application/json")
	if r.Method != "GET" {
		http.Error(w, "Method is not supported.", http.StatusNotFound)
	}

	pingCtx := context.Background()
	pingResponse := rdb.Ping(pingCtx)
	pingResponseResult, err := pingResponse.Result()
	if err != nil {
		log.Err(err)
		IsRedisOk = false
		RedisError = err.Error()
	} else {
		if pingResponseResult == "PONG" {
			IsRedisOk = true
			RedisError = ""
		} else {
			IsRedisOk = false
			RedisError = "redis did not respond PONG to ping"
		}
	}

	w.WriteHeader(http.StatusOK)

	json.NewEncoder(w).Encode(struct {
		IsHivedOk  bool   `json:"isHivedOk"`
		HivedError string `json:"hivedError"`
		IsRedisOk  bool   `json:"isRedisOk"`
		RedisError string `json:"redisError"`
	}{IsHivedOk: IsHivedOk, HivedError: HivedError, IsRedisOk: IsRedisOk, RedisError: RedisError})
}

func robotsHandler(w http.ResponseWriter, r *http.Request) {
	json.NewEncoder(w).Encode(struct {
		UserAgents string `json:"User-Agents"`
		Disallow   string `json:"Disallow"`
	}{"*", "/"})
}

func startServer(gracefulWait time.Duration) {
	r := mux.NewRouter()
	srv := &http.Server{
		Addr:         "0.0.0.0:" + *flagPort,
		WriteTimeout: time.Second * 15,
		ReadTimeout:  time.Second * 15,
		Handler:      r,
	}
	r.HandleFunc("/health", healthHandler)
	r.HandleFunc("/price", priceHandler)
	r.HandleFunc("/pair", pairHandler)
	r.HandleFunc("/alert", alertHandler)
	r.HandleFunc("/ex", exHandler)
	r.HandleFunc("/robots.txt", robotsHandler)

	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal().Err(err)
		}
	}()

	c := make(chan os.Signal, 1)

	signal.Notify(c, os.Interrupt)
	<-c
	ctx, cancel := context.WithTimeout(context.Background(), gracefulWait)
	defer cancel()
	srv.Shutdown(ctx)
	log.Info().Msg("gracefully shut down the server")
}

func setupLogging() {
	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
}

func main() {
	var gracefulWait time.Duration
	flag.DurationVar(&gracefulWait, "gracefulwait", time.Second*15, "the duration to wait during the graceful shutdown")
	flag.Parse()

	rdb = redis.NewClient(&redis.Options{
		Addr:     *redisAddress,
		Password: *redisPassword,
		DB:       int(*redisDB),
	})
	defer rdb.Close()

	setupLogging()
	// go runTgBot()
	go alertManager()
	startServer(gracefulWait)
}