-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance Load Generator to Query Pre-Generated Data Blocks #782
base: master
Are you sure you want to change the base?
Changes from 5 commits
fb07a8f
6337ce7
2748652
7eaa555
d1560eb
8b752ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,6 +92,18 @@ type QueryGroup struct { | |
Step string `yaml:"step,omitempty"` | ||
} | ||
|
||
type BucketConfig struct { | ||
Path string `yaml:"path"` | ||
MinTime int64 `yaml:"minTime"` | ||
MaxTime int64 `yaml:"maxTime"` | ||
} | ||
|
||
type configState struct { | ||
bucketConfig *BucketConfig | ||
Err error | ||
absoluteTime int64 | ||
} | ||
|
||
func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier { | ||
qtype := qg.Type | ||
if qtype == "" { | ||
|
@@ -120,16 +132,59 @@ func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier { | |
} | ||
} | ||
|
||
func (q *Querier) run(wg *sync.WaitGroup) { | ||
// Function to load `minTime` and `maxTime` from bucket-config.yml | ||
func loadKeyConfig() (*BucketConfig, error) { | ||
filePath := "/config/bucket-config.yml" | ||
_, err := os.Stat(filePath) | ||
if os.IsNotExist(err) { | ||
return nil, fmt.Errorf("file not found: %s", filePath) | ||
} | ||
|
||
data, err := os.ReadFile(filePath) | ||
if err != nil { | ||
return nil, fmt.Errorf("error reading file: %w", err) | ||
} | ||
|
||
var bucketConfig BucketConfig | ||
err = yaml.Unmarshal(data, &bucketConfig) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing YAML: %w", err) | ||
} | ||
|
||
return &bucketConfig, nil | ||
} | ||
|
||
func configstate(v *BucketConfig, err error) *configState { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is potentially confusing to have two identifiers which differ only in the case of some letters. |
||
var absolutetime int64 | ||
if v != nil { | ||
absolutetime = v.MaxTime | ||
} | ||
return &configState{ | ||
bucketConfig: v, | ||
Err: err, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the idea is we carry This at least deserves a comment. Maybe an alternative would work, just having |
||
absoluteTime: absolutetime, | ||
} | ||
} | ||
|
||
func (q *Querier) run(wg *sync.WaitGroup, timeBound *configState) { | ||
defer wg.Done() | ||
fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url) | ||
time.Sleep(20 * time.Second) | ||
|
||
for { | ||
start := time.Now() | ||
|
||
runBlockMode := "current" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the idea here is to alternate between current and absolute? |
||
for _, query := range q.queries { | ||
q.query(query.Expr) | ||
if runBlockMode == "current" { | ||
q.query(query.Expr, "current", nil) | ||
} else if timeBound.Err == nil { | ||
q.query(query.Expr, "absolute", timeBound) | ||
} | ||
if runBlockMode == "current" && timeBound.Err == nil { | ||
runBlockMode = "absolute" | ||
} else if timeBound.Err == nil { | ||
runBlockMode = "current" | ||
} | ||
} | ||
|
||
wait := q.interval - time.Since(start) | ||
|
@@ -139,7 +194,7 @@ func (q *Querier) run(wg *sync.WaitGroup) { | |
} | ||
} | ||
|
||
func (q *Querier) query(expr string) { | ||
func (q *Querier) query(expr string, timeMode string, timeBound *configState) { | ||
queryCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc() | ||
start := time.Now() | ||
|
||
|
@@ -153,9 +208,19 @@ func (q *Querier) query(expr string) { | |
qParams := req.URL.Query() | ||
qParams.Set("query", expr) | ||
if q.qtype == "range" { | ||
qParams.Set("start", fmt.Sprintf("%d", int64(time.Now().Add(-q.start).Unix()))) | ||
qParams.Set("end", fmt.Sprintf("%d", int64(time.Now().Add(-q.end).Unix()))) | ||
qParams.Set("step", q.step) | ||
if timeMode == "current" { | ||
qParams.Set("start", fmt.Sprintf("%d", int64(time.Now().Add(-q.start).Unix()))) | ||
qParams.Set("end", fmt.Sprintf("%d", int64(time.Now().Add(-q.end).Unix()))) | ||
qParams.Set("step", q.step) | ||
} else { | ||
endTime := time.Unix(0, timeBound.bucketConfig.MaxTime*int64(time.Millisecond)) | ||
qParams.Set("start", fmt.Sprintf("%d", int64(endTime.Add(-q.start).Unix()))) | ||
qParams.Set("end", fmt.Sprintf("%d", int64(endTime.Add(-q.end).Unix()))) | ||
qParams.Set("step", q.step) | ||
} | ||
} else if timeMode == "absolute" { | ||
blockinstime := time.Unix(0, timeBound.absoluteTime*int64(time.Millisecond)) | ||
qParams.Set("time", fmt.Sprintf("%d", int64(blockinstime.Unix()))) | ||
} | ||
req.URL.RawQuery = qParams.Encode() | ||
|
||
|
@@ -221,11 +286,17 @@ func main() { | |
|
||
var wg sync.WaitGroup | ||
|
||
bucketConfig, err := loadKeyConfig() | ||
if err != nil { | ||
fmt.Printf("bucket-config.yml file is not present: %v\n", err) | ||
} | ||
timeBound := configstate(bucketConfig, err) | ||
|
||
for i, group := range config.Querier.Groups { | ||
wg.Add(1) | ||
go NewQuerier(i, "pr", prNumber, group).run(&wg) | ||
go NewQuerier(i, "pr", prNumber, group).run(&wg, timeBound) | ||
wg.Add(1) | ||
go NewQuerier(i, "release", prNumber, group).run(&wg) | ||
go NewQuerier(i, "release", prNumber, group).run(&wg, timeBound) | ||
} | ||
|
||
prometheus.MustRegister(queryDuration, queryCount, queryFailCount) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be a parameter, a CLI flag.