module des.util.socket;

public import std.socket;
import std.socketstream;

import des.util.pdata;
import des.util.helpers;
import des.util.emm;

/// Writes "file:line " followed by `args` (and a newline) to stderr.
void log(string file=__FILE__, size_t line=__LINE__, Args...)( Args args )
{
    // Local import: the module itself does not import std.stdio, so the
    // fully-qualified name would not resolve once this template is instantiated.
    import std.stdio : stderr;

    stderr.writef( "%s:%d ", file, line );
    stderr.writeln( args );
}

/// Exception type used by this module for framing errors.
class SocketException: Exception
{
    @safe pure nothrow this( string msg, string file=__FILE__, int line=__LINE__ )
    { super( msg, file, line ); }
}

/++
 + Shared framing helpers for the listener and the sender.
 +
 + Wire format of one message:
 +   1. block size   (int, 4 bytes)
 +   2. payload size (int, 4 bytes)
 +   3. payload followed by padding, sent as whole blocks of `block size`
 +      bytes. Between 1 and `block size` padding bytes are always appended,
 +      i.e. a payload that is already block-aligned gets one extra block.
 +/
interface DSocket
{
protected:
    alias ptrdiff_t delegate( const (void)[], size_t bs ) sendFunc;

    /++
     + Frames `data` and pushes it through `func`.
     +
     + Params:
     +   func = called once per header field and once per block; receives the
     +          bytes to send and their count. Its return value is ignored.
     +   data = payload to send
     +   bs   = block size in bytes, must be positive
     +
     + Throws: SocketException if `bs` is not positive or the payload length
     +         does not fit the 32-bit length header.
     +/
    final void formSend( sendFunc func, in void[] data, int bs )
    {
        if( bs <= 0 )
            throw new SocketException( "formSend: block size must be positive" );
        if( data.length > int.max )
            throw new SocketException( "formSend: payload does not fit the 32-bit length header" );

        int[1] header;

        header[0] = bs;
        func( header[], int.sizeof );

        header[0] = cast(int)data.length;
        func( header[], int.sizeof );

        immutable size_t block = bs;
        // Always pads (1 .. bs bytes); formReceive computes the same size.
        immutable size_t padded_len = data.length + block - data.length % block;

        // new ubyte[] is zero-initialised, so the padding bytes are zeros.
        auto raw_data = new ubyte[]( padded_len );
        raw_data[ 0 .. data.length ] = cast(const(ubyte)[])data;

        foreach( i; 0 .. padded_len / block )
            func( raw_data[ i*block .. (i+1)*block ], block );
    }

    alias ptrdiff_t delegate( void[] ) receiveFunc;

    /++
     + Reads one framed message through `func`.
     +
     + Params:
     +   func = fills (part of) the given buffer and returns the number of
     +          bytes written, or a value <= 0 on error / closed connection.
     +
     + Returns: the payload without padding, or an empty array if `func`
     +          failed or the header is invalid. A message with a zero-length
     +          payload also yields an empty array.
     +/
    final void[] formReceive( receiveFunc func )
    {
        // Keeps calling `func` until `buf` is full: a stream socket may
        // deliver fewer bytes per call than requested.
        bool readExact( void[] buf )
        {
            size_t got = 0;
            while( got < buf.length )
            {
                auto n = func( buf[ got .. $ ] );
                if( n <= 0 ) return false;
                got += n;
            }
            return true;
        }

        int[1] header;

        if( !readExact( header[] ) ) return [];
        immutable int bs = header[0];

        if( !readExact( header[] ) ) return [];
        immutable int data_size = header[0];

        // Header values come from the peer: reject anything that would cause
        // a modulo by zero or a negative slice length below.
        if( bs <= 0 || data_size < 0 ) return [];

        immutable size_t block = bs;
        immutable size_t payload = data_size;
        // Computed in size_t so that payload + block cannot overflow int.
        immutable size_t full_size = payload + block - payload % block;

        // Grow block by block instead of allocating `full_size` up front, so
        // memory use follows the data actually received.
        ubyte[] raw_data;
        auto buffer = new ubyte[]( block );
        for( size_t done = 0; done < full_size; done += block )
        {
            if( !readExact( buffer ) ) return [];
            raw_data ~= buffer;
        }

        return raw_data[ 0 .. payload ];
    }
}

/++
 + TCP listener that serves a single client at a time and answers each
 + received message through a user callback.
 +/
class SListener : DSocket, ExternalMemoryManager
{
    mixin DirectEMM;

    /// Closes the accepted client (if any) and the listening socket.
    protected override void selfDestroy()
    {
        dropClient();
        server.close();
    }

private:
    Socket server;
    Socket client;
    alias void[] delegate( void[] ) callback;
    callback cb;

    /// Closes and forgets the current client so its descriptor is not leaked.
    void dropClient()
    {
        if( client is null ) return;
        client.close();
        client = null;
    }

    /// Accepts a pending connection (waiting up to 500 ms) when no client is attached.
    void checkClient()
    {
        if( client !is null ) return;

        version(socketlog) log( "client is null" );
        auto set = new SocketSet;
        set.add( server );
        if( Socket.select(set,null,null,dur!"msecs"(500) ) > 0 && set.isSet(server) )
        {
            version(socketlog) log( "locking" );
            server.blocking(true);
            client = server.accept();
            server.blocking(false);
        }
    }

    int block_size = 16;

public:
    /// Binds to `addr` and starts listening.
    this( Address addr )
    {
        server = new TcpSocket();
        server.setOption( SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true );
        server.setOption( SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"usecs"(0) );
        server.bind( addr );
        server.listen(10);
        client = null;
    }

    /// Listens on `port` using a default-constructed InternetAddress for that port.
    this( ushort port ){ this( new InternetAddress( port ) ); }

    /// Sets the callback invoked with each received payload; a non-empty
    /// return value is sent back to the client.
    void setReceiveCB( callback _cb ){ cb = _cb; }

    /++
     + One polling iteration: accept a client if needed, and if the client
     + has readable data, receive one message, pass it to the callback and
     + send the callback's non-empty result back.
     +/
    void step()
    {
        version(socketlog) log("step");
        checkClient();

        if( client is null )
            return;
        version(socketlog) log(" client not null");

        auto set = new SocketSet;
        set.add( client );
        // Zero timeout: only proceed when data is already waiting.
        if( Socket.select(set,null,null,dur!"msecs"(0) ) <= 0 || !set.isSet(client) ) return;

        auto data = formReceive( &client.receive );
        if( data.length == 0 )
        {
            // Receive failed or peer closed: release the socket, not just the reference.
            dropClient();
            return;
        }
        version(socketlog) log(" data recived");

        if( cb is null ) return;

        auto send_data = cb( data );
        if( send_data.length != 0 )
            formSend( (const(void)[] dd, size_t len){ return client.send(dd); }, send_data, block_size );
    }
}

/++
 + TCP client that sends framed messages and polls for framed replies.
 +/
class SSender : DSocket, ExternalMemoryManager
{
    mixin DirectEMM;

    /// Closes the connection.
    protected override void selfDestroy()
    { sender.close(); }

private:
    Socket sender;
    int bs = 128;

    alias void delegate( void[] ) callback;
    callback cb;
    Address address;
    SocketStream ss;

public:
    /// Connects to `addr`; the socket is switched to non-blocking mode afterwards.
    this( Address addr )
    {
        sender = new TcpSocket();
        address = addr;
        sender.connect( address );
        ss = new SocketStream( sender );
        sender.blocking = false;
    }

    /// Sets the callback invoked with each received payload.
    void setReceiveCB( callback _cb ){ cb = _cb; }

    /// Connects to `port` using a default-constructed InternetAddress for that port.
    this( ushort port ) { this( new InternetAddress(port) ); }

    /++
     + Tries to receive one message and hands it to the callback.
     + The callback is not invoked when nothing was received, which matches
     + how SListener.step treats an empty result.
     +/
    void step()
    {
        auto data = formReceive( ( void[] buf ) => sender.receiveFrom( buf, address ) );

        if( cb !is null && data.length != 0 )
            cb( data );
    }

    /// Sends `data` as one framed message using the configured block size.
    void send( in ubyte[] data )
    {
        formSend( (const (void)[] dd, size_t block_size ){ return cast(ptrdiff_t)(ss.writeBlock( cast(void*)dd.ptr, block_size )); }, data, bs );
    }
}

unittest
{
    import std.random;
    SListener ll = new SListener( 4040 );
    SSender ss = new SSender( 4040 );
    ubyte[] data;
    data.length = 100;
    foreach( ref d; data )
        d = cast(ubyte)uniform( 100, 255 );
    ubyte[] rdata;
    rdata.length = 100;

    auto cb = ( void[] received )
    {
        void[] res;
        rdata = cast(ubyte[])received.dup;
        return res;
    };
    ll.setReceiveCB( cb );
    ss.send( data );
    ll.step();
    assert( data == rdata );
}