1 module des.flow.thread;
2 
import std.string;
4 
5 import std.algorithm;
6 import std.array;
7 
8 import std.datetime;
9 import core.thread;
10 
11 import des.util.arch.emm;
12 import des.util.logsys;
13 
14 import des.flow.base;
15 import des.flow.event;
16 import des.flow.element;
17 import des.flow.signal;
18 import des.flow.sync;
19 import des.flow.sysevdata;
20 
/// Exception type of the flow-thread layer; a `FlowException` subclass
class FThreadException : FlowException
{
    /++ Creates the exception, forwarding every argument to the `FlowException` constructor.

        Params:
        msg  = error description
        file = source file recorded for the exception (defaults to the call site)
        line = source line recorded for the exception (defaults to the call site)
     +/
    @safe pure nothrow
    this( string msg, string file = __FILE__, size_t line = __LINE__ )
    {
        super( msg, file, line );
    }
}
28 
/++ Thread wrap.

    Starts a `core.thread.Thread` that runs `tmain` with a work element
    creation function, and exchanges commands, events, signals and state
    information with it through a `Communication` object.
 +/
class FThread
{
protected:

    /// communication object handed to the thread function
    Communication com;

    /// core.thread.Thread
    Thread thread;

    /// name passed to the constructor
    string self_name;

public:

    /// state of the thread as reported through `Info`
    enum State
    {
        NONE,  /// not inited
        PAUSE, /// inited, not worked
        WORK   /// inited, worked
    }

    /// additional info part
    enum Error
    {
        NONE,    /// no error
        FTHREAD, /// an `FThreadException` was caught
        FLOW,    /// a `FlowException` was caught
        EXCEPT,  /// any other `Exception` was caught
        FATAL    /// unrecoverable error
    }

    /// State record: state, error kind, message and creation tick
    static struct Info
    {
        /// reported state
        State state;
        /// error kind, `Error.NONE` by default
        Error error;
        /// free-form message, empty by default
        string message;
        /// value of `currentTick` taken when the record was constructed
        ulong timestamp;

        /++ Creates a record stamped with the current tick.

            Params:
            state = state to report
            error = error kind
            msg   = message text
         +/
        this( State state, Error error=Error.NONE, string msg="" )
        {
            timestamp = currentTick;
            message = msg;
            this.error = error;
            this.state = state;
        }

        // Source text of a field-by-field copying constructor; it is mixed in
        // below once per type qualifier.
        private enum ctor_text = ` this( in Info fts )
        {
            state = fts.state;
            error = fts.error;
            message = fts.message;
            timestamp = fts.timestamp;
        }
        `;

        mixin( ctor_text );
        mixin( "const" ~ ctor_text );
        mixin( "immutable" ~ ctor_text );
        mixin( "shared" ~ ctor_text );
        mixin( "shared const" ~ ctor_text );
    }

    /++ Creates the wrapper and immediately starts the underlying thread.

        Params:
        name = name of thread
        func = work element creation function, must not be null
        args = args for function
     +/
    this(Args...)( string name, WorkElement function(Args) func, Args args )
    in { assert( func !is null ); } body
    {
        self_name = name;
        com.initialize();

        // the delegate body only runs after `thread.start()`
        thread = new Thread({ tmain( com, func, args ); });
        thread.name = name;
        thread.start();

        debug logger.Debug( "name: '%s'", name );
    }

    /++ getting last information of thread

        Returns:
        `Info` built from the last entry of `com.info`
     +/
    @property auto info() const
    {
        return Info(com.info.back);
    }

    /// getting name of fthread
    @property auto name() const
    {
        return self_name;
    }

    /// take all signals from work element
    auto takeAllSignals()
    {
        return com.signals.clearAndReturnAll();
    }

    /// push command for changing state of thread
    void pushCommand( Command cmd )
    {
        com.commands.pushBack( cmd );
        debug logger.trace( "thread: '%s', command: '%s'", name, cmd );
    }

    /// push event for processing in work element
    void pushEvent( in Event ev )
    {
        com.eventbus.pushBack( ev );
        debug logger.trace( "thread: '%s', event code: %d", name, ev.code );
    }

    /// core.thread.Thread.join
    void join()
    {
        thread.join();
    }

    /// add listener FThread, listeners gets events from work element
    void addListener( FThread[] thrs... )
    {
        foreach( listener; thrs )
        {
            com.listener.add( listener.com.eventbus );
        }
        debug logger.Debug( "thread: '%s', listeners: %s", name, thrs.map!(a=>a.name).array );
    }

    /// delete listener
    void delListener( FThread th )
    {
        com.listener.del( th.com.eventbus );
        debug logger.Debug( "thread: '%s', listener: %s", name, th.name );
    }
}
165 
// Disabled test: `version(none)` is never satisfied, so nothing in this block
// is compiled.
version(none)
{
// Drives an FThread through the command sequence
// START, PAUSE, START, REINIT, START, CLOSE with short sleeps in between,
// then joins the thread.
unittest
{
    // Work element that only writes a line to stderr on construction,
    // on `process`, on destruction and for every system event it receives.
    // NOTE(review): `stderr` is not imported by any import visible in this
    // file - confirm where it comes from before enabling the test.
    static class TestElement : WorkElement
    {
        this() { stderr.writeln( "init" ); }
        override EventProcessor[] getEventProcessors()
        {
            return [
                new FunctionEventProcessor( (in Event ev) 
                {
                    // only system events are printed, the rest are ignored
                    if( ev.isSystem )
                        stderr.writeln( "system event: ",
                            ((cast(Event)ev).data.as!SysEvData).msg );
                })
            ];
        }
        override void process() { stderr.writeln( "process" ); }
        protected void selfDestroy() { stderr.writeln( "destroy" ); }
    }

    auto fth = new FThread( "test", { return new TestElement; } );
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.PAUSE );
    Thread.sleep(dur!"msecs"(20));
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.REINIT );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    // CLOSE makes the worker loop finish, so the join below can return
    fth.pushCommand( Command.CLOSE );
    fth.join();
}
}
203 
204 private
205 {
206     void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args )
207     {
208 
209         Logger logger = new InstanceLogger( __MODULE__ ~ ".WorkProcessor", Thread.getThis().name );
210         auto wp = createProcessor( com, logger, func, args );
211         if( wp is null ) return;
212         scope(exit) fullTerminate(wp);
213         try while( wp.hasWork ) wp.process();
214         catch( Throwable e ) com.info.pushBack( convertToErrorInfo(logger,e) );
215     }
216 
217     auto createProcessor(Args...)( Communication com, Logger logger, WorkElement function(Args) func, Args args )
218     {
219         try return new WorkProcessor!Args( com, logger, func, args );
220         catch( Throwable e )
221         {
222             com.info.pushBack( convertToErrorInfo(logger,e) );
223             return null;
224         }
225     }
226 
227     final class WorkProcessor(Args...) : ExternalMemoryManager, SignalProcessor, EventProcessor
228     {
229         mixin EMM;
230 
231         Args args;
232         WorkElement function(Args) func;
233         WorkElement elem;
234         EventProcessor[] evprocs;
235 
236         Communication com;
237 
238         bool work = false;
239         bool has_work = true;
240 
241         Logger logger;
242 
243         this( Communication com, Logger logger, WorkElement function(Args) func, Args args )
244         {
245             this.com = com;
246             this.func = func;
247             this.args = args;
248 
249             this.logger = logger;
250 
251             init();
252             debug logger.Debug( "pass" );
253         }
254 
255         @property bool hasWork() const { return has_work; }
256 
257         void process()
258         {
259             debug logger.trace( "start process" );
260             if( work )
261             {
262                 foreach( e; com.eventbus.clearAndReturnAll() )
263                     transmitEvent( e );
264 
265                 elem.process();
266 
267                 debug logger.trace( "events processed" );
268             }
269 
270             foreach( scmd; com.commands.clearAndReturnAll() )
271             {
272                 auto cmd = cast(Command)scmd;
273 
274                 debug logger.trace( "process command '%s'", cmd );
275                 final switch( cmd )
276                 {
277                     case Command.START:  start();  break;
278                     case Command.PAUSE:  pause();  break;
279                     case Command.STOP:   stop();   break;
280                     case Command.REINIT: reinit(); break;
281                     case Command.CLOSE:  close();  break;
282                 }
283                 debug logger.trace( "process command '%s' pass", cmd );
284             }
285             debug logger.trace( "process pass" );
286         }
287 
288         void transmitEvent( in Event event )
289         {
290             foreach( ep; evprocs )
291                 ep.processEvent( event );
292         }
293 
294         // SignalProcessor
295         void processSignal( in Signal sig ) { com.signals.pushBack(sig); }
296 
297         // EventProcessor
298         void processEvent( in Event ev ) { com.listener.pushBack(ev); }
299 
300         void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); }
301 
302         void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" )
303         { com.info.pushBack( FThread.Info( state, error, msg ) ); }
304 
305         void init()
306         {
307             debug logger.Debug( "start" );
308             elem = func(args);
309 
310             if( elem is null )
311                 throw new FThreadException( "creation func return null" );
312 
313             elem.setSignalProcessor( this );
314             elem.setEventListener( this );
315 
316             evprocs = elem.getEventProcessors();
317 
318             pause();
319             debug logger.Debug( "pass" );
320         }
321 
322         void start()
323         {
324             pushInfo( FThread.State.WORK );
325             transmitEvent( Event.system( SysEvData.work ) );
326             work = true;
327             debug logger.Debug( "pass" );
328         }
329 
330         void pause()
331         {
332             pushInfo( FThread.State.PAUSE );
333             transmitEvent( Event.system( SysEvData.pause ) );
334             work = false;
335             debug logger.Debug( "pass" );
336         }
337 
338         void stop()
339         {
340             pushInfo( FThread.State.NONE );
341             if( elem is null ) return;
342             pause();
343             evprocs.length = 0;
344             elem.destroy();
345             elem = null;
346             debug logger.Debug( "pass" );
347         }
348 
349         void reinit()
350         {
351             stop();
352             init();
353         }
354 
355         void close()
356         {
357             stop();
358             has_work = false;
359             debug logger.Debug( "pass" );
360         }
361 
362     protected:
363 
364         void selfDestroy()
365         {
366             stop();
367             debug logger.Debug( "pass" );
368         }
369     }
370 
371     void fullTerminate(T)( ref T obj )
372     {
373         if( is( T : ExternalMemoryManager ) && obj !is null )
374             obj.destroy();
375         obj = null;
376     }
377 
378     FThread.Info convertToErrorInfo( Logger logger, Throwable e )
379     in{ assert( logger !is null ); } body
380     {
381         if( auto pe = cast(FThreadException)e ) 
382         {
383             logger.error( "EXCEPTION: fthread exc: %s", pe );
384             return errorInfo( FThread.Error.FTHREAD, e.msg );
385         }
386         else if( auto fe = cast(FlowException)e )
387         {
388             logger.error( "EXCEPTION: flow exc: %s", fe );
389             return errorInfo( FThread.Error.FLOW, e.msg );
390         }
391         else if( auto se = cast(Exception)e )
392         {
393             logger.error( "EXCEPTION: %s", se );
394             return errorInfo( FThread.Error.EXCEPT, e.msg );
395         }
396         else if( auto te = cast(Throwable)e )
397         {
398             logger.error( "FATAL: %s", te );
399             return errorInfo( FThread.Error.FATAL, e.msg );
400         }
401         assert(0,"unknown exception");
402     }
403 
404     FThread.Info errorInfo( FThread.Error error, string msg )
405     { return FThread.Info( FThread.State.NONE, error, msg ); }
406 }