feat: Rate limit streams so that large accounts have a smoother UI
parent
b225f9c2c8
commit
7e1e4e4844
@ -0,0 +1,46 @@
|
||||
import 'dart:async';
|
||||
|
||||
extension StreamExtension on Stream {
|
||||
/// Returns a new Stream which outputs only `true` for every update of the original
|
||||
/// stream, ratelimited by the Duration t
|
||||
Stream<bool> rateLimit(Duration t) {
|
||||
final controller = StreamController<bool>();
|
||||
Timer timer;
|
||||
var gotMessage = false;
|
||||
// as we call our inline-defined function recursively we need to make sure that the
|
||||
// variable exists prior of creating the function. Silly dart.
|
||||
Function _onMessage;
|
||||
// callback to determine if we should send out an update
|
||||
_onMessage = () {
|
||||
// do nothing if it is already closed
|
||||
if (controller.isClosed) {
|
||||
return;
|
||||
}
|
||||
if (timer == null) {
|
||||
// if we don't have a timer yet, send out the update and start a timer
|
||||
gotMessage = false;
|
||||
controller.add(true);
|
||||
timer = Timer(t, () {
|
||||
// the timer has ended...delete it and, if we got a message, re-run the
|
||||
// method to send out an update!
|
||||
timer = null;
|
||||
if (gotMessage) {
|
||||
_onMessage();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// set that we got a message
|
||||
gotMessage = true;
|
||||
}
|
||||
};
|
||||
final subscription = listen((_) => _onMessage(),
|
||||
onDone: () => controller.close(),
|
||||
onError: (e, s) => controller.addError(e, s));
|
||||
// add proper cleanup to the subscription and the controller, to not memory leak
|
||||
controller.onCancel = () {
|
||||
subscription.cancel();
|
||||
controller.close();
|
||||
};
|
||||
return controller.stream;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue