Browse Source

Relax checks and error handling on syncing

Syncing was fully discarded if it found any malformed event.
With this change instead it just skips those without hiding
the rooms.
fix-filtering
Alejandro Domínguez 5 years ago
parent
commit
6c1be7dda5
  1. 105
      fractal-gtk/src/appop/sync.rs
  2. 97
      fractal-gtk/src/backend/sync.rs
  3. 336
      fractal-gtk/src/model/room.rs

105
fractal-gtk/src/appop/sync.rs

@ -37,23 +37,20 @@ impl AppOp {
.await;
match query {
Ok(SyncRet::NoSince { rooms, next_batch }) => {
match rooms {
Ok((rooms, default)) => {
let clear_room_list = true;
APPOP!(set_rooms, (rooms, clear_room_list));
// Open the newly joined room
let jtr = default.as_ref().map(|r| r.id.clone());
APPOP!(set_join_to_room, (jtr));
if let Some(room) = default {
let room_id = room.id;
APPOP!(set_active_room_by_id, (room_id));
}
}
Err(err) => {
err.handle_error();
}
};
Ok(SyncRet::NoSince {
rooms,
default,
next_batch,
}) => {
let clear_room_list = true;
APPOP!(set_rooms, (rooms, clear_room_list));
// Open the newly joined room
let jtr = default.as_ref().map(|r| r.id.clone());
APPOP!(set_join_to_room, (jtr));
if let Some(room) = default {
let room_id = room.id;
APPOP!(set_active_room_by_id, (room_id));
}
let s = Some(next_batch);
APPOP!(synced, (s));
@ -65,28 +62,17 @@ impl AppOp {
other,
next_batch,
}) => {
match update_rooms {
Ok(rooms) => {
let clear_room_list = false;
let msgs: Vec<_> =
rooms.iter().flat_map(|r| &r.messages).cloned().collect();
APPOP!(set_rooms, (rooms, clear_room_list));
APPOP!(show_room_messages, (msgs));
}
Err(err) => {
err.handle_error();
}
}
let clear_room_list = false;
let msgs: Vec<_> = update_rooms
.iter()
.flat_map(|r| &r.messages)
.cloned()
.collect();
APPOP!(set_rooms, (update_rooms, clear_room_list));
APPOP!(show_room_messages, (msgs));
match update_rooms_2 {
Ok(rooms) => {
let clear_room_list = false;
APPOP!(set_rooms, (rooms, clear_room_list));
}
Err(err) => {
err.handle_error();
}
}
let clear_room_list = false;
APPOP!(set_rooms, (update_rooms_2, clear_room_list));
for (room_id, unread_notifications) in room_notifications {
let r = room_id;
@ -101,32 +87,25 @@ impl AppOp {
APPOP!(set_room_notifications, (r, n, h));
}
match other {
Ok(other) => {
for room_element in other {
match room_element {
RoomElement::Name(room_id, name) => {
let n = Some(name);
APPOP!(room_name_change, (room_id, n));
}
RoomElement::Topic(room_id, topic) => {
let t = Some(topic);
APPOP!(room_topic_change, (room_id, t));
}
RoomElement::NewAvatar(room_id) => {
APPOP!(new_room_avatar, (room_id));
}
RoomElement::MemberEvent(event) => {
APPOP!(room_member_event, (event));
}
RoomElement::RemoveMessage(room_id, msg_id) => {
APPOP!(remove_message, (room_id, msg_id));
}
}
for room_element in other {
match room_element {
RoomElement::Name(room_id, name) => {
let n = Some(name);
APPOP!(room_name_change, (room_id, n));
}
RoomElement::Topic(room_id, topic) => {
let t = Some(topic);
APPOP!(room_topic_change, (room_id, t));
}
RoomElement::NewAvatar(room_id) => {
APPOP!(new_room_avatar, (room_id));
}
RoomElement::MemberEvent(event) => {
APPOP!(room_member_event, (event));
}
RoomElement::RemoveMessage(room_id, msg_id) => {
APPOP!(remove_message, (room_id, msg_id));
}
}
Err(err) => {
err.handle_error();
}
}

97
fractal-gtk/src/backend/sync.rs

@ -22,7 +22,7 @@ use fractal_api::SyncSettings;
use fractal_api::identifiers::{EventId, RoomId, UserId};
use fractal_api::Client as MatrixClient;
use fractal_api::Error as MatrixError;
use log::error;
use log::{error, warn};
use std::{collections::HashMap, time::Duration};
use super::{get_ruma_client_error, remove_matrix_access_token_if_present, HandleError};
@ -52,49 +52,17 @@ impl HandleError for SyncError {
}
}
#[derive(Debug)]
pub struct RoomsError(MatrixError);
impl<T: Into<MatrixError>> From<T> for RoomsError {
fn from(err: T) -> Self {
Self(err.into())
}
}
impl HandleError for RoomsError {}
#[derive(Debug)]
pub struct UpdateRoomsError(MatrixError);
impl<T: Into<MatrixError>> From<T> for UpdateRoomsError {
fn from(err: T) -> Self {
Self(err.into())
}
}
impl HandleError for UpdateRoomsError {}
#[derive(Debug)]
pub struct RoomElementError(MatrixError);
impl<T: Into<MatrixError>> From<T> for RoomElementError {
fn from(err: T) -> Self {
Self(err.into())
}
}
impl HandleError for RoomElementError {}
pub enum SyncRet {
NoSince {
rooms: Result<(Vec<Room>, Option<Room>), RoomsError>,
rooms: Vec<Room>,
default: Option<Room>,
next_batch: String,
},
WithSince {
update_rooms: Result<Vec<Room>, UpdateRoomsError>,
update_rooms: Vec<Room>,
room_notifications: HashMap<RoomId, UnreadNotificationsCount>,
update_rooms_2: Result<Vec<Room>, UpdateRoomsError>,
other: Result<Vec<RoomElement>, RoomElementError>,
update_rooms_2: Vec<Room>,
other: Vec<RoomElement>,
next_batch: String,
},
}
@ -148,23 +116,21 @@ pub async fn sync(
match session_client.sync_once(sync_settings).await {
Ok(response) => {
if since.is_none() {
let rooms = Room::from_sync_response(&response, user_id)
.map(|rooms| {
let def = join_to_room
.and_then(|jtr| rooms.iter().find(|x| x.id == jtr).cloned());
(rooms, def)
})
.map_err(Into::into);
let rooms = Room::from_sync_response(&response, user_id);
let default =
join_to_room.and_then(|jtr| rooms.iter().find(|x| x.id == jtr).cloned());
let next_batch = response.next_batch;
Ok(SyncRet::NoSince { rooms, next_batch })
Ok(SyncRet::NoSince {
rooms,
default,
next_batch,
})
} else {
let join = &response.rooms.join;
// New rooms
let update_rooms =
Room::from_sync_response(&response, user_id.clone()).map_err(Into::into);
let update_rooms = Room::from_sync_response(&response, user_id.clone());
// Room notifications
let room_notifications = join
@ -179,8 +145,10 @@ pub async fn sync(
let typing: Vec<Member> = room.ephemeral.events
.iter()
.map(|ev| ev.deserialize())
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.inspect(|result_ev| if let Err(err) = result_ev {
warn!("Bad event: {}", err);
})
.filter_map(Result::ok)
.filter_map(|event| match event.content() {
AnyEphemeralRoomEventContent::Typing(content) => {
Some(content.user_ids)
@ -197,10 +165,10 @@ pub async fn sync(
})
.collect();
Ok(Room {
Room {
typing_users: typing,
..Room::new(k.clone(), RoomMembership::Joined(RoomTag::None))
})
}
})
.collect();
@ -214,28 +182,29 @@ pub async fn sync(
.iter()
.map(move |ev| Ok((room_id.clone(), ev.deserialize()?)))
})
.filter_map(|rid_ev| {
let (room_id, event) = match rid_ev {
Ok(roomid_event) => roomid_event,
Err(err) => return Some(Err(err)),
};
.inspect(|result_ev: &Result<_, serde_json::Error>| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(Result::ok)
.filter_map(|(room_id, event)| {
match event {
AnySyncRoomEvent::State(AnySyncStateEvent::RoomName(ev)) => {
let name = ev.content.name().map(Into::into).unwrap_or_default();
Some(Ok(RoomElement::Name(room_id, name)))
Some(RoomElement::Name(room_id, name))
}
AnySyncRoomEvent::State(AnySyncStateEvent::RoomTopic(ev)) => {
Some(Ok(RoomElement::Topic(room_id, ev.content.topic)))
Some(RoomElement::Topic(room_id, ev.content.topic))
}
AnySyncRoomEvent::State(AnySyncStateEvent::RoomAvatar(_)) => {
Some(Ok(RoomElement::NewAvatar(room_id)))
Some(RoomElement::NewAvatar(room_id))
}
AnySyncRoomEvent::State(AnySyncStateEvent::RoomMember(ev)) => {
Some(Ok(RoomElement::MemberEvent(ev.into_full_event(room_id))))
Some(RoomElement::MemberEvent(ev.into_full_event(room_id)))
}
AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomRedaction(ev)) => {
Some(Ok(RoomElement::RemoveMessage(room_id, ev.redacts)))
Some(RoomElement::RemoveMessage(room_id, ev.redacts))
}
AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomMessage(_)) => None,
AnySyncRoomEvent::Message(AnySyncMessageEvent::Sticker(_)) => {

336
fractal-gtk/src/model/room.rs

@ -1,6 +1,7 @@
use crate::model::member::Member;
use crate::model::member::MemberList;
use crate::model::message::Message;
use anyhow::anyhow;
use chrono::DateTime;
use chrono::Utc;
use either::Either;
@ -13,10 +14,10 @@ use fractal_api::events::{
};
use fractal_api::identifiers::{EventId, RoomAliasId, RoomId, UserId};
use fractal_api::url::{ParseError as UrlError, Url};
use fractal_api::Error as MatrixError;
use log::{debug, info};
use fractal_api::Raw;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -85,6 +86,28 @@ pub enum RoomTag {
Custom(String),
}
#[derive(Deserialize, Serialize)]
#[serde(try_from = "&str")]
struct DirectType;
impl TryFrom<&str> for DirectType {
type Error = anyhow::Error;
fn try_from(event_type: &str) -> Result<Self, Self::Error> {
match event_type {
"m.direct" => Ok(Self),
_ => Err(anyhow!("not a m.direct event")),
}
}
}
#[derive(Deserialize, Serialize)]
struct CustomDirectEvent {
content: BTreeMap<String, Vec<RoomId>>,
#[serde(rename = "type")]
_type: DirectType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Room {
pub id: RoomId,
@ -136,25 +159,23 @@ impl Room {
}
}
pub fn from_sync_response(
response: &SyncResponse,
user_id: UserId,
) -> Result<Vec<Self>, MatrixError> {
pub fn from_sync_response(response: &SyncResponse, user_id: UserId) -> Vec<Self> {
// getting the list of direct rooms
let direct: HashSet<RoomId> = response.account_data.events
.iter()
.cloned()
// Synapse sometimes sends an object with the key "[object Object]"
// instead of a user ID, so we have to skip those invalid objects
// in the array in order to avoid discarding everything
.filter_map(|ev| ev.deserialize().ok())
.find_map(|event| match event {
AnyBasicEvent::Direct(ev) => Some(ev.content.0),
_ => None,
// instead of a user ID. Since we don't need the key, we use our own
// event type so it accepts that.
// There's always at most one object of this type.
.find_map(|ev| {
Raw::<CustomDirectEvent>::from_json(ev.into_json())
.deserialize()
.ok()
})
.into_iter()
.flatten()
.flat_map(|(_, rids)| rids)
.collect();
.map_or(Default::default(), |direct_event| {
direct_event.content.values().flatten().cloned().collect()
});
/*
response.rooms.join.iter().for_each(|(_, room)| {
@ -163,18 +184,30 @@ impl Room {
*/
let joined_rooms = response.rooms.join.iter().map(|(k, room)| {
let stevents = room
let stevents: Vec<_> = room
.state
.events
.iter()
.map(|ev| ev.deserialize())
.collect::<Result<Vec<_>, _>>()?;
let dataevs = room
.inspect(|result_ev| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(Result::ok)
.collect();
let dataevs: Vec<_> = room
.account_data
.events
.iter()
.map(|ev| ev.deserialize())
.collect::<Result<Vec<_>, _>>()?;
.inspect(|result_ev| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(Result::ok)
.collect();
let room_tag = dataevs
.iter()
.find_map(|event| match event {
@ -255,8 +288,13 @@ impl Room {
.events
.iter()
.map(|ev| Ok((k.clone(), ev.deserialize()?)))
.filter_map(|k_ev| k_ev.map(TryInto::try_into).map(Result::ok).transpose())
.collect::<Result<Vec<_>, MatrixError>>()?,
.inspect(|result_ev: &Result<_, serde_json::Error>| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(|k_ev| k_ev.ok()?.try_into().ok())
.collect(),
admins: stevents
.iter()
.filter_map(|event| match event {
@ -295,13 +333,18 @@ impl Room {
.ephemeral
.events
.iter()
.find_map(|event| match event.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => Some(Ok(ev.content.0)),
Ok(_) => None,
Err(err) => Some(Err(err)),
.map(|ev| ev.deserialize())
.inspect(|result_ev| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(Result::ok)
.filter_map(|event| match event {
AnySyncEphemeralRoomEvent::Receipt(ev) => Some(ev.content.0),
_ => None,
})
.transpose()?
.into_iter()
.take(1)
.flatten()
.map(|(event_id, receipts)| {
let receipts = receipts
@ -346,12 +389,17 @@ impl Room {
.ephemeral
.events
.iter()
.find_map(|event| match event.deserialize() {
Ok(AnySyncEphemeralRoomEvent::FullyRead(ev)) => Some(Ok(ev.content.event_id)),
Ok(_) => None,
Err(err) => Some(Err(err)),
.map(|ev| ev.deserialize())
.inspect(|result_ev| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(Result::ok)
.find_map(|event| match event {
AnySyncEphemeralRoomEvent::FullyRead(ev) => Some(ev.content.event_id),
_ => None,
})
.transpose()?
{
let event_id = Some(event_id);
@ -363,119 +411,123 @@ impl Room {
});
}
Ok(r)
});
let left_rooms = response.rooms.leave.iter().map(|(k, room)| {
// TODO: The spec doesn't explain where to get the reason
// for the kicking from, so matrix-sdk doesn't support
// that.
if let Some(last_event) = room
.timeline
.events
.last()
.map(|ev| serde_json::to_value(ev.json()))
.transpose()?
{
if let Some(kicker) =
UserId::try_from(last_event["sender"].as_str().unwrap_or_default())
.ok()
.filter(|leave_uid| *leave_uid != user_id)
{
let kick_reason = &last_event["content"]["reason"];
let reason = Reason::Kicked(
String::from(kick_reason.as_str().unwrap_or_default()),
kicker,
);
return Ok(Self::new(k.clone(), RoomMembership::Left(reason)));
}
};
Ok(Self::new(k.clone(), RoomMembership::Left(Reason::None)))
r
});
let invited_rooms = response
.rooms
.invite
.iter()
.map(|(k, room)| {
let stevents = room
.invite_state
.events
.iter()
.map(|ev| ev.deserialize())
.collect::<Result<Vec<_>, _>>()?;
let inv_sender = stevents
.iter()
.find_map(|event| match event {
AnyStrippedStateEvent::RoomMember(ev)
if ev.content.membership == MembershipState::Invite
&& ev.state_key == user_id =>
{
Some(ev)
let left_rooms =
response.rooms.leave.iter().map(|(k, room)| {
// TODO: The spec doesn't explain where to get the reason
// for the kicking from, so matrix-sdk doesn't support
// that.
if let Some(last_event) = room.timeline.events.last().and_then(|ev| {
match serde_json::to_value(ev.json()) {
Ok(event) => Some(event),
Err(err) => {
warn!("Bad event: {}", err);
None
}
_ => None,
})
.map(|ev| ev.sender.clone());
if let Some(inv_sender) = inv_sender {
Ok(Some(Self {
name: stevents
.iter()
.filter_map(|event| match event {
AnyStrippedStateEvent::RoomName(ev) => {
ev.content.name().filter(|name| !name.is_empty()).map(Err)
}
AnyStrippedStateEvent::RoomCanonicalAlias(ev) => ev
.content
.alias
.as_ref()
.map(|r_alias| Ok(r_alias.as_str())),
_ => None,
})
.try_fold(None, |_, alias_name| alias_name.map(Some))
.unwrap_or_else(Some)
.map(Into::into)
.or_else(|| {
let members: Vec<_> = stevents
.iter()
.filter_map(|event| member_from_stripped_event(event, &user_id))
.take(3)
.map(Into::into)
.collect();
room_name_from_members(&members)
}),
avatar: stevents
.iter()
.find_map(|event| match event {
AnyStrippedStateEvent::RoomAvatar(ev) => {
Some(ev.content.url.as_deref())
}
_ => None,
})
.flatten()
.map(Url::parse)
.and_then(Result::ok),
alias: stevents
.iter()
.find_map(|event| match event {
AnyStrippedStateEvent::RoomCanonicalAlias(ev) => {
Some(ev.content.alias.clone())
}
_ => None,
})
.flatten(),
topic: stevents.iter().find_map(|event| match event {
AnyStrippedStateEvent::RoomTopic(ev) => Some(ev.content.topic.clone()),
}
}) {
if let Some(kicker) =
UserId::try_from(last_event["sender"].as_str().unwrap_or_default())
.ok()
.filter(|leave_uid| *leave_uid != user_id)
{
let kick_reason = &last_event["content"]["reason"];
let reason = Reason::Kicked(
String::from(kick_reason.as_str().unwrap_or_default()),
kicker,
);
return Self::new(k.clone(), RoomMembership::Left(reason));
}
};
Self::new(k.clone(), RoomMembership::Left(Reason::None))
});
let invited_rooms = response.rooms.invite.iter().filter_map(|(k, room)| {
let stevents: Vec<_> = room
.invite_state
.events
.iter()
.map(|ev| ev.deserialize())
.inspect(|result_ev| {
if let Err(err) = result_ev {
warn!("Bad event: {}", err);
}
})
.filter_map(Result::ok)
.collect();
let inv_sender = stevents
.iter()
.find_map(|event| match event {
AnyStrippedStateEvent::RoomMember(ev)
if ev.content.membership == MembershipState::Invite
&& ev.state_key == user_id =>
{
Some(ev)
}
_ => None,
})
.map(|ev| ev.sender.clone());
if let Some(inv_sender) = inv_sender {
Some(Self {
name: stevents
.iter()
.filter_map(|event| match event {
AnyStrippedStateEvent::RoomName(ev) => {
ev.content.name().filter(|name| !name.is_empty()).map(Err)
}
AnyStrippedStateEvent::RoomCanonicalAlias(ev) => ev
.content
.alias
.as_ref()
.map(|r_alias| Ok(r_alias.as_str())),
_ => None,
})
.try_fold(None, |_, alias_name| alias_name.map(Some))
.unwrap_or_else(Some)
.map(Into::into)
.or_else(|| {
let members: Vec<_> = stevents
.iter()
.filter_map(|event| member_from_stripped_event(event, &user_id))
.take(3)
.map(Into::into)
.collect();
room_name_from_members(&members)
}),
direct: direct.contains(&k),
..Self::new(k.clone(), RoomMembership::Invited(inv_sender))
}))
} else {
Ok(None)
}
})
.filter_map(Result::transpose);
avatar: stevents
.iter()
.find_map(|event| match event {
AnyStrippedStateEvent::RoomAvatar(ev) => {
Some(ev.content.url.as_deref())
}
_ => None,
})
.flatten()
.map(Url::parse)
.and_then(Result::ok),
alias: stevents
.iter()
.find_map(|event| match event {
AnyStrippedStateEvent::RoomCanonicalAlias(ev) => {
Some(ev.content.alias.clone())
}
_ => None,
})
.flatten(),
topic: stevents.iter().find_map(|event| match event {
AnyStrippedStateEvent::RoomTopic(ev) => Some(ev.content.topic.clone()),
_ => None,
}),
direct: direct.contains(&k),
..Self::new(k.clone(), RoomMembership::Invited(inv_sender))
})
} else {
None
}
});
joined_rooms
.chain(left_rooms)

Loading…
Cancel
Save