1 /+
2 The MIT License (MIT)
3 
4     Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)>
5 
6     Permission is hereby granted, free of charge, to any person obtaining a copy
7     of this software and associated documentation files (the "Software"), to deal
8     in the Software without restriction, including without limitation the rights
9     to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10     copies of the Software, and to permit persons to whom the Software is
11     furnished to do so, subject to the following conditions:
12 
13     The above copyright notice and this permission notice shall be included in
14     all copies or substantial portions of the Software.
15 
16     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21     OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22     THE SOFTWARE.
23 +/
24 
25 module des.flow.sync;
26 
27 import std.traits;
28 
29 import des.flow.base;
30 import des.flow.event;
31 import des.flow.signal;
32 import des.flow.thread;
33 
/// Groups the shared synchronized containers used for exchanging
/// commands, signals, thread info and events.
/// All members are null until `initialize` is called.
struct Communication
{
    /// synchronized list of `Command` values
    shared(SyncList!Command) commands;
    /// synchronized list of `Signal` values
    shared(SyncList!Signal) signals;
    /// synchronized list of `FThread.Info` values
    shared(SyncList!(FThread.Info)) info;
    /// synchronized list of `Event` values
    shared(SyncList!Event) eventbus;
    /// hub that forwards pushed `Event` values to its registered outputs
    shared(HubOutput!Event) listener;

    /// Allocates a fresh shared object for every member,
    /// replacing whatever the members referenced before.
    void initialize()
    {
        commands = new shared(SyncList!Command);
        signals  = new shared(SyncList!Signal);
        info     = new shared(SyncList!(FThread.Info));
        eventbus = new shared(SyncList!Event);
        listener = new shared(HubOutput!Event);
    }
}
51 
/// Contract of a synchronized sink: an object that accepts values of
/// type `T` through `pushBack`.
interface SyncOutput(T)
{
    /// Passes `val` to the implementation.
    synchronized void pushBack( in T val );
}
53 
/// Synchronized growable list of `T` implementing `SyncOutput!T`.
///
/// How a value is stored and handed back depends on `T`:
/// basic types are used directly, arrays are duplicated,
/// anything else is rebuilt through `T(value)`.
synchronized class SyncList(T) : SyncOutput!T
{
    /// storage; the last element is the "back" of the list
    protected T[] list;

    /// Appends `obj` to the end of the list.
    void pushBack( in T obj )
    {
        static if( isBasicType!T )
            list ~= obj;
        else static if( isArray!T )
            list ~= obj.dup;
        else
            list = list ~ T(obj);
    }

    /// Removes the last element; does nothing when the list is empty.
    void popBack()
    {
        if( list.length == 0 ) return;
        list = list[0..$-1];
    }

    /// Returns the value produced by `back` and then removes the last
    /// element. No emptiness check is made here: `back` indexes
    /// `list[$-1]` directly.
    T popAndReturnBack()
    {
        auto last = back;
        popBack();
        return last;
    }

    /// Returns a duplicate of the whole list (cast to `T[]`) and
    /// resets the stored list to zero length.
    T[] clearAndReturnAll()
    {
        auto snapshot = cast(T[])list.dup;
        list.length = 0;
        return snapshot;
    }

    /// true when the list holds no elements
    @property bool empty() const
    {
        return list.length == 0;
    }

    /// Last element of the list: the value itself for basic types,
    /// an `idup` for arrays, `T(element)` otherwise.
    @property auto back() const
    {
        static if( isBasicType!T )
            return list[$-1];
        else static if( isArray!T )
            return list[$-1].idup;
        else
            return T(list[$-1]);
    }
}
93 
94 
/// Unittest helper: drives a fresh `SyncList!T` through push, back, pop
/// and clear using the two sample values `a` and `b`, asserting the
/// expected state after each step.
version(unittest) void syncTest(T)( T a, T b )
{
    auto queue = new shared SyncList!T;

    // a new list starts out empty
    assert( queue.empty );

    // single element: `back` is checked twice, with `empty` re-checked
    // in between, so reading it must not consume the element
    queue.pushBack( a );
    assert( !queue.empty );
    assert( eq( a, queue.back ) );
    assert( !queue.empty );
    assert( eq( a, queue.back ) );

    // second element becomes the back, popBack exposes the first again
    queue.pushBack( b );
    assert( eq( b, queue.back ) );
    queue.popBack();
    assert( eq( a, queue.back ) );

    // popAndReturnBack yields the remaining element and empties the list
    auto popped = queue.popAndReturnBack();
    assert( queue.empty );
    assert( eq( a, popped ) );

    // clearAndReturnAll returns everything in insertion order
    queue.pushBack( a );
    queue.pushBack( b );
    assert( !queue.empty );
    auto drained = queue.clearAndReturnAll();
    assert( queue.empty );
    assert( eq_arr( drained, [ a, b ] ) );
}
119 
// Runs creationTest on Signal, FThread.Info and Event values, then
// syncTest on basic, array, enum and struct element types.
unittest
{
    alias FThread.State State;

    // creation checks
    //assert( creationTest( Command.START ) );
    assert( creationTest( Signal(0) ) );
    assert( creationTest( FThread.Info( State.PAUSE ) ) );
    assert( creationTest( Event( 0, [1,2] ) ) );

    // basic and array element types
    syncTest( 1.2, 3.4 );
    syncTest( "hello", "world" );

    // project element types
    syncTest( Command.START, Command.PAUSE );
    syncTest( Signal(0), Signal(1) );
    syncTest( FThread.Info( State.PAUSE ),
              FThread.Info( State.WORK ) );
    syncTest( Event(0,[1,2]), Event(1,[2,3]) );
}
135 
/// Synchronized fan-out output: every value pushed into the hub is
/// forwarded, in registration order, to each registered `SyncOutput!T`.
/// Listeners are identified by object identity (`is`) everywhere:
/// when registering, when looking up and when removing.
synchronized class HubOutput(T): SyncOutput!T
{
    alias SyncOutput!T LST;

    /// registered outputs, in the order they were added
    protected LST[] listeners;

    /// Cycle guard used by `add` in debug builds.
    /// Does nothing when `checked` is not a `HubOutput`; otherwise walks
    /// its listeners recursively.
    /// Throws: FlowException when `checked` is this hub or a hub that
    /// reaches this hub through its own listeners.
    private void check( shared LST checked )
    {
        auto ll = cast(shared HubOutput)checked;
        if( ll is null ) return;
        if( ll is this ) throw new FlowException( "listener found cycle link" );
        foreach( lll; ll.listeners ) check( lll );
    }

    /// Forwards `val` to every registered listener.
    void pushBack( in T val )
    {
        foreach( listener; listeners )
            listener.pushBack( val );
    }

    /// Returns: true when `sync` (compared by identity) is registered.
    bool inList( shared LST sync )
    {
        foreach( l; listeners )
            if( l is sync ) return true;
        return false;
    }

    /// Registers each of `syncs` that is not registered yet.
    /// In debug builds each candidate is first passed to `check`,
    /// which throws FlowException on a cycle.
    void add( shared LST[] syncs... )
    {
        foreach( s; syncs )
        {
            debug check(s);
            if( !inList(s) )
                listeners ~= s;
        }
    }

    /// Unregisters every listener that is one of `syncs`; the remaining
    /// listeners keep their relative order.
    void del( shared LST[] syncs... )
    {
        typeof(listeners) kept;
        scan: foreach( lst; listeners )
        {
            // Identity comparison, same as inList/add. `==` would go
            // through opEquals and could drop a listener that is merely
            // "equal" to the one passed in.
            foreach( ds; syncs )
                if( ds is lst )
                    continue scan;
            kept ~= lst;
        }
        listeners = kept;
    }
}
186 
// HubOutput: registration, duplicate registration, removal and
// re-registration, observed through the contents of two SyncList!int.
unittest
{
    auto first  = new shared SyncList!int;
    auto second = new shared SyncList!int;

    auto hub = new shared HubOutput!int;

    // adding the same listener twice must not deliver values twice
    hub.add( first );
    hub.add( first );
    hub.pushBack( 12 );

    // a listener added later only sees later values
    hub.add( second );
    hub.pushBack( 23 );

    assert( eq_arr( first.list, [ 12, 23 ] ) );
    assert( eq_arr( second.list, [ 23 ] ) );

    // a removed listener stops receiving values
    hub.del( first );
    hub.pushBack( 34 );
    assert( eq_arr( first.list, [ 12, 23 ] ) );
    assert( eq_arr( second.list, [ 23, 34 ] ) );

    // variadic add: `first` is re-registered, `second` stays single
    hub.add( first, second );
    hub.pushBack( 45 );
    assert( eq_arr( first.list, [ 12, 23, 45 ] ) );
    assert( eq_arr( second.list, [ 23, 34, 45 ] ) );
}