
last revision with local exporters ;)

master 0.1
Sebastian Denz, 4 years ago
commit 3e3795ee93
1. Readme.md (26)
2. cmd/alltxt2csv/main.go (48)
3. cmd/pskreporter-exporter/main.go (8)
4. cmd/pskreporter-exporter/pskreporter.go (10)
5. cmd/wsjtx-exporter/main.go (10)
6. doc/alltxt2csv.md (6)
7. doc/pskreporter-exporter.md (10)
8. doc/wsjtx-exporter.md (20)
9. mail.txt (38)
10. misc/ALL.TXT.test (19)
11. misc/screenshots/screenshot.png (0)
12. shared/wsjtx/wsjtx.go (24)
13. todo.md (30)

Readme.md (26)

@@ -13,11 +13,11 @@ this can be very interesting to evaluate your **personal** hf conditions from th
some use cases:
* choose dxcc, band, mode and timeframe in all combinations to see when you were successfully able to receive and transmit, to plan future activities
* use grafana alerts or prometheus alertmanager to send yourself alerts when your station receives something interesting
* compare input of different stations, e.g. for antenna comparisons
* have a nice wallboard for contesting or dxpeditions
* display rx/tx data as a map with custom timespan and filters
* compare input of different stations via flexible queries/graphs
* display rx/tx data on a map with custom timespan and filters
* compare hf propagation forecasts to your personal situation
* use grafana alerts or prometheus alertmanager to send yourself alerts when your station receives something interesting
* impress other people with cool dashboards ;)
* ...
@@ -32,7 +32,7 @@ have fun!
* grafana
* prometheus or mysql
## tooling overview
## repository overview
* **pskreporter-exporter**
* polls pskreporter.info for your callsign
@@ -44,6 +44,7 @@ have fun!
* reads the whole ALL.TXT since 2019 and creates a csv file which can be imported into mysql
* **misc**
* sample dashboards
* sql table definitions
* shell script to hop between bands via rigctl to allow fractional monitoring of multiple bands
* ready to launch docker-compose setup based on https://github.com/stefanprodan/dockprom
@@ -51,9 +52,7 @@ have fun!
### what about prometheus and mysql?
show pro/con overview:
### prometheus
#### prometheus
pro
* you get nicer graphs with counters and bars if you ask me
@@ -64,7 +63,7 @@ con
* not possible to import historical data
* not as flexible as mysql regarding complex queries
### mysql
#### mysql
pro:
* you can import your ALL.txt since 2019
@@ -111,13 +110,13 @@ here be dragons:
### grafana says too many rows for mysql datasource
choose a bigger interval
try choosing a bigger interval
### how long does it take to import my data into mysql?
* my ALL.TXT (new format since july 2019) contains ~13.7 million lines and is ~850 MB
* converting it to csv takes ~12 min on an i7-4750HQ (2015) and the result is ~1.2 GB
* currently this is done using another module which uses regular expressions, which is not optimal for this use case
* currently this is done using a module which uses regular expressions, which is not optimal for this use case
* importing the csv into mysql takes ~3.5 min
* querying the whole time range (~1.5 years) in grafana takes a few seconds
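* as a rough sanity check from those numbers: ~13.7 million lines in ~12 min is about 19,000 lines/s for the csv conversion, and the same rows in ~3.5 min is about 65,000 rows/s for the mysql import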
@@ -125,6 +124,13 @@ choose a bigger interval
well, it depends on your queries, but probably yes! ;)
### when I count the decoded messages in wsjt-x manually it doesn't match the prometheus graphs
that is correct! due to the way prometheus gets the results it doesn't know the correct timestamp of the event.
rounding/rating does the rest.
..in mysql it is correct!
## howtos
there is no special howto on integrating the components into your existing grafana/prometheus/mysql infrastructure,

cmd/alltxt2csv/main.go (48)

@@ -10,6 +10,7 @@ import (
"os"
"bufio"
"runtime"
"sync"
// "github.com/mmcloughlin/geohash"
// "github.com/tzneal/ham-go/dxcc"
"github.com/denzs/wsjtx_dashboards/shared/wsjtx"
@@ -48,22 +49,38 @@ func init() {
} else {
log.Info("normal logging enabled")
}
_ , err := os.Stat(pathout)
if !os.IsNotExist(err) {
log.Fatalf("file %s already exists..", pathout)
}
func eatline(lines chan string, results chan wsjtx.Result) {
}
func eatline(id int, lines chan string, results chan wsjtx.Result, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("worker %d.. starting..", id)
loop:
for {
select {
case line := <- lines :
case line, more := <- lines :
if more {
result, parsed := wsjtx.ScanLine(line)
if parsed {
results <- result
}
} else {
log.Infof("eatline worker[%d]: resultchan seems down.. going home..", id)
break loop
}
}
}
log.Infof("worker %d.. done!", id)
return
}
func eatfile(results chan wsjtx.Result) {
func eatfile(results chan wsjtx.Result, done chan bool) {
var wg sync.WaitGroup
log.Info("starting eating file, please wait..")
filein, err := os.Open(pathin)
@@ -75,29 +92,34 @@ func eatfile(results chan wsjtx.Result) {
lines := make(chan string,runtime.NumCPU())
for w := 0; w <= runtime.NumCPU(); w++ {
go eatline(lines, results)
wg.Add(1)
go eatline(w, lines, results, &wg)
}
i := 0
for scanner.Scan() {
i++
if i % 1000000 == 0 {
log.Infof("%d lines parsed..", i)
log.Infof("%d lines read..", i)
}
log.Info("line: ", scanner.Text())
lines <- scanner.Text()
}
log.Info("sending children to bed..")
close(lines)
wg.Wait()
log.Info("children are in bed now! ;)")
filein.Close()
done <- true
log.Info("done.. eatfile")
return
}
func main(){
_ , err := os.Stat(pathout)
if !os.IsNotExist(err) {
log.Fatalf("file %s already exists..", pathout)
}
fileout, err := os.OpenFile(pathout,os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
log.Fatal(err)
@@ -106,9 +128,11 @@ func main(){
writer := bufio.NewWriter(fileout)
results := make(chan wsjtx.Result,runtime.NumCPU())
done := make(chan bool)
go eatfile(results)
go eatfile(results, done)
loop:
for {
select {
case result := <- results :
@@ -116,6 +140,8 @@ func main(){
if err != nil {
log.Warn(err)
}
case <- done :
break loop
}
}
writer.Flush()
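Taken together, the diff converges on a classic fan-out pipeline: one reader goroutine feeds a buffered lines channel, one worker per CPU parses, and a sync.WaitGroup plus a done channel coordinate shutdown. A minimal, self-contained sketch of that pattern, with a hypothetical parseLine standing in for wsjtx.ScanLine, and ranging over the channel instead of the explicit `line, more := <-lines` check:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"runtime"
	"strings"
	"sync"
)

// parseLine is a stand-in for wsjtx.ScanLine: it returns the parsed
// value and whether the line was usable.
func parseLine(line string) (string, bool) {
	fields := strings.Fields(line)
	if len(fields) == 0 {
		return "", false
	}
	return fields[0], true
}

func main() {
	lines := make(chan string, runtime.NumCPU())
	results := make(chan string, runtime.NumCPU())
	var wg sync.WaitGroup

	// fan out: one worker per CPU drains the lines channel
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lines { // range exits when lines is closed
				if result, ok := parseLine(line); ok {
					results <- result
				}
			}
		}()
	}

	// closer: once every worker returned, no result can arrive anymore
	go func() {
		wg.Wait()
		close(results)
	}()

	// feeder: read stdin and hand lines to the workers
	go func() {
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		close(lines) // lets the workers' range loops finish
	}()

	for result := range results {
		fmt.Println(result)
	}
}
```

Ranging over a closed channel drains the remaining items and then exits, which is why no per-worker done signal is needed.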

cmd/pskreporter-exporter/main.go (8)

@@ -33,11 +33,11 @@ func usage() {
func init() {
flag.StringVar(&station, "station", "", "callsign to monitor on pskreporter")
flag.StringVar(&mysql_host, "dbhost", "db", "name/ip of mysql host")
flag.StringVar(&mysql_host, "host", "db", "name/ip of mysql host")
flag.StringVar(&mysql_db, "db", "digimode_stats", "db name")
flag.StringVar(&mysql_user, "dbuser", "wsjtx", "mysql username")
flag.StringVar(&mysql_pass, "dbpass", "secret", "mysql password")
flag.StringVar(&mysql_table, "dbtable", "pskreporter_stats", "mysql table name")
flag.StringVar(&mysql_user, "user", "wsjtx", "mysql username")
flag.StringVar(&mysql_pass, "pass", "secret", "mysql password")
flag.StringVar(&mysql_table, "table", "pskreporter_stats", "mysql table name")
flag.StringVar(&metricpath, "metricpath", "/metrics", "path for prometheus metric endpoint")
flag.IntVar(&port, "port", 2113, "port for prometheus metric endpoint")
flag.BoolVar(&useProm, "prometheus", false, "activate prometheus exporter")

cmd/pskreporter-exporter/pskreporter.go (10)

@@ -128,6 +128,12 @@ func GetBand(freq float64) (string){
if (freq>28000000 && freq<30000000) {
band = "10m"
}
if (freq>144000000 && freq<145000000) {
band = "2m"
}
if (freq>432000000 && freq<433000000) {
band = "70cm"
}
return band
}
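The same band boundaries now live in two places (here in Hz, in shared/wsjtx/wsjtx.go in MHz). A table-driven variant, sketched purely as an illustration rather than as the repository's code, would keep both copies easy to extend:

```go
package main

import "fmt"

// band holds one allocation; bounds are in Hz here, matching
// cmd/pskreporter-exporter (shared/wsjtx uses MHz for the same checks).
type band struct {
	lo, hi float64
	name   string
}

// only the bands from the surrounding diff; the real file covers more
var bands = []band{
	{28000000, 30000000, "10m"},
	{144000000, 145000000, "2m"},
	{432000000, 433000000, "70cm"},
}

func getBand(freq float64) string {
	for _, b := range bands {
		if freq > b.lo && freq < b.hi {
			return b.name
		}
	}
	return "unknown"
}

func main() {
	fmt.Println(getBand(144174000)) // FT8 frequency on 2m -> "2m"
}
```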
@@ -180,9 +186,9 @@ func fetchReports(reports chan report) {
for {
if lastseqno == 0 {
url = "https://retrieve.pskreporter.info/query?senderCallsign=" + station + "&rronly=true"
url = "https://retrieve.pskreporter.info/query?senderCallsign=" + station + "&rronly=true&appcontact=git@ixyd.net"
} else {
url = "https://retrieve.pskreporter.info/query?senderCallsign=" + station + "&lastseqno=" + fmt.Sprintf("%d",lastseqno) + "&rronly=true"
url = "https://retrieve.pskreporter.info/query?senderCallsign=" + station + "&lastseqno=" + fmt.Sprintf("%d",lastseqno) + "&rronly=true&appcontact=git@ixyd.net"
}
log.WithFields(log.Fields{"url":url}).Debug("fetching reports")
resp, err := http.Get(url)
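The lastseqno bookkeeping above is what keeps the polling cheap: the first request fetches everything available, later requests only ask for reports newer than the previous batch. A stripped-down sketch of that loop; extractSeqno is a hypothetical placeholder for the exporter's response parsing, and the appcontact address is an example value, not the real one:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

// extractSeqno is a hypothetical placeholder: the real exporter parses
// the response and tracks the last sequence number it contains.
func extractSeqno(body []byte) int64 { return 0 }

func main() {
	station := "DL3SD"           // example callsign to query
	contact := "you@example.org" // example appcontact; identify yourself
	var lastseqno int64

	for {
		url := "https://retrieve.pskreporter.info/query?senderCallsign=" +
			station + "&rronly=true&appcontact=" + contact
		if lastseqno != 0 {
			// only ask for reports newer than the previous batch
			url += fmt.Sprintf("&lastseqno=%d", lastseqno)
		}
		resp, err := http.Get(url)
		if err != nil {
			time.Sleep(5 * time.Minute)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		lastseqno = extractSeqno(body)
		time.Sleep(5 * time.Minute) // the exporter polls every 5 minutes
	}
}
```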

cmd/wsjtx-exporter/main.go (10)

@@ -33,12 +33,12 @@ func usage() {
func init() {
flag.StringVar(&station, "station", "localstation", "your callsign or wsjtx instance identifier")
flag.StringVar(&pathin, "pathin", "/wsjtx/ALL.TXT", "path to WSJT-X ALL.TXT")
flag.StringVar(&mysql_host, "dbhost", "db", "name/ip of mysql host")
flag.StringVar(&pathin, "in", "/wsjtx/ALL.TXT", "path to WSJT-X ALL.TXT")
flag.StringVar(&mysql_host, "host", "db", "name/ip of mysql host")
flag.StringVar(&mysql_db, "db", "digimode_stats", "db name")
flag.StringVar(&mysql_user, "dbuser", "wsjtx", "mysql username")
flag.StringVar(&mysql_pass, "dbpass", "secret", "mysql password")
flag.StringVar(&mysql_table, "dbtable", "wsjtx_all_txt", "mysql table name")
flag.StringVar(&mysql_user, "user", "wsjtx", "mysql username")
flag.StringVar(&mysql_pass, "pass", "secret", "mysql password")
flag.StringVar(&mysql_table, "table", "wsjtx_all_txt", "mysql table name")
flag.StringVar(&metricpath, "metricpath", "/metrics", "path for prometheus metric endpoint")
flag.IntVar(&port, "port", 2112, "port for prometheus metric endpoint")
flag.BoolVar(&useProm, "prometheus", false, "activate prometheus exporter")

doc/alltxt2csv.md (6)

@@ -23,5 +23,7 @@ alltxt2csv -in ~/.local/share/WSJT-X/ALL.TXT -out ~/dev/wsjtx_dashboards/import/
* prepare IMPORT.SQL
docker exec -ti db /usr/bin/mysql --local-infile=1 -pverysecret digimode_stats -e "SET GLOBAL local_infile=1;"
docker exec -ti db /usr/bin/mysql --local-infile=1 -pverysecret digimode_stats -e "\. /wsjtx/import/DL3SD.SQL"
```
mysql --local-infile=1 digimode_stats -e "SET GLOBAL local_infile=1;"
mysql --local-infile=1 digimode_stats -e "\. /wsjtx/import/DL3SD.SQL"
```

doc/pskreporter-exporter.md (10)

@@ -4,19 +4,19 @@ poll pskreporter.info every 5 minutes to store the results into mysql and/or ex
parameter:
```
Usage of go/bin/pskreporter_exporter:
Usage of /home/ixyd/go/bin/pskreporter-exporter:
-db="digimode_stats": db name
-dbhost="db": name/ip of mysql host
-dbpass="secret": mysql password
-dbtable="pskreporter_stats": mysql table name
-dbuser="wsjtx": mysql username
-debug=false: enable debug logging
-host="db": name/ip of mysql host
-metricpath="/metrics": path for prometheus metric endpoint
-mysql=false: activate mysql exporter
-pass="secret": mysql password
-port=2113: port for prometheus metric endpoint
-prometheus=false: activate prometheus exporter
-station="": callsign to monitor on pskreporter
-table="pskreporter_stats": mysql table name
-trace=false: log almost everything
-user="wsjtx": mysql username
```
unsure about using in combination with gridtracker...

doc/wsjtx-exporter.md (20)

@@ -4,31 +4,31 @@ follows the WSJT-X ALL.TXT file to store entries in mysql and export metrics for pr
parameters:
```
Usage of go/bin/wsjtx-exporter:
Usage of /home/ixyd/go/bin/wsjtx-exporter:
-db string
db name (default "digimode_stats")
-dbhost string
-host string
name/ip of mysql host (default "db")
-dbpass string
mysql password (default "secret")
-dbtable string
mysql table name (default "wsjtx_all_txt")
-dbuser string
mysql username (default "wsjtx")
-in string
path to WSJT-X ALL.TXT (default "/wsjtx/ALL.TXT")
-metricpath string
path for prometheus metric endpoint (default "/metrics")
-mysql
activate mysql exporter
-pathin string
path to WSJT-X ALL.TXT (default "/wsjtx/ALL.TXT")
-pass string
mysql password (default "secret")
-port int
port for prometheus metric endpoint (default 2112)
-prometheus
activate prometheus exporter
-station string
your callsign or wsjtx instance identifier (default "localstation")
-table string
mysql table name (default "wsjtx_all_txt")
-trace
log almost everything
-user string
mysql username (default "wsjtx")
```
## systemd user unit for linux

mail.txt (38)

@@ -0,0 +1,38 @@
pjsg-pskmapn@nospam.gladstonefamily.net
Dear OM Philip,
first let me thank you for your awesome project! I have been using pskreporter for 2 years and I really like using
it, and I really appreciate the additional value you provide to the hamradio community with your project! :)
At the moment I am building tools to export wsjt-x received messages and even the results from your pskreporter service into mysql and prometheus.
The idea is to have a tool which you can use to analyse at which time you were able to send and receive a specific dxcc or even callsign.
Basically it is a service like your pskreporter, but with long time history and very flexible queries and visualisations :)
And it allows storing data from multiple stations, so you are able to compare the situation of your station with your friends' over the last year for a single band, for example..
Right now this is already working pretty nicely and I plan to publish it on github in the next days/weeks.
But before publishing it I would like to hear your opinion on the integration of your service.
Right now every connected station polls your API every 5 minutes. (at the moment there are only 2 active stations)
I already try to keep the traffic minimal by using the parameter rronly=true and by tracking the lastseqno and using it for further requests.
But when running locally on each station I am afraid of generating too much traffic, which could affect applications like GridTracker which also query your API and so maybe hit your server's rate limit.
So I am thinking about centralizing the lookups against your API.
So instead of having 2 polling instances which query only their own callsign, I am thinking about one instance polling 2 callsigns, for example.
But with my current state of knowledge I think I have to run 2 single queries.
-> Is that correct? Or is there a way to ask for more than one callsign with a single query?
-> I also plan to add the parameter appcontact. The idea is to provide my mail address, as I am the maintainer of the code, and not the address of the user, right?
If I could make a wish for a feature, it would be a way to subscribe to callsigns to 'livestream' incoming events from pskreporter into my application without the 5 minute delay.. but I can imagine that is not trivial!
I just wanted to mention it ;)
I would be very happy if you could comment on my questions, and if you have any other feedback I would really appreciate it!
Thanks again for your awesome service! :)
Best regards from Germany and stay healthy,
73 de DL3SD / Sebastian Denz

misc/ALL.TXT.test (19)

@@ -0,0 +1,19 @@
190825_181900 14.074 Rx FT8 -20 0.2 1080 CQ DX E78IW JN93
190825_181900 14.074 Rx FT8 3 0.3 1175 PD5A G4WWG RRR
190825_181900 14.074 Rx FT8 -17 0.9 1227 RU3DF 7X2TT -17
201129_000400 7.074 Rx FT8 12 -0.2 1159 KC2QJB TNX 73
201128_233615 7.074 Rx FT8 -13 0.9 1991 68D5EEF7FD76D3AB1E
201128_222145 7.074 Rx FT8 -14 0.1 2656 TU 73 KD4RH
201128_222145 7.074 Rx FT8 -14 0.1 2656 TU 73 KD4RH
201128_220600 7.074 Rx FT8 -7 0.3 2450 OK1NF <...> -10
x
201128_215015 7.074 Rx FT8 -24 1.7 1781 T48JOI RR73; L06SE <UA3ZCQ> -12
201128_214530 7.074 Rx FT8 -16 -0.9 1498 RT3OAY CI7KFH/P PP07
201128_212245 7.074 Rx FT8 -3 0.2 1296 RR TNX 73 GL
191011_143115 0.000 Rx FT8 -16 0.1 2499 CQ SV1VS KM18
201129_122430 7.074 Rx FT8 0 0.1 2047 CQ S57NRC JN75
201129_122430 7.074 Rx FT8 -7 0.2 1014 PD3BW IR3MD JN55
201129_122430 7.074 Rx FT8 -8 -0.5 947 IQ8BI S52UF JN76
201129_122430 7.074 Rx FT8 -14 0.5 2396 UA9CJM UR0UP KO50

screenshot.png → misc/screenshots/screenshot.png (0)

Size: 447 KiB (unchanged)

shared/wsjtx/wsjtx.go (24)

@@ -79,6 +79,12 @@ func GetBand(freq float64) (string){
if (freq>28 && freq<30) {
band = "10m"
}
if (freq>144 && freq<145) {
band = "2m"
}
if (freq>432 && freq<433){
band = "70cm"
}
return band
}
@@ -89,6 +95,12 @@ func ScanLine(line string) (Result, bool) {
result := new(Result)
found := false
// don't fail on too short lines
if len(line) < 16 {
log.WithFields(log.Fields{"line":line}).Error("line too short")
return *result, false
}
// parse only lines in new format, because old format misses band
if line[6] == '_' && line[15] != 'T' {
dataSlice := strings.Fields(line)
@@ -98,7 +110,7 @@ func ScanLine(line string) (Result, bool) {
element.Time, err = time.Parse("060102_150405",v)
if err != nil {
log.WithFields(log.Fields{"err":err}).Trace("something went wrong while parsing the timestamp: ",v)
continue
return *result, false
}
case 1:
element.Bandf,_ = strconv.ParseFloat(v,64)
@@ -143,12 +155,12 @@ func ScanLine(line string) (Result, bool) {
tmp = strings.Replace(result.Call, "<", "", -1)
result.Call = strings.Replace(tmp, ">", "", -1)
result.Band = GetBand(element.Bandf)
result.Ent, found = dxcc.Lookup(result.Call)
// FIXME result.Grid = qrz.lookup(result.Call) ;) or track in ALL.txt ^^
if(found){
if found && result.Band != "unknown" {
result.Signal = element.Strength
result.Mode = element.Mode
result.Band = GetBand(element.Bandf)
// FIXME
// * get better grid, see above
// * build geohash from better grid with gpsfromgrid(result.Grid)
@@ -172,7 +184,13 @@ func ScanLine(line string) (Result, bool) {
}).Trace("successfully parsed line")
return *result, true
} else {
if !found {
log.WithFields(log.Fields{"line":line,"callsign":element.Second}).Trace("cant parse callsign")
} else if result.Band == "unknown" {
log.WithFields(log.Fields{"line":line,"band":result.Band}).Trace("cant parse band")
} else {
log.WithFields(log.Fields{"line":line}).Error("something really strange happened..")
}
}
} else {
// log.WithFields(log.Fields{"line":line}).Info("found old-format line..")
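For reference, the new-format gate and the timestamp layout that ScanLine relies on can be exercised against a line from misc/ALL.TXT.test; this is a minimal sketch of just those two steps, not the exporter's full parser:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	line := "201129_122430     7.074 Rx FT8      0  0.1 2047 CQ S57NRC JN75"

	// same idea as ScanLine's guards: skip short lines and the old
	// format, which has no '_' at index 6
	if len(line) < 16 || line[6] != '_' {
		fmt.Println("old format or too short, skipped")
		return
	}
	fields := strings.Fields(line)
	// "060102_150405" is Go's reference layout for YYMMDD_HHMMSS
	ts, err := time.Parse("060102_150405", fields[0])
	if err != nil {
		fmt.Println("bad timestamp:", err)
		return
	}
	fmt.Printf("%s  dial=%s MHz  mode=%s  msg=%q\n",
		ts.UTC().Format(time.RFC3339), fields[1], fields[3],
		strings.Join(fields[7:], " "))
}
```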

todo.md (30)

@@ -1,37 +1,37 @@
* wsjtx-exporter
  * cqzone and ituzone are not in the db
  * implement the -back parameter
  * remove the calls metric?
    * https://prometheus.io/docs/practices/naming/#labels
  * proper grid tracking, or via qrz
  * "cant reach database" happened for me
    * address it ;)
  * systemd user unit
    * windows equivalent??
  * trace: "successfully parsed.." logs: fields.time
  * on a broken line:
    Nov 26 20:10:47 sebo-OptiPlex-980 wsjtx_exporter[869]: goroutine 12 [running]:
    Nov 26 20:10:47 sebo-OptiPlex-980 wsjtx_exporter[869]: github.com/denzs/wsjtx_dashboards/shared/wsjtx.ScanLine(0xc0000a4004, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
  * alternatively implement a findposition mechanism (automatic rewind based on the last mysql entry)
  * smarter way to register as a windows service
* alltxt2csv
  * bring the direct import feature back..?
  * research how to reduce the result set effectively in mysql
  * extend the primary key with band?
  * maybe add https://github.com/grafana/grafana/issues/8341 ?
* database
  * probably more indices
  * MySQL
    * timestamp vs datetime from grafana's point of view
    * take create table out of the binaries?
      * control should stay with the db admin
      * instead, check whether the table exists!
  * research the prometheus timestamp feature
* doc
  * german docs..
  * server and/or script/readme
  * server and/or script/readme for setting things up
* fix dashboards
  * how to handle the refresh of the variables??
  * provide dashboards to grafana
  * prometheus metric + value + TIMESTAMP!!!!!!
* vendoring
* add howto for ubuntu/win10