You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

203 lines
5.5 KiB

package main
import (
log "github.com/sirupsen/logrus"
"net/http"
// "github.com/denzs/wsjtx-dashboards/shared/httpstuff"
"github.com/denzs/wsjtx-exporter/shared/wsjtx"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/jnovack/flag"
"os"
"fmt"
"io/ioutil"
"encoding/json"
"runtime"
"time"
)
// Types used to pass single lines of text from the HTTP handlers to the
// parser workers.
type (
	// LineRequest is one line of text together with the station and
	// identifier values it was submitted with.
	LineRequest struct {
		Station    string
		Identifier string
		Line       string
	}

	// LineRequestManager holds the send-only end of the channel on which
	// LineRequest values are queued.
	LineRequestManager struct {
		Requests chan<- LineRequest
	}
)
// Runtime configuration. Every value is bound to a command line flag in init.
var (
	// Exporter switches and logging verbosity.
	useProm  bool
	useMysql bool
	trace    bool // switches logging to trace level

	// MySQL connection settings.
	mysqlHost  string
	mysqlDb    string
	mysqlUser  string
	mysqlPass  string
	mysqlTable string

	// Listen addresses and the path of the prometheus metric endpoint.
	listenHttp       string
	listenPrometheus string
	metricPath       string
)
// usage prints a header naming the executable (os.Args[0]) followed by the
// defaults of all registered flags.
//
// The header is written to stderr. The standard library documents that
// flag.PrintDefaults writes to standard error unless configured otherwise;
// the flag package imported by this file is assumed to behave the same way
// (NOTE(review): confirm). Writing the header to the same stream keeps the
// whole usage text together when output is redirected.
func usage() {
	fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
	flag.PrintDefaults()
}
// init parses the command line, configures logging and starts the enabled
// exporters. As a package init function it runs before main.
//
// Steps, in order:
//  1. register and parse all flags into the package-level configuration
//     variables
//  2. install a text log formatter with full timestamps and, with -trace,
//     raise the log level to trace
//  3. print the usage text and exit via log.Fatal when neither -mysql nor
//     -prometheus was given
//  4. with -prometheus: serve promhttp.Handler() on listenPrometheus under
//     metricPath in a background goroutine
//  5. with -mysql: poll dbConn every two seconds until it stops reporting
//     the database as down, then call init_db
func init() {
	flag.StringVar(&mysqlHost, "host", "db", "name/ip of mysql host")
	flag.StringVar(&mysqlDb, "db", "digimode_stats", "db name")
	flag.StringVar(&mysqlUser, "user", "wsjtx", "mysql username")
	flag.StringVar(&mysqlPass, "pass", "secret", "mysql password")
	flag.StringVar(&mysqlTable, "table", "wsjtx_all_txt", "mysql table name")
	flag.StringVar(&metricPath, "metricPath", "/metrics", "path for prometheus metric endpoint")
	flag.StringVar(&listenPrometheus, "promport", ":9888", "listening adress for prometheus metric endpoint")
	flag.StringVar(&listenHttp, "httpport", ":4000", "listening address for http endpoint")
	flag.BoolVar(&useProm, "prometheus", false, "activate prometheus exporter")
	flag.BoolVar(&useMysql, "mysql", false, "activate mysql exporter")
	flag.BoolVar(&trace, "trace", false, "log almost everything")

	flag.Parse()

	formatter := &log.TextFormatter{
		FullTimestamp: true,
	}
	log.SetFormatter(formatter)
	if trace {
		log.SetLevel(log.TraceLevel)
		log.Info("trace logging enabled")
	} else {
		log.Info("normal logging enabled")
	}

	if !useProm && !useMysql {
		usage()
		log.Fatal("you have to enable at least one exporter. see -mysql and -prometheus flags")
	}

	if useProm {
		log.Info("prometheus exporter enabled..")
		go func() {
			server := http.NewServeMux()
			server.Handle(metricPath, promhttp.Handler())
			// ListenAndServe only returns when serving failed (for example
			// because the address is already in use). Report that instead of
			// dropping the error, otherwise the endpoint is silently missing.
			if err := http.ListenAndServe(listenPrometheus, server); err != nil {
				log.Errorf("prometheus metric endpoint on %s%s stopped: %v", listenPrometheus, metricPath, err)
			}
		}()
		// Logged right after the goroutine is started, i.e. possibly before
		// the listener is actually bound.
		log.Infof("listening on %s%s for prometheus metrics", listenPrometheus, metricPath)
	}

	if useMysql {
		log.Info("mysql exporter enabled..")

		// wait for stupid mysql container to come up..
		// dbConn's second result is used as a "database still down" flag.
		// NOTE(review): the first result is discarded on every probe; confirm
		// that dbConn does not leave a connection open each time.
		_, dbDown := dbConn()
		for dbDown {
			log.Info("waiting for db to come up..")
			time.Sleep(2 * time.Second)
			_, dbDown = dbConn()
		}
		init_db()
	}
}
// handleGetLastRequest replies with the JSON encoding of whatever getLast
// returns for the station and identifier given in the X-STATION and
// X-IDENTIFIER request headers.
//
// Responses:
//   - 200 with content-type application/json on success
//   - 404 when getLast fails with the message "not found"
//   - 500 for any other getLast error, or when the result cannot be
//     marshalled
//
// Exactly one response is written per request: every error path returns
// right after http.Error.
func handleGetLastRequest(w http.ResponseWriter, r *http.Request) {
	xstation := r.Header.Get("X-STATION")
	// xproxystation := r.Header.Get("X-PROXY-STATION")
	xidentifier := r.Header.Get("X-IDENTIFIER")

	fmt.Printf("X-STATION: %s\n", xstation)
	fmt.Printf("X-IDENTIFIER: %s\n", xidentifier)

	// FIXME check for existence! and compare X-STATION against X-PROXY-STATION!

	last, err := getLast(xstation, xidentifier)
	if err != nil {
		status := http.StatusInternalServerError
		// getLast's error is matched by its message text, as before.
		if err.Error() == "not found" {
			status = http.StatusNotFound
		}
		http.Error(w, err.Error(), status)
		// Stop here. Falling through would append a JSON body to the error
		// response that has already been sent.
		return
	}

	reply, err := json.Marshal(last)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Printf("%s\n", reply)

	// return result
	w.Header().Set("content-type", "application/json")
	// A failed write means the client is gone; there is nothing left to report to it.
	w.Write(reply)
}
// lineParser is the body of one line worker. It receives requests from
// linerequests until the channel is closed and scans each line with
// wsjtx.ScanLine. When the scan reports a match, the result is tagged with
// the request's station and identifier and handed to handlePrometheus and
// handleMysql. id is only used to label the trace output.
func lineParser(linerequests chan LineRequest, id int) {
	for req := range linerequests {
		// parse the line
		parsed, ok := wsjtx.ScanLine(req.Line)
		if !ok {
			log.Tracef("[%d] didnt find stuff in line..", id)
			continue
		}

		log.Tracef("[%d] found stuff in line!", id)
		parsed.Station = req.Station
		parsed.Identifier = req.Identifier
		handlePrometheus(parsed)
		handleMysql(parsed)
	}
}
// lineRequestHandler creates the request channel and starts the workers
// reading from it. The channel's buffer size and the number of lineParser
// goroutines both equal runtime.NumCPU(). The channel is returned so that
// callers can queue lines on it.
func lineRequestHandler() chan LineRequest {
	workers := runtime.NumCPU()
	queue := make(chan LineRequest, workers)

	for id := 0; id < workers; id++ {
		log.Tracef("launching line worker %d", id)
		go lineParser(queue, id)
	}

	return queue
}
// handleLinesRequest accepts a JSON document that unmarshals into Lines and
// queues every entry of its Text field on lrm.Requests, tagged with the
// values of the X-STATION and X-IDENTIFIER request headers.
//
// Responses:
//   - 200 with body "thx" once all lines have been queued
//   - 400 when the body is not valid JSON for Lines (a client error)
//   - 500 when the body cannot be read
//
// The send on lrm.Requests blocks while the channel is full, so the response
// is only written after every line has been accepted by the queue.
func (lrm *LineRequestManager) handleLinesRequest(w http.ResponseWriter, r *http.Request) {
	var l Lines

	xstation := r.Header.Get("X-STATION")
	// xproxystation := r.Header.Get("X-PROXY-STATION")
	xidentifier := r.Header.Get("X-IDENTIFIER")

	// FIXME check for existence! and compare X-STATION against X-PROXY-STATION!

	defer r.Body.Close()
	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if err := json.Unmarshal(b, &l); err != nil {
		// The payload itself is malformed: that is the client's mistake,
		// not a server failure.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	for _, line := range l.Text {
		log.Tracef("station[%s] identifier[%s] %s", xstation, xidentifier, line)
		lrm.Requests <- LineRequest{Station: xstation, Identifier: xidentifier, Line: line}
	}

	// A failed write means the client is gone; the lines are queued either way.
	w.Write([]byte("thx"))
}
// main connects the line workers to the HTTP endpoints and serves on
// listenHttp until the listener fails, at which point the error is logged
// via log.Fatal.
//
// Routes:
//   - /lines/insert      -> LineRequestManager.handleLinesRequest
//   - /timestamp/getLast -> handleGetLastRequest
func main() {
	manager := &LineRequestManager{Requests: lineRequestHandler()}

	mux := http.NewServeMux()
	mux.HandleFunc("/timestamp/getLast", handleGetLastRequest)
	mux.HandleFunc("/lines/insert", manager.handleLinesRequest)

	log.Infof("listening on %s for wsjtx input", listenHttp)
	log.Fatal(http.ListenAndServe(listenHttp, mux))
}