forked from miyu/Dargon.Transport
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathTcpFrameTransmitter.cs
173 lines (156 loc) · 5.56 KB
/
TcpFrameTransmitter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
using System;
using System.IO;
using System.Net.Sockets;
namespace Dargon.Transport
{
public class TcpFrameTransmitter : IFrameTransmitter
{
/// <summary>
/// The socket which is used for our DSP connection
/// </summary>
private readonly Socket m_socket;
/// <summary>
/// The network stream associated with our socket
/// </summary>
private readonly NetworkStream m_networkStream;
/// <summary>
/// The binary writer which is used to write from to network stream
/// </summary>
private readonly BinaryWriter m_writer;
/// <summary>
/// All input is stored into this input buffer.
/// </summary>
private readonly byte[] m_inputBuffer = new byte[DTPConstants.kMaxMessageSize];
/// <summary>
/// Initializes a new instance of a TCP Frame Transmitter for DSPEx
/// </summary>
/// <param name="host">
/// The hostname which we are connecting to.
/// </param>
/// <param name="port">
/// The port which we are connecting to.
/// </param>
public TcpFrameTransmitter(string host, int port)
{
m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
m_socket.Connect(host, port);
m_networkStream = new NetworkStream(m_socket);
m_writer = new BinaryWriter(m_networkStream);
// Elevate to DSPEx - this blocks until the byte has been written to the underlying stream.
m_writer.Write((byte)DTP.DSPEX_INIT);
}
/// <summary>
/// Begins receiving message frames.
/// </summary>
/// <param name="onFrameReceived">
/// When a message frame is received, this callback is invoked.
/// </param>
public void BeginReceivingMessageFrames(Action<byte[]> onFrameReceived)
{
BeginReceiveMessage(onFrameReceived);
}
/// <summary>
/// Begins receiving a DSPEx message.
/// In reality, this starts async read to get the UINT messageLength of an incoming message.
/// After that is received, another async read begins to get the transaction id of the message.
/// </summary>
/// <param name="onFrameReceived">
/// This callback is invoked when a frame is received.
/// </param>
private void BeginReceiveMessage(Action<byte[]> onFrameReceived)
{
StateObject so = new StateObject()
{
buffer = m_inputBuffer,
bytesRead = 0
};
ContinueReceiveMessage(so, onFrameReceived);
}
/// <summary>
/// Continues to receive a DSPEx message.
/// The first block of this method runs an async loop which reads the first four bytes of our
/// DSPEx message, which tells us the length of our message.
/// The second block of the method runs an async loop which reads the remainder of the message.
/// </summary>
/// <param name="so"></param>
/// <param name="onFrameReceived">
/// This callback is invoked when a frame is received.
/// </param>
private void ContinueReceiveMessage(StateObject so, Action<byte[]> onFrameReceived)
{
bool readingLength = so.bytesRead < 4;
if (readingLength)
{
m_networkStream.BeginRead(
so.buffer,
so.bytesRead, 4 - so.bytesRead, //Read bytes of index [0, 3]
(asyncResult) =>
{
int bytesRead = m_networkStream.EndRead(asyncResult);
so.bytesRead += bytesRead;
// When we've read four bytes, we're done.
if (so.bytesRead == 4)
{
so.bytesTotal = (int)BitConverter.ToUInt32(so.buffer, 0);
}
ContinueReceiveMessage(so, onFrameReceived);
},
null
);
}
else
{
m_networkStream.BeginRead(
so.buffer,
so.bytesRead, so.bytesTotal - so.bytesRead,
(eSecond) =>
{
int bytesRead = m_networkStream.EndRead(eSecond);
so.bytesRead += bytesRead;
if (so.bytesRead == so.bytesTotal)
{
onFrameReceived(so.buffer);
BeginReceiveMessage(onFrameReceived);
}
else
{
ContinueReceiveMessage(so, onFrameReceived);
}
},
null
);
}
}
public void SendRawFrame(byte[] buffer, int offset, int size, Action onFrameSendComplete)
{
m_networkStream.BeginWrite(
buffer,
offset,
(int)size,
(s) => {
m_networkStream.EndWrite(s);
onFrameSendComplete();
},
m_networkStream
);
}
/// <summary>
/// State object for DSPEx asynchronous receiving.
/// </summary>
public class StateObject
{
/// <summary>
/// The total number of bytes which we have read so far
/// </summary>
public int bytesRead = 0;
/// <summary>
/// The total number of bytes in the DSP Message which we are reading
/// </summary>
public int bytesTotal = 0;
/// <summary>
/// The buffer which we are storing stuff in.
/// </summary>
public byte[] buffer; //Contains the entire message frame, including header
}
}
}