-
-
+ const controls =
+ recordingState === "connection_error" ? (
+
+ ) : (
+ );
+
+ return (
+
+
+
+ {controls}
{showDeviceSelect && (
void;
+ [VoiceBroadcastRecordingEvent.StateChanged]: (state: VoiceBroadcastRecordingState) => void;
[VoiceBroadcastRecordingEvent.TimeLeftChanged]: (timeLeft: number) => void;
}
@@ -58,13 +63,17 @@ export class VoiceBroadcastRecording
extends TypedEventEmitter
implements IDestroyable
{
- private state: VoiceBroadcastInfoState;
- private recorder: VoiceBroadcastRecorder;
+ private state: VoiceBroadcastRecordingState;
+ private recorder: VoiceBroadcastRecorder | null = null;
private dispatcherRef: string;
private chunkEvents = new VoiceBroadcastChunkEvents();
private chunkRelationHelper: RelationsHelper;
private maxLength: number;
private timeLeft: number;
+ private toRetry: Array<() => Promise> = [];
+ private reconnectedListener: ClientEventHandlerMap[ClientEvent.Sync];
+ private roomId: string;
+ private infoEventId: string;
/**
* Broadcast chunks have a sequence number to bring them in the correct order and to know if a message is missing.
@@ -82,11 +91,13 @@ export class VoiceBroadcastRecording
super();
this.maxLength = getMaxBroadcastLength();
this.timeLeft = this.maxLength;
+ this.infoEventId = this.determineEventIdFromInfoEvent();
+ this.roomId = this.determineRoomIdFromInfoEvent();
if (initialState) {
this.state = initialState;
} else {
- this.setInitialStateFromInfoEvent();
+ this.state = this.determineInitialStateFromInfoEvent();
}
// TODO Michael W: listen for state updates
@@ -94,6 +105,8 @@ export class VoiceBroadcastRecording
this.infoEvent.on(MatrixEventEvent.BeforeRedaction, this.onBeforeRedaction);
this.dispatcherRef = dis.register(this.onAction);
this.chunkRelationHelper = this.initialiseChunkEventRelation();
+ this.reconnectedListener = createReconnectedListener(this.onReconnect);
+ this.client.on(ClientEvent.Sync, this.reconnectedListener);
}
private initialiseChunkEventRelation(): RelationsHelper {
@@ -125,17 +138,37 @@ export class VoiceBroadcastRecording
this.chunkEvents.addEvent(event);
};
- private setInitialStateFromInfoEvent(): void {
- const room = this.client.getRoom(this.infoEvent.getRoomId());
+ private determineEventIdFromInfoEvent(): string {
+ const infoEventId = this.infoEvent.getId();
+
+ if (!infoEventId) {
+ throw new Error("Cannot create broadcast for info event without Id.");
+ }
+
+ return infoEventId;
+ }
+
+ private determineRoomIdFromInfoEvent(): string {
+ const roomId = this.infoEvent.getRoomId();
+
+ if (!roomId) {
+ throw new Error(`Cannot create broadcast for unknown room (info event ${this.infoEventId})`);
+ }
+
+ return roomId;
+ }
+
+ /**
+ * Determines the initial broadcast state.
+ * Checks all related events. If one has the "stopped" state → stopped, else started.
+ */
+ private determineInitialStateFromInfoEvent(): VoiceBroadcastRecordingState {
+ const room = this.client.getRoom(this.roomId);
const relations = room
?.getUnfilteredTimelineSet()
- ?.relations?.getChildEventsForEvent(
- this.infoEvent.getId(),
- RelationType.Reference,
- VoiceBroadcastInfoEventType,
- );
+ ?.relations?.getChildEventsForEvent(this.infoEventId, RelationType.Reference, VoiceBroadcastInfoEventType);
const relatedEvents = relations?.getRelations();
- this.state = !relatedEvents?.find((event: MatrixEvent) => {
+ return !relatedEvents?.find((event: MatrixEvent) => {
return event.getContent()?.state === VoiceBroadcastInfoState.Stopped;
})
? VoiceBroadcastInfoState.Started
@@ -146,6 +179,35 @@ export class VoiceBroadcastRecording
return this.timeLeft;
}
+ /**
+ * Retries failed actions on reconnect.
+ */
+ private onReconnect = async (): Promise => {
+ // Do nothing if not in connection_error state.
+ if (this.state !== "connection_error") return;
+
+ // Copy the array, so that it is possible to remove elements from it while iterating over the original.
+ const toRetryCopy = [...this.toRetry];
+
+ for (const retryFn of this.toRetry) {
+ try {
+ await retryFn();
+ // Successfully retried. Remove from array copy.
+ toRetryCopy.splice(toRetryCopy.indexOf(retryFn), 1);
+ } catch {
+ // The current retry callback failed. Stop the loop.
+ break;
+ }
+ }
+
+ this.toRetry = toRetryCopy;
+
+ if (this.toRetry.length === 0) {
+ // Everything has been successfully retried. Recover from error state to paused.
+ await this.pause();
+ }
+ };
+
private async setTimeLeft(timeLeft: number): Promise {
if (timeLeft <= 0) {
// time is up - stop the recording
@@ -173,7 +235,12 @@ export class VoiceBroadcastRecording
public async pause(): Promise {
// stopped or already paused recordings cannot be paused
- if ([VoiceBroadcastInfoState.Stopped, VoiceBroadcastInfoState.Paused].includes(this.state)) return;
+ if (
+ (
+ [VoiceBroadcastInfoState.Stopped, VoiceBroadcastInfoState.Paused] as VoiceBroadcastRecordingState[]
+ ).includes(this.state)
+ )
+ return;
this.setState(VoiceBroadcastInfoState.Paused);
await this.stopRecorder();
@@ -191,12 +258,16 @@ export class VoiceBroadcastRecording
public toggle = async (): Promise => {
if (this.getState() === VoiceBroadcastInfoState.Paused) return this.resume();
- if ([VoiceBroadcastInfoState.Started, VoiceBroadcastInfoState.Resumed].includes(this.getState())) {
+ if (
+ (
+ [VoiceBroadcastInfoState.Started, VoiceBroadcastInfoState.Resumed] as VoiceBroadcastRecordingState[]
+ ).includes(this.getState())
+ ) {
return this.pause();
}
};
- public getState(): VoiceBroadcastInfoState {
+ public getState(): VoiceBroadcastRecordingState {
return this.state;
}
@@ -221,6 +292,7 @@ export class VoiceBroadcastRecording
dis.unregister(this.dispatcherRef);
this.chunkEvents = new VoiceBroadcastChunkEvents();
this.chunkRelationHelper.destroy();
+ this.client.off(ClientEvent.Sync, this.reconnectedListener);
}
private onBeforeRedaction = (): void => {
@@ -238,7 +310,7 @@ export class VoiceBroadcastRecording
this.pause();
};
- private setState(state: VoiceBroadcastInfoState): void {
+ private setState(state: VoiceBroadcastRecordingState): void {
this.state = state;
this.emit(VoiceBroadcastRecordingEvent.StateChanged, this.state);
}
@@ -248,56 +320,102 @@ export class VoiceBroadcastRecording
};
private onChunkRecorded = async (chunk: ChunkRecordedPayload): Promise => {
- const { url, file } = await this.uploadFile(chunk);
- await this.sendVoiceMessage(chunk, url, file);
+ const uploadAndSendFn = async (): Promise => {
+ const { url, file } = await this.uploadFile(chunk);
+ await this.sendVoiceMessage(chunk, url, file);
+ };
+
+ await this.callWithRetry(uploadAndSendFn);
};
- private uploadFile(chunk: ChunkRecordedPayload): ReturnType {
+ /**
+ * This function is called on connection errors.
+ * It sets the connection error state and stops the recorder.
+ */
+ private async onConnectionError(): Promise {
+ await this.stopRecorder();
+ this.setState("connection_error");
+ }
+
+ private async uploadFile(chunk: ChunkRecordedPayload): ReturnType {
return uploadFile(
this.client,
- this.infoEvent.getRoomId(),
+ this.roomId,
new Blob([chunk.buffer], {
type: this.getRecorder().contentType,
}),
);
}
- private async sendVoiceMessage(chunk: ChunkRecordedPayload, url: string, file: IEncryptedFile): Promise {
- const content = createVoiceMessageContent(
- url,
- this.getRecorder().contentType,
- Math.round(chunk.length * 1000),
- chunk.buffer.length,
- file,
- );
- content["m.relates_to"] = {
- rel_type: RelationType.Reference,
- event_id: this.infoEvent.getId(),
- };
- content["io.element.voice_broadcast_chunk"] = {
- /** Increment the last sequence number and use it for this message. Also see {@link sequence}. */
- sequence: ++this.sequence,
+ private async sendVoiceMessage(chunk: ChunkRecordedPayload, url?: string, file?: IEncryptedFile): Promise {
+ /**
+ * Increment the last sequence number and use it for this message.
+ * Done outside of the sendMessageFn to get a scoped value.
+ * Also see {@link VoiceBroadcastRecording.sequence}.
+ */
+ const sequence = ++this.sequence;
+
+ const sendMessageFn = async (): Promise => {
+ const content = createVoiceMessageContent(
+ url,
+ this.getRecorder().contentType,
+ Math.round(chunk.length * 1000),
+ chunk.buffer.length,
+ file,
+ );
+ content["m.relates_to"] = {
+ rel_type: RelationType.Reference,
+ event_id: this.infoEventId,
+ };
+ content["io.element.voice_broadcast_chunk"] = {
+ sequence,
+ };
+
+ await this.client.sendMessage(this.roomId, content);
};
- await this.client.sendMessage(this.infoEvent.getRoomId(), content);
+ await this.callWithRetry(sendMessageFn);
}
+ /**
+ * Sends an info state event with given state.
+ * On error stores a resend function and setState(state) in {@link toRetry} and
+ * sets the broadcast state to connection_error.
+ */
private async sendInfoStateEvent(state: VoiceBroadcastInfoState): Promise {
- // TODO Michael W: add error handling for state event
- await this.client.sendStateEvent(
- this.infoEvent.getRoomId(),
- VoiceBroadcastInfoEventType,
- {
- device_id: this.client.getDeviceId(),
- state,
- last_chunk_sequence: this.sequence,
- ["m.relates_to"]: {
- rel_type: RelationType.Reference,
- event_id: this.infoEvent.getId(),
- },
- } as VoiceBroadcastInfoEventContent,
- this.client.getUserId(),
- );
+ const sendEventFn = async (): Promise => {
+ await this.client.sendStateEvent(
+ this.roomId,
+ VoiceBroadcastInfoEventType,
+ {
+ device_id: this.client.getDeviceId(),
+ state,
+ last_chunk_sequence: this.sequence,
+ ["m.relates_to"]: {
+ rel_type: RelationType.Reference,
+ event_id: this.infoEventId,
+ },
+ } as VoiceBroadcastInfoEventContent,
+ this.client.getSafeUserId(),
+ );
+ };
+
+ await this.callWithRetry(sendEventFn);
+ }
+
+ /**
+ * Calls the function.
+ * On failure adds it to the retry list and triggers connection error.
+ * {@link toRetry}
+ * {@link onConnectionError}
+ */
+ private async callWithRetry(retryAbleFn: () => Promise): Promise {
+ try {
+ await retryAbleFn();
+ } catch {
+ this.toRetry.push(retryAbleFn);
+ this.onConnectionError();
+ }
}
private async stopRecorder(): Promise {
diff --git a/test/voice-broadcast/components/molecules/VoiceBroadcastRecordingPip-test.tsx b/test/voice-broadcast/components/molecules/VoiceBroadcastRecordingPip-test.tsx
index 662d7b2619..dddaa85827 100644
--- a/test/voice-broadcast/components/molecules/VoiceBroadcastRecordingPip-test.tsx
+++ b/test/voice-broadcast/components/molecules/VoiceBroadcastRecordingPip-test.tsx
@@ -18,9 +18,10 @@ limitations under the License.
import React from "react";
import { act, render, RenderResult, screen } from "@testing-library/react";
import userEvent from "@testing-library/user-event";
-import { MatrixClient, MatrixEvent } from "matrix-js-sdk/src/matrix";
+import { ClientEvent, MatrixClient, MatrixEvent } from "matrix-js-sdk/src/matrix";
import { sleep } from "matrix-js-sdk/src/utils";
import { mocked } from "jest-mock";
+import { SyncState } from "matrix-js-sdk/src/sync";
import {
VoiceBroadcastInfoState,
@@ -182,6 +183,30 @@ describe("VoiceBroadcastRecordingPip", () => {
});
});
});
+
+ describe("and there is no connection and clicking the pause button", () => {
+ beforeEach(async () => {
+ mocked(client.sendStateEvent).mockImplementation(() => {
+ throw new Error();
+ });
+ await userEvent.click(screen.getByLabelText("pause voice broadcast"));
+ });
+
+ it("should show a connection error info", () => {
+ expect(screen.getByText("Connection error - Recording paused")).toBeInTheDocument();
+ });
+
+ describe("and the connection is back", () => {
+ beforeEach(() => {
+ mocked(client.sendStateEvent).mockResolvedValue({ event_id: "e1" });
+ client.emit(ClientEvent.Sync, SyncState.Catchup, SyncState.Error);
+ });
+
+ it("should render a paused recording", () => {
+ expect(screen.getByLabelText("resume voice broadcast")).toBeInTheDocument();
+ });
+ });
+ });
});
describe("when rendering a paused recording", () => {
diff --git a/test/voice-broadcast/models/VoiceBroadcastRecording-test.ts b/test/voice-broadcast/models/VoiceBroadcastRecording-test.ts
index 58c5e7b0cd..3cf71a7a94 100644
--- a/test/voice-broadcast/models/VoiceBroadcastRecording-test.ts
+++ b/test/voice-broadcast/models/VoiceBroadcastRecording-test.ts
@@ -16,6 +16,7 @@ limitations under the License.
import { mocked } from "jest-mock";
import {
+ ClientEvent,
EventTimelineSet,
EventType,
MatrixClient,
@@ -26,6 +27,7 @@ import {
Room,
} from "matrix-js-sdk/src/matrix";
import { Relations } from "matrix-js-sdk/src/models/relations";
+import { SyncState } from "matrix-js-sdk/src/sync";
import { uploadFile } from "../../../src/ContentMessages";
import { IEncryptedFile } from "../../../src/customisations/models/IMediaEventContent";
@@ -41,6 +43,7 @@ import {
VoiceBroadcastRecorderEvent,
VoiceBroadcastRecording,
VoiceBroadcastRecordingEvent,
+ VoiceBroadcastRecordingState,
} from "../../../src/voice-broadcast";
import { mkEvent, mkStubRoom, stubClient } from "../../test-utils";
import dis from "../../../src/dispatcher/dispatcher";
@@ -84,14 +87,14 @@ describe("VoiceBroadcastRecording", () => {
let client: MatrixClient;
let infoEvent: MatrixEvent;
let voiceBroadcastRecording: VoiceBroadcastRecording;
- let onStateChanged: (state: VoiceBroadcastInfoState) => void;
+ let onStateChanged: (state: VoiceBroadcastRecordingState) => void;
let voiceBroadcastRecorder: VoiceBroadcastRecorder;
const mkVoiceBroadcastInfoEvent = (content: VoiceBroadcastInfoEventContent) => {
return mkEvent({
event: true,
type: VoiceBroadcastInfoEventType,
- user: client.getUserId(),
+ user: client.getSafeUserId(),
room: roomId,
content,
});
@@ -105,12 +108,19 @@ describe("VoiceBroadcastRecording", () => {
jest.spyOn(voiceBroadcastRecording, "removeAllListeners");
};
- const itShouldBeInState = (state: VoiceBroadcastInfoState) => {
+ const itShouldBeInState = (state: VoiceBroadcastRecordingState) => {
it(`should be in state stopped ${state}`, () => {
expect(voiceBroadcastRecording.getState()).toBe(state);
});
};
+ const emitFirsChunkRecorded = () => {
+ voiceBroadcastRecorder.emit(VoiceBroadcastRecorderEvent.ChunkRecorded, {
+ buffer: new Uint8Array([1, 2, 3]),
+ length: 23,
+ });
+ };
+
const itShouldSendAnInfoEvent = (state: VoiceBroadcastInfoState, lastChunkSequence: number) => {
it(`should send a ${state} info event`, () => {
expect(client.sendStateEvent).toHaveBeenCalledWith(
@@ -179,13 +189,22 @@ describe("VoiceBroadcastRecording", () => {
});
};
+ const setUpUploadFileMock = () => {
+ mocked(uploadFile).mockResolvedValue({
+ url: uploadedUrl,
+ file: uploadedFile,
+ });
+ };
+
beforeEach(() => {
client = stubClient();
room = mkStubRoom(roomId, "Test Room", client);
- mocked(client.getRoom).mockImplementation((getRoomId: string) => {
+ mocked(client.getRoom).mockImplementation((getRoomId: string | undefined): Room | null => {
if (getRoomId === roomId) {
return room;
}
+
+ return null;
});
onStateChanged = jest.fn();
voiceBroadcastRecorder = new VoiceBroadcastRecorder(new VoiceRecording(), getChunkLength());
@@ -194,14 +213,11 @@ describe("VoiceBroadcastRecording", () => {
jest.spyOn(voiceBroadcastRecorder, "destroy");
mocked(createVoiceBroadcastRecorder).mockReturnValue(voiceBroadcastRecorder);
- mocked(uploadFile).mockResolvedValue({
- url: uploadedUrl,
- file: uploadedFile,
- });
+ setUpUploadFileMock();
mocked(createVoiceMessageContent).mockImplementation(
(
- mxc: string,
+ mxc: string | undefined,
mimetype: string,
duration: number,
size: number,
@@ -238,13 +254,45 @@ describe("VoiceBroadcastRecording", () => {
});
afterEach(() => {
- voiceBroadcastRecording.off(VoiceBroadcastRecordingEvent.StateChanged, onStateChanged);
+ voiceBroadcastRecording?.off(VoiceBroadcastRecordingEvent.StateChanged, onStateChanged);
+ });
+
+ describe("when there is an info event without id", () => {
+ beforeEach(() => {
+ infoEvent = mkVoiceBroadcastInfoEvent({
+ device_id: client.getDeviceId()!,
+ state: VoiceBroadcastInfoState.Started,
+ });
+ jest.spyOn(infoEvent, "getId").mockReturnValue(undefined);
+ });
+
+ it("should raise an error when creating a broadcast", () => {
+ expect(() => {
+ setUpVoiceBroadcastRecording();
+ }).toThrowError("Cannot create broadcast for info event without Id.");
+ });
+ });
+
+ describe("when there is an info event without room", () => {
+ beforeEach(() => {
+ infoEvent = mkVoiceBroadcastInfoEvent({
+ device_id: client.getDeviceId()!,
+ state: VoiceBroadcastInfoState.Started,
+ });
+ jest.spyOn(infoEvent, "getRoomId").mockReturnValue(undefined);
+ });
+
+ it("should raise an error when creating a broadcast", () => {
+ expect(() => {
+ setUpVoiceBroadcastRecording();
+ }).toThrowError(`Cannot create broadcast for unknown room (info event ${infoEvent.getId()})`);
+ });
});
describe("when created for a Voice Broadcast Info without relations", () => {
beforeEach(() => {
infoEvent = mkVoiceBroadcastInfoEvent({
- device_id: client.getDeviceId(),
+ device_id: client.getDeviceId()!,
state: VoiceBroadcastInfoState.Started,
});
setUpVoiceBroadcastRecording();
@@ -278,7 +326,16 @@ describe("VoiceBroadcastRecording", () => {
describe("and the info event is redacted", () => {
beforeEach(() => {
- infoEvent.emit(MatrixEventEvent.BeforeRedaction, null, null);
+ infoEvent.emit(
+ MatrixEventEvent.BeforeRedaction,
+ infoEvent,
+ mkEvent({
+ event: true,
+ type: EventType.RoomRedaction,
+ user: client.getSafeUserId(),
+ content: {},
+ }),
+ );
});
itShouldBeInState(VoiceBroadcastInfoState.Stopped);
@@ -329,10 +386,7 @@ describe("VoiceBroadcastRecording", () => {
describe("and a chunk has been recorded", () => {
beforeEach(async () => {
- voiceBroadcastRecorder.emit(VoiceBroadcastRecorderEvent.ChunkRecorded, {
- buffer: new Uint8Array([1, 2, 3]),
- length: 23,
- });
+ emitFirsChunkRecorded();
});
itShouldSendAVoiceMessage([1, 2, 3], 3, 23, 1);
@@ -388,6 +442,34 @@ describe("VoiceBroadcastRecording", () => {
});
});
+ describe("and there is no connection", () => {
+ beforeEach(() => {
+ mocked(client.sendStateEvent).mockImplementation(() => {
+ throw new Error();
+ });
+ });
+
+ describe.each([
+ ["pause", async () => voiceBroadcastRecording.pause()],
+ ["toggle", async () => voiceBroadcastRecording.toggle()],
+ ])("and calling %s", (_case: string, action: Function) => {
+ beforeEach(async () => {
+ await action();
+ });
+
+ itShouldBeInState("connection_error");
+
+ describe("and the connection is back", () => {
+ beforeEach(() => {
+ mocked(client.sendStateEvent).mockResolvedValue({ event_id: "e1" });
+ client.emit(ClientEvent.Sync, SyncState.Catchup, SyncState.Error);
+ });
+
+ itShouldBeInState(VoiceBroadcastInfoState.Paused);
+ });
+ });
+ });
+
describe("and calling destroy", () => {
beforeEach(() => {
voiceBroadcastRecording.destroy();
@@ -399,6 +481,45 @@ describe("VoiceBroadcastRecording", () => {
expect(mocked(voiceBroadcastRecording.removeAllListeners)).toHaveBeenCalled();
});
});
+
+ describe("and a chunk has been recorded and the upload fails", () => {
+ beforeEach(() => {
+ mocked(uploadFile).mockRejectedValue("Error");
+ emitFirsChunkRecorded();
+ });
+
+ itShouldBeInState("connection_error");
+
+ describe("and the connection is back", () => {
+ beforeEach(() => {
+ setUpUploadFileMock();
+ client.emit(ClientEvent.Sync, SyncState.Catchup, SyncState.Error);
+ });
+
+ itShouldBeInState(VoiceBroadcastInfoState.Paused);
+ itShouldSendAVoiceMessage([1, 2, 3], 3, 23, 1);
+ });
+ });
+
+ describe("and a chunk has been recorded and sending the voice message fails", () => {
+ beforeEach(() => {
+ mocked(client.sendMessage).mockRejectedValue("Error");
+ emitFirsChunkRecorded();
+ });
+
+ itShouldBeInState("connection_error");
+
+ describe("and the connection is back", () => {
+ beforeEach(() => {
+ mocked(client.sendMessage).mockClear();
+ mocked(client.sendMessage).mockResolvedValue({ event_id: "e23" });
+ client.emit(ClientEvent.Sync, SyncState.Catchup, SyncState.Error);
+ });
+
+ itShouldBeInState(VoiceBroadcastInfoState.Paused);
+ itShouldSendAVoiceMessage([1, 2, 3], 3, 23, 1);
+ });
+ });
});
describe("and it is in paused state", () => {
@@ -431,7 +552,7 @@ describe("VoiceBroadcastRecording", () => {
describe("when created for a Voice Broadcast Info with a Stopped relation", () => {
beforeEach(() => {
infoEvent = mkVoiceBroadcastInfoEvent({
- device_id: client.getDeviceId(),
+ device_id: client.getDeviceId()!,
state: VoiceBroadcastInfoState.Started,
chunk_length: 120,
});
@@ -441,11 +562,11 @@ describe("VoiceBroadcastRecording", () => {
} as unknown as Relations;
mocked(relationsContainer.getRelations).mockReturnValue([
mkVoiceBroadcastInfoEvent({
- device_id: client.getDeviceId(),
+ device_id: client.getDeviceId()!,
state: VoiceBroadcastInfoState.Stopped,
["m.relates_to"]: {
rel_type: RelationType.Reference,
- event_id: infoEvent.getId(),
+ event_id: infoEvent.getId()!,
},
}),
]);