Commit 7e98e23f gujlg

socket同步

1 个父辈 456a558d
/*
* @Description: 用于AIOBOX-32系列一体化IO模块
* @CreateDate: 2019-02-28
* @UpdateDate: 2019-05-21
* @UpdateDate: 2019-05-22
* @Author: Asa
* @Version: 1.8
* @Version: 1.9
*
*/
......@@ -12,48 +12,64 @@ using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Collections.Generic;
using OnlineStore.Common;
namespace Asa.IOModule
{
/// <summary>
/// AIOBOX操作类,同步通信
/// AIOBOX操作类,socket同步
/// </summary>
public class AIOBOX2
{
private ushort _flag; //ModBusTCP标识
/// <summary>
/// 暂停次数
/// WriteDO 命令非常多时,暂停一次发送 ReadDI 或 ReadDO
/// </summary>
private int suspend;
private bool suspendDI; //暂停一次DI
private bool suspendDO; //暂停一次DI
private Socket _client; //客户端
private Box_Type _type; //类型
private byte[] _addr; //地址
private Box_Sta[] _sta; //状态
private int _unrevd; //没有收到数据的时间
private int _unrevdRemote; //本地还是远程没有收到数据
private bool _readDI; //自动读取DI
private bool _readDO; //自动读取DO
private int _readDISleep; //自动读取DI间隔
private int _readDOSleep; //自动读取DO间隔
private List<string> _log; //日志
//private byte[] _send; //发送的命令
private System.Collections.Concurrent.ConcurrentQueue<byte[]> _receive; //接收的数据
private System.Collections.Concurrent.ConcurrentQueue<ushort> _flag;
private System.Collections.Concurrent.ConcurrentQueue<byte[]> _send;
private System.Collections.Concurrent.ConcurrentQueue<byte[]> _receive;
//private bool suspendWrite;
//private AutoResetEvent aWrite;
//private AutoResetEvent aReadDI;
//private AutoResetEvent aReadDO;
private Thread tSend;
private Thread tSend; //发送命令处理
private Thread tReceive; //接收信息处理
private Thread tListen; //监听网络
private Thread tTrigger; //触发DI、DO改变事件
private Thread tReadDI; //自动读取DI线程
private Thread tReadDO; //自动读取DO线程
private Thread tLogOut; //日志输出
private Thread tFlag; //ModBusTCP标识
private bool suspendWrite;
private bool accept;
private AutoResetEvent aWrite;
private AutoResetEvent aReadDI;
private AutoResetEvent aReadDO;
private const int SEND_SLEEP = 30; //发送命令间隔
private const int NET_SLEEP = 30; //接收网络间隔
private const int TRIG_SLEEP = 30; //触发事件间隔
/// <summary>
/// 每条命令发送的间隔
/// 不能小于15,会出现IO接收不到的情况
/// 小于30时,会出现接收数据连包的情况
/// </summary>
private const int SEND_SLEEP = 15;
/// <summary>
/// 监听网络接收数据的间隔
/// 必须小于SEND_SLEEP
/// </summary>
private const int NET_SLEEP = 10;
/// <summary>
/// 触发DIO状态事件的间隔
/// </summary>
private const int TRIG_SLEEP = 20;
/// <summary>
/// 自动读取DI委托
......@@ -101,9 +117,9 @@ namespace Asa.IOModule
_log = new List<string>();
Type = Box_Type.DIO_32;
aWrite = new AutoResetEvent(false);
aReadDI = new AutoResetEvent(false);
aReadDO = new AutoResetEvent(false);
//aWrite = new AutoResetEvent(false);
//aReadDI = new AutoResetEvent(false);
//aReadDO = new AutoResetEvent(false);
}
/// <summary>
......@@ -245,6 +261,7 @@ namespace Asa.IOModule
/// <returns></returns>
public bool Connect()
{
IsConn = false;
try
{
//IP合法
......@@ -258,40 +275,46 @@ namespace Asa.IOModule
//Ping服务端
System.Net.NetworkInformation.Ping ping = new System.Net.NetworkInformation.Ping();
System.Net.NetworkInformation.PingReply result = ping.Send(IP, 3000);
System.Net.NetworkInformation.PingReply result = ping.Send(IP, 5000);
if (result.Status != System.Net.NetworkInformation.IPStatus.Success)
{
ErrInfo = "Ping " + IP + " 请求没有回应";
return false;
}
_unrevd = 0;
_unrevdRemote = 0;
_flag = 0;
suspendWrite = false;
accept = false;
_log.Clear();
//suspendWrite = false;
suspend = 0;
suspendDI = false;
suspendDO = false;
_send = new System.Collections.Concurrent.ConcurrentQueue<byte[]>();
_receive = new System.Collections.Concurrent.ConcurrentQueue<byte[]>();
_flag = new System.Collections.Concurrent.ConcurrentQueue<ushort>();
_log.Clear();
//建立连接
_client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 1000);
_client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 1000);
_client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 500);
_client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 500);
_client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, 1);
//_client.BeginConnect(IP, Port, new AsyncCallback(ConnectCallback), null);
_client.Connect(IPAddress.Parse(IP), Port);
Thread.Sleep(100); //需要等待一会才能获取连接状态
tSend = new Thread(new ThreadStart(Send));
tListen = new Thread(new ThreadStart(Listen));
tTrigger = new Thread(new ThreadStart(TriggerDIO));
tFlag = new Thread(new ThreadStart(Flag));
tFlag.Start();
Thread.Sleep(10);
tReadDI = new Thread(new ThreadStart(AutoReadDI));
tReadDO = new Thread(new ThreadStart(AutoReadDO));
tSend = new Thread(new ThreadStart(Send));
tReceive = new Thread(new ThreadStart(Receive));
tLogOut = new Thread(new ThreadStart(LogPrint));
tSend.Start();
tListen = new Thread(new ThreadStart(Listen));
tTrigger = new Thread(new ThreadStart(TriggerDIO));
tListen.Start();
tTrigger.Start();
tSend.Start();
tReceive.Start();
tLogOut.Start();
tReadDI.Start();
......@@ -304,7 +327,6 @@ namespace Asa.IOModule
catch (Exception ex)
{
ErrInfo = ex.Message;
IsConn = false;
return false;
}
}
......@@ -329,6 +351,8 @@ namespace Asa.IOModule
tSend = null;
if (tReceive != null) tReceive.Abort();
tReceive = null;
if (tFlag != null) tFlag.Abort();
tFlag = null;
if (_client != null)
{
......@@ -466,6 +490,16 @@ namespace Asa.IOModule
{
try
{
//suspendWrite = true;
//if (_readDI) aWrite.WaitOne();
//if (_readDO) aWrite.WaitOne();
//if (_send.Count > 10)
//{
// while (_send.TryDequeue(out byte[] result))
// { }
//}
byte[] data = Command();
byte[] buff = new byte[12];
Array.Copy(data, 0, buff, 0, data.Length);
......@@ -473,51 +507,24 @@ namespace Asa.IOModule
buff[9] = _addr[(int)add]; //地址
buff[10] = (byte)sta; //写入值
suspendWrite = true;
if (_readDI) aWrite.WaitOne();
if (_readDO) aWrite.WaitOne();
accept = false; //接收完数据
int n = 0; //超时时间
int time = 0; //重试次数
while (!accept)
{
if (n == 0)
{
_client.Send(buff);
if (LogOut) AddLog(string.Format("WriteDO({0},{1})", add.ToString(), sta.ToString()), buff);
}
Thread.Sleep(SEND_SLEEP);
n += 5;
if (n >= 1000) //1秒内没有收到返回
{
time++;
n = 0;
}
if (time == 3)
if (LogOut)
{
ErrInfo = "重试3次都没有响应";
break;
}
byte[] bb = new byte[2];
bb[0] = buff[1];
bb[1] = buff[0];
ushort flag = BitConverter.ToUInt16(bb, 0);
string s = string.Format("{0:HH:mm:ss.fff} WriteDO {1} ({2},{3})", DateTime.Now, flag, add.ToString(), sta.ToString());
_log.Add(s);
}
if (accept)
_log.Add("WriteDO Receive");
else
_log.Add(string.Format("{0:HH:mm:ss:fff} WriteDO 重试3次都没有响应", DateTime.Now));
if (time > 0)
{
return false;
}
else
{
suspendWrite = false;
aReadDI.Set();
aReadDO.Set();
ErrInfo = "OK";
suspend++;
_send.Enqueue(buff);
//suspendWrite = false;
//aReadDI.Set();
//aReadDO.Set();
if (_unrevdRemote == 0) ErrInfo = "OK";
return true;
}
}
catch (Exception ex)
{
ErrInfo = ex.Message;
......@@ -544,18 +551,15 @@ namespace Asa.IOModule
private void AddLog(string text, byte[] buff)
{
byte[] bb = new byte[2];
bb[0] = buff[1];
bb[1] = buff[0];
ushort flag = BitConverter.ToUInt16(bb, 0);
string s = string.Format("{0:HH:mm:ss:fff} " + text + " {1} fun={2} len={3}", DateTime.Now, flag, buff[7], buff.Length);
_log.Add(s);
}
/// <summary>
/// 发送命令
/// </summary>
private void Send()
{
string s;
ushort flag;
while (true)
{
bool rtn = _send.TryDequeue(out byte[] result);
......@@ -563,27 +567,26 @@ namespace Asa.IOModule
{
try
{
accept = false;
_client.Send(result);
if (LogOut) AddLog("Send", result);
int n = 0;
while (!accept)
{
n += 5;
Thread.Sleep(5);
if (n >= 500)
//_client.BeginSend(result, 0, result.Length, SocketFlags.None, new AsyncCallback(SendCallback), _client);
if (LogOut)
{
_log.Add("Send Timeout 500ms");
break;
}
byte[] bb = new byte[2];
bb[0] = result[1];
bb[1] = result[0];
flag = BitConverter.ToUInt16(bb, 0);
s = string.Format("{0:HH:mm:ss:fff} Send {1} fun={2} len={3}", DateTime.Now, flag, result[7], result.Length);
_log.Add(s);
}
}
catch (Exception ex)
{
ErrInfo = ex.Message;
_unrevdRemote = 1;
break;
}
}
Thread.Sleep(SEND_SLEEP);
}
}
......@@ -602,7 +605,7 @@ namespace Asa.IOModule
else if (buff[7] == 2)
ReadDI(buff);
else if (buff[7] == 5)
{ }
ReadSingle(buff);
}
Thread.Sleep(10);
}
......@@ -624,7 +627,7 @@ namespace Asa.IOModule
bb[0] = buff[1];
bb[1] = buff[0];
ushort flag = BitConverter.ToUInt16(bb, 0);
s = string.Format("{0:HH:mm:ss:fff} WriteDO Receive {1}", DateTime.Now, flag);
s = string.Format("{0:HH:mm:ss:fff} WriteDO {1} fun={2} len={3}", DateTime.Now, flag, buff[7], buff.Length);
_log.Add(s);
}
......@@ -676,10 +679,11 @@ namespace Asa.IOModule
bb[0] = buff[1];
bb[1] = buff[0];
ushort flag = BitConverter.ToUInt16(bb, 0);
s = string.Format("{0:HH:mm:ss:fff} ReadDO id={1} fun={2} len={3}", DateTime.Now, flag, buff[7], buff.Length);
s = string.Format("{0:HH:mm:ss:fff} ReadDO {1} fun={2} len={3} (", DateTime.Now, flag, buff[7], buff.Length);
s += Convert.ToString(buff[9], 2);
if (buff[8] == 2)
s += "," + Convert.ToString(buff[10], 2);
s += ")";
_log.Add(s);
}
......@@ -712,7 +716,6 @@ namespace Asa.IOModule
catch (Exception ex)
{
ErrInfo = ex.Message;
LogUtil.error(ex.ToString());
return false;
}
......@@ -735,10 +738,11 @@ namespace Asa.IOModule
bb[0] = buff[1];
bb[1] = buff[0];
ushort flag = BitConverter.ToUInt16(bb, 0);
s = string.Format("{0:HH:mm:ss:fff} ReadDI id={1} fun={2} len={3}", DateTime.Now, flag, buff[7], buff.Length);
s = string.Format("{0:HH:mm:ss:fff} ReadDI {1} fun={2} len={3} (", DateTime.Now, flag, buff[7], buff.Length);
s += Convert.ToString(buff[9], 2);
if (buff[8] == 2)
s += "," + Convert.ToString(buff[10], 2);
s += ")";
_log.Add(s);
}
......@@ -771,7 +775,6 @@ namespace Asa.IOModule
catch (Exception ex)
{
ErrInfo = ex.Message;
LogUtil.error(ex.ToString());
return false;
}
......@@ -785,7 +788,8 @@ namespace Asa.IOModule
/// <returns></returns>
private byte[] Command()
{
byte[] add = BitConverter.GetBytes(++_flag);
_flag.TryDequeue(out ushort result);
byte[] add = BitConverter.GetBytes(result);
byte[] data = new byte[7];
data[0] = add[1];
data[1] = add[0];
......@@ -794,7 +798,6 @@ namespace Asa.IOModule
data[4] = 0;
data[5] = 0;
data[6] = 1;
if (_flag == ushort.MaxValue) _flag = 0;
return data;
}
......@@ -851,8 +854,6 @@ namespace Asa.IOModule
_log.RemoveRange(0, len);
len = 0;
}
Thread.Sleep(1000);
}
}
......@@ -866,12 +867,19 @@ namespace Asa.IOModule
{
if (IsConn && _readDI)
{
if (suspendWrite)
//if (suspendWrite)
//{
// aWrite.Set();
// aReadDI.WaitOne();
//}
if (suspendDI && suspend > 0)
{
aWrite.Set();
aReadDI.WaitOne();
suspendDI = false;
suspend--;
}
else
{
byte[] data = Command();
byte[] buff = new byte[12];
Array.Copy(data, 0, buff, 0, data.Length);
......@@ -882,6 +890,8 @@ namespace Asa.IOModule
else if (_type == Box_Type.DIO_32)
buff[11] = 16; //个数
_send.Enqueue(buff);
suspendDI = true;
}
}
Thread.Sleep(_readDISleep);
}
......@@ -896,12 +906,19 @@ namespace Asa.IOModule
{
if (IsConn && _readDO)
{
if (suspendWrite)
//if (suspendWrite)
//{
// aWrite.Set();
// aReadDO.WaitOne();
//}
if (suspendDO && suspend > 0)
{
aWrite.Set();
aReadDO.WaitOne();
suspendDO = false;
suspend--;
}
else
{
byte[] data = Command();
byte[] buff = new byte[12];
Array.Copy(data, 0, buff, 0, data.Length);
......@@ -914,6 +931,8 @@ namespace Asa.IOModule
else if (_type == Box_Type.DO_16)
buff[11] = 16; //个数
_send.Enqueue(buff);
suspendDO = true;
}
}
Thread.Sleep(_readDOSleep);
}
......@@ -924,22 +943,104 @@ namespace Asa.IOModule
/// </summary>
private void Listen()
{
byte[] bb = new byte[100];
byte[] temp = new byte[50];
while (true)
{
if (_client.Available > 0)
{
Thread.Sleep(2);
int len = _client.Receive(bb);
byte[] buff = new byte[len];
Array.Copy(bb, buff, len);
_receive.Enqueue(buff);
accept = true;
//_client.BeginReceive(temp, 0, temp.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), _client);
int len = _client.Receive(temp);
if (len > 15) //连包
{
byte[] aa = new byte[len / 2];
byte[] bb = new byte[len - aa.Length];
Array.Copy(temp, 0, aa, 0, aa.Length);
Array.Copy(temp, aa.Length, bb, 0, bb.Length);
_receive.Enqueue(aa);
_receive.Enqueue(bb);
}
else
{
byte[] cc = new byte[len];
Array.Copy(temp, cc, len);
_receive.Enqueue(cc);
}
}
Thread.Sleep(NET_SLEEP);
}
}
private void SendCallback(IAsyncResult ar)
{
try
{
Socket client = (Socket)ar.AsyncState;
int bytesSent = client.EndSend(ar);
}
catch (Exception ex)
{
ErrInfo = ex.Message;
}
}
private void ReceiveCallback(IAsyncResult ar)
{
try
{
int len = _client.EndReceive(ar);
//if (len > 0)
//{
// if (len > 15) //连包
// {
// byte[] aa = new byte[len / 2];
// byte[] bb = new byte[len - aa.Length];
// Array.Copy(temp, 0, aa, 0, aa.Length);
// Array.Copy(temp, aa.Length, bb, 0, bb.Length);
// _receive.Enqueue(aa);
// _receive.Enqueue(bb);
// }
// else
// {
// byte[] cc = new byte[len];
// Array.Copy(temp, cc, len);
// _receive.Enqueue(cc);
// }
//}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
private void ConnectCallback(IAsyncResult ar)
{
if (!ar.IsCompleted) return;
if (_client == null || !_client.Connected) return;
_client.EndConnect(ar);
}
private void Flag()
{
ushort n = 0;
while (true)
{
if (_flag.Count < 10)
{
_flag.Enqueue(++n);
if (n == ushort.MaxValue) n = 0;
}
Thread.Sleep(5);
}
}
}
}
\ No newline at end of file
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!