diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts index c2453d64b933..2ea4204886c2 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts @@ -37,7 +37,7 @@ async function main() { const filtered = await lines .map((w) => ({ word: w })) .apply(beam.withRowCoder({ word: "str" })) - .asyncApply( + .applyAsync( sqlTransform( "SELECT word, count(*) as c from PCOLLECTION group by word" ) diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts index 686be3b05948..2841ec8f77d2 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts @@ -37,7 +37,7 @@ function wordCount(lines: beam.PCollection): beam.PCollection { async function main() { // python apache_beam/runners/portability/local_job_service_main.py --port 3333 await new PortableRunner("localhost:3333").run(async (root) => { - const lines = await root.asyncApply( + const lines = await root.applyAsync( textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt") ); diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts index db5592d0bb1b..fe57f1209693 100644 --- a/sdks/typescript/src/apache_beam/internal/pipeline.ts +++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts @@ -179,7 +179,7 @@ export class Pipeline { return this.postApplyTransform(transform, transformProto, result); } - async asyncApplyTransform< + async applyAsyncTransform< InputT extends pvalue.PValue, OutputT extends pvalue.PValue >(transform: AsyncPTransformClass, input: InputT) { @@ -190,7 +190,7 @@ export class Pipeline { let result: OutputT; try { this.transformStack.push(transformId); - result = await transform.asyncExpandInternal(input, this, transformProto); + result = await transform.expandInternalAsync(input, this, transformProto); } finally { this.transformStack.pop(); } diff --git a/sdks/typescript/src/apache_beam/io/avroio.ts b/sdks/typescript/src/apache_beam/io/avroio.ts index 225dea9d858a..b6ebf45cf285 100644 --- a/sdks/typescript/src/apache_beam/io/avroio.ts +++ b/sdks/typescript/src/apache_beam/io/avroio.ts @@ -44,7 +44,7 @@ export function writeToAvro(filePath: string, options: { schema: Schema }) { withCoderInternal(RowCoder.fromSchema(options.schema)) ); } - return pcoll.asyncApply( + return pcoll.applyAsync( schemaio, {}>( "writeToAvro", "beam:transform:org.apache.beam:schemaio_avro_write:v1", diff --git a/sdks/typescript/src/apache_beam/io/parquetio.ts b/sdks/typescript/src/apache_beam/io/parquetio.ts index cdf3b2b65e22..e7c7f7dc66e0 100644 --- a/sdks/typescript/src/apache_beam/io/parquetio.ts +++ b/sdks/typescript/src/apache_beam/io/parquetio.ts @@ -33,7 +33,7 @@ export function readFromParquet( } = {} ): (root: beam.Root) => Promise> { return async function readFromParquet(root: beam.Root) { - return root.asyncApply( + return root.applyAsync( pythonTransform("apache_beam.dataframe.io.ReadViaPandas", { path: filePattern, format: "parquet", @@ -57,7 +57,7 @@ export function writeToParquet( delete options.schema; } return { - filesWritten: await toWrite.asyncApply( + filesWritten: await toWrite.applyAsync( pythonTransform("apache_beam.dataframe.io.WriteViaPandas", { path: filePathPrefix, format: "parquet", diff --git a/sdks/typescript/src/apache_beam/io/pubsub.ts b/sdks/typescript/src/apache_beam/io/pubsub.ts index ead6c57e6f87..c5513fe41718 100644 --- a/sdks/typescript/src/apache_beam/io/pubsub.ts +++ b/sdks/typescript/src/apache_beam/io/pubsub.ts @@ -96,7 +96,7 @@ export function readFromPubSubWithAttributes( > { return async function readFromPubSubWithAttributes(root: beam.Root) { return ( - await root.asyncApply(readFromPubSubWithAttributesRaw(options)) + await root.applyAsync(readFromPubSubWithAttributesRaw(options)) ).map((encoded) => PubSub.protos.google.pubsub.v1.PubsubMessage.decode(encoded) ); @@ -126,7 +126,7 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) { PubSub.protos.google.pubsub.v1.PubsubMessage.encode({ data }).finish() ) .apply(internal.withCoderInternal(new BytesCoder())) - .asyncApply(writeToPubSubRaw(topic, options)); + .applyAsync(writeToPubSubRaw(topic, options)); }; } diff --git a/sdks/typescript/src/apache_beam/io/textio.ts b/sdks/typescript/src/apache_beam/io/textio.ts index 67a83727947b..7dedbcf26989 100644 --- a/sdks/typescript/src/apache_beam/io/textio.ts +++ b/sdks/typescript/src/apache_beam/io/textio.ts @@ -30,7 +30,7 @@ export function readFromText( filePattern: string ): beam.AsyncPTransform> { return async function readFromText(root: beam.Root) { - return root.asyncApply( + return root.applyAsync( pythonTransform>( "apache_beam.io.ReadFromText", { @@ -59,7 +59,7 @@ export function writeToText( filesWritten: await pcoll .map((e) => (typeof e == "string" ? e : "" + e)) .apply(withCoderInternal(new StrUtf8Coder())) - .asyncApply( + .applyAsync( pythonTransform("apache_beam.io.WriteToText", { file_path_prefix: filePathPrefix, ...camelToSnakeOptions(options), @@ -74,7 +74,7 @@ export function readFromCsv( options: {} = {} ): (root: beam.Root) => Promise> { return async function readFromCsv(root: beam.Root) { - return root.asyncApply( + return root.applyAsync( pythonTransform("apache_beam.dataframe.io.ReadViaPandas", { path: filePattern, format: "csv", @@ -96,7 +96,7 @@ export function writeToCsv( toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema))); } return { - filesWritten: await toWrite.asyncApply( + filesWritten: await toWrite.applyAsync( pythonTransform("apache_beam.dataframe.io.WriteViaPandas", { path: filePathPrefix, format: "csv", @@ -113,7 +113,7 @@ export function readFromJson( options: {} = {} ): (root: beam.Root) => Promise> { return async function readFromJson(root: beam.Root) { - return root.asyncApply( + return root.applyAsync( pythonTransform("apache_beam.dataframe.io.ReadViaPandas", { path: filePattern, format: "json", @@ -137,7 +137,7 @@ export function writeToJson( toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema))); } return { - filesWritten: await toWrite.asyncApply( + filesWritten: await toWrite.applyAsync( pythonTransform("apache_beam.dataframe.io.WriteViaPandas", { path: filePathPrefix, format: "json", diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts index 3185c4771b7a..7275095cc472 100644 --- a/sdks/typescript/src/apache_beam/pvalue.ts +++ b/sdks/typescript/src/apache_beam/pvalue.ts @@ -47,13 +47,13 @@ export class Root { return this.pipeline.applyTransform(transform, this); } - async asyncApply>( + async applyAsync>( transform: AsyncPTransform ) { if (!(transform instanceof AsyncPTransformClass)) { transform = new AsyncPTransformClassFromCallable(transform); } - return await this.pipeline.asyncApplyTransform(transform, this); + return await this.pipeline.applyAsyncTransform(transform, this); } } @@ -91,13 +91,13 @@ export class PCollection { return this.pipeline.applyTransform(transform, this); } - asyncApply>( + applyAsync>( transform: AsyncPTransform, OutputT> ) { if (!(transform instanceof AsyncPTransformClass)) { transform = new AsyncPTransformClassFromCallable(transform); } - return this.pipeline.asyncApplyTransform(transform, this); + return this.pipeline.applyAsyncTransform(transform, this); } map( @@ -228,14 +228,14 @@ class PValueWrapper> { return this.pipeline(root).applyTransform(transform, this.pvalue); } - async asyncApply>( + async applyAsync>( transform: AsyncPTransform, root: Root | null = null ) { if (!(transform instanceof AsyncPTransformClass)) { transform = new AsyncPTransformClassFromCallable(transform); } - return await this.pipeline(root).asyncApplyTransform( + return await this.pipeline(root).applyAsyncTransform( transform, this.pvalue ); @@ -302,7 +302,7 @@ class AsyncPTransformClassFromCallable< this.expander = expander; } - async asyncExpandInternal( + async expandInternalAsync( input: InputT, pipeline: Pipeline, transformProto: runnerApi.PTransform diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts index af6d27e66234..2f66cca9d2d9 100644 --- a/sdks/typescript/src/apache_beam/transforms/external.ts +++ b/sdks/typescript/src/apache_beam/transforms/external.ts @@ -101,7 +101,7 @@ class RawExternalTransform< } } - async asyncExpandInternal( + async expandInternalAsync( input: InputT, pipeline: Pipeline, transformProto: runnerApi.PTransform diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts index 9170683c1cd8..9f0e117f2f81 100644 --- a/sdks/typescript/src/apache_beam/transforms/sql.ts +++ b/sdks/typescript/src/apache_beam/transforms/sql.ts @@ -84,7 +84,7 @@ export function sqlTransform< ) as InputT; } - return await P(input).asyncApply( + return await P(input).applyAsync( external.rawExternalTransform( "beam:external:java:sql:v1", { query: query }, diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts index fe84009987b0..da22cb70dcd8 100644 --- a/sdks/typescript/src/apache_beam/transforms/transform.ts +++ b/sdks/typescript/src/apache_beam/transforms/transform.ts @@ -73,16 +73,16 @@ export class AsyncPTransformClass< this.beamName = name || this.constructor.name; } - async asyncExpand(input: InputT): Promise { + async expandAsync(input: InputT): Promise { throw new Error("Method expand has not been implemented."); } - async asyncExpandInternal( + async expandInternalAsync( input: InputT, pipeline: Pipeline, transformProto: runnerApi.PTransform ): Promise { - return this.asyncExpand(input); + return this.expandAsync(input); } } @@ -94,7 +94,7 @@ export class PTransformClass< throw new Error("Method expand has not been implemented."); } - async asyncExpand(input: InputT): Promise { + async expandAsync(input: InputT): Promise { return this.expand(input); } @@ -106,7 +106,7 @@ export class PTransformClass< return this.expand(input); } - async asyncExpandInternal( + async expandInternalAsync( input: InputT, pipeline: Pipeline, transformProto: runnerApi.PTransform diff --git a/sdks/typescript/test/docs/programming_guide.ts b/sdks/typescript/test/docs/programming_guide.ts index c6cb3131f484..fc038b47982e 100644 --- a/sdks/typescript/test/docs/programming_guide.ts +++ b/sdks/typescript/test/docs/programming_guide.ts @@ -60,7 +60,7 @@ describe("Programming Guide Tested Samples", function () { // [START pipelines_constructing_reading] async function pipeline(root: beam.Root) { // Note that textio.ReadFromText is an AsyncPTransform. - const pcoll: PCollection = await root.asyncApply( + const pcoll: PCollection = await root.applyAsync( textio.ReadFromText("path/to/text_pattern") ); } diff --git a/sdks/typescript/test/io_test.ts b/sdks/typescript/test/io_test.ts index 6947a2d65b7e..3d4a2e03085d 100644 --- a/sdks/typescript/test/io_test.ts +++ b/sdks/typescript/test/io_test.ts @@ -71,12 +71,12 @@ xdescribe("IO Tests", function () { await createRunner().run(async (root) => { await root // .apply(beam.create(lines)) - .asyncApply(textio.writeToText(path.join(tempDir, "out.txt"))); + .applyAsync(textio.writeToText(path.join(tempDir, "out.txt"))); }); await createRunner().run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( textio.readFromText(path.join(tempDir, "out.txt*")) ) ).apply(testing.assertDeepEqual(lines)); @@ -90,13 +90,13 @@ xdescribe("IO Tests", function () { await root // .apply(beam.create(elements)) .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0]))) - .asyncApply(textio.writeToCsv(path.join(tempDir, "out.csv"))); + .applyAsync(textio.writeToCsv(path.join(tempDir, "out.csv"))); }); console.log(tempDir); await createRunner().run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( textio.readFromCsv(path.join(tempDir, "out.csv*")) ) ).apply(testing.assertDeepEqual(elements)); @@ -110,12 +110,12 @@ xdescribe("IO Tests", function () { await root // .apply(beam.create(elements)) .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0]))) - .asyncApply(textio.writeToJson(path.join(tempDir, "out.json"))); + .applyAsync(textio.writeToJson(path.join(tempDir, "out.json"))); }); await createRunner().run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( textio.readFromJson(path.join(tempDir, "out.json*")) ) ).apply(testing.assertDeepEqual(elements)); @@ -129,14 +129,14 @@ xdescribe("IO Tests", function () { await root // .apply(beam.create(elements)) .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0]))) - .asyncApply( + .applyAsync( parquetio.writeToParquet(path.join(tempDir, "out.parquet")) ); }); await createRunner().run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( parquetio.readFromParquet(path.join(tempDir, "out.parquet*")) ) ).apply(testing.assertDeepEqual(elements)); @@ -144,7 +144,7 @@ xdescribe("IO Tests", function () { await createRunner().run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( parquetio.readFromParquet(path.join(tempDir, "out.parquet*"), { columns: ["label", "rank"], }) @@ -176,14 +176,14 @@ xdescribe("IO Tests", function () { await createRunner(options).run(async (root) => { await root // .apply(beam.create(elements)) - .asyncApply( + .applyAsync( avroio.writeToAvro(path_join(tempDir, "out.avro"), { schema }) ); }); await createRunner(options).run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( avroio.readFromAvro(path_join(tempDir, "out.avro*"), { schema }) ) ).apply(testing.assertDeepEqual(elements)); @@ -239,19 +239,19 @@ xdescribe("IO Tests", function () { await root // .apply(beam.create(elements)) .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0]))) - .asyncApply( + .applyAsync( bigqueryio.writeToBigQuery(table, { createDisposition: "IfNeeded" }) ); }); await createRunner(options).run(async (root) => { - (await root.asyncApply(bigqueryio.readFromBigQuery({ table }))) // + (await root.applyAsync(bigqueryio.readFromBigQuery({ table }))) // .apply(testing.assertDeepEqual(elements)); }); await createRunner(options).run(async (root) => { ( - await root.asyncApply( + await root.applyAsync( bigqueryio.readFromBigQuery({ query: `SELECT label, rank FROM ${table}`, }) @@ -286,7 +286,7 @@ xdescribe("IO Tests", function () { try { pipelineHandle = await createRunner(options).runAsync(async (root) => { await ( - await root.asyncApply( + await root.applyAsync( pubsub.readFromPubSub({ subscription: readSubscription.name, }) @@ -296,7 +296,7 @@ xdescribe("IO Tests", function () { .map((msg) => msg.toUpperCase()) .map((msg) => new TextEncoder().encode(msg)) .apply(internal.withCoderInternal(new BytesCoder())) - .asyncApply(pubsub.writeToPubSub(writeTopic.name)); + .applyAsync(pubsub.writeToPubSub(writeTopic.name)); }); console.log("Pipeline started", pipelineHandle.jobId); diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index eeae46e06394..12098949f831 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -617,7 +617,7 @@ the transform itself as an argument, and the operation returns the output {{< highlight typescript >}} [Output PCollection] = [Input PCollection].apply([Transform]) -[Output PCollection] = await [Input PCollection].asyncApply([AsyncTransform]) +[Output PCollection] = await [Input PCollection].applyAsync([AsyncTransform]) {{< /highlight >}} {{< paragraph class="language-java language-py language-typescript" >}} @@ -735,7 +735,7 @@ One can apply transforms to these composite types by wrapping them with {{< paragraph class="language-typescript" >}} PTransforms come in two flavors, synchronous and asynchronous, depending on whether their *application** involves asynchronous invocations. -An `AsyncTransform` must be applied with `asyncApply` and returns a `Promise` +An `AsyncTransform` must be applied with `applyAsync` and returns a `Promise` which must be awaited before further pipeline construction. {{< /paragraph >}}