/+
The MIT License (MIT)

Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
+/

module des.flow.sync;

import std.traits;

import des.flow.base;
import des.flow.event;
import des.flow.signal;
import des.flow.thread;

/// FThread communication struct
struct Communication
{
    shared SyncList!Command commands;    /// synchronized list of `Command` values
    shared SyncList!Signal signals;      /// synchronized list of `Signal` values
    shared SyncList!(FThread.Info) info; /// synchronized list of `FThread.Info` records
    shared SyncList!Event eventbus;      /// synchronized list of `Event` values
    shared HubOutput!Event listener;     /// hub forwarding each pushed `Event` to its registered outputs

    /// create all fields and seed `info` with an initial
    /// `FThread.Info( State.NONE, Error.NONE, "" )` record
    void initialize()
    {
        commands = new shared SyncList!Command;
        signals = new shared SyncList!Signal;
        info = new shared SyncList!(FThread.Info);
        eventbus = new shared SyncList!Event;
        listener = new shared HubOutput!Event;

        info.pushBack( FThread.Info( FThread.State.NONE, FThread.Error.NONE, "" ) );
    }
}

/// Output endpoint that accepts values of type `T`
interface SyncOutput(T)
{
    /// push a value into the output
    synchronized void pushBack( in T val );
}

/// Synchronized stack-like list: values are appended to and removed from the back
synchronized class SyncList(T) : SyncOutput!T
{
    /// stored values, the last element is the "back"
    protected T[] list;

    /++ append a copy of `obj` to the back

        basic types are appended as is, arrays are duplicated,
        any other type is re-created through `T(obj)`
     +/
    void pushBack( in T obj )
    {
        static if( isBasicType!T ) list ~= obj;
        else static if( isArray!T ) list ~= obj.dup;
        else list = list ~ T(obj);
    }

    /// remove the last element, does nothing if the list is empty
    void popBack() { if( list.length ) list = list[0..$-1]; }

    /++ remove the last element and return a copy of it

        the list must not be empty: `back` indexes the last element
        without checking the length
     +/
    T popAndReturnBack()
    {
        auto buf = back;
        popBack();
        return buf;
    }

    /// return a copy of all stored elements and reset the list length to zero
    T[] clearAndReturnAll()
    {
        // the cast drops the `shared` qualifier from the duplicated array
        auto r = cast(T[])list.dup;
        list.length = 0;
        return r;
    }

    @property
    {
        /// `true` if the list holds no elements
        bool empty() const { return list.length == 0; }

        /++ copy of the last element

            the list must not be empty: the last element is indexed
            without checking the length
         +/
        auto back() const
        {
            static if( isBasicType!T ) return list[$-1];
            else static if( isArray!T ) return list[$-1].idup;
            else return T(list[$-1]);
        }
    }
}


/++ unittest helper: runs `pushBack`, `back`, `popBack`, `popAndReturnBack`,
    `clearAndReturnAll` and `empty` of a `SyncList!T` against values `a` and `b`

    relies on `eq`, which is declared outside of this module
 +/
version(unittest) void syncTest(T)( T a, T b )
{
    auto sl = new shared SyncList!T;

    assert( sl.empty );
    sl.pushBack( a );
    assert( !sl.empty );
    assert( eq( a, sl.back ) );
    // reading `back` must not remove the element
    assert( !sl.empty );
    assert( eq( a, sl.back ) );
    sl.pushBack( b );
    assert( eq( b, sl.back ) );
    sl.popBack();
    assert( eq( a, sl.back ) );
    auto val = sl.popAndReturnBack();
    assert( sl.empty );
    assert( eq( a, val ) );
    sl.pushBack( a );
    sl.pushBack( b );
    assert( !sl.empty );
    auto arr = sl.clearAndReturnAll();
    assert( sl.empty );
    assert( eq( arr, [ a, b ] ) );
}

unittest
{
    // NOTE(review): `creationTest` is declared elsewhere; presumably it checks
    // that a value can be re-created through `T(obj)` as `SyncList` requires
    assert( creationTest( Signal(0) ) );
    assert( creationTest( FThread.Info( FThread.State.PAUSE ) ) );
    assert( creationTest( Event( 0, [1,2] ) ) );

    syncTest( 1.2, 3.4 );
    syncTest( "hello", "world" );
    syncTest( Command.START, Command.PAUSE );
    syncTest( Signal(0), Signal(1) );
    syncTest( FThread.Info( FThread.State.PAUSE ),
              FThread.Info( FThread.State.WORK ));
    syncTest( Event(0,[1,2]), Event(1,[2,3]) );
}

/// Synchronized hub: every pushed value is forwarded to all registered outputs
synchronized class HubOutput(T): SyncOutput!T
{
    /// type of the outputs accepted by the hub
    alias SyncOutput!T LST;

    /// registered outputs, in order of registration
    protected LST[] listeners;

    /++ cycle detection: if `checked` is a hub, walks it and its listeners
        recursively and throws `FlowException` when this hub is found among them
     +/
    private void check( shared LST checked )
    {
        auto ll = cast(shared HubOutput)checked;
        // not a hub, so it cannot forward back to us
        if( ll is null ) return;
        if( ll is this ) throw new FlowException( "listener found cycle link" );
        foreach( lll; ll.listeners ) check( lll );
    }

    /// forward `val` to every registered output
    void pushBack( in T val )
    {
        foreach( listener; listeners )
            listener.pushBack( val );
    }

    /// `true` if `sync` (compared by identity) is already registered
    bool inList( shared LST sync )
    {
        foreach( l; listeners )
            if( l is sync ) return true;
        return false;
    }

    /++ register outputs, skipping the ones that are already registered

        the cycle check is performed in debug builds only
     +/
    void add( shared LST[] syncs... )
    {
        foreach( s; syncs )
        {
            debug check(s);
            if( !inList(s) )
                listeners ~= s;
        }
    }

    /// unregister outputs, outputs that are not registered are ignored
    void del( shared LST[] syncs... )
    {
        typeof(listeners) moved;
        m: foreach( lst; listeners )
        {
            foreach( ds; syncs )
                // identity comparison, consistent with `inList` and `check`:
                // `==` would dispatch to a user-overridable `opEquals`
                if( ds is lst )
                    continue m;
            moved ~= lst;
        }
        listeners = moved;
    }
}

unittest
{
    auto sl1 = new shared SyncList!int;
    auto sl2 = new shared SyncList!int;

    auto hub = new shared HubOutput!int;

    hub.add( sl1 );
    // duplicate adding
    hub.add( sl1 );
    hub.pushBack( 12 );

    hub.add( sl2 );
    hub.pushBack( 23 );

    assert( eq( sl1.list, [ 12, 23 ] ) );
    assert( eq( sl2.list, [ 23 ] ) );

    hub.del( sl1 );
    hub.pushBack( 34 );
    assert( eq( sl1.list, [ 12, 23 ] ) );
    assert( eq( sl2.list, [ 23, 34 ] ) );

    hub.add( sl1, sl2 );
    hub.pushBack( 45 );
    assert( eq( sl1.list, [ 12, 23, 45 ] ) );
    assert( eq( sl2.list, [ 23, 34, 45 ] ) );
}