1 module modernui.rx; 2 3 import modernui.collections; 4 5 import std.algorithm; 6 import std.range; 7 import std.container.array; 8 9 struct None 10 { 11 void[0] dummy; 12 13 immutable static None val = {}; 14 } 15 16 alias Action(T) = void delegate(T); 17 alias Delegate = void delegate(); 18 alias Func(T,K) = K delegate(T); 19 20 final class Subscription 21 { 22 private void delegate() action; 23 24 this(void delegate() action) 25 { 26 this.action = action; 27 } 28 29 void release() 30 { 31 if(action == null) return; 32 33 // executes the action 34 this.action(); 35 36 // release the reference 37 this.action = null; 38 } 39 } 40 41 final class Observer(T) 42 { 43 private Action!T onNextCallback; 44 private Delegate onCompletedCallback; 45 private Action!Exception onErrorCallback; 46 47 this(Action!T nextCallback) 48 { 49 this.onNextCallback = nextCallback; 50 this.onCompletedCallback = null; 51 this.onErrorCallback = null; 52 } 53 54 this(Action!T nextCallback, Action!Exception errorCallback) 55 { 56 this.onNextCallback = nextCallback; 57 this.onCompletedCallback = null; 58 this.onErrorCallback = errorCallback; 59 } 60 61 this(Action!T nextCallback, Action!Exception errorCallback, Delegate completedCallback) 62 { 63 this.onNextCallback = nextCallback; 64 this.onCompletedCallback = completedCallback; 65 this.onErrorCallback = errorCallback; 66 } 67 68 void onCompleted() 69 { 70 if(onCompletedCallback != null) onCompletedCallback(); 71 finalize(); 72 } 73 74 void onNext(T value) 75 { 76 if(onNextCallback != null) onNextCallback(value); 77 } 78 79 void onError(Exception e) 80 { 81 if(onErrorCallback != null) onErrorCallback(e); 82 finalize(); 83 } 84 85 private void finalize() 86 { 87 // release references to delegates 88 onNextCallback = null; 89 onErrorCallback = null; 90 onCompletedCallback = null; 91 } 92 } 93 94 abstract class Observable(T) 95 { 96 private Subscription[Observer!T] observers; 97 private Array!(Observer!T) myObserversArray; 98 private bool myIsCompleted; 99 100 alias ObservedType = T; 101 102 @property bool isCompleted() { return myIsCompleted; } 103 @property bool hasSubscribers() { return observers.length != 0; } 104 105 abstract Subscription subscribe(Observer!T observer); 106 107 bool unsubscribe(Observer!T observer) 108 { 109 if(observer !in observers) 110 { 111 return false; 112 } 113 114 auto subscription = observers[observer]; 115 auto result = observers.remove(observer); 116 auto foundObs = myObserversArray[].find(observer).takeOne; 117 myObserversArray.linearRemove(foundObs); 118 subscription.release(); 119 return result; 120 } 121 } 122 123 final class Subject(T) : Observable!T 124 { 125 import std.stdio : writefln; 126 127 void next(T value) 128 { 129 foreach(observer ; myObserversArray) 130 { 131 observer.onNext(value); 132 } 133 } 134 135 void complete() 136 { 137 foreach(observer ; myObserversArray) 138 { 139 observer.onCompleted(); 140 } 141 142 observers.clear(); 143 myIsCompleted = true; 144 } 145 146 void error(Exception e) 147 { 148 foreach(observer ; myObserversArray) 149 { 150 observer.onError(e); 151 } 152 } 153 154 override Subscription subscribe(Observer!T observer) 155 { 156 if(isCompleted) 157 { 158 return new Subscription(null); 159 } 160 161 auto subscription = new Subscription({ 162 this.unsubscribe(observer); 163 }); 164 165 observers[observer] = subscription; 166 myObserversArray.insertBack(observer); 167 168 return subscription; 169 } 170 } 171 172 unittest 173 { 174 // Observable 175 auto test1 = new Subject!int; 176 auto test1var = 10; 177 test1.then!int((v) { 178 test1var = v; 179 }, 180 (e) 181 { 182 test1var = -1; 183 }, 184 { 185 test1var = -100; 186 }); 187 assert(test1var == 10); 188 189 test1.next(15); 190 assert(test1var == 15); 191 192 test1.next(32); 193 assert(test1var == 32); 194 195 assert(!test1.isCompleted); 196 test1.complete(); 197 assert(test1var == -100); 198 assert(test1.isCompleted); 199 } 200 201 // A Promise is an object representing a observable value that will be resolved in the future. 202 // As an observable it will yield a single value and switch to completed state. 203 abstract class Promise(T) : Observable!T 204 { 205 private T myResolvedValue; 206 207 @property T value() { return myResolvedValue; } 208 209 override Subscription subscribe(Observer!T observer) 210 { 211 if(isCompleted) 212 { 213 observer.onNext(value); 214 return new Subscription({}); 215 } 216 217 auto subscription = new Subscription({ 218 this.unsubscribe(observer); 219 }); 220 221 observers[observer] = subscription; 222 myObserversArray.insertBack(observer); 223 224 return subscription; 225 } 226 } 227 228 final class Deferred(T) : Promise!T 229 { 230 void resolve(T value) 231 { 232 myIsCompleted = true; 233 foreach(observer; myObserversArray) 234 { 235 observer.onNext(value); 236 } 237 238 foreach(observer; myObserversArray) 239 { 240 observer.onCompleted(); 241 } 242 243 observers.clear(); 244 } 245 246 void error(Exception e) 247 { 248 myIsCompleted = true; 249 foreach(observer; myObserversArray) 250 { 251 observer.onError(e); 252 } 253 254 foreach(observer; myObserversArray) 255 { 256 observer.onCompleted(); 257 } 258 259 observers.clear(); 260 } 261 } 262 263 Subscription then(T)(Observable!T self, Action!T action) 264 { 265 return self.subscribe(new Observer!T(action)); 266 } 267 268 Subscription then(T)(Observable!T self, Action!T action, Action!Exception error) 269 { 270 self.subscribe(new Observer!T(action, error)); 271 } 272 273 Subscription then(T)(Observable!T self, Action!T action, Action!Exception error, Delegate complete) 274 { 275 return self.subscribe(new Observer!T(action, error, complete)); 276 } 277 278 unittest 279 { 280 // Promise 281 auto test1 = new Deferred!int; 282 auto test1var = 10; 283 test1.then!int((v) { 284 test1var = v; 285 }); 286 assert(test1var == 10); 287 assert(!test1.isCompleted); 288 289 test1.resolve(15); 290 assert(test1var == 15); 291 assert(test1.isCompleted); 292 } 293 294 Observable!T merge(T)(Observable!T[] inputs ...) 295 { 296 auto output = new Subject!T; 297 298 // Intialize a copy of the observables 299 auto alive_cnt = inputs.length; 300 foreach(input ; inputs) 301 { 302 // We subscribe and forward next() and error() events 303 input.then!T((v) { 304 output.next(v); 305 }, 306 (e) { 307 output.error(e); 308 }, 309 { 310 // On complete(), we test if this is the last observable alive 311 if(--alive_cnt == 0) 312 { 313 output.complete(); 314 } 315 }); 316 } 317 318 return output; 319 } 320 321 unittest 322 { 323 auto obs1 = new Subject!int; 324 auto obs2 = new Subject!int; 325 auto merged = merge(obs1, obs2); 326 327 auto received = 0; 328 merged.then!int((v) { 329 received = v; 330 }); 331 332 assert(received == 0); 333 334 obs1.next(10); 335 assert(received == 10); 336 assert(!merged.isCompleted); 337 338 obs2.next(20); 339 assert(received == 20); 340 assert(!merged.isCompleted); 341 342 obs2.complete(); 343 assert(!merged.isCompleted); 344 345 obs1.complete(); 346 assert(merged.isCompleted); 347 }