1 /+ 2 The MIT License (MIT) 3 4 Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)> 5 6 Permission is hereby granted, free of charge, to any person obtaining a copy 7 of this software and associated documentation files (the "Software"), to deal 8 in the Software without restriction, including without limitation the rights 9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 10 copies of the Software, and to permit persons to whom the Software is 11 furnished to do so, subject to the following conditions: 12 13 The above copyright notice and this permission notice shall be included in 14 all copies or substantial portions of the Software. 15 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 22 THE SOFTWARE. 23 +/ 24 25 module des.flow.thread; 26 27 import std.string; 28 29 import std.datetime; 30 import core.thread; 31 32 import des.util.emm; 33 34 import des.flow.base; 35 import des.flow.event; 36 import des.flow.element; 37 import des.flow.signal; 38 import des.flow.sync; 39 import des.flow.sysevdata; 40 41 class FThreadException : FlowException 42 { 43 @safe pure nothrow this( string msg, string file=__FILE__, size_t line=__LINE__ ) 44 { super( msg, file, line ); } 45 } 46 47 class FThread 48 { 49 protected: 50 Communication com; 51 Thread thread; 52 string self_name; 53 54 public: 55 enum State { NONE, PAUSE, WORK }; 56 enum Error { NONE, FTHREAD, FLOW, EXCEPT, FATAL }; 57 58 static struct Info 59 { 60 State state; 61 Error error; 62 string message; 63 ulong timestamp; 64 65 this( State state, Error error=Error.NONE, string msg="" ) 66 { 67 this.state = state; 68 this.error = error; 69 message = msg; 70 timestamp = currentTick; 71 } 72 73 enum ctor_text = ` this( in Info fts ) 74 { 75 state = fts.state; 76 error = fts.error; 77 message = fts.message; 78 timestamp = fts.timestamp; 79 } 80 `; 81 82 mixin( ctor_text ); 83 mixin( "const" ~ ctor_text ); 84 mixin( "immutable" ~ ctor_text ); 85 mixin( "shared" ~ ctor_text ); 86 mixin( "shared const" ~ ctor_text ); 87 } 88 89 this(Args...)( string name, WorkElement function(Args) func, Args args ) 90 in { assert( func !is null ); } body 91 { 92 thread = new Thread({ tmain( com, func, args ); }); 93 thread.name = name; 94 self_name = name; 95 com.initialize(); 96 thread.start(); 97 } 98 99 @property auto info() const { return Info(com.info.back); } 100 @property auto name() const { return self_name; } 101 102 void pushCommand( Command cmd ) { com.commands.pushBack( cmd ); } 103 void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); } 104 105 void join() { thread.join(); } 106 107 void addListener( FThread[] thrs... ) 108 { foreach( t; thrs ) com.listener.add( t.com.eventbus ); } 109 110 void delListener( FThread th ) { com.listener.del( th.com.eventbus ); } 111 } 112 113 version(none) 114 { 115 unittest 116 { 117 static class TestElement : WorkElement 118 { 119 this() { stderr.writeln( "init" ); } 120 override EventProcessor[] getEventProcessors() 121 { 122 return [ 123 new FunctionEventProcessor( (in Event ev) 124 { 125 if( ev.isSystem ) 126 stderr.writeln( "system event: ", 127 ((cast(Event)ev).data.as!SysEvData).msg ); 128 }) 129 ]; 130 } 131 override void process() { stderr.writeln( "process" ); } 132 protected void selfDestroy() { stderr.writeln( "destroy" ); } 133 } 134 135 auto fth = new FThread( "test", { return new TestElement; } ); 136 fth.pushCommand( Command.START ); 137 Thread.sleep(dur!"usecs"(20)); 138 fth.pushCommand( Command.PAUSE ); 139 Thread.sleep(dur!"msecs"(20)); 140 fth.pushCommand( Command.START ); 141 Thread.sleep(dur!"usecs"(20)); 142 fth.pushCommand( Command.REINIT ); 143 Thread.sleep(dur!"usecs"(20)); 144 fth.pushCommand( Command.START ); 145 Thread.sleep(dur!"usecs"(20)); 146 fth.pushCommand( Command.CLOSE ); 147 fth.join(); 148 } 149 } 150 151 private 152 { 153 void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args ) 154 { 155 auto wp = createProcessor( com, func, args ); 156 if( wp is null ) return; 157 scope(exit) fullTerminate(wp); 158 try while( wp.hasWork ) wp.process(); 159 catch( Throwable e ) com.info.pushBack( convertToErrorInfo(e) ); 160 } 161 162 auto createProcessor(Args...)( Communication com, WorkElement function(Args) func, Args args ) 163 { 164 WorkProcessor!Args ret; 165 try ret = new WorkProcessor!Args( com, func, args ); 166 catch( Throwable e ) com.info.pushBack( convertToErrorInfo(e) ); 167 return ret; 168 } 169 170 final class WorkProcessor(Args...) : SignalProcessor, EventProcessor 171 { 172 Args args; 173 WorkElement function(Args) func; 174 WorkElement elem; 175 EventProcessor[] evprocs; 176 177 Communication com; 178 179 bool work = false; 180 bool has_work = true; 181 182 this( Communication com, WorkElement function(Args) func, Args args ) 183 { 184 this.com = com; 185 this.func = func; 186 this.args = args; 187 188 init(); 189 } 190 191 @property bool hasWork() const { return has_work; } 192 193 void process() 194 { 195 if( work ) 196 { 197 foreach( e; com.eventbus.clearAndReturnAll() ) 198 transmitEvent( e ); 199 200 elem.process(); 201 } 202 203 foreach( scmd; com.commands.clearAndReturnAll() ) 204 { 205 auto cmd = cast(Command)scmd; 206 auto state = com.info.back.state; 207 208 final switch( cmd ) 209 { 210 case Command.START: start(); break; 211 case Command.PAUSE: pause(); break; 212 case Command.STOP: stop(); break; 213 case Command.REINIT: reinit(); break; 214 case Command.CLOSE: close(); break; 215 } 216 } 217 } 218 219 void transmitEvent( in Event event ) 220 { 221 foreach( ep; evprocs ) 222 ep.processEvent( event ); 223 } 224 225 // SignalProcessor 226 void processSignal( in Signal sig ) { com.signals.pushBack(sig); } 227 228 // EventProcessor 229 void processEvent( in Event ev ) { com.listener.pushBack(ev); } 230 231 void init() 232 { 233 elem = func(args); 234 235 if( elem is null ) 236 throw new FThreadException( "creation func return null" ); 237 238 elem.setSignalProcessor( this ); 239 elem.setEventListener( this ); 240 241 evprocs = elem.getEventProcessors(); 242 243 pause(); 244 } 245 246 void start() 247 { 248 pushInfo( FThread.State.WORK ); 249 transmitEvent( Event.system( SysEvData.work ) ); 250 work = true; 251 } 252 253 void pause() 254 { 255 pushInfo( FThread.State.PAUSE ); 256 transmitEvent( Event.system( SysEvData.pause ) ); 257 work = false; 258 } 259 260 void stop() 261 { 262 pushInfo( FThread.State.NONE ); 263 if( elem is null ) return; 264 pause(); 265 evprocs.length = 0; 266 elem.destroy(); 267 elem = null; 268 } 269 270 void reinit() 271 { 272 stop(); 273 init(); 274 } 275 276 void close() 277 { 278 stop(); 279 has_work = false; 280 } 281 282 void destroy() { stop(); } 283 284 void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); } 285 286 void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" ) 287 { com.info.pushBack( FThread.Info( state, error, msg ) ); } 288 } 289 290 void fullTerminate(T)( ref T obj ) 291 { 292 if( is( T : ExternalMemoryManager ) && obj !is null ) 293 obj.destroy(); 294 obj = null; 295 } 296 297 FThread.Info convertToErrorInfo( Throwable e ) 298 { 299 if( auto pe = cast(FThreadException)e ) 300 { 301 log_error( "EXCEPTION: fthread exc: %s", pe ); 302 return errorInfo( FThread.Error.FTHREAD, e.msg ); 303 } 304 else if( auto fe = cast(FlowException)e ) 305 { 306 log_error( "EXCEPTION: flow exc: %s", fe ); 307 return errorInfo( FThread.Error.FLOW, e.msg ); 308 } 309 else if( auto se = cast(Exception)e ) 310 { 311 log_error( "EXCEPTION: %s", se ); 312 return errorInfo( FThread.Error.EXCEPT, e.msg ); 313 } 314 else if( auto te = cast(Throwable)e ) 315 { 316 log_error( "FATAL: %s", te ); 317 return errorInfo( FThread.Error.FATAL, e.msg ); 318 } 319 assert(0,"unknown exception"); 320 } 321 322 FThread.Info errorInfo( FThread.Error error, string msg ) 323 { return FThread.Info( FThread.State.NONE, error, msg ); } 324 }