Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
27 : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
28 : /// automatically. The initial
29 : /// event list will be retreived when created by the `room.getTimeline()` method.
30 :
31 : class Timeline {
32 : final Room room;
33 24 : List<Event> get events => chunk.events;
34 :
35 : /// Map of event ID to map of type to set of aggregated events
36 : final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
37 :
38 : final void Function()? onUpdate;
39 : final void Function(int index)? onChange;
40 : final void Function(int index)? onInsert;
41 : final void Function(int index)? onRemove;
42 : final void Function()? onNewEvent;
43 :
44 : StreamSubscription<EventUpdate>? sub;
45 : StreamSubscription<SyncUpdate>? roomSub;
46 : StreamSubscription<String>? sessionIdReceivedSub;
47 : StreamSubscription<String>? cancelSendEventSub;
48 : bool isRequestingHistory = false;
49 : bool isRequestingFuture = false;
50 :
51 : bool allowNewEvent = true;
52 : bool isFragmentedTimeline = false;
53 :
54 : final Map<String, Event> _eventCache = {};
55 :
56 : TimelineChunk chunk;
57 :
58 : /// Searches for the event in this timeline. If not
59 : /// found, requests from the server. Requested events
60 : /// are cached.
61 2 : Future<Event?> getEventById(String id) async {
62 4 : for (final event in events) {
63 4 : if (event.eventId == id) return event;
64 : }
65 4 : if (_eventCache.containsKey(id)) return _eventCache[id];
66 4 : final requestedEvent = await room.getEventById(id);
67 : if (requestedEvent == null) return null;
68 4 : _eventCache[id] = requestedEvent;
69 4 : return _eventCache[id];
70 : }
71 :
72 : // When fetching history, we will collect them into the `_historyUpdates` set
73 : // first, and then only process all events at once, once we have the full history.
74 : // This ensures that the entire history fetching only triggers `onUpdate` only *once*,
75 : // even if /sync's complete while history is being proccessed.
76 : bool _collectHistoryUpdates = false;
77 :
78 1 : bool get canRequestHistory {
79 2 : if (events.isEmpty) return true;
80 0 : return room.prev_batch != null && events.last.type != EventTypes.RoomCreate;
81 : }
82 :
83 2 : Future<void> requestHistory(
84 : {int historyCount = Room.defaultHistoryCount}) async {
85 2 : if (isRequestingHistory) {
86 : return;
87 : }
88 :
89 2 : isRequestingHistory = true;
90 2 : await _requestEvents(direction: Direction.b, historyCount: historyCount);
91 2 : isRequestingHistory = false;
92 : }
93 :
94 0 : bool get canRequestFuture => !allowNewEvent;
95 :
96 1 : Future<void> requestFuture(
97 : {int historyCount = Room.defaultHistoryCount}) async {
98 1 : if (allowNewEvent) {
99 : return; // we shouldn't force to add new events if they will autatically be added
100 : }
101 :
102 1 : if (isRequestingFuture) return;
103 1 : isRequestingFuture = true;
104 1 : await _requestEvents(direction: Direction.f, historyCount: historyCount);
105 1 : isRequestingFuture = false;
106 : }
107 :
108 3 : Future<void> _requestEvents(
109 : {int historyCount = Room.defaultHistoryCount,
110 : required Direction direction}) async {
111 4 : onUpdate?.call();
112 :
113 : try {
114 : // Look up for events in the database first. With fragmented view, we should delete the database cache
115 3 : final eventsFromStore = isFragmentedTimeline
116 : ? null
117 8 : : await room.client.database?.getEventList(
118 2 : room,
119 4 : start: events.length,
120 : limit: historyCount,
121 : );
122 :
123 2 : if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
124 : // Fetch all users from database we have got here.
125 0 : for (final event in events) {
126 0 : if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
127 : continue;
128 : }
129 : final dbUser =
130 0 : await room.client.database?.getUser(event.senderId, room);
131 0 : if (dbUser != null) room.setState(dbUser);
132 : }
133 :
134 0 : if (direction == Direction.b) {
135 0 : events.addAll(eventsFromStore);
136 0 : final startIndex = events.length - eventsFromStore.length;
137 0 : final endIndex = events.length;
138 0 : for (var i = startIndex; i < endIndex; i++) {
139 0 : onInsert?.call(i);
140 : }
141 : } else {
142 0 : events.insertAll(0, eventsFromStore);
143 0 : final startIndex = eventsFromStore.length;
144 : final endIndex = 0;
145 0 : for (var i = startIndex; i > endIndex; i--) {
146 0 : onInsert?.call(i);
147 : }
148 : }
149 : } else {
150 6 : Logs().i('No more events found in the store. Request from server...');
151 3 : if (isFragmentedTimeline) {
152 1 : await getRoomEvents(
153 : historyCount: historyCount,
154 : direction: direction,
155 : );
156 : } else {
157 4 : await room.requestHistory(
158 : historyCount: historyCount,
159 : direction: direction,
160 2 : onHistoryReceived: () {
161 2 : _collectHistoryUpdates = true;
162 : },
163 : );
164 : }
165 : }
166 : } finally {
167 3 : _collectHistoryUpdates = false;
168 3 : isRequestingHistory = false;
169 4 : onUpdate?.call();
170 : }
171 : }
172 :
173 : /// Request more previous events from the server. [historyCount] defines how much events should
174 : /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before**
175 : /// the historical events will be published in the onEvent stream.
176 : /// Returns the actual count of received timeline events.
177 1 : Future<int> getRoomEvents(
178 : {int historyCount = Room.defaultHistoryCount,
179 : direction = Direction.b}) async {
180 3 : final resp = await room.client.getRoomEvents(
181 2 : room.id,
182 : direction,
183 3 : from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
184 : limit: historyCount,
185 3 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
186 : );
187 :
188 1 : if (resp.end == null) {
189 2 : Logs().w('We reached the end of the timeline');
190 : }
191 :
192 2 : final newNextBatch = direction == Direction.b ? resp.start : resp.end;
193 2 : final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
194 :
195 1 : final type = direction == Direction.b
196 : ? EventUpdateType.history
197 : : EventUpdateType.timeline;
198 :
199 3 : if ((resp.state?.length ?? 0) == 0 &&
200 3 : resp.start != resp.end &&
201 : newPrevBatch != null &&
202 : newNextBatch != null) {
203 1 : if (type == EventUpdateType.history) {
204 0 : Logs().w(
205 0 : '[nav] we can still request history prevBatch: $type $newPrevBatch');
206 : } else {
207 2 : Logs().w(
208 1 : '[nav] we can still request timeline nextBatch: $type $newNextBatch');
209 : }
210 : }
211 :
212 : final newEvents =
213 6 : resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
214 :
215 1 : if (!allowNewEvent) {
216 3 : if (resp.start == resp.end ||
217 3 : (resp.end == null && direction == Direction.f)) allowNewEvent = true;
218 :
219 1 : if (allowNewEvent) {
220 2 : Logs().d('We now allow sync update into the timeline.');
221 1 : newEvents.addAll(
222 5 : await room.client.database?.getEventList(room, onlySending: true) ??
223 0 : []);
224 : }
225 : }
226 :
227 : // Try to decrypt encrypted events but don't update the database.
228 2 : if (room.encrypted && room.client.encryptionEnabled) {
229 0 : for (var i = 0; i < newEvents.length; i++) {
230 0 : if (newEvents[i].type == EventTypes.Encrypted) {
231 0 : newEvents[i] = await room.client.encryption!.decryptRoomEvent(
232 0 : room.id,
233 0 : newEvents[i],
234 : );
235 : }
236 : }
237 : }
238 :
239 : // update chunk anchors
240 1 : if (type == EventUpdateType.history) {
241 0 : chunk.prevBatch = newPrevBatch ?? '';
242 :
243 0 : final offset = chunk.events.length;
244 :
245 0 : chunk.events.addAll(newEvents);
246 :
247 0 : for (var i = 0; i < newEvents.length; i++) {
248 0 : onInsert?.call(i + offset);
249 : }
250 : } else {
251 2 : chunk.nextBatch = newNextBatch ?? '';
252 4 : chunk.events.insertAll(0, newEvents.reversed);
253 :
254 3 : for (var i = 0; i < newEvents.length; i++) {
255 2 : onInsert?.call(i);
256 : }
257 : }
258 :
259 1 : if (onUpdate != null) {
260 2 : onUpdate!();
261 : }
262 2 : return resp.chunk.length;
263 : }
264 :
265 8 : Timeline(
266 : {required this.room,
267 : this.onUpdate,
268 : this.onChange,
269 : this.onInsert,
270 : this.onRemove,
271 : this.onNewEvent,
272 : required this.chunk}) {
273 56 : sub = room.client.onEvent.stream.listen(_handleEventUpdate);
274 :
275 : // If the timeline is limited we want to clear our events cache
276 40 : roomSub = room.client.onSync.stream
277 55 : .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
278 16 : .listen(_removeEventsNotInThisSync);
279 :
280 8 : sessionIdReceivedSub =
281 40 : room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
282 8 : cancelSendEventSub =
283 48 : room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
284 :
285 : // we want to populate our aggregated events
286 14 : for (final e in events) {
287 6 : addAggregatedEvent(e);
288 : }
289 :
290 : // we are using a fragmented timeline
291 24 : if (chunk.nextBatch != '') {
292 1 : allowNewEvent = false;
293 1 : isFragmentedTimeline = true;
294 : }
295 : }
296 :
297 2 : void _cleanUpCancelledEvent(String eventId) {
298 2 : final i = _findEvent(event_id: eventId);
299 6 : if (i < events.length) {
300 6 : removeAggregatedEvent(events[i]);
301 4 : events.removeAt(i);
302 4 : onRemove?.call(i);
303 4 : onUpdate?.call();
304 : }
305 : }
306 :
307 : /// Removes all entries from [events] which are not in this SyncUpdate.
308 2 : void _removeEventsNotInThisSync(SyncUpdate sync) {
309 15 : final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
310 4 : final keepEventIds = newSyncEvents.map((e) => e.eventId);
311 7 : events.removeWhere((e) => !keepEventIds.contains(e.eventId));
312 : }
313 :
314 : /// Don't forget to call this before you dismiss this object!
315 0 : void cancelSubscriptions() {
316 : // ignore: discarded_futures
317 0 : sub?.cancel();
318 : // ignore: discarded_futures
319 0 : roomSub?.cancel();
320 : // ignore: discarded_futures
321 0 : sessionIdReceivedSub?.cancel();
322 : // ignore: discarded_futures
323 0 : cancelSendEventSub?.cancel();
324 : }
325 :
326 1 : void _sessionKeyReceived(String sessionId) async {
327 : var decryptAtLeastOneEvent = false;
328 1 : Future<void> decryptFn() async {
329 3 : final encryption = room.client.encryption;
330 3 : if (!room.client.encryptionEnabled || encryption == null) {
331 : return;
332 : }
333 4 : for (var i = 0; i < events.length; i++) {
334 4 : if (events[i].type == EventTypes.Encrypted &&
335 4 : events[i].messageType == MessageTypes.BadEncrypted &&
336 0 : events[i].content['session_id'] == sessionId) {
337 0 : events[i] = await encryption.decryptRoomEvent(
338 0 : room.id,
339 0 : events[i],
340 : store: true,
341 : updateType: EventUpdateType.history,
342 : );
343 0 : addAggregatedEvent(events[i]);
344 0 : onChange?.call(i);
345 0 : if (events[i].type != EventTypes.Encrypted) {
346 : decryptAtLeastOneEvent = true;
347 : }
348 : }
349 : }
350 : }
351 :
352 3 : if (room.client.database != null) {
353 4 : await room.client.database?.transaction(decryptFn);
354 : } else {
355 0 : await decryptFn();
356 : }
357 0 : if (decryptAtLeastOneEvent) onUpdate?.call();
358 : }
359 :
360 : /// Request the keys for undecryptable events of this timeline
361 0 : void requestKeys({
362 : bool tryOnlineBackup = true,
363 : bool onlineKeyBackupOnly = true,
364 : }) {
365 0 : for (final event in events) {
366 0 : if (event.type == EventTypes.Encrypted &&
367 0 : event.messageType == MessageTypes.BadEncrypted &&
368 0 : event.content['can_request_session'] == true) {
369 0 : final sessionId = event.content.tryGet<String>('session_id');
370 0 : final senderKey = event.content.tryGet<String>('sender_key');
371 : if (sessionId != null && senderKey != null) {
372 0 : room.client.encryption?.keyManager.maybeAutoRequest(
373 0 : room.id,
374 : sessionId,
375 : senderKey,
376 : tryOnlineBackup: tryOnlineBackup,
377 : onlineKeyBackupOnly: onlineKeyBackupOnly,
378 : );
379 : }
380 : }
381 : }
382 : }
383 :
384 : /// Set the read marker to the last synced event in this timeline.
385 2 : Future<void> setReadMarker({String? eventId, bool? public}) async {
386 : eventId ??=
387 12 : events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
388 : if (eventId == null) return;
389 4 : return room.setReadMarker(eventId, mRead: eventId, public: public);
390 : }
391 :
392 7 : int _findEvent({String? event_id, String? unsigned_txid}) {
393 : // we want to find any existing event where either the passed event_id or the passed unsigned_txid
394 : // matches either the event_id or transaction_id of the existing event.
395 : // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
396 : // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
397 : // thus meaning we found our element.
398 : final searchNeedle = <String>{};
399 : if (event_id != null) {
400 7 : searchNeedle.add(event_id);
401 : }
402 : if (unsigned_txid != null) {
403 4 : searchNeedle.add(unsigned_txid);
404 : }
405 : int i;
406 28 : for (i = 0; i < events.length; i++) {
407 21 : final searchHaystack = <String>{events[i].eventId};
408 :
409 28 : final txnid = events[i].unsigned?.tryGet<String>('transaction_id');
410 : if (txnid != null) {
411 4 : searchHaystack.add(txnid);
412 : }
413 14 : if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
414 : break;
415 : }
416 : }
417 : return i;
418 : }
419 :
420 3 : void _removeEventFromSet(Set<Event> eventSet, Event event) {
421 6 : eventSet.removeWhere((e) =>
422 6 : e.matchesEventOrTransactionId(event.eventId) ||
423 3 : event.unsigned != null &&
424 3 : e.matchesEventOrTransactionId(
425 6 : event.unsigned?.tryGet<String>('transaction_id')));
426 : }
427 :
428 8 : void addAggregatedEvent(Event event) {
429 : // we want to add an event to the aggregation tree
430 8 : final relationshipType = event.relationshipType;
431 8 : final relationshipEventId = event.relationshipEventId;
432 : if (relationshipType == null || relationshipEventId == null) {
433 : return; // nothing to do
434 : }
435 6 : final events = (aggregatedEvents[relationshipEventId] ??=
436 6 : <String, Set<Event>>{})[relationshipType] ??= <Event>{};
437 : // remove a potential old event
438 3 : _removeEventFromSet(events, event);
439 : // add the new one
440 3 : events.add(event);
441 3 : if (onChange != null) {
442 0 : final index = _findEvent(event_id: relationshipEventId);
443 0 : onChange?.call(index);
444 : }
445 : }
446 :
447 3 : void removeAggregatedEvent(Event event) {
448 9 : aggregatedEvents.remove(event.eventId);
449 3 : if (event.unsigned != null) {
450 12 : aggregatedEvents.remove(event.unsigned?['transaction_id']);
451 : }
452 7 : for (final types in aggregatedEvents.values) {
453 2 : for (final events in types.values) {
454 1 : _removeEventFromSet(events, event);
455 : }
456 : }
457 : }
458 :
459 7 : void _handleEventUpdate(EventUpdate eventUpdate, {bool update = true}) {
460 : try {
461 28 : if (eventUpdate.roomID != room.id) return;
462 :
463 14 : if (eventUpdate.type != EventUpdateType.timeline &&
464 12 : eventUpdate.type != EventUpdateType.history) {
465 : return;
466 : }
467 :
468 14 : if (eventUpdate.type == EventUpdateType.timeline) {
469 7 : onNewEvent?.call();
470 : }
471 :
472 7 : if (!allowNewEvent) return;
473 :
474 21 : final status = eventStatusFromInt(eventUpdate.content['status'] ??
475 15 : (eventUpdate.content['unsigned'] is Map<String, dynamic>
476 15 : ? eventUpdate.content['unsigned'][messageSendingStatusKey]
477 : : null) ??
478 4 : EventStatus.synced.intValue);
479 :
480 7 : final i = _findEvent(
481 14 : event_id: eventUpdate.content['event_id'],
482 21 : unsigned_txid: eventUpdate.content['unsigned'] is Map
483 21 : ? eventUpdate.content['unsigned']['transaction_id']
484 : : null);
485 :
486 21 : if (i < events.length) {
487 : // if the old status is larger than the new one, we also want to preserve the old status
488 21 : final oldStatus = events[i].status;
489 21 : events[i] = Event.fromJson(
490 7 : eventUpdate.content,
491 7 : room,
492 : );
493 : // do we preserve the status? we should allow 0 -> -1 updates and status increases
494 14 : if ((latestEventStatus(status, oldStatus) == oldStatus) &&
495 9 : !(status.isError && oldStatus.isSending)) {
496 21 : events[i].status = oldStatus;
497 : }
498 21 : addAggregatedEvent(events[i]);
499 9 : onChange?.call(i);
500 : } else {
501 6 : final newEvent = Event.fromJson(
502 6 : eventUpdate.content,
503 6 : room,
504 : );
505 :
506 12 : if (eventUpdate.type == EventUpdateType.history &&
507 6 : events.indexWhere(
508 18 : (e) => e.eventId == eventUpdate.content['event_id']) !=
509 3 : -1) return;
510 12 : var index = events.length;
511 12 : if (eventUpdate.type == EventUpdateType.history) {
512 6 : events.add(newEvent);
513 : } else {
514 8 : index = events.firstIndexWhereNotError;
515 8 : events.insert(index, newEvent);
516 : }
517 10 : onInsert?.call(index);
518 :
519 6 : addAggregatedEvent(newEvent);
520 : }
521 :
522 : // Handle redaction events
523 21 : if (eventUpdate.content['type'] == EventTypes.Redaction) {
524 : final index =
525 3 : _findEvent(event_id: eventUpdate.content.tryGet<String>('redacts'));
526 3 : if (index < events.length) {
527 3 : removeAggregatedEvent(events[index]);
528 :
529 : // Is the redacted event a reaction? Then update the event this
530 : // belongs to:
531 1 : if (onChange != null) {
532 3 : final relationshipEventId = events[index].relationshipEventId;
533 : if (relationshipEventId != null) {
534 0 : onChange?.call(_findEvent(event_id: relationshipEventId));
535 : return;
536 : }
537 : }
538 :
539 4 : events[index].setRedactionEvent(Event.fromJson(
540 1 : eventUpdate.content,
541 1 : room,
542 : ));
543 2 : onChange?.call(index);
544 : }
545 : }
546 :
547 7 : if (update && !_collectHistoryUpdates) {
548 9 : onUpdate?.call();
549 : }
550 : } catch (e, s) {
551 0 : Logs().w('Handle event update failed', e, s);
552 : }
553 : }
554 :
555 0 : @Deprecated('Use [startSearch] instead.')
556 : Stream<List<Event>> searchEvent({
557 : String? searchTerm,
558 : int requestHistoryCount = 100,
559 : int maxHistoryRequests = 10,
560 : String? sinceEventId,
561 : int? limit,
562 : bool Function(Event)? searchFunc,
563 : }) =>
564 0 : startSearch(
565 : searchTerm: searchTerm,
566 : requestHistoryCount: requestHistoryCount,
567 : maxHistoryRequests: maxHistoryRequests,
568 : // ignore: deprecated_member_use_from_same_package
569 : sinceEventId: sinceEventId,
570 : limit: limit,
571 : searchFunc: searchFunc,
572 0 : ).map((result) => result.$1);
573 :
574 : /// Searches [searchTerm] in this timeline. It first searches in the
575 : /// cache, then in the database and then on the server. The search can
576 : /// take a while, which is why this returns a stream so the already found
577 : /// events can already be displayed.
578 : /// Override the [searchFunc] if you need another search. This will then
579 : /// ignore [searchTerm].
580 : /// Returns the List of Events and the next prevBatch at the end of the
581 : /// search.
582 0 : Stream<(List<Event>, String?)> startSearch({
583 : String? searchTerm,
584 : int requestHistoryCount = 100,
585 : int maxHistoryRequests = 10,
586 : String? prevBatch,
587 : @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
588 : int? limit,
589 : bool Function(Event)? searchFunc,
590 : }) async* {
591 0 : assert(searchTerm != null || searchFunc != null);
592 0 : searchFunc ??= (event) =>
593 0 : event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
594 0 : final found = <Event>[];
595 :
596 : if (sinceEventId == null) {
597 : // Search locally
598 0 : for (final event in events) {
599 0 : if (searchFunc(event)) {
600 0 : yield (found..add(event), null);
601 : }
602 : }
603 :
604 : // Search in database
605 0 : var start = events.length;
606 : while (true) {
607 0 : final eventsFromStore = await room.client.database?.getEventList(
608 0 : room,
609 : start: start,
610 : limit: requestHistoryCount,
611 : ) ??
612 0 : [];
613 0 : if (eventsFromStore.isEmpty) break;
614 0 : start += eventsFromStore.length;
615 0 : for (final event in eventsFromStore) {
616 0 : if (searchFunc(event)) {
617 0 : yield (found..add(event), null);
618 : }
619 : }
620 : }
621 : }
622 :
623 : // Search on the server
624 0 : prevBatch ??= room.prev_batch;
625 : if (sinceEventId != null) {
626 : prevBatch =
627 0 : (await room.client.getEventContext(room.id, sinceEventId)).end;
628 : }
629 0 : final encryption = room.client.encryption;
630 0 : for (var i = 0; i < maxHistoryRequests; i++) {
631 : if (prevBatch == null) break;
632 0 : if (limit != null && found.length >= limit) break;
633 : try {
634 0 : final resp = await room.client.getRoomEvents(
635 0 : room.id,
636 : Direction.b,
637 : from: prevBatch,
638 : limit: requestHistoryCount,
639 0 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
640 : );
641 0 : for (final matrixEvent in resp.chunk) {
642 0 : var event = Event.fromMatrixEvent(matrixEvent, room);
643 0 : if (event.type == EventTypes.Encrypted && encryption != null) {
644 0 : event = await encryption.decryptRoomEvent(room.id, event);
645 0 : if (event.type == EventTypes.Encrypted &&
646 0 : event.messageType == MessageTypes.BadEncrypted &&
647 0 : event.content['can_request_session'] == true) {
648 : // Await requestKey() here to ensure decrypted message bodies
649 0 : await event.requestKey();
650 : }
651 : }
652 0 : if (searchFunc(event)) {
653 0 : yield (found..add(event), resp.end);
654 0 : if (limit != null && found.length >= limit) break;
655 : }
656 : }
657 0 : prevBatch = resp.end;
658 : // We are at the beginning of the room
659 0 : if (resp.chunk.length < requestHistoryCount) break;
660 0 : } on MatrixException catch (e) {
661 : // We have no permission anymore to request the history
662 0 : if (e.error == MatrixError.M_FORBIDDEN) {
663 : break;
664 : }
665 : rethrow;
666 : }
667 : }
668 : return;
669 : }
670 : }
671 :
672 : extension on List<Event> {
673 4 : int get firstIndexWhereNotError {
674 4 : if (isEmpty) return 0;
675 16 : final index = indexWhere((event) => !event.status.isError);
676 9 : if (index == -1) return length;
677 : return index;
678 : }
679 : }
|