Commit f1b5c800 张东亮

12

1 个父辈 59b2af93
此文件类型无法预览
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
......@@ -6,7 +7,6 @@ using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Agv
{
public class Server
......@@ -22,15 +22,13 @@ namespace Agv
public event NodeOnlineEvent NodeOnline;
private string serverIp;
private int serverPort;
private bool _IsdecodeSingleNode = false;
private Dictionary<string, Node> nodeMap = new Dictionary<string, Node>();
/// <summary>
///
/// </summary>
/// <param name="_IsdecodeSingleNode">解码方式,默认解码多个节点</param>
public Server(bool _IsdecodeSingleNode=false)
public Server()
{
this._IsdecodeSingleNode = _IsdecodeSingleNode;
}
/// <summary>
/// 服务端信息
......@@ -45,14 +43,18 @@ namespace Agv
/// <summary>
/// 开启服务
/// </summary>
public void Start(string ip = "127.0.0.1", int port = 12000)
public void Start(string ip = "0.0.0.0", int port = 12000)
{
try
{
log.info("AGVServer服务启动");
serverIp = ip;
serverPort = port;
IPEndPoint localEP = new IPEndPoint(IPAddress.Parse(serverIp), serverPort);
IPEndPoint localEP;
if (ip.Equals("0.0.0.0"))
localEP = new IPEndPoint(IPAddress.Any, serverPort);
else
localEP = new IPEndPoint(IPAddress.Parse(ip), serverPort);
_server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_server.Bind(localEP);
_server.Listen(100);
......@@ -99,7 +101,12 @@ namespace Agv
if (nodeMap.Count > 0)
nodeMap.Clear();
log.info("AGVServer服务关闭");
if (_server != null)
{
_server.Close();
_server.Dispose();
}
}
catch (Exception ex)
{
......@@ -243,12 +250,21 @@ namespace Agv
{
// 开始监听客户端连接请求,Accept方法会阻断当前的线程;
Socket sokConnection = _server.Accept(); // 一旦监听到一个客户端的请求,就返回一个与该客户端通信的 套接字;
log.info(string.Format("客户端[{0}]连接服务端[{1}]成功", sokConnection.RemoteEndPoint.ToString(), ServerInfo));
IPEndPoint endPoint = (IPEndPoint)sokConnection.RemoteEndPoint;
string ip = endPoint.Address.ToString();
int port = endPoint.Port;
if (dictClient.TryGetValue(ip, out Client client1))
{
Offline(client1);
log.debug(string.Format("断开"));
}
log.info(string.Format("客户端[{0}]连接服务端[{1}]成功", endPoint.ToString(), ServerInfo));
Thread thr = new Thread(ListenNet);
Client client = new Client(sokConnection, thr, sokConnection.RemoteEndPoint.ToString());
Client client = new Client(sokConnection, thr, ip, endPoint.ToString());
thr.IsBackground = true;
thr.Start(client);
dictClient.Add(sokConnection.RemoteEndPoint.ToString(), client);
dictClient.Add(ip, client);
}
catch (SocketException)
{
......@@ -260,7 +276,148 @@ namespace Agv
}
}
}
#region 粘包处理
//线程安全的字典
ConcurrentDictionary<string, byte[]> dic = new ConcurrentDictionary<string, byte[]>();
/// <summary>
/// 处理客户端发来的数据
/// </summary>
/// <param name="obj">每个客户的会话ID</param>
/// <param name="bytes">缓冲区数据</param>
/// <returns></returns>
private void StickyBagHandle(Client client, byte[] bytes)
{
//bytes 为系统缓冲区数据
//bytesRead为系统缓冲区长度
int bytesRead = bytes.Length;
string endpoint = client.IP;
if (bytesRead > 0)
{
byte[] surplusBuffer = null;
if (dic.TryGetValue(endpoint, out surplusBuffer))
{
byte[] curBuffer = surplusBuffer.Concat(bytes).ToArray();//拼接上一次剩余的包
//更新会话ID 的最新字节
dic.TryUpdate(endpoint, curBuffer, surplusBuffer);
surplusBuffer = curBuffer;//同步
}
else
{
if (Common.CheckHeadChar(bytes, out int startIdx))//检查是否存在指定包头识别字符
{
//添加会话ID的bytes
byte[] tmp = new byte[bytesRead - startIdx];
Buffer.BlockCopy(bytes, startIdx, tmp, 0, bytesRead - startIdx);
dic.TryAdd(endpoint, tmp);
surplusBuffer = tmp;//同步
}
else
return;
}
//已经完成读取每个数据包长度
int haveRead = 0;
//这里totalLen的长度有可能大于缓冲区大小的(因为 这里的surplusBuffer 是系统缓冲区+不完整的数据包)
int totalLen = surplusBuffer.Length;
while (haveRead <= totalLen)
{
//如果在N次拆解后剩余的数据包连一个包头的长度都不够
//说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包
if (totalLen - haveRead < Common.headSize)
{
byte[] byteSub = new byte[totalLen - haveRead];
//把剩下不够一个完整的数据包存起来
Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead);
dic.TryUpdate(endpoint, byteSub, surplusBuffer);
surplusBuffer = byteSub;
totalLen = 0;
break;
}
//如果够了一个完整包,则读取包头的数据
byte[] headByte = new byte[Common.headSize];
Buffer.BlockCopy(surplusBuffer, haveRead, headByte, 0, Common.headSize);//从缓冲区里读取包头的字节
//判断包头起始符
int bodySize = BitConverter.ToUInt16(headByte, 2);//从包头里面分析出包体的长度
//这里的 haveRead=等于N个数据包的长度 从0开始;0,1,2,3....N
//如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存
if (haveRead + Common.headSize + bodySize > totalLen)
{
byte[] byteSub = new byte[totalLen - haveRead];
Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead);
dic.TryUpdate(endpoint, byteSub, surplusBuffer);
surplusBuffer = byteSub;
break;
}
else
{
//挨个分解每个包,解析成实际文字
//String strc = Encoding.UTF8.GetString(surplusBuffer, haveRead + headSize, bodySize);
byte[] resBytes = new byte[bodySize];
Buffer.BlockCopy(surplusBuffer, haveRead + Common.headSize, resBytes, 0, bodySize);
log.debug(string.Format("[Receive data from:{0}] -> {1}", endpoint, Common.HexBuff(resBytes)));
DecodeNode(client, resBytes);
//依次累加当前的数据包的长度
haveRead = haveRead + Common.headSize + bodySize;
if (Common.headSize + bodySize == bytesRead)//如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0
{
byte[] xbtye = null;
dic.TryRemove(endpoint, out xbtye);
surplusBuffer = null;//设置空 回到原始状态
totalLen = 0;//清0
}
}
}
}
}
#endregion
private void DecodeNode(Client client, byte[] resultBytes)
{
//解码单个节点
if (Common.EnDecodeSingleNode)
{
Node node = Common.Decode(resultBytes);
if (node == null)
{
log.error("命令解析失败: " + Common.HexBuff(resultBytes));
}
else
{
//CommonVar.LogUtil.info("Receive[" + client.IP + "] " + node.ToText());
int idx = client.nodeName.FindIndex(s => s == node.Name);
if (idx == -1) client.nodeName.Add(node.Name);
UpdateNode(node);
}
}
else
{
//解码多个节点
List<Node> nodes = Common.DecodeNodes(resultBytes);
if (nodes == null)
{
log.error("命令解析失败: " + Common.HexBuff(resultBytes));
}
else
{
int idx = -1;
foreach (var node in nodes)
{
idx = client.nodeName.FindIndex(s => s == node.Name);
if (idx == -1) client.nodeName.Add(node.Name);
}
UpdateNodes(nodes);
}
}
}
/// <summary>
/// 客户端数据接收
/// </summary>
......@@ -278,28 +435,14 @@ namespace Agv
byte[] arrMsgRec = new byte[1024];
// 将接收到的数据存入到输入 arrMsgRec中;
int length = -1;
if (sokClient.Poll(-1, SelectMode.SelectRead))
if (!client.Loop) return;
if (sokClient != null && sokClient.Poll(-1, SelectMode.SelectRead))
{
try
{
length = sokClient.Receive(arrMsgRec); // 接收数据,并返回数据的长度;
if (length == 0)//连接正常断开
{
Offline(client);
log.debug(string.Format("客户端[{0}]断开与服务端[{1}]的连接", sokClient.RemoteEndPoint.ToString(), ServerInfo));
return;
}
}
catch (SocketException se)
{
Offline(client);
log.error(string.Format("客户端[{0}]接收服务端[{1}]的消息出现异常", sokClient.RemoteEndPoint.ToString(), ServerInfo), se);
return;
}
catch (Exception e)
length = sokClient.Receive(arrMsgRec); // 接收数据,并返回数据的长度;
if (length == 0)//连接正常断开
{
Offline(client);
log.error(string.Format("客户端[{0}]接收服务端[{1}]的消息出现异常", sokClient.RemoteEndPoint.ToString(), ServerInfo), e);
log.info(string.Format("客户端[{0}]断开与服务端[{1}]的连接", sokClient.RemoteEndPoint.ToString(), ServerInfo));
return;
}
}
......@@ -308,51 +451,13 @@ namespace Agv
{
byte[] buff = new byte[length];
Array.Copy(arrMsgRec, 0, buff, 0, length);
//解码单个节点
if(_IsdecodeSingleNode)
{
Node node = Common.Decode(buff);
if (node == null)
{
log.error("命令解析失败: " + HexBuff(buff));
}
else
{
//CommonVar.LogUtil.info("Receive[" + client.IP + "] " + node.ToText());
int idx = client.nodeName.FindIndex(s => s == node.Name);
if (idx == -1) client.nodeName.Add(node.Name);
UpdateNode(node);
}
}
else
{
//解码多个节点
List<Node> nodes = Common.DecodeNodes(buff);
if (nodes == null)
{
log.error("命令解析失败: " + HexBuff(buff));
}
else
{
int idx = -1;
foreach (var node in nodes)
{
idx = client.nodeName.FindIndex(s => s == node.Name);
if (idx == -1) client.nodeName.Add(node.Name);
}
UpdateNodes(nodes);
}
}
StickyBagHandle(client, buff);
}
}
catch (Exception ex)
catch (Exception e)
{
log.error("ListenNet", ex);
}
}
}
}
......@@ -367,12 +472,12 @@ namespace Agv
}
private void UpdateNode(Node node)
{
if(!nodeOnline.Keys.Contains(node.Name))
if (!nodeOnline.Keys.Contains(node.Name))
{
nodeOnline.Add(node.Name,true);
nodeOnline.Add(node.Name, true);
NodeOnline?.Invoke(node.Name, true);
}
else if(nodeOnline[node.Name].Equals(false))
else if (nodeOnline[node.Name].Equals(false))
{
nodeOnline[node.Name] = true;
NodeOnline?.Invoke(node.Name, true);
......@@ -385,7 +490,7 @@ namespace Agv
}
else if (!nodeMap[node.Name].Equals(node))
{
nodeMap[node.Name]= node;
nodeMap[node.Name] = node;
NodeChanged?.Invoke(node);
log.info(string.Format("节点状态更新[{0}]", node.ToText()));
}
......@@ -396,29 +501,44 @@ namespace Agv
foreach (var node in nodes)
{
UpdateNode(node);
Thread.Sleep(500);
}
}
private void Offline(Client client)
{
client.Loop = false;
client.Socket.Close();
for (int i = 0; i < client.nodeName.Count; i++)
try
{
if (!nodeOnline.Keys.Contains(client.nodeName[i]))
client.Loop = false;
dictClient.Remove(client.IP);
if (client.Socket != null)
{
nodeOnline.Add(client.nodeName[i], false);
NodeOnline?.Invoke(client.nodeName[i], false);
client.Socket.Close();
client.Socket = null;
}
else if (nodeOnline[client.nodeName[i]].Equals(true))
for (int i = 0; i < client.nodeName.Count; i++)
{
nodeOnline[client.nodeName[i]] = false;
NodeOnline?.Invoke(client.nodeName[i], false);
if (!nodeOnline.Keys.Contains(client.nodeName[i]))
{
nodeOnline.Add(client.nodeName[i], false);
NodeOnline?.Invoke(client.nodeName[i], false);
}
else if (nodeOnline[client.nodeName[i]].Equals(true))
{
nodeOnline[client.nodeName[i]] = false;
NodeOnline?.Invoke(client.nodeName[i], false);
}
if (nodeMap.ContainsKey(client.nodeName[i]))
nodeMap.Remove(client.nodeName[i]);
}
client.nodeName.Clear();
client.ListenNet.Abort();
log.info(string.Format("关闭对客户端[{0}]的监听线程", client.Endpoint));
}
catch (ThreadAbortException)
{
log.info(string.Format("关闭对客户端[{0}]的监听线程", client.Endpoint));
}
client.nodeName.Clear();
client.ListenNet.Abort();
client.Socket = null;
dictClient.Remove(client.IP);
}
private void Offline(string clientKey)
{
......@@ -445,15 +565,6 @@ namespace Agv
client.Socket = null;
dictClient.Remove(client.IP);
}
private string HexBuff(byte[] buff)
{
string s = "";
if (buff == null) return s;
for (int i = 0; i < buff.Length; i++)
s += buff[i].ToString("X2") + " ";
return s;
}
/// <summary>
/// 发送命令
......@@ -468,7 +579,7 @@ namespace Agv
try
{
dictClient[clientKey].Socket.Send(buff);
log.debug(string.Format("服务端[{0}]向客户端[{1}]发送消息:[{2}]", ServerInfo, clientKey, HexBuff(buff)));
log.debug(string.Format("服务端[{0}]向客户端[{1}]发送消息:[{2}]", ServerInfo, clientKey, Common.HexBuff(buff)));
return true;
}
catch (Exception ex)
......@@ -486,16 +597,18 @@ namespace Agv
private class Client
{
public bool Loop;
public string Endpoint;
public string IP;
public List<string> nodeName;
public Socket Socket;
public Thread ListenNet;
public Client(Socket socket,Thread thread,string IP)
public Client(Socket socket, Thread thread, string ip, string Endpoint)
{
Socket = socket;
ListenNet = thread;
Loop = true;
this.IP = IP;
IP = ip;
this.Endpoint = Endpoint;
nodeName = new List<string>();
}
}
......@@ -514,12 +627,11 @@ namespace Agv
/// 是否可调用
/// </summary>
public bool IsUse { set; get; }
public string AliceName { get; set; }
/// <summary>
/// 线体名
/// </summary>
public string LineName { set; get; }
public string AliceName { get; set; }
/// <summary>
/// 客户端的节点
......@@ -538,6 +650,19 @@ namespace Agv
/// <param name="name"></param>
/// <param name="ip"></param>
/// <param name="isUse"></param>
public ClientNode(string name, string aliceName, bool isUse, int priority = 0) : base(name)
{
AliceName = aliceName;
Online = false;
IsUse = isUse;
Priority = priority;
}
/// <summary>
/// 客户端节点
/// </summary>
/// <param name="name"></param>
/// <param name="ip"></param>
/// <param name="isUse"></param>
public ClientNode(string name, string aliceName, string lineName, bool isUse) : base(name)
{
LineName = lineName;
......
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!