Merge pull request #3624 from robintown/local-remote-confusion

Dispel confusion about whether media is remote or local
This commit is contained in:
Robin
2025-12-15 16:43:57 -05:00
committed by GitHub
16 changed files with 380 additions and 282 deletions

View File

@@ -88,6 +88,7 @@ import { ReactionsOverlay } from "./ReactionsOverlay";
import { CallEventAudioRenderer } from "./CallEventAudioRenderer"; import { CallEventAudioRenderer } from "./CallEventAudioRenderer";
import { import {
debugTileLayout as debugTileLayoutSetting, debugTileLayout as debugTileLayoutSetting,
matrixRTCMode as matrixRTCModeSetting,
useSetting, useSetting,
} from "../settings/settings"; } from "../settings/settings";
import { ReactionsReader } from "../reactions/ReactionsReader"; import { ReactionsReader } from "../reactions/ReactionsReader";
@@ -144,6 +145,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
encryptionSystem: props.e2eeSystem, encryptionSystem: props.e2eeSystem,
autoLeaveWhenOthersLeft, autoLeaveWhenOthersLeft,
waitForCallPickup: waitForCallPickup && sendNotificationType === "ring", waitForCallPickup: waitForCallPickup && sendNotificationType === "ring",
matrixRTCMode$: matrixRTCModeSetting.value$,
}, },
reactionsReader.raisedHands$, reactionsReader.raisedHands$,
reactionsReader.reactions$, reactionsReader.reactions$,

View File

@@ -60,7 +60,8 @@ import {
import { MediaDevices } from "../MediaDevices.ts"; import { MediaDevices } from "../MediaDevices.ts";
import { getValue } from "../../utils/observable.ts"; import { getValue } from "../../utils/observable.ts";
import { type Behavior, constant } from "../Behavior.ts"; import { type Behavior, constant } from "../Behavior.ts";
import { withCallViewModel } from "./CallViewModelTestUtils.ts"; import { withCallViewModel as withCallViewModelInMode } from "./CallViewModelTestUtils.ts";
import { MatrixRTCMode } from "../../settings/settings.ts";
vi.mock("rxjs", async (importOriginal) => ({ vi.mock("rxjs", async (importOriginal) => ({
...(await importOriginal()), ...(await importOriginal()),
@@ -229,7 +230,13 @@ function mockRingEvent(
// need a value to fill in for them when emitting notifications // need a value to fill in for them when emitting notifications
const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent; const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent;
describe("CallViewModel", () => { describe.each([
[MatrixRTCMode.Legacy],
[MatrixRTCMode.Compatibil],
[MatrixRTCMode.Matrix_2_0],
])("CallViewModel (%s mode)", (mode) => {
const withCallViewModel = withCallViewModelInMode(mode);
test("participants are retained during a focus switch", () => { test("participants are retained during a focus switch", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
// Participants disappear on frame 2 and come back on frame 3 // Participants disappear on frame 2 and come back on frame 3

View File

@@ -53,11 +53,15 @@ import {
ScreenShareViewModel, ScreenShareViewModel,
type UserMediaViewModel, type UserMediaViewModel,
} from "../MediaViewModel"; } from "../MediaViewModel";
import { accumulate, generateItems, pauseWhen } from "../../utils/observable"; import {
accumulate,
filterBehavior,
generateItems,
pauseWhen,
} from "../../utils/observable";
import { import {
duplicateTiles, duplicateTiles,
MatrixRTCMode, MatrixRTCMode,
matrixRTCMode,
playReactionsSound, playReactionsSound,
showReactions, showReactions,
} from "../../settings/settings"; } from "../../settings/settings";
@@ -111,7 +115,8 @@ import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts";
import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts"; import { createConnectionManager$ } from "./remoteMembers/ConnectionManager.ts";
import { import {
createMatrixLivekitMembers$, createMatrixLivekitMembers$,
type MatrixLivekitMember, type TaggedParticipant,
type LocalMatrixLivekitMember,
} from "./remoteMembers/MatrixLivekitMembers.ts"; } from "./remoteMembers/MatrixLivekitMembers.ts";
import { import {
type AutoLeaveReason, type AutoLeaveReason,
@@ -150,6 +155,8 @@ export interface CallViewModelOptions {
connectionState$?: Behavior<ConnectionState>; connectionState$?: Behavior<ConnectionState>;
/** Optional behavior overriding the computed window size, mainly for testing purposes. */ /** Optional behavior overriding the computed window size, mainly for testing purposes. */
windowSize$?: Behavior<{ width: number; height: number }>; windowSize$?: Behavior<{ width: number; height: number }>;
/** The version & compatibility mode of MatrixRTC that we should use. */
matrixRTCMode$: Behavior<MatrixRTCMode>;
} }
// Do not play any sounds if the participant count has exceeded this // Do not play any sounds if the participant count has exceeded this
@@ -406,7 +413,7 @@ export function createCallViewModel$(
client, client,
roomId: matrixRoom.roomId, roomId: matrixRoom.roomId,
useOldestMember$: scope.behavior( useOldestMember$: scope.behavior(
matrixRTCMode.value$.pipe(map((v) => v === MatrixRTCMode.Legacy)), options.matrixRTCMode$.pipe(map((v) => v === MatrixRTCMode.Legacy)),
), ),
}); });
@@ -458,7 +465,7 @@ export function createCallViewModel$(
}); });
const connectOptions$ = scope.behavior( const connectOptions$ = scope.behavior(
matrixRTCMode.value$.pipe( options.matrixRTCMode$.pipe(
map((mode) => ({ map((mode) => ({
encryptMedia: livekitKeyProvider !== undefined, encryptMedia: livekitKeyProvider !== undefined,
// TODO. This might need to get called again on each change of matrixRTCMode... // TODO. This might need to get called again on each change of matrixRTCMode...
@@ -512,22 +519,21 @@ export function createCallViewModel$(
), ),
); );
const localMatrixLivekitMemberUninitialized = { const localMatrixLivekitMember$: Behavior<LocalMatrixLivekitMember | null> =
membership$: localRtcMembership$,
participant$: localMembership.participant$,
connection$: localMembership.connection$,
userId: userId,
};
const localMatrixLivekitMember$: Behavior<MatrixLivekitMember | null> =
scope.behavior( scope.behavior(
localRtcMembership$.pipe( localRtcMembership$.pipe(
switchMap((membership) => { filterBehavior((membership) => membership !== null),
if (!membership) return of(null); map((membership$) => {
return of( if (membership$ === null) return null;
// casting is save here since we know that localRtcMembership$ is !== null since we reached this case. return {
localMatrixLivekitMemberUninitialized as MatrixLivekitMember, membership$,
); participant: {
type: "local" as const,
value$: localMembership.participant$,
},
connection$: localMembership.connection$,
userId,
};
}), }),
), ),
); );
@@ -596,7 +602,7 @@ export function createCallViewModel$(
const members = membersWithEpoch.value; const members = membersWithEpoch.value;
const a$ = combineLatest( const a$ = combineLatest(
members.map((member) => members.map((member) =>
combineLatest([member.connection$, member.participant$]).pipe( combineLatest([member.connection$, member.participant.value$]).pipe(
map(([connection, participant]) => { map(([connection, participant]) => {
// do not render audio for local participant // do not render audio for local participant
if (!connection || !participant || participant.isLocal) if (!connection || !participant || participant.isLocal)
@@ -674,7 +680,7 @@ export function createCallViewModel$(
let localParticipantId: string | undefined = undefined; let localParticipantId: string | undefined = undefined;
// add local member if available // add local member if available
if (localMatrixLivekitMember) { if (localMatrixLivekitMember) {
const { userId, participant$, connection$, membership$ } = const { userId, participant, connection$, membership$ } =
localMatrixLivekitMember; localMatrixLivekitMember;
localParticipantId = `${userId}:${membership$.value.deviceId}`; // should be membership$.value.membershipID which is not optional localParticipantId = `${userId}:${membership$.value.deviceId}`; // should be membership$.value.membershipID which is not optional
// const participantId = membership$.value.membershipID; // const participantId = membership$.value.membershipID;
@@ -685,7 +691,7 @@ export function createCallViewModel$(
dup, dup,
localParticipantId, localParticipantId,
userId, userId,
participant$, participant satisfies TaggedParticipant as TaggedParticipant, // Widen the type safely
connection$, connection$,
], ],
data: undefined, data: undefined,
@@ -696,7 +702,7 @@ export function createCallViewModel$(
// add remote members that are available // add remote members that are available
for (const { for (const {
userId, userId,
participant$, participant,
connection$, connection$,
membership$, membership$,
} of matrixLivekitMembers) { } of matrixLivekitMembers) {
@@ -705,7 +711,7 @@ export function createCallViewModel$(
// const participantId = membership$.value?.identity; // const participantId = membership$.value?.identity;
for (let dup = 0; dup < 1 + duplicateTiles; dup++) { for (let dup = 0; dup < 1 + duplicateTiles; dup++) {
yield { yield {
keys: [dup, participantId, userId, participant$, connection$], keys: [dup, participantId, userId, participant, connection$],
data: undefined, data: undefined,
}; };
} }
@@ -717,7 +723,7 @@ export function createCallViewModel$(
dup, dup,
participantId, participantId,
userId, userId,
participant$, participant,
connection$, connection$,
) => { ) => {
const livekitRoom$ = scope.behavior( const livekitRoom$ = scope.behavior(
@@ -736,7 +742,7 @@ export function createCallViewModel$(
scope, scope,
`${participantId}:${dup}`, `${participantId}:${dup}`,
userId, userId,
participant$, participant,
options.encryptionSystem, options.encryptionSystem,
livekitRoom$, livekitRoom$,
focusUrl$, focusUrl$,

View File

@@ -53,6 +53,7 @@ import {
import { type Behavior, constant } from "../Behavior"; import { type Behavior, constant } from "../Behavior";
import { type ProcessorState } from "../../livekit/TrackProcessorContext"; import { type ProcessorState } from "../../livekit/TrackProcessorContext";
import { type MediaDevices } from "../MediaDevices"; import { type MediaDevices } from "../MediaDevices";
import { type MatrixRTCMode } from "../../settings/settings";
mockConfig({ mockConfig({
livekit: { livekit_service_url: "http://my-default-service-url.com" }, livekit: { livekit_service_url: "http://my-default-service-url.com" },
@@ -80,7 +81,8 @@ export interface CallViewModelInputs {
const localParticipant = mockLocalParticipant({ identity: "" }); const localParticipant = mockLocalParticipant({ identity: "" });
export function withCallViewModel( export function withCallViewModel(mode: MatrixRTCMode) {
return (
{ {
remoteParticipants$ = constant([]), remoteParticipants$ = constant([]),
rtcMembers$ = constant([localRtcMember]), rtcMembers$ = constant([localRtcMember]),
@@ -95,14 +97,13 @@ export function withCallViewModel(
continuation: ( continuation: (
vm: CallViewModel, vm: CallViewModel,
rtcSession: MockRTCSession, rtcSession: MockRTCSession,
subjects: { raisedHands$: BehaviorSubject<Record<string, RaisedHandInfo>> }, subjects: {
raisedHands$: BehaviorSubject<Record<string, RaisedHandInfo>>;
},
setSyncState: (value: SyncState) => void, setSyncState: (value: SyncState) => void,
) => void, ) => void,
options: CallViewModelOptions = { options: Partial<CallViewModelOptions> = {},
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT }, ): void => {
autoLeaveWhenOthersLeft: false,
},
): void {
let syncState = initialSyncState; let syncState = initialSyncState;
const setSyncState = (value: SyncState): void => { const setSyncState = (value: SyncState): void => {
const prev = syncState; const prev = syncState;
@@ -130,7 +131,9 @@ export function withCallViewModel(
getMembers: () => Array.from(roomMembers.values()), getMembers: () => Array.from(roomMembers.values()),
getMembersWithMembership: () => Array.from(roomMembers.values()), getMembersWithMembership: () => Array.from(roomMembers.values()),
}); });
const rtcSession = new MockRTCSession(room, []).withMemberships(rtcMembers$); const rtcSession = new MockRTCSession(room, []).withMemberships(
rtcMembers$,
);
const participantsSpy = vi const participantsSpy = vi
.spyOn(ComponentsCore, "connectedParticipantsObserver") .spyOn(ComponentsCore, "connectedParticipantsObserver")
.mockReturnValue(remoteParticipants$); .mockReturnValue(remoteParticipants$);
@@ -157,7 +160,9 @@ export function withCallViewModel(
.spyOn(ComponentsCore, "roomEventSelector") .spyOn(ComponentsCore, "roomEventSelector")
.mockImplementation((_room, _eventType) => of()); .mockImplementation((_room, _eventType) => of());
const muteStates = mockMuteStates(); const muteStates = mockMuteStates();
const raisedHands$ = new BehaviorSubject<Record<string, RaisedHandInfo>>({}); const raisedHands$ = new BehaviorSubject<Record<string, RaisedHandInfo>>(
{},
);
const reactions$ = new BehaviorSubject<Record<string, ReactionInfo>>({}); const reactions$ = new BehaviorSubject<Record<string, ReactionInfo>>({});
const vm = createCallViewModel$( const vm = createCallViewModel$(
@@ -167,7 +172,8 @@ export function withCallViewModel(
mediaDevices, mediaDevices,
muteStates, muteStates,
{ {
...options, encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
autoLeaveWhenOthersLeft: false,
livekitRoomFactory: (): LivekitRoom => livekitRoomFactory: (): LivekitRoom =>
mockLivekitRoom({ mockLivekitRoom({
localParticipant, localParticipant,
@@ -176,6 +182,8 @@ export function withCallViewModel(
}), }),
connectionState$, connectionState$,
windowSize$, windowSize$,
matrixRTCMode$: constant(mode),
...options,
}, },
raisedHands$, raisedHands$,
reactions$, reactions$,
@@ -193,4 +201,5 @@ export function withCallViewModel(
}); });
continuation(vm, rtcSession, { raisedHands$: raisedHands$ }, setSyncState); continuation(vm, rtcSession, { raisedHands$: raisedHands$ }, setSyncState);
};
} }

View File

@@ -32,7 +32,6 @@ import {
Connection, Connection,
ConnectionState, ConnectionState,
type ConnectionOpts, type ConnectionOpts,
type PublishingParticipant,
} from "./Connection.ts"; } from "./Connection.ts";
import { ObservableScope } from "../../ObservableScope.ts"; import { ObservableScope } from "../../ObservableScope.ts";
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts"; import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
@@ -395,7 +394,7 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers<void>(); const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>(); const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: PublishingParticipant[][] = []; const observedPublishers: RemoteParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe( const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => { (publishers) => {
observedPublishers.push(publishers); observedPublishers.push(publishers);
@@ -408,7 +407,7 @@ describe("Publishing participants observations", () => {
}, },
); );
onTestFinished(() => s.unsubscribe()); onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the // The remoteParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing // livekitRoom and the rtc membership in order to publish the members that are publishing
// on this connection. // on this connection.
@@ -450,7 +449,7 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection(); const connection = setupRemoteConnection();
let observedPublishers: PublishingParticipant[][] = []; let observedPublishers: RemoteParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe( const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => { (publishers) => {
observedPublishers.push(publishers); observedPublishers.push(publishers);

View File

@@ -13,7 +13,6 @@ import {
import { import {
ConnectionError, ConnectionError,
type Room as LivekitRoom, type Room as LivekitRoom,
type LocalParticipant,
type RemoteParticipant, type RemoteParticipant,
RoomEvent, RoomEvent,
} from "livekit-client"; } from "livekit-client";
@@ -35,8 +34,6 @@ import {
UnknownCallError, UnknownCallError,
} from "../../../utils/errors.ts"; } from "../../../utils/errors.ts";
export type PublishingParticipant = LocalParticipant | RemoteParticipant;
export interface ConnectionOpts { export interface ConnectionOpts {
/** The media transport to connect to. */ /** The media transport to connect to. */
transport: LivekitTransport; transport: LivekitTransport;
@@ -103,9 +100,7 @@ export class Connection {
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`. * This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection. * It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/ */
public readonly remoteParticipantsWithTracks$: Behavior< public readonly remoteParticipantsWithTracks$: Behavior<RemoteParticipant[]>;
PublishingParticipant[]
>;
/** /**
* Whether the connection has been stopped. * Whether the connection has been stopped.

View File

@@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details.
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { BehaviorSubject } from "rxjs"; import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client"; import { type RemoteParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts"; import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
@@ -201,23 +201,20 @@ describe("connections$ stream", () => {
describe("connectionManagerData$ stream", () => { describe("connectionManagerData$ stream", () => {
// Used in test to control fake connections' remoteParticipantsWithTracks$ streams // Used in test to control fake connections' remoteParticipantsWithTracks$ streams
let fakePublishingParticipantsStreams: Map< let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;
string,
Behavior<LivekitParticipant[]>
>;
function keyForTransport(transport: LivekitTransport): string { function keyForTransport(transport: LivekitTransport): string {
return `${transport.livekit_service_url}|${transport.livekit_alias}`; return `${transport.livekit_service_url}|${transport.livekit_alias}`;
} }
beforeEach(() => { beforeEach(() => {
fakePublishingParticipantsStreams = new Map(); fakeRemoteParticipantsStreams = new Map();
function getPublishingParticipantsFor( function getRemoteParticipantsFor(
transport: LivekitTransport, transport: LivekitTransport,
): Behavior<LivekitParticipant[]> { ): Behavior<RemoteParticipant[]> {
return ( return (
fakePublishingParticipantsStreams.get(keyForTransport(transport)) ?? fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ??
new BehaviorSubject([]) new BehaviorSubject([])
); );
} }
@@ -227,13 +224,12 @@ describe("connectionManagerData$ stream", () => {
.fn() .fn()
.mockImplementation( .mockImplementation(
(transport: LivekitTransport, scope: ObservableScope) => { (transport: LivekitTransport, scope: ObservableScope) => {
const fakePublishingParticipants$ = new BehaviorSubject< const fakeRemoteParticipants$ = new BehaviorSubject<
LivekitParticipant[] RemoteParticipant[]
>([]); >([]);
const mockConnection = { const mockConnection = {
transport, transport,
remoteParticipantsWithTracks$: remoteParticipantsWithTracks$: getRemoteParticipantsFor(transport),
getPublishingParticipantsFor(transport),
} as unknown as Connection; } as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn(); vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn(); vi.mocked(mockConnection).stop = vi.fn();
@@ -242,36 +238,36 @@ describe("connectionManagerData$ stream", () => {
void mockConnection.stop(); void mockConnection.stop();
}); });
fakePublishingParticipantsStreams.set( fakeRemoteParticipantsStreams.set(
keyForTransport(transport), keyForTransport(transport),
fakePublishingParticipants$, fakeRemoteParticipants$,
); );
return mockConnection; return mockConnection;
}, },
); );
}); });
test("Should report connections with the publishing participants", () => { test("Should report connections with the remote participants", () => {
withTestScheduler(({ expectObservable, schedule, behavior }) => { withTestScheduler(({ expectObservable, schedule, behavior }) => {
// Setup the fake participants streams behavior // Setup the fake participants streams behavior
// ============================== // ==============================
fakePublishingParticipantsStreams.set( fakeRemoteParticipantsStreams.set(
keyForTransport(TRANSPORT_1), keyForTransport(TRANSPORT_1),
behavior("oa-b", { behavior("oa-b", {
o: [], o: [],
a: [{ identity: "user1A" } as LivekitParticipant], a: [{ identity: "user1A" } as RemoteParticipant],
b: [ b: [
{ identity: "user1A" } as LivekitParticipant, { identity: "user1A" } as RemoteParticipant,
{ identity: "user1B" } as LivekitParticipant, { identity: "user1B" } as RemoteParticipant,
], ],
}), }),
); );
fakePublishingParticipantsStreams.set( fakeRemoteParticipantsStreams.set(
keyForTransport(TRANSPORT_2), keyForTransport(TRANSPORT_2),
behavior("o-a", { behavior("o-a", {
o: [], o: [],
a: [{ identity: "user2A" } as LivekitParticipant], a: [{ identity: "user2A" } as RemoteParticipant],
}), }),
); );
// ============================== // ==============================

View File

@@ -12,7 +12,7 @@ import {
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, of, switchMap, tap } from "rxjs"; import { combineLatest, map, of, switchMap, tap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger"; import { type Logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../../Behavior.ts"; import { type Behavior } from "../../Behavior.ts";
import { type Connection } from "./Connection.ts"; import { type Connection } from "./Connection.ts";
@@ -22,17 +22,12 @@ import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData { export class ConnectionManagerData {
private readonly store: Map< private readonly store: Map<string, [Connection, RemoteParticipant[]]> =
string, new Map();
[Connection, (LocalParticipant | RemoteParticipant)[]]
> = new Map();
public constructor() {} public constructor() {}
public add( public add(connection: Connection, participants: RemoteParticipant[]): void {
connection: Connection,
participants: (LocalParticipant | RemoteParticipant)[],
): void {
const key = this.getKey(connection.transport); const key = this.getKey(connection.transport);
const existing = this.store.get(key); const existing = this.store.get(key);
if (!existing) { if (!existing) {
@@ -58,7 +53,7 @@ export class ConnectionManagerData {
public getParticipantForTransport( public getParticipantForTransport(
transport: LivekitTransport, transport: LivekitTransport,
): (LocalParticipant | RemoteParticipant)[] { ): RemoteParticipant[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias; const key = transport.livekit_service_url + "|" + transport.livekit_alias;
return this.store.get(key)?.[1] ?? []; return this.store.get(key)?.[1] ?? [];
} }
@@ -172,23 +167,24 @@ export function createConnectionManager$({
const epoch = connections.epoch; const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[] // Map the connections to list of {connection, participants}[]
const listOfConnectionsWithPublishingParticipants = const listOfConnectionsWithRemoteParticipants = connections.value.map(
connections.value.map((connection) => { (connection) => {
return connection.remoteParticipantsWithTracks$.pipe( return connection.remoteParticipantsWithTracks$.pipe(
map((participants) => ({ map((participants) => ({
connection, connection,
participants, participants,
})), })),
); );
}); },
);
// probably not required // probably not required
if (listOfConnectionsWithPublishingParticipants.length === 0) { if (listOfConnectionsWithRemoteParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch)); return of(new Epoch(new ConnectionManagerData(), epoch));
} }
// combineLatest the several streams into a single stream with the ConnectionManagerData // combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest(listOfConnectionsWithPublishingParticipants).pipe( return combineLatest(listOfConnectionsWithRemoteParticipants).pipe(
map( map(
(lists) => (lists) =>
new Epoch( new Epoch(

View File

@@ -15,7 +15,7 @@ import { combineLatest, map, type Observable } from "rxjs";
import { type IConnectionManager } from "./ConnectionManager.ts"; import { type IConnectionManager } from "./ConnectionManager.ts";
import { import {
type MatrixLivekitMember, type RemoteMatrixLivekitMember,
createMatrixLivekitMembers$, createMatrixLivekitMembers$,
} from "./MatrixLivekitMembers.ts"; } from "./MatrixLivekitMembers.ts";
import { import {
@@ -100,12 +100,12 @@ test("should signal participant not yet connected to livekit", () => {
}); });
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => { a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1); expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", { expectObservable(data[0].membership$).toBe("a", {
a: bobMembership, a: bobMembership,
}); });
expectObservable(data[0].participant$).toBe("a", { expectObservable(data[0].participant.value$).toBe("a", {
a: null, a: null,
}); });
expectObservable(data[0].connection$).toBe("a", { expectObservable(data[0].connection$).toBe("a", {
@@ -180,12 +180,12 @@ test("should signal participant on a connection that is publishing", () => {
}); });
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => { a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1); expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", { expectObservable(data[0].membership$).toBe("a", {
a: bobMembership, a: bobMembership,
}); });
expectObservable(data[0].participant$).toBe("a", { expectObservable(data[0].participant.value$).toBe("a", {
a: expect.toSatisfy((participant) => { a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined(); expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId); expect(participant!.identity).toEqual(bobParticipantId);
@@ -231,12 +231,12 @@ test("should signal participant on a connection that is not publishing", () => {
}); });
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", { expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => { a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(1); expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", { expectObservable(data[0].membership$).toBe("a", {
a: bobMembership, a: bobMembership,
}); });
expectObservable(data[0].participant$).toBe("a", { expectObservable(data[0].participant.value$).toBe("a", {
a: null, a: null,
}); });
expectObservable(data[0].connection$).toBe("a", { expectObservable(data[0].connection$).toBe("a", {
@@ -296,7 +296,7 @@ describe("Publication edge case", () => {
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
"a", "a",
{ {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => { a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(2); expect(data.length).toEqual(2);
expectObservable(data[0].membership$).toBe("a", { expectObservable(data[0].membership$).toBe("a", {
a: bobMembership, a: bobMembership,
@@ -305,7 +305,7 @@ describe("Publication edge case", () => {
// The real connection should be from transportA as per the membership // The real connection should be from transportA as per the membership
a: connectionA, a: connectionA,
}); });
expectObservable(data[0].participant$).toBe("a", { expectObservable(data[0].participant.value$).toBe("a", {
a: expect.toSatisfy((participant) => { a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined(); expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId); expect(participant!.identity).toEqual(bobParticipantId);
@@ -362,7 +362,7 @@ describe("Publication edge case", () => {
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe( expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
"a", "a",
{ {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => { a: expect.toSatisfy((data: RemoteMatrixLivekitMember[]) => {
expect(data.length).toEqual(2); expect(data.length).toEqual(2);
expectObservable(data[0].membership$).toBe("a", { expectObservable(data[0].membership$).toBe("a", {
a: bobMembership, a: bobMembership,
@@ -371,7 +371,7 @@ describe("Publication edge case", () => {
// The real connection should be from transportA as per the membership // The real connection should be from transportA as per the membership
a: connectionA, a: connectionA,
}); });
expectObservable(data[0].participant$).toBe("a", { expectObservable(data[0].participant.value$).toBe("a", {
// No participant as Bob is not publishing on his membership transport // No participant as Bob is not publishing on his membership transport
a: null, a: null,
}); });

View File

@@ -5,10 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
type LocalParticipant as LocalLivekitParticipant,
type RemoteParticipant as RemoteLivekitParticipant,
} from "livekit-client";
import { import {
type LivekitTransport, type LivekitTransport,
type CallMembership, type CallMembership,
@@ -24,22 +21,44 @@ import { generateItemsWithEpoch } from "../../../utils/observable";
const logger = rootLogger.getChild("[MatrixLivekitMembers]"); const logger = rootLogger.getChild("[MatrixLivekitMembers]");
/** interface LocalTaggedParticipant {
* Represents a Matrix call member and their associated LiveKit participation. type: "local";
* `livekitParticipant` can be undefined if the member is not yet connected to the livekit room value$: Behavior<LocalParticipant | null>;
* or if it has no livekit transport at all. }
*/ interface RemoteTaggedParticipant {
export interface MatrixLivekitMember { type: "remote";
value$: Behavior<RemoteParticipant | null>;
}
export type TaggedParticipant =
| LocalTaggedParticipant
| RemoteTaggedParticipant;
interface MatrixLivekitMember {
membership$: Behavior<CallMembership>; membership$: Behavior<CallMembership>;
participant$: Behavior<
LocalLivekitParticipant | RemoteLivekitParticipant | null
>;
connection$: Behavior<Connection | null>; connection$: Behavior<Connection | null>;
// participantId: string; We do not want a participantId here since it will be generated by the jwt // participantId: string; We do not want a participantId here since it will be generated by the jwt
// TODO decide if we can also drop the userId. Its in the matrix membership anyways. // TODO decide if we can also drop the userId. Its in the matrix membership anyways.
userId: string; userId: string;
} }
/**
* Represents the local Matrix call member and their associated LiveKit participation.
* `livekitParticipant` can be null if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface LocalMatrixLivekitMember extends MatrixLivekitMember {
participant: LocalTaggedParticipant;
}
/**
* Represents a remote Matrix call member and their associated LiveKit participation.
* `livekitParticipant` can be null if the member is not yet connected to the livekit room
* or if it has no livekit transport at all.
*/
export interface RemoteMatrixLivekitMember extends MatrixLivekitMember {
participant: RemoteTaggedParticipant;
}
interface Props { interface Props {
scope: ObservableScope; scope: ObservableScope;
membershipsWithTransport$: Behavior< membershipsWithTransport$: Behavior<
@@ -61,7 +80,7 @@ export function createMatrixLivekitMembers$({
scope, scope,
membershipsWithTransport$, membershipsWithTransport$,
connectionManager, connectionManager,
}: Props): Behavior<Epoch<MatrixLivekitMember[]>> { }: Props): Behavior<Epoch<RemoteMatrixLivekitMember[]>> {
/** /**
* Stream of all the call members and their associated livekit data (if available). * Stream of all the call members and their associated livekit data (if available).
*/ */
@@ -110,12 +129,14 @@ export function createMatrixLivekitMembers$({
logger.debug( logger.debug(
`Generating member for participantId: ${participantId}, userId: ${userId}`, `Generating member for participantId: ${participantId}, userId: ${userId}`,
); );
const { participant$, ...rest } = scope.splitBehavior(data$);
// will only get called once per `participantId, userId` pair. // will only get called once per `participantId, userId` pair.
// updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent.
return { return {
participantId, participantId,
userId, userId,
...scope.splitBehavior(data$), participant: { type: "remote" as const, value$: participant$ },
...rest,
}; };
}, },
), ),

View File

@@ -29,7 +29,7 @@ import { type ProcessorState } from "../../../livekit/TrackProcessorContext.tsx"
import { import {
areLivekitTransportsEqual, areLivekitTransportsEqual,
createMatrixLivekitMembers$, createMatrixLivekitMembers$,
type MatrixLivekitMember, type RemoteMatrixLivekitMember,
} from "./MatrixLivekitMembers.ts"; } from "./MatrixLivekitMembers.ts";
import { createConnectionManager$ } from "./ConnectionManager.ts"; import { createConnectionManager$ } from "./ConnectionManager.ts";
import { membershipsAndTransports$ } from "../../SessionBehaviors.ts"; import { membershipsAndTransports$ } from "../../SessionBehaviors.ts";
@@ -132,7 +132,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
}); });
expectObservable(matrixLivekitItems$).toBe(vMarble, { expectObservable(matrixLivekitItems$).toBe(vMarble, {
a: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => { a: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value; const items = e.value;
expect(items.length).toBe(1); expect(items.length).toBe(1);
const item = items[0]!; const item = items[0]!;
@@ -147,12 +147,12 @@ test("bob, carl, then bob joining no tracks yet", () => {
), ),
), ),
}); });
expectObservable(item.participant$).toBe("a", { expectObservable(item.participant.value$).toBe("a", {
a: null, a: null,
}); });
return true; return true;
}), }),
b: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => { b: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value; const items = e.value;
expect(items.length).toBe(2); expect(items.length).toBe(2);
@@ -161,7 +161,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
expectObservable(item.membership$).toBe("a", { expectObservable(item.membership$).toBe("a", {
a: bobMembership, a: bobMembership,
}); });
expectObservable(item.participant$).toBe("a", { expectObservable(item.participant.value$).toBe("a", {
a: null, a: null,
}); });
} }
@@ -172,7 +172,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
expectObservable(item.membership$).toBe("a", { expectObservable(item.membership$).toBe("a", {
a: carlMembership, a: carlMembership,
}); });
expectObservable(item.participant$).toBe("a", { expectObservable(item.participant.value$).toBe("a", {
a: null, a: null,
}); });
expectObservable(item.connection$).toBe("a", { expectObservable(item.connection$).toBe("a", {
@@ -189,7 +189,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
} }
return true; return true;
}), }),
c: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => { c: expect.toSatisfy((e: Epoch<RemoteMatrixLivekitMember[]>) => {
const items = e.value; const items = e.value;
expect(items.length).toBe(3); expect(items.length).toBe(3);
@@ -216,7 +216,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
return true; return true;
}), }),
}); });
expectObservable(item.participant$).toBe("a", { expectObservable(item.participant.value$).toBe("a", {
a: null, a: null,
}); });
} }

View File

@@ -15,6 +15,7 @@ import { constant } from "./Behavior.ts";
import { aliceParticipant, localRtcMember } from "../utils/test-fixtures.ts"; import { aliceParticipant, localRtcMember } from "../utils/test-fixtures.ts";
import { ElementWidgetActions, widget } from "../widget.ts"; import { ElementWidgetActions, widget } from "../widget.ts";
import { E2eeType } from "../e2ee/e2eeType.ts"; import { E2eeType } from "../e2ee/e2eeType.ts";
import { MatrixRTCMode } from "../settings/settings.ts";
vi.mock("@livekit/components-core", { spy: true }); vi.mock("@livekit/components-core", { spy: true });
@@ -34,9 +35,15 @@ vi.mock("../widget", () => ({
}, },
})); }));
it("expect leave when ElementWidgetActions.HangupCall is called", async () => { it.each([
[MatrixRTCMode.Legacy],
[MatrixRTCMode.Compatibil],
[MatrixRTCMode.Matrix_2_0],
])(
"expect leave when ElementWidgetActions.HangupCall is called (%s mode)",
async (mode) => {
const pr = Promise.withResolvers<string>(); const pr = Promise.withResolvers<string>();
withCallViewModel( withCallViewModel(mode)(
{ {
remoteParticipants$: constant([aliceParticipant]), remoteParticipants$: constant([aliceParticipant]),
rtcMembers$: constant([localRtcMember]), rtcMembers$: constant([localRtcMember]),
@@ -66,4 +73,5 @@ it("expect leave when ElementWidgetActions.HangupCall is called", async () => {
const source = await pr.promise; const source = await pr.promise;
expect(source).toBe("user"); expect(source).toBe("user");
}); },
);

View File

@@ -27,6 +27,7 @@ import type { ReactionOption } from "../reactions";
import { observeSpeaker$ } from "./observeSpeaker.ts"; import { observeSpeaker$ } from "./observeSpeaker.ts";
import { generateItems } from "../utils/observable.ts"; import { generateItems } from "../utils/observable.ts";
import { ScreenShare } from "./ScreenShare.ts"; import { ScreenShare } from "./ScreenShare.ts";
import { type TaggedParticipant } from "./CallViewModel/remoteMembers/MatrixLivekitMembers.ts";
/** /**
* Sorting bins defining the order in which media tiles appear in the layout. * Sorting bins defining the order in which media tiles appear in the layout.
@@ -68,12 +69,13 @@ enum SortingBin {
* for inclusion in the call layout and tracks associated screen shares. * for inclusion in the call layout and tracks associated screen shares.
*/ */
export class UserMedia { export class UserMedia {
public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal public readonly vm: UserMediaViewModel =
this.participant.type === "local"
? new LocalUserMediaViewModel( ? new LocalUserMediaViewModel(
this.scope, this.scope,
this.id, this.id,
this.userId, this.userId,
this.participant$ as Behavior<LocalParticipant | null>, this.participant.value$,
this.encryptionSystem, this.encryptionSystem,
this.livekitRoom$, this.livekitRoom$,
this.focusUrl$, this.focusUrl$,
@@ -87,7 +89,7 @@ export class UserMedia {
this.scope, this.scope,
this.id, this.id,
this.userId, this.userId,
this.participant$ as Behavior<RemoteParticipant | null>, this.participant.value$,
this.encryptionSystem, this.encryptionSystem,
this.livekitRoom$, this.livekitRoom$,
this.focusUrl$, this.focusUrl$,
@@ -102,6 +104,11 @@ export class UserMedia {
observeSpeaker$(this.vm.speaking$), observeSpeaker$(this.vm.speaking$),
); );
// TypeScript needs this widening of the type to happen in a separate statement
private readonly participant$: Behavior<
LocalParticipant | RemoteParticipant | null
> = this.participant.value$;
/** /**
* All screen share media associated with this user media. * All screen share media associated with this user media.
*/ */
@@ -184,9 +191,7 @@ export class UserMedia {
private readonly scope: ObservableScope, private readonly scope: ObservableScope,
public readonly id: string, public readonly id: string,
private readonly userId: string, private readonly userId: string,
private readonly participant$: Behavior< private readonly participant: TaggedParticipant,
LocalParticipant | RemoteParticipant | null
>,
private readonly encryptionSystem: EncryptionSystem, private readonly encryptionSystem: EncryptionSystem,
private readonly livekitRoom$: Behavior<LivekitRoom | undefined>, private readonly livekitRoom$: Behavior<LivekitRoom | undefined>,
private readonly focusUrl$: Behavior<string | undefined>, private readonly focusUrl$: Behavior<string | undefined>,

View File

@@ -5,11 +5,12 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { test } from "vitest"; import { expect, test } from "vitest";
import { Subject } from "rxjs"; import { type Observable, of, Subject, switchMap } from "rxjs";
import { withTestScheduler } from "./test"; import { withTestScheduler } from "./test";
import { generateItems, pauseWhen } from "./observable"; import { filterBehavior, generateItems, pauseWhen } from "./observable";
import { type Behavior } from "../state/Behavior";
test("pauseWhen", () => { test("pauseWhen", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
@@ -72,3 +73,31 @@ test("generateItems", () => {
expectObservable(scope4$).toBe(scope4Marbles); expectObservable(scope4$).toBe(scope4Marbles);
}); });
}); });
test("filterBehavior", () => {
withTestScheduler(({ behavior, expectObservable }) => {
// Filtering the input should segment it into 2 modes of non-null behavior.
const inputMarbles = " abcxabx";
const filteredMarbles = "a--xa-x";
const input$ = behavior(inputMarbles, {
a: "a",
b: "b",
c: "c",
x: null,
});
const filtered$: Observable<Behavior<string> | null> = input$.pipe(
filterBehavior((value) => typeof value === "string"),
);
expectObservable(filtered$).toBe(filteredMarbles, {
a: expect.any(Object),
x: null,
});
expectObservable(
filtered$.pipe(
switchMap((value$) => (value$ === null ? of(null) : value$)),
),
).toBe(inputMarbles, { a: "a", b: "b", c: "c", x: null });
});
});

View File

@@ -22,6 +22,7 @@ import {
withLatestFrom, withLatestFrom,
BehaviorSubject, BehaviorSubject,
type OperatorFunction, type OperatorFunction,
distinctUntilChanged,
} from "rxjs"; } from "rxjs";
import { type Behavior } from "../state/Behavior"; import { type Behavior } from "../state/Behavior";
@@ -185,6 +186,28 @@ export function generateItemsWithEpoch<
); );
} }
/**
* Segments a behavior into periods during which its value matches the filter
* (outputting a behavior with a narrowed type) and periods during which it does
* not match (outputting null).
*/
export function filterBehavior<T, S extends T>(
predicate: (value: T) => value is S,
): OperatorFunction<T, Behavior<S> | null> {
return (input$) =>
input$.pipe(
scan<T, BehaviorSubject<S> | null>((acc$, input) => {
if (predicate(input)) {
const output$ = acc$ ?? new BehaviorSubject(input);
output$.next(input);
return output$;
}
return null;
}, null),
distinctUntilChanged(),
);
}
function generateItemsInternal< function generateItemsInternal<
Input, Input,
Keys extends [unknown, ...unknown[]], Keys extends [unknown, ...unknown[]],

View File

@@ -37,6 +37,7 @@ import {
import { aliceRtcMember, localRtcMember } from "./test-fixtures"; import { aliceRtcMember, localRtcMember } from "./test-fixtures";
import { type RaisedHandInfo, type ReactionInfo } from "../reactions"; import { type RaisedHandInfo, type ReactionInfo } from "../reactions";
import { constant } from "../state/Behavior"; import { constant } from "../state/Behavior";
import { MatrixRTCMode } from "../settings/settings";
mockConfig({ livekit: { livekit_service_url: "https://example.com" } }); mockConfig({ livekit: { livekit_service_url: "https://example.com" } });
@@ -162,6 +163,7 @@ export function getBasicCallViewModelEnvironment(
setE2EEEnabled: async () => Promise.resolve(), setE2EEEnabled: async () => Promise.resolve(),
}), }),
connectionState$: constant(ConnectionState.Connected), connectionState$: constant(ConnectionState.Connected),
matrixRTCMode$: constant(MatrixRTCMode.Legacy),
...callViewModelOptions, ...callViewModelOptions,
}, },
handRaisedSubject$, handRaisedSubject$,