Flutter中的WebSocket重连

持续的套接字连接对于确保正确的应用程序行为至关重要。无论是提供实时聊天更新、股票价格还是应用内指标,可靠的连接都至关重要。

套接字令人恼火的问题之一是突然失去连接。如果真正的原因并不明显,即互联网连接不稳定,那么中断原因往往会被很好地隐藏起来。为了解决这个问题,我们可以实施套接字自动重新连接策略。让我们看看 Dart 的行业标准套接字库——web_socket_channel 中有哪些选项。

经典方法

库中的示例非常简单:

import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;

main() async {
  final wsUrl = Uri.parse('ws://localhost:1234')
  var channel = WebSocketChannel.connect(wsUrl);

  channel.stream.listen((message) {
    channel.sink.add('received!');
    channel.sink.close(status.goingAway);
  });
}

遗憾的是,WebSocketChannel 并不提供处理重新连接的内置配置选项。因此,我们需要手动对流错误做出反应。让我们模仿一下 WebSocket 突然出错的情况。以下是在流监听器中捕获错误的方法:

channel.stream.listen(
        (message) {
          channel.sink.add('received!');
        },
        onError: (error) {
          // Handle error here
        },
        onDone: () {
          // Handle socket disruption
        },
      );

典型的解决方案是再次调用 WebSocketChannel.connect,并在回调中覆盖流。

onDone: () {
  channel = WebSocketChannel.connect(Uri.parse(url));
  stream = channel.stream.listen(
    ...
  );
},

虽然这种方法行之有效,但在具有良好结构的生产应用程序中可能会变得繁琐。

简洁架构解决方案

典型的应用程序架构由不同的层、类和责任区组成。让我们来看看简洁架构示例:

Flutter中的WebSocket重连

套接字通常在数据层中初始化,在表现层中使用。理想情况下,表现层不应该知道套接字的内部工作,包括套接字是否正在尝试重新连接。

然而,前一种方法迫使我们在表现层中管理重新连接逻辑。因此,我们可以将重新连接逻辑整合到数据层,最好是整合到一个类中,而不是让用户界面层承担过多的数据责任。

双流策略

我们的想法是利用两个流:一个内流负责保持与套接字的连接,另一个外流作为其他类的入口,同时保持连接。内层流负责处理错误和重新连接,而外层流则保持不动,等待来自内层流的数据。

Flutter中的WebSocket重连

实现 SocketChannel 类

让我们深入了解 SocketChannel 类的实现,它将处理我们的重连接逻辑。首先,我们将把套接字配置传递给该类:

SocketChannel getChannel() { 
    return SocketChannel( 
      () => IOWebSocketChannel.connect( 
        'ws://localhost:1234' , 
      ), 
    ); 
  }

SocketChannel 类将处理订阅、重新连接、消息发送和数据流。我们还需要一个传递消息的汇和 IOWebSocketChannel 本身,这将从构造函数参数中提取。

如前所述,我们将实现一个内流和一个外流,后者将由 rxdart 库中的 BehaviorSubject 呈现。因此,每次有人连接到我们的套接字类时,他们都将从套接字中获得最新数据。

class SocketChannel {
  SocketChannel(this._getIOWebSocketChannel) {
    _startConnection();
  }

  final IOWebSocketChannel Function() _getIOWebSocketChannel;

  late IOWebSocketChannel _ioWebSocketChannel;

  WebSocketSink get _sink => _ioWebSocketChannel.sink;

  late Stream<dynamic> _innerStream;

  final _outerStreamSubject = BehaviorSubject<dynamic>();

  Stream<dynamic> get stream => _outerStreamSubject.stream;
}

现在,让我们添加 _startConnection() 方法,以便从构造函数启动套接字连接:

void _startConnection() {
  _ioWebSocketChannel = _getIOWebSocketChannel();
  _innerStream = _ioWebSocketChannel.stream;
  _innerStream.listen(
    (event) {
      // Forward data to outer stream
      _outerStreamSubject.add(event);
    },
    onError: (error) {
      // Handle web socket connection error
      _handleLostConnection();
    },
    onDone: () {
      // Handle web socket connection break
      _handleLostConnection();
    },
  );
}

void _handleLostConnection() {
  _startConnection();
}

改进重新连接逻辑

为了增强我们的解决方案,让我们来解决套接字无法立即重新连接的情况。例如,如果互联网连接中断了几分钟,我们可以实施一种 ping 机制来定期检查服务器的状态。第一次重新连接尝试应在初始连接中断后立即进行,随后的尝试将被延迟。

bool _isFirstRestart = false;
bool _isFollowingRestart = false;

void _handleLostConnection() {
  if (_isFirstRestart && !_isFollowingRestart) {
    Future.delayed(const Duration(seconds: 3), () {
      _isFollowingRestart = false;
      _startConnection();
    });
    _isFollowingRestart = true;
  } else {
    _isFirstRestart = true;
    _startConnection();
  }
}

最后,我们可以添加 close() 方法来关闭套接字。关闭套接字将触发 onDone 回调,因此我们需要在方法中设置 _isManuallyClose = true 标志,并在回调中进行检查。

bool _isManuallyClosed = false;

void _startConnection() {
...

  onDone: () {
    if (!_isManuallyClosed) {
      _handleLostConnection();
    }
  },

...
}  

void close() {
  _isManuallyClosed = true;
  _sink.close();
}

最后结果:

import 'package:rxdart/rxdart.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

class SocketChannel {
  SocketChannel(this._getIOWebSocketChannel) {
    _startConnection();
  }

  final IOWebSocketChannel Function() _getIOWebSocketChannel;

  late IOWebSocketChannel _ioWebSocketChannel;

  WebSocketSink get _sink => _ioWebSocketChannel.sink;

  late Stream<dynamic> _innerStream;

  final _outerStreamSubject = BehaviorSubject<dynamic>();

  Stream<dynamic> get stream => _outerStreamSubject.stream;

  bool _isFirstRestart = false;
  bool _isFollowingRestart = false;
  bool _isManuallyClosed = false;

  void _handleLostConnection() {
    if (_isFirstRestart && !_isFollowingRestart) {
      Future.delayed(const Duration(seconds: 3), () {
        _isFollowingRestart = false;
        _startConnection();
      });
      _isFollowingRestart = true;
    } else {
      _isFirstRestart = true;
      _startConnection();
    }
  }

  void _startConnection() {
    _ioWebSocketChannel = _getIOWebSocketChannel();
    _innerStream = _ioWebSocketChannel.stream;
    _innerStream.listen(
      (event) {
        _isFirstRestart = false;
        _outerStreamSubject.add(event);
      },
      onError: (error) {
        _handleLostConnection();
      },
      onDone: () {
        if (!_isManuallyClosed) {
          _handleLostConnection();
        }
      },
    );
  }

  void sendMessage(String message) => _sink.add(message);

  void close() {
    _isManuallyClosed = true;
    _sink.close();
  }
}

结论

在本文中,我们探讨了 Flutter 应用程序中的套接字重连接,并使用 SocketChannel 类实现了一个简洁高效的解决方案。通过在数据层中封装重连接逻辑,我们可以保持表现层的简洁。有了延迟重连接的附加功能,我们就为保持连续的套接字连接奠定了基础。

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/34195.html

(0)

相关推荐

发表回复

登录后才能评论