2012年10月8日 星期一

Use winsocket to implement the websocket in webkit



#ifndef SocketStreamHandlePrivate_h
#define SocketStreamHandlePrivate_h

#include "SocketStreamHandleBase.h"
#include "Timer.h"
#include <wtf/MessageQueue.h>

#include <Winsock2.h>

namespace WebCore {

class AuthenticationChallenge;
class Credential;
class SocketStreamHandleClient;
class SocketStreamHandlePrivate;
class ReceivedBuffer;

class SocketStreamHandlePrivate
{
public:
    SocketStreamHandlePrivate(SocketStreamHandle*, const KURL&);
    virtual ~SocketStreamHandlePrivate();
    virtual void connect();
    int send(const char* data, int len);
    void closeAll();
    void receivedData();

protected:
    virtual void notifyClientDataReceived(ReceivedBuffer* buffer);
    void notifyClientSocketClosed();
    void notifyClientSocketConnected();
    void socketConnectedTimerFired(Timer<SocketStreamHandlePrivate>*);
    void socketErrorTimerFired(Timer<SocketStreamHandlePrivate>*);
    bool socketHandleReady();

protected:
    SOCKET m_socket;
    SocketStreamHandle* m_streamHandle;
    KURL m_url;
    Timer<SocketStreamHandlePrivate> m_socketConnectedTimer;
    Timer<SocketStreamHandlePrivate> m_socketErrorTimer;
    friend class ReceiveThread;
    ReceiveThread* m_receiveThread;
    bool m_socketStartup;
};


class SocketStreamHandlePrivateOverProxy :public SocketStreamHandlePrivate
{
public:
    SocketStreamHandlePrivateOverProxy(SocketStreamHandle*, const KURL&);
    void connect();
    void notifyClientDataReceived(ReceivedBuffer* buffer);

private:
    void socketConnectProxyTimerFired(Timer<SocketStreamHandlePrivateOverProxy>*);
    Timer<SocketStreamHandlePrivateOverProxy> m_socketConnectProxyTimer;
    bool m_webSocketReady;
};

}

#endif

#include "config.h"

#include "SocketStreamHandle.h"

#include "KURL.h"
#include "Logging.h"
#include "NotImplemented.h"
#include "SocketStreamHandleClient.h"
#include "SocketStreamHandlePrivate.h"
#include "wtf/text/CString.h"
#include "StringBuilder.h"
#include "wtf/text/WTFString.h"
#include "wtf/HashSet.h"
#include "CustomEventHandler.h"
#include "CustomEventType.h"
#include "PassOwnPtr.h"

#include <ws2tcpip.h>

#define MAX_SOCKET_BUFFER_LEN 1024

namespace WebCore {

static String proxyName;
static String proxyPort;
const String connectedResponseString("HTTP/1.1 200 OK");
static int WSACleanupReferenceCount = 0;
static HashSet<SocketStreamHandlePrivate*> streamHandlePrivateObjectSet;

static bool  initSocket()
{
    if (WSACleanupReferenceCount != 0)
    {
        WSACleanupReferenceCount ++;
    }
    else
    {
        WSADATA wsd;
        if (WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
        {
            return false;
        }
        WSACleanupReferenceCount ++;
    }
    return true;
}

static void cleanupSocket()
{
    if (WSACleanupReferenceCount > 0)
        -- WSACleanupReferenceCount;
    else
        WSACleanup();
}

static SOCKET connectThroughProxy()
{
    SOCKET hSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    if (hSocket == INVALID_SOCKET)
        return hSocket;

    sockaddr_in proxyAddr;
    proxyAddr.sin_family = AF_INET;
    proxyAddr.sin_addr.s_addr = inet_addr(proxyName.ascii().data());
    proxyAddr.sin_port = htons(proxyPort.toInt());

    // Connect to the proxy.
    int succ = connect(hSocket, (SOCKADDR *)&proxyAddr, sizeof(proxyAddr));
    if (succ == SOCKET_ERROR)
    {
        closesocket(hSocket);
        hSocket = INVALID_SOCKET;
    }
    return hSocket;
}

static SOCKET connectDirectly(const KURL& url)
{
    SOCKET hSocket;
    ADDRINFO  *result = NULL, *ptr = NULL, hints;

    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    String port = String::number((url.port()>0)?url.port():80);

    //Resolve the server address and port
    int succ = getaddrinfo(url.host().utf8().data(), port.ascii().data(), &hints, &result);
    if ( succ != 0 )
        return INVALID_SOCKET;

    // Attempt to connect to an address until one succeeds
    for(ptr=result; ptr != NULL ;ptr=ptr->ai_next)
    {
        // Create a SOCKET for connecting to server
        hSocket = socket(ptr->ai_family, ptr->ai_socktype,ptr->ai_protocol);

        if (hSocket == INVALID_SOCKET)
            return hSocket;

        // Connect to server.
        succ = connect( hSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (SOCKET_ERROR == succ)
        {
            closesocket(hSocket);
            hSocket = INVALID_SOCKET;
        }
        else
        {
            break;
        }
    }
    freeaddrinfo(result);
    return hSocket;
}

static bool dataArrived(SOCKET hSocket)
{
    fd_set fdread;
    int ret;
    struct timeval tv;
    FD_ZERO(&fdread);
    FD_SET(hSocket, &fdread);
    ret = select(0, &fdread, NULL, NULL, NULL);
    return ret > 0;
}

static bool isUseProxy()
{
    return proxyName.length() != 0 && proxyPort.toInt() > 0;
}

static bool sendConnectCommand(SOCKET hSocket, const KURL& url)
{
    int sentSize = 0;
    if (INVALID_SOCKET != hSocket)
    {
        StringBuilder builder;
        builder.append("CONNECT ");
        builder.append(url.host().utf8().data());
        builder.append(":");
        builder.append(String::number((url.port()>0)?url.port():80));
        builder.append(" HTTP/1.1\r\n");
        builder.append("HOST: ");
        builder.append(url.host().utf8().data());
        builder.append("\r\n\r\n");

        sentSize = ::send(hSocket, builder.toString().utf8().data(), builder.length(), 0);
    }
    return sentSize > 0;
}

//////----ReceivedBuffer-------------------------------------------------------
class ReceivedBuffer :public Noncopyable
{
public:
    ReceivedBuffer()
    {
        m_buffer = new char[MAX_SOCKET_BUFFER_LEN];
        memset(m_buffer, '\0', MAX_SOCKET_BUFFER_LEN);
        m_bufferUsed = 0;
    }

    ReceivedBuffer::~ReceivedBuffer()
    {
        delete []m_buffer;
    }

    inline char* ReceivedBuffer::getBuffer()
    {
        return m_buffer;
    }

    inline int ReceivedBuffer::getMaxLength()
    {
        return MAX_SOCKET_BUFFER_LEN;
    }

    inline void ReceivedBuffer::setBufferUsed(int len)
    {
        m_bufferUsed = len;
    }

    inline int ReceivedBuffer::getLength()
    {
        return m_bufferUsed;
    }

private:
    char* m_buffer;
    int m_bufferUsed;
};

//////---- Event Callback-------------------------------------------
static LRESULT CALLBACK PostEventCallBack(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
    SocketStreamHandlePrivate* handle = (SocketStreamHandlePrivate*)lParam;

    if (streamHandlePrivateObjectSet.contains(handle))
    {
        handle->receivedData();
    }
    return 0;
}

//////----ReceiveThread-------------------------------------------------------
class ReceiveThread :public Noncopyable
{
public:
    ReceiveThread::ReceiveThread(SocketStreamHandlePrivate* streamHandlePrivate)
        :m_streamHandlePrivate(streamHandlePrivate),m_threadID(0){}

    ReceiveThread::~ReceiveThread(){}

    bool ReceiveThread::start()
    {
        ASSERT(isMainThread());
        if (!m_threadID)
            m_threadID = createThread(ReceiveThread::threadEntryPointCallback, this, "WebCore: WebSocket");
        return m_threadID;
    }

    void terminate()
    {
        ASSERT(isMainThread());
        if (!m_threadID)
            return;

        void* returnValue;
        waitForThreadCompletion(m_threadID, &returnValue);
        m_threadID = 0;
    }

    static void* ReceiveThread::threadEntryPointCallback(void* thread)
    {
        return static_cast<ReceiveThread*>(thread)->threadEntryPoint();
    }

    void* ReceiveThread::threadEntryPoint()
    {
        SOCKET hSocket = m_streamHandlePrivate->m_socket;
        while(dataArrived(hSocket))
        {
            ReceivedBuffer* buffer = new ReceivedBuffer;
            int receivedSize = recv(hSocket, buffer->getBuffer(), buffer->getMaxLength(), 0);

            if (receivedSize <= 0)
                break;

            buffer->setBufferUsed(receivedSize);
            m_queue.append(buffer);

            CustomEventHandler::PostCustomEvent(
                Custom_EV_WebSocket,
                NULL,
                0,
                0,
                (LPARAM)m_streamHandlePrivate,
                &(PostEventCallBack));
        }
        return NULL;
    }

    ThreadIdentifier m_threadID;
    MessageQueue<ReceivedBuffer> m_queue;
    SocketStreamHandlePrivate* m_streamHandlePrivate;
};

//////----SocketStreamHandlePrivate-----------------------------------------
bool SocketStreamHandlePrivate::socketHandleReady()
{
    return m_socket != INVALID_SOCKET;
}

SocketStreamHandlePrivate::SocketStreamHandlePrivate(SocketStreamHandle* streamHandle, const KURL& url)
:m_receiveThread(NULL), m_socketStartup(false),m_socket(INVALID_SOCKET),
m_streamHandle(streamHandle),
m_socketConnectedTimer(this,&SocketStreamHandlePrivate::socketConnectedTimerFired),
m_socketErrorTimer(this, &SocketStreamHandlePrivate::socketErrorTimerFired)
{
    m_url = url.copy();
    m_receiveThread = new ReceiveThread(this);
    streamHandlePrivateObjectSet.add(this);
}

SocketStreamHandlePrivate::~SocketStreamHandlePrivate()
{
    streamHandlePrivateObjectSet.remove(this);
    closeAll();
    if (m_receiveThread != NULL)
    {
        m_receiveThread->terminate();
        delete m_receiveThread;
    }
}

void SocketStreamHandlePrivate::connect()
{
    if (!initSocket())
        return;
    m_socketStartup = true;

    m_socket = connectDirectly(m_url);

    if (INVALID_SOCKET == m_socket)
    {
        LOG(Network, "SocketStreamHandlePrivate connect to websocket server fail.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }

    m_receiveThread->start();
    m_socketConnectedTimer.startOneShot(0);
}

void SocketStreamHandlePrivate::notifyClientSocketClosed()
{
    if (!m_streamHandle || !m_streamHandle->client())
        return;
    SocketStreamHandle* streamHandle = m_streamHandle;
    m_streamHandle = 0;
    // This following call deletes _this_. Nothing should be after it.
    streamHandle->client()->didClose(streamHandle);
}

void SocketStreamHandlePrivate::notifyClientDataReceived(ReceivedBuffer* buffer)
{
    if (!m_streamHandle || !m_streamHandle->client())
        return;
    m_streamHandle->client()->didReceiveData(m_streamHandle, buffer->getBuffer(), buffer->getLength());
}

void SocketStreamHandlePrivate::notifyClientSocketConnected()
{
    if (!m_streamHandle || !m_streamHandle->client())
        return;

    m_streamHandle->m_state = SocketStreamHandleBase::Open;
    m_streamHandle->client()->didOpen(m_streamHandle);
}

void SocketStreamHandlePrivate::closeAll()
{
    if (m_socket != INVALID_SOCKET)
    {
        closesocket(m_socket);
        m_socket = INVALID_SOCKET;
    }

    if (m_socketStartup)
    {
        cleanupSocket();
        m_socketStartup = false;
    }

    notifyClientSocketClosed();
}

void SocketStreamHandlePrivate::socketConnectedTimerFired(Timer<SocketStreamHandlePrivate>*)
{
    notifyClientSocketConnected();
}

void SocketStreamHandlePrivate::socketErrorTimerFired(Timer<SocketStreamHandlePrivate>*)
{
    closeAll();
}

int SocketStreamHandlePrivate::send(const char* data, int len)
{
    if (!socketHandleReady())
        return 0;

    int sentSize = ::send( m_socket, data, len, 0 );
    if (sentSize == SOCKET_ERROR)
    {
        closeAll();
        return 0;
    }
    return sentSize;
}

void SocketStreamHandlePrivate::receivedData()
{
    OwnPtr<ReceivedBuffer> buffer = m_receiveThread->m_queue.waitForMessage();
    notifyClientDataReceived(buffer.get());
}

//////----SocketStreamHandlePrivateOverProxy-----------------------------------------
SocketStreamHandlePrivateOverProxy::SocketStreamHandlePrivateOverProxy(SocketStreamHandle* streamHandle, const KURL& url)
    :SocketStreamHandlePrivate(streamHandle, url),m_webSocketReady(false),
    m_socketConnectProxyTimer(this, &SocketStreamHandlePrivateOverProxy::socketConnectProxyTimerFired)
{

}

void SocketStreamHandlePrivateOverProxy::connect()
{
    if (!initSocket())
        return;
    m_socketStartup = true;

    m_socket = connectThroughProxy();
    bool succ = sendConnectCommand(m_socket, m_url);
    if(!succ && INVALID_SOCKET != m_socket)
    {
        LOG(Network, "SocketStreamHandlePrivate sentConnectCommand fail.");
        m_socketErrorTimer.startOneShot(0);
        return;
    }
    m_receiveThread->start();

    m_socketConnectProxyTimer.startOneShot(20);
}

void SocketStreamHandlePrivateOverProxy::socketConnectProxyTimerFired(Timer<SocketStreamHandlePrivateOverProxy>*)
{
    if (!m_webSocketReady)
        closeAll();
}

void SocketStreamHandlePrivateOverProxy::notifyClientDataReceived(ReceivedBuffer* buffer)
{
    if (!m_streamHandle || !m_streamHandle->client())
        return;

    if (m_webSocketReady)
        m_streamHandle->client()->didReceiveData(m_streamHandle, buffer->getBuffer(), buffer->getLength());
    else
    {
        String packet(buffer->getBuffer());
        if (packet.contains(connectedResponseString))
        {
            m_webSocketReady = true;
            // Notify that client the connection have already established
            m_socketConnectedTimer.startOneShot(0);
        }
    }
}

//////----SocketStreamHandle-----------------------------------------
SocketStreamHandle::SocketStreamHandle(const KURL& url, SocketStreamHandleClient* client)
    : SocketStreamHandleBase(url, client)
{
    LOG(Network, "SocketStreamHandle %p new client %p", this, m_client);

    if (!isUseProxy())
        m_p = new SocketStreamHandlePrivate(this, url);
    else
        m_p = new SocketStreamHandlePrivateOverProxy(this, url);

    m_p->connect();
}

SocketStreamHandle::~SocketStreamHandle()
{
    LOG(Network, "SocketStreamHandle %p delete", this);
    setClient(0);
    delete m_p;
}

void SocketStreamHandle::setHttpProxy(const String& name, const String& port)
{
    proxyName = name;
    proxyPort = port;
}

int SocketStreamHandle::platformSend(const char* data, int len)
{
    LOG(Network, "SocketStreamHandle %p platformSend", this);
    return m_p->send(data, len);
}

void SocketStreamHandle::platformClose()
{
    LOG(Network, "SocketStreamHandle %p platformClose", this);
    m_p->closeAll();
}

void SocketStreamHandle::didReceiveAuthenticationChallenge(const AuthenticationChallenge&)
{
    notImplemented();
}

void SocketStreamHandle::receivedCredential(const AuthenticationChallenge&, const Credential&)
{
    notImplemented();
}

void SocketStreamHandle::receivedRequestToContinueWithoutCredential(const AuthenticationChallenge&)
{
    notImplemented();
}

void SocketStreamHandle::receivedCancellation(const AuthenticationChallenge&)
{
    notImplemented();
}

} // namespace WebCore



沒有留言:

張貼留言