#ifndef SocketStreamHandlePrivate_h
#define SocketStreamHandlePrivate_h
#include "SocketStreamHandleBase.h"
#include "Timer.h"
#include <wtf/MessageQueue.h>
#include <Winsock2.h>
namespace WebCore {
class AuthenticationChallenge;
class Credential;
class SocketStreamHandleClient;
class SocketStreamHandlePrivate;
class ReceivedBuffer;
// Winsock-backed implementation object owned by a SocketStreamHandle.
// It holds the SOCKET, the background ReceiveThread that reads from it, and the
// timers used to deliver "connected" / "error" notifications to the handle's
// client asynchronously. connect() and notifyClientDataReceived() are virtual
// so that SocketStreamHandlePrivateOverProxy can replace them.
class SocketStreamHandlePrivate
{
public:
// Stores a copy of the URL, creates the ReceiveThread object (not yet started)
// and registers this object in the file-level set of live instances.
SocketStreamHandlePrivate(SocketStreamHandle*, const KURL&);
virtual ~SocketStreamHandlePrivate();
// Starts Winsock, connects directly to the URL's host and starts receiving.
virtual void connect();
// Sends len bytes from data; returns the number of bytes sent, or 0 when the
// socket is not open or the send failed.
int send(const char* data, int len);
// Closes the socket, releases this object's Winsock reference and notifies
// the client that the stream is closed.
void closeAll();
// Takes one buffer from the receive thread's queue and hands it to
// notifyClientDataReceived().
void receivedData();
protected:
virtual void notifyClientDataReceived(ReceivedBuffer* buffer);
void notifyClientSocketClosed();
void notifyClientSocketConnected();
void socketConnectedTimerFired(Timer<SocketStreamHandlePrivate>*);
void socketErrorTimerFired(Timer<SocketStreamHandlePrivate>*);
// True while m_socket is not INVALID_SOCKET.
bool socketHandleReady();
protected:
SOCKET m_socket;
// Back pointer to the owning handle; reset to 0 once the client has been told
// the socket is closed.
SocketStreamHandle* m_streamHandle;
KURL m_url;
Timer<SocketStreamHandlePrivate> m_socketConnectedTimer;
Timer<SocketStreamHandlePrivate> m_socketErrorTimer;
// ReceiveThread reads m_socket directly, hence the friendship.
friend class ReceiveThread;
// Created in the constructor, deleted in the destructor.
ReceiveThread* m_receiveThread;
// True after a successful initSocket(); tells closeAll() to call cleanupSocket().
bool m_socketStartup;
};
// Variant of SocketStreamHandlePrivate used when an HTTP proxy is configured.
// connect() opens a socket to the proxy and sends an HTTP CONNECT request;
// incoming data is only forwarded to the client after the proxy's success
// response has been seen.
class SocketStreamHandlePrivateOverProxy :public SocketStreamHandlePrivate
{
public:
SocketStreamHandlePrivateOverProxy(SocketStreamHandle*, const KURL&);
// Overrides SocketStreamHandlePrivate::connect().
void connect();
// Overrides SocketStreamHandlePrivate::notifyClientDataReceived().
void notifyClientDataReceived(ReceivedBuffer* buffer);
private:
// Closes everything if the proxy tunnel is still not ready when the timer fires.
void socketConnectProxyTimerFired(Timer<SocketStreamHandlePrivateOverProxy>*);
Timer<SocketStreamHandlePrivateOverProxy> m_socketConnectProxyTimer;
// Set once the proxy's success response has been received.
bool m_webSocketReady;
};
}
#endif
#include "config.h"
#include "SocketStreamHandle.h"
#include "KURL.h"
#include "Logging.h"
#include "NotImplemented.h"
#include "SocketStreamHandleClient.h"
#include "SocketStreamHandlePrivate.h"
#include "wtf/text/CString.h"
#include "StringBuilder.h"
#include "wtf/text/WTFString.h"
#include "wtf/HashSet.h"
#include "CustomEventHandler.h"
#include "CustomEventType.h"
#include "PassOwnPtr.h"
#include <ws2tcpip.h>
#define MAX_SOCKET_BUFFER_LEN 1024
namespace WebCore {
// HTTP proxy host and port, set through SocketStreamHandle::setHttpProxy().
// A proxy is used only when the name is non-empty and the port parses to a
// positive integer (see isUseProxy()).
static String proxyName;
static String proxyPort;
// Text searched for in the proxy's reply to the CONNECT request to decide that
// the tunnel is established.
const String connectedResponseString("HTTP/1.1 200 OK");
// Number of successful initSocket() calls not yet balanced by cleanupSocket().
static int WSACleanupReferenceCount = 0;
// Live SocketStreamHandlePrivate objects; PostEventCallBack() checks it before
// touching the pointer it was given.
static HashSet<SocketStreamHandlePrivate*> streamHandlePrivateObjectSet;
static bool initSocket()
{
if (WSACleanupReferenceCount != 0)
{
WSACleanupReferenceCount ++;
}
else
{
WSADATA wsd;
if (WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
{
return false;
}
WSACleanupReferenceCount ++;
}
return true;
}
// Releases one reference taken by a successful initSocket().
// When the last reference goes away, Winsock is shut down with WSACleanup().
//
// The previous version only called WSACleanup() when the count was already 0,
// i.e. never for balanced init/cleanup pairs, and once per unbalanced extra
// call. An unbalanced call (count already 0) is now a no-op.
static void cleanupSocket()
{
    if (WSACleanupReferenceCount <= 0)
        return;
    if (--WSACleanupReferenceCount == 0)
        WSACleanup();
}
// Opens a TCP/IPv4 connection to the configured proxy (proxyName:proxyPort).
// proxyName is passed to inet_addr(), so it is expected to be a dotted IPv4
// address.
// Returns the connected socket, or INVALID_SOCKET when the socket cannot be
// created, the proxy address cannot be parsed, or connect() fails.
static SOCKET connectThroughProxy()
{
    SOCKET proxySocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (proxySocket == INVALID_SOCKET)
        return INVALID_SOCKET;

    // Zero the whole structure so the padding field does not carry stack garbage.
    sockaddr_in proxyAddr;
    ZeroMemory(&proxyAddr, sizeof(proxyAddr));
    proxyAddr.sin_family = AF_INET;
    proxyAddr.sin_addr.s_addr = inet_addr(proxyName.ascii().data());
    proxyAddr.sin_port = htons(static_cast<u_short>(proxyPort.toInt()));

    // inet_addr() reports an unparsable address as INADDR_NONE; do not try to
    // connect to that value.
    bool connected = false;
    if (proxyAddr.sin_addr.s_addr != INADDR_NONE) {
        // Connect to the proxy.
        int succ = connect(proxySocket, reinterpret_cast<SOCKADDR*>(&proxyAddr), sizeof(proxyAddr));
        connected = (succ != SOCKET_ERROR);
    }

    if (!connected) {
        closesocket(proxySocket);
        proxySocket = INVALID_SOCKET;
    }
    return proxySocket;
}
// Resolves url's host and connects a TCP socket to it.
// The port is url.port() when positive, otherwise 80. Every address returned
// by getaddrinfo() is tried in order until one connects.
// Returns the connected socket, or INVALID_SOCKET when resolution fails or no
// address could be connected.
static SOCKET connectDirectly(const KURL& url)
{
    ADDRINFO hints;
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    String port = String::number((url.port() > 0) ? url.port() : 80);

    // Resolve the server address and port.
    ADDRINFO* result = NULL;
    int succ = getaddrinfo(url.host().utf8().data(), port.ascii().data(), &hints, &result);
    if (succ != 0)
        return INVALID_SOCKET;

    // Start from INVALID_SOCKET so an empty address list yields a defined result.
    SOCKET hSocket = INVALID_SOCKET;

    // Attempt to connect to an address until one succeeds.
    for (ADDRINFO* ptr = result; ptr != NULL; ptr = ptr->ai_next) {
        // Create a SOCKET for connecting to the server. If this address family
        // cannot produce a socket, move on to the next address instead of
        // returning early (which used to leak the address list).
        hSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (hSocket == INVALID_SOCKET)
            continue;

        // Connect to the server.
        succ = connect(hSocket, ptr->ai_addr, static_cast<int>(ptr->ai_addrlen));
        if (succ != SOCKET_ERROR)
            break;

        closesocket(hSocket);
        hSocket = INVALID_SOCKET;
    }

    freeaddrinfo(result);
    return hSocket;
}
// Waits until hSocket becomes readable.
// select() is called with a NULL timeout, so this call blocks until the socket
// is readable or select() fails.
// Returns true when select() reports at least one ready socket, false on error.
static bool dataArrived(SOCKET hSocket)
{
    fd_set fdread;
    FD_ZERO(&fdread);
    FD_SET(hSocket, &fdread);
    // The first argument is passed as 0, as in the original code.
    const int ret = select(0, &fdread, NULL, NULL, NULL);
    return ret > 0;
}
// Tells whether connections should go through the configured HTTP proxy:
// the proxy name must be non-empty and the proxy port must parse to a
// positive integer.
static bool isUseProxy()
{
    if (!proxyName.length())
        return false;
    return proxyPort.toInt() > 0;
}
// Sends an HTTP CONNECT request for url's host and port over hSocket.
// The port is url.port() when positive, otherwise 80.
// Returns true when send() reports that at least one byte was written, and
// false when hSocket is INVALID_SOCKET or send() fails.
static bool sendConnectCommand(SOCKET hSocket, const KURL& url)
{
    if (INVALID_SOCKET == hSocket)
        return false;

    const String port = String::number((url.port() > 0) ? url.port() : 80);

    // Append the host as a String; converting it to UTF-8 bytes and back through
    // a char* would re-encode any non-ASCII character a second time.
    StringBuilder builder;
    builder.append("CONNECT ");
    builder.append(url.host());
    builder.append(":");
    builder.append(port);
    builder.append(" HTTP/1.1\r\n");
    builder.append("HOST: ");
    builder.append(url.host());
    builder.append("\r\n\r\n");

    // Send exactly the encoded bytes: the builder's length counts characters,
    // which differs from the UTF-8 byte count as soon as a character is not ASCII.
    const CString request = builder.toString().utf8();
    const int sentSize = ::send(hSocket, request.data(), static_cast<int>(request.length()), 0);
    return sentSize > 0;
}
//////----ReceivedBuffer-------------------------------------------------------
// Fixed-capacity byte buffer filled by one recv() call on the receive thread
// and consumed on the thread that runs receivedData().
//
// The storage is one byte larger than the capacity reported by getMaxLength()
// and is zero-filled, so the contents stay NUL-terminated even when recv()
// fills the buffer completely.
class ReceivedBuffer : public Noncopyable
{
public:
    ReceivedBuffer()
        : m_buffer(new char[MAX_SOCKET_BUFFER_LEN + 1])
        , m_bufferUsed(0)
    {
        memset(m_buffer, '\0', MAX_SOCKET_BUFFER_LEN + 1);
    }

    ~ReceivedBuffer()
    {
        delete[] m_buffer;
    }

    // Start of the storage; at most getMaxLength() bytes may be written to it.
    char* getBuffer()
    {
        return m_buffer;
    }

    // Number of bytes a writer may store (excludes the spare terminator byte).
    int getMaxLength() const
    {
        return MAX_SOCKET_BUFFER_LEN;
    }

    // Records how many bytes of the buffer hold received data.
    void setBufferUsed(int len)
    {
        m_bufferUsed = len;
    }

    // Number of bytes recorded by setBufferUsed(); 0 for a fresh buffer.
    int getLength() const
    {
        return m_bufferUsed;
    }

private:
    char* m_buffer;
    int m_bufferUsed;
};
//////---- Event Callback-------------------------------------------
// Callback handed to CustomEventHandler::PostCustomEvent() by the receive thread.
// lParam carries the SocketStreamHandlePrivate that has data queued. The pointer
// is only used if that object is still registered in
// streamHandlePrivateObjectSet, i.e. it has not been destroyed in the meantime.
// Always returns 0.
static LRESULT CALLBACK PostEventCallBack(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
    SocketStreamHandlePrivate* target = reinterpret_cast<SocketStreamHandlePrivate*>(lParam);
    if (!streamHandlePrivateObjectSet.contains(target))
        return 0;
    target->receivedData();
    return 0;
}
//////----ReceiveThread-------------------------------------------------------
class ReceiveThread :public Noncopyable
{
public:
ReceiveThread::ReceiveThread(SocketStreamHandlePrivate* streamHandlePrivate)
:m_streamHandlePrivate(streamHandlePrivate),m_threadID(0){}
ReceiveThread::~ReceiveThread(){}
bool ReceiveThread::start()
{
ASSERT(isMainThread());
if (!m_threadID)
m_threadID = createThread(ReceiveThread::threadEntryPointCallback, this, "WebCore: WebSocket");
return m_threadID;
}
void terminate()
{
ASSERT(isMainThread());
if (!m_threadID)
return;
void* returnValue;
waitForThreadCompletion(m_threadID, &returnValue);
m_threadID = 0;
}
static void* ReceiveThread::threadEntryPointCallback(void* thread)
{
return static_cast<ReceiveThread*>(thread)->threadEntryPoint();
}
void* ReceiveThread::threadEntryPoint()
{
SOCKET hSocket = m_streamHandlePrivate->m_socket;
while(dataArrived(hSocket))
{
ReceivedBuffer* buffer = new ReceivedBuffer;
int receivedSize = recv(hSocket, buffer->getBuffer(), buffer->getMaxLength(), 0);
if (receivedSize <= 0)
break;
buffer->setBufferUsed(receivedSize);
m_queue.append(buffer);
CustomEventHandler::PostCustomEvent(
Custom_EV_WebSocket,
NULL,
0,
0,
(LPARAM)m_streamHandlePrivate,
&(PostEventCallBack));
}
return NULL;
}
ThreadIdentifier m_threadID;
MessageQueue<ReceivedBuffer> m_queue;
SocketStreamHandlePrivate* m_streamHandlePrivate;
};
//////----SocketStreamHandlePrivate-----------------------------------------
bool SocketStreamHandlePrivate::socketHandleReady()
{
return m_socket != INVALID_SOCKET;
}
// Builds the private state for streamHandle: no socket yet, both timers bound
// to their handlers, a private copy of url, and a ReceiveThread object that is
// created here but not started. Finally the object registers itself in
// streamHandlePrivateObjectSet so PostEventCallBack() accepts it.
// The initializers are listed in member declaration order, which is the order
// in which they run.
SocketStreamHandlePrivate::SocketStreamHandlePrivate(SocketStreamHandle* streamHandle, const KURL& url)
    : m_socket(INVALID_SOCKET)
    , m_streamHandle(streamHandle)
    , m_socketConnectedTimer(this, &SocketStreamHandlePrivate::socketConnectedTimerFired)
    , m_socketErrorTimer(this, &SocketStreamHandlePrivate::socketErrorTimerFired)
    , m_receiveThread(NULL)
    , m_socketStartup(false)
{
    m_url = url.copy();
    m_receiveThread = new ReceiveThread(this);
    streamHandlePrivateObjectSet.add(this);
}
// Tears the object down: unregister first so a pending PostEventCallBack()
// ignores this pointer, close the socket (and release Winsock), then wait for
// the receive thread to finish before deleting it.
SocketStreamHandlePrivate::~SocketStreamHandlePrivate()
{
    streamHandlePrivateObjectSet.remove(this);
    closeAll();
    if (!m_receiveThread)
        return;
    m_receiveThread->terminate();
    delete m_receiveThread;
}
// Connects directly to m_url's host and starts the receive thread.
// Success and failure are both reported to the client asynchronously through
// zero-delay timers: m_socketConnectedTimer on success, m_socketErrorTimer
// (which ends in closeAll()) on failure.
void SocketStreamHandlePrivate::connect()
{
    if (!initSocket()) {
        // Previously this returned silently and the client was never told the
        // connection could not be made.
        LOG(Network, "SocketStreamHandlePrivate failed to initialize Winsock.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }
    // Remember the Winsock reference so closeAll() releases it.
    m_socketStartup = true;

    m_socket = connectDirectly(m_url);
    if (INVALID_SOCKET == m_socket) {
        LOG(Network, "SocketStreamHandlePrivate connect to websocket server fail.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }

    m_receiveThread->start();
    m_socketConnectedTimer.startOneShot(0);
}
void SocketStreamHandlePrivate::notifyClientSocketClosed()
{
if (!m_streamHandle || !m_streamHandle->client())
return;
SocketStreamHandle* streamHandle = m_streamHandle;
m_streamHandle = 0;
// This following call deletes _this_. Nothing should be after it.
streamHandle->client()->didClose(streamHandle);
}
// Forwards the used part of buffer to the handle's client through
// didReceiveData(). Does nothing when there is no handle or no client.
void SocketStreamHandlePrivate::notifyClientDataReceived(ReceivedBuffer* buffer)
{
    if (m_streamHandle && m_streamHandle->client())
        m_streamHandle->client()->didReceiveData(m_streamHandle, buffer->getBuffer(), buffer->getLength());
}
// Marks the handle as Open and tells its client through didOpen().
// Does nothing when there is no handle or no client.
void SocketStreamHandlePrivate::notifyClientSocketConnected()
{
    if (m_streamHandle && m_streamHandle->client()) {
        m_streamHandle->m_state = SocketStreamHandleBase::Open;
        m_streamHandle->client()->didOpen(m_streamHandle);
    }
}
void SocketStreamHandlePrivate::closeAll()
{
if (m_socket != INVALID_SOCKET)
{
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
if (m_socketStartup)
{
cleanupSocket();
m_socketStartup = false;
}
notifyClientSocketClosed();
}
// Handler of m_socketConnectedTimer: reports the established connection to the
// client. The timer argument is unused.
void SocketStreamHandlePrivate::socketConnectedTimerFired(Timer<SocketStreamHandlePrivate>*)
{
notifyClientSocketConnected();
}
// Handler of m_socketErrorTimer: a connection attempt failed, so release
// everything and notify the client through closeAll(). The timer argument is
// unused.
void SocketStreamHandlePrivate::socketErrorTimerFired(Timer<SocketStreamHandlePrivate>*)
{
closeAll();
}
int SocketStreamHandlePrivate::send(const char* data, int len)
{
if (!socketHandleReady())
return 0;
int sentSize = ::send( m_socket, data, len, 0 );
if (sentSize == SOCKET_ERROR)
{
closeAll();
return 0;
}
return sentSize;
}
void SocketStreamHandlePrivate::receivedData()
{
OwnPtr<ReceivedBuffer> buffer = m_receiveThread->m_queue.waitForMessage();
notifyClientDataReceived(buffer.get());
}
//////----SocketStreamHandlePrivateOverProxy-----------------------------------------
// Sets up the base object, binds the proxy-handshake timer to its handler and
// starts with the tunnel marked as not ready. The initializers are listed in
// member declaration order, which is the order in which they run.
SocketStreamHandlePrivateOverProxy::SocketStreamHandlePrivateOverProxy(SocketStreamHandle* streamHandle, const KURL& url)
    : SocketStreamHandlePrivate(streamHandle, url)
    , m_socketConnectProxyTimer(this, &SocketStreamHandlePrivateOverProxy::socketConnectProxyTimerFired)
    , m_webSocketReady(false)
{
}
// Connects to the configured proxy, asks it to tunnel to m_url with an HTTP
// CONNECT request and starts the receive thread. The proxy's answer is handled
// in notifyClientDataReceived(); m_socketConnectProxyTimer closes everything if
// the tunnel is not ready when it fires.
// Every failure is reported to the client asynchronously through
// m_socketErrorTimer. Previously a failed proxy connection still started the
// receive thread on an invalid socket and was only reported when the handshake
// timer expired, and a Winsock start-up failure was not reported at all.
void SocketStreamHandlePrivateOverProxy::connect()
{
    if (!initSocket()) {
        LOG(Network, "SocketStreamHandlePrivate failed to initialize Winsock.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }
    // Remember the Winsock reference so closeAll() releases it.
    m_socketStartup = true;

    m_socket = connectThroughProxy();
    if (INVALID_SOCKET == m_socket) {
        LOG(Network, "SocketStreamHandlePrivate connect to proxy server fail.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }

    if (!sendConnectCommand(m_socket, m_url)) {
        LOG(Network, "SocketStreamHandlePrivate sentConnectCommand fail.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }

    m_receiveThread->start();
    // Handshake deadline; the unit is whatever Timer::startOneShot() takes
    // (presumably seconds - TODO confirm).
    m_socketConnectProxyTimer.startOneShot(20);
}
// Handler of m_socketConnectProxyTimer: if the proxy has not confirmed the
// tunnel by now, give up and close everything. The timer argument is unused.
void SocketStreamHandlePrivateOverProxy::socketConnectProxyTimerFired(Timer<SocketStreamHandlePrivateOverProxy>*)
{
    if (m_webSocketReady)
        return;
    closeAll();
}
// Handles one received buffer.
// Before the tunnel is ready the buffer is treated as the proxy's reply to the
// CONNECT request: if it contains connectedResponseString the tunnel is marked
// ready and the client is told (asynchronously) that the socket is connected;
// the buffer itself is not forwarded. Once the tunnel is ready, data goes
// straight to the client.
// Does nothing when there is no handle or no client.
void SocketStreamHandlePrivateOverProxy::notifyClientDataReceived(ReceivedBuffer* buffer)
{
    if (!m_streamHandle || !m_streamHandle->client())
        return;

    if (m_webSocketReady) {
        m_streamHandle->client()->didReceiveData(m_streamHandle, buffer->getBuffer(), buffer->getLength());
        return;
    }

    // Build the string from the received byte count rather than relying on a
    // NUL terminator, which is absent when the read filled the whole buffer.
    String packet(buffer->getBuffer(), buffer->getLength());
    if (packet.contains(connectedResponseString)) {
        m_webSocketReady = true;
        // Notify the client that the connection has been established.
        m_socketConnectedTimer.startOneShot(0);
    }
}
//////----SocketStreamHandle-----------------------------------------
// Creates the platform implementation object (the proxy variant when a proxy
// is configured, the direct one otherwise) and immediately starts connecting.
SocketStreamHandle::SocketStreamHandle(const KURL& url, SocketStreamHandleClient* client)
    : SocketStreamHandleBase(url, client)
{
    LOG(Network, "SocketStreamHandle %p new client %p", this, m_client);

    if (isUseProxy())
        m_p = new SocketStreamHandlePrivateOverProxy(this, url);
    else
        m_p = new SocketStreamHandlePrivate(this, url);

    m_p->connect();
}
// Destroys the handle and its private implementation object.
SocketStreamHandle::~SocketStreamHandle()
{
LOG(Network, "SocketStreamHandle %p delete", this);
// Clear the client before deleting m_p: the private object's destructor calls
// closeAll(), whose close notification is skipped when client() is null.
setClient(0);
delete m_p;
}
// Stores the HTTP proxy host and port in the file-level settings.
// SocketStreamHandle's constructor consults them (through isUseProxy()) each
// time a handle is created.
void SocketStreamHandle::setHttpProxy(const String& name, const String& port)
{
    proxyPort = port;
    proxyName = name;
}
// Sends len bytes from data through the private implementation object and
// returns what it reports: the number of bytes sent, or 0 on failure.
int SocketStreamHandle::platformSend(const char* data, int len)
{
    LOG(Network, "SocketStreamHandle %p platformSend", this);
    const int sentBytes = m_p->send(data, len);
    return sentBytes;
}
// Closes the connection: the private object closes the socket, releases its
// Winsock reference and notifies the client (see closeAll()).
void SocketStreamHandle::platformClose()
{
LOG(Network, "SocketStreamHandle %p platformClose", this);
m_p->closeAll();
}
// Authentication is not supported by this port; the call only reports
// notImplemented().
void SocketStreamHandle::didReceiveAuthenticationChallenge(const AuthenticationChallenge&)
{
notImplemented();
}
// Authentication is not supported by this port; the call only reports
// notImplemented().
void SocketStreamHandle::receivedCredential(const AuthenticationChallenge&, const Credential&)
{
notImplemented();
}
// Authentication is not supported by this port; the call only reports
// notImplemented().
void SocketStreamHandle::receivedRequestToContinueWithoutCredential(const AuthenticationChallenge&)
{
notImplemented();
}
// Authentication is not supported by this port; the call only reports
// notImplemented().
void SocketStreamHandle::receivedCancellation(const AuthenticationChallenge&)
{
notImplemented();
}
} // namespace WebCore
// [Web page residue, not part of the source code: "No comments:" / "Post a comment"]