Skip to content

Commit

Permalink
fix: input queue logic bugs, now allows for lockstep
Browse files Browse the repository at this point in the history
  • Loading branch information
gschup committed Jun 1, 2024
1 parent fd370c3 commit b13e7fc
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 63 deletions.
20 changes: 10 additions & 10 deletions src/input_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ impl<T: Config> InputQueue<T> {
/// Adds an input frame to the queue. Will consider the set frame delay.
pub(crate) fn add_input(&mut self, input: PlayerInput<T::Input>) -> Frame {
// Verify that inputs are passed in sequentially by the user, regardless of frame delay.
assert!(
self.last_added_frame == NULL_FRAME
|| input.frame + self.frame_delay as i32 == self.last_added_frame + 1
);
if self.last_added_frame != NULL_FRAME
&& input.frame + self.frame_delay as i32 != self.last_added_frame + 1
{
// drop the input if not given sequentially
return NULL_FRAME;
}

// Move the queue head to the correct point in preparation to input the frame into the queue.
let new_frame = self.advance_queue_head(input.frame);
Expand Down Expand Up @@ -267,22 +269,20 @@ mod input_queue_tests {
}

#[test]
#[should_panic]
fn test_add_input_wrong_frame() {
let mut queue = InputQueue::<TestConfig>::new();
let input = PlayerInput::new(0, TestInput { inp: 0 });
queue.add_input(input); // fine
assert_eq!(queue.add_input(input), 0); // fine
let input_wrong_frame = PlayerInput::new(3, TestInput { inp: 0 });
queue.add_input(input_wrong_frame); // not fine
assert_eq!(queue.add_input(input_wrong_frame), NULL_FRAME); // input dropped
}

#[test]
#[should_panic]
fn test_add_input_twice() {
let mut queue = InputQueue::<TestConfig>::new();
let input = PlayerInput::new(0, TestInput { inp: 0 });
queue.add_input(input); // fine
queue.add_input(input); // not fine
assert_eq!(queue.add_input(input), 0); // fine
assert_eq!(queue.add_input(input), NULL_FRAME); // input dropped
}

#[test]
Expand Down
66 changes: 35 additions & 31 deletions src/sessions/p2p_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ impl<T: Config> P2PSession<T> {
return Err(GgrsError::NotSynchronized);
}

// check if input for all local players is queued
for handle in self.player_reg.local_player_handles() {
if !self.local_inputs.contains_key(&handle) {
return Err(GgrsError::InvalidRequest {
info: "Missing local input while calling advance_frame().".to_owned(),
});
}
}

// This list of requests will be returned to the user
let mut requests = Vec::new();

Expand Down Expand Up @@ -330,51 +339,46 @@ impl<T: Config> P2PSession<T> {
*/

// register local inputs in the system and send them
let mut inputs_dropped = false;
for handle in self.player_reg.local_player_handles() {
match self.local_inputs.get_mut(&handle) {
Some(player_input) => {
// send the input into the sync layer
let actual_frame = self.sync_layer.add_local_input(handle, *player_input)?;
if actual_frame != NULL_FRAME {
// if not dropped, send the input to all other clients, but with the correct frame (influenced by input delay)
player_input.frame = actual_frame;
self.local_connect_status[handle].last_frame = actual_frame;
} else {
inputs_dropped = true;
}
}
None => {
return Err(GgrsError::InvalidRequest {
info: "Missing local input while calling advance_frame().".to_owned(),
});
}
// we have checked that these all exist
let player_input = self.local_inputs.get_mut(&handle).expect("Missing local input while calling advance_frame().");
// send the input into the sync layer
let actual_frame = self.sync_layer.add_local_input(handle, *player_input);
player_input.frame = actual_frame;
// if the input has not been dropped
if actual_frame != NULL_FRAME {
self.local_connect_status[handle].last_frame = actual_frame;
}
}

if !inputs_dropped {
// send the inputs to all clients
// if the local inputs have not been dropped by the sync layer, send to all remote clients
if !self.local_inputs.values().any(|&i| i.frame == NULL_FRAME) {
for endpoint in self.player_reg.remotes.values_mut() {
// send the input directly
endpoint.send_input(&self.local_inputs, &self.local_connect_status);
endpoint.send_all_messages(&mut self.socket);
}
}

// clear the local inputs after sending them
self.local_inputs.clear();

/*
* ADVANCE THE STATE
*/

// get correct inputs for the current frame
let inputs = self
.sync_layer
.synchronized_inputs(&self.local_connect_status);
// advance the frame count
self.sync_layer.advance_frame();
requests.push(GgrsRequest::AdvanceFrame { inputs });
let frames_ahead = self.sync_layer.current_frame() - self.sync_layer.last_confirmed_frame();
if self.sync_layer.current_frame() < self.max_prediction as i32
|| frames_ahead <= self.max_prediction as i32
{
// get correct inputs for the current frame
let inputs = self
.sync_layer
.synchronized_inputs(&self.local_connect_status);
// advance the frame count
self.sync_layer.advance_frame();
// clear the local inputs after advancing the frame to allow new inputs to be ingested
self.local_inputs.clear();
requests.push(GgrsRequest::AdvanceFrame { inputs });
} else {
println!("Prediction Threshold reached. Skipping on frame {}", self.sync_layer.current_frame());
}

Ok(requests)
}
Expand Down
2 changes: 1 addition & 1 deletion src/sessions/sync_test_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<T: Config> SyncTestSession<T> {
// pass all inputs into the sync layer
for (&handle, &input) in self.local_inputs.iter() {
// send the input into the sync layer
self.sync_layer.add_local_input(handle, input)?;
self.sync_layer.add_local_input(handle, input);
}
// clear local inputs after using them
self.local_inputs.clear();
Expand Down
26 changes: 5 additions & 21 deletions src/sync_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use bytemuck::Zeroable;
use parking_lot::Mutex;
use std::sync::Arc;

use crate::error::GgrsError;
use crate::frame_info::{GameState, PlayerInput};
use crate::input_queue::InputQueue;
use crate::network::messages::ConnectionStatus;
Expand Down Expand Up @@ -177,17 +176,10 @@ impl<T: Config> SyncLayer<T> {
&mut self,
player_handle: PlayerHandle,
input: PlayerInput<T::Input>,
) -> Result<Frame, GgrsError> {
let frames_ahead = self.current_frame - self.last_confirmed_frame;
if self.current_frame >= self.max_prediction as i32
&& frames_ahead > self.max_prediction as i32
{
return Err(GgrsError::PredictionThreshold);
}

) -> Frame {
// The input provided should match the current frame, we account for input delay later
assert_eq!(input.frame, self.current_frame);
Ok(self.input_queues[player_handle].add_input(input))
self.input_queues[player_handle].add_input(input)
}

/// Adds remote input to the corresponding input queue.
Expand Down Expand Up @@ -249,6 +241,9 @@ impl<T: Config> SyncLayer<T> {
frame = std::cmp::min(frame, self.last_saved_frame);
}

// never delete stuff ahead of the current frame
frame = std::cmp::min(frame, self.current_frame());

// if we set the last confirmed frame beyond the first incorrect frame, we discard inputs that we need later for adjusting the gamestate.
assert!(first_incorrect == NULL_FRAME || first_incorrect >= frame);

Expand Down Expand Up @@ -320,17 +315,6 @@ mod sync_layer_tests {
type Address = SocketAddr;
}

#[test]
#[should_panic]
fn test_reach_prediction_threshold() {
let mut sync_layer = SyncLayer::<TestConfig>::new(2, 8);
for i in 0..20 {
let game_input = PlayerInput::new(i, TestInput { inp: i as u8 });
sync_layer.add_local_input(0, game_input).unwrap(); // should crash at frame 7
sync_layer.advance_frame();
}
}

#[test]
fn test_different_delays() {
let mut sync_layer = SyncLayer::<TestConfig>::new(2, 8);
Expand Down

0 comments on commit b13e7fc

Please sign in to comment.