LCOV - code coverage report
Current view: top level - lib/src - timeline.dart (source / functions) Hit Total Coverage
Test: merged.info Lines: 190 289 65.7 %
Date: 2024-05-13 12:56:47 Functions: 0 0 -

          Line data    Source code
       1             : /*
       2             :  *   Famedly Matrix SDK
       3             :  *   Copyright (C) 2019, 2020, 2021 Famedly GmbH
       4             :  *
       5             :  *   This program is free software: you can redistribute it and/or modify
       6             :  *   it under the terms of the GNU Affero General Public License as
       7             :  *   published by the Free Software Foundation, either version 3 of the
       8             :  *   License, or (at your option) any later version.
       9             :  *
      10             :  *   This program is distributed in the hope that it will be useful,
      11             :  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13             :  *   GNU Affero General Public License for more details.
      14             :  *
      15             :  *   You should have received a copy of the GNU Affero General Public License
      16             :  *   along with this program.  If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : import 'dart:async';
      20             : import 'dart:convert';
      21             : 
      22             : import 'package:collection/collection.dart';
      23             : 
      24             : import 'package:matrix/matrix.dart';
      25             : import 'package:matrix/src/models/timeline_chunk.dart';
      26             : 
      27             : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
      28             : /// automatically. The initial
      29             : /// event list will be retreived when created by the `room.getTimeline()` method.
      30             : 
      31             : class Timeline {
      32             :   final Room room;
      33          24 :   List<Event> get events => chunk.events;
      34             : 
      35             :   /// Map of event ID to map of type to set of aggregated events
      36             :   final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
      37             : 
      38             :   final void Function()? onUpdate;
      39             :   final void Function(int index)? onChange;
      40             :   final void Function(int index)? onInsert;
      41             :   final void Function(int index)? onRemove;
      42             :   final void Function()? onNewEvent;
      43             : 
      44             :   StreamSubscription<EventUpdate>? sub;
      45             :   StreamSubscription<SyncUpdate>? roomSub;
      46             :   StreamSubscription<String>? sessionIdReceivedSub;
      47             :   StreamSubscription<String>? cancelSendEventSub;
      48             :   bool isRequestingHistory = false;
      49             :   bool isRequestingFuture = false;
      50             : 
      51             :   bool allowNewEvent = true;
      52             :   bool isFragmentedTimeline = false;
      53             : 
      54             :   final Map<String, Event> _eventCache = {};
      55             : 
      56             :   TimelineChunk chunk;
      57             : 
      58             :   /// Searches for the event in this timeline. If not
      59             :   /// found, requests from the server. Requested events
      60             :   /// are cached.
      61           2 :   Future<Event?> getEventById(String id) async {
      62           4 :     for (final event in events) {
      63           4 :       if (event.eventId == id) return event;
      64             :     }
      65           4 :     if (_eventCache.containsKey(id)) return _eventCache[id];
      66           4 :     final requestedEvent = await room.getEventById(id);
      67             :     if (requestedEvent == null) return null;
      68           4 :     _eventCache[id] = requestedEvent;
      69           4 :     return _eventCache[id];
      70             :   }
      71             : 
      72             :   // When fetching history, we will collect them into the `_historyUpdates` set
      73             :   // first, and then only process all events at once, once we have the full history.
      74             :   // This ensures that the entire history fetching only triggers `onUpdate` only *once*,
      75             :   // even if /sync's complete while history is being proccessed.
      76             :   bool _collectHistoryUpdates = false;
      77             : 
      78           1 :   bool get canRequestHistory {
      79           2 :     if (events.isEmpty) return true;
      80           0 :     return room.prev_batch != null && events.last.type != EventTypes.RoomCreate;
      81             :   }
      82             : 
      83           2 :   Future<void> requestHistory(
      84             :       {int historyCount = Room.defaultHistoryCount}) async {
      85           2 :     if (isRequestingHistory) {
      86             :       return;
      87             :     }
      88             : 
      89           2 :     isRequestingHistory = true;
      90           2 :     await _requestEvents(direction: Direction.b, historyCount: historyCount);
      91           2 :     isRequestingHistory = false;
      92             :   }
      93             : 
      94           0 :   bool get canRequestFuture => !allowNewEvent;
      95             : 
      96           1 :   Future<void> requestFuture(
      97             :       {int historyCount = Room.defaultHistoryCount}) async {
      98           1 :     if (allowNewEvent) {
      99             :       return; // we shouldn't force to add new events if they will autatically be added
     100             :     }
     101             : 
     102           1 :     if (isRequestingFuture) return;
     103           1 :     isRequestingFuture = true;
     104           1 :     await _requestEvents(direction: Direction.f, historyCount: historyCount);
     105           1 :     isRequestingFuture = false;
     106             :   }
     107             : 
     108           3 :   Future<void> _requestEvents(
     109             :       {int historyCount = Room.defaultHistoryCount,
     110             :       required Direction direction}) async {
     111           4 :     onUpdate?.call();
     112             : 
     113             :     try {
     114             :       // Look up for events in the database first. With fragmented view, we should delete the database cache
     115           3 :       final eventsFromStore = isFragmentedTimeline
     116             :           ? null
     117           8 :           : await room.client.database?.getEventList(
     118           2 :               room,
     119           4 :               start: events.length,
     120             :               limit: historyCount,
     121             :             );
     122             : 
     123           2 :       if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
     124             :         // Fetch all users from database we have got here.
     125           0 :         for (final event in events) {
     126           0 :           if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
     127             :             continue;
     128             :           }
     129             :           final dbUser =
     130           0 :               await room.client.database?.getUser(event.senderId, room);
     131           0 :           if (dbUser != null) room.setState(dbUser);
     132             :         }
     133             : 
     134           0 :         if (direction == Direction.b) {
     135           0 :           events.addAll(eventsFromStore);
     136           0 :           final startIndex = events.length - eventsFromStore.length;
     137           0 :           final endIndex = events.length;
     138           0 :           for (var i = startIndex; i < endIndex; i++) {
     139           0 :             onInsert?.call(i);
     140             :           }
     141             :         } else {
     142           0 :           events.insertAll(0, eventsFromStore);
     143           0 :           final startIndex = eventsFromStore.length;
     144             :           final endIndex = 0;
     145           0 :           for (var i = startIndex; i > endIndex; i--) {
     146           0 :             onInsert?.call(i);
     147             :           }
     148             :         }
     149             :       } else {
     150           6 :         Logs().i('No more events found in the store. Request from server...');
     151           3 :         if (isFragmentedTimeline) {
     152           1 :           await getRoomEvents(
     153             :             historyCount: historyCount,
     154             :             direction: direction,
     155             :           );
     156             :         } else {
     157           4 :           await room.requestHistory(
     158             :             historyCount: historyCount,
     159             :             direction: direction,
     160           2 :             onHistoryReceived: () {
     161           2 :               _collectHistoryUpdates = true;
     162             :             },
     163             :           );
     164             :         }
     165             :       }
     166             :     } finally {
     167           3 :       _collectHistoryUpdates = false;
     168           3 :       isRequestingHistory = false;
     169           4 :       onUpdate?.call();
     170             :     }
     171             :   }
     172             : 
     173             :   /// Request more previous events from the server. [historyCount] defines how much events should
     174             :   /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before**
     175             :   /// the historical events will be published in the onEvent stream.
     176             :   /// Returns the actual count of received timeline events.
     177           1 :   Future<int> getRoomEvents(
     178             :       {int historyCount = Room.defaultHistoryCount,
     179             :       direction = Direction.b}) async {
     180           3 :     final resp = await room.client.getRoomEvents(
     181           2 :       room.id,
     182             :       direction,
     183           3 :       from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
     184             :       limit: historyCount,
     185           3 :       filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
     186             :     );
     187             : 
     188           1 :     if (resp.end == null) {
     189           2 :       Logs().w('We reached the end of the timeline');
     190             :     }
     191             : 
     192           2 :     final newNextBatch = direction == Direction.b ? resp.start : resp.end;
     193           2 :     final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
     194             : 
     195           1 :     final type = direction == Direction.b
     196             :         ? EventUpdateType.history
     197             :         : EventUpdateType.timeline;
     198             : 
     199           3 :     if ((resp.state?.length ?? 0) == 0 &&
     200           3 :         resp.start != resp.end &&
     201             :         newPrevBatch != null &&
     202             :         newNextBatch != null) {
     203           1 :       if (type == EventUpdateType.history) {
     204           0 :         Logs().w(
     205           0 :             '[nav] we can still request history prevBatch: $type $newPrevBatch');
     206             :       } else {
     207           2 :         Logs().w(
     208           1 :             '[nav] we can still request timeline nextBatch: $type $newNextBatch');
     209             :       }
     210             :     }
     211             : 
     212             :     final newEvents =
     213           6 :         resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
     214             : 
     215           1 :     if (!allowNewEvent) {
     216           3 :       if (resp.start == resp.end ||
     217           3 :           (resp.end == null && direction == Direction.f)) allowNewEvent = true;
     218             : 
     219           1 :       if (allowNewEvent) {
     220           2 :         Logs().d('We now allow sync update into the timeline.');
     221           1 :         newEvents.addAll(
     222           5 :             await room.client.database?.getEventList(room, onlySending: true) ??
     223           0 :                 []);
     224             :       }
     225             :     }
     226             : 
     227             :     // Try to decrypt encrypted events but don't update the database.
     228           2 :     if (room.encrypted && room.client.encryptionEnabled) {
     229           0 :       for (var i = 0; i < newEvents.length; i++) {
     230           0 :         if (newEvents[i].type == EventTypes.Encrypted) {
     231           0 :           newEvents[i] = await room.client.encryption!.decryptRoomEvent(
     232           0 :             room.id,
     233           0 :             newEvents[i],
     234             :           );
     235             :         }
     236             :       }
     237             :     }
     238             : 
     239             :     // update chunk anchors
     240           1 :     if (type == EventUpdateType.history) {
     241           0 :       chunk.prevBatch = newPrevBatch ?? '';
     242             : 
     243           0 :       final offset = chunk.events.length;
     244             : 
     245           0 :       chunk.events.addAll(newEvents);
     246             : 
     247           0 :       for (var i = 0; i < newEvents.length; i++) {
     248           0 :         onInsert?.call(i + offset);
     249             :       }
     250             :     } else {
     251           2 :       chunk.nextBatch = newNextBatch ?? '';
     252           4 :       chunk.events.insertAll(0, newEvents.reversed);
     253             : 
     254           3 :       for (var i = 0; i < newEvents.length; i++) {
     255           2 :         onInsert?.call(i);
     256             :       }
     257             :     }
     258             : 
     259           1 :     if (onUpdate != null) {
     260           2 :       onUpdate!();
     261             :     }
     262           2 :     return resp.chunk.length;
     263             :   }
     264             : 
     265           8 :   Timeline(
     266             :       {required this.room,
     267             :       this.onUpdate,
     268             :       this.onChange,
     269             :       this.onInsert,
     270             :       this.onRemove,
     271             :       this.onNewEvent,
     272             :       required this.chunk}) {
     273          56 :     sub = room.client.onEvent.stream.listen(_handleEventUpdate);
     274             : 
     275             :     // If the timeline is limited we want to clear our events cache
     276          40 :     roomSub = room.client.onSync.stream
     277          55 :         .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
     278          16 :         .listen(_removeEventsNotInThisSync);
     279             : 
     280           8 :     sessionIdReceivedSub =
     281          40 :         room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
     282           8 :     cancelSendEventSub =
     283          48 :         room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
     284             : 
     285             :     // we want to populate our aggregated events
     286          14 :     for (final e in events) {
     287           6 :       addAggregatedEvent(e);
     288             :     }
     289             : 
     290             :     // we are using a fragmented timeline
     291          24 :     if (chunk.nextBatch != '') {
     292           1 :       allowNewEvent = false;
     293           1 :       isFragmentedTimeline = true;
     294             :     }
     295             :   }
     296             : 
     297           2 :   void _cleanUpCancelledEvent(String eventId) {
     298           2 :     final i = _findEvent(event_id: eventId);
     299           6 :     if (i < events.length) {
     300           6 :       removeAggregatedEvent(events[i]);
     301           4 :       events.removeAt(i);
     302           4 :       onRemove?.call(i);
     303           4 :       onUpdate?.call();
     304             :     }
     305             :   }
     306             : 
     307             :   /// Removes all entries from [events] which are not in this SyncUpdate.
     308           2 :   void _removeEventsNotInThisSync(SyncUpdate sync) {
     309          15 :     final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
     310           4 :     final keepEventIds = newSyncEvents.map((e) => e.eventId);
     311           7 :     events.removeWhere((e) => !keepEventIds.contains(e.eventId));
     312             :   }
     313             : 
     314             :   /// Don't forget to call this before you dismiss this object!
     315           0 :   void cancelSubscriptions() {
     316             :     // ignore: discarded_futures
     317           0 :     sub?.cancel();
     318             :     // ignore: discarded_futures
     319           0 :     roomSub?.cancel();
     320             :     // ignore: discarded_futures
     321           0 :     sessionIdReceivedSub?.cancel();
     322             :     // ignore: discarded_futures
     323           0 :     cancelSendEventSub?.cancel();
     324             :   }
     325             : 
     326           1 :   void _sessionKeyReceived(String sessionId) async {
     327             :     var decryptAtLeastOneEvent = false;
     328           1 :     Future<void> decryptFn() async {
     329           3 :       final encryption = room.client.encryption;
     330           3 :       if (!room.client.encryptionEnabled || encryption == null) {
     331             :         return;
     332             :       }
     333           4 :       for (var i = 0; i < events.length; i++) {
     334           4 :         if (events[i].type == EventTypes.Encrypted &&
     335           4 :             events[i].messageType == MessageTypes.BadEncrypted &&
     336           0 :             events[i].content['session_id'] == sessionId) {
     337           0 :           events[i] = await encryption.decryptRoomEvent(
     338           0 :             room.id,
     339           0 :             events[i],
     340             :             store: true,
     341             :             updateType: EventUpdateType.history,
     342             :           );
     343           0 :           addAggregatedEvent(events[i]);
     344           0 :           onChange?.call(i);
     345           0 :           if (events[i].type != EventTypes.Encrypted) {
     346             :             decryptAtLeastOneEvent = true;
     347             :           }
     348             :         }
     349             :       }
     350             :     }
     351             : 
     352           3 :     if (room.client.database != null) {
     353           4 :       await room.client.database?.transaction(decryptFn);
     354             :     } else {
     355           0 :       await decryptFn();
     356             :     }
     357           0 :     if (decryptAtLeastOneEvent) onUpdate?.call();
     358             :   }
     359             : 
     360             :   /// Request the keys for undecryptable events of this timeline
     361           0 :   void requestKeys({
     362             :     bool tryOnlineBackup = true,
     363             :     bool onlineKeyBackupOnly = true,
     364             :   }) {
     365           0 :     for (final event in events) {
     366           0 :       if (event.type == EventTypes.Encrypted &&
     367           0 :           event.messageType == MessageTypes.BadEncrypted &&
     368           0 :           event.content['can_request_session'] == true) {
     369           0 :         final sessionId = event.content.tryGet<String>('session_id');
     370           0 :         final senderKey = event.content.tryGet<String>('sender_key');
     371             :         if (sessionId != null && senderKey != null) {
     372           0 :           room.client.encryption?.keyManager.maybeAutoRequest(
     373           0 :             room.id,
     374             :             sessionId,
     375             :             senderKey,
     376             :             tryOnlineBackup: tryOnlineBackup,
     377             :             onlineKeyBackupOnly: onlineKeyBackupOnly,
     378             :           );
     379             :         }
     380             :       }
     381             :     }
     382             :   }
     383             : 
     384             :   /// Set the read marker to the last synced event in this timeline.
     385           2 :   Future<void> setReadMarker({String? eventId, bool? public}) async {
     386             :     eventId ??=
     387          12 :         events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
     388             :     if (eventId == null) return;
     389           4 :     return room.setReadMarker(eventId, mRead: eventId, public: public);
     390             :   }
     391             : 
     392           7 :   int _findEvent({String? event_id, String? unsigned_txid}) {
     393             :     // we want to find any existing event where either the passed event_id or the passed unsigned_txid
     394             :     // matches either the event_id or transaction_id of the existing event.
     395             :     // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
     396             :     // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
     397             :     // thus meaning we found our element.
     398             :     final searchNeedle = <String>{};
     399             :     if (event_id != null) {
     400           7 :       searchNeedle.add(event_id);
     401             :     }
     402             :     if (unsigned_txid != null) {
     403           4 :       searchNeedle.add(unsigned_txid);
     404             :     }
     405             :     int i;
     406          28 :     for (i = 0; i < events.length; i++) {
     407          21 :       final searchHaystack = <String>{events[i].eventId};
     408             : 
     409          28 :       final txnid = events[i].unsigned?.tryGet<String>('transaction_id');
     410             :       if (txnid != null) {
     411           4 :         searchHaystack.add(txnid);
     412             :       }
     413          14 :       if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
     414             :         break;
     415             :       }
     416             :     }
     417             :     return i;
     418             :   }
     419             : 
     420           3 :   void _removeEventFromSet(Set<Event> eventSet, Event event) {
     421           6 :     eventSet.removeWhere((e) =>
     422           6 :         e.matchesEventOrTransactionId(event.eventId) ||
     423           3 :         event.unsigned != null &&
     424           3 :             e.matchesEventOrTransactionId(
     425           6 :                 event.unsigned?.tryGet<String>('transaction_id')));
     426             :   }
     427             : 
     428           8 :   void addAggregatedEvent(Event event) {
     429             :     // we want to add an event to the aggregation tree
     430           8 :     final relationshipType = event.relationshipType;
     431           8 :     final relationshipEventId = event.relationshipEventId;
     432             :     if (relationshipType == null || relationshipEventId == null) {
     433             :       return; // nothing to do
     434             :     }
     435           6 :     final events = (aggregatedEvents[relationshipEventId] ??=
     436           6 :         <String, Set<Event>>{})[relationshipType] ??= <Event>{};
     437             :     // remove a potential old event
     438           3 :     _removeEventFromSet(events, event);
     439             :     // add the new one
     440           3 :     events.add(event);
     441           3 :     if (onChange != null) {
     442           0 :       final index = _findEvent(event_id: relationshipEventId);
     443           0 :       onChange?.call(index);
     444             :     }
     445             :   }
     446             : 
     447           3 :   void removeAggregatedEvent(Event event) {
     448           9 :     aggregatedEvents.remove(event.eventId);
     449           3 :     if (event.unsigned != null) {
     450          12 :       aggregatedEvents.remove(event.unsigned?['transaction_id']);
     451             :     }
     452           7 :     for (final types in aggregatedEvents.values) {
     453           2 :       for (final events in types.values) {
     454           1 :         _removeEventFromSet(events, event);
     455             :       }
     456             :     }
     457             :   }
     458             : 
     459           7 :   void _handleEventUpdate(EventUpdate eventUpdate, {bool update = true}) {
     460             :     try {
     461          28 :       if (eventUpdate.roomID != room.id) return;
     462             : 
     463          14 :       if (eventUpdate.type != EventUpdateType.timeline &&
     464          12 :           eventUpdate.type != EventUpdateType.history) {
     465             :         return;
     466             :       }
     467             : 
     468          14 :       if (eventUpdate.type == EventUpdateType.timeline) {
     469           7 :         onNewEvent?.call();
     470             :       }
     471             : 
     472           7 :       if (!allowNewEvent) return;
     473             : 
     474          21 :       final status = eventStatusFromInt(eventUpdate.content['status'] ??
     475          15 :           (eventUpdate.content['unsigned'] is Map<String, dynamic>
     476          15 :               ? eventUpdate.content['unsigned'][messageSendingStatusKey]
     477             :               : null) ??
     478           4 :           EventStatus.synced.intValue);
     479             : 
     480           7 :       final i = _findEvent(
     481          14 :           event_id: eventUpdate.content['event_id'],
     482          21 :           unsigned_txid: eventUpdate.content['unsigned'] is Map
     483          21 :               ? eventUpdate.content['unsigned']['transaction_id']
     484             :               : null);
     485             : 
     486          21 :       if (i < events.length) {
     487             :         // if the old status is larger than the new one, we also want to preserve the old status
     488          21 :         final oldStatus = events[i].status;
     489          21 :         events[i] = Event.fromJson(
     490           7 :           eventUpdate.content,
     491           7 :           room,
     492             :         );
     493             :         // do we preserve the status? we should allow 0 -> -1 updates and status increases
     494          14 :         if ((latestEventStatus(status, oldStatus) == oldStatus) &&
     495           9 :             !(status.isError && oldStatus.isSending)) {
     496          21 :           events[i].status = oldStatus;
     497             :         }
     498          21 :         addAggregatedEvent(events[i]);
     499           9 :         onChange?.call(i);
     500             :       } else {
     501           6 :         final newEvent = Event.fromJson(
     502           6 :           eventUpdate.content,
     503           6 :           room,
     504             :         );
     505             : 
     506          12 :         if (eventUpdate.type == EventUpdateType.history &&
     507           6 :             events.indexWhere(
     508          18 :                     (e) => e.eventId == eventUpdate.content['event_id']) !=
     509           3 :                 -1) return;
     510          12 :         var index = events.length;
     511          12 :         if (eventUpdate.type == EventUpdateType.history) {
     512           6 :           events.add(newEvent);
     513             :         } else {
     514           8 :           index = events.firstIndexWhereNotError;
     515           8 :           events.insert(index, newEvent);
     516             :         }
     517          10 :         onInsert?.call(index);
     518             : 
     519           6 :         addAggregatedEvent(newEvent);
     520             :       }
     521             : 
     522             :       // Handle redaction events
     523          21 :       if (eventUpdate.content['type'] == EventTypes.Redaction) {
     524             :         final index =
     525           3 :             _findEvent(event_id: eventUpdate.content.tryGet<String>('redacts'));
     526           3 :         if (index < events.length) {
     527           3 :           removeAggregatedEvent(events[index]);
     528             : 
     529             :           // Is the redacted event a reaction? Then update the event this
     530             :           // belongs to:
     531           1 :           if (onChange != null) {
     532           3 :             final relationshipEventId = events[index].relationshipEventId;
     533             :             if (relationshipEventId != null) {
     534           0 :               onChange?.call(_findEvent(event_id: relationshipEventId));
     535             :               return;
     536             :             }
     537             :           }
     538             : 
     539           4 :           events[index].setRedactionEvent(Event.fromJson(
     540           1 :             eventUpdate.content,
     541           1 :             room,
     542             :           ));
     543           2 :           onChange?.call(index);
     544             :         }
     545             :       }
     546             : 
     547           7 :       if (update && !_collectHistoryUpdates) {
     548           9 :         onUpdate?.call();
     549             :       }
     550             :     } catch (e, s) {
     551           0 :       Logs().w('Handle event update failed', e, s);
     552             :     }
     553             :   }
     554             : 
     555           0 :   @Deprecated('Use [startSearch] instead.')
     556             :   Stream<List<Event>> searchEvent({
     557             :     String? searchTerm,
     558             :     int requestHistoryCount = 100,
     559             :     int maxHistoryRequests = 10,
     560             :     String? sinceEventId,
     561             :     int? limit,
     562             :     bool Function(Event)? searchFunc,
     563             :   }) =>
     564           0 :       startSearch(
     565             :         searchTerm: searchTerm,
     566             :         requestHistoryCount: requestHistoryCount,
     567             :         maxHistoryRequests: maxHistoryRequests,
     568             :         // ignore: deprecated_member_use_from_same_package
     569             :         sinceEventId: sinceEventId,
     570             :         limit: limit,
     571             :         searchFunc: searchFunc,
     572           0 :       ).map((result) => result.$1);
     573             : 
     574             :   /// Searches [searchTerm] in this timeline. It first searches in the
     575             :   /// cache, then in the database and then on the server. The search can
     576             :   /// take a while, which is why this returns a stream so the already found
     577             :   /// events can already be displayed.
     578             :   /// Override the [searchFunc] if you need another search. This will then
     579             :   /// ignore [searchTerm].
     580             :   /// Returns the List of Events and the next prevBatch at the end of the
     581             :   /// search.
     582           0 :   Stream<(List<Event>, String?)> startSearch({
     583             :     String? searchTerm,
     584             :     int requestHistoryCount = 100,
     585             :     int maxHistoryRequests = 10,
     586             :     String? prevBatch,
     587             :     @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
     588             :     int? limit,
     589             :     bool Function(Event)? searchFunc,
     590             :   }) async* {
     591           0 :     assert(searchTerm != null || searchFunc != null);
     592           0 :     searchFunc ??= (event) =>
     593           0 :         event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
     594           0 :     final found = <Event>[];
     595             : 
     596             :     if (sinceEventId == null) {
     597             :       // Search locally
     598           0 :       for (final event in events) {
     599           0 :         if (searchFunc(event)) {
     600           0 :           yield (found..add(event), null);
     601             :         }
     602             :       }
     603             : 
     604             :       // Search in database
     605           0 :       var start = events.length;
     606             :       while (true) {
     607           0 :         final eventsFromStore = await room.client.database?.getEventList(
     608           0 :               room,
     609             :               start: start,
     610             :               limit: requestHistoryCount,
     611             :             ) ??
     612           0 :             [];
     613           0 :         if (eventsFromStore.isEmpty) break;
     614           0 :         start += eventsFromStore.length;
     615           0 :         for (final event in eventsFromStore) {
     616           0 :           if (searchFunc(event)) {
     617           0 :             yield (found..add(event), null);
     618             :           }
     619             :         }
     620             :       }
     621             :     }
     622             : 
     623             :     // Search on the server
     624           0 :     prevBatch ??= room.prev_batch;
     625             :     if (sinceEventId != null) {
     626             :       prevBatch =
     627           0 :           (await room.client.getEventContext(room.id, sinceEventId)).end;
     628             :     }
     629           0 :     final encryption = room.client.encryption;
     630           0 :     for (var i = 0; i < maxHistoryRequests; i++) {
     631             :       if (prevBatch == null) break;
     632           0 :       if (limit != null && found.length >= limit) break;
     633             :       try {
     634           0 :         final resp = await room.client.getRoomEvents(
     635           0 :           room.id,
     636             :           Direction.b,
     637             :           from: prevBatch,
     638             :           limit: requestHistoryCount,
     639           0 :           filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
     640             :         );
     641           0 :         for (final matrixEvent in resp.chunk) {
     642           0 :           var event = Event.fromMatrixEvent(matrixEvent, room);
     643           0 :           if (event.type == EventTypes.Encrypted && encryption != null) {
     644           0 :             event = await encryption.decryptRoomEvent(room.id, event);
     645           0 :             if (event.type == EventTypes.Encrypted &&
     646           0 :                 event.messageType == MessageTypes.BadEncrypted &&
     647           0 :                 event.content['can_request_session'] == true) {
     648             :               // Await requestKey() here to ensure decrypted message bodies
     649           0 :               await event.requestKey();
     650             :             }
     651             :           }
     652           0 :           if (searchFunc(event)) {
     653           0 :             yield (found..add(event), resp.end);
     654           0 :             if (limit != null && found.length >= limit) break;
     655             :           }
     656             :         }
     657           0 :         prevBatch = resp.end;
     658             :         // We are at the beginning of the room
     659           0 :         if (resp.chunk.length < requestHistoryCount) break;
     660           0 :       } on MatrixException catch (e) {
     661             :         // We have no permission anymore to request the history
     662           0 :         if (e.error == MatrixError.M_FORBIDDEN) {
     663             :           break;
     664             :         }
     665             :         rethrow;
     666             :       }
     667             :     }
     668             :     return;
     669             :   }
     670             : }
     671             : 
     672             : extension on List<Event> {
     673           4 :   int get firstIndexWhereNotError {
     674           4 :     if (isEmpty) return 0;
     675          16 :     final index = indexWhere((event) => !event.status.isError);
     676           9 :     if (index == -1) return length;
     677             :     return index;
     678             :   }
     679             : }

Generated by: LCOV version 1.14