You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

794 lines
25 KiB

8 months ago
  1. using Mirle.Component.Record;
  2. using Mirle.Component.SocketDirver.FrameBuilder;
  3. using NLog;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.Linq;
  9. using System.Net;
  10. using System.Net.Sockets;
  11. using System.Reactive.Linq;
  12. using System.Reactive.Subjects;
  13. using System.Reflection;
  14. using System.Text;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. namespace Mirle.Component.SocketDirver
  18. {
  19. /// <summary>
  20. /// Class for Socket wrapper
  21. /// </summary>
  22. public class SocketWrapper : IDisposable
  23. {
  24. /// <summary>
  25. /// Constructure
  26. /// </summary>
  27. /// <param name="log">Log</param>
  28. public SocketWrapper(Logger log)
  29. {
  30. _log = log;
  31. _ = Task.Factory.StartNew(async delegate
  32. {
  33. while (true)
  34. {
  35. await StartupSocketProcess();
  36. await Task.Delay(TimeSpan.FromMilliseconds(1.0));
  37. }
  38. }, TaskCreationOptions.LongRunning);
  39. _ = Task.Factory.StartNew(async delegate
  40. {
  41. while (true)
  42. {
  43. await StartupMessageProcess();
  44. await Task.Delay(TimeSpan.FromMilliseconds(1.0));
  45. }
  46. }, TaskCreationOptions.LongRunning);
  47. }
  48. /// <summary>
  49. /// Constructure
  50. /// </summary>
  51. /// <param name="log">Log</param>
  52. /// <param name="frameBuilder">Interface for message frame builder</param>
  53. public SocketWrapper(Logger log, IFrameBuilder frameBuilder)
  54. {
  55. _log = log;
  56. _frameBuilder = frameBuilder;
  57. _ = Task.Factory.StartNew(async delegate
  58. {
  59. while (true)
  60. {
  61. await StartupSocketProcess();
  62. await Task.Delay(TimeSpan.FromMilliseconds(1.0));
  63. }
  64. }, TaskCreationOptions.LongRunning);
  65. _ = Task.Factory.StartNew(async delegate
  66. {
  67. while (true)
  68. {
  69. await StartupMessageProcess();
  70. await Task.Delay(TimeSpan.FromMilliseconds(1.0));
  71. }
  72. }, TaskCreationOptions.LongRunning);
  73. }
  74. #region === [Private Properties] ===
  75. private readonly Logger _log;
  76. private readonly IFrameBuilder _frameBuilder;
  77. private ConnectState _connectState = ConnectState.Stop;
  78. private Socket _tcpServer;
  79. private Socket _tcpClient;
  80. private readonly List<Socket> _tcpClients = new List<Socket>();
  81. private IPAddress _address;
  82. private int _port;
  83. private readonly SemaphoreSlim _sendMessageLock = new SemaphoreSlim(1);
  84. private readonly Stopwatch _socketFailedStopWatch = Stopwatch.StartNew();
  85. private TimeSpan _socketFailedTime = TimeSpan.Zero;
  86. // For connect mode = server
  87. private readonly List<ConcurrentQueue<byte[]>> _receivedQueueList = new List<ConcurrentQueue<byte[]>>();
  88. private readonly List<Subject<byte[]>> _onReceivedSubjectList = new List<Subject<byte[]>>();
  89. // For connect mode = client
  90. private readonly ConcurrentQueue<byte[]> _receivedQueue = new ConcurrentQueue<byte[]>();
  91. private readonly Subject<byte[]> _onReceivedSubject = new Subject<byte[]>();
  92. #endregion
  93. /// <summary>
  94. /// Socket connect mode
  95. /// </summary>
  96. public ConnectMode ConnectMode { get; set; }
  97. /// <summary>
  98. /// TCP client list
  99. /// </summary>
  100. public List<Socket> TCPClients => _tcpClients;
  101. /// <summary>
  102. /// TCP client count threshold
  103. /// </summary>
  104. public int ClientCount { get; set; }
  105. /// <summary>
  106. /// Socket server/client address
  107. /// </summary>
  108. public string Address
  109. {
  110. get
  111. {
  112. return _address.ToString();
  113. }
  114. set
  115. {
  116. _address = IPAddress.Parse(value);
  117. }
  118. }
  119. /// <summary>
  120. /// Socket server/client port
  121. /// </summary>
  122. public int Port
  123. {
  124. get
  125. {
  126. return _port;
  127. }
  128. set
  129. {
  130. if (value > 1024 && value <= 65535)
  131. {
  132. _port = value;
  133. return;
  134. }
  135. throw new ArgumentOutOfRangeException($"Port is out of range: {value}");
  136. }
  137. }
  138. /// <summary>
  139. /// Is socket server/client connected
  140. /// </summary>
  141. public bool IsConnected => _connectState == ConnectState.Connected;
  142. /// <summary>
  143. /// Time out of keep connected
  144. /// </summary>
  145. public TimeSpan KeepConnectedTimeout { get; set; } = TimeSpan.Zero;
  146. /// <summary>
  147. /// Subject event of receive
  148. /// </summary>
  149. public IObservable<byte[]> OnReceived => _onReceivedSubject.AsObservable();
  150. /// <summary>
  151. /// Subject event of receive
  152. /// </summary>
  153. /// <param name="index">Index of client receive subject list</param>
  154. /// <returns></returns>
  155. public IObservable<byte[]> OnReceivedList(int index) => _onReceivedSubjectList[index].AsObservable();
  156. /// <summary>
  157. /// Open socket connect
  158. /// </summary>
  159. /// <returns>Completed task successfully</returns>
  160. public Task OpenAsync()
  161. {
  162. switch (ConnectMode)
  163. {
  164. case ConnectMode.Server:
  165. {
  166. if (_tcpServer != null)
  167. {
  168. return Task.CompletedTask;
  169. }
  170. break;
  171. }
  172. case ConnectMode.Client:
  173. {
  174. if (_tcpClient != null)
  175. {
  176. return Task.CompletedTask;
  177. }
  178. break;
  179. }
  180. default:
  181. {
  182. throw new ArgumentException($"Unknown socket connect mode by {ConnectMode}");
  183. }
  184. }
  185. if (ConnectMode == ConnectMode.Server)
  186. {
  187. for (int i = 0; i < ClientCount; i++)
  188. {
  189. ConcurrentQueue<byte[]> received_queue_list = new ConcurrentQueue<byte[]>();
  190. _receivedQueueList.Add(received_queue_list);
  191. Subject<byte[]> on_received_subject = new Subject<byte[]>();
  192. _onReceivedSubjectList.Add(on_received_subject);
  193. }
  194. }
  195. SetConnectState(ConnectState.Initial);
  196. return Task.CompletedTask;
  197. }
  198. /// <summary>
  199. /// Close socket connect
  200. /// </summary>
  201. /// <returns>Completed task successfully</returns>
  202. public Task CloseAsync()
  203. {
  204. SetConnectState(ConnectState.Stop);
  205. return Task.CompletedTask;
  206. }
  207. #region === [Send Message] ===
  208. /// <summary>
  209. /// Send message
  210. /// </summary>
  211. /// <param name="payload">Message payload</param>
  212. /// <param name="endPoint">Remote end point of address</param>
  213. public void Send(byte[] payload, EndPoint endPoint = null)
  214. {
  215. try
  216. {
  217. _sendMessageLock.Wait();
  218. InspectClientAndPayload(out Socket client, payload, endPoint);
  219. _ = _frameBuilder == null ?
  220. client.Send(new ArraySegment<byte>(payload), SocketFlags.None) :
  221. client.Send(new ArraySegment<byte>(_frameBuilder.EncodeFrame(payload)), SocketFlags.None);
  222. ResetSocketFailedTime();
  223. }
  224. catch
  225. {
  226. SetSocketFailedTime();
  227. throw;
  228. }
  229. finally
  230. {
  231. _sendMessageLock.Release();
  232. }
  233. }
  234. /// <summary>
  235. /// Send message
  236. /// </summary>
  237. /// <param name="payload">Message payload</param>
  238. /// <param name="endPoint">Client remote end point address</param>
  239. /// <returns>Completed task successfully</returns>
  240. public async Task SendAsync(byte[] payload, EndPoint endPoint = null)
  241. {
  242. CancellationTokenSource cancellationToken_source = new CancellationTokenSource();
  243. CancellationToken cts = cancellationToken_source.Token;
  244. try
  245. {
  246. await _sendMessageLock.WaitAsync();
  247. InspectClientAndPayload(out Socket client, payload, endPoint);
  248. _ = _frameBuilder == null ?
  249. await client.SendAsync(new ArraySegment<byte>(payload), SocketFlags.None, cts) :
  250. await client.SendAsync(new ArraySegment<byte>(_frameBuilder.EncodeFrame(payload)), SocketFlags.None, cts);
  251. ResetSocketFailedTime();
  252. }
  253. catch
  254. {
  255. SetSocketFailedTime();
  256. cancellationToken_source.Cancel();
  257. throw;
  258. }
  259. finally
  260. {
  261. cancellationToken_source.Dispose();
  262. _sendMessageLock.Release();
  263. }
  264. }
  265. private void InspectClientAndPayload(out Socket client, byte[] payload, EndPoint endPoint = null)
  266. {
  267. client = endPoint == null ? _tcpClient : _tcpClients.Where(x => x.RemoteEndPoint == endPoint).Select(x => x).FirstOrDefault() ??
  268. throw new ArgumentNullException($"Can't found socket client by {endPoint} or {_tcpClient.LocalEndPoint}");
  269. if (payload == null)
  270. {
  271. throw new ArgumentNullException($"Payload is null: {payload}");
  272. }
  273. if (!IsConnected)
  274. {
  275. throw new ArgumentException($"Socket client is not client by {client.LocalEndPoint}");
  276. }
  277. if (!client.Connected)
  278. {
  279. throw new SocketException(10004);
  280. }
  281. }
  282. #endregion
  283. #region === [Socket Failed Time Process] ===
  284. private void ResetSocketFailedTime()
  285. {
  286. _socketFailedTime = TimeSpan.Zero;
  287. }
  288. private void SetSocketFailedTime()
  289. {
  290. TimeSpan elapsed = _socketFailedStopWatch.Elapsed;
  291. if (KeepConnectedTimeout == TimeSpan.Zero)
  292. {
  293. SetConnectState(ConnectState.Initial);
  294. }
  295. else if (_socketFailedTime == TimeSpan.Zero)
  296. {
  297. _socketFailedTime = elapsed;
  298. }
  299. else
  300. {
  301. CheckSocketFailedTime();
  302. }
  303. }
  304. private void CheckSocketFailedTime()
  305. {
  306. if (_socketFailedTime != TimeSpan.Zero && _socketFailedStopWatch.Elapsed - _socketFailedTime > KeepConnectedTimeout)
  307. {
  308. SetConnectState(ConnectState.Initial);
  309. }
  310. }
  311. #endregion
  312. private void SetConnectState(ConnectState connectState)
  313. {
  314. _connectState = connectState;
  315. }
  316. private async Task StartupSocketProcess()
  317. {
  318. try
  319. {
  320. await ConnectionStatus();
  321. }
  322. catch
  323. {
  324. SetConnectState(ConnectState.Initial);
  325. await Task.Delay(2000);
  326. }
  327. }
  328. private async Task ConnectionStatus()
  329. {
  330. switch (_connectState)
  331. {
  332. case ConnectState.Initial:
  333. {
  334. ProcessConnectStateByInitial();
  335. break;
  336. }
  337. case ConnectState.WaitForConnecting:
  338. {
  339. await ProcessConnectStateByWatiForConnecting();
  340. break;
  341. }
  342. case ConnectState.Connected:
  343. {
  344. await ProcessConnectStateByConnected();
  345. break;
  346. }
  347. case ConnectState.Stop:
  348. {
  349. ProcessConnectStateByStop();
  350. break;
  351. }
  352. }
  353. }
  354. #region === [Process Socket Connect State By Initial] ===
  355. private void ProcessConnectStateByInitial()
  356. {
  357. try
  358. {
  359. Initial();
  360. ResetSocketFailedTime();
  361. SetConnectState(ConnectState.WaitForConnecting);
  362. }
  363. catch (Exception ex)
  364. {
  365. _log.LogError(MethodBase.GetCurrentMethod().Name, ex);
  366. SetConnectState(ConnectState.Initial);
  367. }
  368. }
  369. private void Initial()
  370. {
  371. switch (ConnectMode)
  372. {
  373. case ConnectMode.Client:
  374. {
  375. while (_receivedQueue.TryDequeue(out byte[] result))
  376. {
  377. _log.Warn($"Clear buffer after TCP client disconnect: {Encoding.ASCII.GetString(result)}");
  378. }
  379. _tcpClient?.Dispose();
  380. _tcpClient = null;
  381. _tcpServer?.Dispose();
  382. _tcpServer = null;
  383. break;
  384. }
  385. case ConnectMode.Server:
  386. {
  387. _tcpClients.ForEach(x => x.Dispose());
  388. _tcpClients.Clear();
  389. foreach (ConcurrentQueue<byte[]> message_queue in _receivedQueueList)
  390. {
  391. while (_receivedQueue.TryDequeue(out byte[] result))
  392. {
  393. _log.Warn($"Clear buffer after TCP client disconnect: {Encoding.ASCII.GetString(result)}");
  394. }
  395. }
  396. break;
  397. }
  398. default:
  399. {
  400. throw new ArgumentException($"Unknown socket connect mode by {ConnectMode}");
  401. }
  402. }
  403. }
  404. #endregion
  405. #region === [Process Socket Connect State By Wait For Connecting] ===
  406. /// <summary>
  407. /// Process socket connect state by wait for connecting
  408. /// </summary>
  409. /// <returns>Completed task successfully</returns>
  410. private async Task ProcessConnectStateByWatiForConnecting()
  411. {
  412. IPEndPoint end_point = new IPEndPoint(_address, _port);
  413. switch (ConnectMode)
  414. {
  415. case ConnectMode.Server:
  416. {
  417. _tcpServer = new Socket(end_point.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  418. _tcpServer.SetIPProtectionLevel(IPProtectionLevel.EdgeRestricted);
  419. _tcpServer.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.IpTimeToLive, false);
  420. _tcpServer.Bind(end_point);
  421. _tcpServer.Listen(ClientCount);
  422. while (_tcpClients.Count != ClientCount)
  423. {
  424. try
  425. {
  426. Socket client = await _tcpServer.AcceptAsync();
  427. SetSocketOptions(client);
  428. _tcpClients.Add(client);
  429. }
  430. catch
  431. {
  432. SetConnectState(ConnectState.Initial);
  433. }
  434. }
  435. SetConnectState(ConnectState.Connected);
  436. break;
  437. }
  438. case ConnectMode.Client:
  439. {
  440. _tcpClient = new Socket(end_point.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  441. SetSocketOptions(_tcpClient);
  442. await _tcpClient.ConnectAsync(end_point);
  443. if (_tcpClient.Connected)
  444. {
  445. SetConnectState(ConnectState.Connected);
  446. }
  447. else
  448. {
  449. SetConnectState(ConnectState.Initial);
  450. }
  451. break;
  452. }
  453. }
  454. }
  455. private static void SetSocketOptions(Socket socket)
  456. {
  457. socket.ReceiveBufferSize = 4096;
  458. socket.SendBufferSize = 4096;
  459. socket.ReceiveTimeout = (int)TimeSpan.Zero.TotalMilliseconds;
  460. socket.SendTimeout = (int)TimeSpan.Zero.TotalMilliseconds;
  461. socket.NoDelay = true;
  462. socket.LingerState = new LingerOption(true, 0);
  463. socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.IpTimeToLive, true);
  464. }
  465. #endregion
  466. #region === [Process Socket Connect State By Connected] ===
  467. /// <summary>
  468. /// Process socket connect state by connected
  469. /// </summary>
  470. /// <returns>Completed task successfully</returns>
  471. private async Task ProcessConnectStateByConnected()
  472. {
  473. if (ConnectMode == ConnectMode.Client)
  474. {
  475. await ReadyReadFromOneClient();
  476. }
  477. else
  478. {
  479. await ReadyReadFromMultipleClient();
  480. }
  481. }
  482. private async Task ReadyReadFromOneClient()
  483. {
  484. try
  485. {
  486. await ReadyRead(_tcpClient, _receivedQueue);
  487. }
  488. catch (Exception ex)
  489. {
  490. _log.Error(ex.ToString());
  491. }
  492. finally
  493. {
  494. CheckSocketFailedTime();
  495. }
  496. }
  497. private async Task ReadyReadFromMultipleClient()
  498. {
  499. try
  500. {
  501. List<Task> task_list = new List<Task>();
  502. for (int i = 0; i < _tcpClients.Count; i++)
  503. {
  504. Task result = Task.Run(() => ReadyRead(_tcpClients[i], _receivedQueueList[i]));
  505. await Task.Delay(100);
  506. task_list.Add(result);
  507. }
  508. Task.WaitAll(task_list.ToArray());
  509. }
  510. catch (Exception ex)
  511. {
  512. _log.LogError(MethodBase.GetCurrentMethod().Name, ex);
  513. }
  514. finally
  515. {
  516. CheckSocketFailedTime();
  517. }
  518. }
  519. private async Task ReadyRead(Socket client, ConcurrentQueue<byte[]> queue)
  520. {
  521. List<byte> buffer = new List<byte>();
  522. int decodeFaileCount = 0;
  523. while (true)
  524. {
  525. int available = client.Available;
  526. if (available > 0)
  527. {
  528. byte[] array = new byte[available];
  529. if (client.Receive(array) != array.Length)
  530. {
  531. _log.Warn("Receive buffer length error");
  532. }
  533. buffer.AddRange(array);
  534. }
  535. while (buffer.Any())
  536. {
  537. if (_frameBuilder != null)
  538. {
  539. if (_frameBuilder.TryDecodeFrame(buffer.ToArray(), out int receiveCount, out byte[] payload))
  540. {
  541. decodeFaileCount = 0;
  542. buffer.RemoveRange(0, receiveCount);
  543. queue.Enqueue(payload);
  544. }
  545. else
  546. {
  547. await Task.Delay(100);
  548. decodeFaileCount++;
  549. _log.Warn($"Decode frame failed times: {decodeFaileCount}");
  550. break;
  551. }
  552. }
  553. else
  554. {
  555. byte[] result = buffer.ToArray();
  556. buffer.RemoveRange(0, result.Count());
  557. queue.Enqueue(result);
  558. }
  559. }
  560. if (client.Poll(10, SelectMode.SelectRead) && client.Available == 0)
  561. {
  562. SetConnectState(ConnectState.Initial);
  563. }
  564. if (_connectState != ConnectState.Connected)
  565. {
  566. break;
  567. }
  568. SpinWait.SpinUntil(() => client.Available > 0, TimeSpan.FromSeconds(1.0));
  569. }
  570. }
  571. #endregion
  572. #region === [Process Connect State By Stop] ===
  573. /// <summary>
  574. /// Process socket connect state by stop
  575. /// </summary>
  576. private void ProcessConnectStateByStop()
  577. {
  578. Initial();
  579. ResetSocketFailedTime();
  580. Task.Delay(1000);
  581. }
  582. #endregion
  583. private async Task StartupMessageProcess()
  584. {
  585. try
  586. {
  587. switch (ConnectMode)
  588. {
  589. case ConnectMode.Client:
  590. {
  591. HandleReceivedMessages();
  592. break;
  593. }
  594. case ConnectMode.Server:
  595. {
  596. if (_tcpClients.Count == ClientCount)
  597. {
  598. List<Task> task_list = new List<Task>();
  599. for (int i = 0; i < ClientCount; i++)
  600. {
  601. Task result = Task.Run(() => HandleReceivedMessages(_receivedQueueList[i], _onReceivedSubjectList[i]));
  602. await Task.Delay(100);
  603. task_list.Add(result);
  604. }
  605. Task.WaitAll(task_list.ToArray());
  606. }
  607. break;
  608. }
  609. default:
  610. {
  611. throw new ArgumentException($"Unknown socket connect mode by {ConnectMode}");
  612. }
  613. }
  614. }
  615. catch (Exception ex)
  616. {
  617. _log.LogError(MethodBase.GetCurrentMethod().Name, ex);
  618. }
  619. if (_connectState == ConnectState.Stop)
  620. {
  621. await Task.Delay(100);
  622. }
  623. }
  624. #region === [Received to message]
  625. private void HandleReceivedMessages()
  626. {
  627. while (_receivedQueue.TryDequeue(out byte[] result))
  628. {
  629. _onReceivedSubject.OnNext(result);
  630. }
  631. }
  632. private void HandleReceivedMessages(ConcurrentQueue<byte[]> messageQueue, Subject<byte[]> receivedSubject)
  633. {
  634. while (true)
  635. {
  636. while (messageQueue.TryDequeue(out byte[] result))
  637. {
  638. receivedSubject.OnNext(result);
  639. }
  640. Thread.Sleep(1);
  641. if (_connectState != ConnectState.Connected)
  642. {
  643. break;
  644. }
  645. }
  646. }
  647. #endregion
  648. #region === [Dispose] ===
  649. private bool disposedValue;
  650. /// <inheritdoc/>
  651. protected virtual void Dispose(bool disposing)
  652. {
  653. if (!disposedValue)
  654. {
  655. if (disposing)
  656. {
  657. Initial();
  658. }
  659. disposedValue = true;
  660. }
  661. }
  662. /// <inheritdoc/>
  663. ~SocketWrapper()
  664. {
  665. // 請勿變更此程式碼。請將清除程式碼放入 'Dispose(bool disposing)' 方法
  666. Dispose(disposing: false);
  667. }
  668. /// <inheritdoc/>
  669. public void Dispose()
  670. {
  671. // 請勿變更此程式碼。請將清除程式碼放入 'Dispose(bool disposing)' 方法
  672. Dispose(disposing: true);
  673. GC.SuppressFinalize(this);
  674. }
  675. #endregion
  676. }
  677. /// <summary>
  678. /// Connect mode
  679. /// </summary>
  680. public enum ConnectMode
  681. {
  682. /// <summary>
  683. /// Server
  684. /// </summary>
  685. Server,
  686. /// <summary>
  687. /// Client
  688. /// </summary>
  689. Client
  690. }
  691. /// <summary>
  692. /// 連線狀態
  693. /// </summary>
  694. public enum ConnectState
  695. {
  696. /// <summary>
  697. /// Initial
  698. /// </summary>
  699. Initial,
  700. /// <summary>
  701. /// Wait for connecting
  702. /// </summary>
  703. WaitForConnecting,
  704. /// <summary>
  705. /// Connected
  706. /// </summary>
  707. Connected,
  708. /// <summary>
  709. /// Stop
  710. /// </summary>
  711. Stop
  712. }
  713. }