/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.event.EventLoopGroup;

import core.thread;
import std.parallelism;

import kiss.event.EventLoop;

/**
 * A fixed-size group of EventLoop objects, each paired with the Thread
 * that executes its `run` method.
 *
 * Loops are created in the constructor; their threads are only launched
 * by `start()`.
 */
class EventLoopGroup
{
    /**
     * Creates `size` event loops and one (not yet started) thread per loop.
     *
     * Params:
     *   size = number of loops to create. A value of 0 is promoted to 1 so
     *          the group is never empty. Defaults to `totalCPUs - 1`.
     */
    this(uint size = (totalCPUs - 1))
    {
        // `size` is unsigned, so only the upper bound can actually fail.
        assert(size <= totalCPUs);

        size = size > 0 ? size : 1;
        _loopList.reserve(size);
        foreach (i; 0 .. size)
        {
            auto loop = new EventLoop();
            _loops[loop] = new Thread(&loop.run);
            // Keep an indexable list so lookups do not have to rebuild
            // `_loops.keys` (a fresh array) on every call.
            _loopList ~= loop;
        }
    }

    /**
     * Starts every loop thread. Does nothing if the group is already started.
     *
     * NOTE(review): the Thread objects are created once in the constructor,
     * so calling start() again after stop() calls start() on the same Thread
     * objects - confirm whether restarting a group is meant to be supported.
     */
    void start()
    {
        if (_started)
            return;
        foreach (t; _loops.byValue)
        {
            t.start();
        }
        _started = true;
    }

    /**
     * Calls `stop()` on every loop, then blocks in `wait()` until all loop
     * threads have been joined. Does nothing if the group is not started.
     */
    void stop()
    {
        if (!_started)
            return;
        foreach (loop; _loopList)
        {
            loop.stop();
        }
        _started = false;
        wait();
    }

    /// Number of event loops in the group.
    @property size_t length()
    {
        return _loops.length;
    }

    // void addEventLoop(EventLoop lop)
    // {
    // 	auto loop = new GroupMember(lop);
    // 	auto th = new Thread(&loop.start);
    // 	_loops[loop] = th;
    // 	if (_started)
    // 		th.start();
    // }

    /// Index operator; equivalent to `at(index)`.
    EventLoop opIndex(size_t index)
    {
        return at(index);
    }

    /**
     * Returns the loop at position `index` modulo the group size, so any
     * index value is valid. Loops are ordered by creation.
     */
    EventLoop at(size_t index)
    {
        return _loopList[index % _loopList.length];
    }

    /**
     * Joins every loop thread. `false` is passed as the `rethrow` argument
     * of `Thread.join`.
     */
    void wait()
    {
        foreach (t; _loops.byValue)
        {
            t.join(false);
        }
    }

    /**
     * `foreach` support: invokes `dg` for each loop in creation order and
     * stops at the first non-zero result, which is returned.
     */
    int opApply(scope int delegate(EventLoop) dg)
    {
        int ret = 0;
        foreach (loop; _loopList)
        {
            ret = dg(loop);
            if (ret)
                break;
        }
        return ret;
    }

    // Not referenced anywhere in this class.
    private EventLoop _mainLoop;

private:
    bool _started;

    // Maps each loop to the thread that runs it.
    Thread[EventLoop] _loops;

    // The same loops as the keys of `_loops`, in creation order, for
    // allocation-free indexing and iteration.
    EventLoop[] _loopList;

}