Skip to content

Commit d7c0a6c

Browse files
author
Henning Hasemann
committed
started talking researchers transport
1 parent ffd464f commit d7c0a6c

File tree

2 files changed

+274
-4
lines changed

2 files changed

+274
-4
lines changed

wiselib.testing/algorithms/protocols/reliable_transport/reliable_transport.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ namespace wiselib {
7979

8080
class Endpoint;
8181

82-
//typedef delegate2<bool, Message&, Endpoint&> produce_callback_t;
83-
//typedef delegate2<void, Message&, Endpoint&> consume_callback_t;
84-
//typedef delegate2<void, int, Endpoint&> event_callback_t;
8582
typedef delegate3<bool, int, Message*, Endpoint*> callback_t;
8683

8784
enum SpecialNodeIds {
@@ -766,7 +763,6 @@ namespace wiselib {
766763
size_type sending_channel_idx_;
767764
size_type ack_timer_;
768765
size_type resends_;
769-
//sequence_number_t ack_timeout_sequence_number_;
770766
bool is_sending_;
771767
abs_millis_t send_start_;
772768

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/***************************************************************************
2+
** This file is part of the generic algorithm library Wiselib. **
3+
** Copyright (C) 2008,2009 by the Wisebed (www.wisebed.eu) project. **
4+
** **
5+
** The Wiselib is free software: you can redistribute it and/or modify **
6+
** it under the terms of the GNU Lesser General Public License as **
7+
** published by the Free Software Foundation, either version 3 of the **
8+
** License, or (at your option) any later version. **
9+
** **
10+
** The Wiselib is distributed in the hope that it will be useful, **
11+
** but WITHOUT ANY WARRANTY; without even the implied warranty of **
12+
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the **
13+
** GNU Lesser General Public License for more details. **
14+
** **
15+
** You should have received a copy of the GNU Lesser General Public **
16+
** License along with the Wiselib. **
17+
** If not, see <http://www.gnu.org/licenses/>. **
18+
***************************************************************************/
19+
20+
#ifndef TALKING_RESEARCHERS_TRANSPORT_H
21+
#define TALKING_RESEARCHERS_TRANSPORT_H
22+
23+
namespace wiselib {
24+
25+
/**
26+
* @brief
27+
*
28+
* @ingroup
29+
*
30+
* @tparam
31+
*/
32+
template<
33+
typename OsModel_P,
34+
typename ChannelId_P,
35+
typename Radio_P,
36+
typename Timer_P,
37+
typename Clock_P,
38+
typename Rand_P,
39+
typename Debug_P,
40+
size_t MAX_ENDPOINTS_P
41+
>
42+
class TalkingResearchersTransport : public RadioBase<OsModel_P, typename Radio_P::node_id_t, typename OsModel_P::size_t, typename OsModel_P::block_data_t> {
43+
44+
public:
45+
//{{{ Typedefs & Enums
46+
typedef TalkingResearchersTransport self_type;
47+
typedef self_type* self_pointer_t;
48+
49+
typedef OsModel_P OsModel;
50+
typedef typename OsModel::block_data_t block_data_t;
51+
typedef typename OsModel::size_t size_type;
52+
53+
typedef ChannelId_P ChannelId;
54+
typedef Radio_P Radio;
55+
typedef typename Radio::node_id_t node_id_t;
56+
typedef typename Radio::message_id_t message_id_t;
57+
typedef Timer_P Timer;
58+
typedef Clock_P Clock;
59+
typedef typename Clock::time_t time_t;
60+
typedef Rand_P Rand;
61+
typedef Debug_P Debug;
62+
63+
typedef ReliableTransportMessage<OsModel, ChannelId, Radio> Message;
64+
typedef typename Message::sequence_number_t sequence_number_t;
65+
typedef ::uint32_t abs_millis_t;
66+
67+
class Edge;
68+
typedef Edge Endpoint;
69+
70+
enum SpecialNodeIds {
71+
BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS,
72+
NULL_NODE_ID = Radio::NULL_NODE_ID
73+
};
74+
75+
enum Restrictions {
76+
MAX_MESSAGE_LENGTH = Radio::MAX_MESSAGE_LENGTH - Message::HEADER_SIZE,
77+
MAX_ENDPOINTS = MAX_ENDPOINTS_P,
78+
//RESEND_TIMEOUT = 400 * WISELIB_TIME_FACTOR,
79+
//RESEND_RAND_ADD = 10 * WISELIB_TIME_FACTOR,
80+
//MAX_RESENDS = 1,
81+
//ANSWER_TIMEOUT = 2 * RESEND_TIMEOUT,
82+
};
83+
84+
enum ReturnValues {
85+
SUCCESS = OsModel::SUCCESS, ERR_UNSPEC = OsModel::ERR_UNSPEC
86+
};
87+
88+
enum { npos = (size_type)(-1) };
89+
90+
enum Events {
91+
EVENT_ABORT = 0,
92+
EVENT_OPEN = 1,
93+
EVENT_CLOSE = 2,
94+
EVENT_PRODUCE = 3,
95+
EVENT_CONSUME = 4
96+
};
97+
98+
//}}}
99+
100+
void init() {
101+
// TODO
102+
}
103+
104+
int enable_radio() { return radio_->enable_radio(); }
105+
int disable_radio() { return radio_->disable_radio(); }
106+
107+
int open(const ChannelId& channel) {
108+
if(!local_lock_requested(channel)) {
109+
request_local_lock(channel, radio_->id());
110+
request_remote_lock(channel, radio_->id());
111+
}
112+
}
113+
114+
private:
115+
116+
class Edge {
117+
public:
118+
enum LockState {
119+
LOCK_REQUESTING = 0x01, // waiting for (any) response
120+
LOCK_REQUESTED = 0x02, // lock successfully requested
121+
LOCK_LOCKED = 0x03, // remote side is locked
122+
};
123+
124+
Edge() : used_(0) {
125+
}
126+
127+
void set_local_lock(LockState s) { lock_state_local_ = s; }
128+
LockState local_lock() { return lock_state_local_ ; }
129+
void set_remote_lock(LockState s) { lock_state_remote_ = s; }
130+
LockState remote_lock() { return lock_state_remote_ ; }
131+
132+
private:
133+
ChannelId channel_;
134+
node_id_t source_; // logical source of the (directed) edge
135+
node_id_t next_hop_;
136+
::uint8_t lock_state_local_ : 2;
137+
::uint8_t lock_state_remote_ : 2;
138+
::uint8_t lock_state_;
139+
::uint8_t used_ : 1;
140+
};
141+
142+
bool local_lock_requested(const ChannelId& channel) {
143+
Edge* edge = find_edge(channel, radio_->id());
144+
if(!edge) { return false; }
145+
return edge->local_lock() == Edge::LOCK_REQUESTED;
146+
}
147+
148+
void request_local_lock(const ChannelId& channel, node_id_t source_id) {
149+
Edge *edge = find_edge(channel, source_id);
150+
assert(edge != 0);
151+
edge->set_local_lock(Edge::LOCK_REQUESTED);
152+
}
153+
154+
void request_remote_lock(Edge& edge, bool forward) {
155+
Message msg;
156+
msg.set_source(radio_->id());
157+
msg.set_channel(edge.channel_);
158+
msg.set_forward(forward);
159+
msg.set_request_lock();
160+
161+
edge.state_ = 0;
162+
produce_(msg, edge);
163+
radio_->send(edge.remote_address(), msg.size(), msg.data());
164+
edge_.set_remote_lock(Edge::LOCK_REQUESTING);
165+
// TODO: timer in regular intervals that re-requests all edges
166+
// in state REMOTE_REQUESTING
167+
}
168+
169+
///@name Receiving messages.
170+
///@{
171+
//{{{
172+
173+
void on_receive(...) {
174+
// TODO
175+
176+
if(message.is_ack()) {
177+
// ...
178+
}
179+
180+
switch(message.subtype()) {
181+
case Message::REQUEST_LOCK:
182+
case Message::GRANT_LOCK:
183+
case Message::OPEN:
184+
case Message::CLOSE:
185+
};
186+
187+
// ...
188+
consume_(...);
189+
on_receive_remote_lock(edge);
190+
send_ack(...);
191+
}
192+
193+
void receive_remote_lock(Edge& edge) {
194+
if(edge.local_lock() == Edge::LOCK_LOCKED) {
195+
edge.set_remote_lock(Edge::LOCK_LOCKED);
196+
active_edge_ = &edge;
197+
}
198+
}
199+
200+
void receive_close() {
201+
assert(active_edge_);
202+
203+
active_edge_->set_local_lock(0);
204+
active_edge_->set_remote_lock(0);
205+
206+
top_edge_to_bottom();
207+
}
208+
209+
//}}}
210+
///@}
211+
212+
///@name Timeouts.
213+
///@{
214+
//{{{
215+
216+
void timeout_deadlock_detection(void*) {
217+
// TODO: add abort switch
218+
219+
if(rand_->operator()() < DEADLOCK_RELEASE_THRESHOLD) {
220+
abort_active_locks();
221+
highest_edge_to_top();
222+
}
223+
else {
224+
timer_->set_timer<self_type, &self_type::timeout_deadlock_detection>(TIMEOUT_DEADLOCK_DETECTION, this, 0);
225+
}
226+
}
227+
228+
//}}}
229+
//@}
230+
231+
///@name Edge search & order manipulation.
232+
///@{
233+
//{{{
234+
235+
void highest_edge_to_top() {
236+
// TODO
237+
}
238+
239+
void top_edge_to_bottom() {
240+
// TODO
241+
}
242+
243+
Edge* find_topmost_requested_edge() {
244+
for(size_type i = 0; i < known_edges_; i++) {
245+
assert(edges_[i].used_);
246+
if(edges_[i].requested()) {
247+
return &edges_[i];
248+
}
249+
}
250+
return 0;
251+
}
252+
253+
Edge* find_edge(const ChannelId& channel, node_id_t source_id) {
254+
for(size_type i = 0; i < known_edges_; i++) {
255+
assert(edges_[i].used_);
256+
if(edges_[i].channel_ == channel && edges_[i].source_ = source_id) {
257+
return &edges_[i];
258+
}
259+
}
260+
return 0;
261+
}
262+
263+
//}}}
264+
///@}
265+
266+
Edge edges_[MAX_ENDPOINTS];
267+
size_type known_edges_;
268+
Edge *active_edge_;
269+
270+
}; // TalkingResearchersTransport
271+
}
272+
273+
#endif // TALKING_RESEARCHERS_TRANSPORT_H
274+

0 commit comments

Comments
 (0)