1 /+ 2 The MIT License (MIT) 3 4 Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)> 5 6 Permission is hereby granted, free of charge, to any person obtaining a copy 7 of this software and associated documentation files (the "Software"), to deal 8 in the Software without restriction, including without limitation the rights 9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 10 copies of the Software, and to permit persons to whom the Software is 11 furnished to do so, subject to the following conditions: 12 13 The above copyright notice and this permission notice shall be included in 14 all copies or substantial portions of the Software. 15 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 22 THE SOFTWARE. 23 +/ 24 25 module des.flow.thread; 26 27 import std.string; 28 29 import std.algorithm; 30 import std.array; 31 32 import std.datetime; 33 import core.thread; 34 35 import des.util.arch.emm; 36 import des.util.logsys; 37 38 import des.flow.base; 39 import des.flow.event; 40 import des.flow.element; 41 import des.flow.signal; 42 import des.flow.sync; 43 import des.flow.sysevdata; 44 45 /// 46 class FThreadException : FlowException 47 { 48 /// 49 this( string msg, string file=__FILE__, size_t line=__LINE__ ) @safe pure nothrow 50 { super( msg, file, line ); } 51 } 52 53 /// Thread wrap 54 class FThread 55 { 56 protected: 57 58 /// 59 Communication com; 60 61 /// core.thread.Thread 62 Thread thread; 63 64 /// 65 string self_name; 66 67 public: 68 69 /// 70 enum State 71 { 72 NONE, /// not inited 73 PAUSE, /// inited, not worked 74 WORK /// inited, worked 75 }; 76 77 /// addational info part 78 enum Error 79 { 80 NONE, /// 81 FTHREAD, /// 82 FLOW, /// 83 EXCEPT, /// 84 FATAL /// unrecoverable error 85 }; 86 87 /// 88 static struct Info 89 { 90 /// 91 State state; 92 /// 93 Error error; 94 /// 95 string message; 96 /// 97 ulong timestamp; 98 99 /// 100 this( State state, Error error=Error.NONE, string msg="" ) 101 { 102 this.state = state; 103 this.error = error; 104 message = msg; 105 timestamp = currentTick; 106 } 107 108 private enum ctor_text = ` this( in Info fts ) 109 { 110 state = fts.state; 111 error = fts.error; 112 message = fts.message; 113 timestamp = fts.timestamp; 114 } 115 `; 116 117 mixin( ctor_text ); 118 mixin( "const" ~ ctor_text ); 119 mixin( "immutable" ~ ctor_text ); 120 mixin( "shared" ~ ctor_text ); 121 mixin( "shared const" ~ ctor_text ); 122 } 123 124 /++ 125 params: 126 name = name of thread 127 func = work element creation function 128 args = args for function 129 +/ 130 this(Args...)( string name, WorkElement function(Args) func, Args args ) 131 in { assert( func !is null ); } body 132 { 133 thread = new Thread({ tmain( com, func, args ); }); 134 thread.name = name; 135 self_name = name; 136 com.initialize(); 137 thread.start(); 138 debug logger.Debug( "name: '%s'", name ); 139 } 140 141 @property 142 { 143 /++ getting last information of thread 144 returns: 145 `Info` 146 +/ 147 auto info() const { return Info(com.info.back); } 148 149 /// getting name of fthread 150 auto name() const { return self_name; } 151 } 152 153 /// take all signals from work element 154 auto takeAllSignals() 155 { return com.signals.clearAndReturnAll(); } 156 157 /// push command for changing state of thread 158 void pushCommand( Command cmd ) 159 { 160 com.commands.pushBack( cmd ); 161 debug logger.trace( "thread: '%s', command: '%s'", name, cmd ); 162 } 163 164 /// push event for processing in work element 165 void pushEvent( in Event ev ) 166 { 167 com.eventbus.pushBack( ev ); 168 debug logger.trace( "thread: '%s', event code: %d", name, ev.code ); 169 } 170 171 /// core.thread.Thread.join 172 void join() { thread.join(); } 173 174 /// add listener FThread, listeners gets events from work element 175 void addListener( FThread[] thrs... ) 176 { 177 foreach( t; thrs ) 178 com.listener.add( t.com.eventbus ); 179 debug logger.Debug( "thread: '%s', listeners: %s", name, array(map!(a=>a.name)(thrs)) ); 180 } 181 182 /// delete listener 183 void delListener( FThread th ) 184 { 185 com.listener.del( th.com.eventbus ); 186 debug logger.Debug( "thread: '%s', listener: %s", name, th.name ); 187 } 188 } 189 190 version(none) 191 { 192 unittest 193 { 194 static class TestElement : WorkElement 195 { 196 this() { stderr.writeln( "init" ); } 197 override EventProcessor[] getEventProcessors() 198 { 199 return [ 200 new FunctionEventProcessor( (in Event ev) 201 { 202 if( ev.isSystem ) 203 stderr.writeln( "system event: ", 204 ((cast(Event)ev).data.as!SysEvData).msg ); 205 }) 206 ]; 207 } 208 override void process() { stderr.writeln( "process" ); } 209 protected void selfDestroy() { stderr.writeln( "destroy" ); } 210 } 211 212 auto fth = new FThread( "test", { return new TestElement; } ); 213 fth.pushCommand( Command.START ); 214 Thread.sleep(dur!"usecs"(20)); 215 fth.pushCommand( Command.PAUSE ); 216 Thread.sleep(dur!"msecs"(20)); 217 fth.pushCommand( Command.START ); 218 Thread.sleep(dur!"usecs"(20)); 219 fth.pushCommand( Command.REINIT ); 220 Thread.sleep(dur!"usecs"(20)); 221 fth.pushCommand( Command.START ); 222 Thread.sleep(dur!"usecs"(20)); 223 fth.pushCommand( Command.CLOSE ); 224 fth.join(); 225 } 226 } 227 228 private 229 { 230 void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args ) 231 { 232 233 Logger logger = new InstanceLogger( __MODULE__ ~ ".WorkProcessor", Thread.getThis().name ); 234 auto wp = createProcessor( com, logger, func, args ); 235 if( wp is null ) return; 236 scope(exit) fullTerminate(wp); 237 try while( wp.hasWork ) wp.process(); 238 catch( Throwable e ) com.info.pushBack( convertToErrorInfo(logger,e) ); 239 } 240 241 auto createProcessor(Args...)( Communication com, Logger logger, WorkElement function(Args) func, Args args ) 242 { 243 try return new WorkProcessor!Args( com, logger, func, args ); 244 catch( Throwable e ) 245 { 246 com.info.pushBack( convertToErrorInfo(logger,e) ); 247 return null; 248 } 249 } 250 251 final class WorkProcessor(Args...) : ExternalMemoryManager, SignalProcessor, EventProcessor 252 { 253 mixin EMM; 254 255 Args args; 256 WorkElement function(Args) func; 257 WorkElement elem; 258 EventProcessor[] evprocs; 259 260 Communication com; 261 262 bool work = false; 263 bool has_work = true; 264 265 Logger logger; 266 267 this( Communication com, Logger logger, WorkElement function(Args) func, Args args ) 268 { 269 this.com = com; 270 this.func = func; 271 this.args = args; 272 273 this.logger = logger; 274 275 init(); 276 debug logger.Debug( "pass" ); 277 } 278 279 @property bool hasWork() const { return has_work; } 280 281 void process() 282 { 283 debug logger.trace( "start process" ); 284 if( work ) 285 { 286 foreach( e; com.eventbus.clearAndReturnAll() ) 287 transmitEvent( e ); 288 289 elem.process(); 290 291 debug logger.trace( "events processed" ); 292 } 293 294 foreach( scmd; com.commands.clearAndReturnAll() ) 295 { 296 auto cmd = cast(Command)scmd; 297 298 debug logger.trace( "process command '%s'", cmd ); 299 final switch( cmd ) 300 { 301 case Command.START: start(); break; 302 case Command.PAUSE: pause(); break; 303 case Command.STOP: stop(); break; 304 case Command.REINIT: reinit(); break; 305 case Command.CLOSE: close(); break; 306 } 307 debug logger.trace( "process command '%s' pass", cmd ); 308 } 309 debug logger.trace( "process pass" ); 310 } 311 312 void transmitEvent( in Event event ) 313 { 314 foreach( ep; evprocs ) 315 ep.processEvent( event ); 316 } 317 318 // SignalProcessor 319 void processSignal( in Signal sig ) { com.signals.pushBack(sig); } 320 321 // EventProcessor 322 void processEvent( in Event ev ) { com.listener.pushBack(ev); } 323 324 void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); } 325 326 void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" ) 327 { com.info.pushBack( FThread.Info( state, error, msg ) ); } 328 329 void init() 330 { 331 debug logger.Debug( "start" ); 332 elem = func(args); 333 334 if( elem is null ) 335 throw new FThreadException( "creation func return null" ); 336 337 elem.setSignalProcessor( this ); 338 elem.setEventListener( this ); 339 340 evprocs = elem.getEventProcessors(); 341 342 pause(); 343 debug logger.Debug( "pass" ); 344 } 345 346 void start() 347 { 348 pushInfo( FThread.State.WORK ); 349 transmitEvent( Event.system( SysEvData.work ) ); 350 work = true; 351 debug logger.Debug( "pass" ); 352 } 353 354 void pause() 355 { 356 pushInfo( FThread.State.PAUSE ); 357 transmitEvent( Event.system( SysEvData.pause ) ); 358 work = false; 359 debug logger.Debug( "pass" ); 360 } 361 362 void stop() 363 { 364 pushInfo( FThread.State.NONE ); 365 if( elem is null ) return; 366 pause(); 367 evprocs.length = 0; 368 elem.destroy(); 369 elem = null; 370 debug logger.Debug( "pass" ); 371 } 372 373 void reinit() 374 { 375 stop(); 376 init(); 377 } 378 379 void close() 380 { 381 stop(); 382 has_work = false; 383 debug logger.Debug( "pass" ); 384 } 385 386 protected: 387 388 void selfDestroy() 389 { 390 stop(); 391 debug logger.Debug( "pass" ); 392 } 393 } 394 395 void fullTerminate(T)( ref T obj ) 396 { 397 if( is( T : ExternalMemoryManager ) && obj !is null ) 398 obj.destroy(); 399 obj = null; 400 } 401 402 FThread.Info convertToErrorInfo( Logger logger, Throwable e ) 403 in{ assert( logger !is null ); } body 404 { 405 if( auto pe = cast(FThreadException)e ) 406 { 407 logger.error( "EXCEPTION: fthread exc: %s", pe ); 408 return errorInfo( FThread.Error.FTHREAD, e.msg ); 409 } 410 else if( auto fe = cast(FlowException)e ) 411 { 412 logger.error( "EXCEPTION: flow exc: %s", fe ); 413 return errorInfo( FThread.Error.FLOW, e.msg ); 414 } 415 else if( auto se = cast(Exception)e ) 416 { 417 logger.error( "EXCEPTION: %s", se ); 418 return errorInfo( FThread.Error.EXCEPT, e.msg ); 419 } 420 else if( auto te = cast(Throwable)e ) 421 { 422 logger.error( "FATAL: %s", te ); 423 return errorInfo( FThread.Error.FATAL, e.msg ); 424 } 425 assert(0,"unknown exception"); 426 } 427 428 FThread.Info errorInfo( FThread.Error error, string msg ) 429 { return FThread.Info( FThread.State.NONE, error, msg ); } 430 }