From 407305d9b9ae5c2a7d235de32a0451572dae1d05 Mon Sep 17 00:00:00 2001 From: Julie Krasnick <99275379+jckras@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:21:32 -0500 Subject: [PATCH] RSDK-9175: DataSync API Go SDK (#4562) --- app/data_client.go | 627 ++++++++++++++++++-- app/data_client_test.go | 359 +++++++++-- app/viam_client_test.go | 6 +- go.mod | 4 +- go.sum | 8 +- testutils/inject/datasync_service_client.go | 96 +++ 6 files changed, 1003 insertions(+), 97 deletions(-) create mode 100644 testutils/inject/datasync_service_client.go diff --git a/app/data_client.go b/app/data_client.go index 8578eefc9e2..54e7dce2567 100644 --- a/app/data_client.go +++ b/app/data_client.go @@ -3,11 +3,16 @@ package app import ( "context" + "errors" + "fmt" + "os" + "path/filepath" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" pb "go.viam.com/api/app/data/v1" + syncPb "go.viam.com/api/app/datasync/v1" "go.viam.com/utils/rpc" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" @@ -18,9 +23,14 @@ import ( // DataClient implements the DataServiceClient interface. type DataClient struct { - client pb.DataServiceClient + dataClient pb.DataServiceClient + dataSyncClient syncPb.DataSyncServiceClient } +const ( + UploadChunkSize = 64 * 1024 // UploadChunkSize is 64 KB +) + // Order specifies the order in which data is returned. type Order int32 @@ -177,13 +187,111 @@ type DatabaseConnReturn struct { HasDatabaseUser bool } +// DataSyncClient structs + +// SensorMetadata contains the time the sensor data was requested and was received. +type SensorMetadata struct { + TimeRequested time.Time + TimeReceived time.Time + MimeType MimeType + Annotations Annotations +} + +// SensorData contains the contents and metadata for tabular data. +type SensorData struct { + Metadata SensorMetadata + SDStruct map[string]interface{} + SDBinary []byte +} + +// DataType specifies the type of data uploaded. +type DataType int32 + +// DataType constants define the possible DataType options. +const ( + DataTypeUnspecified DataType = iota + DataTypeBinarySensor + DataTypeTabularSensor + DataTypeFile +) + +// MimeType specifies the format of a file being uploaded. +type MimeType int32 + +// MimeType constants define the possible MimeType options. +const ( + MimeTypeUnspecified MimeType = iota + MimeTypeJPEG + MimeTypePNG + MimeTypePCD +) + +// UploadMetadata contains the metadata for binary (image + file) data. +type UploadMetadata struct { + PartID string + ComponentType string + ComponentName string + MethodName string + Type DataType + FileName string + MethodParameters map[string]interface{} + FileExtension string + Tags []string +} + +// FileData contains the contents of binary (image + file) data. +type FileData struct { + Data []byte +} + +// BinaryOptions represents optional parameters for the BinaryDataCaptureUpload method. +type BinaryOptions struct { + Type *DataType + FileName *string + MethodParameters map[string]interface{} + Tags []string + DataRequestTimes *[2]time.Time +} + +// TabularOptions represents optional parameters for the TabularDataCaptureUpload method. +type TabularOptions struct { + Type *DataType + FileName *string + MethodParameters map[string]interface{} + FileExtension *string + Tags []string +} + +// StreamingOptions represents optional parameters for the StreamingDataCaptureUpload method. +type StreamingOptions struct { + ComponentType *string + ComponentName *string + MethodName *string + Type *DataType + FileName *string + MethodParameters map[string]interface{} + Tags []string + DataRequestTimes *[2]time.Time +} + +// FileUploadOptions represents optional parameters for the FileUploadFromPath & FileUploadFromBytes methods. +type FileUploadOptions struct { + ComponentType *string + ComponentName *string + MethodName *string + FileName *string + MethodParameters map[string]interface{} + FileExtension *string + Tags []string +} + // NewDataClient constructs a new DataClient using the connection passed in by the viamClient. -func NewDataClient( - conn rpc.ClientConn, -) *DataClient { +func NewDataClient(conn rpc.ClientConn) *DataClient { d := pb.NewDataServiceClient(conn) + s := syncPb.NewDataSyncServiceClient(conn) return &DataClient{ - client: d, + dataClient: d, + dataSyncClient: s, } } @@ -243,27 +351,6 @@ func captureMetadataFromProto(proto *pb.CaptureMetadata) CaptureMetadata { } } -func captureMetadataToProto(metadata CaptureMetadata) *pb.CaptureMetadata { - methodParams, err := protoutils.ConvertMapToProtoAny(metadata.MethodParameters) - if err != nil { - return nil - } - return &pb.CaptureMetadata{ - OrganizationId: metadata.OrganizationID, - LocationId: metadata.LocationID, - RobotName: metadata.RobotName, - RobotId: metadata.RobotID, - PartName: metadata.PartName, - PartId: metadata.PartID, - ComponentType: metadata.ComponentType, - ComponentName: metadata.ComponentName, - MethodName: metadata.MethodName, - MethodParameters: methodParams, - Tags: metadata.Tags, - MimeType: metadata.MimeType, - } -} - func binaryDataFromProto(proto *pb.BinaryData) BinaryData { return BinaryData{ Binary: proto.Binary, @@ -415,7 +502,7 @@ func (d *DataClient) TabularDataByFilter( countOnly bool, includeInternalData bool, ) (TabularDataReturn, error) { - resp, err := d.client.TabularDataByFilter(ctx, &pb.TabularDataByFilterRequest{ + resp, err := d.dataClient.TabularDataByFilter(ctx, &pb.TabularDataByFilterRequest{ DataRequest: &pb.DataRequest{ Filter: filterToProto(filter), Limit: limit, @@ -432,10 +519,9 @@ func (d *DataClient) TabularDataByFilter( dataArray := []TabularData{} var metadata *pb.CaptureMetadata for _, data := range resp.Data { - if len(resp.Metadata) > 0 && int(data.MetadataIndex) < len(resp.Metadata) { + if int(data.MetadataIndex) < len(resp.Metadata) { metadata = resp.Metadata[data.MetadataIndex] } else { - // Use an empty CaptureMetadata as a fallback metadata = &pb.CaptureMetadata{} } dataArray = append(dataArray, tabularDataFromProto(data, metadata)) @@ -450,7 +536,7 @@ func (d *DataClient) TabularDataByFilter( // TabularDataBySQL queries tabular data with a SQL query. func (d *DataClient) TabularDataBySQL(ctx context.Context, organizationID, sqlQuery string) ([]map[string]interface{}, error) { - resp, err := d.client.TabularDataBySQL(ctx, &pb.TabularDataBySQLRequest{ + resp, err := d.dataClient.TabularDataBySQL(ctx, &pb.TabularDataBySQLRequest{ OrganizationId: organizationID, SqlQuery: sqlQuery, }) @@ -466,7 +552,7 @@ func (d *DataClient) TabularDataBySQL(ctx context.Context, organizationID, sqlQu // TabularDataByMQL queries tabular data with an MQL (MongoDB Query Language) query. func (d *DataClient) TabularDataByMQL(ctx context.Context, organizationID string, mqlbinary [][]byte) ([]map[string]interface{}, error) { - resp, err := d.client.TabularDataByMQL(ctx, &pb.TabularDataByMQLRequest{ + resp, err := d.dataClient.TabularDataByMQL(ctx, &pb.TabularDataByMQLRequest{ OrganizationId: organizationID, MqlBinary: mqlbinary, }) @@ -492,7 +578,7 @@ func (d *DataClient) BinaryDataByFilter( countOnly bool, includeInternalData bool, ) (BinaryDataReturn, error) { - resp, err := d.client.BinaryDataByFilter(ctx, &pb.BinaryDataByFilterRequest{ + resp, err := d.dataClient.BinaryDataByFilter(ctx, &pb.BinaryDataByFilterRequest{ DataRequest: &pb.DataRequest{ Filter: filterToProto(filter), Limit: limit, @@ -519,7 +605,7 @@ func (d *DataClient) BinaryDataByFilter( // BinaryDataByIDs queries binary data and metadata based on given IDs. func (d *DataClient) BinaryDataByIDs(ctx context.Context, binaryIDs []BinaryID) ([]BinaryData, error) { - resp, err := d.client.BinaryDataByIDs(ctx, &pb.BinaryDataByIDsRequest{ + resp, err := d.dataClient.BinaryDataByIDs(ctx, &pb.BinaryDataByIDsRequest{ IncludeBinary: true, BinaryIds: binaryIDsToProto(binaryIDs), }) @@ -536,7 +622,7 @@ func (d *DataClient) BinaryDataByIDs(ctx context.Context, binaryIDs []BinaryID) // DeleteTabularData deletes tabular data older than a number of days, based on the given organization ID. // It returns the number of tabular datapoints deleted. func (d *DataClient) DeleteTabularData(ctx context.Context, organizationID string, deleteOlderThanDays uint32) (uint64, error) { - resp, err := d.client.DeleteTabularData(ctx, &pb.DeleteTabularDataRequest{ + resp, err := d.dataClient.DeleteTabularData(ctx, &pb.DeleteTabularDataRequest{ OrganizationId: organizationID, DeleteOlderThanDays: deleteOlderThanDays, }) @@ -549,7 +635,7 @@ func (d *DataClient) DeleteTabularData(ctx context.Context, organizationID strin // DeleteBinaryDataByFilter deletes binary data based on given filters. // It returns the number of binary datapoints deleted. func (d *DataClient) DeleteBinaryDataByFilter(ctx context.Context, filter Filter) (uint64, error) { - resp, err := d.client.DeleteBinaryDataByFilter(ctx, &pb.DeleteBinaryDataByFilterRequest{ + resp, err := d.dataClient.DeleteBinaryDataByFilter(ctx, &pb.DeleteBinaryDataByFilterRequest{ Filter: filterToProto(filter), IncludeInternalData: true, }) @@ -562,7 +648,7 @@ func (d *DataClient) DeleteBinaryDataByFilter(ctx context.Context, filter Filter // DeleteBinaryDataByIDs deletes binary data based on given IDs. // It returns the number of binary datapoints deleted. func (d *DataClient) DeleteBinaryDataByIDs(ctx context.Context, binaryIDs []BinaryID) (uint64, error) { - resp, err := d.client.DeleteBinaryDataByIDs(ctx, &pb.DeleteBinaryDataByIDsRequest{ + resp, err := d.dataClient.DeleteBinaryDataByIDs(ctx, &pb.DeleteBinaryDataByIDsRequest{ BinaryIds: binaryIDsToProto(binaryIDs), }) if err != nil { @@ -573,7 +659,7 @@ func (d *DataClient) DeleteBinaryDataByIDs(ctx context.Context, binaryIDs []Bina // AddTagsToBinaryDataByIDs adds string tags, unless the tags are already present, to binary data based on given IDs. func (d *DataClient) AddTagsToBinaryDataByIDs(ctx context.Context, tags []string, binaryIDs []BinaryID) error { - _, err := d.client.AddTagsToBinaryDataByIDs(ctx, &pb.AddTagsToBinaryDataByIDsRequest{ + _, err := d.dataClient.AddTagsToBinaryDataByIDs(ctx, &pb.AddTagsToBinaryDataByIDsRequest{ BinaryIds: binaryIDsToProto(binaryIDs), Tags: tags, }) @@ -582,7 +668,7 @@ func (d *DataClient) AddTagsToBinaryDataByIDs(ctx context.Context, tags []string // AddTagsToBinaryDataByFilter adds string tags, unless the tags are already present, to binary data based on the given filter. func (d *DataClient) AddTagsToBinaryDataByFilter(ctx context.Context, tags []string, filter Filter) error { - _, err := d.client.AddTagsToBinaryDataByFilter(ctx, &pb.AddTagsToBinaryDataByFilterRequest{ + _, err := d.dataClient.AddTagsToBinaryDataByFilter(ctx, &pb.AddTagsToBinaryDataByFilterRequest{ Filter: filterToProto(filter), Tags: tags, }) @@ -594,7 +680,7 @@ func (d *DataClient) AddTagsToBinaryDataByFilter(ctx context.Context, tags []str func (d *DataClient) RemoveTagsFromBinaryDataByIDs(ctx context.Context, tags []string, binaryIDs []BinaryID, ) (uint64, error) { - resp, err := d.client.RemoveTagsFromBinaryDataByIDs(ctx, &pb.RemoveTagsFromBinaryDataByIDsRequest{ + resp, err := d.dataClient.RemoveTagsFromBinaryDataByIDs(ctx, &pb.RemoveTagsFromBinaryDataByIDsRequest{ BinaryIds: binaryIDsToProto(binaryIDs), Tags: tags, }) @@ -609,7 +695,7 @@ func (d *DataClient) RemoveTagsFromBinaryDataByIDs(ctx context.Context, func (d *DataClient) RemoveTagsFromBinaryDataByFilter(ctx context.Context, tags []string, filter Filter, ) (uint64, error) { - resp, err := d.client.RemoveTagsFromBinaryDataByFilter(ctx, &pb.RemoveTagsFromBinaryDataByFilterRequest{ + resp, err := d.dataClient.RemoveTagsFromBinaryDataByFilter(ctx, &pb.RemoveTagsFromBinaryDataByFilterRequest{ Filter: filterToProto(filter), Tags: tags, }) @@ -622,7 +708,7 @@ func (d *DataClient) RemoveTagsFromBinaryDataByFilter(ctx context.Context, // TagsByFilter retrieves all unique tags associated with the data that match the specified filter. // It returns the list of these unique tags. func (d *DataClient) TagsByFilter(ctx context.Context, filter Filter) ([]string, error) { - resp, err := d.client.TagsByFilter(ctx, &pb.TagsByFilterRequest{ + resp, err := d.dataClient.TagsByFilter(ctx, &pb.TagsByFilterRequest{ Filter: filterToProto(filter), }) if err != nil { @@ -643,7 +729,7 @@ func (d *DataClient) AddBoundingBoxToImageByID( xMaxNormalized float64, yMaxNormalized float64, ) (string, error) { - resp, err := d.client.AddBoundingBoxToImageByID(ctx, &pb.AddBoundingBoxToImageByIDRequest{ + resp, err := d.dataClient.AddBoundingBoxToImageByID(ctx, &pb.AddBoundingBoxToImageByIDRequest{ BinaryId: binaryIDToProto(binaryID), Label: label, XMinNormalized: xMinNormalized, @@ -663,7 +749,7 @@ func (d *DataClient) RemoveBoundingBoxFromImageByID( bboxID string, binaryID BinaryID, ) error { - _, err := d.client.RemoveBoundingBoxFromImageByID(ctx, &pb.RemoveBoundingBoxFromImageByIDRequest{ + _, err := d.dataClient.RemoveBoundingBoxFromImageByID(ctx, &pb.RemoveBoundingBoxFromImageByIDRequest{ BinaryId: binaryIDToProto(binaryID), BboxId: bboxID, }) @@ -673,7 +759,7 @@ func (d *DataClient) RemoveBoundingBoxFromImageByID( // BoundingBoxLabelsByFilter retrieves all unique string labels for bounding boxes that match the specified filter. // It returns a list of these labels. func (d *DataClient) BoundingBoxLabelsByFilter(ctx context.Context, filter Filter) ([]string, error) { - resp, err := d.client.BoundingBoxLabelsByFilter(ctx, &pb.BoundingBoxLabelsByFilterRequest{ + resp, err := d.dataClient.BoundingBoxLabelsByFilter(ctx, &pb.BoundingBoxLabelsByFilterRequest{ Filter: filterToProto(filter), }) if err != nil { @@ -694,7 +780,7 @@ func (d *DataClient) UpdateBoundingBox(ctx context.Context, xMaxNormalized *float64, // optional yMaxNormalized *float64, // optional ) error { - _, err := d.client.UpdateBoundingBox(ctx, &pb.UpdateBoundingBoxRequest{ + _, err := d.dataClient.UpdateBoundingBox(ctx, &pb.UpdateBoundingBoxRequest{ BinaryId: binaryIDToProto(binaryID), BboxId: bboxID, Label: label, @@ -710,7 +796,7 @@ func (d *DataClient) UpdateBoundingBox(ctx context.Context, // It returns the hostname endpoint, a URI for connecting to the database via MongoDB clients, // and a flag indicating whether a database user is configured for the Viam organization. func (d *DataClient) GetDatabaseConnection(ctx context.Context, organizationID string) (DatabaseConnReturn, error) { - resp, err := d.client.GetDatabaseConnection(ctx, &pb.GetDatabaseConnectionRequest{ + resp, err := d.dataClient.GetDatabaseConnection(ctx, &pb.GetDatabaseConnectionRequest{ OrganizationId: organizationID, }) if err != nil { @@ -729,7 +815,7 @@ func (d *DataClient) ConfigureDatabaseUser( organizationID string, password string, ) error { - _, err := d.client.ConfigureDatabaseUser(ctx, &pb.ConfigureDatabaseUserRequest{ + _, err := d.dataClient.ConfigureDatabaseUser(ctx, &pb.ConfigureDatabaseUserRequest{ OrganizationId: organizationID, Password: password, }) @@ -742,7 +828,7 @@ func (d *DataClient) AddBinaryDataToDatasetByIDs( binaryIDs []BinaryID, datasetID string, ) error { - _, err := d.client.AddBinaryDataToDatasetByIDs(ctx, &pb.AddBinaryDataToDatasetByIDsRequest{ + _, err := d.dataClient.AddBinaryDataToDatasetByIDs(ctx, &pb.AddBinaryDataToDatasetByIDsRequest{ BinaryIds: binaryIDsToProto(binaryIDs), DatasetId: datasetID, }) @@ -755,9 +841,452 @@ func (d *DataClient) RemoveBinaryDataFromDatasetByIDs( binaryIDs []BinaryID, datasetID string, ) error { - _, err := d.client.RemoveBinaryDataFromDatasetByIDs(ctx, &pb.RemoveBinaryDataFromDatasetByIDsRequest{ + _, err := d.dataClient.RemoveBinaryDataFromDatasetByIDs(ctx, &pb.RemoveBinaryDataFromDatasetByIDsRequest{ BinaryIds: binaryIDsToProto(binaryIDs), DatasetId: datasetID, }) return err } + +func uploadMetadataToProto(metadata UploadMetadata) *syncPb.UploadMetadata { + var methodParams map[string]*anypb.Any + if metadata.MethodParameters != nil { + var err error + methodParams, err = protoutils.ConvertMapToProtoAny(metadata.MethodParameters) + if err != nil { + return nil + } + } + return &syncPb.UploadMetadata{ + PartId: metadata.PartID, + ComponentType: metadata.ComponentType, + ComponentName: metadata.ComponentName, + MethodName: metadata.MethodName, + Type: syncPb.DataType(metadata.Type), + FileName: metadata.FileName, + MethodParameters: methodParams, + FileExtension: metadata.FileExtension, + Tags: metadata.Tags, + } +} + +func annotationsToProto(annotations Annotations) *pb.Annotations { + var protoBboxes []*pb.BoundingBox + for _, bbox := range annotations.Bboxes { + protoBboxes = append(protoBboxes, &pb.BoundingBox{ + Id: bbox.ID, + Label: bbox.Label, + XMinNormalized: bbox.XMinNormalized, + YMinNormalized: bbox.YMinNormalized, + XMaxNormalized: bbox.XMaxNormalized, + YMaxNormalized: bbox.YMaxNormalized, + }) + } + return &pb.Annotations{ + Bboxes: protoBboxes, + } +} + +func sensorMetadataToProto(metadata SensorMetadata) *syncPb.SensorMetadata { + return &syncPb.SensorMetadata{ + TimeRequested: timestamppb.New(metadata.TimeRequested), + TimeReceived: timestamppb.New(metadata.TimeReceived), + MimeType: syncPb.MimeType(metadata.MimeType), + Annotations: annotationsToProto(metadata.Annotations), + } +} + +// Ensure only one of SDStruct or SDBinary is set. +func validateSensorData(sensorData SensorData) error { + if sensorData.SDStruct != nil && len(sensorData.SDBinary) > 0 { + return errors.New("sensorData cannot have both SDStruct and SDBinary set") + } + return nil +} + +func sensorDataToProto(sensorData SensorData) (*syncPb.SensorData, error) { + if err := validateSensorData(sensorData); err != nil { + return nil, err + } + switch { + case len(sensorData.SDBinary) > 0: + return &syncPb.SensorData{ + Metadata: sensorMetadataToProto(sensorData.Metadata), + Data: &syncPb.SensorData_Binary{ + Binary: sensorData.SDBinary, + }, + }, nil + case sensorData.SDStruct != nil: + pbStruct, err := structpb.NewStruct(sensorData.SDStruct) + if err != nil { + return nil, err + } + return &syncPb.SensorData{ + Metadata: sensorMetadataToProto(sensorData.Metadata), + Data: &syncPb.SensorData_Struct{ + Struct: pbStruct, + }, + }, nil + default: + return nil, errors.New("sensorData must have either SDStruct or SDBinary set") + } +} + +func sensorContentsToProto(sensorContents []SensorData) ([]*syncPb.SensorData, error) { + var protoSensorContents []*syncPb.SensorData + for _, item := range sensorContents { + protoItem, err := sensorDataToProto(item) + if err != nil { + return nil, err // Propagate the error + } + protoSensorContents = append(protoSensorContents, protoItem) + } + return protoSensorContents, nil +} + +func formatFileExtension(fileExt string) string { + if fileExt == "" { + return fileExt + } + if fileExt[0] == '.' { + return fileExt + } + return "." + fileExt +} + +// BinaryDataCaptureUpload uploads the contents and metadata for binary data. +func (d *DataClient) BinaryDataCaptureUpload( + ctx context.Context, + binaryData []byte, + partID string, + componentType string, + componentName string, + methodName string, + fileExtension string, + options *BinaryOptions, +) (string, error) { + var sensorMetadata SensorMetadata + if options.DataRequestTimes != nil && len(options.DataRequestTimes) == 2 { + sensorMetadata = SensorMetadata{ + TimeRequested: options.DataRequestTimes[0], + TimeReceived: options.DataRequestTimes[1], + } + } + sensorData := SensorData{ + Metadata: sensorMetadata, + SDStruct: nil, + SDBinary: binaryData, + } + metadata := UploadMetadata{ + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: methodName, + Type: DataTypeBinarySensor, + FileExtension: formatFileExtension(fileExtension), + } + if options.FileName != nil { + metadata.FileName = *options.FileName + } + if options.MethodParameters != nil { + metadata.MethodParameters = options.MethodParameters + } + if options.Tags != nil { + metadata.Tags = options.Tags + } + + response, err := d.dataCaptureUpload(ctx, metadata, []SensorData{sensorData}) + if err != nil { + return "", err + } + return response, nil +} + +// TabularDataCaptureUpload uploads the contents and metadata for tabular data. +func (d *DataClient) TabularDataCaptureUpload( + ctx context.Context, + tabularData []map[string]interface{}, + partID string, + componentType string, + componentName string, + methodName string, + dataRequestTimes [][2]time.Time, + options *TabularOptions, +) (string, error) { + if len(dataRequestTimes) != len(tabularData) { + return "", errors.New("dataRequestTimes and tabularData lengths must be equal") + } + var sensorContents []SensorData + for i, tabData := range tabularData { + sensorMetadata := SensorMetadata{} + dates := dataRequestTimes[i] + if len(dates) == 2 { + sensorMetadata.TimeRequested = dates[0] + sensorMetadata.TimeReceived = dates[1] + } + sensorData := SensorData{ + Metadata: sensorMetadata, + SDStruct: tabData, + SDBinary: nil, + } + sensorContents = append(sensorContents, sensorData) + } + metadata := UploadMetadata{ + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: methodName, + Type: DataTypeTabularSensor, + } + + if options.FileName != nil { + metadata.FileName = *options.FileName + } + if options.MethodParameters != nil { + metadata.MethodParameters = options.MethodParameters + } + if options.FileExtension != nil { + metadata.FileExtension = formatFileExtension(*options.FileExtension) + } + if options.Tags != nil { + metadata.Tags = options.Tags + } + response, err := d.dataCaptureUpload(ctx, metadata, sensorContents) + if err != nil { + return "", err + } + return response, nil +} + +// dataCaptureUpload uploads the metadata and contents for either tabular or binary data, +// and returns the file ID associated with the uploaded data and metadata. +func (d *DataClient) dataCaptureUpload(ctx context.Context, metadata UploadMetadata, sensorContents []SensorData) (string, error) { + sensorContentsPb, err := sensorContentsToProto(sensorContents) + if err != nil { + return "", err + } + resp, err := d.dataSyncClient.DataCaptureUpload(ctx, &syncPb.DataCaptureUploadRequest{ + Metadata: uploadMetadataToProto(metadata), + SensorContents: sensorContentsPb, + }) + if err != nil { + return "", err + } + return resp.FileId, nil +} + +// StreamingDataCaptureUpload uploads metadata and streaming binary data in chunks. +func (d *DataClient) StreamingDataCaptureUpload( + ctx context.Context, + data []byte, + partID string, + fileExt string, + options *StreamingOptions, +) (string, error) { + uploadMetadata := UploadMetadata{ + PartID: partID, + Type: DataTypeBinarySensor, + FileExtension: fileExt, + } + if options.ComponentType != nil { + uploadMetadata.ComponentType = *options.ComponentType + } + if options.ComponentName != nil { + uploadMetadata.ComponentName = *options.ComponentName + } + if options.MethodName != nil { + uploadMetadata.MethodName = *options.MethodName + } + if options.FileName != nil { + uploadMetadata.FileName = *options.FileName + } + if options.MethodParameters != nil { + uploadMetadata.MethodParameters = options.MethodParameters + } + if options.Tags != nil { + uploadMetadata.Tags = options.Tags + } + uploadMetadataPb := uploadMetadataToProto(uploadMetadata) + var sensorMetadata SensorMetadata + if options.DataRequestTimes != nil && len(options.DataRequestTimes) == 2 { + sensorMetadata = SensorMetadata{ + TimeRequested: options.DataRequestTimes[0], + TimeReceived: options.DataRequestTimes[1], + } + } + sensorMetadataPb := sensorMetadataToProto(sensorMetadata) + metadata := &syncPb.DataCaptureUploadMetadata{ + UploadMetadata: uploadMetadataPb, + SensorMetadata: sensorMetadataPb, + } + // establish a streaming connection. + stream, err := d.dataSyncClient.StreamingDataCaptureUpload(ctx) + if err != nil { + return "", err + } + // send the metadata as the first packet. + metaReq := &syncPb.StreamingDataCaptureUploadRequest{ + UploadPacket: &syncPb.StreamingDataCaptureUploadRequest_Metadata{ + Metadata: metadata, + }, + } + if err := stream.Send(metaReq); err != nil { + return "", err + } + + // send the binary data in chunks. + for start := 0; start < len(data); start += UploadChunkSize { + end := start + UploadChunkSize + if end > len(data) { + end = len(data) + } + dataReq := &syncPb.StreamingDataCaptureUploadRequest{ + UploadPacket: &syncPb.StreamingDataCaptureUploadRequest_Data{ + Data: data[start:end], + }, + } + if err := stream.Send(dataReq); err != nil { + return "", err + } + } + // close the stream and get the response. + resp, err := stream.CloseAndRecv() + if err != nil { + return "", err + } + return resp.FileId, nil +} + +// FileUploadFromBytes uploads the contents and metadata for binary data such as encoded images or other data represented by bytes +// and returns the file id of the uploaded data. +func (d *DataClient) FileUploadFromBytes( + ctx context.Context, + partID string, + data []byte, + opts *FileUploadOptions, +) (string, error) { + metadata := &syncPb.UploadMetadata{ + PartId: partID, + Type: syncPb.DataType_DATA_TYPE_FILE, + } + if opts.MethodParameters != nil { + methodParams, err := protoutils.ConvertMapToProtoAny(opts.MethodParameters) + if err != nil { + return "", err + } + metadata.MethodParameters = methodParams + } + if opts.ComponentType != nil { + metadata.ComponentType = *opts.ComponentType + } + if opts.ComponentName != nil { + metadata.ComponentName = *opts.ComponentName + } + if opts.MethodName != nil { + metadata.MethodName = *opts.MethodName + } + if opts.FileName != nil { + metadata.FileName = *opts.FileName + } + if opts.FileExtension != nil { + metadata.FileExtension = formatFileExtension(*opts.FileExtension) + } + if opts.Tags != nil { + metadata.Tags = opts.Tags + } + return d.fileUploadStreamResp(metadata, data) +} + +// FileUploadFromPath uploads the contents and metadata for binary data created from a filepath +// and returns the file id of the uploaded data. +func (d *DataClient) FileUploadFromPath( + ctx context.Context, + partID string, + filePath string, + opts *FileUploadOptions, +) (string, error) { + metadata := &syncPb.UploadMetadata{ + PartId: partID, + Type: syncPb.DataType_DATA_TYPE_FILE, + } + if opts.MethodParameters != nil { + methodParams, err := protoutils.ConvertMapToProtoAny(opts.MethodParameters) + if err != nil { + return "", err + } + metadata.MethodParameters = methodParams + } + if opts.ComponentType != nil { + metadata.ComponentType = *opts.ComponentType + } + if opts.ComponentName != nil { + metadata.ComponentName = *opts.ComponentName + } + if opts.MethodName != nil { + metadata.MethodName = *opts.MethodName + } + if opts.FileExtension != nil { + metadata.FileExtension = formatFileExtension(*opts.FileExtension) + } + if opts.Tags != nil { + metadata.Tags = opts.Tags + } + if opts.FileName != nil { + metadata.FileName = *opts.FileName + } else if filePath != "" { + metadata.FileName = filepath.Base(filePath) + metadata.FileExtension = filepath.Ext(filePath) + } + + var data []byte + // Prepare file data from filepath + if filePath != "" { + //nolint:gosec + fileData, err := os.ReadFile(filePath) + if err != nil { + return "", err + } + data = fileData + } + return d.fileUploadStreamResp(metadata, data) +} + +func (d *DataClient) fileUploadStreamResp(metadata *syncPb.UploadMetadata, data []byte) (string, error) { + // establish a streaming connection. + stream, err := d.dataSyncClient.FileUpload(context.Background()) + if err != nil { + return "", err + } + // send the metadata as the first packet. + metaReq := &syncPb.FileUploadRequest{ + UploadPacket: &syncPb.FileUploadRequest_Metadata{ + Metadata: metadata, + }, + } + if err := stream.Send(metaReq); err != nil { + return "", fmt.Errorf("failed to send metadata: %w", err) + } + // send file contents in chunks + for start := 0; start < len(data); start += UploadChunkSize { + end := start + UploadChunkSize + if end > len(data) { + end = len(data) + } + dataReq := &syncPb.FileUploadRequest{ + UploadPacket: &syncPb.FileUploadRequest_FileContents{ + FileContents: &syncPb.FileData{ + Data: data[start:end], + }, + }, + } + if err := stream.Send(dataReq); err != nil { + return "", err + } + } + // close stream and get response + resp, err := stream.CloseAndRecv() + if err != nil { + return "", err + } + return resp.FileId, nil +} diff --git a/app/data_client_test.go b/app/data_client_test.go index 5a3a22468a0..7e89aa6d328 100644 --- a/app/data_client_test.go +++ b/app/data_client_test.go @@ -2,16 +2,19 @@ package app import ( "context" + "os" "testing" "time" "go.mongodb.org/mongo-driver/bson" pb "go.viam.com/api/app/data/v1" + syncPb "go.viam.com/api/app/datasync/v1" "go.viam.com/test" utils "go.viam.com/utils/protoutils" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" + "go.viam.com/rdk/protoutils" "go.viam.com/rdk/testutils/inject" ) @@ -31,15 +34,18 @@ const ( bboxLabel = "bbox_label" tag = "tag" fileName = "file_name" - fileExt = "file_ext.ext" + fileExt = ".ext" datasetID = "dataset_id" binaryMetaID = "binary_id" mongodbURI = "mongo_uri" hostName = "host_name" last = "last" + fileID = "file_id" ) var ( + binaryDataType = DataTypeBinarySensor + tabularDataType = DataTypeTabularSensor locationIDs = []string{locationID} orgIDs = []string{organizationID} mimeTypes = []string{mimeType} @@ -48,6 +54,7 @@ var ( tags = []string{tag} startTime = time.Now().UTC().Round(time.Millisecond) endTime = time.Now().UTC().Round(time.Millisecond) + dataRequestTimes = [2]time.Time{startTime, endTime} count = uint64(5) limit = uint64(5) countOnly = true @@ -55,6 +62,32 @@ var ( data = map[string]interface{}{ "key": "value", } + fileNamePtr = fileName + fileExtPtr = fileExt + componentTypePtr = componentType + componentNamePtr = componentName + methodNamePtr = method + tabularMetadata = CaptureMetadata{ + OrganizationID: organizationID, + LocationID: locationID, + RobotName: robotName, + RobotID: robotID, + PartName: partName, + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + MethodParameters: methodParameters, + Tags: tags, + MimeType: mimeType, + } + tabularData = TabularData{ + Data: data, + MetadataIndex: 0, + Metadata: tabularMetadata, + TimeRequested: startTime, + TimeReceived: endTime, + } binaryID = BinaryID{ FileID: "file1", OrganizationID: organizationID, @@ -106,23 +139,6 @@ var ( } ) -func annotationsToProto(annotations Annotations) *pb.Annotations { - var protoBboxes []*pb.BoundingBox - for _, bbox := range annotations.Bboxes { - protoBboxes = append(protoBboxes, &pb.BoundingBox{ - Id: bbox.ID, - Label: bbox.Label, - XMinNormalized: bbox.XMinNormalized, - YMinNormalized: bbox.YMinNormalized, - XMaxNormalized: bbox.XMaxNormalized, - YMaxNormalized: bbox.YMaxNormalized, - }) - } - return &pb.Annotations{ - Bboxes: protoBboxes, - } -} - func binaryDataToProto(binaryData BinaryData) *pb.BinaryData { return &pb.BinaryData{ Binary: binaryData.Binary, @@ -130,6 +146,27 @@ func binaryDataToProto(binaryData BinaryData) *pb.BinaryData { } } +func captureMetadataToProto(metadata CaptureMetadata) *pb.CaptureMetadata { + methodParams, err := protoutils.ConvertMapToProtoAny(metadata.MethodParameters) + if err != nil { + return nil + } + return &pb.CaptureMetadata{ + OrganizationId: metadata.OrganizationID, + LocationId: metadata.LocationID, + RobotName: metadata.RobotName, + RobotId: metadata.RobotID, + PartName: metadata.PartName, + PartId: metadata.PartID, + ComponentType: metadata.ComponentType, + ComponentName: metadata.ComponentName, + MethodName: metadata.MethodName, + MethodParameters: methodParams, + Tags: metadata.Tags, + MimeType: metadata.MimeType, + } +} + func binaryMetadataToProto(binaryMetadata BinaryMetadata) *pb.BinaryMetadata { return &pb.BinaryMetadata{ Id: binaryMetadata.ID, @@ -157,9 +194,13 @@ func createGrpcClient() *inject.DataServiceClient { return &inject.DataServiceClient{} } +func createGrpcDataSyncClient() *inject.DataSyncServiceClient { + return &inject.DataSyncServiceClient{} +} + func TestDataClient(t *testing.T) { grpcClient := createGrpcClient() - client := DataClient{client: grpcClient} + client := DataClient{dataClient: grpcClient} captureInterval := CaptureInterval{ Start: time.Now(), @@ -187,21 +228,6 @@ func TestDataClient(t *testing.T) { DatasetID: datasetID, } - tabularMetadata := CaptureMetadata{ - OrganizationID: organizationID, - LocationID: locationID, - RobotName: robotName, - RobotID: robotID, - PartName: partName, - PartID: partID, - ComponentType: componentType, - ComponentName: componentName, - MethodName: method, - MethodParameters: methodParameters, - Tags: tags, - MimeType: mimeType, - } - binaryMetadata := BinaryMetadata{ ID: binaryMetaID, CaptureMetadata: tabularMetadata, @@ -227,13 +253,6 @@ func TestDataClient(t *testing.T) { } t.Run("TabularDataByFilter", func(t *testing.T) { - tabularData := TabularData{ - Data: data, - MetadataIndex: 0, - Metadata: tabularMetadata, - TimeRequested: startTime, - TimeReceived: endTime, - } dataStruct, _ := utils.StructToStructPb(data) tabularDataPb := &pb.TabularData{ Data: dataStruct, @@ -579,3 +598,261 @@ func TestDataClient(t *testing.T) { client.RemoveBinaryDataFromDatasetByIDs(context.Background(), binaryIDs, datasetID) }) } + +func TestDataSyncClient(t *testing.T) { + grpcClient := createGrpcDataSyncClient() + client := DataClient{dataSyncClient: grpcClient} + + uploadMetadata := UploadMetadata{ + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + Type: DataTypeBinarySensor, + FileName: fileName, + MethodParameters: methodParameters, + FileExtension: fileExt, + Tags: tags, + } + + t.Run("BinaryDataCaptureUpload", func(t *testing.T) { + uploadMetadata.Type = DataTypeBinarySensor + options := BinaryOptions{ + Type: &binaryDataType, + FileName: &fileNamePtr, + MethodParameters: methodParameters, + Tags: tags, + DataRequestTimes: &dataRequestTimes, + } + grpcClient.DataCaptureUploadFunc = func(ctx context.Context, in *syncPb.DataCaptureUploadRequest, + opts ...grpc.CallOption, + ) (*syncPb.DataCaptureUploadResponse, error) { + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + + test.That(t, in.Metadata.PartId, test.ShouldEqual, partID) + test.That(t, in.Metadata.ComponentType, test.ShouldEqual, componentType) + test.That(t, in.Metadata.ComponentName, test.ShouldEqual, componentName) + test.That(t, in.Metadata.MethodName, test.ShouldEqual, method) + test.That(t, in.Metadata.Type, test.ShouldEqual, binaryDataType) + test.That(t, in.Metadata.FileName, test.ShouldEqual, fileName) + test.That(t, in.Metadata.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, in.Metadata.FileExtension, test.ShouldEqual, fileExt) + test.That(t, in.Metadata.Tags, test.ShouldResemble, tags) + + test.That(t, in.SensorContents[0].Metadata.TimeRequested, test.ShouldResemble, timestamppb.New(startTime)) + test.That(t, in.SensorContents[0].Metadata.TimeReceived, test.ShouldResemble, timestamppb.New(endTime)) + dataField, ok := in.SensorContents[0].Data.(*syncPb.SensorData_Binary) + test.That(t, ok, test.ShouldBeTrue) + test.That(t, dataField.Binary, test.ShouldResemble, binaryDataByte) + return &syncPb.DataCaptureUploadResponse{ + FileId: fileID, + }, nil + } + resp, _ := client.BinaryDataCaptureUpload(context.Background(), + binaryDataByte, partID, componentType, componentName, + method, fileExt, &options) + test.That(t, resp, test.ShouldResemble, fileID) + }) + + t.Run("TabularDataCaptureUpload", func(t *testing.T) { + uploadMetadata.Type = DataTypeTabularSensor + dataStruct, _ := utils.StructToStructPb(data) + tabularDataPb := &pb.TabularData{ + Data: dataStruct, + MetadataIndex: 0, + TimeRequested: timestamppb.New(startTime), + TimeReceived: timestamppb.New(endTime), + } + options := TabularOptions{ + Type: &binaryDataType, + FileName: &fileNamePtr, + MethodParameters: methodParameters, + FileExtension: &fileExtPtr, + Tags: tags, + } + grpcClient.DataCaptureUploadFunc = func(ctx context.Context, in *syncPb.DataCaptureUploadRequest, + opts ...grpc.CallOption, + ) (*syncPb.DataCaptureUploadResponse, error) { + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + + test.That(t, in.Metadata.PartId, test.ShouldEqual, partID) + test.That(t, in.Metadata.ComponentType, test.ShouldEqual, componentType) + test.That(t, in.Metadata.ComponentName, test.ShouldEqual, componentName) + test.That(t, in.Metadata.MethodName, test.ShouldEqual, method) + test.That(t, in.Metadata.Type, test.ShouldEqual, tabularDataType) + test.That(t, in.Metadata.FileName, test.ShouldEqual, fileName) + test.That(t, in.Metadata.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, in.Metadata.FileExtension, test.ShouldEqual, fileExt) + test.That(t, in.Metadata.Tags, test.ShouldResemble, tags) + + test.That(t, in.SensorContents[0].Metadata.TimeRequested, test.ShouldResemble, timestamppb.New(startTime)) + test.That(t, in.SensorContents[0].Metadata.TimeReceived, test.ShouldResemble, timestamppb.New(endTime)) + dataField, ok := in.SensorContents[0].Data.(*syncPb.SensorData_Struct) + test.That(t, ok, test.ShouldBeTrue) + test.That(t, dataField.Struct, test.ShouldResemble, tabularDataPb.Data) + return &syncPb.DataCaptureUploadResponse{ + FileId: fileID, + }, nil + } + tabularData := []map[string]interface{}{data} + dataRequestTimes := [][2]time.Time{ + {startTime, endTime}, + } + resp, _ := client.TabularDataCaptureUpload(context.Background(), + tabularData, partID, componentType, componentName, method, + dataRequestTimes, &options) + test.That(t, resp, test.ShouldResemble, fileID) + }) + + t.Run("StreamingDataCaptureUpload", func(t *testing.T) { + options := StreamingOptions{ + ComponentType: &componentTypePtr, + ComponentName: &componentNamePtr, + MethodName: &methodNamePtr, + Type: &binaryDataType, + FileName: &fileNamePtr, + MethodParameters: methodParameters, + Tags: tags, + DataRequestTimes: &dataRequestTimes, + } + // Mock implementation of the streaming client. + mockStream := &inject.DataSyncServiceStreamingDataCaptureUploadClient{ + SendFunc: func(req *syncPb.StreamingDataCaptureUploadRequest) error { + switch packet := req.UploadPacket.(type) { + case *syncPb.StreamingDataCaptureUploadRequest_Metadata: + meta := packet.Metadata + test.That(t, meta.UploadMetadata.PartId, test.ShouldEqual, partID) + test.That(t, meta.UploadMetadata.FileExtension, test.ShouldEqual, fileExt) + test.That(t, meta.UploadMetadata.ComponentType, test.ShouldEqual, componentType) + test.That(t, meta.UploadMetadata.ComponentName, test.ShouldEqual, componentName) + test.That(t, meta.UploadMetadata.MethodName, test.ShouldEqual, method) + test.That(t, meta.UploadMetadata.Tags, test.ShouldResemble, tags) + test.That(t, meta.SensorMetadata.TimeRequested, test.ShouldResemble, timestamppb.New(startTime)) + test.That(t, meta.SensorMetadata.TimeReceived, test.ShouldResemble, timestamppb.New(endTime)) + case *syncPb.StreamingDataCaptureUploadRequest_Data: + test.That(t, packet.Data, test.ShouldResemble, binaryDataByte) + default: + t.Errorf("unexpected packet type: %T", packet) + } + return nil + }, + CloseAndRecvFunc: func() (*syncPb.StreamingDataCaptureUploadResponse, error) { + return &syncPb.StreamingDataCaptureUploadResponse{ + FileId: fileID, + }, nil + }, + } + grpcClient.StreamingDataCaptureUploadFunc = func(ctx context.Context, + opts ...grpc.CallOption, + ) (syncPb.DataSyncService_StreamingDataCaptureUploadClient, error) { + return mockStream, nil + } + resp, err := client.StreamingDataCaptureUpload(context.Background(), binaryDataByte, partID, fileExt, &options) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp, test.ShouldEqual, fileID) + }) + t.Run("FileUploadFromBytes", func(t *testing.T) { + options := FileUploadOptions{ + ComponentType: &componentTypePtr, + ComponentName: &componentNamePtr, + MethodName: &methodNamePtr, + FileName: &fileNamePtr, + MethodParameters: methodParameters, + FileExtension: &fileExtPtr, + Tags: tags, + } + // Mock implementation of the streaming client. + //nolint:dupl + mockStream := &inject.DataSyncServiceFileUploadClient{ + SendFunc: func(req *syncPb.FileUploadRequest) error { + switch packet := req.UploadPacket.(type) { + case *syncPb.FileUploadRequest_Metadata: + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + meta := packet.Metadata + test.That(t, meta.PartId, test.ShouldEqual, partID) + test.That(t, meta.ComponentType, test.ShouldEqual, componentType) + test.That(t, meta.ComponentName, test.ShouldEqual, componentName) + test.That(t, meta.MethodName, test.ShouldEqual, method) + test.That(t, meta.Type, test.ShouldEqual, DataTypeFile) + test.That(t, meta.FileName, test.ShouldEqual, fileName) + test.That(t, meta.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, meta.FileExtension, test.ShouldEqual, fileExt) + test.That(t, meta.Tags, test.ShouldResemble, tags) + case *syncPb.FileUploadRequest_FileContents: + test.That(t, packet.FileContents.Data, test.ShouldResemble, binaryDataByte) + default: + t.Errorf("unexpected packet type: %T", packet) + } + return nil + }, + CloseAndRecvFunc: func() (*syncPb.FileUploadResponse, error) { + return &syncPb.FileUploadResponse{ + FileId: fileID, + }, nil + }, + } + grpcClient.FileUploadFunc = func(ctx context.Context, + opts ...grpc.CallOption, + ) (syncPb.DataSyncService_FileUploadClient, error) { + return mockStream, nil + } + resp, err := client.FileUploadFromBytes(context.Background(), partID, binaryDataByte, &options) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp, test.ShouldEqual, fileID) + }) + + t.Run("FileUploadFromPath", func(t *testing.T) { + options := FileUploadOptions{ + ComponentType: &componentTypePtr, + ComponentName: &componentNamePtr, + MethodName: &methodNamePtr, + FileName: &fileNamePtr, + MethodParameters: methodParameters, + FileExtension: &fileExtPtr, + Tags: tags, + } + // Create a temporary file for testing + tempContent := []byte("test file content") + tempFile, err := os.CreateTemp("", "test-upload-*.txt") + test.That(t, err, test.ShouldBeNil) + defer os.Remove(tempFile.Name()) + // Mock implementation of the streaming client. + //nolint:dupl + mockStream := &inject.DataSyncServiceFileUploadClient{ + SendFunc: func(req *syncPb.FileUploadRequest) error { + switch packet := req.UploadPacket.(type) { + case *syncPb.FileUploadRequest_Metadata: + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + meta := packet.Metadata + test.That(t, meta.PartId, test.ShouldEqual, partID) + test.That(t, meta.ComponentType, test.ShouldEqual, componentType) + test.That(t, meta.ComponentName, test.ShouldEqual, componentName) + test.That(t, meta.MethodName, test.ShouldEqual, method) + test.That(t, meta.Type, test.ShouldEqual, DataTypeFile) + test.That(t, meta.FileName, test.ShouldEqual, fileName) + test.That(t, meta.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, meta.FileExtension, test.ShouldEqual, fileExt) + test.That(t, meta.Tags, test.ShouldResemble, tags) + case *syncPb.FileUploadRequest_FileContents: + test.That(t, packet.FileContents.Data, test.ShouldResemble, tempContent) + default: + t.Errorf("unexpected packet type: %T", packet) + } + return nil + }, + CloseAndRecvFunc: func() (*syncPb.FileUploadResponse, error) { + return &syncPb.FileUploadResponse{ + FileId: fileID, + }, nil + }, + } + grpcClient.FileUploadFunc = func(ctx context.Context, + opts ...grpc.CallOption, + ) (syncPb.DataSyncService_FileUploadClient, error) { + return mockStream, nil + } + resp, err := client.FileUploadFromPath(context.Background(), partID, tempFile.Name(), &options) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp, test.ShouldEqual, fileID) + }) +} diff --git a/app/viam_client_test.go b/app/viam_client_test.go index 08b93ad1c31..32c32133edc 100644 --- a/app/viam_client_test.go +++ b/app/viam_client_test.go @@ -6,6 +6,7 @@ import ( "github.com/viamrobotics/webrtc/v3" pb "go.viam.com/api/app/data/v1" + syncPb "go.viam.com/api/app/datasync/v1" "go.viam.com/test" "go.viam.com/utils" "go.viam.com/utils/rpc" @@ -138,10 +139,13 @@ func TestNewDataClient(t *testing.T) { dataClient := client.DataClient() test.That(t, dataClient, test.ShouldNotBeNil) test.That(t, dataClient, test.ShouldHaveSameTypeAs, &DataClient{}) - test.That(t, dataClient.client, test.ShouldImplement, (*pb.DataServiceClient)(nil)) + test.That(t, dataClient.dataClient, test.ShouldImplement, (*pb.DataServiceClient)(nil)) // Testing that a second call to DataClient() returns the same instance dataClient2 := client.DataClient() test.That(t, dataClient2, test.ShouldNotBeNil) test.That(t, dataClient, test.ShouldResemble, dataClient2) + + // Add test for dataSyncClient + test.That(t, dataClient.dataSyncClient, test.ShouldImplement, (*syncPb.DataSyncServiceClient)(nil)) } diff --git a/go.mod b/go.mod index 6796070676e..2a09ede386e 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.357 + go.viam.com/api v0.1.366 go.viam.com/test v1.2.4 go.viam.com/utils v0.1.116 goji.io v2.0.2+incompatible @@ -189,7 +189,7 @@ require ( github.com/dnephin/pflag v1.0.7 // indirect github.com/docker/cli v25.0.4+incompatible // indirect github.com/docker/distribution v2.8.3+incompatible // indirect - github.com/docker/docker v25.0.4+incompatible // indirect + github.com/docker/docker v25.0.6+incompatible // indirect github.com/docker/docker-credential-helpers v0.8.1 // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/go.sum b/go.sum index 9194d3137fc..fbfd8594150 100644 --- a/go.sum +++ b/go.sum @@ -340,8 +340,8 @@ github.com/docker/cli v25.0.4+incompatible h1:DatRkJ+nrFoYL2HZUzjM5Z5sAmcA5XGp+A github.com/docker/cli v25.0.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8= -github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v25.0.6+incompatible h1:5cPwbwriIcsua2REJe8HqQV+6WlWc1byg2QSXzBxBGg= +github.com/docker/docker v25.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.8.1 h1:j/eKUktUltBtMzKqmfLB0PAgqYyMHOp5vfsD1807oKo= github.com/docker/docker-credential-helpers v0.8.1/go.mod h1:P3ci7E3lwkZg6XiHdRKft1KckHiO9a2rNtyFbZ/ry9M= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= @@ -1516,8 +1516,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.357 h1:L9LBYbaH0imv/B+mVxqtSgClIl4flzjLV6LclfnD9Nc= -go.viam.com/api v0.1.357/go.mod h1:5lpVRxMsKFCaahqsnJfPGwJ9baoQ6PIKQu3lxvy6Wtw= +go.viam.com/api v0.1.366 h1:lUen0W04hwdFL95GoQkYaweZO5ySG40BnUl7HHVZE3o= +go.viam.com/api v0.1.366/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.116 h1:hoCj3SsV8LZAOEP75TjMeX57axhravS8rNUYmhpTWtM= diff --git a/testutils/inject/datasync_service_client.go b/testutils/inject/datasync_service_client.go new file mode 100644 index 00000000000..321aaa1e48c --- /dev/null +++ b/testutils/inject/datasync_service_client.go @@ -0,0 +1,96 @@ +package inject + +import ( + "context" + + datapb "go.viam.com/api/app/datasync/v1" + "google.golang.org/grpc" +) + +// DataSyncServiceClient represents a fake instance of a data sync service client. +type DataSyncServiceClient struct { + datapb.DataSyncServiceClient + DataCaptureUploadFunc func(ctx context.Context, in *datapb.DataCaptureUploadRequest, + opts ...grpc.CallOption) (*datapb.DataCaptureUploadResponse, error) + FileUploadFunc func(ctx context.Context, + opts ...grpc.CallOption) (datapb.DataSyncService_FileUploadClient, error) + StreamingDataCaptureUploadFunc func(ctx context.Context, + opts ...grpc.CallOption) (datapb.DataSyncService_StreamingDataCaptureUploadClient, error) +} + +// DataCaptureUpload calls the injected DataCaptureUpload or the real version. +func (client *DataSyncServiceClient) DataCaptureUpload(ctx context.Context, in *datapb.DataCaptureUploadRequest, + opts ...grpc.CallOption, +) (*datapb.DataCaptureUploadResponse, error) { + if client.DataCaptureUploadFunc == nil { + return client.DataSyncServiceClient.DataCaptureUpload(ctx, in, opts...) + } + return client.DataCaptureUploadFunc(ctx, in, opts...) +} + +// FileUpload calls the injected FileUpload or the real version. +func (client *DataSyncServiceClient) FileUpload(ctx context.Context, + opts ...grpc.CallOption, +) (datapb.DataSyncService_FileUploadClient, error) { + if client.FileUploadFunc == nil { + return client.DataSyncServiceClient.FileUpload(ctx, opts...) + } + return client.FileUploadFunc(ctx, opts...) +} + +// StreamingDataCaptureUpload calls the injected StreamingDataCaptureUpload or the real version. +func (client *DataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context, + opts ...grpc.CallOption, +) (datapb.DataSyncService_StreamingDataCaptureUploadClient, error) { + if client.StreamingDataCaptureUploadFunc == nil { + return client.DataSyncServiceClient.StreamingDataCaptureUpload(ctx, opts...) + } + return client.StreamingDataCaptureUploadFunc(ctx, opts...) +} + +// DataSyncServiceStreamingDataCaptureUploadClient represents a fake instance of +// a StreamingDataCaptureUpload client. +type DataSyncServiceStreamingDataCaptureUploadClient struct { + datapb.DataSyncService_StreamingDataCaptureUploadClient + SendFunc func(*datapb.StreamingDataCaptureUploadRequest) error + CloseAndRecvFunc func() (*datapb.StreamingDataCaptureUploadResponse, error) +} + +// Send calls the injected Send or the real version. +func (client *DataSyncServiceStreamingDataCaptureUploadClient) Send(req *datapb.StreamingDataCaptureUploadRequest) error { + if client.SendFunc == nil { + return client.DataSyncService_StreamingDataCaptureUploadClient.Send(req) + } + return client.SendFunc(req) +} + +// CloseAndRecv calls the injected CloseAndRecv or the real version. +func (client *DataSyncServiceStreamingDataCaptureUploadClient) CloseAndRecv() (*datapb.StreamingDataCaptureUploadResponse, error) { + if client.CloseAndRecvFunc == nil { + return client.DataSyncService_StreamingDataCaptureUploadClient.CloseAndRecv() + } + return client.CloseAndRecvFunc() +} + +// DataSyncServiceFileUploadClient represents a fake instance of a FileUpload client. +type DataSyncServiceFileUploadClient struct { + datapb.DataSyncService_FileUploadClient + SendFunc func(*datapb.FileUploadRequest) error + CloseAndRecvFunc func() (*datapb.FileUploadResponse, error) +} + +// Send calls the injected Send or the real version. +func (client *DataSyncServiceFileUploadClient) Send(req *datapb.FileUploadRequest) error { + if client.SendFunc == nil { + return client.DataSyncService_FileUploadClient.Send(req) + } + return client.SendFunc(req) +} + +// CloseAndRecv calls the injected CloseAndRecv or the real version. +func (client *DataSyncServiceFileUploadClient) CloseAndRecv() (*datapb.FileUploadResponse, error) { + if client.CloseAndRecvFunc == nil { + return client.DataSyncService_FileUploadClient.CloseAndRecv() + } + return client.CloseAndRecvFunc() +}