Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

Yveltals Blog

AsyncQuery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#define BOOST_THREAD_VERSION 4

#include <bits/stdc++.h>
#include <boost/thread/future.hpp>
using namespace std;

enum class StatusCode : int {
Ok = 0,
NotFound = 1
};
class Status {
public:
StatusCode code_;
string message_;
Status(StatusCode code, const string &message): code_(code), message_(message) {}
};
Status AsyncRead(const string &req, function<void(const string &suffix)> cb) {
boost::future<string> f = boost::async([&req](){
this_thread::sleep_for(chrono::seconds(3));
return string{"value"}; // mock rpc request
});
f.then([cb](boost::future<string> future) {
auto result = future.get();
cb(result);
});
return {StatusCode::Ok, "async read"};
}
Status Read(const string &req, string &resp) {
auto pms = make_shared<boost::promise<Status>>();
boost::future<Status> fut(pms->get_future());
function<void(const string &value)> cb = [&resp, &pms] (const string &value) {
resp = value; // exec something after async finished
pms->set_value({StatusCode::Ok, "pms set value"});
};
auto st = AsyncRead(req, cb); // non-blocking
cout << st.message_ << endl;
st = fut.get(); // blocking
cout << st.message_ << endl;
return {StatusCode::Ok, "read"};
}

int main() {
string resp;
auto st = Read("key", resp);
if (st.code_ == StatusCode::Ok) {
cout << resp << endl;
} else {
cout << st.message_ << endl;
}
}

RecordBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <bits/stdc++.h>
using namespace std;

class RecordBuffer {
public:
explicit RecordBuffer(int size, string file):
file_(file),
buffer_size_(size),
buffer_(new char[size]) {}

~RecordBuffer() {
FlushBuffer();
cout << "Destructed after flushing" << endl;
}
void WriteRecord(const char *data, int size) {
if (!IsAvailable(size)) {
FlushBuffer();
}
WriteToBuffer(data, size);
}
void WriteToBuffer(const char *data, int size) {
assert(IsAvailable(size));
memcpy(buffer_.get() + buffer_used_, data, size);
buffer_used_ += size;
}
void FlushBuffer() {
if (buffer_used_ == 0) return;
ofstream f(file_, ios::app);
f << string(buffer_.get(), buffer_used_) << endl;
this_thread::sleep_for(chrono::seconds(3));
f.close();
buffer_used_ = 0;
}
bool IsAvailable(int size) {
return buffer_size_ - buffer_used_ >= size;
}
unique_ptr<char[]> buffer_;
int buffer_size_;
int buffer_used_{0};
string file_;
};

int main() {
RecordBuffer rb(16, "output.txt");
string s{"zxcvbnm"};
for (int i = 0; i < 20; ++i) {
rb.WriteRecord(s.data(), s.size());
}
}

ThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include <bits/stdc++.h>
using namespace std;

class ThreadPool {
public:
ThreadPool(int size = thread::hardware_concurrency());
~ThreadPool();
template<class F, class... Args>
auto submit(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>;
private:
queue<function<void()>> tasks;
vector<thread> workers;
mutex mtx_;
condition_variable cv_;
bool stop;
};
ThreadPool::ThreadPool(int size) : stop(false) {
for (int i = 0; i < size; i++) {
workers.emplace_back([this] {
while (!stop) {
unique_lock<mutex> lock(mtx_);
cv_.wait(lock, [this] { return !tasks.empty() || stop; });
if (stop) return;
function<void()> task = move(tasks.front());
tasks.pop();
lock.unlock();
task();
}
});
}
}
ThreadPool::~ThreadPool() {
{
unique_lock<mutex> lock(mtx_);
stop = true;
}
cv_.notify_all();
for (auto& worker : workers) {
worker.join();
}
}
template<class F, class... Args>
auto ThreadPool::submit(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<return_type()>>(
bind(forward<F>(f), forward<Args>(args)...)
);
future<return_type> result = task->get_future();
{
unique_lock<mutex> lock(mtx_);
if (stop) {
throw runtime_error("submit on stopped ThreadPool");
}
tasks.emplace([task] { (*task)(); });
}
cv_.notify_one();
return result;
}

int main() {
unique_ptr<ThreadPool> pool = make_unique<ThreadPool>();
auto result = pool->submit([] {
this_thread::sleep_for(chrono::seconds(3));
return 100;
});
cout << result.get() << endl;
}