You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

794 lines
25 KiB

using Mirle.Component.Record;
using Mirle.Component.SocketDirver.FrameBuilder;
using NLog;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Mirle.Component.SocketDirver
{
/// <summary>
/// Class for Socket wrapper
/// </summary>
public class SocketWrapper : IDisposable
{
/// <summary>
/// Constructure
/// </summary>
/// <param name="log">Log</param>
public SocketWrapper(Logger log)
{
_log = log;
_ = Task.Factory.StartNew(async delegate
{
while (true)
{
await StartupSocketProcess();
await Task.Delay(TimeSpan.FromMilliseconds(1.0));
}
}, TaskCreationOptions.LongRunning);
_ = Task.Factory.StartNew(async delegate
{
while (true)
{
await StartupMessageProcess();
await Task.Delay(TimeSpan.FromMilliseconds(1.0));
}
}, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Constructure
/// </summary>
/// <param name="log">Log</param>
/// <param name="frameBuilder">Interface for message frame builder</param>
public SocketWrapper(Logger log, IFrameBuilder frameBuilder)
{
_log = log;
_frameBuilder = frameBuilder;
_ = Task.Factory.StartNew(async delegate
{
while (true)
{
await StartupSocketProcess();
await Task.Delay(TimeSpan.FromMilliseconds(1.0));
}
}, TaskCreationOptions.LongRunning);
_ = Task.Factory.StartNew(async delegate
{
while (true)
{
await StartupMessageProcess();
await Task.Delay(TimeSpan.FromMilliseconds(1.0));
}
}, TaskCreationOptions.LongRunning);
}
#region === [Private Properties] ===
private readonly Logger _log;
private readonly IFrameBuilder _frameBuilder;
private ConnectState _connectState = ConnectState.Stop;
private Socket _tcpServer;
private Socket _tcpClient;
private readonly List<Socket> _tcpClients = new List<Socket>();
private IPAddress _address;
private int _port;
private readonly SemaphoreSlim _sendMessageLock = new SemaphoreSlim(1);
private readonly Stopwatch _socketFailedStopWatch = Stopwatch.StartNew();
private TimeSpan _socketFailedTime = TimeSpan.Zero;
// For connect mode = server
private readonly List<ConcurrentQueue<byte[]>> _receivedQueueList = new List<ConcurrentQueue<byte[]>>();
private readonly List<Subject<byte[]>> _onReceivedSubjectList = new List<Subject<byte[]>>();
// For connect mode = client
private readonly ConcurrentQueue<byte[]> _receivedQueue = new ConcurrentQueue<byte[]>();
private readonly Subject<byte[]> _onReceivedSubject = new Subject<byte[]>();
#endregion
/// <summary>
/// Socket connect mode
/// </summary>
public ConnectMode ConnectMode { get; set; }
/// <summary>
/// TCP client list
/// </summary>
public List<Socket> TCPClients => _tcpClients;
/// <summary>
/// TCP client count threshold
/// </summary>
public int ClientCount { get; set; }
/// <summary>
/// Socket server/client address
/// </summary>
public string Address
{
get
{
return _address.ToString();
}
set
{
_address = IPAddress.Parse(value);
}
}
/// <summary>
/// Socket server/client port
/// </summary>
public int Port
{
get
{
return _port;
}
set
{
if (value > 1024 && value <= 65535)
{
_port = value;
return;
}
throw new ArgumentOutOfRangeException($"Port is out of range: {value}");
}
}
/// <summary>
/// Is socket server/client connected
/// </summary>
public bool IsConnected => _connectState == ConnectState.Connected;
/// <summary>
/// Time out of keep connected
/// </summary>
public TimeSpan KeepConnectedTimeout { get; set; } = TimeSpan.Zero;
/// <summary>
/// Subject event of receive
/// </summary>
public IObservable<byte[]> OnReceived => _onReceivedSubject.AsObservable();
/// <summary>
/// Subject event of receive
/// </summary>
/// <param name="index">Index of client receive subject list</param>
/// <returns></returns>
public IObservable<byte[]> OnReceivedList(int index) => _onReceivedSubjectList[index].AsObservable();
/// <summary>
/// Open socket connect
/// </summary>
/// <returns>Completed task successfully</returns>
public Task OpenAsync()
{
switch (ConnectMode)
{
case ConnectMode.Server:
{
if (_tcpServer != null)
{
return Task.CompletedTask;
}
break;
}
case ConnectMode.Client:
{
if (_tcpClient != null)
{
return Task.CompletedTask;
}
break;
}
default:
{
throw new ArgumentException($"Unknown socket connect mode by {ConnectMode}");
}
}
if (ConnectMode == ConnectMode.Server)
{
for (int i = 0; i < ClientCount; i++)
{
ConcurrentQueue<byte[]> received_queue_list = new ConcurrentQueue<byte[]>();
_receivedQueueList.Add(received_queue_list);
Subject<byte[]> on_received_subject = new Subject<byte[]>();
_onReceivedSubjectList.Add(on_received_subject);
}
}
SetConnectState(ConnectState.Initial);
return Task.CompletedTask;
}
/// <summary>
/// Close socket connect
/// </summary>
/// <returns>Completed task successfully</returns>
public Task CloseAsync()
{
SetConnectState(ConnectState.Stop);
return Task.CompletedTask;
}
#region === [Send Message] ===
/// <summary>
/// Send message
/// </summary>
/// <param name="payload">Message payload</param>
/// <param name="endPoint">Remote end point of address</param>
public void Send(byte[] payload, EndPoint endPoint = null)
{
try
{
_sendMessageLock.Wait();
InspectClientAndPayload(out Socket client, payload, endPoint);
_ = _frameBuilder == null ?
client.Send(new ArraySegment<byte>(payload), SocketFlags.None) :
client.Send(new ArraySegment<byte>(_frameBuilder.EncodeFrame(payload)), SocketFlags.None);
ResetSocketFailedTime();
}
catch
{
SetSocketFailedTime();
throw;
}
finally
{
_sendMessageLock.Release();
}
}
/// <summary>
/// Send message
/// </summary>
/// <param name="payload">Message payload</param>
/// <param name="endPoint">Client remote end point address</param>
/// <returns>Completed task successfully</returns>
public async Task SendAsync(byte[] payload, EndPoint endPoint = null)
{
CancellationTokenSource cancellationToken_source = new CancellationTokenSource();
CancellationToken cts = cancellationToken_source.Token;
try
{
await _sendMessageLock.WaitAsync();
InspectClientAndPayload(out Socket client, payload, endPoint);
_ = _frameBuilder == null ?
await client.SendAsync(new ArraySegment<byte>(payload), SocketFlags.None, cts) :
await client.SendAsync(new ArraySegment<byte>(_frameBuilder.EncodeFrame(payload)), SocketFlags.None, cts);
ResetSocketFailedTime();
}
catch
{
SetSocketFailedTime();
cancellationToken_source.Cancel();
throw;
}
finally
{
cancellationToken_source.Dispose();
_sendMessageLock.Release();
}
}
private void InspectClientAndPayload(out Socket client, byte[] payload, EndPoint endPoint = null)
{
client = endPoint == null ? _tcpClient : _tcpClients.Where(x => x.RemoteEndPoint == endPoint).Select(x => x).FirstOrDefault() ??
throw new ArgumentNullException($"Can't found socket client by {endPoint} or {_tcpClient.LocalEndPoint}");
if (payload == null)
{
throw new ArgumentNullException($"Payload is null: {payload}");
}
if (!IsConnected)
{
throw new ArgumentException($"Socket client is not client by {client.LocalEndPoint}");
}
if (!client.Connected)
{
throw new SocketException(10004);
}
}
#endregion
#region === [Socket Failed Time Process] ===
private void ResetSocketFailedTime()
{
_socketFailedTime = TimeSpan.Zero;
}
private void SetSocketFailedTime()
{
TimeSpan elapsed = _socketFailedStopWatch.Elapsed;
if (KeepConnectedTimeout == TimeSpan.Zero)
{
SetConnectState(ConnectState.Initial);
}
else if (_socketFailedTime == TimeSpan.Zero)
{
_socketFailedTime = elapsed;
}
else
{
CheckSocketFailedTime();
}
}
private void CheckSocketFailedTime()
{
if (_socketFailedTime != TimeSpan.Zero && _socketFailedStopWatch.Elapsed - _socketFailedTime > KeepConnectedTimeout)
{
SetConnectState(ConnectState.Initial);
}
}
#endregion
private void SetConnectState(ConnectState connectState)
{
_connectState = connectState;
}
private async Task StartupSocketProcess()
{
try
{
await ConnectionStatus();
}
catch
{
SetConnectState(ConnectState.Initial);
await Task.Delay(2000);
}
}
private async Task ConnectionStatus()
{
switch (_connectState)
{
case ConnectState.Initial:
{
ProcessConnectStateByInitial();
break;
}
case ConnectState.WaitForConnecting:
{
await ProcessConnectStateByWatiForConnecting();
break;
}
case ConnectState.Connected:
{
await ProcessConnectStateByConnected();
break;
}
case ConnectState.Stop:
{
ProcessConnectStateByStop();
break;
}
}
}
#region === [Process Socket Connect State By Initial] ===
private void ProcessConnectStateByInitial()
{
try
{
Initial();
ResetSocketFailedTime();
SetConnectState(ConnectState.WaitForConnecting);
}
catch (Exception ex)
{
_log.LogError(MethodBase.GetCurrentMethod().Name, ex);
SetConnectState(ConnectState.Initial);
}
}
private void Initial()
{
switch (ConnectMode)
{
case ConnectMode.Client:
{
while (_receivedQueue.TryDequeue(out byte[] result))
{
_log.Warn($"Clear buffer after TCP client disconnect: {Encoding.ASCII.GetString(result)}");
}
_tcpClient?.Dispose();
_tcpClient = null;
_tcpServer?.Dispose();
_tcpServer = null;
break;
}
case ConnectMode.Server:
{
_tcpClients.ForEach(x => x.Dispose());
_tcpClients.Clear();
foreach (ConcurrentQueue<byte[]> message_queue in _receivedQueueList)
{
while (_receivedQueue.TryDequeue(out byte[] result))
{
_log.Warn($"Clear buffer after TCP client disconnect: {Encoding.ASCII.GetString(result)}");
}
}
break;
}
default:
{
throw new ArgumentException($"Unknown socket connect mode by {ConnectMode}");
}
}
}
#endregion
#region === [Process Socket Connect State By Wait For Connecting] ===
/// <summary>
/// Process socket connect state by wait for connecting
/// </summary>
/// <returns>Completed task successfully</returns>
private async Task ProcessConnectStateByWatiForConnecting()
{
IPEndPoint end_point = new IPEndPoint(_address, _port);
switch (ConnectMode)
{
case ConnectMode.Server:
{
_tcpServer = new Socket(end_point.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_tcpServer.SetIPProtectionLevel(IPProtectionLevel.EdgeRestricted);
_tcpServer.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.IpTimeToLive, false);
_tcpServer.Bind(end_point);
_tcpServer.Listen(ClientCount);
while (_tcpClients.Count != ClientCount)
{
try
{
Socket client = await _tcpServer.AcceptAsync();
SetSocketOptions(client);
_tcpClients.Add(client);
}
catch
{
SetConnectState(ConnectState.Initial);
}
}
SetConnectState(ConnectState.Connected);
break;
}
case ConnectMode.Client:
{
_tcpClient = new Socket(end_point.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
SetSocketOptions(_tcpClient);
await _tcpClient.ConnectAsync(end_point);
if (_tcpClient.Connected)
{
SetConnectState(ConnectState.Connected);
}
else
{
SetConnectState(ConnectState.Initial);
}
break;
}
}
}
private static void SetSocketOptions(Socket socket)
{
socket.ReceiveBufferSize = 4096;
socket.SendBufferSize = 4096;
socket.ReceiveTimeout = (int)TimeSpan.Zero.TotalMilliseconds;
socket.SendTimeout = (int)TimeSpan.Zero.TotalMilliseconds;
socket.NoDelay = true;
socket.LingerState = new LingerOption(true, 0);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.IpTimeToLive, true);
}
#endregion
#region === [Process Socket Connect State By Connected] ===
/// <summary>
/// Process socket connect state by connected
/// </summary>
/// <returns>Completed task successfully</returns>
private async Task ProcessConnectStateByConnected()
{
if (ConnectMode == ConnectMode.Client)
{
await ReadyReadFromOneClient();
}
else
{
await ReadyReadFromMultipleClient();
}
}
private async Task ReadyReadFromOneClient()
{
try
{
await ReadyRead(_tcpClient, _receivedQueue);
}
catch (Exception ex)
{
_log.Error(ex.ToString());
}
finally
{
CheckSocketFailedTime();
}
}
private async Task ReadyReadFromMultipleClient()
{
try
{
List<Task> task_list = new List<Task>();
for (int i = 0; i < _tcpClients.Count; i++)
{
Task result = Task.Run(() => ReadyRead(_tcpClients[i], _receivedQueueList[i]));
await Task.Delay(100);
task_list.Add(result);
}
Task.WaitAll(task_list.ToArray());
}
catch (Exception ex)
{
_log.LogError(MethodBase.GetCurrentMethod().Name, ex);
}
finally
{
CheckSocketFailedTime();
}
}
private async Task ReadyRead(Socket client, ConcurrentQueue<byte[]> queue)
{
List<byte> buffer = new List<byte>();
int decodeFaileCount = 0;
while (true)
{
int available = client.Available;
if (available > 0)
{
byte[] array = new byte[available];
if (client.Receive(array) != array.Length)
{
_log.Warn("Receive buffer length error");
}
buffer.AddRange(array);
}
while (buffer.Any())
{
if (_frameBuilder != null)
{
if (_frameBuilder.TryDecodeFrame(buffer.ToArray(), out int receiveCount, out byte[] payload))
{
decodeFaileCount = 0;
buffer.RemoveRange(0, receiveCount);
queue.Enqueue(payload);
}
else
{
await Task.Delay(100);
decodeFaileCount++;
_log.Warn($"Decode frame failed times: {decodeFaileCount}");
break;
}
}
else
{
byte[] result = buffer.ToArray();
buffer.RemoveRange(0, result.Count());
queue.Enqueue(result);
}
}
if (client.Poll(10, SelectMode.SelectRead) && client.Available == 0)
{
SetConnectState(ConnectState.Initial);
}
if (_connectState != ConnectState.Connected)
{
break;
}
SpinWait.SpinUntil(() => client.Available > 0, TimeSpan.FromSeconds(1.0));
}
}
#endregion
#region === [Process Connect State By Stop] ===
/// <summary>
/// Process socket connect state by stop
/// </summary>
private void ProcessConnectStateByStop()
{
Initial();
ResetSocketFailedTime();
Task.Delay(1000);
}
#endregion
private async Task StartupMessageProcess()
{
try
{
switch (ConnectMode)
{
case ConnectMode.Client:
{
HandleReceivedMessages();
break;
}
case ConnectMode.Server:
{
if (_tcpClients.Count == ClientCount)
{
List<Task> task_list = new List<Task>();
for (int i = 0; i < ClientCount; i++)
{
Task result = Task.Run(() => HandleReceivedMessages(_receivedQueueList[i], _onReceivedSubjectList[i]));
await Task.Delay(100);
task_list.Add(result);
}
Task.WaitAll(task_list.ToArray());
}
break;
}
default:
{
throw new ArgumentException($"Unknown socket connect mode by {ConnectMode}");
}
}
}
catch (Exception ex)
{
_log.LogError(MethodBase.GetCurrentMethod().Name, ex);
}
if (_connectState == ConnectState.Stop)
{
await Task.Delay(100);
}
}
#region === [Received to message]
private void HandleReceivedMessages()
{
while (_receivedQueue.TryDequeue(out byte[] result))
{
_onReceivedSubject.OnNext(result);
}
}
private void HandleReceivedMessages(ConcurrentQueue<byte[]> messageQueue, Subject<byte[]> receivedSubject)
{
while (true)
{
while (messageQueue.TryDequeue(out byte[] result))
{
receivedSubject.OnNext(result);
}
Thread.Sleep(1);
if (_connectState != ConnectState.Connected)
{
break;
}
}
}
#endregion
#region === [Dispose] ===
private bool disposedValue;
/// <inheritdoc/>
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
Initial();
}
disposedValue = true;
}
}
/// <inheritdoc/>
~SocketWrapper()
{
// 請勿變更此程式碼。請將清除程式碼放入 'Dispose(bool disposing)' 方法
Dispose(disposing: false);
}
/// <inheritdoc/>
public void Dispose()
{
// 請勿變更此程式碼。請將清除程式碼放入 'Dispose(bool disposing)' 方法
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
#endregion
}
/// <summary>
/// Connect mode
/// </summary>
public enum ConnectMode
{
/// <summary>
/// Server
/// </summary>
Server,
/// <summary>
/// Client
/// </summary>
Client
}
/// <summary>
/// 連線狀態
/// </summary>
public enum ConnectState
{
/// <summary>
/// Initial
/// </summary>
Initial,
/// <summary>
/// Wait for connecting
/// </summary>
WaitForConnecting,
/// <summary>
/// Connected
/// </summary>
Connected,
/// <summary>
/// Stop
/// </summary>
Stop
}
}