Skip to content

Commit

Permalink
feat: Add slice API for gRPC and pydeephaven (#6195)
Browse files Browse the repository at this point in the history
- In support of #6059
  • Loading branch information
wusteven815 authored Oct 23, 2024
1 parent f9a1217 commit 8c44147
Show file tree
Hide file tree
Showing 16 changed files with 1,688 additions and 1,090 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.deephaven.proto.backplane.grpc.SeekRowRequest;
import io.deephaven.proto.backplane.grpc.SelectDistinctRequest;
import io.deephaven.proto.backplane.grpc.SelectOrUpdateRequest;
import io.deephaven.proto.backplane.grpc.SliceRequest;
import io.deephaven.proto.backplane.grpc.SnapshotTableRequest;
import io.deephaven.proto.backplane.grpc.SnapshotWhenTableRequest;
import io.deephaven.proto.backplane.grpc.SortTableRequest;
Expand Down Expand Up @@ -534,6 +535,17 @@ void checkPermissionMetaTable(AuthContext authContext, MetaTableRequest request,
void checkPermissionComputeColumnStatistics(AuthContext authContext,
ColumnStatisticsRequest request, List<Table> sourceTables);

/**
 * Authorize a request to Slice.
 *
 * <p>
 * Implementations signal a denied request by throwing; returning normally means the request may proceed.
 *
 * @param authContext the authentication context of the request
 * @param request the {@link SliceRequest} to authorize
 * @param sourceTables the operation's source tables
 * @throws io.grpc.StatusRuntimeException if the user is not authorized to invoke Slice
 */
void checkPermissionSlice(AuthContext authContext, SliceRequest request,
List<Table> sourceTables);

/**
* A default implementation that funnels all requests to invoke {@code checkPermission}.
*/
Expand Down Expand Up @@ -759,6 +771,11 @@ public void checkPermissionComputeColumnStatistics(AuthContext authContext,
ColumnStatisticsRequest request, List<Table> sourceTables) {
checkPermission(authContext, sourceTables);
}

/**
 * Authorizes a Slice request by handing the authentication context and the source tables to
 * {@code checkPermission}; the {@code request} itself is not inspected here.
 *
 * @param authContext the authentication context of the request
 * @param request the request to authorize (unused by this implementation)
 * @param sourceTables the operation's source tables
 */
public void checkPermissionSlice(
        AuthContext authContext,
        SliceRequest request,
        List<Table> sourceTables) {
    checkPermission(authContext, sourceTables);
}
}

/**
Expand Down Expand Up @@ -1089,5 +1106,12 @@ public void checkPermissionComputeColumnStatistics(AuthContext authContext,
delegate.checkPermissionComputeColumnStatistics(authContext, request, sourceTables);
}
}

/**
 * Forwards the Slice authorization check to {@code delegate} when one is present. With no delegate this method
 * returns without performing any check.
 *
 * @param authContext the authentication context of the request
 * @param request the request to authorize
 * @param sourceTables the operation's source tables
 */
public void checkPermissionSlice(
        AuthContext authContext,
        SliceRequest request,
        List<Table> sourceTables) {
    if (delegate == null) {
        // Nothing to consult; fall through without raising.
        return;
    }
    delegate.checkPermissionSlice(authContext, request, sourceTables);
}
}
}
2,278 changes: 1,206 additions & 1,072 deletions go/internal/proto/table/table.pb.go

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions go/internal/proto/table/table_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public static Stream<TableReference> getSourceIds(Operation op) {
return Stream.of(op.getColumnStatistics().getSourceId());
case MULTI_JOIN:
return op.getMultiJoin().getMultiJoinInputsList().stream().map(MultiJoinInput::getSourceId);
case SLICE:
return Stream.of(op.getSlice().getSourceId());
case OP_NOT_SET:
throw new IllegalStateException("Operation id not set");
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ service TableService {
* primary use case for this is the Deephaven Web UI.
*/
rpc ComputeColumnStatistics(ColumnStatisticsRequest) returns (ExportedTableCreationResponse) {}

/**
* Returns a new table representing a sliced subset of the original table. The start position is inclusive
* and the end position is exclusive. If a negative value is given, then the position is counted from the end of
* the table.
*/
rpc Slice(SliceRequest) returns (ExportedTableCreationResponse) {}
}

message TableReference {
Expand Down Expand Up @@ -1282,6 +1289,13 @@ message ColumnStatisticsRequest {
optional int32 unique_value_limit = 4;
}

// Request message for TableService.Slice: selects a contiguous range of rows, by position, from a source table.
// Per the Slice rpc documentation, the start is inclusive, the end is exclusive, and a negative position is
// counted from the end of the table.
message SliceRequest {
  // Ticket for the table produced by this operation (presumably chosen by the client; confirm against the
  // other *Request messages in this file).
  Ticket result_id = 1;

  // The table to slice.
  TableReference source_id = 2;

  // Position of the first row to include. Declared sint64 because negative values are meaningful here;
  // jstype=JS_STRING asks JavaScript code generators to surface the 64-bit value as a string.
  sint64 first_position_inclusive = 3 [jstype=JS_STRING];

  // Position one past the last row to include. Same sint64 / JS_STRING treatment as the first position.
  sint64 last_position_exclusive = 4 [jstype=JS_STRING];
}

message BatchTableRequest {
repeated Operation ops = 1;

Expand Down Expand Up @@ -1331,6 +1345,7 @@ message BatchTableRequest {
AjRajTablesRequest raj = 41;
ColumnStatisticsRequest column_statistics = 42;
MultiJoinTablesRequest multi_join = 43;
SliceRequest slice = 44;
}
}
}
42 changes: 41 additions & 1 deletion py/client/pydeephaven/_table_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pydeephaven._table_ops import UpdateOp, LazyUpdateOp, ViewOp, UpdateViewOp, SelectOp, DropColumnsOp, \
SelectDistinctOp, SortOp, UnstructuredFilterOp, HeadOp, TailOp, HeadByOp, TailByOp, UngroupOp, NaturalJoinOp, \
ExactJoinOp, CrossJoinOp, AjOp, RajOp, UpdateByOp, SnapshotTableOp, SnapshotWhenTableOp, WhereInTableOp, \
AggregateAllOp, AggregateOp, SortDirection
SliceOp, AggregateAllOp, AggregateOp, SortDirection
from pydeephaven._utils import to_list
from pydeephaven.agg import Aggregation, _AggregationColumns
from pydeephaven.dherror import DHError
Expand Down Expand Up @@ -716,3 +716,43 @@ def where_not_in(self, filter_table: Table, cols: Union[str, List[str]]) -> Unio
"""
table_op = WhereInTableOp(filter_table=filter_table, cols=to_list(cols), inverted=True)
return self.table_op_handler(table_op)

def slice(self, start: int, stop: int) -> Union[Table, Query]:
    """Extracts a subset of a table by row positions into a new Table.

    The start is inclusive and the stop is exclusive. A negative position is counted from the end of the table.

    If both the start and the stop are positive, then both are counted from the beginning of the table.
    The start is inclusive, and the stop is exclusive. slice(0, N) is equivalent to :meth:`~Table.head` (N).
    The start must be less than or equal to the stop.

    If the start is positive and the stop is negative, then the start is counted from the beginning of the
    table, inclusively. The stop is counted from the end of the table. For example, slice(1, -1) includes all
    rows but the first and last. If the stop is before the start, the result is an empty table.

    If the start is negative, and the stop is zero, then the start is counted from the end of the table,
    and the end of the slice is the size of the table. slice(-N, 0) is equivalent to :meth:`~Table.tail` (N).

    If the start is negative and the stop is negative, they are both counted from the end of the
    table. For example, slice(-2, -1) returns the second to last row of the table.

    Args:
        start (int): the first row position to include in the result (inclusive)
        stop (int): the row position at which the result ends (exclusive); the row at this position is not
            part of the result

    Returns:
        a new Table

    Raises:
        DHError

    Examples:
        >>> table.slice(0, 5)    # first 5 rows
        >>> table.slice(-5, 0)   # last 5 rows
        >>> table.slice(2, 6)    # rows from index 2 to 5
        >>> table.slice(6, 2)    # ERROR: cannot slice start after end
        >>> table.slice(-6, -2)  # rows from 6th last to 2nd last (exclusive)
        >>> table.slice(-2, -6)  # ERROR: cannot slice start after end
        >>> table.slice(2, -3)   # all rows except the first 2 and the last 3
        >>> table.slice(-6, 8)   # rows from 6th last to index 8 (exclusive)
    """
    # The positions are forwarded unchanged; the op carries them under the same names as the SliceRequest fields.
    table_op = SliceOp(first_position_inclusive=start, last_position_exclusive=stop)
    return self.table_op_handler(table_op)
19 changes: 19 additions & 0 deletions py/client/pydeephaven/_table_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,25 @@ def make_grpc_request_for_batch(self, result_id, source_id) -> Any:
where_in=self.make_grpc_request(result_id=result_id, source_id=source_id))


class SliceOp(TableOp):
    """Table operation that slices a table by row position through the TableService ``Slice`` RPC."""

    def __init__(self, first_position_inclusive: int, last_position_exclusive: int):
        # Stored as given; they are copied into the SliceRequest fields of the same names.
        self.first_position_inclusive, self.last_position_exclusive = (
            first_position_inclusive,
            last_position_exclusive,
        )

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Returns the ``Slice`` method of the given TableService stub."""
        slice_rpc = table_service_stub.Slice
        return slice_rpc

    def make_grpc_request(self, result_id, source_id) -> Any:
        """Builds the ``SliceRequest`` carrying this op's positions for the given result and source ids."""
        request_fields = {
            "result_id": result_id,
            "source_id": source_id,
            "first_position_inclusive": self.first_position_inclusive,
            "last_position_exclusive": self.last_position_exclusive,
        }
        return table_pb2.SliceRequest(**request_fields)

    def make_grpc_request_for_batch(self, result_id, source_id) -> Any:
        """Wraps the ``SliceRequest`` in a ``BatchTableRequest.Operation`` under its ``slice`` field."""
        slice_request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(slice=slice_request)


class MetaTableOp(TableOp):
@classmethod
def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
Expand Down
40 changes: 23 additions & 17 deletions py/client/pydeephaven/proto/table_pb2.py

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions py/client/pydeephaven/proto/table_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ def __init__(self, channel):
request_serializer=deephaven_dot_proto_dot_table__pb2.ColumnStatisticsRequest.SerializeToString,
response_deserializer=deephaven_dot_proto_dot_table__pb2.ExportedTableCreationResponse.FromString,
_registered_method=True)
self.Slice = channel.unary_unary(
'/io.deephaven.proto.backplane.grpc.TableService/Slice',
request_serializer=deephaven_dot_proto_dot_table__pb2.SliceRequest.SerializeToString,
response_deserializer=deephaven_dot_proto_dot_table__pb2.ExportedTableCreationResponse.FromString,
_registered_method=True)


class TableServiceServicer(object):
Expand Down Expand Up @@ -655,6 +660,16 @@ def ComputeColumnStatistics(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Slice(self, request, context):
    """*
    Returns a new table representing a sliced subset of the original table. The start position is inclusive
    and the end position is exclusive. If a negative value is given, then the position is counted from the end of
    the table.
    """
    # Placeholder body: marks the call UNIMPLEMENTED on the context and then raises.
    # NOTE(review): this module looks like gRPC code-generator output for table.proto; if so, regenerate it
    # rather than editing by hand -- confirm.
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')


def add_TableServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand Down Expand Up @@ -883,6 +898,11 @@ def add_TableServiceServicer_to_server(servicer, server):
request_deserializer=deephaven_dot_proto_dot_table__pb2.ColumnStatisticsRequest.FromString,
response_serializer=deephaven_dot_proto_dot_table__pb2.ExportedTableCreationResponse.SerializeToString,
),
'Slice': grpc.unary_unary_rpc_method_handler(
servicer.Slice,
request_deserializer=deephaven_dot_proto_dot_table__pb2.SliceRequest.FromString,
response_serializer=deephaven_dot_proto_dot_table__pb2.ExportedTableCreationResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'io.deephaven.proto.backplane.grpc.TableService', rpc_method_handlers)
Expand Down Expand Up @@ -2107,3 +2127,30 @@ def ComputeColumnStatistics(request,
timeout,
metadata,
_registered_method=True)

@staticmethod
def Slice(request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None):
    """Invokes the TableService ``Slice`` method on ``target`` through ``grpc.experimental.unary_unary``.

    ``request`` is serialized with ``SliceRequest.SerializeToString`` and the reply is parsed with
    ``ExportedTableCreationResponse.FromString``; the remaining arguments are passed through unchanged.
    """
    # The arguments below are positional. Note that `insecure` is passed BEFORE `call_credentials`, which is
    # the reverse of this method's own parameter order -- presumably to match the callee's signature;
    # confirm before reordering anything here.
    return grpc.experimental.unary_unary(
        request,
        target,
        '/io.deephaven.proto.backplane.grpc.TableService/Slice',
        deephaven_dot_proto_dot_table__pb2.SliceRequest.SerializeToString,
        deephaven_dot_proto_dot_table__pb2.ExportedTableCreationResponse.FromString,
        options,
        channel_credentials,
        insecure,
        call_credentials,
        compression,
        wait_for_ready,
        timeout,
        metadata,
        _registered_method=True)
39 changes: 39 additions & 0 deletions py/client/pydeephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,45 @@ def where_not_in(self, filter_table: Table, cols: Union[str, List[str]]) -> Tabl
DHError
"""
return super(Table, self).where_not_in(filter_table, cols)

def slice(self, start: int, stop: int) -> Table:
    """Extracts a subset of a table by row positions into a new Table.

    The start is inclusive and the stop is exclusive. A negative position is counted from the end of the table.

    If both the start and the stop are positive, then both are counted from the beginning of the table.
    The start is inclusive, and the stop is exclusive. slice(0, N) is equivalent to :meth:`~Table.head` (N).
    The start must be less than or equal to the stop.

    If the start is positive and the stop is negative, then the start is counted from the beginning of the
    table, inclusively. The stop is counted from the end of the table. For example, slice(1, -1) includes all
    rows but the first and last. If the stop is before the start, the result is an empty table.

    If the start is negative, and the stop is zero, then the start is counted from the end of the table,
    and the end of the slice is the size of the table. slice(-N, 0) is equivalent to :meth:`~Table.tail` (N).

    If the start is negative and the stop is negative, they are both counted from the end of the
    table. For example, slice(-2, -1) returns the second to last row of the table.

    Args:
        start (int): the first row position to include in the result (inclusive)
        stop (int): the row position at which the result ends (exclusive); the row at this position is not
            part of the result

    Returns:
        a new Table

    Raises:
        DHError

    Examples:
        >>> table.slice(0, 5)    # first 5 rows
        >>> table.slice(-5, 0)   # last 5 rows
        >>> table.slice(2, 6)    # rows from index 2 to 5
        >>> table.slice(6, 2)    # ERROR: cannot slice start after end
        >>> table.slice(-6, -2)  # rows from 6th last to 2nd last (exclusive)
        >>> table.slice(-2, -6)  # ERROR: cannot slice start after end
        >>> table.slice(2, -3)   # all rows except the first 2 and the last 3
        >>> table.slice(-6, 8)   # rows from 6th last to index 8 (exclusive)
    """
    # Delegates to the parent-class implementation; this override exists to narrow the return type to Table.
    return super(Table, self).slice(start, stop)


class InputTable(Table):
Expand Down
Loading

0 comments on commit 8c44147

Please sign in to comment.