From 8235e29b0c01b0befdba0f6dca84fe61a2bfba09 Mon Sep 17 00:00:00 2001 From: Carter Lynch Date: Wed, 30 Jun 2021 18:57:32 -0400 Subject: [PATCH 1/4] Add cursor based pagination support to ingest events service --- packages/mds-ingest-service/@types/index.ts | 18 ++++- packages/mds-ingest-service/client/index.ts | 5 +- packages/mds-ingest-service/package.json | 3 +- .../repository/entities/event-entity.ts | 6 +- .../mds-ingest-service/repository/index.ts | 81 ++++++++++++++++--- .../mds-ingest-service/service/manager.ts | 3 +- .../mds-ingest-service/service/provider.ts | 19 +++-- .../mds-ingest-service/tests/index.spec.ts | 81 ++++++++++++++----- pnpm-lock.yaml | 2 + 9 files changed, 170 insertions(+), 48 deletions(-) diff --git a/packages/mds-ingest-service/@types/index.ts b/packages/mds-ingest-service/@types/index.ts index edc75d9ad..3ebc84ac2 100644 --- a/packages/mds-ingest-service/@types/index.ts +++ b/packages/mds-ingest-service/@types/index.ts @@ -15,6 +15,7 @@ */ import { DomainModelCreate, RecordedColumn } from '@mds-core/mds-repository' +import { RpcRoute, RpcServiceDefinition } from '@mds-core/mds-rpc-common' import { ACCESSIBILITY_OPTION, MODALITY, @@ -29,7 +30,6 @@ import { VEHICLE_STATE, VEHICLE_TYPE } from '@mds-core/mds-types' -import { RpcServiceDefinition, RpcRoute } from '@mds-core/mds-rpc-common' export interface DeviceDomainModel extends RecordedColumn { device_id: UUID @@ -75,7 +75,7 @@ export interface GetVehicleEventsFilterParams { propulsion_types?: PROPULSION_TYPE[] provider_ids?: UUID[] vehicle_states?: VEHICLE_STATE[] - time_range: TimeRange + time_range: { start: number; end: number } grouping_type: GROUPING_TYPE vehicle_id?: string device_ids?: UUID[] @@ -84,6 +84,14 @@ export interface GetVehicleEventsFilterParams { limit?: number } +export type GetVehicleEventsResponse = { + events: EventDomainModel[] + cursor: { + prev: Nullable + next: Nullable + } +} + export interface EventDomainModel extends RecordedColumn { device_id: UUID provider_id: UUID @@ -102,12 +110,14 @@ export type EventDomainCreateModel = DomainModelCreate string - getEvents: (params: GetVehicleEventsFilterParams) => EventDomainModel[] + getEventsUsingOptions: (params: GetVehicleEventsFilterParams) => GetVehicleEventsResponse + getEventsUsingCursor: (cursor: string) => GetVehicleEventsResponse getDevices: (ids: UUID[]) => DeviceDomainModel[] } export const IngestServiceDefinition: RpcServiceDefinition = { name: RpcRoute(), - getEvents: RpcRoute(), + getEventsUsingOptions: RpcRoute(), + getEventsUsingCursor: RpcRoute(), getDevices: RpcRoute() } diff --git a/packages/mds-ingest-service/client/index.ts b/packages/mds-ingest-service/client/index.ts index f3c1efbb8..9dd7773c4 100644 --- a/packages/mds-ingest-service/client/index.ts +++ b/packages/mds-ingest-service/client/index.ts @@ -14,8 +14,8 @@ * limitations under the License. */ -import { ServiceClient } from '@mds-core/mds-service-helpers' import { RpcClient, RpcRequest } from '@mds-core/mds-rpc-common' +import { ServiceClient } from '@mds-core/mds-service-helpers' import { IngestService, IngestServiceDefinition } from '../@types' const IngestServiceRpcClient = RpcClient(IngestServiceDefinition, { @@ -26,6 +26,7 @@ const IngestServiceRpcClient = RpcClient(IngestServiceDefinition, { // What the API layer, and any other clients, will invoke. export const IngestServiceClient: ServiceClient = { name: (...args) => RpcRequest(IngestServiceRpcClient.name, args), - getEvents: (...args) => RpcRequest(IngestServiceRpcClient.getEvents, args), + getEventsUsingOptions: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingOptions, args), + getEventsUsingCursor: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingCursor, args), getDevices: (...args) => RpcRequest(IngestServiceRpcClient.getDevices, args) } diff --git a/packages/mds-ingest-service/package.json b/packages/mds-ingest-service/package.json index 5d2698aa9..bb0521dc5 100644 --- a/packages/mds-ingest-service/package.json +++ b/packages/mds-ingest-service/package.json @@ -32,6 +32,7 @@ "@mds-core/mds-types": "0.1.25", "@mds-core/mds-utils": "0.1.28", "joi": "17.4.0", - "typeorm": "0.2.34" + "typeorm": "0.2.34", + "typeorm-cursor-pagination": "0.6.1" } } diff --git a/packages/mds-ingest-service/repository/entities/event-entity.ts b/packages/mds-ingest-service/repository/entities/event-entity.ts index 41be50319..e2087d1bd 100644 --- a/packages/mds-ingest-service/repository/entities/event-entity.ts +++ b/packages/mds-ingest-service/repository/entities/event-entity.ts @@ -14,11 +14,11 @@ * limitations under the License. */ -import { Entity, Column, Index } from 'typeorm' import { BigintTransformer, IdentityColumn, RecordedColumn } from '@mds-core/mds-repository' +import { Nullable, Timestamp } from '@mds-core/mds-types' +import { Column, Entity, Index } from 'typeorm' import { EventDomainModel } from '../../@types' import { TelemetryEntity, TelemetryEntityModel } from './telemetry-entity' -import { Nullable } from '@mds-core/mds-types' export interface EventEntityModel extends IdentityColumn, RecordedColumn { device_id: EventDomainModel['device_id'] @@ -42,7 +42,7 @@ export class EventEntity extends IdentityColumn(RecordedColumn(class {})) implem provider_id: EventEntityModel['provider_id'] @Column('bigint', { transformer: BigintTransformer, primary: true }) - timestamp: EventEntityModel['timestamp'] + timestamp: Timestamp @Column('varchar', { array: true, length: 31 }) event_types: EventEntityModel['event_types'] diff --git a/packages/mds-ingest-service/repository/index.ts b/packages/mds-ingest-service/repository/index.ts index e8ce89d6a..546b74ba7 100644 --- a/packages/mds-ingest-service/repository/index.ts +++ b/packages/mds-ingest-service/repository/index.ts @@ -15,16 +15,17 @@ */ import { InsertReturning, ReadWriteRepository, RepositoryError } from '@mds-core/mds-repository' -import { Any } from 'typeorm' -import { isUUID } from '@mds-core/mds-utils' import { UUID } from '@mds-core/mds-types' +import { isUUID, ValidationError } from '@mds-core/mds-utils' +import { Any } from 'typeorm' +import { buildPaginator, Cursor } from 'typeorm-cursor-pagination' import { - EventDomainModel, - EventDomainCreateModel, - TelemetryDomainCreateModel, DeviceDomainCreateModel, + DeviceDomainModel, + EventDomainCreateModel, GetVehicleEventsFilterParams, - DeviceDomainModel + GetVehicleEventsResponse, + TelemetryDomainCreateModel } from '../@types' import entities from './entities' import { DeviceEntity } from './entities/device-entity' @@ -40,6 +41,8 @@ import { } from './mappers' import migrations from './migrations' +type VehicleEventsQueryParams = GetVehicleEventsFilterParams & Cursor + /** * Aborts execution if not running under a test environment. */ @@ -53,6 +56,17 @@ class IngestReadWriteRepository extends ReadWriteRepository { super('ingest', { entities, migrations }) } + private buildCursor = (cursor: VehicleEventsQueryParams) => + Buffer.from(JSON.stringify(cursor), 'utf-8').toString('base64') + + private parseCursor = (cursor: string): VehicleEventsQueryParams & { limit: number } => { + try { + return JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8')) + } catch (error) { + throw new ValidationError('Invalid cursor', error) + } + } + public createEvents = async (events: EventDomainCreateModel[]) => { const { connect } = this try { @@ -114,10 +128,10 @@ class IngestReadWriteRepository extends ReadWriteRepository { } } - public getEvents = async (params: GetVehicleEventsFilterParams): Promise => { + private getEvents = async (params: VehicleEventsQueryParams): Promise => { const { connect } = this const { - time_range: { start, end }, + time_range, // geography_ids, grouping_type = 'latest_per_vehicle', event_types, @@ -127,8 +141,13 @@ class IngestReadWriteRepository extends ReadWriteRepository { device_ids, propulsion_types, provider_ids, - limit + limit, + beforeCursor, + afterCursor } = params + + const { start, end } = time_range + try { const connection = await connect('ro') @@ -199,14 +218,54 @@ class IngestReadWriteRepository extends ReadWriteRepository { query.andWhere('events.provider_id = ANY(:provider_ids)', { provider_ids }) } - const entities = await query.limit(limit).getMany() + const pager = buildPaginator({ + entity: EventEntity, + alias: 'events', + paginationKeys: ['timestamp', 'id'], + query: { + limit, + beforeCursor: beforeCursor ?? undefined, // typeorm-cursor-pagination type weirdness + afterCursor: afterCursor ?? undefined // typeorm-cursor-pagination type weirdness + } + }) - return entities.map(EventEntityToDomain.map) + const { + data, + cursor: { beforeCursor: nextBeforeCursor, afterCursor: nextAfterCursor } + } = await pager.paginate(query) + + const cursor = { + time_range, + // geography_ids, + grouping_type, + event_types, + vehicle_states, + vehicle_types, + vehicle_id, + device_ids, + propulsion_types, + provider_ids, + limit + } + + return { + events: data.map(EventEntityToDomain.map), + cursor: { + next: nextAfterCursor && this.buildCursor({ ...cursor, beforeCursor: null, afterCursor: nextAfterCursor }), + prev: nextBeforeCursor && this.buildCursor({ ...cursor, beforeCursor: nextBeforeCursor, afterCursor: null }) + } + } } catch (error) { throw RepositoryError(error) } } + public getEventsUsingOptions = async (options: GetVehicleEventsFilterParams): Promise => + this.getEvents({ ...options, beforeCursor: null, afterCursor: null, limit: options.limit ?? 100 }) + + public getEventsUsingCursor = async (cursor: string): Promise => + this.getEvents(this.parseCursor(cursor)) + /** * Nukes everything from orbit. Boom. */ diff --git a/packages/mds-ingest-service/service/manager.ts b/packages/mds-ingest-service/service/manager.ts index 9ad3ac533..9916cd2ee 100644 --- a/packages/mds-ingest-service/service/manager.ts +++ b/packages/mds-ingest-service/service/manager.ts @@ -27,7 +27,8 @@ export const IngestServiceManager = RpcServer( }, { name: args => IngestServiceProvider.name(...args), - getEvents: args => IngestServiceProvider.getEvents(...args), + getEventsUsingOptions: args => IngestServiceProvider.getEventsUsingOptions(...args), + getEventsUsingCursor: args => IngestServiceProvider.getEventsUsingCursor(...args), getDevices: args => IngestServiceProvider.getDevices(...args) }, { diff --git a/packages/mds-ingest-service/service/provider.ts b/packages/mds-ingest-service/service/provider.ts index 3ac510837..f3374a0ca 100644 --- a/packages/mds-ingest-service/service/provider.ts +++ b/packages/mds-ingest-service/service/provider.ts @@ -14,20 +14,29 @@ * limitations under the License. */ -import { ServiceProvider, ProcessController, ServiceResult, ServiceException } from '@mds-core/mds-service-helpers' +import logger from '@mds-core/mds-logger' +import { ProcessController, ServiceException, ServiceProvider, ServiceResult } from '@mds-core/mds-service-helpers' +import { UUID } from '@mds-core/mds-types' import { IngestService } from '../@types' import { IngestRepository } from '../repository' -import logger from '@mds-core/mds-logger' import { validateGetVehicleEventsFilterParams, validateUUIDs } from './validators' -import { UUID } from '@mds-core/mds-types' export const IngestServiceProvider: ServiceProvider & ProcessController = { start: IngestRepository.initialize, stop: IngestRepository.shutdown, name: async () => ServiceResult('mds-ingest-service'), - getEvents: async params => { + getEventsUsingOptions: async params => { + try { + return ServiceResult(await IngestRepository.getEventsUsingOptions(validateGetVehicleEventsFilterParams(params))) + } catch (error) { + const exception = ServiceException(`Error in getEvents `, error) + logger.error('getEvents exception', { exception, error }) + return exception + } + }, + getEventsUsingCursor: async cursor => { try { - return ServiceResult(await IngestRepository.getEvents(validateGetVehicleEventsFilterParams(params))) + return ServiceResult(await IngestRepository.getEventsUsingCursor(cursor)) } catch (error) { const exception = ServiceException(`Error in getEvents `, error) logger.error('getEvents exception', { exception, error }) diff --git a/packages/mds-ingest-service/tests/index.spec.ts b/packages/mds-ingest-service/tests/index.spec.ts index 44431d3d2..b48f2f0e7 100644 --- a/packages/mds-ingest-service/tests/index.spec.ts +++ b/packages/mds-ingest-service/tests/index.spec.ts @@ -15,14 +15,13 @@ * limitations under the License. */ -import { IngestServiceManager } from '../service/manager' -import { IngestServiceClient } from '../client' -import { IngestRepository } from '../repository' import { TEST1_PROVIDER_ID } from '@mds-core/mds-providers' -import { now, uuid, ValidationError } from '@mds-core/mds-utils' -import { Device, VehicleEvent } from '@mds-core/mds-types' -import { EventEntityCreateModel } from '../repository/mappers' +import { Device } from '@mds-core/mds-types' +import { now, uuid } from '@mds-core/mds-utils' import { EventDomainCreateModel, TelemetryDomainCreateModel } from '../@types' +import { IngestServiceClient } from '../client' +import { IngestRepository } from '../repository' +import { IngestServiceManager } from '../service/manager' const DEVICE_UUID_A = uuid() const DEVICE_UUID_B = uuid() @@ -223,7 +222,7 @@ describe('Ingest Service Tests', () => { }) }) - describe('getEvents', () => { + describe('getEventsUsingOptions', () => { beforeEach(async () => { await IngestRepository.createDevices([TEST_TNC_A, TEST_TNC_B]) await IngestRepository.createEvents([TEST_EVENT_A1, TEST_EVENT_B1]) @@ -233,7 +232,7 @@ describe('Ingest Service Tests', () => { }) describe('all_events', () => { it('gets 4 events', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, grouping_type: 'all_events' }) @@ -241,16 +240,21 @@ describe('Ingest Service Tests', () => { }) it('gets 4 events, but limit to 1', async () => { - const events = await IngestServiceClient.getEvents({ + const limit = 1 + const { + events, + cursor: { next, prev } + } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, grouping_type: 'all_events', - limit: 1 + limit }) expect(events.length).toEqual(1) + expect(next).not.toBeNull() }) it('gets two events, filters on event_types', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, event_types: ['service_end'], grouping_type: 'all_events' @@ -261,7 +265,7 @@ describe('Ingest Service Tests', () => { describe('latest_per_vehicle', () => { it('gets two events, one for each device, telemetry is loaded', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, grouping_type: 'latest_per_trip' }) @@ -275,7 +279,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on event_types', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, event_types: ['service_end'], grouping_type: 'latest_per_trip' @@ -286,7 +290,7 @@ describe('Ingest Service Tests', () => { describe('latest_per_vehicle', () => { it('gets two events, one for each device', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, grouping_type: 'latest_per_vehicle' }) @@ -294,7 +298,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on event_types', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, event_types: ['service_end'], grouping_type: 'latest_per_vehicle' @@ -303,7 +307,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on propulsion type', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, propulsion_types: ['electric'], grouping_type: 'latest_per_vehicle' @@ -312,7 +316,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on device_ids', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, device_ids: [DEVICE_UUID_A, DEVICE_UUID_B], grouping_type: 'latest_per_vehicle' @@ -321,7 +325,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on vehicle_type', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, vehicle_types: ['car'], grouping_type: 'latest_per_vehicle' @@ -330,7 +334,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on vehicle_states', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, vehicle_states: ['unknown'], grouping_type: 'latest_per_vehicle' @@ -339,7 +343,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on vehicle_id', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, vehicle_id: TEST_TNC_A.vehicle_id, grouping_type: 'latest_per_vehicle' @@ -348,7 +352,7 @@ describe('Ingest Service Tests', () => { }) it('gets two events, filters on provider_ids', async () => { - const events = await IngestServiceClient.getEvents({ + const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, provider_ids: [TEST1_PROVIDER_ID], grouping_type: 'latest_per_vehicle' @@ -358,6 +362,41 @@ describe('Ingest Service Tests', () => { }) }) + describe('getEventsUsingCursor', () => { + beforeEach(async () => { + await IngestRepository.createDevices([TEST_TNC_A, TEST_TNC_B]) + await IngestRepository.createEvents([TEST_EVENT_A1, TEST_EVENT_B1]) + await IngestRepository.createEvents([TEST_EVENT_A2, TEST_EVENT_B2]) + await IngestRepository.createTelemetries([TEST_TELEMETRY_A1, TEST_TELEMETRY_B1]) + await IngestRepository.createTelemetries([TEST_TELEMETRY_A2, TEST_TELEMETRY_B2]) + }) + + it('fetches the next page', async () => { + // First page + const { + events, + cursor: { next } + } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + grouping_type: 'all_events', + limit: 1 + }) + + expect(events.length).toEqual(1) + expect(next).not.toBeNull() + + // Use cursor for next page + const { + events: nextEvents, + cursor: { prev } + } = await IngestServiceClient.getEventsUsingCursor(next!) + + expect(nextEvents.length).toEqual(1) + expect(nextEvents[0]).not.toStrictEqual(events[0]) + expect(prev).not.toBeNull() + }) + }) + afterAll(async () => { await IngestRepository.shutdown() await IngestServer.stop() diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7e78c3bb6..427b90dd0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -778,6 +778,7 @@ importers: '@mds-core/mds-utils': 0.1.28 joi: 17.4.0 typeorm: 0.2.34 + typeorm-cursor-pagination: 0.6.1 dependencies: '@mds-core/mds-logger': link:../mds-logger '@mds-core/mds-providers': link:../mds-providers @@ -789,6 +790,7 @@ importers: '@mds-core/mds-utils': link:../mds-utils joi: 17.4.0 typeorm: 0.2.34 + typeorm-cursor-pagination: 0.6.1_typeorm@0.2.34 packages/mds-jurisdiction: specifiers: From ac998aeb442a2ec6a6087bba0be1ba9be40d42a7 Mon Sep 17 00:00:00 2001 From: Carter Lynch Date: Thu, 1 Jul 2021 10:07:09 -0400 Subject: [PATCH 2/4] Add order support to ingest events service --- packages/mds-ingest-service/@types/index.ts | 14 +++++ .../mds-ingest-service/repository/index.ts | 12 +++- .../mds-ingest-service/service/validators.ts | 12 +++- .../mds-ingest-service/tests/index.spec.ts | 61 +++++++++++++++++++ 4 files changed, 96 insertions(+), 3 deletions(-) diff --git a/packages/mds-ingest-service/@types/index.ts b/packages/mds-ingest-service/@types/index.ts index 3ebc84ac2..7684068b1 100644 --- a/packages/mds-ingest-service/@types/index.ts +++ b/packages/mds-ingest-service/@types/index.ts @@ -70,6 +70,19 @@ export type TimeRange = { start: Timestamp end: Timestamp } + +export const GetVehicleEventsOrderColumn = ['timestamp', 'vehicle_id', 'provider_id', 'vehicle_state'] + +export type GetVehicleEventsOrderColumn = typeof GetVehicleEventsOrderColumn[number] + +export const GetVehicleEventsOrderDirection = ['ASC', 'DESC'] + +export type GetVehicleEventsOrderDirection = typeof GetVehicleEventsOrderDirection[number] + +export type GetVehicleEventsOrderOption = { + column: GetVehicleEventsOrderColumn + direction?: GetVehicleEventsOrderDirection +} export interface GetVehicleEventsFilterParams { vehicle_types?: VEHICLE_TYPE[] propulsion_types?: PROPULSION_TYPE[] @@ -82,6 +95,7 @@ export interface GetVehicleEventsFilterParams { event_types?: VEHICLE_EVENT[] geography_ids?: UUID[] limit?: number + order?: GetVehicleEventsOrderOption } export type GetVehicleEventsResponse = { diff --git a/packages/mds-ingest-service/repository/index.ts b/packages/mds-ingest-service/repository/index.ts index 546b74ba7..42e35239b 100644 --- a/packages/mds-ingest-service/repository/index.ts +++ b/packages/mds-ingest-service/repository/index.ts @@ -143,7 +143,8 @@ class IngestReadWriteRepository extends ReadWriteRepository { provider_ids, limit, beforeCursor, - afterCursor + afterCursor, + order } = params const { start, end } = time_range @@ -218,12 +219,18 @@ class IngestReadWriteRepository extends ReadWriteRepository { query.andWhere('events.provider_id = ANY(:provider_ids)', { provider_ids }) } + // Use query instead of paginator to manage order if using a joined field + if (order && order.column === 'vehicle_id') { + query.orderBy('devices.vehicle_id', order.direction) + } + const pager = buildPaginator({ entity: EventEntity, alias: 'events', paginationKeys: ['timestamp', 'id'], query: { limit, + order: order?.direction ?? (order?.column === undefined ? 'DESC' : 'ASC'), beforeCursor: beforeCursor ?? undefined, // typeorm-cursor-pagination type weirdness afterCursor: afterCursor ?? undefined // typeorm-cursor-pagination type weirdness } @@ -245,7 +252,8 @@ class IngestReadWriteRepository extends ReadWriteRepository { device_ids, propulsion_types, provider_ids, - limit + limit, + order } return { diff --git a/packages/mds-ingest-service/service/validators.ts b/packages/mds-ingest-service/service/validators.ts index eeefcbd3c..47816a419 100644 --- a/packages/mds-ingest-service/service/validators.ts +++ b/packages/mds-ingest-service/service/validators.ts @@ -29,6 +29,8 @@ import { DeviceDomainModel, EventDomainModel, GetVehicleEventsFilterParams, + GetVehicleEventsOrderColumn, + GetVehicleEventsOrderDirection, GROUPING_TYPES, TelemetryDomainModel } from '../@types' @@ -163,7 +165,15 @@ export const { validate: validateGetVehicleEventsFilterParams } = SchemaValidato vehicle_id: { type: 'string' }, device_ids: { type: 'array', items: { type: 'string', format: 'uuid' } }, event_types: { type: 'array', items: { type: 'string', enum: [...new Set(VEHICLE_EVENTS)] } }, - limit: { type: 'integer' } + limit: { type: 'integer' }, + order: { + type: 'object', + properties: { + column: { type: 'string', enum: [...GetVehicleEventsOrderColumn] }, + direction: { type: 'string', enum: [...GetVehicleEventsOrderDirection] } + }, + additionalProperties: false + } }, required: ['time_range'] }) diff --git a/packages/mds-ingest-service/tests/index.spec.ts b/packages/mds-ingest-service/tests/index.spec.ts index b48f2f0e7..83f818276 100644 --- a/packages/mds-ingest-service/tests/index.spec.ts +++ b/packages/mds-ingest-service/tests/index.spec.ts @@ -261,6 +261,67 @@ describe('Ingest Service Tests', () => { }) expect(events.length).toEqual(2) }) + + it('gets events in order provided (vehicle_id)', async () => { + const { events } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'vehicle_id', direction: 'ASC' }, + grouping_type: 'all_events' + }) + expect(events[0].device_id).toEqual(DEVICE_UUID_A) + + // reverse order + const { events: eventsDesc } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'vehicle_id', direction: 'DESC' }, + grouping_type: 'all_events' + }) + + expect(eventsDesc[0].device_id).toEqual(DEVICE_UUID_B) + }) + + it('gets events in order provided (vehicle_state)', async () => { + const { events } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'vehicle_state', direction: 'ASC' }, + grouping_type: 'all_events' + }) + expect(events[0].vehicle_state).toEqual('removed') + + // reverse order + const { events: eventsDesc } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'vehicle_state', direction: 'DESC' }, + grouping_type: 'all_events' + }) + + expect(eventsDesc[0].vehicle_state).toEqual('unknown') + }) + + it('respects order when cursor is used', async () => { + const { + events, + cursor: { next } + } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'vehicle_state', direction: 'ASC' }, + grouping_type: 'all_events', + limit: 2 + }) + expect(events.length).toEqual(2) + expect(next).not.toBeNull() + expect(events[0].vehicle_state).toEqual('removed') + + // reverse order + const { + events: nextEvents, + cursor: { prev } + } = await IngestServiceClient.getEventsUsingCursor(next!) + + expect(nextEvents.length).toEqual(2) + expect(prev).not.toBeNull() + expect(nextEvents[0].vehicle_state).toEqual('unknown') + }) }) describe('latest_per_vehicle', () => { From 3e0d59742c0bf29117da4e3ba81e6adfa1a29645 Mon Sep 17 00:00:00 2001 From: Carter Lynch Date: Thu, 1 Jul 2021 10:14:28 -0400 Subject: [PATCH 3/4] Revert type change used during testing --- packages/mds-ingest-service/@types/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/mds-ingest-service/@types/index.ts b/packages/mds-ingest-service/@types/index.ts index 7684068b1..04b322a9d 100644 --- a/packages/mds-ingest-service/@types/index.ts +++ b/packages/mds-ingest-service/@types/index.ts @@ -88,7 +88,7 @@ export interface GetVehicleEventsFilterParams { propulsion_types?: PROPULSION_TYPE[] provider_ids?: UUID[] vehicle_states?: VEHICLE_STATE[] - time_range: { start: number; end: number } + time_range: TimeRange grouping_type: GROUPING_TYPE vehicle_id?: string device_ids?: UUID[] From f89ae4ad89ab7a7ffb51b84a8a322fe89216e0d8 Mon Sep 17 00:00:00 2001 From: Carter Lynch Date: Thu, 1 Jul 2021 13:48:31 -0400 Subject: [PATCH 4/4] Remove vehicle_id sort option for now, update tests to actually test order --- packages/mds-ingest-service/@types/index.ts | 2 +- .../repository/entities/event-entity.ts | 18 ++++---- .../mds-ingest-service/repository/index.ts | 7 +-- .../mds-ingest-service/tests/index.spec.ts | 43 ++++++++++++++++--- 4 files changed, 48 insertions(+), 22 deletions(-) diff --git a/packages/mds-ingest-service/@types/index.ts b/packages/mds-ingest-service/@types/index.ts index 04b322a9d..6949660ed 100644 --- a/packages/mds-ingest-service/@types/index.ts +++ b/packages/mds-ingest-service/@types/index.ts @@ -71,7 +71,7 @@ export type TimeRange = { end: Timestamp } -export const GetVehicleEventsOrderColumn = ['timestamp', 'vehicle_id', 'provider_id', 'vehicle_state'] +export const GetVehicleEventsOrderColumn = ['timestamp', 'provider_id', 'vehicle_state'] export type GetVehicleEventsOrderColumn = typeof GetVehicleEventsOrderColumn[number] diff --git a/packages/mds-ingest-service/repository/entities/event-entity.ts b/packages/mds-ingest-service/repository/entities/event-entity.ts index e2087d1bd..23b78fa84 100644 --- a/packages/mds-ingest-service/repository/entities/event-entity.ts +++ b/packages/mds-ingest-service/repository/entities/event-entity.ts @@ -15,7 +15,7 @@ */ import { BigintTransformer, IdentityColumn, RecordedColumn } from '@mds-core/mds-repository' -import { Nullable, Timestamp } from '@mds-core/mds-types' +import { Nullable, Timestamp, TRIP_STATE, UUID, VEHICLE_EVENT, VEHICLE_STATE } from '@mds-core/mds-types' import { Column, Entity, Index } from 'typeorm' import { EventDomainModel } from '../../@types' import { TelemetryEntity, TelemetryEntityModel } from './telemetry-entity' @@ -36,32 +36,32 @@ export interface EventEntityModel extends IdentityColumn, RecordedColumn { @Entity('events') export class EventEntity extends IdentityColumn(RecordedColumn(class {})) implements EventEntityModel { @Column('uuid', { primary: true }) - device_id: EventEntityModel['device_id'] + device_id: UUID @Column('uuid') - provider_id: EventEntityModel['provider_id'] + provider_id: UUID @Column('bigint', { transformer: BigintTransformer, primary: true }) timestamp: Timestamp @Column('varchar', { array: true, length: 31 }) - event_types: EventEntityModel['event_types'] + event_types: VEHICLE_EVENT[] @Column('varchar', { length: 31 }) - vehicle_state: EventEntityModel['vehicle_state'] + vehicle_state: VEHICLE_STATE @Column('varchar', { length: 31, nullable: true }) - trip_state: EventEntityModel['trip_state'] + trip_state: TRIP_STATE @Column('bigint', { transformer: BigintTransformer, nullable: true }) - telemetry_timestamp: EventEntityModel['telemetry_timestamp'] + telemetry_timestamp: Nullable @Index() @Column('uuid', { nullable: true }) - trip_id: EventEntityModel['trip_id'] + trip_id: Nullable @Column('uuid', { nullable: true }) - service_area_id: EventEntityModel['service_area_id'] + service_area_id: Nullable telemetry?: TelemetryEntity } diff --git a/packages/mds-ingest-service/repository/index.ts b/packages/mds-ingest-service/repository/index.ts index 42e35239b..f6e0e84bf 100644 --- a/packages/mds-ingest-service/repository/index.ts +++ b/packages/mds-ingest-service/repository/index.ts @@ -219,15 +219,10 @@ class IngestReadWriteRepository extends ReadWriteRepository { query.andWhere('events.provider_id = ANY(:provider_ids)', { provider_ids }) } - // Use query instead of paginator to manage order if using a joined field - if (order && order.column === 'vehicle_id') { - query.orderBy('devices.vehicle_id', order.direction) - } - const pager = buildPaginator({ entity: EventEntity, alias: 'events', - paginationKeys: ['timestamp', 'id'], + paginationKeys: [order?.column ?? 'timestamp', 'id'], query: { limit, order: order?.direction ?? (order?.column === undefined ? 'DESC' : 'ASC'), diff --git a/packages/mds-ingest-service/tests/index.spec.ts b/packages/mds-ingest-service/tests/index.spec.ts index 83f818276..1fa1ffddd 100644 --- a/packages/mds-ingest-service/tests/index.spec.ts +++ b/packages/mds-ingest-service/tests/index.spec.ts @@ -130,6 +130,7 @@ const TEST_EVENT_A1: EventDomainCreateModel = { telemetry_timestamp: testTimestamp, provider_id: TEST1_PROVIDER_ID, trip_id: TRIP_UUID_A + // test-id-1 } const TEST_EVENT_A2: EventDomainCreateModel = { @@ -141,6 +142,7 @@ const TEST_EVENT_A2: EventDomainCreateModel = { telemetry_timestamp: testTimestamp + 1000, provider_id: TEST1_PROVIDER_ID, trip_id: TRIP_UUID_A + // test-id-1 } const TEST_EVENT_B1: EventDomainCreateModel = { @@ -152,6 +154,7 @@ const TEST_EVENT_B1: EventDomainCreateModel = { telemetry_timestamp: testTimestamp, provider_id: TEST1_PROVIDER_ID, trip_id: TRIP_UUID_B + // test-id-2 } const TEST_EVENT_B2: EventDomainCreateModel = { @@ -163,6 +166,7 @@ const TEST_EVENT_B2: EventDomainCreateModel = { telemetry_timestamp: testTimestamp + 1000, provider_id: TEST1_PROVIDER_ID, trip_id: TRIP_UUID_B + // test-id-2 } describe('Ingest Repository Tests', () => { @@ -262,22 +266,23 @@ describe('Ingest Service Tests', () => { expect(events.length).toEqual(2) }) - it('gets events in order provided (vehicle_id)', async () => { + it('gets events in order provided (vehicle_state)', async () => { const { events } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, - order: { column: 'vehicle_id', direction: 'ASC' }, - grouping_type: 'all_events' + order: { column: 'vehicle_state', direction: 'ASC' }, + grouping_type: 'all_events', + limit: 1 }) - expect(events[0].device_id).toEqual(DEVICE_UUID_A) + expect(events[0].vehicle_state).toEqual('removed') // reverse order const { events: eventsDesc } = await IngestServiceClient.getEventsUsingOptions({ time_range: { start: testTimestamp, end: testTimestamp + 2000 }, - order: { column: 'vehicle_id', direction: 'DESC' }, + order: { column: 'vehicle_state', direction: 'DESC' }, grouping_type: 'all_events' }) - expect(eventsDesc[0].device_id).toEqual(DEVICE_UUID_B) + expect(eventsDesc[0].vehicle_state).toEqual('unknown') }) it('gets events in order provided (vehicle_state)', async () => { @@ -322,6 +327,32 @@ describe('Ingest Service Tests', () => { expect(prev).not.toBeNull() expect(nextEvents[0].vehicle_state).toEqual('unknown') }) + + it('uses secondary (timestamp) sort order when primary sort values are equal', async () => { + const { + events, + cursor: { next } + } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'provider_id', direction: 'ASC' }, + grouping_type: 'all_events', + limit: 4 + }) + + expect(events[0].timestamp).toBeLessThan(events[events.length - 1].timestamp) + + const { + events: descEvents, + cursor: { next: descNext } + } = await IngestServiceClient.getEventsUsingOptions({ + time_range: { start: testTimestamp, end: testTimestamp + 2000 }, + order: { column: 'provider_id', direction: 'DESC' }, + grouping_type: 'all_events', + limit: 4 + }) + + expect(descEvents[0].timestamp).toBeGreaterThan(descEvents[descEvents.length - 1].timestamp) + }) }) describe('latest_per_vehicle', () => {