1 /+
2 The MIT License (MIT)
3 
4     Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)>
5 
6     Permission is hereby granted, free of charge, to any person obtaining a copy
7     of this software and associated documentation files (the "Software"), to deal
8     in the Software without restriction, including without limitation the rights
9     to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10     copies of the Software, and to permit persons to whom the Software is
11     furnished to do so, subject to the following conditions:
12 
13     The above copyright notice and this permission notice shall be included in
14     all copies or substantial portions of the Software.
15 
16     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21     OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22     THE SOFTWARE.
23 +/
24 
25 module des.flow.thread;
26 
27 import std.string;
28 
29 import std.datetime;
30 import core.thread;
31 
32 import des.util.emm;
33 
34 import des.flow.base;
35 import des.flow.event;
36 import des.flow.element;
37 import des.flow.signal;
38 import des.flow.sync;
39 import des.flow.sysevdata;
40 
/// Exception type of the flow-thread layer; it is thrown by the worker
/// when the element creation function returns null.
class FThreadException : FlowException
{
    /// Forwards the message and the source location to FlowException.
    this( string msg, string file=__FILE__, size_t line=__LINE__ ) @safe pure nothrow
    {
        super( msg, file, line );
    }
}
46 
/++
 Handle of a worker thread.

 The constructor starts a core.thread.Thread that runs `tmain` with the
 given element creation function; commands and events are handed to the
 worker through the `com` channels.
 +/
class FThread
{
protected:
    /// channels shared with the worker (commands, events, info, listeners)
    Communication com;
    /// underlying thread object
    Thread thread;
    /// name given at construction, also assigned to `thread.name`
    string self_name;

public:
    /// state values published through Info
    enum State { NONE, PAUSE, WORK };
    /// error categories published through Info
    enum Error { NONE, FTHREAD, FLOW, EXCEPT, FATAL };

    /// Snapshot of the worker state, stamped with `currentTick` on creation.
    static struct Info
    {
        State state;
        Error error;
        string message;
        ulong timestamp;

        /// Builds a fresh record; the timestamp is taken from `currentTick`.
        this( State state, Error error=Error.NONE, string msg="" )
        {
            timestamp = currentTick;
            message = msg;
            this.error = error;
            this.state = state;
        }

        // source text of the copying constructor; it is mixed in once per
        // type qualifier below
        enum ctor_text = ` this( in Info fts )
        {
            state = fts.state;
            error = fts.error;
            message = fts.message;
            timestamp = fts.timestamp;
        }
        `;

        mixin( ctor_text );
        mixin( "const" ~ ctor_text );
        mixin( "immutable" ~ ctor_text );
        mixin( "shared" ~ ctor_text );
        mixin( "shared const" ~ ctor_text );
    }

    /++
     Creates the thread and starts it immediately.

     Params:
        name = stored as `self_name` and set as the thread name
        func = element creation function, must not be null
        args = arguments passed to `func`
     +/
    this(Args...)( string name, WorkElement function(Args) func, Args args )
    in { assert( func !is null ); } body
    {
        self_name = name;
        com.initialize();

        thread = new Thread({ tmain( com, func, args ); });
        thread.name = name;
        thread.start();
    }

    /// Copy of the last record in `com.info`.
    @property auto info() const
    {
        return Info( com.info.back );
    }

    /// Name given at construction.
    @property auto name() const
    {
        return self_name;
    }

    /// Appends `cmd` to the command queue.
    void pushCommand( Command cmd )
    {
        com.commands.pushBack( cmd );
    }

    /// Appends `ev` to the event bus of this thread.
    void pushEvent( in Event ev )
    {
        com.eventbus.pushBack( ev );
    }

    /// Joins the underlying thread.
    void join()
    {
        thread.join();
    }

    /// Adds the event bus of every given thread to `com.listener`.
    void addListener( FThread[] thrs... )
    {
        foreach( other; thrs )
        {
            com.listener.add( other.com.eventbus );
        }
    }

    /// Removes the event bus of `th` from `com.listener`.
    void delListener( FThread th )
    {
        com.listener.del( th.com.eventbus );
    }
}
112 
// Disabled smoke test: drives one FThread through
// START / PAUSE / START / REINIT / START / CLOSE and joins it.
version(none)
{
unittest
{
    // `stderr` is used below; no import visible in this module provides it,
    // so it is imported locally to keep the test compilable once enabled
    import std.stdio : stderr;

    // element that only reports what happens to it
    static class TestElement : WorkElement
    {
        this() { stderr.writeln( "init" ); }
        override EventProcessor[] getEventProcessors()
        {
            return [
                new FunctionEventProcessor( (in Event ev) 
                {
                    if( ev.isSystem )
                        stderr.writeln( "system event: ",
                            ((cast(Event)ev).data.as!SysEvData).msg );
                })
            ];
        }
        override void process() { stderr.writeln( "process" ); }
        protected void selfDestroy() { stderr.writeln( "destroy" ); }
    }

    auto fth = new FThread( "test", { return new TestElement; } );
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.PAUSE );
    Thread.sleep(dur!"msecs"(20));
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.REINIT );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.CLOSE );
    fth.join();
}
}
150 
151 private
152 {
153     void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args )
154     {
155         auto wp = createProcessor( com, func, args );
156         if( wp is null ) return;
157         scope(exit) fullTerminate(wp);
158         try while( wp.hasWork ) wp.process();
159         catch( Throwable e ) com.info.pushBack( convertToErrorInfo(e) );
160     }
161 
162     auto createProcessor(Args...)( Communication com, WorkElement function(Args) func, Args args )
163     {
164         WorkProcessor!Args ret;
165         try ret = new WorkProcessor!Args( com, func, args );
166         catch( Throwable e ) com.info.pushBack( convertToErrorInfo(e) );
167         return ret;
168     }
169 
170     final class WorkProcessor(Args...) : SignalProcessor, EventProcessor
171     {
172         Args args;
173         WorkElement function(Args) func;
174         WorkElement elem;
175         EventProcessor[] evprocs;
176 
177         Communication com;
178 
179         bool work = false;
180         bool has_work = true;
181 
182         this( Communication com, WorkElement function(Args) func, Args args )
183         {
184             this.com = com;
185             this.func = func;
186             this.args = args;
187 
188             init();
189         }
190 
191         @property bool hasWork() const { return has_work; }
192 
193         void process()
194         {
195             if( work )
196             {
197                 foreach( e; com.eventbus.clearAndReturnAll() )
198                     transmitEvent( e );
199 
200                 elem.process();
201             }
202 
203             foreach( scmd; com.commands.clearAndReturnAll() )
204             {
205                 auto cmd = cast(Command)scmd;
206                 auto state = com.info.back.state;
207 
208                 final switch( cmd )
209                 {
210                     case Command.START:  start();  break;
211                     case Command.PAUSE:  pause();  break;
212                     case Command.STOP:   stop();   break;
213                     case Command.REINIT: reinit(); break;
214                     case Command.CLOSE:  close();  break;
215                 }
216             }
217         }
218 
219         void transmitEvent( in Event event )
220         {
221             foreach( ep; evprocs )
222                 ep.processEvent( event );
223         }
224 
225         // SignalProcessor
226         void processSignal( in Signal sig ) { com.signals.pushBack(sig); }
227 
228         // EventProcessor
229         void processEvent( in Event ev ) { com.listener.pushBack(ev); }
230 
231         void init()
232         {
233             elem = func(args);
234 
235             if( elem is null )
236                 throw new FThreadException( "creation func return null" );
237 
238             elem.setSignalProcessor( this );
239             elem.setEventListener( this );
240 
241             evprocs = elem.getEventProcessors();
242 
243             pause();
244         }
245 
246         void start()
247         {
248             pushInfo( FThread.State.WORK );
249             transmitEvent( Event.system( SysEvData.work ) );
250             work = true;
251         }
252 
253         void pause()
254         {
255             pushInfo( FThread.State.PAUSE );
256             transmitEvent( Event.system( SysEvData.pause ) );
257             work = false;
258         }
259 
260         void stop()
261         {
262             pushInfo( FThread.State.NONE );
263             if( elem is null ) return;
264             pause();
265             evprocs.length = 0;
266             elem.destroy();
267             elem = null;
268         }
269 
270         void reinit()
271         {
272             stop();
273             init();
274         }
275 
276         void close()
277         {
278             stop();
279             has_work = false;
280         }
281 
282         void destroy() { stop(); }
283 
284         void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); }
285 
286         void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" )
287         { com.info.pushBack( FThread.Info( state, error, msg ) ); }
288     }
289 
290     void fullTerminate(T)( ref T obj )
291     {
292         if( is( T : ExternalMemoryManager ) && obj !is null )
293             obj.destroy();
294         obj = null;
295     }
296 
297     FThread.Info convertToErrorInfo( Throwable e )
298     {
299         if( auto pe = cast(FThreadException)e ) 
300         {
301             log_error( "EXCEPTION: fthread exc: %s", pe );
302             return errorInfo( FThread.Error.FTHREAD, e.msg );
303         }
304         else if( auto fe = cast(FlowException)e )
305         {
306             log_error( "EXCEPTION: flow exc: %s", fe );
307             return errorInfo( FThread.Error.FLOW, e.msg );
308         }
309         else if( auto se = cast(Exception)e )
310         {
311             log_error( "EXCEPTION: %s", se );
312             return errorInfo( FThread.Error.EXCEPT, e.msg );
313         }
314         else if( auto te = cast(Throwable)e )
315         {
316             log_error( "FATAL: %s", te );
317             return errorInfo( FThread.Error.FATAL, e.msg );
318         }
319         assert(0,"unknown exception");
320     }
321 
322     FThread.Info errorInfo( FThread.Error error, string msg )
323     { return FThread.Info( FThread.State.NONE, error, msg ); }
324 }