1+ #include " Client.h"
2+ #include < thread>
3+ #include < chrono>
4+
5+ Client::Client (bool nblock): nblock_(nblock) {
6+ sockfd_ = socket (AF_INET, SOCK_STREAM, 0 );
7+ if (nblock) {
8+ int optval;
9+ socklen_t optlen = sizeof (optval);
10+ if (setsockopt (sockfd_, SOL_SOCKET, SOCK_NONBLOCK, (const void *)&optval, sizeof (optval)) < 0 )
11+ perror (" setsocketopt for SOCK_NONBLOCK failed:" );
12+ }
13+
14+ poller_.reset (new Epoller ());
15+ }
16+
17+ int Client::connect (const char * ip, int32_t port) {
18+ struct sockaddr_in servaddr;
19+ bzero (&servaddr,sizeof (servaddr));
20+ servaddr.sin_family = AF_INET;
21+ servaddr.sin_port = htons (port);
22+ inet_pton (AF_INET, ip, &servaddr.sin_addr );
23+
24+ int optval;
25+ socklen_t optlen = sizeof (optval);
26+ int res = ::connect (sockfd_, (struct sockaddr *)&servaddr, sizeof (servaddr));
27+ if (res == -1 && errno == EINPROGRESS) {
28+ getsockopt (sockfd_, SOL_SOCKET, SO_ERROR, reinterpret_cast <void *>(&optval), &optlen);
29+ if (optval == 0 ) res = 0 ;
30+ else errno = optval;
31+ perror (" connect error: " );
32+ }
33+ return res;
34+ }
35+
36+ void Client::send (char * buf) {
37+ (void *)buf;
38+ handle_write (sockfd_);
39+ }
40+
41+ void Client::loop () {
42+ poller_->add_event (sockfd_, EPOLLOUT, -1 );
43+
44+ for (;;) {
45+ if (server_closed_) {
46+ printf (" server closed\n " );
47+ break ;
48+ }
49+
50+ std::vector<Response> resp = poller_->wait ();
51+ for (const auto & r : resp) {
52+ if (r.fd == sockfd_ && r.event & EPOLLIN)
53+ handle_read (r.fd );
54+ else if (r.fd == sockfd_ && r.event & EPOLLOUT)
55+ handle_write (r.fd );
56+ }
57+ }
58+ }
59+
60+ void Client::handle_write (int fd) {
61+ static char buf[1024 ] = {0 };
62+ snprintf (buf, 1024 , " i am client %d! \n " , fd);
63+ int32_t msg_len = strlen (buf);
64+ char * buffer = new char [msg_len + sizeof (int32_t )];
65+ char * buffer_ori = buffer;
66+ memcpy (buffer, &msg_len , sizeof (int32_t ));
67+ buffer += sizeof (int32_t );
68+ memcpy (buffer, buf, msg_len);
69+
70+ int32_t len = msg_len + sizeof (int32_t );
71+ while (len > 0 )
72+ {
73+ int nwrite = write (fd, buffer_ori, len);
74+ if (nwrite == -1 )
75+ {
76+ perror (" write error would close session:" );
77+ close (fd);
78+ break ;
79+ }
80+ len -= nwrite;
81+ buffer_ori += nwrite;
82+ }
83+ poller_->modify_event (sockfd_, EPOLLIN);
84+ }
85+
86+
87+ void Client::handle_read (int fd) {
88+ static const int32_t MAXSIZE = 1024 ;
89+ static char buf[1024 ] = {0 };
90+ int32_t header_len = sizeof (int32_t );
91+ // read msg header
92+ bool read_error = false ;
93+ char * header_buf = new char [header_len];
94+ char * header_buf_origin = header_buf;
95+ while (header_len > 0 )
96+ {
97+ int nread = read (fd, buf, header_len);
98+ if (nread == -1 )
99+ {
100+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
101+ std::this_thread::sleep_for (std::chrono::milliseconds (5 ));
102+ continue ;
103+ }
104+ else
105+ {
106+ perror (" read header error:" );
107+ read_error = true ;
108+ break ;
109+ }
110+ }
111+ else if (nread == 0 )
112+ {
113+ server_closed_ = true ;
114+ read_error = true ;
115+ break ;
116+ }
117+ else
118+ {
119+ if (nread < header_len)
120+ {
121+ memcpy (header_buf, buf, nread);
122+ header_len -= nread;
123+ header_buf += nread;
124+ }
125+ else
126+ {
127+ memcpy (header_buf, buf, nread);
128+ break ;
129+ }
130+ }
131+ }
132+ printf (" msg len: %d\n " , *(int32_t *)header_buf_origin);
133+
134+ // read msg data
135+ if (!read_error)
136+ {
137+ int32_t len = *(int32_t *)header_buf_origin;
138+ char * pbuffer = new char [len];
139+ char * buffer_origin = pbuffer;
140+ int readed = 0 ;
141+ while (len > MAXSIZE)
142+ {
143+ readed = read (fd, buf, MAXSIZE);
144+ if (readed == 0 )
145+ {
146+ perror (" read msg error:" );
147+ read_error = true ;
148+ break ;
149+ }
150+ else if (readed == -1 )
151+ {
152+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
153+ std::this_thread::sleep_for (std::chrono::milliseconds (5 ));
154+ continue ;
155+ }
156+ else
157+ {
158+ perror (" read msg error:" );
159+ read_error = true ;
160+ break ;
161+ }
162+ }
163+ memcpy (pbuffer, buf, readed);
164+ pbuffer += readed;
165+ len -= readed;
166+ }
167+ while (!read_error)
168+ {
169+ readed = read (fd, buf, len);
170+ if (readed == 0 )
171+ {
172+ server_closed_ = true ;
173+ read_error = true ;
174+ break ;
175+ }
176+ else if (readed == -1 )
177+ {
178+ if (errno == EAGAIN || errno == EWOULDBLOCK)
179+ continue ;
180+ else
181+ {
182+ perror (" read msg error:" );
183+ read_error = true ;
184+ break ;
185+ }
186+ }
187+ memcpy (pbuffer, buf, readed);
188+ pbuffer += readed;
189+ len -= readed;
190+ if (len <= 0 ) break ;
191+ }
192+ printf (" recv msg: %s\n " , buffer_origin);
193+ delete [] buffer_origin;
194+
195+ // 修改描述符对应的事件,由读改为写(为何每次都要修改???)
196+ if (!read_error)
197+ poller_->modify_event (fd, EPOLLOUT);
198+ }
199+ delete [] header_buf_origin;
200+ if (read_error)
201+ {
202+ poller_->delete_event (fd, EPOLLIN);
203+ close (fd);
204+ }
205+
206+ }
0 commit comments