Skip to content

Commit

Permalink
sdk: improve type checks for multi processing pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
maany committed Jul 14, 2023
1 parent 8e62d27 commit 2c1c30d
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 30 deletions.
10 changes: 6 additions & 4 deletions src/lib/sdk/presenter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Transform, TransformCallback } from 'stream'
import { BaseOutputPort, BaseStreamingOutputPort } from './primary-ports'
import { NextApiResponse } from 'next'
import { IronSession } from 'iron-session'
import { BaseErrorResponseModel, BaseResponseModel } from './usecase-models'
import { BaseViewModel } from './view-models'

/**
* A base class for presenters.
Expand Down Expand Up @@ -71,12 +73,12 @@ export abstract class BasePresenter<TResponseModel, TErrorModel, TViewModel>
* @typeparam TErrorModel The type of the error model to present.
*/
export abstract class BaseStreamingPresenter<
TResponseModel,
TViewModel,
TErrorModel,
TResponseModel extends BaseResponseModel,
TErrorModel extends BaseErrorResponseModel,
TViewModel extends BaseViewModel,
>
extends Transform
implements BaseStreamingOutputPort<TResponseModel, TErrorModel>
implements BaseStreamingOutputPort<TResponseModel, TErrorModel, TViewModel>
{
response: TWebResponse
constructor(response: TWebResponse) {
Expand Down
15 changes: 14 additions & 1 deletion src/lib/sdk/primary-ports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { BaseDTO } from './dto'
import { AuthenticatedRequestModel, BaseErrorResponseModel, BaseResponseModel } from './usecase-models'
import { BaseStreamingPostProcessingPipelineElement } from './postprocessing-pipeline-elements'
import { TWebResponse } from './web'
import { BaseViewModel } from './view-models'

/**
* A base interface for input ports.
Expand Down Expand Up @@ -64,8 +65,20 @@ export interface BaseOutputPort<TResponseModel, TErrorModel> {
* @typeparam TViewModel The type of the view model for the streaming output port.
* @typeparam TErrorModel The type of the error model for the streaming output port.
*/
export interface BaseStreamingOutputPort<TResponseModel, TErrorModel> extends Transform{
export interface BaseStreamingOutputPort<TResponseModel extends BaseResponseModel, TErrorModel extends BaseErrorResponseModel, TViewModel extends BaseViewModel> extends Transform{
response: TWebResponse
presentStream(stream: PassThrough): void

presentError(errorModel: TErrorModel): void

convertErrorModelToViewModel(errorModel: TErrorModel): {
status: number,
viewModel: TViewModel
}

convertResponseModelToViewModel(
responseModel: TResponseModel,
): TViewModel

handleStreamError(error: TErrorModel): void
}
29 changes: 18 additions & 11 deletions src/lib/sdk/usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Transform, TransformCallback, PassThrough, Readable } from 'stream'
import { BaseDTO, BaseStreamableDTO } from './dto'
import { BaseStreamingPostProcessingPipelineElement, BaseResponseModelValidatorPipelineElement, BasePostProcessingPipelineElement } from './postprocessing-pipeline-elements'
import { BaseStreamingPresenter } from './presenter'
import { BaseViewModel } from './view-models'

/**
* A type that represents a simple use case that does not require authentication.
Expand Down Expand Up @@ -209,17 +210,18 @@ export abstract class BaseStreamingUseCase<
TResponseModel extends BaseResponseModel,
TErrorModel extends BaseErrorResponseModel,
TDTO extends BaseStreamableDTO,
TStreamData,
TStreamDTO,
TViewModel extends BaseViewModel
>
extends Transform
implements
BaseStreamableInputPort<AuthenticatedRequestModel<TRequestModel>>
{
protected presenter: BaseStreamingOutputPort<TResponseModel, TErrorModel>
protected presenter: BaseStreamingOutputPort<TResponseModel, TErrorModel, TViewModel>
protected requestModel: AuthenticatedRequestModel<TRequestModel> | undefined

constructor(
presenter: BaseStreamingOutputPort<TResponseModel, TErrorModel>,
presenter: BaseStreamingOutputPort<TResponseModel, TErrorModel, TViewModel>,
) {
super({ objectMode: true })
this.presenter = presenter
Expand Down Expand Up @@ -255,7 +257,7 @@ export abstract class BaseStreamingUseCase<
* @param dto The streamed data from the gateway.
* @returns An object that represents the processed data.
*/
abstract processStreamedData(dto: TStreamData): {
abstract processStreamedData(dto: TStreamDTO): {
data: TResponseModel | TErrorModel
status: 'success' | 'error'
}
Expand Down Expand Up @@ -308,7 +310,7 @@ export abstract class BaseStreamingUseCase<
}

_transform(
dto: TStreamData,
dto: TStreamDTO,
encoding: BufferEncoding,
callback: TransformCallback,
): void {
Expand All @@ -332,20 +334,24 @@ export abstract class BaseStreamingUseCase<
* @typeparam TErrorModel The type of the error model for the use case.
* @typeparam TDTO The type of the data transfer object for the use case.
* @typeparam TStreamData The type of the streamed data for the use case.
* @typeparam TViewModel The type of the view model for the use case.
*/
export abstract class BaseMultiCallStreamableUseCase<
TRequestModel,
TResponseModel extends BaseResponseModel,
TErrorModel extends BaseErrorResponseModel,
TDTO extends BaseStreamableDTO,
TStreamData,
TStreamDTO extends BaseDTO,
TViewModel extends BaseViewModel
>
extends BaseStreamingUseCase<
TRequestModel,
TResponseModel,
TErrorModel,
TDTO,
TStreamData
TStreamDTO,
TViewModel
>
implements
BaseMultiCallStreamableInputPort<
Expand Down Expand Up @@ -377,8 +383,8 @@ export abstract class BaseMultiCallStreamableUseCase<
constructor(
presenter: BaseStreamingPresenter<
TResponseModel,
TStreamData,
TErrorModel
TErrorModel,
TViewModel
>,
postProcessingPipelineElements: BaseStreamingPostProcessingPipelineElement<
AuthenticatedRequestModel<TRequestModel>,
Expand Down Expand Up @@ -420,9 +426,10 @@ export abstract class BaseMultiCallStreamableUseCase<

/**
* Convert the chunk returned from the gateway's stream to a DTO that will be passed forward in the current pipeline.
* @param streamedChunk The chunk returned from the gateway's stream
* @param streamedData The chunk returned from the gateway's stream
* @param requestModel The request model that was used to make the gateway request.
*/
abstract chunkToDTO(streamedChunk: string): TStreamData
abstract streamDataToStreamDTO(streamedData: TStreamData, requestModel?: AuthenticatedRequestModel<TRequestModel>): TStreamDTO

/**
* Validates the final response model after execution of all post processing pipeline elements.
Expand Down Expand Up @@ -451,7 +458,7 @@ export abstract class BaseMultiCallStreamableUseCase<
encoding: BufferEncoding,
callback: TransformCallback,
): void {
const dto = this.chunkToDTO(chunk)
const dto = this.streamDataToStreamDTO(chunk, this.requestModel)
const { status, data } = this.processStreamedData(dto)
if (status === 'success') {
const responseModel = data as TResponseModel
Expand Down
11 changes: 10 additions & 1 deletion test/sdk/post-processing-streaming-pipeline/models.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import { BaseDTO } from "@/lib/sdk/dto"
import { AuthenticatedRequestModel, BaseResponseModel } from "@/lib/sdk/usecase-models"
import { BaseViewModel } from "@/lib/sdk/view-models"

export type RequestModel = AuthenticatedRequestModel<{}>
export interface TResponseModel extends BaseResponseModel {
message: string
}

export type StreamData = {
export type StreamData = string

export interface StreamDTO extends BaseDTO {
status: 'success' | 'error'
title: string
}

export interface ViewModel extends BaseViewModel {
title: string
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,26 @@ import {
import { BaseStreamingPostProcessingPipelineElement } from '@/lib/sdk/postprocessing-pipeline-elements'
import { Readable, Transform, PassThrough } from 'stream'
import { MockHttpStreamableResponseFactory } from 'test/fixtures/http-fixtures'
import { RequestModel, StreamData, TResponseModel } from './models'
import { RequestModel, StreamData, StreamDTO, TResponseModel } from './models'
import {
FirstPipelineElement,
SecondPipelineElement,
} from './pipeline-elements'
import { TestPresenter } from './presenter'
import { BaseViewModel } from '@/lib/sdk/view-models'

describe('BaseMultiCallStreamableUseCase', () => {
class TestMultiCallPipelineUseCase extends BaseMultiCallStreamableUseCase<
RequestModel,
TResponseModel,
BaseErrorResponseModel,
BaseStreamableDTO,
StreamData
StreamData,
StreamDTO,
BaseViewModel
> {


constructor(response: any) {
const firstPipelineElement = new FirstPipelineElement()
const secondPipelineElement = new SecondPipelineElement()
Expand All @@ -47,17 +52,19 @@ describe('BaseMultiCallStreamableUseCase', () => {
return Promise.resolve(dto)
}

streamDataToStreamDTO(streamedData: StreamData, requestModel?: { rucioAuthToken: string } | undefined): StreamDTO {
return {
status: 'success',
title: streamedData,
}
}

handleGatewayError(error: BaseStreamableDTO): BaseErrorResponseModel {
throw new Error('Method not implemented.')
}

chunkToDTO(streamedChunk: string): StreamData {
return {
title: streamedChunk,
} as StreamData
}

processStreamedData(dto: StreamData): {
processStreamedData(dto: StreamDTO): {
data: TResponseModel | BaseErrorResponseModel
status: 'success' | 'error'
} {
Expand Down Expand Up @@ -126,9 +133,11 @@ describe('BaseMultiCallStreamableUseCase', () => {
await done
expect(receivedData).toEqual([
{
status: 'success',
title: 'success: root_element_1 pipeline element 1 transformed pipeline element 2 transformed',
},
{
status: 'success',
title: 'success: root_element_2 pipeline element 1 transformed pipeline element 2 transformed',
},
])
Expand Down
12 changes: 7 additions & 5 deletions test/sdk/post-processing-streaming-pipeline/presenter.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
import { BaseStreamingPresenter } from "@/lib/sdk/presenter"
import { BaseErrorResponseModel, BaseResponseModel } from "@/lib/sdk/usecase-models"
import { StreamData, TResponseModel } from "./models"
import { StreamData, TResponseModel, ViewModel } from "./models"

export class TestPresenter extends BaseStreamingPresenter<
BaseResponseModel,
StreamData,
BaseErrorResponseModel
BaseErrorResponseModel,
ViewModel
> {
constructor(response: any) {
super(response)
}
convertErrorModelToViewModel(errorModel: BaseErrorResponseModel): {
status: number
viewModel: StreamData
viewModel: ViewModel
} {
return {
status: 200,
viewModel: {
status: 'error',
title: 'failed: ' + errorModel.message,
},
}
}
convertResponseModelToViewModel(
responseModel: TResponseModel,
): StreamData {
): ViewModel {
return {
status: 'success',
title: 'success: ' + responseModel.message,
}
}
Expand Down

0 comments on commit 2c1c30d

Please sign in to comment.