1 module des.flow.thread; 2 3 import std..string; 4 5 import std.algorithm; 6 import std.array; 7 8 import std.datetime; 9 import core.thread; 10 11 import des.util.arch.emm; 12 import des.util.logsys; 13 14 import des.flow.base; 15 import des.flow.event; 16 import des.flow.element; 17 import des.flow.signal; 18 import des.flow.sync; 19 import des.flow.sysevdata; 20 21 /// 22 class FThreadException : FlowException 23 { 24 /// 25 this( string msg, string file=__FILE__, size_t line=__LINE__ ) @safe pure nothrow 26 { super( msg, file, line ); } 27 } 28 29 /// Thread wrap 30 class FThread 31 { 32 protected: 33 34 /// 35 Communication com; 36 37 /// core.thread.Thread 38 Thread thread; 39 40 /// 41 string self_name; 42 43 public: 44 45 /// 46 enum State 47 { 48 NONE, /// not inited 49 PAUSE, /// inited, not worked 50 WORK /// inited, worked 51 }; 52 53 /// addational info part 54 enum Error 55 { 56 NONE, /// 57 FTHREAD, /// 58 FLOW, /// 59 EXCEPT, /// 60 FATAL /// unrecoverable error 61 }; 62 63 /// 64 static struct Info 65 { 66 /// 67 State state; 68 /// 69 Error error; 70 /// 71 string message; 72 /// 73 ulong timestamp; 74 75 /// 76 this( State state, Error error=Error.NONE, string msg="" ) 77 { 78 this.state = state; 79 this.error = error; 80 message = msg; 81 timestamp = currentTick; 82 } 83 84 private enum ctor_text = ` this( in Info fts ) 85 { 86 state = fts.state; 87 error = fts.error; 88 message = fts.message; 89 timestamp = fts.timestamp; 90 } 91 `; 92 93 mixin( ctor_text ); 94 mixin( "const" ~ ctor_text ); 95 mixin( "immutable" ~ ctor_text ); 96 mixin( "shared" ~ ctor_text ); 97 mixin( "shared const" ~ ctor_text ); 98 } 99 100 /++ 101 params: 102 name = name of thread 103 func = work element creation function 104 args = args for function 105 +/ 106 this(Args...)( string name, WorkElement function(Args) func, Args args ) 107 in { assert( func !is null ); } body 108 { 109 thread = new Thread({ tmain( com, func, args ); }); 110 thread.name = name; 111 self_name = name; 112 com.initialize(); 113 thread.start(); 114 debug logger.Debug( "name: '%s'", name ); 115 } 116 117 @property 118 { 119 /++ getting last information of thread 120 returns: 121 `Info` 122 +/ 123 auto info() const { return Info(com.info.back); } 124 125 /// getting name of fthread 126 auto name() const { return self_name; } 127 } 128 129 /// take all signals from work element 130 auto takeAllSignals() 131 { return com.signals.clearAndReturnAll(); } 132 133 /// push command for changing state of thread 134 void pushCommand( Command cmd ) 135 { 136 com.commands.pushBack( cmd ); 137 debug logger.trace( "thread: '%s', command: '%s'", name, cmd ); 138 } 139 140 /// push event for processing in work element 141 void pushEvent( in Event ev ) 142 { 143 com.eventbus.pushBack( ev ); 144 debug logger.trace( "thread: '%s', event code: %d", name, ev.code ); 145 } 146 147 /// core.thread.Thread.join 148 void join() { thread.join(); } 149 150 /// add listener FThread, listeners gets events from work element 151 void addListener( FThread[] thrs... ) 152 { 153 foreach( t; thrs ) 154 com.listener.add( t.com.eventbus ); 155 debug logger.Debug( "thread: '%s', listeners: %s", name, array(map!(a=>a.name)(thrs)) ); 156 } 157 158 /// delete listener 159 void delListener( FThread th ) 160 { 161 com.listener.del( th.com.eventbus ); 162 debug logger.Debug( "thread: '%s', listener: %s", name, th.name ); 163 } 164 } 165 166 version(none) 167 { 168 unittest 169 { 170 static class TestElement : WorkElement 171 { 172 this() { stderr.writeln( "init" ); } 173 override EventProcessor[] getEventProcessors() 174 { 175 return [ 176 new FunctionEventProcessor( (in Event ev) 177 { 178 if( ev.isSystem ) 179 stderr.writeln( "system event: ", 180 ((cast(Event)ev).data.as!SysEvData).msg ); 181 }) 182 ]; 183 } 184 override void process() { stderr.writeln( "process" ); } 185 protected void selfDestroy() { stderr.writeln( "destroy" ); } 186 } 187 188 auto fth = new FThread( "test", { return new TestElement; } ); 189 fth.pushCommand( Command.START ); 190 Thread.sleep(dur!"usecs"(20)); 191 fth.pushCommand( Command.PAUSE ); 192 Thread.sleep(dur!"msecs"(20)); 193 fth.pushCommand( Command.START ); 194 Thread.sleep(dur!"usecs"(20)); 195 fth.pushCommand( Command.REINIT ); 196 Thread.sleep(dur!"usecs"(20)); 197 fth.pushCommand( Command.START ); 198 Thread.sleep(dur!"usecs"(20)); 199 fth.pushCommand( Command.CLOSE ); 200 fth.join(); 201 } 202 } 203 204 private 205 { 206 void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args ) 207 { 208 209 Logger logger = new InstanceLogger( __MODULE__ ~ ".WorkProcessor", Thread.getThis().name ); 210 auto wp = createProcessor( com, logger, func, args ); 211 if( wp is null ) return; 212 scope(exit) fullTerminate(wp); 213 try while( wp.hasWork ) wp.process(); 214 catch( Throwable e ) com.info.pushBack( convertToErrorInfo(logger,e) ); 215 } 216 217 auto createProcessor(Args...)( Communication com, Logger logger, WorkElement function(Args) func, Args args ) 218 { 219 try return new WorkProcessor!Args( com, logger, func, args ); 220 catch( Throwable e ) 221 { 222 com.info.pushBack( convertToErrorInfo(logger,e) ); 223 return null; 224 } 225 } 226 227 final class WorkProcessor(Args...) : ExternalMemoryManager, SignalProcessor, EventProcessor 228 { 229 mixin EMM; 230 231 Args args; 232 WorkElement function(Args) func; 233 WorkElement elem; 234 EventProcessor[] evprocs; 235 236 Communication com; 237 238 bool work = false; 239 bool has_work = true; 240 241 Logger logger; 242 243 this( Communication com, Logger logger, WorkElement function(Args) func, Args args ) 244 { 245 this.com = com; 246 this.func = func; 247 this.args = args; 248 249 this.logger = logger; 250 251 init(); 252 debug logger.Debug( "pass" ); 253 } 254 255 @property bool hasWork() const { return has_work; } 256 257 void process() 258 { 259 debug logger.trace( "start process" ); 260 if( work ) 261 { 262 foreach( e; com.eventbus.clearAndReturnAll() ) 263 transmitEvent( e ); 264 265 elem.process(); 266 267 debug logger.trace( "events processed" ); 268 } 269 270 foreach( scmd; com.commands.clearAndReturnAll() ) 271 { 272 auto cmd = cast(Command)scmd; 273 274 debug logger.trace( "process command '%s'", cmd ); 275 final switch( cmd ) 276 { 277 case Command.START: start(); break; 278 case Command.PAUSE: pause(); break; 279 case Command.STOP: stop(); break; 280 case Command.REINIT: reinit(); break; 281 case Command.CLOSE: close(); break; 282 } 283 debug logger.trace( "process command '%s' pass", cmd ); 284 } 285 debug logger.trace( "process pass" ); 286 } 287 288 void transmitEvent( in Event event ) 289 { 290 foreach( ep; evprocs ) 291 ep.processEvent( event ); 292 } 293 294 // SignalProcessor 295 void processSignal( in Signal sig ) { com.signals.pushBack(sig); } 296 297 // EventProcessor 298 void processEvent( in Event ev ) { com.listener.pushBack(ev); } 299 300 void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); } 301 302 void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" ) 303 { com.info.pushBack( FThread.Info( state, error, msg ) ); } 304 305 void init() 306 { 307 debug logger.Debug( "start" ); 308 elem = func(args); 309 310 if( elem is null ) 311 throw new FThreadException( "creation func return null" ); 312 313 elem.setSignalProcessor( this ); 314 elem.setEventListener( this ); 315 316 evprocs = elem.getEventProcessors(); 317 318 pause(); 319 debug logger.Debug( "pass" ); 320 } 321 322 void start() 323 { 324 pushInfo( FThread.State.WORK ); 325 transmitEvent( Event.system( SysEvData.work ) ); 326 work = true; 327 debug logger.Debug( "pass" ); 328 } 329 330 void pause() 331 { 332 pushInfo( FThread.State.PAUSE ); 333 transmitEvent( Event.system( SysEvData.pause ) ); 334 work = false; 335 debug logger.Debug( "pass" ); 336 } 337 338 void stop() 339 { 340 pushInfo( FThread.State.NONE ); 341 if( elem is null ) return; 342 pause(); 343 evprocs.length = 0; 344 elem.destroy(); 345 elem = null; 346 debug logger.Debug( "pass" ); 347 } 348 349 void reinit() 350 { 351 stop(); 352 init(); 353 } 354 355 void close() 356 { 357 stop(); 358 has_work = false; 359 debug logger.Debug( "pass" ); 360 } 361 362 protected: 363 364 void selfDestroy() 365 { 366 stop(); 367 debug logger.Debug( "pass" ); 368 } 369 } 370 371 void fullTerminate(T)( ref T obj ) 372 { 373 if( is( T : ExternalMemoryManager ) && obj !is null ) 374 obj.destroy(); 375 obj = null; 376 } 377 378 FThread.Info convertToErrorInfo( Logger logger, Throwable e ) 379 in{ assert( logger !is null ); } body 380 { 381 if( auto pe = cast(FThreadException)e ) 382 { 383 logger.error( "EXCEPTION: fthread exc: %s", pe ); 384 return errorInfo( FThread.Error.FTHREAD, e.msg ); 385 } 386 else if( auto fe = cast(FlowException)e ) 387 { 388 logger.error( "EXCEPTION: flow exc: %s", fe ); 389 return errorInfo( FThread.Error.FLOW, e.msg ); 390 } 391 else if( auto se = cast(Exception)e ) 392 { 393 logger.error( "EXCEPTION: %s", se ); 394 return errorInfo( FThread.Error.EXCEPT, e.msg ); 395 } 396 else if( auto te = cast(Throwable)e ) 397 { 398 logger.error( "FATAL: %s", te ); 399 return errorInfo( FThread.Error.FATAL, e.msg ); 400 } 401 assert(0,"unknown exception"); 402 } 403 404 FThread.Info errorInfo( FThread.Error error, string msg ) 405 { return FThread.Info( FThread.State.NONE, error, msg ); } 406 }