Use server side relations for voice broadcasts (#9534)
This commit is contained in:
parent
3747464b41
commit
36a574a14f
11 changed files with 396 additions and 192 deletions
|
@ -38,6 +38,8 @@ export class RelationsHelper
|
|||
extends TypedEventEmitter<RelationsHelperEvent, EventMap>
|
||||
implements IDestroyable {
|
||||
private relations?: Relations;
|
||||
private eventId: string;
|
||||
private roomId: string;
|
||||
|
||||
public constructor(
|
||||
private event: MatrixEvent,
|
||||
|
@ -46,6 +48,21 @@ export class RelationsHelper
|
|||
private client: MatrixClient,
|
||||
) {
|
||||
super();
|
||||
|
||||
const eventId = event.getId();
|
||||
|
||||
if (!eventId) {
|
||||
throw new Error("unable to create RelationsHelper: missing event ID");
|
||||
}
|
||||
|
||||
const roomId = event.getRoomId();
|
||||
|
||||
if (!roomId) {
|
||||
throw new Error("unable to create RelationsHelper: missing room ID");
|
||||
}
|
||||
|
||||
this.eventId = eventId;
|
||||
this.roomId = roomId;
|
||||
this.setUpRelations();
|
||||
}
|
||||
|
||||
|
@ -73,7 +90,7 @@ export class RelationsHelper
|
|||
private setRelations(): void {
|
||||
const room = this.client.getRoom(this.event.getRoomId());
|
||||
this.relations = room?.getUnfilteredTimelineSet()?.relations?.getChildEventsForEvent(
|
||||
this.event.getId(),
|
||||
this.eventId,
|
||||
this.relationType,
|
||||
this.relationEventType,
|
||||
);
|
||||
|
@ -87,6 +104,32 @@ export class RelationsHelper
|
|||
this.relations?.getRelations()?.forEach(e => this.emit(RelationsHelperEvent.Add, e));
|
||||
}
|
||||
|
||||
public getCurrent(): MatrixEvent[] {
|
||||
return this.relations?.getRelations() || [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches all related events from the server and emits them.
|
||||
*/
|
||||
public async emitFetchCurrent(): Promise<void> {
|
||||
let nextBatch: string | undefined = undefined;
|
||||
|
||||
do {
|
||||
const response = await this.client.relations(
|
||||
this.roomId,
|
||||
this.eventId,
|
||||
this.relationType,
|
||||
this.relationEventType,
|
||||
{
|
||||
from: nextBatch,
|
||||
limit: 50,
|
||||
},
|
||||
);
|
||||
nextBatch = response?.nextBatch;
|
||||
response?.events.forEach(e => this.emit(RelationsHelperEvent.Add, e));
|
||||
} while (nextBatch);
|
||||
}
|
||||
|
||||
public destroy(): void {
|
||||
this.removeAllListeners();
|
||||
this.event.off(MatrixEventEvent.RelationsCreated, this.onRelationsCreated);
|
||||
|
|
|
@ -32,7 +32,6 @@ import { MediaEventHelper } from "../../utils/MediaEventHelper";
|
|||
import { IDestroyable } from "../../utils/IDestroyable";
|
||||
import { VoiceBroadcastInfoEventType, VoiceBroadcastInfoState } from "..";
|
||||
import { RelationsHelper, RelationsHelperEvent } from "../../events/RelationsHelper";
|
||||
import { getReferenceRelationsForEvent } from "../../events";
|
||||
import { VoiceBroadcastChunkEvents } from "../utils/VoiceBroadcastChunkEvents";
|
||||
|
||||
export enum VoiceBroadcastPlaybackState {
|
||||
|
@ -89,15 +88,27 @@ export class VoiceBroadcastPlayback
|
|||
this.setUpRelationsHelper();
|
||||
}
|
||||
|
||||
private setUpRelationsHelper(): void {
|
||||
private async setUpRelationsHelper(): Promise<void> {
|
||||
this.infoRelationHelper = new RelationsHelper(
|
||||
this.infoEvent,
|
||||
RelationType.Reference,
|
||||
VoiceBroadcastInfoEventType,
|
||||
this.client,
|
||||
);
|
||||
this.infoRelationHelper.on(RelationsHelperEvent.Add, this.addInfoEvent);
|
||||
this.infoRelationHelper.emitCurrent();
|
||||
this.infoRelationHelper.getCurrent().forEach(this.addInfoEvent);
|
||||
|
||||
if (this.infoState !== VoiceBroadcastInfoState.Stopped) {
|
||||
// Only required if not stopped. Stopped is the final state.
|
||||
this.infoRelationHelper.on(RelationsHelperEvent.Add, this.addInfoEvent);
|
||||
|
||||
try {
|
||||
await this.infoRelationHelper.emitFetchCurrent();
|
||||
} catch (err) {
|
||||
logger.warn("error fetching server side relation for voice broadcast info", err);
|
||||
// fall back to local events
|
||||
this.infoRelationHelper.emitCurrent();
|
||||
}
|
||||
}
|
||||
|
||||
this.chunkRelationHelper = new RelationsHelper(
|
||||
this.infoEvent,
|
||||
|
@ -106,7 +117,15 @@ export class VoiceBroadcastPlayback
|
|||
this.client,
|
||||
);
|
||||
this.chunkRelationHelper.on(RelationsHelperEvent.Add, this.addChunkEvent);
|
||||
this.chunkRelationHelper.emitCurrent();
|
||||
|
||||
try {
|
||||
// TODO Michael W: only fetch events if needed, blocked by PSF-1708
|
||||
await this.chunkRelationHelper.emitFetchCurrent();
|
||||
} catch (err) {
|
||||
logger.warn("error fetching server side relation for voice broadcast chunks", err);
|
||||
// fall back to local events
|
||||
this.chunkRelationHelper.emitCurrent();
|
||||
}
|
||||
}
|
||||
|
||||
private addChunkEvent = async (event: MatrixEvent): Promise<boolean> => {
|
||||
|
@ -150,23 +169,18 @@ export class VoiceBroadcastPlayback
|
|||
this.setInfoState(state);
|
||||
};
|
||||
|
||||
private async loadChunks(): Promise<void> {
|
||||
const relations = getReferenceRelationsForEvent(this.infoEvent, EventType.RoomMessage, this.client);
|
||||
const chunkEvents = relations?.getRelations();
|
||||
private async enqueueChunks(): Promise<void> {
|
||||
const promises = this.chunkEvents.getEvents().reduce((promises, event: MatrixEvent) => {
|
||||
if (!this.playbacks.has(event.getId() || "")) {
|
||||
promises.push(this.enqueueChunk(event));
|
||||
}
|
||||
return promises;
|
||||
}, [] as Promise<void>[]);
|
||||
|
||||
if (!chunkEvents) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.chunkEvents.addEvents(chunkEvents);
|
||||
this.setDuration(this.chunkEvents.getLength());
|
||||
|
||||
for (const chunkEvent of chunkEvents) {
|
||||
await this.enqueueChunk(chunkEvent);
|
||||
}
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
private async enqueueChunk(chunkEvent: MatrixEvent) {
|
||||
private async enqueueChunk(chunkEvent: MatrixEvent): Promise<void> {
|
||||
const eventId = chunkEvent.getId();
|
||||
|
||||
if (!eventId) {
|
||||
|
@ -317,10 +331,7 @@ export class VoiceBroadcastPlayback
|
|||
}
|
||||
|
||||
public async start(): Promise<void> {
|
||||
if (this.playbacks.size === 0) {
|
||||
await this.loadChunks();
|
||||
}
|
||||
|
||||
await this.enqueueChunks();
|
||||
const chunkEvents = this.chunkEvents.getEvents();
|
||||
|
||||
const toPlay = this.getInfoState() === VoiceBroadcastInfoState.Stopped
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue