MatrixChat: Move the event indexing logic into separate modules.

This commit is contained in:
Damir Jelić 2019-11-12 13:29:07 +01:00
parent 2c5565e502
commit cfdcf45ac6
5 changed files with 499 additions and 356 deletions

View file

@ -31,6 +31,7 @@ import Analytics from "../../Analytics";
import { DecryptionFailureTracker } from "../../DecryptionFailureTracker";
import MatrixClientPeg from "../../MatrixClientPeg";
import PlatformPeg from "../../PlatformPeg";
import EventIndexPeg from "../../EventIndexPeg";
import SdkConfig from "../../SdkConfig";
import * as RoomListSorter from "../../RoomListSorter";
import dis from "../../dispatcher";
@ -1224,12 +1225,6 @@ export default createReactClass({
_onLoggedOut: async function() {
const platform = PlatformPeg.get();
if (platform.supportsEventIndexing()) {
console.log("Seshat: Deleting event index.");
this.crawlerRef.cancel();
await platform.deleteEventIndex();
}
this.notifyNewScreen('login');
this.setStateForNewView({
view: VIEWS.LOGIN,
@ -1270,8 +1265,6 @@ export default createReactClass({
// to do the first sync
this.firstSyncComplete = false;
this.firstSyncPromise = Promise.defer();
this.crawlerChekpoints = [];
this.liveEventsForIndex = new Set();
const cli = MatrixClientPeg.get();
const IncomingSasDialog = sdk.getComponent('views.dialogs.IncomingSasDialog');
@ -1284,7 +1277,10 @@ export default createReactClass({
cli.setCanResetTimelineCallback(async function(roomId) {
console.log("Request to reset timeline in room ", roomId, " viewing:", self.state.currentRoomId);
// TODO is there a better place to plug this in
await self.addCheckpointForLimitedRoom(roomId);
const eventIndex = EventIndexPeg.get();
if (eventIndex !== null) {
await eventIndex.addCheckpointForLimitedRoom(roomId);
}
if (roomId !== self.state.currentRoomId) {
// It is safe to remove events from rooms we are not viewing.
@ -1301,80 +1297,21 @@ export default createReactClass({
});
cli.on('sync', async (state, prevState, data) => {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const eventIndex = EventIndexPeg.get();
if (eventIndex === null) return;
await eventIndex.onSync(state, prevState, data);
});
if (prevState === null && state === "PREPARED") {
/// Load our stored checkpoints, if any.
self.crawlerChekpoints = await platform.loadCheckpoints();
console.log("Seshat: Loaded checkpoints",
self.crawlerChekpoints);
return;
}
cli.on("Room.timeline", async (ev, room, toStartOfTimeline, removed, data) => {
const eventIndex = EventIndexPeg.get();
if (eventIndex === null) return;
await eventIndex.onRoomTimeline(ev, room, toStartOfTimeline, removed, data);
});
if (prevState === "PREPARED" && state === "SYNCING") {
const addInitialCheckpoints = async () => {
const client = MatrixClientPeg.get();
const rooms = client.getRooms();
const isRoomEncrypted = (room) => {
return client.isRoomEncrypted(room.roomId);
};
// We only care to crawl the encrypted rooms, non-encrytped
// rooms can use the search provided by the Homeserver.
const encryptedRooms = rooms.filter(isRoomEncrypted);
console.log("Seshat: Adding initial crawler checkpoints");
// Gather the prev_batch tokens and create checkpoints for
// our message crawler.
await Promise.all(encryptedRooms.map(async (room) => {
const timeline = room.getLiveTimeline();
const token = timeline.getPaginationToken("b");
console.log("Seshat: Got token for indexer",
room.roomId, token);
const backCheckpoint = {
roomId: room.roomId,
token: token,
direction: "b",
};
const forwardCheckpoint = {
roomId: room.roomId,
token: token,
direction: "f",
};
await platform.addCrawlerCheckpoint(backCheckpoint);
await platform.addCrawlerCheckpoint(forwardCheckpoint);
self.crawlerChekpoints.push(backCheckpoint);
self.crawlerChekpoints.push(forwardCheckpoint);
}));
};
// If our indexer is empty we're most likely running Riot the
// first time with indexing support or running it with an
// initial sync. Add checkpoints to crawl our encrypted rooms.
const eventIndexWasEmpty = await platform.isEventIndexEmpty();
if (eventIndexWasEmpty) await addInitialCheckpoints();
// Start our crawler.
const crawlerHandle = {};
self.crawlerFunc(crawlerHandle);
self.crawlerRef = crawlerHandle;
return;
}
if (prevState === "SYNCING" && state === "SYNCING") {
// A sync was done, presumably we queued up some live events,
// commit them now.
console.log("Seshat: Committing events");
await platform.commitLiveEvents();
return;
}
cli.on("Event.decrypted", async (ev, err) => {
const eventIndex = EventIndexPeg.get();
if (eventIndex === null) return;
await eventIndex.onEventDecrypted(ev, err);
});
cli.on('sync', function(state, prevState, data) {
@ -1459,44 +1396,6 @@ export default createReactClass({
}, null, true);
});
cli.on("Room.timeline", async (ev, room, toStartOfTimeline, removed, data) => {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
// We only index encrypted rooms locally.
if (!MatrixClientPeg.get().isRoomEncrypted(room.roomId)) return;
// If it isn't a live event or if it's redacted there's nothing to
// do.
if (toStartOfTimeline || !data || !data.liveEvent
|| ev.isRedacted()) {
return;
}
// If the event is not yet decrypted mark it for the
// Event.decrypted callback.
if (ev.isBeingDecrypted()) {
const eventId = ev.getId();
self.liveEventsForIndex.add(eventId);
} else {
// If the event is decrypted or is unencrypted add it to the
// index now.
await self.addLiveEventToIndex(ev);
}
});
cli.on("Event.decrypted", async (ev, err) => {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const eventId = ev.getId();
// If the event isn't in our live event set, ignore it.
if (!self.liveEventsForIndex.delete(eventId)) return;
if (err) return;
await self.addLiveEventToIndex(ev);
});
cli.on("accountData", function(ev) {
if (ev.getType() === 'im.vector.web.settings') {
if (ev.getContent() && ev.getContent().theme) {
@ -2058,238 +1957,4 @@ export default createReactClass({
{view}
</ErrorBoundary>;
},
async addLiveEventToIndex(ev) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
if (["m.room.message", "m.room.name", "m.room.topic"]
.indexOf(ev.getType()) == -1) {
return;
}
const e = ev.toJSON().decrypted;
const profile = {
displayname: ev.sender.rawDisplayName,
avatar_url: ev.sender.getMxcAvatarUrl(),
};
platform.addEventToIndex(e, profile);
},
async crawlerFunc(handle) {
// TODO either put this in a better place or find a library provided
// method that does this.
const sleep = async (ms) => {
return new Promise(resolve => setTimeout(resolve, ms));
};
let cancelled = false;
console.log("Seshat: Started crawler function");
const client = MatrixClientPeg.get();
const platform = PlatformPeg.get();
handle.cancel = () => {
cancelled = true;
};
while (!cancelled) {
// This is a low priority task and we don't want to spam our
// Homeserver with /messages requests so we set a hefty 3s timeout
// here.
await sleep(3000);
console.log("Seshat: Running the crawler loop.");
if (cancelled) {
console.log("Seshat: Cancelling the crawler.");
break;
}
const checkpoint = this.crawlerChekpoints.shift();
/// There is no checkpoint available currently, one may appear if
// a sync with limited room timelines happens, so go back to sleep.
if (checkpoint === undefined) {
continue;
}
console.log("Seshat: crawling using checkpoint", checkpoint);
// We have a checkpoint, let us fetch some messages, again, very
// conservatively to not bother our Homeserver too much.
const eventMapper = client.getEventMapper();
// TODO we need to ensure to use member lazy loading with this
// request so we get the correct profiles.
let res;
try {
res = await client._createMessagesRequest(
checkpoint.roomId, checkpoint.token, 100,
checkpoint.direction);
} catch (e) {
console.log("Seshat: Error crawling events:", e);
this.crawlerChekpoints.push(checkpoint);
continue
}
if (res.chunk.length === 0) {
console.log("Seshat: Done with the checkpoint", checkpoint);
// We got to the start/end of our timeline, lets just
// delete our checkpoint and go back to sleep.
await platform.removeCrawlerCheckpoint(checkpoint);
continue;
}
// Convert the plain JSON events into Matrix events so they get
// decrypted if necessary.
const matrixEvents = res.chunk.map(eventMapper);
let stateEvents = [];
if (res.state !== undefined) {
stateEvents = res.state.map(eventMapper);
}
const profiles = {};
stateEvents.forEach(ev => {
if (ev.event.content &&
ev.event.content.membership === "join") {
profiles[ev.event.sender] = {
displayname: ev.event.content.displayname,
avatar_url: ev.event.content.avatar_url,
};
}
});
const decryptionPromises = [];
matrixEvents.forEach(ev => {
if (ev.isBeingDecrypted() || ev.isDecryptionFailure()) {
// TODO the decryption promise is a private property, this
// should either be made public or we should convert the
// event that gets fired when decryption is done into a
// promise using the once event emitter method:
// https://nodejs.org/api/events.html#events_events_once_emitter_name
decryptionPromises.push(ev._decryptionPromise);
}
});
// Let us wait for all the events to get decrypted.
await Promise.all(decryptionPromises);
// We filter out events for which decryption failed, are redacted
// or aren't of a type that we know how to index.
const isValidEvent = (value) => {
return ([
"m.room.message",
"m.room.name",
"m.room.topic",
].indexOf(value.getType()) >= 0
&& !value.isRedacted() && !value.isDecryptionFailure()
);
// TODO do we need to check if the event has all the valid
// attributes?
};
// TODO if there ar no events at this point we're missing a lot
// decryption keys, do we wan't to retry this checkpoint at a later
// stage?
const filteredEvents = matrixEvents.filter(isValidEvent);
// Let us convert the events back into a format that Seshat can
// consume.
const events = filteredEvents.map((ev) => {
const jsonEvent = ev.toJSON();
let e;
if (ev.isEncrypted()) e = jsonEvent.decrypted;
else e = jsonEvent;
let profile = {};
if (e.sender in profiles) profile = profiles[e.sender];
const object = {
event: e,
profile: profile,
};
return object;
});
// Create a new checkpoint so we can continue crawling the room for
// messages.
const newCheckpoint = {
roomId: checkpoint.roomId,
token: res.end,
fullCrawl: checkpoint.fullCrawl,
direction: checkpoint.direction,
};
console.log(
"Seshat: Crawled room",
client.getRoom(checkpoint.roomId).name,
"and fetched", events.length, "events.",
);
try {
const eventsAlreadyAdded = await platform.addHistoricEvents(
events, newCheckpoint, checkpoint);
// If all events were already indexed we assume that we catched
// up with our index and don't need to crawl the room further.
// Let us delete the checkpoint in that case, otherwise push
// the new checkpoint to be used by the crawler.
if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) {
console.log("Seshat: Checkpoint had already all events",
"added, stopping the crawl", checkpoint);
await platform.removeCrawlerCheckpoint(newCheckpoint);
} else {
this.crawlerChekpoints.push(newCheckpoint);
}
} catch (e) {
console.log("Seshat: Error durring a crawl", e);
// An error occured, put the checkpoint back so we
// can retry.
this.crawlerChekpoints.push(checkpoint);
}
}
console.log("Seshat: Stopping crawler function");
},
async addCheckpointForLimitedRoom(roomId) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
if (!MatrixClientPeg.get().isRoomEncrypted(roomId)) return;
const client = MatrixClientPeg.get();
const room = client.getRoom(roomId);
if (room === null) return;
const timeline = room.getLiveTimeline();
const token = timeline.getPaginationToken("b");
const backwardsCheckpoint = {
roomId: room.roomId,
token: token,
fullCrawl: false,
direction: "b",
};
const forwardsCheckpoint = {
roomId: room.roomId,
token: token,
fullCrawl: false,
direction: "f",
};
console.log("Seshat: Added checkpoint because of a limited timeline",
backwardsCheckpoint, forwardsCheckpoint);
await platform.addCrawlerCheckpoint(backwardsCheckpoint);
await platform.addCrawlerCheckpoint(forwardsCheckpoint);
this.crawlerChekpoints.push(backwardsCheckpoint);
this.crawlerChekpoints.push(forwardsCheckpoint);
},
});