Fix: never stop ring feedback on the sender side. (#3502)

* make ring$ a behavior and add code comments to justify/explain the change.

Signed-off-by: Timo K <toger5@hotmail.de>

* Add test: reproduce "ring does not stop" race.

Signed-off-by: Timo K <toger5@hotmail.de>

---------

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo
2025-09-19 16:42:47 +02:00
committed by GitHub
parent 0774d18f92
commit 317b2dc796
2 changed files with 96 additions and 51 deletions

View File

@@ -266,7 +266,7 @@ const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent;
interface CallViewModelInputs { interface CallViewModelInputs {
remoteParticipants$: Behavior<RemoteParticipant[]>; remoteParticipants$: Behavior<RemoteParticipant[]>;
rtcMembers$: Behavior<Partial<CallMembership>[]>; rtcMembers$: Behavior<Partial<CallMembership>[]>;
connectionState$: Observable<ECConnectionState>; livekitConnectionState$: Observable<ECConnectionState>;
speaking: Map<Participant, Observable<boolean>>; speaking: Map<Participant, Observable<boolean>>;
mediaDevices: MediaDevices; mediaDevices: MediaDevices;
initialSyncState: SyncState; initialSyncState: SyncState;
@@ -276,7 +276,7 @@ function withCallViewModel(
{ {
remoteParticipants$ = constant([]), remoteParticipants$ = constant([]),
rtcMembers$ = constant([localRtcMember]), rtcMembers$ = constant([localRtcMember]),
connectionState$ = of(ConnectionState.Connected), livekitConnectionState$: connectionState$ = of(ConnectionState.Connected),
speaking = new Map(), speaking = new Map(),
mediaDevices = mockMediaDevices({}), mediaDevices = mockMediaDevices({}),
initialSyncState = SyncState.Syncing, initialSyncState = SyncState.Syncing,
@@ -384,7 +384,7 @@ test("participants are retained during a focus switch", () => {
b: [], b: [],
}), }),
rtcMembers$: constant([localRtcMember, aliceRtcMember, bobRtcMember]), rtcMembers$: constant([localRtcMember, aliceRtcMember, bobRtcMember]),
connectionState$: behavior(connectionInputMarbles, { livekitConnectionState$: behavior(connectionInputMarbles, {
c: ConnectionState.Connected, c: ConnectionState.Connected,
s: ECAddonConnectionState.ECSwitchingFocus, s: ECAddonConnectionState.ECSwitchingFocus,
}), }),
@@ -1251,6 +1251,41 @@ describe("waitForCallPickup$", () => {
}); });
}); });
test("regression test: does stop ringing in case livekitConnectionState$ emits after didSendCallNotification$ has already emitted", () => {
withTestScheduler(({ schedule, expectObservable, behavior }) => {
withCallViewModel(
{
livekitConnectionState$: behavior("d 9ms c", {
d: ConnectionState.Disconnected,
c: ConnectionState.Connected,
}),
},
(vm, rtcSession) => {
// Fire a call notification IMMEDIATELY (its important for this test, that this happens before the livekitConnectionState$ emits)
schedule("n", {
n: () => {
rtcSession.emit(
MatrixRTCSessionEvent.DidSendCallNotification,
mockRingEvent("$notif1", 30),
mockLegacyRingEvent,
);
},
});
expectObservable(vm.callPickupState$).toBe("a 9ms b 29ms c", {
a: "unknown",
b: "ringing",
c: "timeout",
});
},
{
waitForCallPickup: true,
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
},
);
});
});
test("ringing -> success if someone joins before timeout", () => { test("ringing -> success if someone joins before timeout", () => {
withTestScheduler(({ behavior, schedule, expectObservable }) => { withTestScheduler(({ behavior, schedule, expectObservable }) => {
// Someone joins at 20ms (both LiveKit participant and MatrixRTC member) // Someone joins at 20ms (both LiveKit participant and MatrixRTC member)
@@ -1305,7 +1340,7 @@ describe("waitForCallPickup$", () => {
a: [localRtcMember], a: [localRtcMember],
b: [localRtcMember, aliceRtcMember], b: [localRtcMember, aliceRtcMember],
}), }),
connectionState$, livekitConnectionState$: connectionState$,
}, },
(vm, rtcSession) => { (vm, rtcSession) => {
// Notify at 5ms so we enter ringing, then get disconnected 5ms later // Notify at 5ms so we enter ringing, then get disconnected 5ms later

View File

@@ -880,60 +880,68 @@ export class CallViewModel extends ViewModel {
? this.allOthersLeft$ ? this.allOthersLeft$
: NEVER; : NEVER;
private readonly didSendCallNotification$ = fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.DidSendCallNotification,
) as Observable<
Parameters<
MatrixRTCSessionEventHandlerMap[MatrixRTCSessionEvent.DidSendCallNotification]
>
>;
/** /**
* Whenever the RTC session tells us that it intends to ring the remote * Whenever the RTC session tells us that it intends to ring the remote
* participant's devices, this emits an Observable tracking the current state of * participant's devices, this emits an Observable tracking the current state of
* that ringing process. * that ringing process.
*/ */
private readonly ring$: Observable< // This is a behavior since we need to store the latest state for when we subscribe to this after `didSendCallNotification$`
Observable<"ringing" | "timeout" | "decline"> // has already emitted but we still need the latest observable with a timeout timer that only gets created on after receiving `notificationEvent`.
> = ( // A behavior will emit the latest observable with the running timer to new subscribers.
fromEvent( // see also: callPickupState$ and in particular the line: `return this.ring$.pipe(mergeAll());` here we otherwise might get an EMPTY observable if
this.matrixRTCSession, // `ring$` would not be a behavior.
MatrixRTCSessionEvent.DidSendCallNotification, private readonly ring$: Behavior<
) as Observable< Observable<"ringing" | "timeout" | "decline"> | Observable<never>
Parameters< > = this.scope.behavior(
MatrixRTCSessionEventHandlerMap[MatrixRTCSessionEvent.DidSendCallNotification] this.didSendCallNotification$.pipe(
> filter(
> ([notificationEvent]) => notificationEvent.notification_type === "ring",
).pipe( ),
filter( map(([notificationEvent]) => {
([notificationEvent]) => notificationEvent.notification_type === "ring", const lifetimeMs = notificationEvent?.lifetime ?? 0;
), return concat(
map(([notificationEvent]) => { lifetimeMs === 0
const lifetimeMs = notificationEvent?.lifetime ?? 0; ? // If no lifetime, skip the ring state
return concat( EMPTY
lifetimeMs === 0 : // Ring until lifetime ms have passed
? // If no lifetime, skip the ring state timer(lifetimeMs).pipe(
EMPTY ignoreElements(),
: // Ring until lifetime ms have passed startWith("ringing" as const),
timer(lifetimeMs).pipe( ),
ignoreElements(), // The notification lifetime has timed out, meaning ringing has likely
startWith("ringing" as const), // stopped on all receiving clients.
), of("timeout" as const),
// The notification lifetime has timed out, meaning ringing has likely NEVER,
// stopped on all receiving clients. ).pipe(
of("timeout" as const), takeUntil(
NEVER, (
).pipe( fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable<
takeUntil( Parameters<EventTimelineSetHandlerMap[RoomEvent.Timeline]>
( >
fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable< ).pipe(
Parameters<EventTimelineSetHandlerMap[RoomEvent.Timeline]> filter(
> ([event]) =>
).pipe( event.getType() === EventType.RTCDecline &&
filter( event.getRelation()?.rel_type === "m.reference" &&
([event]) => event.getRelation()?.event_id ===
event.getType() === EventType.RTCDecline && notificationEvent.event_id &&
event.getRelation()?.rel_type === "m.reference" && event.getSender() !== this.userId,
event.getRelation()?.event_id === notificationEvent.event_id && ),
event.getSender() !== this.userId,
), ),
), ),
), endWith("decline" as const),
endWith("decline" as const), );
); }),
}), ),
EMPTY,
); );
/** /**
@@ -972,6 +980,8 @@ export class CallViewModel extends ViewModel {
return of("success" as const); return of("success" as const);
} }
// Show the ringing state of the most recent ringing attempt. // Show the ringing state of the most recent ringing attempt.
// ring$ is a behavior so it will emit the latest observable which very well might already have a running timer.
// this is important in case livekitConnectionState$ after didSendCallNotification$ has already emitted.
return this.ring$.pipe(switchAll()); return this.ring$.pipe(switchAll());
}), }),
// The state starts as 'unknown' because we don't know if the RTC // The state starts as 'unknown' because we don't know if the RTC