...
 
Commits (2)
......@@ -16,7 +16,7 @@ void main() async {
for (Room room in await user.rooms.get()) {
print(room.name);
}
user.stopSync();
await user.stopSync();
await sub.cancel();
});
......
......@@ -7,6 +7,7 @@
import 'dart:convert';
import 'package:async/async.dart';
import 'package:chopper/chopper.dart';
import 'package:kiwi/kiwi.dart';
import 'package:matrix_sdk/src/api/access_token_interceptor.dart';
......@@ -172,8 +173,8 @@ class LocalUser extends User {
String _syncToken;
bool _syncing = false;
bool get isSyncing => _syncing;
bool _isSyncing;
bool get isSyncing => _isSyncing;
final ChopperClient _chopper;
ClientService get _client => _chopper.getService();
......@@ -251,6 +252,10 @@ class LocalUser extends User {
/// Invalidates the access token of the user. Makes all
/// [LocalUser] calls unusable.
Future<void> logout() async {
if (isSyncing) {
await stopSync();
}
await _client.logout();
_isLoggedOut = true;
}
......@@ -306,31 +311,48 @@ class LocalUser extends User {
final _syncSubject = PublishSubject<SyncState>();
Observable<SyncState> get sync => _syncSubject.stream;
Future<void> _syncFuture;
CancelableOperation<Response> _cancelableSyncOnceResponse;
/// Syncs data with the user's [homeserver]. Use the [sync] [Observable] to
/// listen to sync changes.
///
/// If you `await` this method, it will wait until the sync has been stopped
/// by [stopSync]. In most cases you would want to call it like this:
/// `unawaited(user.startSync())`.
Future<void> startSync({
void startSync({
Duration maxRetryAfter = const Duration(seconds: 30),
}) async {
}) {
if (isLoggedOut) {
throw StateError('The user can not be logged out');
}
_syncing = true;
_syncFuture = _startSync(maxRetryAfter: maxRetryAfter);
}
bool _shouldStopSync = false;
Future<void> _startSync({
Duration maxRetryAfter = const Duration(seconds: 30),
}) async {
_shouldStopSync = false;
_isSyncing = true;
// This var is used to implements exponential backoff
// until it reaches maxRetryAfter
var retryAfter = 500;
while (_syncing) {
final state = await syncOnce(timeout: Duration(seconds: 10));
while (!_shouldStopSync) {
final state = await _syncOnce(
timeout: Duration(seconds: 10),
partOfSyncStream: true,
);
if (_shouldStopSync) return;
_syncSubject.add(state);
if (!state.succeeded) {
await Future.delayed(Duration(milliseconds: retryAfter));
if (_shouldStopSync) return;
retryAfter = (retryAfter * 1.5).floor();
if (retryAfter > maxRetryAfter.inMilliseconds) {
retryAfter = maxRetryAfter.inMilliseconds;
......@@ -347,11 +369,28 @@ class LocalUser extends User {
Map<String, dynamic> filter,
timeout = Duration.zero,
bool fullState = false,
}) =>
_syncOnce(
updateSyncToken: updateSyncToken,
filter: filter,
timeout: timeout,
fullState: fullState,
partOfSyncStream: false,
);
Future<SyncState> _syncOnce({
bool updateSyncToken = true,
Map<String, dynamic> filter,
timeout = Duration.zero,
bool fullState = false,
bool partOfSyncStream = false,
}) async {
if (isLoggedOut) {
throw StateError('The user can not be logged out');
}
if (partOfSyncStream && _shouldStopSync) return null;
_syncToken = await _store?.getSyncToken() ?? _syncToken;
if (filter == null) {
......@@ -369,15 +408,29 @@ class LocalUser extends User {
filter['room']['state']['lazy_load_members'] = true;
try {
Response response = await _client.sync(
final cancelable = CancelableOperation.fromFuture(
_client.sync(
since: _syncToken,
fullState: fullState,
filter: json.encode(filter),
timeout: timeout.inMilliseconds);
timeout: timeout.inMilliseconds,
),
);
if (partOfSyncStream) {
_cancelableSyncOnceResponse = cancelable;
}
Response response = await cancelable.valueOrCancellation();
if (response == null) return null;
final body = json.decode(response.body);
final changedRooms = await _processRooms(body, body['account_data']);
if (partOfSyncStream && _shouldStopSync) return null;
if (updateSyncToken) {
// Store sync token
_syncToken = body['next_batch'] as String;
......@@ -390,8 +443,11 @@ class LocalUser extends User {
}
}
void stopSync() {
_syncing = false;
Future<void> stopSync() async {
_shouldStopSync = true;
await _cancelableSyncOnceResponse?.cancel();
await _syncFuture;
_isSyncing = false;
}
// Returns list of changed rooms
......
......@@ -16,6 +16,7 @@ dependencies:
kiwi: ^0.2.0
rxdart: ^0.22.6
ffi: ^0.1.3
async: ^2.4.0
dev_dependencies:
pedantic: ^1.0.0
......