Skip to content

Commit

Permalink
Update DFPRollingWindowStage to emit correct window (nv-morpheus#683)
Browse files Browse the repository at this point in the history
DFPRollingWindowStage was only emitting last batch once `min_history` was met. This PR updates the stage to emit all accumulated rows meeting configured window history requirements.

Fixes nv-morpheus#674

Authors:
  - Eli Fajardo (https://github.com/efajardo-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#683
  • Loading branch information
efajardo-nv authored and jjacobelli committed Mar 7, 2023
1 parent 56cb847 commit fe57121
Showing 1 changed file with 2 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,10 @@ def _build_window(self, message: DFPMessageMeta) -> MultiDFPMessage:
raise RuntimeError(("Overlapping rolling history detected. "
"Rolling history can only be used with non-overlapping batches"))

train_offset = train_df.index.get_loc(first_row_idx)

# Otherwise return a new message
return MultiDFPMessage(meta=DFPMessageMeta(df=train_df, user_id=user_id),
mess_offset=train_offset,
mess_count=found_count)
mess_offset=0,
mess_count=len(train_df))

def on_data(self, message: DFPMessageMeta):

Expand Down

0 comments on commit fe57121

Please sign in to comment.