1 /+ 2 The MIT License (MIT) 3 4 Copyright (c) <2013> <Oleg Butko (deviator), Anton Akzhigitov (Akzwar)> 5 6 Permission is hereby granted, free of charge, to any person obtaining a copy 7 of this software and associated documentation files (the "Software"), to deal 8 in the Software without restriction, including without limitation the rights 9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 10 copies of the Software, and to permit persons to whom the Software is 11 furnished to do so, subject to the following conditions: 12 13 The above copyright notice and this permission notice shall be included in 14 all copies or substantial portions of the Software. 15 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 22 THE SOFTWARE. 23 +/ 24 25 module des.flow.sync; 26 27 import std.traits; 28 29 import des.flow.base; 30 import des.flow.event; 31 import des.flow.signal; 32 import des.flow.thread; 33 34 struct Communication 35 { 36 shared SyncList!Command commands; 37 shared SyncList!Signal signals; 38 shared SyncList!(FThread.Info) info; 39 shared SyncList!Event eventbus; 40 shared HubOutput!Event listener; 41 42 void initialize() 43 { 44 commands = new shared SyncList!Command; 45 signals = new shared SyncList!Signal; 46 info = new shared SyncList!(FThread.Info); 47 eventbus = new shared SyncList!Event; 48 listener = new shared HubOutput!Event; 49 } 50 } 51 52 interface SyncOutput(T) { synchronized void pushBack( in T val ); } 53 54 synchronized class SyncList(T) : SyncOutput!T 55 { 56 protected T[] list; 57 58 void pushBack( in T obj ) 59 { 60 static if( isBasicType!T ) list ~= obj; 61 else static if( isArray!T ) list ~= obj.dup; 62 else list = list ~ T(obj); 63 } 64 65 void popBack() { if( list.length ) list = list[0..$-1]; } 66 67 T popAndReturnBack() 68 { 69 auto buf = back; 70 popBack(); 71 return buf; 72 } 73 74 T[] clearAndReturnAll() 75 { 76 auto r = cast(T[])list.dup; 77 list.length = 0; 78 return r; 79 } 80 81 @property 82 { 83 bool empty() const { return list.length == 0; } 84 85 auto back() const 86 { 87 static if( isBasicType!T ) return list[$-1]; 88 else static if( isArray!T ) return list[$-1].idup; 89 else return T(list[$-1]); 90 } 91 } 92 } 93 94 95 version(unittest) void syncTest(T)( T a, T b ) 96 { 97 auto sl = new shared SyncList!T; 98 99 assert( sl.empty ); 100 sl.pushBack( a ); 101 assert( !sl.empty ); 102 assert( eq( a, sl.back ) ); 103 assert( !sl.empty ); 104 assert( eq( a, sl.back ) ); 105 sl.pushBack( b ); 106 assert( eq( b, sl.back ) ); 107 sl.popBack(); 108 assert( eq( a, sl.back ) ); 109 auto val = sl.popAndReturnBack(); 110 assert( sl.empty ); 111 assert( eq( a, val ) ); 112 sl.pushBack( a ); 113 sl.pushBack( b ); 114 assert( !sl.empty ); 115 auto arr = sl.clearAndReturnAll(); 116 assert( sl.empty ); 117 assert( eq_arr( arr, [ a, b ] ) ); 118 } 119 120 unittest 121 { 122 //assert( creationTest( Command.START ) ); 123 assert( creationTest( Signal(0) ) ); 124 assert( creationTest( FThread.Info( FThread.State.PAUSE ) ) ); 125 assert( creationTest( Event( 0, [1,2] ) ) ); 126 127 syncTest( 1.2, 3.4 ); 128 syncTest( "hello", "world" ); 129 syncTest( Command.START, Command.PAUSE ); 130 syncTest( Signal(0), Signal(1) ); 131 syncTest( FThread.Info( FThread.State.PAUSE ), 132 FThread.Info( FThread.State.WORK )); 133 syncTest( Event(0,[1,2]), Event(1,[2,3]) ); 134 } 135 136 synchronized class HubOutput(T): SyncOutput!T 137 { 138 alias SyncOutput!T LST; 139 140 protected LST[] listeners; 141 142 private void check( shared LST checked ) 143 { 144 auto ll = cast(shared HubOutput)checked; 145 if( ll is null ) return; 146 if( ll is this ) throw new FlowException( "listener found cycle link" ); 147 foreach( lll; ll.listeners ) check( lll ); 148 } 149 150 void pushBack( in T val ) 151 { 152 foreach( listener; listeners ) 153 listener.pushBack( val ); 154 } 155 156 bool inList( shared LST sync ) 157 { 158 foreach( l; listeners ) 159 if( l is sync ) return true; 160 return false; 161 } 162 163 void add( shared LST[] syncs... ) 164 { 165 foreach( s; syncs ) 166 { 167 debug check(s); 168 if( !inList(s) ) 169 listeners ~= s; 170 } 171 } 172 173 void del( shared LST[] syncs... ) 174 { 175 typeof(listeners) moved; 176 m: foreach( lst; listeners ) 177 { 178 foreach( ds; syncs ) 179 if( ds == lst ) 180 continue m; 181 moved ~= lst; 182 } 183 listeners = moved; 184 } 185 } 186 187 unittest 188 { 189 auto sl1 = new shared SyncList!int; 190 auto sl2 = new shared SyncList!int; 191 192 auto hub = new shared HubOutput!int; 193 194 hub.add( sl1 ); 195 // duplicate adding 196 hub.add( sl1 ); 197 hub.pushBack( 12 ); 198 199 hub.add( sl2 ); 200 hub.pushBack( 23 ); 201 202 assert( eq_arr( sl1.list, [ 12, 23 ] ) ); 203 assert( eq_arr( sl2.list, [ 23 ] ) ); 204 205 hub.del( sl1 ); 206 hub.pushBack( 34 ); 207 assert( eq_arr( sl1.list, [ 12, 23 ] ) ); 208 assert( eq_arr( sl2.list, [ 23, 34 ] ) ); 209 210 hub.add( sl1, sl2 ); 211 hub.pushBack( 45 ); 212 assert( eq_arr( sl1.list, [ 12, 23, 45 ] ) ); 213 assert( eq_arr( sl2.list, [ 23, 34, 45 ] ) ); 214 }