Skip to content

Commit

Permalink
Merge pull request #308 from bonitoo-io/feat/query-params
Browse files Browse the repository at this point in the history
feat: adding flux query parameters
  • Loading branch information
vlastahajek authored Feb 18, 2022
2 parents ab6361d + 82759e0 commit ab68e23
Show file tree
Hide file tree
Showing 8 changed files with 445 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
mkdir -p /tmp/artifacts
mkdir -p /tmp/test-results
- run: sudo rm -rf /usr/local/go
- run: wget https://golang.org/dl/go1.13.14.linux-amd64.tar.gz -O /tmp/go.tgz
- run: wget https://golang.org/dl/go1.17.7.linux-amd64.tar.gz -O /tmp/go.tgz
- run: sudo tar -C /usr/local -xzf /tmp/go.tgz
- run: go version
- run: go get -v -t -d ./...
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
### Features
- [#304](https://github.com/influxdata/influxdb-client-go/pull/304) Added public constructor for `QueryTableResult`
- [#307](https://github.com/influxdata/influxdb-client-go/pull/307) Synced generated server API with the latest [oss.yml](https://github.com/influxdata/openapi/blob/master/contracts/oss.yml).
- [#308](https://github.com/influxdata/influxdb-client-go/pull/308) Added Flux query parameters. Supported by InfluxDB Cloud only now.
- [#308](https://github.com/influxdata/influxdb-client-go/pull/308) Go 1.17 is required

## 2.7.0[2022-01-20]
### Features
Expand Down
74 changes: 73 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This repository contains the reference Go client for InfluxDB 2.
- [Basic Example](#basic-example)
- [Writes in Detail](#writes)
- [Queries in Detail](#queries)
- [Parametrized Queries](#parametrized-queries)
- [Concurrency](#concurrency)
- [Proxy and redirects](#proxy-and-redirects)
- [Checking Server State](#checking-server-state)
Expand Down Expand Up @@ -61,7 +62,7 @@ There are also other examples in the API docs:
## How To Use

### Installation
**Go 1.13** or later is required.
**Go 1.17** or later is required.

1. Add the client package your to your project dependencies (go.mod).
```sh
Expand Down Expand Up @@ -412,6 +413,77 @@ func main() {
client.Close()
}
```
### Parametrized Queries
InfluxDB Cloud supports [Parameterized Queries](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/)
that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more
reusable and can also be used to help prevent injection attacks.
InfluxDB Cloud inserts the params object into the Flux query as a Flux record named `params`. Use dot or bracket
notation to access parameters in the `params` record in your Flux query. Parameterized Flux queries support only `int`
, `float`, and `string` data types. To convert the supported data types into
other [Flux basic data types, use Flux type conversion functions](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/#supported-parameter-data-types).
Query parameters can be passed as a struct or map. Param values can be only simple types or `time.Time`.
The name of the parameter represented by a struct field can be specified by JSON annotation.
Parameterized query example:
> :warning: Parameterized Queries are supported only in InfluxDB Cloud. There is no support in InfluxDB OSS currently.
```go
package main
import (
"context"
"fmt"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")
// Get query client
queryAPI := client.QueryAPI("my-org")
// Define parameters
parameters := struct {
Start string `json:"start"`
Field string `json:"field"`
Value float64 `json:"value"`
}{
"-1h",
"temperature",
25,
}
// Query with parameters
query := `from(bucket:"my-bucket")
|> range(start: duration(params.start))
|> filter(fn: (r) => r._measurement == "stat")
|> filter(fn: (r) => r._field == params.field)
|> filter(fn: (r) => r._value > params.value)`
// Get result
result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// Access data
fmt.Printf("value: %v\n", result.Record().Value())
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
// Ensures background processes finishes
client.Close()
}
```
### Concurrency
InfluxDB Go Client can be used in a concurrent environment. All its functions are thread-safe.
Expand Down
45 changes: 45 additions & 0 deletions api/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,51 @@ func ExampleQueryAPI_query() {
client.Close()
}

func ExampleQueryAPI_queryWithParams() {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")
// Get query client
queryAPI := client.QueryAPI("my-org")
// Define parameters
parameters := struct {
Start string `json:"start"`
Field string `json:"field"`
Value float64 `json:"value"`
}{
"-1h",
"temperature",
25,
}
// Query with parameters
query := `from(bucket:"my-bucket")
|> range(start: duration(params.start))
|> filter(fn: (r) => r._measurement == "stat")
|> filter(fn: (r) => r._field == params.field)
|> filter(fn: (r) => r._value > params.value)`

// Get result
result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// Access data
fmt.Printf("value: %v\n", result.Record().Value())
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
// Ensures background processes finishes
client.Close()
}

func ExampleQueryAPI_queryRaw() {
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")
Expand Down
163 changes: 142 additions & 21 deletions api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/http"
"net/url"
"path"
"reflect"
"strconv"
"strings"
"sync"
Expand All @@ -43,11 +44,35 @@ const (
)

// QueryAPI provides methods for performing synchronously flux query against InfluxDB server.
//
// Flux query can contain reference to parameters, which must be passed via queryParams.
// it can be a struct or map. Param values can be only simple types or time.Time.
// The name of a struct field or a map key (must be a string) will be a param name.
// The name of the parameter represented by a struct field can be specified by JSON annotation:
//
// type Condition struct {
// Start time.Time `json:"start"`
// Field string `json:"field"`
// Value float64 `json:"value"`
// }
//
// Parameters are then accessed via the Flux params object:
//
// query:= `from(bucket: "environment")
// |> range(start: time(v: params.start))
// |> filter(fn: (r) => r._measurement == "air")
// |> filter(fn: (r) => r._field == params.field)
// |> filter(fn: (r) => r._value > params.value)`
//
type QueryAPI interface {
// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
// QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
Query(ctx context.Context, query string) (*QueryTableResult, error)
// QueryWithParams executes flux parametrized query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
}

// NewQueryAPI returns new query client for querying buckets belonging to org
Expand All @@ -58,6 +83,28 @@ func NewQueryAPI(org string, service http2.Service) QueryAPI {
}
}

// QueryTableResult parses streamed flux query response into structures representing flux table parts
// Walking though the result is done by repeatedly calling Next() until returns false.
// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
// Data are acquired by Record() method.
// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
type QueryTableResult struct {
io.Closer
csvReader *csv.Reader
tablePosition int
tableChanged bool
table *query.FluxTableMetadata
record *query.FluxRecord
err error
}

// NewQueryTableResult returns new QueryTableResult
func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
csvReader := csv.NewReader(rawResponse)
csvReader.FieldsPerRecord = -1
return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
}

// queryAPI implements QueryAPI interface
type queryAPI struct {
org string
Expand All @@ -66,13 +113,32 @@ type queryAPI struct {
lock sync.Mutex
}

// queryBody holds the body for an HTTP query request.
type queryBody struct {
Dialect *domain.Dialect `json:"dialect,omitempty"`
Query string `json:"query"`
Type domain.QueryType `json:"type"`
Params interface{} `json:"params,omitempty"`
}

func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) {
return q.QueryRawWithParams(ctx, query, dialect, nil)
}

func (q *queryAPI) QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error) {
if err := checkParamsType(params); err != nil {
return "", err
}
queryURL, err := q.queryURL()
if err != nil {
return "", err
}
queryType := domain.QueryTypeFlux
qr := domain.Query{Query: query, Type: &queryType, Dialect: dialect}
qr := queryBody{
Query: query,
Type: domain.QueryTypeFlux,
Dialect: dialect,
Params: params,
}
qrJSON, err := json.Marshal(qr)
if err != nil {
return "", err
Expand Down Expand Up @@ -118,13 +184,24 @@ func DefaultDialect() *domain.Dialect {
}

func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) {
return q.QueryWithParams(ctx, query, nil)
}

func (q *queryAPI) QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error) {
var queryResult *QueryTableResult
if err := checkParamsType(params); err != nil {
return nil, err
}
queryURL, err := q.queryURL()
if err != nil {
return nil, err
}
queryType := domain.QueryTypeFlux
qr := domain.Query{Query: query, Type: &queryType, Dialect: DefaultDialect()}
qr := queryBody{
Query: query,
Type: domain.QueryTypeFlux,
Dialect: DefaultDialect(),
Params: params,
}
qrJSON, err := json.Marshal(qr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,25 +249,69 @@ func (q *queryAPI) queryURL() (string, error) {
return q.url, nil
}

// QueryTableResult parses streamed flux query response into structures representing flux table parts
// Walking though the result is done by repeatedly calling Next() until returns false.
// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
// Data are acquired by Record() method.
// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
type QueryTableResult struct {
io.Closer
csvReader *csv.Reader
tablePosition int
tableChanged bool
table *query.FluxTableMetadata
record *query.FluxRecord
err error
// checkParamsType validates the value is struct with simple type fields
// or a map with key as string and value as a simple type
func checkParamsType(p interface{}) error {
if p == nil {
return nil
}
t := reflect.TypeOf(p)
v := reflect.ValueOf(p)
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
if t.Kind() != reflect.Struct && t.Kind() != reflect.Map {
return fmt.Errorf("cannot use %v as query params", t)
}
switch t.Kind() {
case reflect.Struct:
fields := reflect.VisibleFields(t)
for _, f := range fields {
fv := v.FieldByIndex(f.Index)
t := getFieldType(fv)
if !validParamType(t) {
return fmt.Errorf("cannot use field '%s' of type '%v' as a query param", f.Name, t)
}

}
case reflect.Map:
key := t.Key()
if key.Kind() != reflect.String {
return fmt.Errorf("cannot use map key of type '%v' for query param name", key)
}
for _, k := range v.MapKeys() {
f := v.MapIndex(k)
t := getFieldType(f)
if !validParamType(t) {
return fmt.Errorf("cannot use map value type '%v' as a query param", t)
}
}
}
return nil
}

func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
csvReader := csv.NewReader(rawResponse)
csvReader.FieldsPerRecord = -1
return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
// getFieldType extracts type of value
func getFieldType(v reflect.Value) reflect.Type {
t := v.Type()
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
if t.Kind() == reflect.Interface && !v.IsNil() {
t = reflect.ValueOf(v.Interface()).Type()
}
return t
}

// timeType is the exact type for the Time
var timeType = reflect.TypeOf(time.Time{})

// validParamType validates that t is primitive type or string or interface
func validParamType(t reflect.Type) bool {
return (t.Kind() > reflect.Invalid && t.Kind() < reflect.Complex64) ||
t.Kind() == reflect.String ||
t == timeType
}

// TablePosition returns actual flux table position in the result, or -1 if no table was found yet
Expand Down
Loading

0 comments on commit ab68e23

Please sign in to comment.