Skip to content

Commit

Permalink
Merge pull request #15 from erikjohnston/rav/query_internal_metadata
Browse files Browse the repository at this point in the history
Expose `rejection_reason` and `internal_metadata`
  • Loading branch information
erikjohnston authored Feb 2, 2022
2 parents 56a3dfe + bca5bcc commit fb13695
Showing 1 changed file with 42 additions and 34 deletions.
76 changes: 42 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::fs::File;
use std::io;

mod database;
use database::{DatabaseConnection, DatabaseConnector, PostgresConnectionExt};
use database::{DatabaseConnection, DatabaseConnector, DbRow};



Expand All @@ -38,9 +38,11 @@ struct RoomRow {
sender: String,
state_group: Option<i64>,
json: serde_json::Value,
internal_metadata: serde_json::Value,
ts: i64,
edges: Vec<String>,
stream_ordering: i32,
rejection_reason: Option<String>,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -94,11 +96,12 @@ impl RouteHandler for RoomHandler {
let mut conn = self.connector.connect();

let events = match conn {
DatabaseConnection::Postgres(ref mut pg_conn) => {
pg_conn.query_rows(
DatabaseConnection::Postgres(_) => {
conn.query(
r#"
SELECT event_id, events.type, state_key, depth, sender, state_group,
json, origin_server_ts, stream_ordering,
SELECT event_id, events.type, state_events.state_key, depth, sender, state_group,
json, internal_metadata, origin_server_ts, stream_ordering,
rejections.reason,
array(
SELECT prev_event_id FROM event_edges
WHERE is_state = false and event_id = events.event_id
Expand All @@ -107,53 +110,32 @@ impl RouteHandler for RoomHandler {
INNER JOIN event_json USING (event_id)
LEFT JOIN state_events USING (event_id)
LEFT JOIN event_to_state_groups USING (event_id)
LEFT JOIN rejections USING (event_id)
WHERE events.room_id = $1 AND stream_ordering <= $2::bigint
ORDER BY stream_ordering DESC
LIMIT $3::int
"#,
&[&room_id, &max_stream, &page_size],
|row| RoomRow {
event_id: row.get(0),
etype: row.get(1),
state_key: row.get(2),
depth: row.get(3),
sender: row.get(4),
state_group: row.get(5),
json: serde_json::from_str(&row.get::<_, String>(6))
.expect("json was not json"),
ts: row.get(7),
stream_ordering: row.get(8),
edges: row.get(9),
}
|row| parse_event_row(row),
).expect("room sql query failed")
}
DatabaseConnection::Sqlite(_) => {
let mut events = conn.query(
r#"
SELECT event_id, events.type, state_key, depth, sender, state_group, json, origin_server_ts,
stream_ordering
SELECT event_id, events.type, state_events.state_key, depth, sender, state_group,
json, internal_metadata, origin_server_ts, stream_ordering,
rejections.reason
FROM events
JOIN event_json USING (event_id)
LEFT JOIN state_events USING (event_id)
LEFT JOIN event_to_state_groups USING (event_id)
LEFT JOIN rejections USING (event_id)
WHERE events.room_id = $1 AND stream_ordering <= $2::bigint
ORDER BY stream_ordering DESC
LIMIT $3::int
"#,
&[&room_id, &max_stream, &page_size],
|row| RoomRow {
event_id: row.get(0),
etype: row.get(1),
state_key: row.get(2),
depth: row.get(3),
sender: row.get(4),
state_group: row.get(5),
json: serde_json::from_str(&row.get::<String>(6))
.expect("json was not json"),
ts: row.get(7),
stream_ordering: row.get(8),
edges: Vec::new(),
}
|row| parse_event_row(row),
).expect("room sql query failed");

for event in &mut events {
Expand Down Expand Up @@ -195,7 +177,7 @@ impl RouteHandler for StateHandler {
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
SELECT event_id, es.type, state_key, ej.json, e.depth
SELECT event_id, es.type, es.state_key, ej.json, e.depth
FROM state_groups_state
NATURAL JOIN (
SELECT type, state_key, max(state_group) as state_group FROM state_groups_state
Expand All @@ -221,6 +203,32 @@ impl RouteHandler for StateHandler {
}
}

fn parse_event_row(row: &mut DbRow<'_>) -> RoomRow {
// the postgres variant returns the event edges as an array
let edges = match row {
DbRow::Postgres(ref mut pgrow) => pgrow.get(11),
_ => Vec::new(),
};

return RoomRow {
event_id: row.get(0),
etype: row.get(1),
state_key: row.get(2),
depth: row.get(3),
sender: row.get(4),
state_group: row.get(5),
json: serde_json::from_str(&row.get::<String>(6))
.expect("json was not json"),
internal_metadata: serde_json::from_str(&row.get::<String>(7))
.expect("internal_metadata was not json"),
ts: row.get(8),
stream_ordering: row.get(9),
rejection_reason: row.get(10),
edges: edges,
};
}


fn content_from_json(s: String) -> serde_json::Value {
let json: serde_json::Value = serde_json::from_str(&s).expect("content was not json");
return json["content"].clone()
Expand Down

0 comments on commit fb13695

Please sign in to comment.