1 module des.util.socket;
2 
3 public import std.socket;
4 import std.socketstream;
5 
6 import des.util.pdata;
7 import des.util.helpers;
8 import des.util.emm;
9 
/++
 + Debug helper: prints `<file>:<line> ` followed by all arguments and a
 + newline to stderr.
 +
 + Params:
 +   file = source file reported in the prefix (defaults to the call site)
 +   line = source line reported in the prefix (defaults to the call site)
 +   args = values to print; they are written back to back, as `writeln` does
 +/
void log(string file=__FILE__, size_t line=__LINE__, Args...)( Args args )
{
    // Imported locally so this helper does not depend on some other module
    // happening to publicly import std.stdio.
    import std.stdio : stderr;

    // One call instead of writef + writeln: the produced text is identical,
    // but prefix and payload are emitted under a single stream lock and can
    // not be torn apart by output from another thread.
    stderr.writeln( file, ':', line, ' ', args );
}
15 
/++
 + Exception type declared by this module for socket helper failures.
 +
 + NOTE(review): this module also does `public import std.socket`, which
 + declares a class with the same name; inside this module the local
 + declaration takes precedence — confirm that importers do not run into an
 + ambiguity between the two.
 +/
class SocketException: Exception
{
    /++
     + Params:
     +   msg  = human readable description of the failure
     +   file = source file of the throw site (defaults to the caller)
     +   line = source line of the throw site (defaults to the caller);
     +          `size_t` to match `__LINE__` and the base class constructor
     +/
    @safe pure nothrow this( string msg, string file=__FILE__, size_t line=__LINE__ )
    { super( msg, file, line ); }
}
21 
/++
 + Block framing helpers shared by the socket wrappers in this module.
 +
 + Wire format written by `formSend` and read back by `formReceive`:
 +
 +   1. block size   — one `int`, in the sender's native byte order
 +   2. payload size — one `int`, in the sender's native byte order
 +   3. payload, zero padded to `payload size + bs - payload size % bs` bytes
 +      (so a payload that is already a multiple of the block size is followed
 +      by one full block of padding)
 +
 + The actual I/O is delegated to caller supplied delegates, so the helpers
 + themselves never touch a socket.
 +/
interface DSocket
{
protected:
    /// Sink used by `formSend`: receives the bytes to transmit and their count.
    alias ptrdiff_t delegate( const (void)[], size_t bs ) sendFunc;

    /++
     + Frames `data` and pushes it through `func`.
     +
     + `func` is called once per header field (with `int.sizeof` as the size)
     + and once per payload block (with `bs` as the size). Its return value is
     + ignored, exactly as before, so a failed or partial write is NOT detected
     + here.
     +
     + Params:
     +   func = sink that performs the actual write
     +   data = payload to send (may be empty)
     +   bs   = block size in bytes, must be positive
     +
     + Throws: Exception if `bs` is not positive or if `data` is too large for
     +         its length to be stored in the `int` size header.
     +/
    final void formSend( sendFunc func, in void[] data, int bs )
    {
        // Previously a non-positive block size ended in a division by zero.
        if( bs <= 0 )
            throw new Exception( "formSend: block size must be positive" );
        // Previously the length was silently truncated by cast(int).
        if( data.length > int.max )
            throw new Exception( "formSend: payload too large for the int size header" );

        // Header: block size, then payload size.
        int[1] header = [ bs ];
        func( header[], int.sizeof );
        header[0] = cast(int)data.length;
        func( header[], int.sizeof );

        // Zero initialised buffer => deterministic padding bytes.
        auto padded = new ubyte[]( data.length + bs - data.length % bs );
        padded[ 0 .. data.length ] = (cast(const(ubyte)[])data)[];

        foreach( i; 0 .. padded.length / bs )
            func( padded[ i*bs .. (i+1)*bs ], bs );
    }

    /// Source used by `formReceive`: fills (part of) the buffer and returns
    /// the number of bytes written, or a value <= 0 when nothing was read.
    alias ptrdiff_t delegate( void[] ) receiveFunc;

    /++
     + Reads one frame through `func` and returns its payload.
     +
     + Params:
     +   func = source that performs the actual read; it may deliver fewer
     +          bytes than requested, in which case it is called again for the
     +          remainder
     +
     + Returns: the payload without padding, or an empty array when `func`
     +          reports no data (return value <= 0) at any point, or when the
     +          received header is invalid (non-positive block size or
     +          negative payload size). An empty payload is therefore not
     +          distinguishable from a failure.
     +/
    final void[] formReceive( receiveFunc func )
    {
        // Fills `dst` completely. A stream read may return fewer bytes than
        // asked for, so keep reading the remainder until done; gives up as
        // soon as `func` reports no data.
        bool readExact( void[] dst )
        {
            size_t got = 0;
            while( got < dst.length )
            {
                auto n = func( dst[ got .. $ ] );
                if( n <= 0 )
                    return false;
                got += cast(size_t)n;
            }
            return true;
        }

        int[1] header;

        if( !readExact( header[] ) )
            return [];
        immutable int bs = header[0];
        // The header comes from the peer: never divide by or allocate from
        // an unchecked value.
        if( bs <= 0 )
            return [];

        if( !readExact( header[] ) )
            return [];
        immutable int data_size = header[0];
        if( data_size < 0 )
            return [];

        // Computed in size_t so a large payload size cannot overflow int.
        immutable size_t block = bs;
        immutable size_t payload = data_size;
        immutable size_t full_size = payload + block - payload % block;

        // Grow block by block instead of allocating `full_size` up front, so
        // a bogus size header alone cannot trigger one giant allocation.
        ubyte[] raw_data;
        while( raw_data.length < full_size )
        {
            raw_data.length += block;
            if( !readExact( raw_data[ $ - block .. $ ] ) )
                return [];
        }

        return raw_data[ 0 .. payload ];
    }
}
77 
/++
 + Single client TCP listener speaking the `DSocket` block framing.
 +
 + `step` has to be called periodically: it accepts a pending connection if
 + there is no client yet, reads at most one frame from the client and, if a
 + receive callback is installed and returns a non-empty answer, sends that
 + answer back to the client.
 +/
class SListener : DSocket, ExternalMemoryManager
{
    mixin DirectEMM;

/// Releases both the client connection (if any) and the listening socket.
protected override void selfDestroy()
{
    dropClient();
    server.close();
}
private:
    Socket server;
    Socket client;
    alias void[] delegate( void[] ) callback;
    callback cb;

    /// Closes the current client connection, if any. Previously the reference
    /// was only set to null, which left the socket open.
    void dropClient()
    {
        if( client is null )
            return;
        client.close();
        client = null;
    }

    /// Accepts a pending connection when no client is attached yet.
    /// Waits up to 500 ms for the listening socket to become readable.
    void checkClient()
    {
        if( client !is null )
            return;

        version(socketlog) log( "client is null" );
        auto set = new SocketSet;
        set.add( server );
        if( Socket.select(set,null,null,dur!"msecs"(500) ) > 0 && set.isSet(server) )
        {
            version(socketlog) log( "locking" );
            // accept() is performed in blocking mode, then the listening
            // socket is switched back to non-blocking.
            server.blocking(true);
            client = server.accept();
            server.blocking(false);
        }
    }

    /// Block size used for answers sent back to the client.
    int block_size = 16;
public:
    /// Creates the listening socket, binds it to `addr` and starts listening
    /// (backlog of 10).
    this( Address addr )
    {
        server = new TcpSocket();
        server.setOption( SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true );
        server.setOption( SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"usecs"(0) );
        server.bind( addr );
        server.listen(10);
        client = null;
    }

    /// Convenience constructor: listens on `port` using `new InternetAddress( port )`.
    this( ushort port ){ this( new InternetAddress( port ) ); }

    /// Installs the callback invoked with every received payload; its return
    /// value, when non-empty, is sent back to the client.
    void setReceiveCB( callback _cb ){ cb = _cb; }

    /// Performs one accept / receive / answer cycle (see class description).
    void step()
    {
        version(socketlog) log("step");
        checkClient();

        if( client is null )
            return;
        version(socketlog) log("   client not null");

        // Poll without waiting: nothing to do unless the client is readable.
        auto set = new SocketSet;
        set.add( client );
        if( Socket.select(set,null,null,dur!"msecs"(0) ) <= 0 || !set.isSet(client) ) return;

        auto data = formReceive( &client.receive );
        if( data.length == 0 )
        {
            // Readable but no payload: treated as a disconnect. Close the
            // socket instead of just forgetting it, otherwise it leaks.
            dropClient();
            return;
        }
        version(socketlog) log("   data recived");
        if( cb !is null )
        {
            auto send_data = cb( data );
            if( send_data.length != 0 )
                formSend( (const(void)[] dd, size_t block_size){return client.send(dd);}, send_data, block_size );
        }
    }
}
151 
/++
 + TCP client counterpart of the listener: connects to an address, sends
 + payloads framed with the `DSocket` block protocol and polls for framed
 + answers.
 +/
class SSender : DSocket, ExternalMemoryManager
{
    mixin DirectEMM;

/// Closes the connection.
protected override void selfDestroy()
{ sender.close(); }
private:
    Socket sender;
    /// Block size used for outgoing frames.
    int bs = 128;

    alias void delegate( void[] ) callback;
    callback cb;
    /// Address this sender connected to.
    Address address;
    /// Stream wrapper around `sender` used for writing.
    SocketStream ss;
public:
    /// Connects to `addr`; the socket is switched to non-blocking mode once
    /// the connection and the stream wrapper are set up.
    this( Address addr )
    {
        sender = new TcpSocket();
        address = addr;
        sender.connect( address );
        ss = new SocketStream( sender );
        sender.blocking = false;
    }

    /// Installs the callback invoked by `step` with the received payload.
    void setReceiveCB( callback _cb ){ cb = _cb; }

    /// Convenience constructor: connects to `new InternetAddress(port)`.
    this( ushort port ) { this( new InternetAddress(port) ); }

    /++
     + Tries to read one frame and hands the result to the receive callback.
     +
     + The callback (if set) is invoked on every call, also with an empty
     + array when no frame could be read.
     +/
    void step()
    {
        // The socket is connected, so plain receive() is the right call.
        // receiveFrom( data, address ) takes the address by ref and may
        // overwrite the stored peer address with whatever the OS reports
        // for a stream socket.
        auto data = formReceive( &sender.receive );
        if( cb !is null )
            cb( data );
    }

    /// Frames `data` with block size `bs` and writes it through the stream
    /// wrapper.
    void send( in ubyte[] data )
    {
        formSend( (const (void)[] dd, size_t block_size ){ return cast(ptrdiff_t)(ss.writeBlock( cast(void*)dd.ptr, block_size )); }, data, bs );
    }
}
195 
/// Round trip: a sender transmits 100 random bytes to a listener on local
/// port 4040 and the listener's callback must see exactly those bytes.
unittest
{
    import std.random;
    import core.thread : Thread;

    SListener ll = new SListener( 4040 );
    SSender ss = new SSender( 4040 );
    // NOTE(review): neither object is released here; use whatever teardown
    // call the ExternalMemoryManager API provides — confirm its name.

    ubyte[] data;
    data.length = 100;
    foreach( ref d; data )
        d = cast(ubyte)uniform( 100, 255 );

    ubyte[] rdata;
    bool received = false;

    auto cb = ( void[] incoming )
    {
        void[] res; // empty answer: nothing is sent back
        rdata = cast(ubyte[])incoming.dup;
        received = true;
        return res;
    };
    ll.setReceiveCB( cb );
    ss.send( data );

    // A single step() may only accept the connection, or may poll the client
    // before the payload is readable, so poll a bounded number of times
    // instead of relying on timing.
    foreach( attempt; 0 .. 100 )
    {
        ll.step();
        if( received )
            break;
        Thread.sleep( dur!"msecs"(5) );
    }

    assert( received );
    assert( data == rdata );
}