1 module modernui.rx;
2 
3 import modernui.collections;
4 
5 import std.algorithm;
6 import std.range;
7 import std.container.array;
8 
/// Empty marker type: it carries no data.
struct None
{
	/// Shared immutable instance of the marker type.
	static immutable None val = {};

	/// Zero-length member; occupies no storage for payload.
	void[0] dummy;
}
15 
/// Callback that receives one argument of type `Arg` and returns nothing.
alias Action(Arg) = void delegate(Arg);

/// Callback that takes no arguments and returns nothing.
alias Delegate = void delegate();

/// Callback that maps an `In` argument to an `Out` result.
alias Func(In, Out) = Out delegate(In);
19 
/// Handle for an active subscription. Releasing it runs the teardown
/// delegate given at construction, at most once.
final class Subscription
{
	private void delegate() action;

	/// Params:
	///   action = teardown delegate run by `release`; may be `null`, in
	///            which case `release` does nothing.
	this(void delegate() action)
	{
		this.action = action;
	}

	/// Runs the teardown delegate on the first call. Every later call,
	/// including a re-entrant call made from inside the delegate itself,
	/// is a no-op.
	void release()
	{
		if(action is null) return;

		// Detach the delegate BEFORE invoking it: the teardown may call
		// release() again (e.g. through Observable.unsubscribe), and that
		// nested call must not run the teardown a second time.
		auto pending = action;
		action = null;

		pending();
	}
}
40 
/// Holds the optional "next", "error" and "completed" callbacks of one
/// subscriber. After `onCompleted` or `onError` has been delivered, all
/// callbacks are dropped, so later notifications are silently ignored.
final class Observer(T)
{
	private Action!T onNextCallback;
	private Delegate onCompletedCallback;
	private Action!Exception onErrorCallback;

	/// Observer that only reacts to values.
	this(Action!T nextCallback)
	{
		this(nextCallback, null, null);
	}

	/// Observer that reacts to values and to an error.
	this(Action!T nextCallback, Action!Exception errorCallback)
	{
		this(nextCallback, errorCallback, null);
	}

	/// Observer that reacts to values, to an error and to completion.
	/// Any of the callbacks may be `null`.
	this(Action!T nextCallback, Action!Exception errorCallback, Delegate completedCallback)
	{
		onNextCallback = nextCallback;
		onErrorCallback = errorCallback;
		onCompletedCallback = completedCallback;
	}

	/// Delivers the completion notification (if a callback is set) and
	/// then drops all callbacks.
	void onCompleted()
	{
		auto handler = onCompletedCallback;
		if(handler !is null)
		{
			handler();
		}
		finalize();
	}

	/// Delivers `value` to the "next" callback, if one is still set.
	void onNext(T value)
	{
		if(onNextCallback is null) return;
		onNextCallback(value);
	}

	/// Delivers `e` to the error callback (if one is set) and then drops
	/// all callbacks.
	void onError(Exception e)
	{
		auto handler = onErrorCallback;
		if(handler !is null)
		{
			handler(e);
		}
		finalize();
	}

	// Clears every stored delegate so the observer no longer keeps the
	// callbacks (and whatever they capture) referenced.
	private void finalize()
	{
		onCompletedCallback = null;
		onErrorCallback = null;
		onNextCallback = null;
	}
}
93 
/// Base class of all value sources. It stores the registered observers
/// and implements unsubscription; how observers get registered is left
/// to `subscribe` in the subclasses.
abstract class Observable(T)
{
	/// Element type delivered to observers.
	alias ObservedType = T;

	// Observers are tracked twice: the associative array maps each
	// observer to its subscription, the Array is what the subclasses in
	// this module iterate over when notifying.
	private Subscription[Observer!T] observers;
	private Array!(Observer!T) myObserversArray;
	private bool myIsCompleted;

	/// `true` once the subclass has flagged the observable as completed.
	@property bool isCompleted() { return myIsCompleted; }

	/// `true` while at least one observer is registered.
	@property bool hasSubscribers() { return observers.length > 0; }

	/// Registers `observer`; implemented by subclasses.
	abstract Subscription subscribe(Observer!T observer);

	/// Removes `observer` from both containers and releases its
	/// subscription.
	/// Returns: `false` when the observer was not registered, otherwise
	/// the result of removing it from the associative array.
	bool unsubscribe(Observer!T observer)
	{
		auto entry = observer in observers;
		if(entry is null) return false;

		// Copy the reference out before the entry disappears.
		auto subscription = *entry;
		immutable removed = observers.remove(observer);

		// Drop the first matching element (if any) from the ordered list.
		auto match = myObserversArray[].find(observer).takeOne;
		myObserversArray.linearRemove(match);

		subscription.release();
		return removed;
	}
}
122 
/// Observable whose notifications are pushed from the outside through
/// `next`, `error` and `complete`.
final class Subject(T) : Observable!T
{
	/// Delivers `value` to every currently registered observer.
	void next(T value)
	{
		// Notify a copy of the list: a callback may unsubscribe (or
		// subscribe) and thereby mutate myObserversArray, which must not
		// happen to the range being iterated.
		auto snapshot = myObserversArray.dup;
		foreach(observer ; snapshot[])
		{
			observer.onNext(value);
		}
	}

	/// Marks the subject as completed, drops all observers and notifies
	/// them of the completion. Calling it again has no effect.
	void complete()
	{
		if(myIsCompleted) return;

		// Flag first so that a subscribe() issued from a callback is
		// rejected instead of being registered and then silently lost.
		myIsCompleted = true;

		foreach(observer ; detachObservers()[])
		{
			observer.onCompleted();
		}
	}

	/// Delivers `e` to every registered observer and drops them.
	/// An observer discards its callbacks after onError, so keeping it
	/// registered would only hold dead references. The subject itself is
	/// NOT flagged as completed by an error.
	void error(Exception e)
	{
		foreach(observer ; detachObservers()[])
		{
			observer.onError(e);
		}
	}

	/// Registers `observer`.
	/// Returns: a subscription whose release unsubscribes the observer.
	/// When the subject is already completed nothing is registered and
	/// an inert subscription is returned. Subscribing an observer that is
	/// already registered returns its existing subscription.
	override Subscription subscribe(Observer!T observer)
	{
		if(isCompleted)
		{
			return new Subscription(null);
		}

		// Registering twice would duplicate the observer in the ordered
		// list while the associative array keeps a single entry, so the
		// two containers would disagree after one unsubscribe.
		if(auto existing = observer in observers)
		{
			return *existing;
		}

		auto subscription = new Subscription({
			this.unsubscribe(observer);
		});

		observers[observer] = subscription;
		myObserversArray.insertBack(observer);

		return subscription;
	}

	// Empties both observer containers and returns the observers that
	// were registered, in notification order.
	private Array!(Observer!T) detachObservers()
	{
		auto detached = myObserversArray.dup;
		observers.clear();
		myObserversArray.clear();
		return detached;
	}
}
171 
unittest
{
	// Subject: values reach the observer until the subject completes.
	auto subject = new Subject!int;
	int lastSeen = 10;

	subject.then!int(
		(v) { lastSeen = v; },
		(e) { lastSeen = -1; },
		{ lastSeen = -100; });

	// Subscribing alone delivers nothing.
	assert(lastSeen == 10);

	// Each pushed value is observed immediately.
	foreach(sample ; [15, 32])
	{
		subject.next(sample);
		assert(lastSeen == sample);
	}

	// Completion runs the completion callback and flips the flag.
	assert(!subject.isCompleted);
	subject.complete();
	assert(lastSeen == -100);
	assert(subject.isCompleted);
}
200 
// A Promise is an observable that is settled once, at some point in the
// future: either with a single value (after which it is completed) or with
// an error. Observers that subscribe after it has been settled get the
// stored outcome replayed immediately.
abstract class Promise(T) : Observable!T
{
	private T myResolvedValue;

	// Outcome bookkeeping for the error case; a separate flag is kept so
	// that a null exception still counts as a failure.
	private bool myHasFailed;
	private Exception myError;

	/// The value the promise was resolved with (`T.init` until then).
	@property T value() { return myResolvedValue; }

	/// Registers `observer`, or replays the outcome when the promise is
	/// already settled.
	/// Returns: a subscription whose release unsubscribes the observer;
	/// an inert subscription when the outcome was replayed. Subscribing
	/// an already registered observer returns its existing subscription.
	override Subscription subscribe(Observer!T observer)
	{
		if(isCompleted)
		{
			// Same sequence a subscriber registered in time would have
			// seen: the error, or the value followed by completion.
			if(myHasFailed)
			{
				observer.onError(myError);
			}
			else
			{
				observer.onNext(value);
				observer.onCompleted();
			}
			return new Subscription(null);
		}

		// Avoid duplicating the observer in the ordered list.
		if(auto existing = observer in observers)
		{
			return *existing;
		}

		auto subscription = new Subscription({
			this.unsubscribe(observer);
		});

		observers[observer] = subscription;
		myObserversArray.insertBack(observer);

		return subscription;
	}

	// Empties both observer containers and returns the observers that
	// were registered, in notification order. Notifying the returned copy
	// keeps callbacks that unsubscribe from disturbing the iteration.
	private Array!(Observer!T) detachObservers()
	{
		auto detached = myObserversArray.dup;
		observers.clear();
		myObserversArray.clear();
		return detached;
	}
}

/// Promise that is settled explicitly through `resolve` or `error`.
final class Deferred(T) : Promise!T
{
	/// Settles the promise with `value`: stores it, then sends onNext to
	/// every observer followed by onCompleted to every observer.
	/// Has no effect when the promise is already settled.
	void resolve(T value)
	{
		if(myIsCompleted) return;

		// Remember the value so `value` and late subscribers can see it.
		myResolvedValue = value;
		myIsCompleted = true;

		auto settled = detachObservers();
		foreach(observer; settled[])
		{
			observer.onNext(value);
		}

		foreach(observer; settled[])
		{
			observer.onCompleted();
		}
	}

	/// Settles the promise with the error `e` and sends onError to every
	/// observer. Has no effect when the promise is already settled.
	void error(Exception e)
	{
		if(myIsCompleted) return;

		myHasFailed = true;
		myError = e;
		myIsCompleted = true;

		// No onCompleted afterwards: an observer drops its callbacks on
		// onError, so a following onCompleted could never be delivered.
		foreach(observer; detachObservers()[])
		{
			observer.onError(e);
		}
	}
}
262 
/// Subscribes `action` to `self` as a handler for delivered values.
/// Returns: the subscription produced by `self.subscribe`.
Subscription then(T)(Observable!T self, Action!T action)
{
	auto observer = new Observer!T(action);
	return self.subscribe(observer);
}
267 
/// Subscribes `action` (values) and `error` (failure) to `self`.
/// Returns: the subscription produced by `self.subscribe`.
Subscription then(T)(Observable!T self, Action!T action, Action!Exception error)
{
	// The result must be returned: the function is declared to yield a
	// Subscription, and without it the template fails to compile as soon
	// as it is instantiated.
	return self.subscribe(new Observer!T(action, error));
}
272 
/// Subscribes `action` (values), `error` (failure) and `complete`
/// (completion) to `self`.
/// Returns: the subscription produced by `self.subscribe`.
Subscription then(T)(Observable!T self, Action!T action, Action!Exception error, Delegate complete)
{
	auto observer = new Observer!T(action, error, complete);
	return self.subscribe(observer);
}
277 
unittest
{
	// Promise: the observer sees the value once the deferred is resolved.
	auto deferred = new Deferred!int;
	int observed = 10;

	deferred.then!int((v) {
		observed = v;
	});

	// Nothing is delivered before resolution.
	assert(observed == 10);
	assert(!deferred.isCompleted);

	// Resolving delivers the value and completes the promise.
	deferred.resolve(15);
	assert(observed == 15);
	assert(deferred.isCompleted);
}
293 
/// Combines several observables into one.
///
/// Every value and every error of any input is forwarded to the returned
/// observable. The returned observable completes once all inputs have
/// completed; inputs that are already completed when merge is called are
/// counted as completed, and with no inputs at all the result is completed
/// right away.
///
/// Params:
///   inputs = the observables to merge
/// Returns: the merged observable
Observable!T merge(T)(Observable!T[] inputs ...)
{
	auto output = new Subject!T;

	// Number of inputs that have not completed yet.
	auto remaining = inputs.length;
	foreach(input ; inputs)
	{
		immutable alreadyDone = input.isCompleted;
		immutable before = remaining;

		// We subscribe and forward next() and error() events
		input.then!T((v) {
			output.next(v);
		},
		(e) {
			output.error(e);
		},
		{
			// On complete(), we test if this is the last observable alive
			if(--remaining == 0)
			{
				output.complete();
			}
		});

		// An input that was completed before we subscribed may never
		// invoke the completion handler; account for it here, unless
		// subscribing already did so synchronously.
		if(alreadyDone && remaining == before)
		{
			--remaining;
		}
	}

	// Covers the cases where no completion handler will ever bring the
	// counter to zero (no inputs, or only already-completed inputs).
	if(remaining == 0)
	{
		output.complete();
	}

	return output;
}
320 
unittest
{
	// merge: values from either source are forwarded, and the merged
	// observable completes only after both sources have completed.
	auto left = new Subject!int;
	auto right = new Subject!int;
	auto combined = merge(left, right);

	int latest = 0;
	combined.then!int((v) {
		latest = v;
	});

	// Subscribing alone delivers nothing.
	assert(latest == 0);

	// A value from the first source is forwarded.
	left.next(10);
	assert(latest == 10);
	assert(!combined.isCompleted);

	// A value from the second source is forwarded as well.
	right.next(20);
	assert(latest == 20);
	assert(!combined.isCompleted);

	// One completed source is not enough ...
	right.complete();
	assert(!combined.isCompleted);

	// ... the merged observable completes with the last one.
	left.complete();
	assert(combined.isCompleted);
}