Skip to content

Commit

Permalink
feat: add open lineage support (#11987)
Browse files Browse the repository at this point in the history
- [ ] Regenerate this pull request now.

BEGIN_COMMIT_OVERRIDE
fix: change `start_time` in message
`.google.cloud.datacatalog.lineage.v1.LineageEvent` to `required` as
intended by api
feat: add open lineage support
END_COMMIT_OVERRIDE

PiperOrigin-RevId: 579762272

Source-Link:
googleapis/googleapis@58878bd

Source-Link:
googleapis/googleapis-gen@96a4d73
Copy-Tag:
eyJwIjoicGFja2FnZXMvZ29vZ2xlLWNsb3VkLWRhdGFjYXRhbG9nLWxpbmVhZ2UvLk93bEJvdC55YW1sIiwiaCI6Ijk2YTRkNzM1YjkyZjIwMmZiZGQyYzYyZGEyZTIxZTYzMjE2YmZhODgifQ==

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
gcf-owl-bot[bot] and gcf-owl-bot[bot] authored Nov 7, 2023
1 parent bf5ee9f commit 079b58a
Show file tree
Hide file tree
Showing 19 changed files with 1,757 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
Process,
ProcessLinkInfo,
ProcessLinks,
ProcessOpenLineageRunEventRequest,
ProcessOpenLineageRunEventResponse,
Run,
SearchLinksRequest,
SearchLinksResponse,
Expand Down Expand Up @@ -85,6 +87,8 @@
"Process",
"ProcessLinkInfo",
"ProcessLinks",
"ProcessOpenLineageRunEventRequest",
"ProcessOpenLineageRunEventResponse",
"Run",
"SearchLinksRequest",
"SearchLinksResponse",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__version__ = "0.3.0" # {x-release-please-version}
__version__ = "0.0.0" # {x-release-please-version}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
Process,
ProcessLinkInfo,
ProcessLinks,
ProcessOpenLineageRunEventRequest,
ProcessOpenLineageRunEventResponse,
Run,
SearchLinksRequest,
SearchLinksResponse,
Expand Down Expand Up @@ -82,6 +84,8 @@
"Process",
"ProcessLinkInfo",
"ProcessLinks",
"ProcessOpenLineageRunEventRequest",
"ProcessOpenLineageRunEventResponse",
"Run",
"SearchLinksRequest",
"SearchLinksResponse",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
"list_runs"
]
},
"ProcessOpenLineageRunEvent": {
"methods": [
"process_open_lineage_run_event"
]
},
"SearchLinks": {
"methods": [
"search_links"
Expand Down Expand Up @@ -160,6 +165,11 @@
"list_runs"
]
},
"ProcessOpenLineageRunEvent": {
"methods": [
"process_open_lineage_run_event"
]
},
"SearchLinks": {
"methods": [
"search_links"
Expand Down Expand Up @@ -245,6 +255,11 @@
"list_runs"
]
},
"ProcessOpenLineageRunEvent": {
"methods": [
"process_open_lineage_run_event"
]
},
"SearchLinks": {
"methods": [
"search_links"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__version__ = "0.3.0" # {x-release-please-version}
__version__ = "0.0.0" # {x-release-please-version}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from google.longrunning import operations_pb2 # type: ignore
from google.protobuf import empty_pb2 # type: ignore
from google.protobuf import field_mask_pb2 # type: ignore
from google.protobuf import struct_pb2 # type: ignore
from google.protobuf import timestamp_pb2 # type: ignore

from google.cloud.datacatalog_lineage_v1.services.lineage import pagers
Expand Down Expand Up @@ -222,6 +223,127 @@ def __init__(
client_info=client_info,
)

async def process_open_lineage_run_event(
self,
request: Optional[
Union[lineage.ProcessOpenLineageRunEventRequest, dict]
] = None,
*,
parent: Optional[str] = None,
open_lineage: Optional[struct_pb2.Struct] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> lineage.ProcessOpenLineageRunEventResponse:
r"""Creates new lineage events together with their
parents: process and run. Updates the process and run if
they already exist. Mapped from Open Lineage
specification:
https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json.
.. code-block:: python
# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in:
# https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import datacatalog_lineage_v1
async def sample_process_open_lineage_run_event():
# Create a client
client = datacatalog_lineage_v1.LineageAsyncClient()
# Initialize request argument(s)
request = datacatalog_lineage_v1.ProcessOpenLineageRunEventRequest(
parent="parent_value",
)
# Make the request
response = await client.process_open_lineage_run_event(request=request)
# Handle the response
print(response)
Args:
request (Optional[Union[google.cloud.datacatalog_lineage_v1.types.ProcessOpenLineageRunEventRequest, dict]]):
The request object. Request message for
[ProcessOpenLineageRunEvent][google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEvent].
parent (:class:`str`):
Required. The name of the project and
its location that should own the
process, run, and lineage event.
This corresponds to the ``parent`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
open_lineage (:class:`google.protobuf.struct_pb2.Struct`):
Required. OpenLineage message
following OpenLineage format:
https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json
This corresponds to the ``open_lineage`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
google.cloud.datacatalog_lineage_v1.types.ProcessOpenLineageRunEventResponse:
Response message for
[ProcessOpenLineageRunEvent][google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEvent].
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([parent, open_lineage])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

request = lineage.ProcessOpenLineageRunEventRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
if parent is not None:
request.parent = parent
if open_lineage is not None:
request.open_lineage = open_lineage

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.process_open_lineage_run_event,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
)

# Send the request.
response = await rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

async def create_process(
self,
request: Optional[Union[lineage.CreateProcessRequest, dict]] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from google.longrunning import operations_pb2 # type: ignore
from google.protobuf import empty_pb2 # type: ignore
from google.protobuf import field_mask_pb2 # type: ignore
from google.protobuf import struct_pb2 # type: ignore
from google.protobuf import timestamp_pb2 # type: ignore

from google.cloud.datacatalog_lineage_v1.services.lineage import pagers
Expand Down Expand Up @@ -503,6 +504,129 @@ def __init__(
api_audience=client_options.api_audience,
)

def process_open_lineage_run_event(
self,
request: Optional[
Union[lineage.ProcessOpenLineageRunEventRequest, dict]
] = None,
*,
parent: Optional[str] = None,
open_lineage: Optional[struct_pb2.Struct] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> lineage.ProcessOpenLineageRunEventResponse:
r"""Creates new lineage events together with their
parents: process and run. Updates the process and run if
they already exist. Mapped from Open Lineage
specification:
https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json.
.. code-block:: python
# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in:
# https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import datacatalog_lineage_v1
def sample_process_open_lineage_run_event():
# Create a client
client = datacatalog_lineage_v1.LineageClient()
# Initialize request argument(s)
request = datacatalog_lineage_v1.ProcessOpenLineageRunEventRequest(
parent="parent_value",
)
# Make the request
response = client.process_open_lineage_run_event(request=request)
# Handle the response
print(response)
Args:
request (Union[google.cloud.datacatalog_lineage_v1.types.ProcessOpenLineageRunEventRequest, dict]):
The request object. Request message for
[ProcessOpenLineageRunEvent][google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEvent].
parent (str):
Required. The name of the project and
its location that should own the
process, run, and lineage event.
This corresponds to the ``parent`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
open_lineage (google.protobuf.struct_pb2.Struct):
Required. OpenLineage message
following OpenLineage format:
https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json
This corresponds to the ``open_lineage`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
google.cloud.datacatalog_lineage_v1.types.ProcessOpenLineageRunEventResponse:
Response message for
[ProcessOpenLineageRunEvent][google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEvent].
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([parent, open_lineage])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a lineage.ProcessOpenLineageRunEventRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
if not isinstance(request, lineage.ProcessOpenLineageRunEventRequest):
request = lineage.ProcessOpenLineageRunEventRequest(request)
# If we have keyword arguments corresponding to fields on the
# request, apply these.
if parent is not None:
request.parent = parent
if open_lineage is not None:
request.open_lineage = open_lineage

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[
self._transport.process_open_lineage_run_event
]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
)

# Send the request.
response = rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

def create_process(
self,
request: Optional[Union[lineage.CreateProcessRequest, dict]] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ def __init__(
def _prep_wrapped_messages(self, client_info):
# Precompute the wrapped methods.
self._wrapped_methods = {
self.process_open_lineage_run_event: gapic_v1.method.wrap_method(
self.process_open_lineage_run_event,
default_timeout=None,
client_info=client_info,
),
self.create_process: gapic_v1.method.wrap_method(
self.create_process,
default_timeout=None,
Expand Down Expand Up @@ -220,6 +225,18 @@ def operations_client(self):
"""Return the client designed to process long-running operations."""
raise NotImplementedError()

@property
def process_open_lineage_run_event(
self,
) -> Callable[
[lineage.ProcessOpenLineageRunEventRequest],
Union[
lineage.ProcessOpenLineageRunEventResponse,
Awaitable[lineage.ProcessOpenLineageRunEventResponse],
],
]:
raise NotImplementedError()

@property
def create_process(
self,
Expand Down
Loading

0 comments on commit 079b58a

Please sign in to comment.