Error handling if broadcast events could not be sent (#9885)

This commit is contained in:
Michael Weimann 2023-01-17 08:57:59 +01:00 committed by GitHub
parent 7af4891cb7
commit fe0d3a7668
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 436 additions and 84 deletions

View file

@ -16,6 +16,8 @@ limitations under the License.
import { logger } from "matrix-js-sdk/src/logger";
import {
ClientEvent,
ClientEventHandlerMap,
EventType,
MatrixClient,
MatrixEvent,
@ -43,14 +45,17 @@ import dis from "../../dispatcher/dispatcher";
import { ActionPayload } from "../../dispatcher/payloads";
import { VoiceBroadcastChunkEvents } from "../utils/VoiceBroadcastChunkEvents";
import { RelationsHelper, RelationsHelperEvent } from "../../events/RelationsHelper";
import { createReconnectedListener } from "../../utils/connection";
export enum VoiceBroadcastRecordingEvent {
StateChanged = "liveness_changed",
TimeLeftChanged = "time_left_changed",
}
export type VoiceBroadcastRecordingState = VoiceBroadcastInfoState | "connection_error";
interface EventMap {
[VoiceBroadcastRecordingEvent.StateChanged]: (state: VoiceBroadcastInfoState) => void;
[VoiceBroadcastRecordingEvent.StateChanged]: (state: VoiceBroadcastRecordingState) => void;
[VoiceBroadcastRecordingEvent.TimeLeftChanged]: (timeLeft: number) => void;
}
@ -58,13 +63,17 @@ export class VoiceBroadcastRecording
extends TypedEventEmitter<VoiceBroadcastRecordingEvent, EventMap>
implements IDestroyable
{
private state: VoiceBroadcastInfoState;
private recorder: VoiceBroadcastRecorder;
private state: VoiceBroadcastRecordingState;
private recorder: VoiceBroadcastRecorder | null = null;
private dispatcherRef: string;
private chunkEvents = new VoiceBroadcastChunkEvents();
private chunkRelationHelper: RelationsHelper;
private maxLength: number;
private timeLeft: number;
private toRetry: Array<() => Promise<void>> = [];
private reconnectedListener: ClientEventHandlerMap[ClientEvent.Sync];
private roomId: string;
private infoEventId: string;
/**
* Broadcast chunks have a sequence number to bring them in the correct order and to know if a message is missing.
@ -82,11 +91,13 @@ export class VoiceBroadcastRecording
super();
this.maxLength = getMaxBroadcastLength();
this.timeLeft = this.maxLength;
this.infoEventId = this.determineEventIdFromInfoEvent();
this.roomId = this.determineRoomIdFromInfoEvent();
if (initialState) {
this.state = initialState;
} else {
this.setInitialStateFromInfoEvent();
this.state = this.determineInitialStateFromInfoEvent();
}
// TODO Michael W: listen for state updates
@ -94,6 +105,8 @@ export class VoiceBroadcastRecording
this.infoEvent.on(MatrixEventEvent.BeforeRedaction, this.onBeforeRedaction);
this.dispatcherRef = dis.register(this.onAction);
this.chunkRelationHelper = this.initialiseChunkEventRelation();
this.reconnectedListener = createReconnectedListener(this.onReconnect);
this.client.on(ClientEvent.Sync, this.reconnectedListener);
}
private initialiseChunkEventRelation(): RelationsHelper {
@ -125,17 +138,37 @@ export class VoiceBroadcastRecording
this.chunkEvents.addEvent(event);
};
private setInitialStateFromInfoEvent(): void {
const room = this.client.getRoom(this.infoEvent.getRoomId());
private determineEventIdFromInfoEvent(): string {
const infoEventId = this.infoEvent.getId();
if (!infoEventId) {
throw new Error("Cannot create broadcast for info event without Id.");
}
return infoEventId;
}
private determineRoomIdFromInfoEvent(): string {
const roomId = this.infoEvent.getRoomId();
if (!roomId) {
throw new Error(`Cannot create broadcast for unknown room (info event ${this.infoEventId})`);
}
return roomId;
}
/**
* Determines the initial broadcast state.
* Checks all related events. If one has the "stopped" state stopped, else started.
*/
private determineInitialStateFromInfoEvent(): VoiceBroadcastRecordingState {
const room = this.client.getRoom(this.roomId);
const relations = room
?.getUnfilteredTimelineSet()
?.relations?.getChildEventsForEvent(
this.infoEvent.getId(),
RelationType.Reference,
VoiceBroadcastInfoEventType,
);
?.relations?.getChildEventsForEvent(this.infoEventId, RelationType.Reference, VoiceBroadcastInfoEventType);
const relatedEvents = relations?.getRelations();
this.state = !relatedEvents?.find((event: MatrixEvent) => {
return !relatedEvents?.find((event: MatrixEvent) => {
return event.getContent()?.state === VoiceBroadcastInfoState.Stopped;
})
? VoiceBroadcastInfoState.Started
@ -146,6 +179,35 @@ export class VoiceBroadcastRecording
return this.timeLeft;
}
/**
* Retries failed actions on reconnect.
*/
private onReconnect = async (): Promise<void> => {
// Do nothing if not in connection_error state.
if (this.state !== "connection_error") return;
// Copy the array, so that it is possible to remove elements from it while iterating over the original.
const toRetryCopy = [...this.toRetry];
for (const retryFn of this.toRetry) {
try {
await retryFn();
// Successfully retried. Remove from array copy.
toRetryCopy.splice(toRetryCopy.indexOf(retryFn), 1);
} catch {
// The current retry callback failed. Stop the loop.
break;
}
}
this.toRetry = toRetryCopy;
if (this.toRetry.length === 0) {
// Everything has been successfully retried. Recover from error state to paused.
await this.pause();
}
};
private async setTimeLeft(timeLeft: number): Promise<void> {
if (timeLeft <= 0) {
// time is up - stop the recording
@ -173,7 +235,12 @@ export class VoiceBroadcastRecording
public async pause(): Promise<void> {
// stopped or already paused recordings cannot be paused
if ([VoiceBroadcastInfoState.Stopped, VoiceBroadcastInfoState.Paused].includes(this.state)) return;
if (
(
[VoiceBroadcastInfoState.Stopped, VoiceBroadcastInfoState.Paused] as VoiceBroadcastRecordingState[]
).includes(this.state)
)
return;
this.setState(VoiceBroadcastInfoState.Paused);
await this.stopRecorder();
@ -191,12 +258,16 @@ export class VoiceBroadcastRecording
public toggle = async (): Promise<void> => {
if (this.getState() === VoiceBroadcastInfoState.Paused) return this.resume();
if ([VoiceBroadcastInfoState.Started, VoiceBroadcastInfoState.Resumed].includes(this.getState())) {
if (
(
[VoiceBroadcastInfoState.Started, VoiceBroadcastInfoState.Resumed] as VoiceBroadcastRecordingState[]
).includes(this.getState())
) {
return this.pause();
}
};
public getState(): VoiceBroadcastInfoState {
public getState(): VoiceBroadcastRecordingState {
return this.state;
}
@ -221,6 +292,7 @@ export class VoiceBroadcastRecording
dis.unregister(this.dispatcherRef);
this.chunkEvents = new VoiceBroadcastChunkEvents();
this.chunkRelationHelper.destroy();
this.client.off(ClientEvent.Sync, this.reconnectedListener);
}
private onBeforeRedaction = (): void => {
@ -238,7 +310,7 @@ export class VoiceBroadcastRecording
this.pause();
};
private setState(state: VoiceBroadcastInfoState): void {
private setState(state: VoiceBroadcastRecordingState): void {
this.state = state;
this.emit(VoiceBroadcastRecordingEvent.StateChanged, this.state);
}
@ -248,56 +320,102 @@ export class VoiceBroadcastRecording
};
private onChunkRecorded = async (chunk: ChunkRecordedPayload): Promise<void> => {
const { url, file } = await this.uploadFile(chunk);
await this.sendVoiceMessage(chunk, url, file);
const uploadAndSendFn = async (): Promise<void> => {
const { url, file } = await this.uploadFile(chunk);
await this.sendVoiceMessage(chunk, url, file);
};
await this.callWithRetry(uploadAndSendFn);
};
private uploadFile(chunk: ChunkRecordedPayload): ReturnType<typeof uploadFile> {
/**
* This function is called on connection errors.
* It sets the connection error state and stops the recorder.
*/
private async onConnectionError(): Promise<void> {
await this.stopRecorder();
this.setState("connection_error");
}
private async uploadFile(chunk: ChunkRecordedPayload): ReturnType<typeof uploadFile> {
return uploadFile(
this.client,
this.infoEvent.getRoomId(),
this.roomId,
new Blob([chunk.buffer], {
type: this.getRecorder().contentType,
}),
);
}
private async sendVoiceMessage(chunk: ChunkRecordedPayload, url: string, file: IEncryptedFile): Promise<void> {
const content = createVoiceMessageContent(
url,
this.getRecorder().contentType,
Math.round(chunk.length * 1000),
chunk.buffer.length,
file,
);
content["m.relates_to"] = {
rel_type: RelationType.Reference,
event_id: this.infoEvent.getId(),
};
content["io.element.voice_broadcast_chunk"] = {
/** Increment the last sequence number and use it for this message. Also see {@link sequence}. */
sequence: ++this.sequence,
private async sendVoiceMessage(chunk: ChunkRecordedPayload, url?: string, file?: IEncryptedFile): Promise<void> {
/**
* Increment the last sequence number and use it for this message.
* Done outside of the sendMessageFn to get a scoped value.
* Also see {@link VoiceBroadcastRecording.sequence}.
*/
const sequence = ++this.sequence;
const sendMessageFn = async (): Promise<void> => {
const content = createVoiceMessageContent(
url,
this.getRecorder().contentType,
Math.round(chunk.length * 1000),
chunk.buffer.length,
file,
);
content["m.relates_to"] = {
rel_type: RelationType.Reference,
event_id: this.infoEventId,
};
content["io.element.voice_broadcast_chunk"] = {
sequence,
};
await this.client.sendMessage(this.roomId, content);
};
await this.client.sendMessage(this.infoEvent.getRoomId(), content);
await this.callWithRetry(sendMessageFn);
}
/**
* Sends an info state event with given state.
* On error stores a resend function and setState(state) in {@link toRetry} and
* sets the broadcast state to connection_error.
*/
private async sendInfoStateEvent(state: VoiceBroadcastInfoState): Promise<void> {
// TODO Michael W: add error handling for state event
await this.client.sendStateEvent(
this.infoEvent.getRoomId(),
VoiceBroadcastInfoEventType,
{
device_id: this.client.getDeviceId(),
state,
last_chunk_sequence: this.sequence,
["m.relates_to"]: {
rel_type: RelationType.Reference,
event_id: this.infoEvent.getId(),
},
} as VoiceBroadcastInfoEventContent,
this.client.getUserId(),
);
const sendEventFn = async (): Promise<void> => {
await this.client.sendStateEvent(
this.roomId,
VoiceBroadcastInfoEventType,
{
device_id: this.client.getDeviceId(),
state,
last_chunk_sequence: this.sequence,
["m.relates_to"]: {
rel_type: RelationType.Reference,
event_id: this.infoEventId,
},
} as VoiceBroadcastInfoEventContent,
this.client.getSafeUserId(),
);
};
await this.callWithRetry(sendEventFn);
}
/**
* Calls the function.
* On failure adds it to the retry list and triggers connection error.
* {@link toRetry}
* {@link onConnectionError}
*/
private async callWithRetry(retryAbleFn: () => Promise<void>): Promise<void> {
try {
await retryAbleFn();
} catch {
this.toRetry.push(retryAbleFn);
this.onConnectionError();
}
}
private async stopRecorder(): Promise<void> {