Merge branch 'livekit' into toger5/sdk-improvements

This commit is contained in:
Timo K
2026-01-31 11:53:15 +01:00
15 changed files with 167 additions and 156 deletions

View File

@@ -78,8 +78,8 @@ export type OpenIDClientParts = Pick<
* @param membership Our own membership identity parts used to send to jwt service. * @param membership Our own membership identity parts used to send to jwt service.
* @param serviceUrl The URL of the livekit SFU service * @param serviceUrl The URL of the livekit SFU service
* @param roomId The room id used in the jwt request. This is NOT the livekit_alias. The jwt service will provide the alias. It maps matrix room ids <-> Livekit aliases. * @param roomId The room id used in the jwt request. This is NOT the livekit_alias. The jwt service will provide the alias. It maps matrix room ids <-> Livekit aliases.
* @param opts Additional options to modify which endpoint with which data will be used to aquire the jwt token. * @param opts Additional options to modify which endpoint with which data will be used to acquire the jwt token.
* @param opts.forceJwtEndpoint This will use the old jwt endpoint which will create the rtc backend identity based on string concatination * @param opts.forceJwtEndpoint This will use the old jwt endpoint which will create the rtc backend identity based on string concatenation
* instead of a hash. * instead of a hash.
* This function by default uses whatever is possible with the current jwt service installed next to the SFU. * This function by default uses whatever is possible with the current jwt service installed next to the SFU.
* For remote connections this does not matter, since we will not publish there we can rely on the newest option. * For remote connections this does not matter, since we will not publish there we can rely on the newest option.

View File

@@ -42,7 +42,7 @@ import {
import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { import {
MembershipManagerEvent, MembershipManagerEvent,
type LivekitTransport, type LivekitTransportConfig,
type MatrixRTCSession, type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { type IWidgetApiRequest } from "matrix-widget-api"; import { type IWidgetApiRequest } from "matrix-widget-api";
@@ -103,7 +103,7 @@ import {
type SpotlightPortraitLayoutMedia, type SpotlightPortraitLayoutMedia,
} from "../layout-types.ts"; } from "../layout-types.ts";
import { ElementCallError, UnknownCallError } from "../../utils/errors.ts"; import { ElementCallError, UnknownCallError } from "../../utils/errors.ts";
import { type ObservableScope } from "../ObservableScope.ts"; import { type Epoch, type ObservableScope } from "../ObservableScope.ts";
import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts"; import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts";
import { import {
createLocalMembership$, createLocalMembership$,
@@ -468,6 +468,7 @@ export function createCallViewModel$(
const connectionFactory = new ECConnectionFactory( const connectionFactory = new ECConnectionFactory(
client, client,
matrixRoom.roomId,
mediaDevices, mediaDevices,
trackProcessorState$, trackProcessorState$,
livekitKeyProvider, livekitKeyProvider,
@@ -496,12 +497,13 @@ export function createCallViewModel$(
ownMembershipIdentity, ownMembershipIdentity,
}); });
const matrixLivekitMembers$ = createMatrixLivekitMembers$({ const matrixLivekitMembers$: Behavior<Epoch<RemoteMatrixLivekitMember[]>> =
scope: scope, createMatrixLivekitMembers$({
membershipsWithTransport$: scope: scope,
membershipsAndTransports.membershipsWithTransport$, membershipsWithTransport$:
connectionManager: connectionManager, membershipsAndTransports.membershipsWithTransport$,
}); connectionManager: connectionManager,
});
const connectOptions$ = scope.behavior( const connectOptions$ = scope.behavior(
matrixRTCMode$.pipe( matrixRTCMode$.pipe(
@@ -521,7 +523,7 @@ export function createCallViewModel$(
matrixRTCSession, matrixRTCSession,
), ),
muteStates: muteStates, muteStates: muteStates,
joinMatrixRTC: (transport: LivekitTransport) => { joinMatrixRTC: (transport: LivekitTransportConfig) => {
return enterRTCSession( return enterRTCSession(
matrixRTCSession, matrixRTCSession,
ownMembershipIdentity, ownMembershipIdentity,

View File

@@ -8,7 +8,7 @@ Please see LICENSE in the repository root for full details.
import { import {
Status as RTCMemberStatus, Status as RTCMemberStatus,
type LivekitTransport, type LivekitTransportConfig,
type MatrixRTCSession, type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
@@ -281,7 +281,7 @@ describe("LocalMembership", () => {
const aTransport = { const aTransport = {
transport: { transport: {
livekit_service_url: "a", livekit_service_url: "a",
} as LivekitTransport, } as LivekitTransportConfig,
sfuConfig: { sfuConfig: {
url: "sfu-url", url: "sfu-url",
jwt: "sfu-token", jwt: "sfu-token",
@@ -290,7 +290,7 @@ describe("LocalMembership", () => {
const bTransport = { const bTransport = {
transport: { transport: {
livekit_service_url: "b", livekit_service_url: "b",
} as LivekitTransport, } as LivekitTransportConfig,
sfuConfig: { sfuConfig: {
url: "sfu-url", url: "sfu-url",
jwt: "sfu-token", jwt: "sfu-token",

View File

@@ -17,6 +17,7 @@ import { observeParticipantEvents } from "@livekit/components-core";
import { import {
Status as RTCSessionStatus, Status as RTCSessionStatus,
type LivekitTransport, type LivekitTransport,
type LivekitTransportConfig,
type MatrixRTCSession, type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { import {
@@ -125,7 +126,7 @@ interface Props {
muteStates: MuteStates; muteStates: MuteStates;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
createPublisherFactory: (connection: Connection) => Publisher; createPublisherFactory: (connection: Connection) => Publisher;
joinMatrixRTC: (transport: LivekitTransport) => void; joinMatrixRTC: (transport: LivekitTransportConfig) => void;
homeserverConnected: HomeserverConnected; homeserverConnected: HomeserverConnected;
localTransport$: Behavior<LocalTransportWithSFUConfig | null>; localTransport$: Behavior<LocalTransportWithSFUConfig | null>;
matrixRTCSession: Pick< matrixRTCSession: Pick<
@@ -717,7 +718,7 @@ interface EnterRTCSessionOptions {
export function enterRTCSession( export function enterRTCSession(
rtcSession: MatrixRTCSession, rtcSession: MatrixRTCSession,
ownMembershipIdentity: CallMembershipIdentityParts, ownMembershipIdentity: CallMembershipIdentityParts,
transport: LivekitTransport, transport: LivekitTransportConfig,
options: EnterRTCSessionOptions, options: EnterRTCSessionOptions,
): void { ): void {
const { encryptMedia, matrixRTCMode } = options; const { encryptMedia, matrixRTCMode } = options;
@@ -735,12 +736,26 @@ export function enterRTCSession(
const multiSFU = const multiSFU =
matrixRTCMode === MatrixRTCMode.Compatibility || matrixRTCMode === MatrixRTCMode.Compatibility ||
matrixRTCMode === MatrixRTCMode.Matrix_2_0; matrixRTCMode === MatrixRTCMode.Matrix_2_0;
// For backwards compatibility with Element Call versions that do not do Matrix 2.0,
// we add the livekit alias to the transport.
let backwardCompatibleTransport: LivekitTransport | LivekitTransportConfig;
if (matrixRTCMode === MatrixRTCMode.Matrix_2_0) {
backwardCompatibleTransport = transport;
} else {
backwardCompatibleTransport = {
livekit_alias: rtcSession.room.roomId,
...transport,
};
}
// Multi-sfu does not need a preferred foci list. just the focus that is actually used. // Multi-sfu does not need a preferred foci list. just the focus that is actually used.
// TODO where/how do we track errors originating from the ongoing rtcSession? // TODO where/how do we track errors originating from the ongoing rtcSession?
rtcSession.joinRTCSession( rtcSession.joinRTCSession(
ownMembershipIdentity, ownMembershipIdentity,
multiSFU ? [] : [transport], multiSFU ? [] : [backwardCompatibleTransport],
multiSFU ? transport : undefined, multiSFU ? backwardCompatibleTransport : undefined,
{ {
notificationType, notificationType,
callIntent, callIntent,

View File

@@ -34,7 +34,7 @@ describe("LocalTransport", () => {
const openIdResponse: openIDSFU.SFUConfig = { const openIdResponse: openIDSFU.SFUConfig = {
url: "https://lk.example.org", url: "https://lk.example.org",
jwt: testJWTToken, jwt: testJWTToken,
livekitAlias: "!example_room_id", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@lk_user:ABCDEF", livekitIdentity: "@lk_user:ABCDEF",
}; };
@@ -147,7 +147,7 @@ describe("LocalTransport", () => {
openIdResolver.resolve?.({ openIdResolver.resolve?.({
url: "https://lk.example.org", url: "https://lk.example.org",
jwt: "jwt", jwt: "jwt",
livekitAlias: "!room:example.org", livekitAlias: "Akph4alDMhen",
livekitIdentity: ownMemberMock.userId + ":" + ownMemberMock.deviceId, livekitIdentity: ownMemberMock.userId + ":" + ownMemberMock.deviceId,
}); });
expect(localTransport$.value).toBe(null); expect(localTransport$.value).toBe(null);
@@ -155,13 +155,12 @@ describe("LocalTransport", () => {
// final // final
expect(localTransport$.value).toStrictEqual({ expect(localTransport$.value).toStrictEqual({
transport: { transport: {
livekit_alias: "!room:example.org",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
type: "livekit", type: "livekit",
}, },
sfuConfig: { sfuConfig: {
jwt: "jwt", jwt: "jwt",
livekitAlias: "!room:example.org", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@alice:example.org:DEVICE", livekitIdentity: "@alice:example.org:DEVICE",
url: "https://lk.example.org", url: "https://lk.example.org",
}, },
@@ -204,13 +203,12 @@ describe("LocalTransport", () => {
// final // final
expect(localTransport$.value).toStrictEqual({ expect(localTransport$.value).toStrictEqual({
transport: { transport: {
livekit_alias: "!example_room_id",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
type: "livekit", type: "livekit",
}, },
sfuConfig: { sfuConfig: {
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
livekitAlias: "!example_room_id", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@lk_user:ABCDEF", livekitIdentity: "@lk_user:ABCDEF",
url: "https://lk.example.org", url: "https://lk.example.org",
}, },
@@ -264,13 +262,12 @@ describe("LocalTransport", () => {
await flushPromises(); await flushPromises();
expect(localTransport$.value).toStrictEqual({ expect(localTransport$.value).toStrictEqual({
transport: { transport: {
livekit_alias: "!example_room_id",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
type: "livekit", type: "livekit",
}, },
sfuConfig: { sfuConfig: {
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
livekitAlias: "!example_room_id", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@lk_user:ABCDEF", livekitIdentity: "@lk_user:ABCDEF",
url: "https://lk.example.org", url: "https://lk.example.org",
}, },
@@ -284,13 +281,12 @@ describe("LocalTransport", () => {
await flushPromises(); await flushPromises();
expect(localTransport$.value).toStrictEqual({ expect(localTransport$.value).toStrictEqual({
transport: { transport: {
livekit_alias: "!example_room_id",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
type: "livekit", type: "livekit",
}, },
sfuConfig: { sfuConfig: {
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
livekitAlias: "!example_room_id", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@lk_user:ABCDEF", livekitIdentity: "@lk_user:ABCDEF",
url: "https://lk.example.org", url: "https://lk.example.org",
}, },
@@ -306,13 +302,12 @@ describe("LocalTransport", () => {
await flushPromises(); await flushPromises();
expect(localTransport$.value).toStrictEqual({ expect(localTransport$.value).toStrictEqual({
transport: { transport: {
livekit_alias: "!example_room_id",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
type: "livekit", type: "livekit",
}, },
sfuConfig: { sfuConfig: {
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
livekitAlias: "!example_room_id", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@lk_user:ABCDEF", livekitIdentity: "@lk_user:ABCDEF",
url: "https://lk.example.org", url: "https://lk.example.org",
}, },
@@ -345,13 +340,12 @@ describe("LocalTransport", () => {
await flushPromises(); await flushPromises();
expect(localTransport$.value).toStrictEqual({ expect(localTransport$.value).toStrictEqual({
transport: { transport: {
livekit_alias: "!example_room_id",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
type: "livekit", type: "livekit",
}, },
sfuConfig: { sfuConfig: {
jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=", jwt: "e30=.eyJzdWIiOiJAbWU6ZXhhbXBsZS5vcmc6QUJDREVGIiwidmlkZW8iOnsicm9vbSI6IiFleGFtcGxlX3Jvb21faWQifX0=.e30=",
livekitAlias: "!example_room_id", livekitAlias: "Akph4alDMhen",
livekitIdentity: "@lk_user:ABCDEF", livekitIdentity: "@lk_user:ABCDEF",
url: "https://lk.example.org", url: "https://lk.example.org",
}, },

View File

@@ -7,10 +7,9 @@ Please see LICENSE in the repository root for full details.
import { import {
type CallMembership, type CallMembership,
isLivekitTransport,
type LivekitTransport,
isLivekitTransportConfig, isLivekitTransportConfig,
type Transport, type Transport,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { MatrixError, type MatrixClient } from "matrix-js-sdk"; import { MatrixError, type MatrixClient } from "matrix-js-sdk";
import { import {
@@ -57,6 +56,7 @@ interface Props {
"getDomain" | "baseUrl" | "_unstable_getRTCTransports" "getDomain" | "baseUrl" | "_unstable_getRTCTransports"
> & > &
OpenIDClientParts; OpenIDClientParts;
// Used by the jwt service to create the livekit room and compute the livekit alias.
roomId: string; roomId: string;
useOldestMember$: Behavior<boolean>; useOldestMember$: Behavior<boolean>;
forceJwtEndpoint$: Behavior<JwtEndpointVersion>; forceJwtEndpoint$: Behavior<JwtEndpointVersion>;
@@ -90,11 +90,11 @@ export enum JwtEndpointVersion {
// 2. // 2.
// We need to make sure we do not sent livekit_alias in sticky events and that we drop all code for sending state events! // We need to make sure we do not sent livekit_alias in sticky events and that we drop all code for sending state events!
export interface LocalTransportWithSFUConfig { export interface LocalTransportWithSFUConfig {
transport: LivekitTransport; transport: LivekitTransportConfig;
sfuConfig: SFUConfig; sfuConfig: SFUConfig;
} }
export function isLocalTransportWithSFUConfig( export function isLocalTransportWithSFUConfig(
obj: LivekitTransport | LocalTransportWithSFUConfig, obj: LivekitTransportConfig | LocalTransportWithSFUConfig,
): obj is LocalTransportWithSFUConfig { ): obj is LocalTransportWithSFUConfig {
return "transport" in obj && "sfuConfig" in obj; return "transport" in obj && "sfuConfig" in obj;
} }
@@ -137,11 +137,10 @@ export const createLocalTransport$ = ({
return transport; return transport;
}), }),
switchMap((transport) => { switchMap((transport) => {
if (transport !== null && isLivekitTransport(transport)) { if (transport !== null && isLivekitTransportConfig(transport)) {
// Get the open jwt token to connect to the sfu // Get the open jwt token to connect to the sfu
const computeLocalTransportWithSFUConfig = const computeLocalTransportWithSFUConfig =
async (): Promise<LocalTransportWithSFUConfig> => { async (): Promise<LocalTransportWithSFUConfig> => {
// await sleep(1000);
return { return {
transport, transport,
sfuConfig: await getSFUConfigWithOpenID( sfuConfig: await getSFUConfigWithOpenID(
@@ -288,18 +287,6 @@ async function makeTransport(
transport: { transport: {
type: "livekit", type: "livekit",
livekit_service_url: url, livekit_service_url: url,
// WARNING PLS READ ME!!!
// This looks unintuitive especially considering that `sfuConfig.livekitAlias` exists.
// Why do we not use: `livekit_alias: sfuConfig.livekitAlias`
//
// - This is going to be used for sending our state event transport (focus_preferred)
// - In sticky events it is expected to NOT send this field at all. The transport is only the `type`, `livekit_service_url`
// - If we set it to the hased alias we get from the jwt, we will end up using the hashed alias as the body.roomId field
// in v0.16.0. (It will use oldest member transport. It is using the transport.livekit_alias as the body.roomId)
//
// TLDR this is a temporal field that allow for comaptibilty but the spec expects it to not exists. (but its existance also does not break anything)
// It is just named poorly: It was intetended to be the actual alias. But now we do pseudonymys ids so we use a hashed alias.
livekit_alias: roomId,
}, },
sfuConfig, sfuConfig,
}; };

View File

@@ -26,7 +26,7 @@ import fetchMock from "fetch-mock";
import EventEmitter from "events"; import EventEmitter from "events";
import { type IOpenIDToken } from "matrix-js-sdk"; import { type IOpenIDToken } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransport"; import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { import {
Connection, Connection,
@@ -51,8 +51,9 @@ let fakeLivekitRoom: MockedObject<LivekitRoom>;
let localParticipantEventEmiter: EventEmitter; let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>; let fakeLocalParticipant: MockedObject<LocalParticipant>;
const livekitFocus: LivekitTransport = { const ROOM_ID = "!roomID:example.org";
livekit_alias: "!roomID:example.org",
const livekitFocus: LivekitTransportConfig = {
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
type: "livekit", type: "livekit",
}; };
@@ -112,6 +113,7 @@ function setupTest(): void {
function setupRemoteConnection(): Connection { function setupRemoteConnection(): Connection {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
roomId: ROOM_ID,
transport: livekitFocus, transport: livekitFocus,
scope: testScope, scope: testScope,
ownMembershipIdentity: ownMemberMock, ownMembershipIdentity: ownMemberMock,
@@ -154,6 +156,7 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
roomId: ROOM_ID,
transport: livekitFocus, transport: livekitFocus,
scope: testScope, scope: testScope,
ownMembershipIdentity: ownMemberMock, ownMembershipIdentity: ownMemberMock,
@@ -170,6 +173,7 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
roomId: ROOM_ID,
transport: livekitFocus, transport: livekitFocus,
scope: testScope, scope: testScope,
ownMembershipIdentity: ownMemberMock, ownMembershipIdentity: ownMemberMock,
@@ -221,6 +225,7 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
roomId: ROOM_ID,
transport: livekitFocus, transport: livekitFocus,
scope: testScope, scope: testScope,
ownMembershipIdentity: ownMemberMock, ownMembershipIdentity: ownMemberMock,
@@ -279,6 +284,7 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
roomId: ROOM_ID,
transport: livekitFocus, transport: livekitFocus,
scope: testScope, scope: testScope,
ownMembershipIdentity: ownMemberMock, ownMembershipIdentity: ownMemberMock,

View File

@@ -15,7 +15,7 @@ import {
type Room as LivekitRoom, type Room as LivekitRoom,
type RemoteParticipant, type RemoteParticipant,
} from "livekit-client"; } from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, map } from "rxjs"; import { BehaviorSubject, map } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger"; import { type Logger } from "matrix-js-sdk/lib/logger";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager"; import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
@@ -49,9 +49,11 @@ export interface ConnectionOpts {
/** The identity parts to use on this connection */ /** The identity parts to use on this connection */
ownMembershipIdentity: CallMembershipIdentityParts; ownMembershipIdentity: CallMembershipIdentityParts;
/** The media transport to connect to. */ /** The media transport to connect to. */
transport: LivekitTransport; transport: LivekitTransportConfig;
/** The Matrix client to use for OpenID and SFU config requests. */ /** The Matrix client to use for OpenID and SFU config requests. */
client: OpenIDClientParts; client: OpenIDClientParts;
/** The room ID this connection is associated with. */
roomId: string;
/** The observable scope to use for this connection. */ /** The observable scope to use for this connection. */
scope: ObservableScope; scope: ObservableScope;
@@ -102,7 +104,7 @@ export class Connection {
/** /**
* The media transport to connect to. * The media transport to connect to.
*/ */
public readonly transport: LivekitTransport; public readonly transport: LivekitTransportConfig;
public readonly livekitRoom: LivekitRoom; public readonly livekitRoom: LivekitRoom;
@@ -131,6 +133,47 @@ export class Connection {
* */ * */
protected stopped = false; protected stopped = false;
// TODO: can we just keep the ConnectionOpts object instead of spreading?
private readonly client: OpenIDClientParts;
private readonly roomId: string;
private readonly logger: Logger;
private readonly ownMembershipIdentity: CallMembershipIdentityParts;
private readonly existingSFUConfig?: SFUConfig;
/**
* Creates a new connection to a matrix RTC LiveKit backend.
*
* @param opts - Connection options {@link ConnectionOpts}.
*
* @param logger - The logger to use.
*/
public constructor(opts: ConnectionOpts, logger: Logger) {
this.ownMembershipIdentity = opts.ownMembershipIdentity;
this.existingSFUConfig = opts.existingSFUConfig;
this.roomId = opts.roomId;
this.logger = logger.getChild(
"[Connection " + opts.transport.livekit_service_url + "]",
);
this.logger.info(
`constructor: ${opts.transport.livekit_service_url} roomId: ${this.roomId} withSfuConfig?: ${opts.existingSFUConfig ? JSON.stringify(opts.existingSFUConfig) : "undefined"}`,
);
const { transport, client, scope } = opts;
this.scope = scope;
this.livekitRoom = opts.livekitRoomFactory();
this.transport = transport;
this.client = client;
this.remoteParticipants$ = scope.behavior(
// Only tracks remote participants
connectedParticipantsObserver(this.livekitRoom),
);
scope.onEnd(() => {
this.logger.info(`Connection scope ended, stopping connection`);
void this.stop();
});
}
/** /**
* Starts the connection. * Starts the connection.
* *
@@ -231,7 +274,7 @@ export class Connection {
this.client, this.client,
this.ownMembershipIdentity, this.ownMembershipIdentity,
this.transport.livekit_service_url, this.transport.livekit_service_url,
this.transport.livekit_alias, this.roomId,
// dont pass any custom opts for the subscribe only connections // dont pass any custom opts for the subscribe only connections
{}, {},
this.logger, this.logger,
@@ -256,42 +299,4 @@ export class Connection {
`stop: DONE disconnecing from lk room ${this.transport.livekit_service_url}`, `stop: DONE disconnecing from lk room ${this.transport.livekit_service_url}`,
); );
} }
private readonly client: OpenIDClientParts;
private readonly logger: Logger;
private readonly ownMembershipIdentity: CallMembershipIdentityParts;
private readonly existingSFUConfig?: SFUConfig;
/**
* Creates a new connection to a matrix RTC LiveKit backend.
*
* @param opts - Connection options {@link ConnectionOpts}.
*
* @param logger - The logger to use.
*/
public constructor(opts: ConnectionOpts, logger: Logger) {
this.ownMembershipIdentity = opts.ownMembershipIdentity;
this.existingSFUConfig = opts.existingSFUConfig;
this.logger = logger.getChild(
"[Connection " + opts.transport.livekit_service_url + "]",
);
this.logger.info(
`constructor: ${opts.transport.livekit_service_url} alias: ${opts.transport.livekit_alias} withSfuConfig?: ${opts.existingSFUConfig ? JSON.stringify(opts.existingSFUConfig) : "undefined"}`,
);
const { transport, client, scope } = opts;
this.scope = scope;
this.livekitRoom = opts.livekitRoomFactory();
this.transport = transport;
this.client = client;
this.remoteParticipants$ = scope.behavior(
// Only tracks remote participants
connectedParticipantsObserver(this.livekitRoom),
);
scope.onEnd(() => {
this.logger.info(`Connection scope ended, stopping connection`);
void this.stop();
});
}
} }

View File

@@ -16,7 +16,7 @@ import { type Logger } from "matrix-js-sdk/lib/logger";
// imported as inline to support worker when loaded from a cdn (cross domain) // imported as inline to support worker when loaded from a cdn (cross domain)
import E2EEWorker from "livekit-client/e2ee-worker?worker&inline"; import E2EEWorker from "livekit-client/e2ee-worker?worker&inline";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager"; import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransport"; import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { type ObservableScope } from "../../ObservableScope.ts"; import { type ObservableScope } from "../../ObservableScope.ts";
import { Connection } from "./Connection.ts"; import { Connection } from "./Connection.ts";
@@ -33,7 +33,7 @@ import { defaultLiveKitOptions } from "../../../livekit/options.ts";
export interface ConnectionFactory { export interface ConnectionFactory {
createConnection( createConnection(
scope: ObservableScope, scope: ObservableScope,
transport: LivekitTransport, transport: LivekitTransportConfig,
ownMembershipIdentity: CallMembershipIdentityParts, ownMembershipIdentity: CallMembershipIdentityParts,
logger: Logger, logger: Logger,
sfuConfig?: SFUConfig, sfuConfig?: SFUConfig,
@@ -47,6 +47,7 @@ export class ECConnectionFactory implements ConnectionFactory {
* Creates a ConnectionFactory for LiveKit connections. * Creates a ConnectionFactory for LiveKit connections.
* *
* @param client - The OpenID client parts for authentication, needed to get openID and JWT tokens. * @param client - The OpenID client parts for authentication, needed to get openID and JWT tokens.
* @param roomId - The current room ID.
* @param devices - Used for video/audio out/in capture options. * @param devices - Used for video/audio out/in capture options.
* @param processorState$ - Effects like background blur (only for publishing connection?) * @param processorState$ - Effects like background blur (only for publishing connection?)
* @param livekitKeyProvider - Optional key provider for end-to-end encryption. * @param livekitKeyProvider - Optional key provider for end-to-end encryption.
@@ -57,6 +58,7 @@ export class ECConnectionFactory implements ConnectionFactory {
*/ */
public constructor( public constructor(
private client: OpenIDClientParts, private client: OpenIDClientParts,
private readonly roomId: string,
private devices: MediaDevices, private devices: MediaDevices,
private processorState$: Behavior<ProcessorState>, private processorState$: Behavior<ProcessorState>,
livekitKeyProvider: BaseKeyProvider | undefined, livekitKeyProvider: BaseKeyProvider | undefined,
@@ -95,7 +97,7 @@ export class ECConnectionFactory implements ConnectionFactory {
*/ */
public createConnection( public createConnection(
scope: ObservableScope, scope: ObservableScope,
transport: LivekitTransport, transport: LivekitTransportConfig,
ownMembershipIdentity: CallMembershipIdentityParts, ownMembershipIdentity: CallMembershipIdentityParts,
logger: Logger, logger: Logger,
sfuConfig?: SFUConfig, sfuConfig?: SFUConfig,
@@ -103,6 +105,7 @@ export class ECConnectionFactory implements ConnectionFactory {
return new Connection( return new Connection(
{ {
existingSFUConfig: sfuConfig, existingSFUConfig: sfuConfig,
roomId: this.roomId,
transport, transport,
client: this.client, client: this.client,
scope: scope, scope: scope,

View File

@@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details.
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { BehaviorSubject } from "rxjs"; import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { type RemoteParticipant } from "livekit-client"; import { type RemoteParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
@@ -24,16 +24,14 @@ import { constant, type Behavior } from "../../Behavior.ts";
// Some test constants // Some test constants
const TRANSPORT_1: LivekitTransport = { const TRANSPORT_1: LivekitTransportConfig = {
type: "livekit", type: "livekit",
livekit_service_url: "https://lk.example.org", livekit_service_url: "https://lk.example.org",
livekit_alias: "!alias:example.org",
}; };
const TRANSPORT_2: LivekitTransport = { const TRANSPORT_2: LivekitTransportConfig = {
type: "livekit", type: "livekit",
livekit_service_url: "https://lk.sample.com", livekit_service_url: "https://lk.sample.com",
livekit_alias: "!alias:sample.com",
}; };
let fakeConnectionFactory: ConnectionFactory; let fakeConnectionFactory: ConnectionFactory;
@@ -49,7 +47,7 @@ beforeEach(() => {
vi.mocked(fakeConnectionFactory).createConnection = vi vi.mocked(fakeConnectionFactory).createConnection = vi
.fn() .fn()
.mockImplementation( .mockImplementation(
(scope: ObservableScope, transport: LivekitTransport) => { (scope: ObservableScope, transport: LivekitTransportConfig) => {
const mockConnection = { const mockConnection = {
transport, transport,
remoteParticipants$: new BehaviorSubject([]), remoteParticipants$: new BehaviorSubject([]),
@@ -209,15 +207,15 @@ describe("connectionManagerData$ stream", () => {
// Used in test to control fake connections' remoteParticipants$ streams // Used in test to control fake connections' remoteParticipants$ streams
let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>; let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;
function keyForTransport(transport: LivekitTransport): string { function keyForTransport(transport: LivekitTransportConfig): string {
return `${transport.livekit_service_url}|${transport.livekit_alias}`; return `${transport.livekit_service_url}`;
} }
beforeEach(() => { beforeEach(() => {
fakeRemoteParticipantsStreams = new Map(); fakeRemoteParticipantsStreams = new Map();
function getRemoteParticipantsFor( function getRemoteParticipantsFor(
transport: LivekitTransport, transport: LivekitTransportConfig,
): Behavior<RemoteParticipant[]> { ): Behavior<RemoteParticipant[]> {
return ( return (
fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ?? fakeRemoteParticipantsStreams.get(keyForTransport(transport)) ??
@@ -229,7 +227,7 @@ describe("connectionManagerData$ stream", () => {
vi.mocked(fakeConnectionFactory).createConnection = vi vi.mocked(fakeConnectionFactory).createConnection = vi
.fn() .fn()
.mockImplementation( .mockImplementation(
(scope: ObservableScope, transport: LivekitTransport) => { (scope: ObservableScope, transport: LivekitTransportConfig) => {
const fakeRemoteParticipants$ = new BehaviorSubject< const fakeRemoteParticipants$ = new BehaviorSubject<
RemoteParticipant[] RemoteParticipant[]
>([]); >([]);

View File

@@ -6,7 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, of, switchMap } from "rxjs"; import { combineLatest, map, of, switchMap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger"; import { type Logger } from "matrix-js-sdk/lib/logger";
import { type RemoteParticipant } from "livekit-client"; import { type RemoteParticipant } from "livekit-client";
@@ -42,8 +42,10 @@ export class ConnectionManagerData {
} }
} }
private getKey(transport: LivekitTransport): string { private getKey(transport: LivekitTransportConfig): string {
return transport.livekit_service_url + "|" + transport.livekit_alias; // This is enough as a key because the ConnectionManager is already scoped by room.
// We also do not need to consider the slotId at this point since each `MatrixRTCSession` is already scoped by `slotDescription: {id, application}`.
return transport.livekit_service_url;
} }
public getConnections(): Connection[] { public getConnections(): Connection[] {
@@ -51,15 +53,15 @@ export class ConnectionManagerData {
} }
public getConnectionForTransport( public getConnectionForTransport(
transport: LivekitTransport, transport: LivekitTransportConfig,
): Connection | null { ): Connection | null {
return this.store.get(this.getKey(transport))?.connection ?? null; return this.store.get(this.getKey(transport))?.connection ?? null;
} }
public getParticipantsForTransport( public getParticipantsForTransport(
transport: LivekitTransport, transport: LivekitTransportConfig,
): RemoteParticipant[] { ): RemoteParticipant[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias; const key = this.getKey(transport);
const existing = this.store.get(key); const existing = this.store.get(key);
if (existing) { if (existing) {
return existing.participants; return existing.participants;
@@ -72,7 +74,7 @@ interface Props {
scope: ObservableScope; scope: ObservableScope;
connectionFactory: ConnectionFactory; connectionFactory: ConnectionFactory;
localTransport$: Behavior<LocalTransportWithSFUConfig | null>; localTransport$: Behavior<LocalTransportWithSFUConfig | null>;
remoteTransports$: Behavior<Epoch<LivekitTransport[]>>; remoteTransports$: Behavior<Epoch<LivekitTransportConfig[]>>;
logger: Logger; logger: Logger;
ownMembershipIdentity: CallMembershipIdentityParts; ownMembershipIdentity: CallMembershipIdentityParts;
@@ -123,7 +125,7 @@ export function createConnectionManager$({
* externally this is modified via `registerTransports()`. * externally this is modified via `registerTransports()`.
*/ */
const localAndRemoteTransports$: Behavior< const localAndRemoteTransports$: Behavior<
Epoch<(LivekitTransport | LocalTransportWithSFUConfig)[]> Epoch<(LivekitTransportConfig | LocalTransportWithSFUConfig)[]>
> = scope.behavior( > = scope.behavior(
combineLatest([remoteTransports$, localTransport$]).pipe( combineLatest([remoteTransports$, localTransport$]).pipe(
// Combine local and remote transports into one transport array // Combine local and remote transports into one transport array
@@ -168,19 +170,13 @@ export function createConnectionManager$({
// This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field // This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field
const { transport, sfuConfig } = transportWithOrWithoutSfuConfig; const { transport, sfuConfig } = transportWithOrWithoutSfuConfig;
yield { yield {
keys: [ keys: [transport.livekit_service_url, sfuConfig],
transport.livekit_service_url,
transport.livekit_alias,
sfuConfig,
],
data: undefined, data: undefined,
}; };
} else { } else {
const transport = transportWithOrWithoutSfuConfig;
yield { yield {
keys: [ keys: [
transport.livekit_service_url, transportWithOrWithoutSfuConfig.livekit_service_url,
transport.livekit_alias,
undefined as undefined | SFUConfig, undefined as undefined | SFUConfig,
], ],
data: undefined, data: undefined,
@@ -188,13 +184,12 @@ export function createConnectionManager$({
} }
} }
}, },
(scope, _data$, serviceUrl, alias, sfuConfig) => { (scope, _data$, serviceUrl, sfuConfig) => {
const connection = connectionFactory.createConnection( const connection = connectionFactory.createConnection(
scope, scope,
{ {
type: "livekit", type: "livekit",
livekit_service_url: serviceUrl, livekit_service_url: serviceUrl,
livekit_alias: alias,
}, },
ownMembershipIdentity, ownMembershipIdentity,
logger, logger,
@@ -254,7 +249,7 @@ export function createConnectionManager$({
return { connectionManagerData$ }; return { connectionManagerData$ };
} }
function removeDuplicateTransports<T extends LivekitTransport>( function removeDuplicateTransports<T extends LivekitTransportConfig>(
transports: T[], transports: T[],
): T[] { ): T[] {
return transports.reduce((acc, transport) => { return transports.reduce((acc, transport) => {

View File

@@ -65,6 +65,7 @@ describe("ECConnectionFactory - Audio inputs options", () => {
const ecConnectionFactory = new ECConnectionFactory( const ecConnectionFactory = new ECConnectionFactory(
mockClient, mockClient,
"!roomid:example.org",
mockMediaDevices({}), mockMediaDevices({}),
new BehaviorSubject<ProcessorState>({ new BehaviorSubject<ProcessorState>({
supported: true, supported: true,
@@ -105,6 +106,7 @@ describe("ECConnectionFactory - ControlledAudioDevice", () => {
const ecConnectionFactory = new ECConnectionFactory( const ecConnectionFactory = new ECConnectionFactory(
mockClient, mockClient,
"!roomid:example.org",
mockMediaDevices({ mockMediaDevices({
audioOutput: { audioOutput: {
available$: constant(new Map<never, never>()), available$: constant(new Map<never, never>()),

View File

@@ -7,8 +7,8 @@ Please see LICENSE in the repository root for full details.
import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { import {
type LivekitTransport,
type CallMembership, type CallMembership,
type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, filter, map } from "rxjs"; import { combineLatest, filter, map } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
@@ -62,7 +62,7 @@ export interface RemoteMatrixLivekitMember extends MatrixLivekitMember {
interface Props { interface Props {
scope: ObservableScope; scope: ObservableScope;
membershipsWithTransport$: Behavior< membershipsWithTransport$: Behavior<
Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]> Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]>
>; >;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
} }
@@ -147,18 +147,12 @@ export function createMatrixLivekitMembers$({
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$) // TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)
// TODO add this to the JS-SDK // TODO add this to the JS-SDK
export function areLivekitTransportsEqual<T extends LivekitTransport>( export function areLivekitTransportsEqual<T extends LivekitTransportConfig>(
t1: T | null, t1: T | null,
t2: T | null, t2: T | null,
): boolean { ): boolean {
if (t1 && t2) if (t1 && t2) {
return ( return t1.livekit_service_url === t2.livekit_service_url;
t1.livekit_service_url === t2.livekit_service_url && }
// In case we have different lk rooms in the same SFU (depends on the livekit authorization service) return !t1 && !t2;
// It is only needed in case the livekit authorization service is not behaving as expected (or custom implementation)
// Also LivekitTransport is planned to become a `ConnectionIdentifier` which moves this equal somewhere else.
t1.livekit_alias === t2.livekit_alias
);
if (!t1 && !t2) return true;
return false;
} }

View File

@@ -10,7 +10,7 @@ import { BehaviorSubject } from "rxjs";
import { type Room as LivekitRoom } from "livekit-client"; import { type Room as LivekitRoom } from "livekit-client";
import EventEmitter from "events"; import EventEmitter from "events";
import fetchMock from "fetch-mock"; import fetchMock from "fetch-mock";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { import {
@@ -71,6 +71,7 @@ beforeEach(() => {
ecConnectionFactory = new ECConnectionFactory( ecConnectionFactory = new ECConnectionFactory(
mockClient, mockClient,
"!roomid:example.org",
mockMediaDevices({}), mockMediaDevices({}),
new BehaviorSubject<ProcessorState>({ new BehaviorSubject<ProcessorState>({
supported: true, supported: true,
@@ -148,7 +149,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
a: expect.toSatisfy((co) => a: expect.toSatisfy((co) =>
areLivekitTransportsEqual( areLivekitTransportsEqual(
co.transport, co.transport,
bobMembership.transports[0]! as LivekitTransport, bobMembership.transports[0]! as LivekitTransportConfig,
), ),
), ),
}); });
@@ -185,7 +186,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
expect( expect(
areLivekitTransportsEqual( areLivekitTransportsEqual(
connection.transport, connection.transport,
carlMembership.transports[0]! as LivekitTransport, carlMembership.transports[0]! as LivekitTransportConfig,
), ),
).toBe(true); ).toBe(true);
return true; return true;
@@ -215,7 +216,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
expect( expect(
areLivekitTransportsEqual( areLivekitTransportsEqual(
connection.transport, connection.transport,
daveMembership.transports[0]! as LivekitTransport, daveMembership.transports[0]! as LivekitTransportConfig,
), ),
).toBe(true); ).toBe(true);
return true; return true;

View File

@@ -7,10 +7,10 @@ Please see LICENSE in the repository root for full details.
import { import {
type CallMembership, type CallMembership,
isLivekitTransport, type LivekitTransportConfig,
type LivekitTransport,
type MatrixRTCSession, type MatrixRTCSession,
MatrixRTCSessionEvent, MatrixRTCSessionEvent,
isLivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { fromEvent } from "rxjs"; import { fromEvent } from "rxjs";
@@ -27,19 +27,26 @@ export const membershipsAndTransports$ = (
memberships$: Behavior<Epoch<CallMembership[]>>, memberships$: Behavior<Epoch<CallMembership[]>>,
): { ): {
membershipsWithTransport$: Behavior< membershipsWithTransport$: Behavior<
Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]> Epoch<{ membership: CallMembership; transport?: LivekitTransportConfig }[]>
>; >;
transports$: Behavior<Epoch<LivekitTransport[]>>; transports$: Behavior<Epoch<LivekitTransportConfig[]>>;
} => { } => {
/** /**
* Lists the transports used by ourselves, plus all other MatrixRTC session * Lists the transports used by ourselves, plus all other MatrixRTC session
* members. For completeness this also lists the preferred transport and * members.
* whether we are in multi-SFU mode or sticky events mode (because * For completeness this also lists the preferred transport and
* advertisedTransport$ wants to read them at the same time, and bundling data * whether we are in multi-SFU mode or sticky events mode.
* together when it might change together is what you have to do in RxJS to * `advertisedTransport$` reads these values together, so bundling them avoids inconsistent state or
* avoid reading inconsistent state or observing too many changes.) * excessive updates when using RxJS.
*/ */
const membershipsWithTransport$ = scope.behavior( const membershipsWithTransport$: Behavior<
Epoch<
{
membership: CallMembership;
transport: LivekitTransportConfig | undefined;
}[]
>
> = scope.behavior(
memberships$.pipe( memberships$.pipe(
mapEpoch((memberships) => { mapEpoch((memberships) => {
return memberships.map((membership) => { return memberships.map((membership) => {
@@ -47,14 +54,16 @@ export const membershipsAndTransports$ = (
const transport = membership.getTransport(oldestMembership); const transport = membership.getTransport(oldestMembership);
return { return {
membership, membership,
transport: isLivekitTransport(transport) ? transport : undefined, transport: isLivekitTransportConfig(transport)
? transport
: undefined,
}; };
}); });
}), }),
), ),
); );
const transports$ = scope.behavior( const transports$: Behavior<Epoch<LivekitTransportConfig[]>> = scope.behavior(
membershipsWithTransport$.pipe( membershipsWithTransport$.pipe(
mapEpoch((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), mapEpoch((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))),
), ),