目前SockUtil::connect函数内完成套接字创建和端口绑定,然后立即连接到远程目的ip和端口,但有时候这两件事需要分开进行,比如在gb28181播放协商的时候,下级在收到上级invite请求,返回200ok的sdp报文中附带发流源ip和端口,有些上级加了安全边界设备,需要核对下级的源ip端口才允许下级连接并发送码流数据,这种情况就需要做三个步骤:1、创建并绑定本地端口;2、向上级发送发流ip和端口并等待上级ack;3、连接远程目的ip和端口。具体实现代码:
bool Socket::createAndBindSock(bool udpOrTcp, uint16_t port, const string &local_ip) {
closeSock();
int fd = SockUtil::createAndBindSock(udpOrTcp, port, local_ip.data());
if (fd == -1)
return false;
SockFD::Ptr sock;
if (udpOrTcp) {
sock = makeSock(fd, SockNum::Sock_UDP);
if (!attachEvent(sock, true))
return false;
}
else
{
// 注意tcp在连接目标成功后才会监听事件
sock = makeSock(fd, SockNum::Sock_TCP);
}
LOCK_GUARD(_mtx_sock_fd);
_sock_fd = sock;
return true;
}
void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in,
float timeout_sec, const string &local_ip, uint16_t local_port, bool newSock) {
if (newSock) {
// 重置当前socket
closeSock();
}
weak_ptr<Socket> weak_self = shared_from_this();
auto con_cb = [con_cb_in, weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
strong_self->_async_con_cb = nullptr;
strong_self->_con_timer = nullptr;
if (err) {
LOCK_GUARD(strong_self->_mtx_sock_fd);
strong_self->_sock_fd = nullptr;
}
con_cb_in(err);
};
auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb, newSock](int sock) {
auto strong_self = weak_self.lock();
if (sock == -1 || !strong_self) {
if (!strong_self) {
CLOSE_SOCK(sock);
} else {
con_cb(SockException(Err_dns, get_uv_errmsg(true)));
}
return;
}
bool test = false;
SockFD::Ptr sock_fd = strong_self->getSock();
if (newSock || !sock_fd) {
test = true;
sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
}
weak_ptr<SockFD> weak_sock_fd = sock_fd;
// 监听该socket是否可写,可写表明已经连接服务器成功
int result = strong_self->_poller->addEvent(sock, Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
auto strong_sock_fd = weak_sock_fd.lock();
auto strong_self = weak_self.lock();
if (strong_sock_fd && strong_self) {
//socket可写事件,说明已经连接服务器成功
strong_self->onConnected(strong_sock_fd, con_cb);
}
});
if (result == -1) {
con_cb(SockException(Err_other, "add event to poller failed when start connect"));
return;
}
if (test) {
// 保存fd
strong_self->setSock(sock_fd);
}
});
auto poller = _poller;
weak_ptr<function<void(int)> > weak_task = async_con_cb;
int sockfd_in = -1;
if (!newSock) {
SockFD::Ptr sock_fd = getSock();
if (sock_fd)
sockfd_in = sock_fd->rawFd();
}
WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, sockfd_in, weak_task, poller]() {
// 阻塞式dns解析放在后台线程执行
int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port, sockfd_in);
poller->async([sock, weak_task]() {
auto strong_task = weak_task.lock();
if (strong_task) {
(*strong_task)(sock);
} else {
CLOSE_SOCK(sock);
}
});
});
//连接超时定时器
_con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
return false;
}, _poller);
_async_con_cb = async_con_cb;
}
int SockUtil::createAndBindSock(bool udpOrTcp, const uint16_t port, const char* localIp, int af, bool bAsync, bool reusePort) {
int sockfd = -1;
if (udpOrTcp)
sockfd = (int)socket(af, SOCK_DGRAM, IPPROTO_UDP);
else
sockfd = (int)socket(af, SOCK_STREAM, IPPROTO_TCP);
if (sockfd == -1) {
WarnL << "创建套接字失败:" << get_uv_errmsg(true);
return -1;
}
setReuseable(sockfd, reusePort);
setNoSigpipe(sockfd);
setNoBlocked(sockfd, bAsync);
if (!udpOrTcp)
setNoDelay(sockfd);
setSendBuf(sockfd);
setRecvBuf(sockfd);
setCloseWait(sockfd);
setCloExec(sockfd);
if (bindSock(sockfd, localIp, port) == -1) {
close(sockfd);
return -1;
}
return sockfd;
}
int SockUtil::connect(const char *host, uint16_t port, bool bAsync, const char* localIp, uint16_t localPort, int sockfd_in) {
sockaddr addr;
if(!DnsCache::Instance().getDomainIP(host,addr)){
//dns解析失败
return -1;
}
//设置端口号
((sockaddr_in *)&addr)->sin_port = htons(port);
int sockfd = sockfd_in;
if (sockfd < 0)
sockfd = createAndBindSock(false, localPort, localIp, addr.sa_family, bAsync);
if (sockfd < 0) {
WarnL << "创建套接字失败:" << host;
return -1;
}
if (::connect(sockfd, &addr, sizeof(struct sockaddr)) == 0) {
//同步连接成功
return sockfd;
}
if (bAsync && get_uv_error(true) == UV_EAGAIN) {
//异步连接成功
return sockfd;
}
WarnL << "连接主机失败:" << host << " " << port << " " << get_uv_errmsg(true);
close(sockfd);
return -1;
}