|
@ -30,6 +30,7 @@ var password string |
|
|
var uri string |
|
|
var uri string |
|
|
var minlen int |
|
|
var minlen int |
|
|
var readall bool |
|
|
var readall bool |
|
|
|
|
|
var trace bool |
|
|
|
|
|
|
|
|
func usage() { |
|
|
func usage() { |
|
|
fmt.Printf("Usage of %s:\n", os.Args[0]) |
|
|
fmt.Printf("Usage of %s:\n", os.Args[0]) |
|
@ -44,6 +45,7 @@ func init() { |
|
|
flag.StringVar(&uri, "uri", "http://http-exporter", "uri to your http-exporter") |
|
|
flag.StringVar(&uri, "uri", "http://http-exporter", "uri to your http-exporter") |
|
|
flag.IntVar(&minlen, "minlen", 16, "minimal length for line to be pushed") |
|
|
flag.IntVar(&minlen, "minlen", 16, "minimal length for line to be pushed") |
|
|
flag.BoolVar(&readall, "readall", false, "submit whole file if nothing found at server") |
|
|
flag.BoolVar(&readall, "readall", false, "submit whole file if nothing found at server") |
|
|
|
|
|
flag.BoolVar(&trace, "trace", false, "log almost everything") |
|
|
flag.Parse() |
|
|
flag.Parse() |
|
|
|
|
|
|
|
|
if alltxt == "" || station == "" || identifier == "" || uri == "" { |
|
|
if alltxt == "" || station == "" || identifier == "" || uri == "" { |
|
@ -52,6 +54,13 @@ func init() { |
|
|
os.Exit(1) |
|
|
os.Exit(1) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if trace { |
|
|
|
|
|
log.SetLevel(log.TraceLevel) |
|
|
|
|
|
log.Info("trace logging enabled") |
|
|
|
|
|
} else { |
|
|
|
|
|
log.Info("normal logging enabled") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
formatter := &log.TextFormatter{ |
|
|
formatter := &log.TextFormatter{ |
|
|
FullTimestamp: true, |
|
|
FullTimestamp: true, |
|
|
} |
|
|
} |
|
@ -66,7 +75,7 @@ func init() { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func getTimestampFromDb() (int64, bool) { |
|
|
func getTimestampFromDb() (int64, bool) { |
|
|
log.Trace("doing timestamp stuff..") |
|
|
log.Trace("fetching last report from server") |
|
|
var ts LastTs |
|
|
var ts LastTs |
|
|
|
|
|
|
|
|
client := http.Client{ |
|
|
client := http.Client{ |
|
@ -75,36 +84,32 @@ func getTimestampFromDb() (int64, bool) { |
|
|
|
|
|
|
|
|
req, err := http.NewRequest(http.MethodGet, uri + "/timestamp/getLast?identifier=" + identifier, nil) |
|
|
req, err := http.NewRequest(http.MethodGet, uri + "/timestamp/getLast?identifier=" + identifier, nil) |
|
|
req.SetBasicAuth(station, password) |
|
|
req.SetBasicAuth(station, password) |
|
|
log.Info("done req..") |
|
|
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Fatal(err) |
|
|
log.Errorf("error creating request: %s", err.Error()) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
req.Header.Set("X-Station", station) |
|
|
req.Header.Set("X-Station", station) |
|
|
req.Header.Set("X-Identifier", identifier) |
|
|
req.Header.Set("X-Identifier", identifier) |
|
|
|
|
|
|
|
|
log.Trace("done Headet.Set") |
|
|
|
|
|
|
|
|
|
|
|
res, getErr := client.Do(req) |
|
|
res, getErr := client.Do(req) |
|
|
log.Trace("done client.Do") |
|
|
|
|
|
if getErr != nil { |
|
|
if getErr != nil { |
|
|
log.Fatal(getErr) |
|
|
log.Errorf("error executing http request: %s", getErr.Error()) |
|
|
|
|
|
os.Exit(1) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
switch res.StatusCode { |
|
|
switch res.StatusCode { |
|
|
case 200: |
|
|
case 200: |
|
|
log.Trace("200 OK, everything is fine :)") |
|
|
log.Trace("200 OK") |
|
|
case 401: |
|
|
case 401: |
|
|
log.Panic("Authentication failed! Please check entered station and password..") |
|
|
log.Error("Authentication failed! Please check entered station and password..") |
|
|
|
|
|
os.Exit(1) |
|
|
case 404: |
|
|
case 404: |
|
|
log.Info("Yeah, this seems to be our first contact with the server, as it doesnt have any records for %s:%s", station, identifier) |
|
|
log.Tracef("404 for %s:%s", station, identifier) |
|
|
return 0, false |
|
|
return 0, false |
|
|
case 502: |
|
|
case 502: |
|
|
log.Info("Server reports internal problems (%d).. waiting 30 seconds to give it some time to breathe", res.StatusCode) |
|
|
log.Errorf("Server reports internal problems (%d) please wait 30 seconds before trying again..", res.StatusCode) |
|
|
time.Sleep(30 * time.Second) |
|
|
os.Exit(1) |
|
|
// FIXME what to do instead of panic? smart retry logic in main or even here?
|
|
|
|
|
|
log.Panic("bye! iam going home.. ") |
|
|
|
|
|
default: |
|
|
default: |
|
|
log.Panicf("uh well, there seems to be some serious shit going on: %d",res.StatusCode) |
|
|
log.Panicf("uh well, there seems to be some serious shit going on: %d",res.StatusCode) |
|
|
} |
|
|
} |
|
@ -128,15 +133,11 @@ func getTimestampFromDb() (int64, bool) { |
|
|
log.Fatal(readErr) |
|
|
log.Fatal(readErr) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.Info("doing json stuff") |
|
|
|
|
|
jsonErr := json.Unmarshal(body, &ts) |
|
|
jsonErr := json.Unmarshal(body, &ts) |
|
|
log.Info("done json stuff") |
|
|
|
|
|
if jsonErr != nil { |
|
|
if jsonErr != nil { |
|
|
log.Fatal(jsonErr) |
|
|
log.Fatal(jsonErr) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.Info("doing timestamp stuff done") |
|
|
|
|
|
|
|
|
|
|
|
return ts.Last, true |
|
|
return ts.Last, true |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -181,23 +182,20 @@ func unix2wsjtx(ts int64) (string) { |
|
|
func postLine(batchmode bool, lines chan string) { |
|
|
func postLine(batchmode bool, lines chan string) { |
|
|
// FIXME implement batchmode
|
|
|
// FIXME implement batchmode
|
|
|
|
|
|
|
|
|
// var l httpstuff.Lines
|
|
|
|
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
|
select { |
|
|
select { |
|
|
case line := <- lines : |
|
|
case line := <- lines : |
|
|
l := Lines{} |
|
|
l := Lines{} |
|
|
l.Text = append(l.Text, line) |
|
|
l.Text = append(l.Text, line) |
|
|
log.Infof("line: %s", l.Text) |
|
|
log.Tracef("line: %s", l.Text) |
|
|
|
|
|
|
|
|
//var jsonStr = []byte(`{"title":"Buy cheese and bread for breakfast."}`)
|
|
|
|
|
|
jsonStr, err := json.Marshal(l) |
|
|
jsonStr, err := json.Marshal(l) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Errorf("cant marshal json: ", err.Error()) |
|
|
log.Errorf("error marshaling json: ", err.Error()) |
|
|
} |
|
|
} |
|
|
req, err := http.NewRequest("POST", uri + "/lines/insert", bytes.NewBuffer(jsonStr)) |
|
|
req, err := http.NewRequest("POST", uri + "/lines/insert", bytes.NewBuffer(jsonStr)) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Errorf("crazy shit during newrequest", err.Error()) |
|
|
log.Errorf("error creating request: %s", err.Error()) |
|
|
} |
|
|
} |
|
|
req.SetBasicAuth(station, password) |
|
|
req.SetBasicAuth(station, password) |
|
|
req.Header.Set("X-Station", station) |
|
|
req.Header.Set("X-Station", station) |
|
@ -207,32 +205,36 @@ func postLine(batchmode bool, lines chan string) { |
|
|
client := &http.Client{} |
|
|
client := &http.Client{} |
|
|
resp, err := client.Do(req) |
|
|
resp, err := client.Do(req) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Errorf("crazy shit during client", err.Error()) |
|
|
log.Errorf("error executing http request: %s", err.Error()) |
|
|
} |
|
|
} |
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
|
|
|
|
log.Trace("response Status: ", resp.Status) |
|
|
// FIXME exit fatal if resposne != 200 OK
|
|
|
log.Trace("response Headers: ", resp.Header) |
|
|
log.Tracef("response: %s", resp.Status) |
|
|
body, _ := ioutil.ReadAll(resp.Body) |
|
|
// body, _ := ioutil.ReadAll(resp.Body)
|
|
|
log.Trace("response Body: ", string(body)) |
|
|
// log.Tracef("response: %s", string(body))
|
|
|
|
|
|
resp.Body.Close() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
log.Trace("goodbye from postLine") |
|
|
log.Trace("goodbye from postLine") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func readFile(offset int64, whence int, done chan bool) { |
|
|
func readFile(offset int64, whence int, done chan bool) { |
|
|
log.Trace("start readFile %s", alltxt) |
|
|
workers := 128 |
|
|
|
|
|
|
|
|
lines := make(chan string ,2) |
|
|
log.Tracef("start reading file") |
|
|
|
|
|
|
|
|
|
|
|
lines := make(chan string, workers) |
|
|
|
|
|
|
|
|
|
|
|
for i := 0; i < workers; i++ { |
|
|
go postLine((whence == 0), lines) |
|
|
go postLine((whence == 0), lines) |
|
|
go postLine((whence == 0), lines) |
|
|
} |
|
|
|
|
|
|
|
|
t, _:= tail.TailFile(alltxt, tail.Config{Follow: true, Location: &tail.SeekInfo{Offset: offset, Whence: whence},}) |
|
|
t, _:= tail.TailFile(alltxt, tail.Config{Follow: true, Location: &tail.SeekInfo{Offset: offset, Whence: whence},}) |
|
|
|
|
|
log.Info("scanning for lines..") |
|
|
for line := range t.Lines { |
|
|
for line := range t.Lines { |
|
|
// FIXME
|
|
|
// FIXME
|
|
|
// at least minlen
|
|
|
|
|
|
// check for logline format!
|
|
|
// check for logline format!
|
|
|
|
|
|
// at least minlen
|
|
|
lines <- line.Text |
|
|
lines <- line.Text |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -248,8 +250,6 @@ func main(){ |
|
|
var whence int |
|
|
var whence int |
|
|
var foundoffset bool |
|
|
var foundoffset bool |
|
|
|
|
|
|
|
|
log.Info("starting up..") |
|
|
|
|
|
|
|
|
|
|
|
offset = 0 |
|
|
offset = 0 |
|
|
|
|
|
|
|
|
if !readall { |
|
|
if !readall { |
|
@ -258,25 +258,25 @@ func main(){ |
|
|
if foundts { |
|
|
if foundts { |
|
|
log.Infof("last timestamp in db: %d", lastDbTime) |
|
|
log.Infof("last timestamp in db: %d", lastDbTime) |
|
|
wt := unix2wsjtx(lastDbTime) |
|
|
wt := unix2wsjtx(lastDbTime) |
|
|
log.Infof("searching %s for: %s",alltxt, wt) |
|
|
log.Infof("searching %s for %s",alltxt, wt) |
|
|
offset, foundoffset = findPosition([]byte(wt)) |
|
|
offset, foundoffset = findPosition([]byte(wt)) |
|
|
if foundoffset { |
|
|
if foundoffset { |
|
|
log.Trace("found %s at offset %d ", wt, offset) |
|
|
log.Infof("found %s at offset %d", wt, offset) |
|
|
whence = 0 |
|
|
whence = 0 |
|
|
} else { |
|
|
} else { |
|
|
log.Warnf("timestamp %s NOT found in %s! have you deleted your ALL.TXT or used an old identifier?", wt, alltxt) |
|
|
log.Warnf("timestamp %s NOT found in %s! have you deleted your ALL.TXT or used an old identifier?", wt, alltxt) |
|
|
log.Warn("sending spots from now on.. consider sending your %s to the database administrator for import", alltxt) |
|
|
log.Warnf("sending spots from now on.. consider sending your %s to the database administrator for import", alltxt) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
log.Warn("api has no data for %s:%s", station, identifier) |
|
|
log.Warnf("api has no data for %s:%s", station, identifier) |
|
|
log.Warn("sending spots from now on.. consider sending your %s to the database administrator for import", alltxt) |
|
|
log.Warnf("sending spots from now on.. consider sending your %s to the database administrator for import", alltxt) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
log.Info("readall mode enabled..") |
|
|
log.Info("readall mode enabled..") |
|
|
whence = 0 |
|
|
whence = 0 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.Infof("offset: %d whence: %d", offset, whence) |
|
|
log.Tracef("offset: %d whence: %d", offset, whence) |
|
|
|
|
|
|
|
|
done := make(chan bool) |
|
|
done := make(chan bool) |
|
|
go readFile(offset, whence, done) |
|
|
go readFile(offset, whence, done) |
|
|