Adding more logs (#3563)
* Adding more logs * post merge fix * fixup merge error * review --------- Co-authored-by: Timo K <toger5@hotmail.de>
This commit is contained in:
@@ -241,6 +241,7 @@ export class CallViewModel {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
logger: logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
private matrixLivekitMembers$ = createMatrixLivekitMembers$({
|
private matrixLivekitMembers$ = createMatrixLivekitMembers$({
|
||||||
@@ -271,6 +272,7 @@ export class CallViewModel {
|
|||||||
trackProcessorState$: this.trackProcessorState$,
|
trackProcessorState$: this.trackProcessorState$,
|
||||||
widget,
|
widget,
|
||||||
options: this.connectOptions$,
|
options: this.connectOptions$,
|
||||||
|
logger: logger.getChild(`[${Date.now()}]`),
|
||||||
});
|
});
|
||||||
|
|
||||||
private localRtcMembership$ = this.scope.behavior(
|
private localRtcMembership$ = this.scope.behavior(
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk";
|
|||||||
import {
|
import {
|
||||||
BehaviorSubject,
|
BehaviorSubject,
|
||||||
combineLatest,
|
combineLatest,
|
||||||
|
distinctUntilChanged,
|
||||||
fromEvent,
|
fromEvent,
|
||||||
map,
|
map,
|
||||||
type Observable,
|
type Observable,
|
||||||
@@ -29,8 +30,9 @@ import {
|
|||||||
scan,
|
scan,
|
||||||
startWith,
|
startWith,
|
||||||
switchMap,
|
switchMap,
|
||||||
|
tap,
|
||||||
} from "rxjs";
|
} from "rxjs";
|
||||||
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
|
import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||||
|
|
||||||
import { type Behavior } from "../../Behavior";
|
import { type Behavior } from "../../Behavior";
|
||||||
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
|
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
|
||||||
@@ -52,7 +54,7 @@ import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
|
|||||||
import { MatrixRTCMode } from "../../../settings/settings.ts";
|
import { MatrixRTCMode } from "../../../settings/settings.ts";
|
||||||
import { Config } from "../../../config/Config.ts";
|
import { Config } from "../../../config/Config.ts";
|
||||||
import { type Connection } from "../remoteMembers/Connection.ts";
|
import { type Connection } from "../remoteMembers/Connection.ts";
|
||||||
const logger = rootLogger.getChild("[LocalMembership]");
|
|
||||||
export enum LivekitState {
|
export enum LivekitState {
|
||||||
Uninitialized = "uninitialized",
|
Uninitialized = "uninitialized",
|
||||||
Connecting = "connecting",
|
Connecting = "connecting",
|
||||||
@@ -61,6 +63,7 @@ export enum LivekitState {
|
|||||||
Disconnected = "disconnected",
|
Disconnected = "disconnected",
|
||||||
Disconnecting = "disconnecting",
|
Disconnecting = "disconnecting",
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalMemberLivekitState =
|
type LocalMemberLivekitState =
|
||||||
| { state: LivekitState.Error; error: string }
|
| { state: LivekitState.Error; error: string }
|
||||||
| { state: LivekitState.Connected }
|
| { state: LivekitState.Connected }
|
||||||
@@ -74,6 +77,7 @@ export enum MatrixState {
|
|||||||
Disconnected = "disconnected",
|
Disconnected = "disconnected",
|
||||||
Connecting = "connecting",
|
Connecting = "connecting",
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalMemberMatrixState =
|
type LocalMemberMatrixState =
|
||||||
| { state: MatrixState.Connected }
|
| { state: MatrixState.Connected }
|
||||||
| { state: MatrixState.Connecting }
|
| { state: MatrixState.Connecting }
|
||||||
@@ -106,6 +110,7 @@ interface Props {
|
|||||||
localTransport$: Behavior<LivekitTransport | null>;
|
localTransport$: Behavior<LivekitTransport | null>;
|
||||||
trackProcessorState$: Behavior<ProcessorState>;
|
trackProcessorState$: Behavior<ProcessorState>;
|
||||||
widget: WidgetHelpers | null;
|
widget: WidgetHelpers | null;
|
||||||
|
logger: Logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -132,6 +137,7 @@ export const createLocalMembership$ = ({
|
|||||||
matrixRoom,
|
matrixRoom,
|
||||||
trackProcessorState$,
|
trackProcessorState$,
|
||||||
widget,
|
widget,
|
||||||
|
logger: parentLogger,
|
||||||
}: Props): {
|
}: Props): {
|
||||||
// publisher: Publisher
|
// publisher: Publisher
|
||||||
requestConnect: () => LocalMemberConnectionState;
|
requestConnect: () => LocalMemberConnectionState;
|
||||||
@@ -157,6 +163,8 @@ export const createLocalMembership$ = ({
|
|||||||
/** @deprecated use state instead*/
|
/** @deprecated use state instead*/
|
||||||
configError$: Behavior<ElementCallError | null>;
|
configError$: Behavior<ElementCallError | null>;
|
||||||
} => {
|
} => {
|
||||||
|
const logger = parentLogger.getChild("[LocalMembership]");
|
||||||
|
logger.debug(`Creating local membership..`);
|
||||||
const state = {
|
const state = {
|
||||||
livekit$: new BehaviorSubject<LocalMemberLivekitState>({
|
livekit$: new BehaviorSubject<LocalMemberLivekitState>({
|
||||||
state: LivekitState.Uninitialized,
|
state: LivekitState.Uninitialized,
|
||||||
@@ -178,17 +186,23 @@ export const createLocalMembership$ = ({
|
|||||||
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
|
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
|
||||||
|
|
||||||
// Drop Epoch data here since we will not combine this anymore
|
// Drop Epoch data here since we will not combine this anymore
|
||||||
const connection$ = scope.behavior(
|
const localConnection$ = scope.behavior(
|
||||||
combineLatest(
|
combineLatest([connectionManager.connections$, localTransport$]).pipe(
|
||||||
[connectionManager.connections$, localTransport$],
|
map(([connections, localTransport]) => {
|
||||||
(connections, transport) => {
|
if (localTransport === null) {
|
||||||
if (transport === null) return null;
|
return null;
|
||||||
|
}
|
||||||
return (
|
return (
|
||||||
connections.value.find((connection) =>
|
connections.value.find((connection) =>
|
||||||
areLivekitTransportsEqual(connection.transport, transport),
|
areLivekitTransportsEqual(connection.transport, localTransport),
|
||||||
) ?? null
|
) ?? null
|
||||||
);
|
);
|
||||||
},
|
}),
|
||||||
|
tap((connection) => {
|
||||||
|
logger.info(
|
||||||
|
`Local connection updated: ${connection?.transport?.livekit_service_url}`,
|
||||||
|
);
|
||||||
|
}),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
/**
|
/**
|
||||||
@@ -219,6 +233,10 @@ export const createLocalMembership$ = ({
|
|||||||
startWith(null),
|
startWith(null),
|
||||||
map(() => matrixRTCSession.probablyLeft !== true),
|
map(() => matrixRTCSession.probablyLeft !== true),
|
||||||
),
|
),
|
||||||
|
).pipe(
|
||||||
|
tap((connected) => {
|
||||||
|
logger.info(`Homeserver connected update: ${connected}`);
|
||||||
|
}),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -230,7 +248,7 @@ export const createLocalMembership$ = ({
|
|||||||
const connected$ = scope.behavior(
|
const connected$ = scope.behavior(
|
||||||
and$(
|
and$(
|
||||||
homeserverConnected$,
|
homeserverConnected$,
|
||||||
connection$.pipe(
|
localConnection$.pipe(
|
||||||
switchMap((c) =>
|
switchMap((c) =>
|
||||||
c
|
c
|
||||||
? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom"))
|
? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom"))
|
||||||
@@ -241,8 +259,9 @@ export const createLocalMembership$ = ({
|
|||||||
);
|
);
|
||||||
|
|
||||||
const publisher$ = new BehaviorSubject<Publisher | null>(null);
|
const publisher$ = new BehaviorSubject<Publisher | null>(null);
|
||||||
connection$.subscribe((connection) => {
|
localConnection$.subscribe((connection) => {
|
||||||
if (connection !== null && publisher$.value === null) {
|
if (connection !== null && publisher$.value === null) {
|
||||||
|
// TODO looks strange to not change publisher if connection changes.
|
||||||
publisher$.next(
|
publisher$.next(
|
||||||
new Publisher(
|
new Publisher(
|
||||||
scope,
|
scope,
|
||||||
@@ -375,7 +394,7 @@ export const createLocalMembership$ = ({
|
|||||||
// We use matrixConnected$ rather than reconnecting$ because we want to
|
// We use matrixConnected$ rather than reconnecting$ because we want to
|
||||||
// pause tracks during the initial joining sequence too until we're sure
|
// pause tracks during the initial joining sequence too until we're sure
|
||||||
// that our own media is displayed on screen.
|
// that our own media is displayed on screen.
|
||||||
combineLatest([connection$, homeserverConnected$])
|
combineLatest([localConnection$, homeserverConnected$])
|
||||||
.pipe(scope.bind())
|
.pipe(scope.bind())
|
||||||
.subscribe(([connection, connected]) => {
|
.subscribe(([connection, connected]) => {
|
||||||
if (connection?.state$.value.state !== "ConnectedToLkRoom") return;
|
if (connection?.state$.value.state !== "ConnectedToLkRoom") return;
|
||||||
@@ -462,7 +481,7 @@ export const createLocalMembership$ = ({
|
|||||||
* Whether the user is currently sharing their screen.
|
* Whether the user is currently sharing their screen.
|
||||||
*/
|
*/
|
||||||
const sharingScreen$ = scope.behavior(
|
const sharingScreen$ = scope.behavior(
|
||||||
connection$.pipe(
|
localConnection$.pipe(
|
||||||
switchMap((c) =>
|
switchMap((c) =>
|
||||||
c === null
|
c === null
|
||||||
? of(false)
|
? of(false)
|
||||||
@@ -483,7 +502,7 @@ export const createLocalMembership$ = ({
|
|||||||
// We also allow screen sharing to be toggled even if the connection
|
// We also allow screen sharing to be toggled even if the connection
|
||||||
// is still initializing or publishing tracks, because there's no
|
// is still initializing or publishing tracks, because there's no
|
||||||
// technical reason to disallow this. LiveKit will publish if it can.
|
// technical reason to disallow this. LiveKit will publish if it can.
|
||||||
void connection$.value?.livekitRoom.localParticipant
|
void localConnection$.value?.livekitRoom.localParticipant
|
||||||
.setScreenShareEnabled(!sharingScreen$.value, {
|
.setScreenShareEnabled(!sharingScreen$.value, {
|
||||||
audio: true,
|
audio: true,
|
||||||
selfBrowserSurface: "include",
|
selfBrowserSurface: "include",
|
||||||
@@ -494,7 +513,7 @@ export const createLocalMembership$ = ({
|
|||||||
: null;
|
: null;
|
||||||
|
|
||||||
const participant$ = scope.behavior(
|
const participant$ = scope.behavior(
|
||||||
connection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)),
|
localConnection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)),
|
||||||
);
|
);
|
||||||
return {
|
return {
|
||||||
startTracks,
|
startTracks,
|
||||||
@@ -508,7 +527,7 @@ export const createLocalMembership$ = ({
|
|||||||
sharingScreen$,
|
sharingScreen$,
|
||||||
toggleScreenSharing,
|
toggleScreenSharing,
|
||||||
participant$,
|
participant$,
|
||||||
connection$,
|
connection$: localConnection$,
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -532,6 +551,7 @@ interface EnterRTCSessionOptions {
|
|||||||
* @param rtcSession
|
* @param rtcSession
|
||||||
* @param transport
|
* @param transport
|
||||||
* @param options
|
* @param options
|
||||||
|
* @throws If the widget could not send ElementWidgetActions.JoinCall action.
|
||||||
*/
|
*/
|
||||||
async function enterRTCSession(
|
async function enterRTCSession(
|
||||||
rtcSession: MatrixRTCSession,
|
rtcSession: MatrixRTCSession,
|
||||||
@@ -576,10 +596,6 @@ async function enterRTCSession(
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
if (widget) {
|
if (widget) {
|
||||||
try {
|
await widget.api.transport.send(ElementWidgetActions.JoinCall, {});
|
||||||
await widget.api.transport.send(ElementWidgetActions.JoinCall, {});
|
|
||||||
} catch (e) {
|
|
||||||
logger.error("Failed to send join action", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ import {
|
|||||||
onTestFinished,
|
onTestFinished,
|
||||||
vi,
|
vi,
|
||||||
} from "vitest";
|
} from "vitest";
|
||||||
import { BehaviorSubject } from "rxjs";
|
|
||||||
import {
|
import {
|
||||||
type LocalParticipant,
|
type LocalParticipant,
|
||||||
type RemoteParticipant,
|
type RemoteParticipant,
|
||||||
@@ -25,11 +24,9 @@ import {
|
|||||||
import fetchMock from "fetch-mock";
|
import fetchMock from "fetch-mock";
|
||||||
import EventEmitter from "events";
|
import EventEmitter from "events";
|
||||||
import { type IOpenIDToken } from "matrix-js-sdk";
|
import { type IOpenIDToken } from "matrix-js-sdk";
|
||||||
|
import { logger } from "matrix-js-sdk/lib/logger";
|
||||||
|
|
||||||
import type {
|
import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||||
CallMembership,
|
|
||||||
LivekitTransport,
|
|
||||||
} from "matrix-js-sdk/lib/matrixrtc";
|
|
||||||
import {
|
import {
|
||||||
Connection,
|
Connection,
|
||||||
type ConnectionOpts,
|
type ConnectionOpts,
|
||||||
@@ -39,6 +36,7 @@ import {
|
|||||||
import { ObservableScope } from "../../ObservableScope.ts";
|
import { ObservableScope } from "../../ObservableScope.ts";
|
||||||
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
|
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
|
||||||
import { FailToGetOpenIdToken } from "../../../utils/errors.ts";
|
import { FailToGetOpenIdToken } from "../../../utils/errors.ts";
|
||||||
|
|
||||||
let testScope: ObservableScope;
|
let testScope: ObservableScope;
|
||||||
|
|
||||||
let client: MockedObject<OpenIDClientParts>;
|
let client: MockedObject<OpenIDClientParts>;
|
||||||
@@ -49,9 +47,9 @@ let localParticipantEventEmiter: EventEmitter;
|
|||||||
let fakeLocalParticipant: MockedObject<LocalParticipant>;
|
let fakeLocalParticipant: MockedObject<LocalParticipant>;
|
||||||
|
|
||||||
let fakeRoomEventEmiter: EventEmitter;
|
let fakeRoomEventEmiter: EventEmitter;
|
||||||
let fakeMembershipsFocusMap$: BehaviorSubject<
|
// let fakeMembershipsFocusMap$: BehaviorSubject<
|
||||||
{ membership: CallMembership; transport: LivekitTransport }[]
|
// { membership: CallMembership; transport: LivekitTransport }[]
|
||||||
>;
|
// >;
|
||||||
|
|
||||||
const livekitFocus: LivekitTransport = {
|
const livekitFocus: LivekitTransport = {
|
||||||
livekit_alias: "!roomID:example.org",
|
livekit_alias: "!roomID:example.org",
|
||||||
@@ -70,9 +68,6 @@ function setupTest(): void {
|
|||||||
}),
|
}),
|
||||||
getDeviceId: vi.fn().mockReturnValue("ABCDEF"),
|
getDeviceId: vi.fn().mockReturnValue("ABCDEF"),
|
||||||
} as unknown as OpenIDClientParts);
|
} as unknown as OpenIDClientParts);
|
||||||
fakeMembershipsFocusMap$ = new BehaviorSubject<
|
|
||||||
{ membership: CallMembership; transport: LivekitTransport }[]
|
|
||||||
>([]);
|
|
||||||
|
|
||||||
localParticipantEventEmiter = new EventEmitter();
|
localParticipantEventEmiter = new EventEmitter();
|
||||||
|
|
||||||
@@ -131,7 +126,7 @@ function setupRemoteConnection(): Connection {
|
|||||||
|
|
||||||
fakeLivekitRoom.connect.mockResolvedValue(undefined);
|
fakeLivekitRoom.connect.mockResolvedValue(undefined);
|
||||||
|
|
||||||
return new Connection(opts);
|
return new Connection(opts, logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
@@ -150,7 +145,7 @@ describe("Start connection states", () => {
|
|||||||
scope: testScope,
|
scope: testScope,
|
||||||
livekitRoomFactory: () => fakeLivekitRoom,
|
livekitRoomFactory: () => fakeLivekitRoom,
|
||||||
};
|
};
|
||||||
const connection = new Connection(opts);
|
const connection = new Connection(opts, logger);
|
||||||
|
|
||||||
expect(connection.state$.getValue().state).toEqual("Initialized");
|
expect(connection.state$.getValue().state).toEqual("Initialized");
|
||||||
});
|
});
|
||||||
@@ -166,7 +161,7 @@ describe("Start connection states", () => {
|
|||||||
livekitRoomFactory: () => fakeLivekitRoom,
|
livekitRoomFactory: () => fakeLivekitRoom,
|
||||||
};
|
};
|
||||||
|
|
||||||
const connection = new Connection(opts, undefined);
|
const connection = new Connection(opts, logger);
|
||||||
|
|
||||||
const capturedStates: ConnectionState[] = [];
|
const capturedStates: ConnectionState[] = [];
|
||||||
const s = connection.state$.subscribe((value) => {
|
const s = connection.state$.subscribe((value) => {
|
||||||
@@ -218,7 +213,7 @@ describe("Start connection states", () => {
|
|||||||
livekitRoomFactory: () => fakeLivekitRoom,
|
livekitRoomFactory: () => fakeLivekitRoom,
|
||||||
};
|
};
|
||||||
|
|
||||||
const connection = new Connection(opts, undefined);
|
const connection = new Connection(opts, logger);
|
||||||
|
|
||||||
const capturedStates: ConnectionState[] = [];
|
const capturedStates: ConnectionState[] = [];
|
||||||
const s = connection.state$.subscribe((value) => {
|
const s = connection.state$.subscribe((value) => {
|
||||||
@@ -274,7 +269,7 @@ describe("Start connection states", () => {
|
|||||||
livekitRoomFactory: () => fakeLivekitRoom,
|
livekitRoomFactory: () => fakeLivekitRoom,
|
||||||
};
|
};
|
||||||
|
|
||||||
const connection = new Connection(opts, undefined);
|
const connection = new Connection(opts, logger);
|
||||||
|
|
||||||
const capturedStates: ConnectionState[] = [];
|
const capturedStates: ConnectionState[] = [];
|
||||||
const s = connection.state$.subscribe((value) => {
|
const s = connection.state$.subscribe((value) => {
|
||||||
|
|||||||
@@ -98,6 +98,7 @@ export class Connection {
|
|||||||
// TODO dont make this throw and instead store a connection error state in this class?
|
// TODO dont make this throw and instead store a connection error state in this class?
|
||||||
// TODO consider an autostart pattern...
|
// TODO consider an autostart pattern...
|
||||||
public async start(): Promise<void> {
|
public async start(): Promise<void> {
|
||||||
|
this.logger.debug("Starting Connection");
|
||||||
this.stopped = false;
|
this.stopped = false;
|
||||||
try {
|
try {
|
||||||
this._state$.next({
|
this._state$.next({
|
||||||
@@ -145,6 +146,7 @@ export class Connection {
|
|||||||
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
|
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
|
||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
|
||||||
this._state$.next({
|
this._state$.next({
|
||||||
state: "FailedToStart",
|
state: "FailedToStart",
|
||||||
error: error instanceof Error ? error : new Error(`${error}`),
|
error: error instanceof Error ? error : new Error(`${error}`),
|
||||||
@@ -169,6 +171,9 @@ export class Connection {
|
|||||||
* If the connection is already stopped, this is a no-op.
|
* If the connection is already stopped, this is a no-op.
|
||||||
*/
|
*/
|
||||||
public async stop(): Promise<void> {
|
public async stop(): Promise<void> {
|
||||||
|
this.logger.debug(
|
||||||
|
`Stopping connection to ${this.transport.livekit_service_url}`,
|
||||||
|
);
|
||||||
if (this.stopped) return;
|
if (this.stopped) return;
|
||||||
await this.livekitRoom.disconnect();
|
await this.livekitRoom.disconnect();
|
||||||
this._state$.next({
|
this._state$.next({
|
||||||
@@ -195,15 +200,18 @@ export class Connection {
|
|||||||
private readonly client: OpenIDClientParts;
|
private readonly client: OpenIDClientParts;
|
||||||
public readonly livekitRoom: LivekitRoom;
|
public readonly livekitRoom: LivekitRoom;
|
||||||
|
|
||||||
|
private readonly logger: Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new connection to a matrix RTC LiveKit backend.
|
* Creates a new connection to a matrix RTC LiveKit backend.
|
||||||
*
|
*
|
||||||
* @param livekitRoom - LiveKit room instance to use.
|
|
||||||
* @param opts - Connection options {@link ConnectionOpts}.
|
* @param opts - Connection options {@link ConnectionOpts}.
|
||||||
*
|
*
|
||||||
|
* @param logger
|
||||||
*/
|
*/
|
||||||
public constructor(opts: ConnectionOpts, logger?: Logger) {
|
public constructor(opts: ConnectionOpts, logger: Logger) {
|
||||||
logger?.info(
|
this.logger = logger.getChild("[Connection]");
|
||||||
|
this.logger.info(
|
||||||
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
|
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
|
||||||
);
|
);
|
||||||
const { transport, client, scope } = opts;
|
const { transport, client, scope } = opts;
|
||||||
@@ -223,15 +231,17 @@ export class Connection {
|
|||||||
],
|
],
|
||||||
}).pipe(
|
}).pipe(
|
||||||
map((participants) => {
|
map((participants) => {
|
||||||
const partsFiltered = participants.filter(
|
return participants.filter(
|
||||||
(participant) => participant.getTrackPublications().length > 0,
|
(participant) => participant.getTrackPublications().length > 0,
|
||||||
);
|
);
|
||||||
return partsFiltered;
|
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
[],
|
[],
|
||||||
);
|
);
|
||||||
|
|
||||||
scope.onEnd(() => void this.stop());
|
scope.onEnd(() => {
|
||||||
|
this.logger.info(`Connection scope ended, stopping connection`);
|
||||||
|
void this.stop();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
|||||||
import { BehaviorSubject } from "rxjs";
|
import { BehaviorSubject } from "rxjs";
|
||||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||||
import { type Participant as LivekitParticipant } from "livekit-client";
|
import { type Participant as LivekitParticipant } from "livekit-client";
|
||||||
|
import { logger } from "matrix-js-sdk/lib/logger";
|
||||||
|
|
||||||
import { Epoch, ObservableScope } from "../../ObservableScope.ts";
|
import { Epoch, ObservableScope } from "../../ObservableScope.ts";
|
||||||
import {
|
import {
|
||||||
@@ -78,6 +79,7 @@ describe("connections$ stream", () => {
|
|||||||
inputTransports$: behavior("a", {
|
inputTransports$: behavior("a", {
|
||||||
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
|
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
|
||||||
}),
|
}),
|
||||||
|
logger: logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
expectObservable(connections$).toBe("a", {
|
expectObservable(connections$).toBe("a", {
|
||||||
@@ -119,6 +121,7 @@ describe("connections$ stream", () => {
|
|||||||
e: new Epoch([TRANSPORT_1], 4),
|
e: new Epoch([TRANSPORT_1], 4),
|
||||||
f: new Epoch([TRANSPORT_1, TRANSPORT_2], 5),
|
f: new Epoch([TRANSPORT_1, TRANSPORT_2], 5),
|
||||||
}),
|
}),
|
||||||
|
logger: logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
expectObservable(connections$).toBe("xxxxxa", {
|
expectObservable(connections$).toBe("xxxxxa", {
|
||||||
@@ -158,6 +161,7 @@ describe("connections$ stream", () => {
|
|||||||
b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1),
|
b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1),
|
||||||
c: new Epoch([TRANSPORT_1], 2),
|
c: new Epoch([TRANSPORT_1], 2),
|
||||||
}),
|
}),
|
||||||
|
logger: logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
expectObservable(connections$).toBe("xab", {
|
expectObservable(connections$).toBe("xab", {
|
||||||
@@ -272,6 +276,7 @@ describe("connectionManagerData$ stream", () => {
|
|||||||
inputTransports$: behavior("a", {
|
inputTransports$: behavior("a", {
|
||||||
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
|
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
|
||||||
}),
|
}),
|
||||||
|
logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
expectObservable(connectionManagerData$).toBe("abcd", {
|
expectObservable(connectionManagerData$).toBe("abcd", {
|
||||||
|
|||||||
@@ -13,8 +13,8 @@ import {
|
|||||||
type LivekitTransport,
|
type LivekitTransport,
|
||||||
type ParticipantId,
|
type ParticipantId,
|
||||||
} from "matrix-js-sdk/lib/matrixrtc";
|
} from "matrix-js-sdk/lib/matrixrtc";
|
||||||
import { BehaviorSubject, combineLatest, map, of, switchMap } from "rxjs";
|
import { BehaviorSubject, combineLatest, map, of, switchMap, tap } from "rxjs";
|
||||||
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
|
import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||||
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
|
import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
|
||||||
|
|
||||||
import { type Behavior } from "../../Behavior.ts";
|
import { type Behavior } from "../../Behavior.ts";
|
||||||
@@ -91,6 +91,7 @@ interface Props {
|
|||||||
scope: ObservableScope;
|
scope: ObservableScope;
|
||||||
connectionFactory: ConnectionFactory;
|
connectionFactory: ConnectionFactory;
|
||||||
inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
|
inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
|
||||||
|
logger: Logger;
|
||||||
}
|
}
|
||||||
// TODO - write test for scopes (do we really need to bind scope)
|
// TODO - write test for scopes (do we really need to bind scope)
|
||||||
export interface IConnectionManager {
|
export interface IConnectionManager {
|
||||||
@@ -116,8 +117,9 @@ export function createConnectionManager$({
|
|||||||
scope,
|
scope,
|
||||||
connectionFactory,
|
connectionFactory,
|
||||||
inputTransports$,
|
inputTransports$,
|
||||||
|
logger: parentLogger,
|
||||||
}: Props): IConnectionManager {
|
}: Props): IConnectionManager {
|
||||||
const logger = rootLogger.getChild("[ConnectionManager]");
|
const logger = parentLogger.getChild("[ConnectionManager]");
|
||||||
|
|
||||||
const running$ = new BehaviorSubject(true);
|
const running$ = new BehaviorSubject(true);
|
||||||
scope.onEnd(() => running$.next(false));
|
scope.onEnd(() => running$.next(false));
|
||||||
@@ -137,6 +139,11 @@ export function createConnectionManager$({
|
|||||||
transports.mapInner((transport) => (running ? transport : [])),
|
transports.mapInner((transport) => (running ? transport : [])),
|
||||||
),
|
),
|
||||||
map((transports) => transports.mapInner(removeDuplicateTransports)),
|
map((transports) => transports.mapInner(removeDuplicateTransports)),
|
||||||
|
tap(({ value: transports }) => {
|
||||||
|
logger.trace(
|
||||||
|
`Managing transports: ${transports.map((t) => t.livekit_service_url).join(", ")}`,
|
||||||
|
);
|
||||||
|
}),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -154,6 +161,7 @@ export function createConnectionManager$({
|
|||||||
};
|
};
|
||||||
},
|
},
|
||||||
(scope, _data$, serviceUrl, alias) => {
|
(scope, _data$, serviceUrl, alias) => {
|
||||||
|
logger.debug(`Creating connection to ${serviceUrl} (${alias})`);
|
||||||
const connection = connectionFactory.createConnection(
|
const connection = connectionFactory.createConnection(
|
||||||
{
|
{
|
||||||
type: "livekit",
|
type: "livekit",
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import { type Room as LivekitRoom } from "livekit-client";
|
|||||||
import EventEmitter from "events";
|
import EventEmitter from "events";
|
||||||
import fetchMock from "fetch-mock";
|
import fetchMock from "fetch-mock";
|
||||||
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
||||||
|
import { logger } from "matrix-js-sdk/lib/logger";
|
||||||
|
|
||||||
import {
|
import {
|
||||||
type Epoch,
|
type Epoch,
|
||||||
@@ -120,6 +121,7 @@ test("bob, carl, then bob joining no tracks yet", () => {
|
|||||||
scope: testScope,
|
scope: testScope,
|
||||||
connectionFactory: ecConnectionFactory,
|
connectionFactory: ecConnectionFactory,
|
||||||
inputTransports$: membershipsAndTransports.transports$,
|
inputTransports$: membershipsAndTransports.transports$,
|
||||||
|
logger: logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
const matrixLivekitItems$ = createMatrixLivekitMembers$({
|
const matrixLivekitItems$ = createMatrixLivekitMembers$({
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { describe, expect, it } from "vitest";
|
import { describe, expect, it } from "vitest";
|
||||||
import { BehaviorSubject, combineLatest, of, Subject } from "rxjs";
|
import { BehaviorSubject, combineLatest, Subject } from "rxjs";
|
||||||
import { logger } from "matrix-js-sdk/lib/logger";
|
import { logger } from "matrix-js-sdk/lib/logger";
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ import {
|
|||||||
} from "rxjs";
|
} from "rxjs";
|
||||||
|
|
||||||
import { type Behavior } from "./Behavior";
|
import { type Behavior } from "./Behavior";
|
||||||
import { logger } from "matrix-js-sdk/lib/logger";
|
|
||||||
|
|
||||||
type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>;
|
type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user