1 /+
2 The MIT License (MIT)
3 
4     Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)>
5 
6     Permission is hereby granted, free of charge, to any person obtaining a copy
7     of this software and associated documentation files (the "Software"), to deal
8     in the Software without restriction, including without limitation the rights
9     to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10     copies of the Software, and to permit persons to whom the Software is
11     furnished to do so, subject to the following conditions:
12 
13     The above copyright notice and this permission notice shall be included in
14     all copies or substantial portions of the Software.
15 
16     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21     OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22     THE SOFTWARE.
23 +/
24 
25 module des.flow.thread;
26 
27 import std.string;
28 
29 import std.algorithm;
30 import std.array;
31 
32 import std.datetime;
33 import core.thread;
34 
35 import des.util.arch.emm;
36 import des.util.logsys;
37 
38 import des.flow.base;
39 import des.flow.event;
40 import des.flow.element;
41 import des.flow.signal;
42 import des.flow.sync;
43 import des.flow.sysevdata;
44 
45 ///
46 class FThreadException : FlowException
47 {
48     ///
49     this( string msg, string file=__FILE__, size_t line=__LINE__ ) @safe pure nothrow
50     { super( msg, file, line ); }
51 }
52 
53 /// Thread wrap
54 class FThread
55 {
56 protected:
57 
58     ///
59     Communication com;
60 
61     /// core.thread.Thread
62     Thread thread;
63 
64     ///
65     string self_name;
66 
67 public:
68 
69     ///
70     enum State
71     {
72         NONE,  /// not inited
73         PAUSE, /// inited, not worked
74         WORK   /// inited, worked
75     };
76 
77     /// addational info part
78     enum Error
79     {
80         NONE,    /// 
81         FTHREAD, ///
82         FLOW,    ///
83         EXCEPT,  ///
84         FATAL    /// unrecoverable error
85     };
86 
87     ///
88     static struct Info
89     {
90         ///
91         State state;
92         ///
93         Error error;
94         ///
95         string message;
96         ///
97         ulong timestamp;
98 
99         ///
100         this( State state, Error error=Error.NONE, string msg="" )
101         {
102             this.state = state;
103             this.error = error;
104             message = msg;
105             timestamp = currentTick;
106         }
107 
108         private enum ctor_text = ` this( in Info fts )
109         {
110             state = fts.state;
111             error = fts.error;
112             message = fts.message;
113             timestamp = fts.timestamp;
114         }
115         `;
116 
117         mixin( ctor_text );
118         mixin( "const" ~ ctor_text );
119         mixin( "immutable" ~ ctor_text );
120         mixin( "shared" ~ ctor_text );
121         mixin( "shared const" ~ ctor_text );
122     }
123 
124     /++
125         params:
126         name = name of thread
127         func = work element creation function
128         args = args for function
129      +/
130     this(Args...)( string name, WorkElement function(Args) func, Args args )
131     in { assert( func !is null ); } body
132     {
133         thread = new Thread({ tmain( com, func, args ); });
134         thread.name = name;
135         self_name = name;
136         com.initialize();
137         thread.start();
138         debug logger.Debug( "name: '%s'", name );
139     }
140 
141     @property
142     {
143         /++ getting last information of thread 
144             returns:
145             `Info`
146          +/
147         auto info() const { return Info(com.info.back); }
148 
149         /// getting name of fthread
150         auto name() const { return self_name; }
151     }
152 
153     /// take all signals from work element
154     auto takeAllSignals()
155     { return com.signals.clearAndReturnAll(); }
156 
157     /// push command for changing state of thread
158     void pushCommand( Command cmd )
159     {
160         com.commands.pushBack( cmd );
161         debug logger.trace( "thread: '%s', command: '%s'", name, cmd );
162     }
163 
164     /// push event for processing in work element
165     void pushEvent( in Event ev )
166     {
167         com.eventbus.pushBack( ev );
168         debug logger.trace( "thread: '%s', event code: %d", name, ev.code );
169     }
170 
171     /// core.thread.Thread.join
172     void join() { thread.join(); }
173 
174     /// add listener FThread, listeners gets events from work element
175     void addListener( FThread[] thrs... )
176     {
177         foreach( t; thrs )
178             com.listener.add( t.com.eventbus );
179         debug logger.Debug( "thread: '%s', listeners: %s", name, array(map!(a=>a.name)(thrs)) );
180     }
181 
182     /// delete listener
183     void delListener( FThread th )
184     {
185         com.listener.del( th.com.eventbus );
186         debug logger.Debug( "thread: '%s', listener: %s", name, th.name );
187     }
188 }
189 
190 version(none)
191 {
192 unittest
193 {
194     static class TestElement : WorkElement
195     {
196         this() { stderr.writeln( "init" ); }
197         override EventProcessor[] getEventProcessors()
198         {
199             return [
200                 new FunctionEventProcessor( (in Event ev) 
201                 {
202                     if( ev.isSystem )
203                         stderr.writeln( "system event: ",
204                             ((cast(Event)ev).data.as!SysEvData).msg );
205                 })
206             ];
207         }
208         override void process() { stderr.writeln( "process" ); }
209         protected void selfDestroy() { stderr.writeln( "destroy" ); }
210     }
211 
212     auto fth = new FThread( "test", { return new TestElement; } );
213     fth.pushCommand( Command.START );
214     Thread.sleep(dur!"usecs"(20));
215     fth.pushCommand( Command.PAUSE );
216     Thread.sleep(dur!"msecs"(20));
217     fth.pushCommand( Command.START );
218     Thread.sleep(dur!"usecs"(20));
219     fth.pushCommand( Command.REINIT );
220     Thread.sleep(dur!"usecs"(20));
221     fth.pushCommand( Command.START );
222     Thread.sleep(dur!"usecs"(20));
223     fth.pushCommand( Command.CLOSE );
224     fth.join();
225 }
226 }
227 
228 private
229 {
230     void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args )
231     {
232 
233         Logger logger = new InstanceLogger( __MODULE__ ~ ".WorkProcessor", Thread.getThis().name );
234         auto wp = createProcessor( com, logger, func, args );
235         if( wp is null ) return;
236         scope(exit) fullTerminate(wp);
237         try while( wp.hasWork ) wp.process();
238         catch( Throwable e ) com.info.pushBack( convertToErrorInfo(logger,e) );
239     }
240 
241     auto createProcessor(Args...)( Communication com, Logger logger, WorkElement function(Args) func, Args args )
242     {
243         try return new WorkProcessor!Args( com, logger, func, args );
244         catch( Throwable e )
245         {
246             com.info.pushBack( convertToErrorInfo(logger,e) );
247             return null;
248         }
249     }
250 
251     final class WorkProcessor(Args...) : ExternalMemoryManager, SignalProcessor, EventProcessor
252     {
253         mixin EMM;
254 
255         Args args;
256         WorkElement function(Args) func;
257         WorkElement elem;
258         EventProcessor[] evprocs;
259 
260         Communication com;
261 
262         bool work = false;
263         bool has_work = true;
264 
265         Logger logger;
266 
267         this( Communication com, Logger logger, WorkElement function(Args) func, Args args )
268         {
269             this.com = com;
270             this.func = func;
271             this.args = args;
272 
273             this.logger = logger;
274 
275             init();
276             debug logger.Debug( "pass" );
277         }
278 
279         @property bool hasWork() const { return has_work; }
280 
281         void process()
282         {
283             debug logger.trace( "start process" );
284             if( work )
285             {
286                 foreach( e; com.eventbus.clearAndReturnAll() )
287                     transmitEvent( e );
288 
289                 elem.process();
290 
291                 debug logger.trace( "events processed" );
292             }
293 
294             foreach( scmd; com.commands.clearAndReturnAll() )
295             {
296                 auto cmd = cast(Command)scmd;
297 
298                 debug logger.trace( "process command '%s'", cmd );
299                 final switch( cmd )
300                 {
301                     case Command.START:  start();  break;
302                     case Command.PAUSE:  pause();  break;
303                     case Command.STOP:   stop();   break;
304                     case Command.REINIT: reinit(); break;
305                     case Command.CLOSE:  close();  break;
306                 }
307                 debug logger.trace( "process command '%s' pass", cmd );
308             }
309             debug logger.trace( "process pass" );
310         }
311 
312         void transmitEvent( in Event event )
313         {
314             foreach( ep; evprocs )
315                 ep.processEvent( event );
316         }
317 
318         // SignalProcessor
319         void processSignal( in Signal sig ) { com.signals.pushBack(sig); }
320 
321         // EventProcessor
322         void processEvent( in Event ev ) { com.listener.pushBack(ev); }
323 
324         void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); }
325 
326         void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" )
327         { com.info.pushBack( FThread.Info( state, error, msg ) ); }
328 
329         void init()
330         {
331             debug logger.Debug( "start" );
332             elem = func(args);
333 
334             if( elem is null )
335                 throw new FThreadException( "creation func return null" );
336 
337             elem.setSignalProcessor( this );
338             elem.setEventListener( this );
339 
340             evprocs = elem.getEventProcessors();
341 
342             pause();
343             debug logger.Debug( "pass" );
344         }
345 
346         void start()
347         {
348             pushInfo( FThread.State.WORK );
349             transmitEvent( Event.system( SysEvData.work ) );
350             work = true;
351             debug logger.Debug( "pass" );
352         }
353 
354         void pause()
355         {
356             pushInfo( FThread.State.PAUSE );
357             transmitEvent( Event.system( SysEvData.pause ) );
358             work = false;
359             debug logger.Debug( "pass" );
360         }
361 
362         void stop()
363         {
364             pushInfo( FThread.State.NONE );
365             if( elem is null ) return;
366             pause();
367             evprocs.length = 0;
368             elem.destroy();
369             elem = null;
370             debug logger.Debug( "pass" );
371         }
372 
373         void reinit()
374         {
375             stop();
376             init();
377         }
378 
379         void close()
380         {
381             stop();
382             has_work = false;
383             debug logger.Debug( "pass" );
384         }
385 
386     protected:
387 
388         void selfDestroy()
389         {
390             stop();
391             debug logger.Debug( "pass" );
392         }
393     }
394 
395     void fullTerminate(T)( ref T obj )
396     {
397         if( is( T : ExternalMemoryManager ) && obj !is null )
398             obj.destroy();
399         obj = null;
400     }
401 
402     FThread.Info convertToErrorInfo( Logger logger, Throwable e )
403     in{ assert( logger !is null ); } body
404     {
405         if( auto pe = cast(FThreadException)e ) 
406         {
407             logger.error( "EXCEPTION: fthread exc: %s", pe );
408             return errorInfo( FThread.Error.FTHREAD, e.msg );
409         }
410         else if( auto fe = cast(FlowException)e )
411         {
412             logger.error( "EXCEPTION: flow exc: %s", fe );
413             return errorInfo( FThread.Error.FLOW, e.msg );
414         }
415         else if( auto se = cast(Exception)e )
416         {
417             logger.error( "EXCEPTION: %s", se );
418             return errorInfo( FThread.Error.EXCEPT, e.msg );
419         }
420         else if( auto te = cast(Throwable)e )
421         {
422             logger.error( "FATAL: %s", te );
423             return errorInfo( FThread.Error.FATAL, e.msg );
424         }
425         assert(0,"unknown exception");
426     }
427 
428     FThread.Info errorInfo( FThread.Error error, string msg )
429     { return FThread.Info( FThread.State.NONE, error, msg ); }
430 }