Merge branch 'poljar/seshat-ui-pr' into develop
This commit is contained in:
commit
e2dd2bd950
13 changed files with 672 additions and 61 deletions
|
@ -17,20 +17,25 @@ limitations under the License.
|
|||
import PlatformPeg from "../PlatformPeg";
|
||||
import {MatrixClientPeg} from "../MatrixClientPeg";
|
||||
import {EventTimeline, RoomMember} from 'matrix-js-sdk';
|
||||
import {sleep} from "../utils/promise";
|
||||
import {EventEmitter} from "events";
|
||||
|
||||
/*
|
||||
* Event indexing class that wraps the platform specific event indexing.
|
||||
*/
|
||||
export default class EventIndex {
|
||||
export default class EventIndex extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
this.crawlerCheckpoints = [];
|
||||
// The time that the crawler will wait between /rooms/{room_id}/messages
|
||||
// requests
|
||||
this._crawlerTimeout = 3000;
|
||||
// The time in ms that the crawler will wait loop iterations if there
|
||||
// have not been any checkpoints to consume in the last iteration.
|
||||
this._crawlerIdleTime = 5000;
|
||||
this._crawlerSleepTime = 3000;
|
||||
// The maximum number of events our crawler should fetch in a single
|
||||
// crawl.
|
||||
this._eventsPerCrawl = 100;
|
||||
this._crawler = null;
|
||||
this._currentCheckpoint = null;
|
||||
this.liveEventsForIndex = new Set();
|
||||
}
|
||||
|
||||
|
@ -65,59 +70,62 @@ export default class EventIndex {
|
|||
client.removeListener('Room.timelineReset', this.onTimelineReset);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get crawler checkpoints for the encrypted rooms and store them in the index.
|
||||
*/
|
||||
async addInitialCheckpoints() {
|
||||
const indexManager = PlatformPeg.get().getEventIndexingManager();
|
||||
const client = MatrixClientPeg.get();
|
||||
const rooms = client.getRooms();
|
||||
|
||||
const isRoomEncrypted = (room) => {
|
||||
return client.isRoomEncrypted(room.roomId);
|
||||
};
|
||||
|
||||
// We only care to crawl the encrypted rooms, non-encrypted
|
||||
// rooms can use the search provided by the homeserver.
|
||||
const encryptedRooms = rooms.filter(isRoomEncrypted);
|
||||
|
||||
console.log("EventIndex: Adding initial crawler checkpoints");
|
||||
|
||||
// Gather the prev_batch tokens and create checkpoints for
|
||||
// our message crawler.
|
||||
await Promise.all(encryptedRooms.map(async (room) => {
|
||||
const timeline = room.getLiveTimeline();
|
||||
const token = timeline.getPaginationToken("b");
|
||||
|
||||
console.log("EventIndex: Got token for indexer",
|
||||
room.roomId, token);
|
||||
|
||||
const backCheckpoint = {
|
||||
roomId: room.roomId,
|
||||
token: token,
|
||||
direction: "b",
|
||||
};
|
||||
|
||||
const forwardCheckpoint = {
|
||||
roomId: room.roomId,
|
||||
token: token,
|
||||
direction: "f",
|
||||
};
|
||||
|
||||
await indexManager.addCrawlerCheckpoint(backCheckpoint);
|
||||
await indexManager.addCrawlerCheckpoint(forwardCheckpoint);
|
||||
this.crawlerCheckpoints.push(backCheckpoint);
|
||||
this.crawlerCheckpoints.push(forwardCheckpoint);
|
||||
}));
|
||||
}
|
||||
|
||||
onSync = async (state, prevState, data) => {
|
||||
const indexManager = PlatformPeg.get().getEventIndexingManager();
|
||||
|
||||
if (prevState === "PREPARED" && state === "SYNCING") {
|
||||
const addInitialCheckpoints = async () => {
|
||||
const client = MatrixClientPeg.get();
|
||||
const rooms = client.getRooms();
|
||||
|
||||
const isRoomEncrypted = (room) => {
|
||||
return client.isRoomEncrypted(room.roomId);
|
||||
};
|
||||
|
||||
// We only care to crawl the encrypted rooms, non-encrypted.
|
||||
// rooms can use the search provided by the homeserver.
|
||||
const encryptedRooms = rooms.filter(isRoomEncrypted);
|
||||
|
||||
console.log("EventIndex: Adding initial crawler checkpoints");
|
||||
|
||||
// Gather the prev_batch tokens and create checkpoints for
|
||||
// our message crawler.
|
||||
await Promise.all(encryptedRooms.map(async (room) => {
|
||||
const timeline = room.getLiveTimeline();
|
||||
const token = timeline.getPaginationToken("b");
|
||||
|
||||
console.log("EventIndex: Got token for indexer",
|
||||
room.roomId, token);
|
||||
|
||||
const backCheckpoint = {
|
||||
roomId: room.roomId,
|
||||
token: token,
|
||||
direction: "b",
|
||||
};
|
||||
|
||||
const forwardCheckpoint = {
|
||||
roomId: room.roomId,
|
||||
token: token,
|
||||
direction: "f",
|
||||
};
|
||||
|
||||
await indexManager.addCrawlerCheckpoint(backCheckpoint);
|
||||
await indexManager.addCrawlerCheckpoint(forwardCheckpoint);
|
||||
this.crawlerCheckpoints.push(backCheckpoint);
|
||||
this.crawlerCheckpoints.push(forwardCheckpoint);
|
||||
}));
|
||||
};
|
||||
|
||||
// If our indexer is empty we're most likely running Riot the
|
||||
// first time with indexing support or running it with an
|
||||
// initial sync. Add checkpoints to crawl our encrypted rooms.
|
||||
const eventIndexWasEmpty = await indexManager.isEventIndexEmpty();
|
||||
if (eventIndexWasEmpty) await addInitialCheckpoints();
|
||||
if (eventIndexWasEmpty) await this.addInitialCheckpoints();
|
||||
|
||||
// Start our crawler.
|
||||
this.startCrawler();
|
||||
return;
|
||||
}
|
||||
|
@ -182,13 +190,11 @@ export default class EventIndex {
|
|||
indexManager.addEventToIndex(e, profile);
|
||||
}
|
||||
|
||||
async crawlerFunc() {
|
||||
// TODO either put this in a better place or find a library provided
|
||||
// method that does this.
|
||||
const sleep = async (ms) => {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
};
|
||||
emitNewCheckpoint() {
|
||||
this.emit("changedCheckpoint", this.currentRoom());
|
||||
}
|
||||
|
||||
async crawlerFunc() {
|
||||
let cancelled = false;
|
||||
|
||||
console.log("EventIndex: Started crawler function");
|
||||
|
@ -202,11 +208,27 @@ export default class EventIndex {
|
|||
cancelled = true;
|
||||
};
|
||||
|
||||
let idle = false;
|
||||
|
||||
while (!cancelled) {
|
||||
// This is a low priority task and we don't want to spam our
|
||||
// homeserver with /messages requests so we set a hefty timeout
|
||||
// here.
|
||||
await sleep(this._crawlerTimeout);
|
||||
let sleepTime = this._crawlerSleepTime;
|
||||
|
||||
// Don't let the user configure a lower sleep time than 100 ms.
|
||||
sleepTime = Math.max(sleepTime, 100);
|
||||
|
||||
if (idle) {
|
||||
sleepTime = this._crawlerIdleTime;
|
||||
}
|
||||
|
||||
if (this._currentCheckpoint !== null) {
|
||||
this._currentCheckpoint = null;
|
||||
this.emitNewCheckpoint();
|
||||
}
|
||||
|
||||
await sleep(sleepTime);
|
||||
|
||||
console.log("EventIndex: Running the crawler loop.");
|
||||
|
||||
|
@ -219,9 +241,15 @@ export default class EventIndex {
|
|||
/// There is no checkpoint available currently, one may appear if
|
||||
// a sync with limited room timelines happens, so go back to sleep.
|
||||
if (checkpoint === undefined) {
|
||||
idle = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
this._currentCheckpoint = checkpoint;
|
||||
this.emitNewCheckpoint();
|
||||
|
||||
idle = false;
|
||||
|
||||
console.log("EventIndex: crawling using checkpoint", checkpoint);
|
||||
|
||||
// We have a checkpoint, let us fetch some messages, again, very
|
||||
|
@ -241,6 +269,11 @@ export default class EventIndex {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (cancelled) {
|
||||
this.crawlerCheckpoints.push(checkpoint);
|
||||
break;
|
||||
}
|
||||
|
||||
if (res.chunk.length === 0) {
|
||||
console.log("EventIndex: Done with the checkpoint", checkpoint);
|
||||
// We got to the start/end of our timeline, lets just
|
||||
|
@ -600,4 +633,29 @@ export default class EventIndex {
|
|||
|
||||
return paginationPromise;
|
||||
}
|
||||
|
||||
async getStats() {
|
||||
const indexManager = PlatformPeg.get().getEventIndexingManager();
|
||||
return indexManager.getStats();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the room that we are currently crawling.
|
||||
*
|
||||
* @returns {Room} A MatrixRoom that is being currently crawled, null
|
||||
* if no room is currently being crawled.
|
||||
*/
|
||||
currentRoom() {
|
||||
if (this._currentCheckpoint === null && this.crawlerCheckpoints.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const client = MatrixClientPeg.get();
|
||||
|
||||
if (this._currentCheckpoint !== null) {
|
||||
return client.getRoom(this._currentCheckpoint.roomId);
|
||||
} else {
|
||||
return client.getRoom(this.crawlerCheckpoints[0].roomId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue