module des.util.socket;

public import std.socket;
import std.socketstream;

import des.util.data.pdata;
import des.util.helpers;
import des.util.arch.emm;
import des.util.logsys;

/// Error raised by the framed-message helpers of this module.
/// Declared at module scope, so inside this module the name refers to this
/// class rather than to the exception publicly imported from std.socket.
class SocketException: Exception
{
    @safe pure nothrow this( string msg, string file=__FILE__, int line=__LINE__ )
    { super( msg, file, line ); }
}

/++
 + Framing helpers shared by the listener and the sender.
 +
 + Wire format of one message (all integers are native-endian `int`):
 + ---
 + [ block size ][ payload length ][ payload padded with zeros to whole blocks ]
 + ---
 + The padded area always contains `payload length / block size + 1` blocks,
 + i.e. at least one padding byte is always present (a payload whose length is
 + an exact multiple of the block size is followed by one full zero block).
 +/
interface DSocket
{
protected:
    alias ptrdiff_t delegate( const (void)[], size_t bs ) sendFunc;

    /++
     + Sends `data` as one framed message through `func`.
     +
     + Params:
     +   func = transport write; receives the bytes to write and their count,
     +          returns the number of bytes actually written (<= 0 on failure)
     +   data = payload
     +   bs   = block size used for padding, must be positive
     +
     + Throws: SocketException if `bs` is not positive, if the payload length
     +         does not fit the `int` length field, or if the transport
     +         reports a failed write.
     +/
    final void formSend( sendFunc func, in void[] data, int bs )
    {
        if( bs <= 0 )
            throw new SocketException( "formSend: block size must be positive" );
        if( data.length > int.max )
            throw new SocketException( "formSend: payload does not fit the int length field" );

        // A stream transport may accept only a part of the buffer,
        // so keep writing until the whole chunk is gone.
        void sendAll( const(void)[] chunk )
        {
            while( chunk.length != 0 )
            {
                auto sent = func( chunk, chunk.length );
                if( sent <= 0 )
                    throw new SocketException( "formSend: transport failed to send data" );
                auto step = cast(size_t)sent;
                if( step > chunk.length ) step = chunk.length;
                chunk = chunk[ step .. $ ];
            }
        }

        int[1] field;

        field[0] = bs;
        sendAll( field[] );

        field[0] = cast(int)data.length;
        sendAll( field[] );

        immutable size_t block = cast(size_t)bs;
        // same layout as before: payload followed by 1..bs zero bytes
        auto padded = new ubyte[]( ( data.length / block + 1 ) * block );
        padded[ 0 .. data.length ] = cast(const(ubyte)[])data;

        foreach( i; 0 .. padded.length / block )
            sendAll( padded[ i*block .. (i+1)*block ] );
    }

    alias ptrdiff_t delegate( void[] ) receiveFunc;

    /++
     + Receives one framed message through `func`.
     +
     + Params:
     +   func = transport read; fills (a prefix of) the buffer and returns the
     +          number of bytes read (<= 0 on failure / closed / no data)
     +
     + Returns: the payload, or an empty array when the transport fails at any
     +          point or when the received header is invalid (non-positive
     +          block size or negative payload length). Note that a valid
     +          zero-length payload is also returned as an empty array.
     +/
    final void[] formReceive( receiveFunc func )
    {
        // A stream transport may deliver less than requested,
        // so keep reading until the buffer is completely filled.
        bool fill( void[] buf )
        {
            size_t got = 0;
            while( got < buf.length )
            {
                auto n = func( buf[ got .. $ ] );
                if( n <= 0 ) return false;
                got += cast(size_t)n;
            }
            return true;
        }

        int[1] field;

        if( !fill( field[] ) ) return [];
        immutable int bs = field[0];
        // header comes from the peer: never trust it
        if( bs <= 0 ) return [];

        if( !fill( field[] ) ) return [];
        immutable int data_size = field[0];
        if( data_size < 0 ) return [];

        immutable size_t block = cast(size_t)bs;
        immutable size_t payload = cast(size_t)data_size;
        immutable size_t block_count = payload / block + 1;

        void[] raw_data;
        void[] buffer;
        buffer.length = block;

        // read block by block instead of allocating the announced size up front
        foreach( i; 0 .. block_count )
        {
            if( !fill( buffer ) ) return [];
            raw_data ~= buffer;
        }

        return raw_data[ 0 .. payload ].dup;
    }
}

/// TCP listener serving a single client at a time: receives framed messages,
/// passes them to a callback and sends the callback result back (if not empty).
class SListener : DSocket, ExternalMemoryManager
{
    mixin EMM;

    /// Closes the accepted client (if any) and the listening socket.
    protected override void selfDestroy()
    {
        dropClient();
        server.shutdown( SocketShutdown.BOTH );
        server.close();
    }

private:
    Socket server;
    Socket client;
    alias void[] delegate( void[] ) callback;
    callback cb;
    int block_size = 16;

    /// Closes the current client socket and forgets it.
    void dropClient()
    {
        if( client is null ) return;
        client.shutdown( SocketShutdown.BOTH );
        client.close();
        client = null;
    }

    /// Waits up to 500 ms for an incoming connection when there is no client.
    void checkClient()
    {
        if( client !is null ) return;

        logger.Debug( "client is null" );
        auto set = new SocketSet;
        set.add( server );
        if( Socket.select(set,null,null,dur!"msecs"(500) ) > 0 && set.isSet(server) )
        {
            logger.Debug( "locking" );
            server.blocking(true);
            client = server.accept();
            server.blocking(false);
        }
    }

public:
    /// Creates the listening socket bound to `addr`.
    this( Address addr )
    {
        server = new TcpSocket();
        server.setOption( SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true );
        server.setOption( SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"usecs"(0) );
        server.bind( addr );
        server.listen(10);
        client = null;
    }

    /// ditto
    this( ushort port ){ this( new InternetAddress( port ) ); }

    /// Sets the handler of received messages; its non-empty result is sent back.
    void setReceiveCB( callback _cb ){ cb = _cb; }

    /++
     + One polling iteration: accepts a client if needed, then, if the client
     + has readable data, receives one message and answers it.
     + The client is dropped when receiving yields no data or when the answer
     + can not be sent.
     +/
    void step()
    {
        logger.Debug("step");
        checkClient();

        if( client is null )
            return;
        logger.Debug(" client not null");

        auto set = new SocketSet;
        set.add( client );
        if( Socket.select(set,null,null,dur!"msecs"(0) ) <= 0 || !set.isSet(client) ) return;

        auto data = formReceive( &client.receive );
        if( data.length == 0 )
        {
            // close the socket instead of just losing the reference
            dropClient();
            return;
        }
        logger.Debug(" data recived");

        if( cb is null ) return;

        auto send_data = cb( data );
        if( send_data.length == 0 ) return;

        try
        {
            formSend( (const(void)[] dd, size_t sz){ return client.send(dd); }, send_data, block_size );
        }
        catch( SocketException e )
        {
            // the peer is gone or the socket is broken: keep step() non-throwing
            dropClient();
        }
    }
}

/// TCP client: sends framed messages and polls for framed answers.
class SSender : DSocket, ExternalMemoryManager
{
    mixin EMM;

private:
    Socket sender;
    int bs = 128;

    alias void delegate( void[] ) callback;
    callback cb;
    Address address;
    SocketStream ss;

public:
    /// Connects to `addr`; the socket is switched to non-blocking mode afterwards.
    this( Address addr )
    {
        sender = new TcpSocket();
        address = addr;
        sender.connect( address );
        ss = new SocketStream( sender );
        sender.blocking = false;
    }

    /// Sets the handler of received messages.
    void setReceiveCB( callback _cb ){ cb = _cb; }

    /// ditto (constructor)
    this( ushort port ) { this( new InternetAddress(port) ); }

    /++
     + One polling iteration: tries to receive one message and passes it to
     + the callback. The callback is not invoked when nothing was received.
     +/
    void step()
    {
        // the socket is connected, so plain receive is enough and the stored
        // peer address is left untouched
        auto data = formReceive( &sender.receive );
        if( data.length != 0 && cb !is null )
            cb( data );
    }

    /// Sends `data` as one framed message.
    /// Throws: SocketException if the data can not be written to the socket.
    void send( in void[] data )
    {
        formSend( (const (void)[] dd, size_t block_size ){ return cast(ptrdiff_t)(ss.writeBlock( cast(void*)dd.ptr, block_size )); }, data, bs );
    }

protected:

    /// Shuts down and closes the connection.
    override void selfDestroy()
    {
        sender.shutdown( SocketShutdown.BOTH );
        sender.close();
    }
}

unittest
{
    import std.random;
    SListener ll = new SListener( 12345 );
    SSender ss = new SSender( 12345 );
    ubyte[] data;
    data.length = 100;
    foreach( ref d; data )
        d = cast(ubyte)uniform( 100, 255 );
    ubyte[] rdata;
    rdata.length = 100;

    auto cb = ( void[] received )
    {
        void[] res;
        rdata = cast(ubyte[])received.dup;
        return res;
    };
    ll.setReceiveCB( cb );
    ss.send( data );
    ll.step();
    assert( data == rdata );
}