Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mds-ingest-service] Add pagination and order support to getEvents via ingest #643

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions packages/mds-ingest-service/@types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ export type TimeRange = {
start: Timestamp
end: Timestamp
}

export const GetVehicleEventsOrderColumn = <const>['timestamp', 'provider_id', 'vehicle_state']

export type GetVehicleEventsOrderColumn = typeof GetVehicleEventsOrderColumn[number]

export const GetVehicleEventsOrderDirection = <const>['ASC', 'DESC']

export type GetVehicleEventsOrderDirection = typeof GetVehicleEventsOrderDirection[number]

export type GetVehicleEventsOrderOption = {
column: GetVehicleEventsOrderColumn
direction?: GetVehicleEventsOrderDirection
}
export interface GetVehicleEventsFilterParams {
vehicle_types?: VEHICLE_TYPE[]
propulsion_types?: PROPULSION_TYPE[]
Expand All @@ -82,6 +95,15 @@ export interface GetVehicleEventsFilterParams {
event_types?: VEHICLE_EVENT[]
geography_ids?: UUID[]
limit?: number
order?: GetVehicleEventsOrderOption
}

export type GetVehicleEventsResponse = {
events: EventDomainModel[]
cursor: {
prev: Nullable<string>
next: Nullable<string>
}
}

export interface EventDomainModel extends RecordedColumn {
Expand All @@ -102,12 +124,14 @@ export type EventDomainCreateModel = DomainModelCreate<Omit<EventDomainModel, ke

export interface IngestService {
name: () => string
getEvents: (params: GetVehicleEventsFilterParams) => EventDomainModel[]
getEventsUsingOptions: (params: GetVehicleEventsFilterParams) => GetVehicleEventsResponse
getEventsUsingCursor: (cursor: string) => GetVehicleEventsResponse
getDevices: (ids: UUID[]) => DeviceDomainModel[]
}

export const IngestServiceDefinition: RpcServiceDefinition<IngestService> = {
name: RpcRoute<IngestService['name']>(),
getEvents: RpcRoute<IngestService['getEvents']>(),
getEventsUsingOptions: RpcRoute<IngestService['getEventsUsingOptions']>(),
getEventsUsingCursor: RpcRoute<IngestService['getEventsUsingCursor']>(),
getDevices: RpcRoute<IngestService['getDevices']>()
}
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const IngestServiceRpcClient = RpcClient(IngestServiceDefinition, {
// What the API layer, and any other clients, will invoke.
export const IngestServiceClient: ServiceClient<IngestService> = {
name: (...args) => RpcRequest(IngestServiceRpcClient.name, args),
getEvents: (...args) => RpcRequest(IngestServiceRpcClient.getEvents, args),
getEventsUsingOptions: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingOptions, args),
getEventsUsingCursor: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingCursor, args),
getDevices: (...args) => RpcRequest(IngestServiceRpcClient.getDevices, args)
}
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@mds-core/mds-types": "0.1.25",
"@mds-core/mds-utils": "0.1.28",
"joi": "17.4.0",
"typeorm": "0.2.34"
"typeorm": "0.2.34",
"typeorm-cursor-pagination": "0.6.1"
}
}
20 changes: 10 additions & 10 deletions packages/mds-ingest-service/repository/entities/event-entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { BigintTransformer, IdentityColumn, RecordedColumn } from '@mds-core/mds-repository'
import { Nullable } from '@mds-core/mds-types'
import { Nullable, Timestamp, TRIP_STATE, UUID, VEHICLE_EVENT, VEHICLE_STATE } from '@mds-core/mds-types'
import { Column, Entity, Index } from 'typeorm'
import { EventDomainModel } from '../../@types'
import { TelemetryEntity, TelemetryEntityModel } from './telemetry-entity'
Expand All @@ -36,32 +36,32 @@ export interface EventEntityModel extends IdentityColumn, RecordedColumn {
@Entity('events')
export class EventEntity extends IdentityColumn(RecordedColumn(class {})) implements EventEntityModel {
@Column('uuid', { primary: true })
device_id: EventEntityModel['device_id']
device_id: UUID

@Column('uuid')
provider_id: EventEntityModel['provider_id']
provider_id: UUID

@Column('bigint', { transformer: BigintTransformer, primary: true })
timestamp: EventEntityModel['timestamp']
timestamp: Timestamp
Copy link
Author

@cjlynch12 cjlynch12 Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange type behavior here with the typeorm-cursor-pagination lib.
When this is left as EventEntityModel['timestamp'], and you pass limit to the buildPaginator(...) function, it results in the following error:

unknown type in cursor: [object]1625149089973

Which corresponds to the time_range: {begin} parameter.

Interestingly, this is only an issue when limit is passed. If the type declaration here is left as EventEntityModel['timestamp'] and limit isn't provided, it works fine. As soon as limit is provided, it breaks.

Changing this to Timestamp directly here fixes the issue in both cases.

This doesn't have any issues afaict, all previous tests are still passing with this change. 🀷

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We’ve seen this before.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've recently stopped defining entity properties in terms of Entity or (especially) Domain model properties and use the base types instead. Some libraries that use metadata reflection have issues tracing the aliases to the base type. In recent repositories, you will see a pattern something like:

export class SomeEntity {
  @Column(...)
  entity_id: UUID

  @Column(...)
  timestamp: Timestamp
}

/* Create an alias to keep consistent with other services */
export SomeEntityModel = SomeEntity


@Column('varchar', { array: true, length: 31 })
event_types: EventEntityModel['event_types']
event_types: VEHICLE_EVENT[]

@Column('varchar', { length: 31 })
vehicle_state: EventEntityModel['vehicle_state']
vehicle_state: VEHICLE_STATE

@Column('varchar', { length: 31, nullable: true })
trip_state: EventEntityModel['trip_state']
trip_state: TRIP_STATE

@Column('bigint', { transformer: BigintTransformer, nullable: true })
telemetry_timestamp: EventEntityModel['telemetry_timestamp']
telemetry_timestamp: Nullable<Timestamp>

@Index()
@Column('uuid', { nullable: true })
trip_id: EventEntityModel['trip_id']
trip_id: Nullable<UUID>

@Column('uuid', { nullable: true })
service_area_id: EventEntityModel['service_area_id']
service_area_id: Nullable<UUID>

telemetry?: TelemetryEntity
}
76 changes: 69 additions & 7 deletions packages/mds-ingest-service/repository/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

import { InsertReturning, ReadWriteRepository, RepositoryError } from '@mds-core/mds-repository'
import { UUID } from '@mds-core/mds-types'
import { isUUID } from '@mds-core/mds-utils'
import { isUUID, ValidationError } from '@mds-core/mds-utils'
import { Any } from 'typeorm'
import { buildPaginator, Cursor } from 'typeorm-cursor-pagination'
import {
DeviceDomainCreateModel,
DeviceDomainModel,
EventDomainCreateModel,
EventDomainModel,
GetVehicleEventsFilterParams,
GetVehicleEventsResponse,
TelemetryDomainCreateModel
} from '../@types'
import entities from './entities'
Expand All @@ -40,6 +41,8 @@ import {
} from './mappers'
import migrations from './migrations'

type VehicleEventsQueryParams = GetVehicleEventsFilterParams & Cursor

/**
* Aborts execution if not running under a test environment.
*/
Expand All @@ -53,6 +56,17 @@ class IngestReadWriteRepository extends ReadWriteRepository {
super('ingest', { entities, migrations })
}

private buildCursor = (cursor: VehicleEventsQueryParams) =>
Buffer.from(JSON.stringify(cursor), 'utf-8').toString('base64')

private parseCursor = (cursor: string): VehicleEventsQueryParams & { limit: number } => {
try {
return JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8'))
} catch (error) {
throw new ValidationError('Invalid cursor', error)
}
}

public createEvents = async (events: EventDomainCreateModel[]) => {
const { connect } = this
try {
Expand Down Expand Up @@ -114,10 +128,10 @@ class IngestReadWriteRepository extends ReadWriteRepository {
}
}

public getEvents = async (params: GetVehicleEventsFilterParams): Promise<EventDomainModel[]> => {
private getEvents = async (params: VehicleEventsQueryParams): Promise<GetVehicleEventsResponse> => {
const { connect } = this
const {
time_range: { start, end },
time_range,
// geography_ids,
grouping_type = 'latest_per_vehicle',
event_types,
Expand All @@ -127,8 +141,14 @@ class IngestReadWriteRepository extends ReadWriteRepository {
device_ids,
propulsion_types,
provider_ids,
limit
limit,
beforeCursor,
afterCursor,
order
} = params

const { start, end } = time_range

try {
const connection = await connect('ro')

Expand Down Expand Up @@ -199,14 +219,56 @@ class IngestReadWriteRepository extends ReadWriteRepository {
query.andWhere('events.provider_id = ANY(:provider_ids)', { provider_ids })
}

const entities = await query.limit(limit).getMany()
const pager = buildPaginator({
entity: EventEntity,
alias: 'events',
paginationKeys: [order?.column ?? 'timestamp', 'id'],
query: {
limit,
order: order?.direction ?? (order?.column === undefined ? 'DESC' : 'ASC'),
beforeCursor: beforeCursor ?? undefined, // typeorm-cursor-pagination type weirdness
afterCursor: afterCursor ?? undefined // typeorm-cursor-pagination type weirdness
}
})

return entities.map(EventEntityToDomain.map)
const {
data,
cursor: { beforeCursor: nextBeforeCursor, afterCursor: nextAfterCursor }
} = await pager.paginate(query)

const cursor = {
time_range,
// geography_ids,
grouping_type,
event_types,
vehicle_states,
vehicle_types,
vehicle_id,
device_ids,
propulsion_types,
provider_ids,
limit,
order
}

return {
events: data.map(EventEntityToDomain.map),
cursor: {
next: nextAfterCursor && this.buildCursor({ ...cursor, beforeCursor: null, afterCursor: nextAfterCursor }),
prev: nextBeforeCursor && this.buildCursor({ ...cursor, beforeCursor: nextBeforeCursor, afterCursor: null })
}
}
} catch (error) {
throw RepositoryError(error)
}
}

public getEventsUsingOptions = async (options: GetVehicleEventsFilterParams): Promise<GetVehicleEventsResponse> =>
this.getEvents({ ...options, beforeCursor: null, afterCursor: null, limit: options.limit ?? 100 })

public getEventsUsingCursor = async (cursor: string): Promise<GetVehicleEventsResponse> =>
this.getEvents(this.parseCursor(cursor))

/**
* Nukes everything from orbit. Boom.
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/service/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export const IngestServiceManager = RpcServer(
},
{
name: args => IngestServiceProvider.name(...args),
getEvents: args => IngestServiceProvider.getEvents(...args),
getEventsUsingOptions: args => IngestServiceProvider.getEventsUsingOptions(...args),
getEventsUsingCursor: args => IngestServiceProvider.getEventsUsingCursor(...args),
getDevices: args => IngestServiceProvider.getDevices(...args)
},
{
Expand Down
13 changes: 11 additions & 2 deletions packages/mds-ingest-service/service/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ export const IngestServiceProvider: ServiceProvider<IngestService> & ProcessCont
start: IngestRepository.initialize,
stop: IngestRepository.shutdown,
name: async () => ServiceResult('mds-ingest-service'),
getEvents: async params => {
getEventsUsingOptions: async params => {
try {
return ServiceResult(await IngestRepository.getEvents(validateGetVehicleEventsFilterParams(params)))
return ServiceResult(await IngestRepository.getEventsUsingOptions(validateGetVehicleEventsFilterParams(params)))
} catch (error) {
const exception = ServiceException(`Error in getEvents `, error)
logger.error('getEvents exception', { exception, error })
return exception
}
},
getEventsUsingCursor: async cursor => {
try {
return ServiceResult(await IngestRepository.getEventsUsingCursor(cursor))
} catch (error) {
const exception = ServiceException(`Error in getEvents `, error)
logger.error('getEvents exception', { exception, error })
Expand Down
12 changes: 11 additions & 1 deletion packages/mds-ingest-service/service/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
DeviceDomainModel,
EventDomainModel,
GetVehicleEventsFilterParams,
GetVehicleEventsOrderColumn,
GetVehicleEventsOrderDirection,
GROUPING_TYPES,
TelemetryDomainModel
} from '../@types'
Expand Down Expand Up @@ -163,7 +165,15 @@ export const { validate: validateGetVehicleEventsFilterParams } = SchemaValidato
vehicle_id: { type: 'string' },
device_ids: { type: 'array', items: { type: 'string', format: 'uuid' } },
event_types: { type: 'array', items: { type: 'string', enum: [...new Set(VEHICLE_EVENTS)] } },
limit: { type: 'integer' }
limit: { type: 'integer' },
order: {
type: 'object',
properties: {
column: { type: 'string', enum: [...GetVehicleEventsOrderColumn] },
direction: { type: 'string', enum: [...GetVehicleEventsOrderDirection] }
},
additionalProperties: false
}
},
required: ['time_range']
})
Expand Down
Loading