BastEt 阅读(3189) 评论(0)
extern "C" void evhttp_get_request(struct evhttp *, evutil_socket_t, struct sockaddr *, ev_socklen_t);
class MultiEventHttp
{
public:
MultiEventHttp()
{
}
void create(const char *listenaddr,int listenport,int ncount,int ncpucount)
{
_pos=0;

evthread_use_windows_threads();
event_config *base_cfg=event_config_new();
event_config_set_flag(base_cfg,EVENT_BASE_FLAG_STARTUP_IOCP);
event_config_set_num_cpus_hint(base_cfg,ncpucount);


struct timeval tv;
evutil_timerclear(&tv);
tv.tv_sec = 7200;
_timeouts.resize(ncount);
for(int i=0;i {
event_base *base=event_base_new_with_config(base_cfg);
evhttp *http = evhttp_new(base);

event &curtimeout=_timeouts[i];
event_assign(&curtimeout, base, -1, EV_PERSIST, &MultiEventHttp::timeout_cb, (void*) &curtimeout);
event_add(&curtimeout, &tv);
_ev_bases.push_back(base);
_ev_https.push_back(http);
}
evhttp_bound_socket *listen_socket=evhttp_bind_socket_with_handle(_ev_https[0],listenaddr,listenport);
evconnlistener_set_cb(evhttp_bound_socket_get_listener(listen_socket), MultiEventHttp::accept_socket_cb, this);
}

void start()
{
std::vector threads;
for(int i=0;i<_ev_bases.size();i++)
{
boost::thread *pthread=new boost::thread(boost::bind(event_base_dispatch,_ev_bases[i]));
threads.push_back(pthread);
}
for(int i=0;i {
boost::thread *pthread=threads[i];
pthread->join();
delete pthread;
}
}

int set_cb(const char *path,void (*cb)(struct evhttp_request *, void *), void *cb_arg)
{
for(int i=0;i<_ev_https.size();i++)
{
evhttp_set_cb(_ev_https[i],path,cb,cb_arg);
}
return 0;
}
int set_gen_cb(void (*cb)(struct evhttp_request *, void *), void *arg)
{
for(int i=0;i<_ev_https.size();i++)
{
evhttp_set_gencb(_ev_https[i],cb,arg);
}
return 0;
}
event_config *_event_cfg;
std::vector _ev_https;
std::vector _ev_bases;
std::vector _timeouts;

private:
static void timeout_cb(evutil_socket_t fd, short event, void *arg)
{

}
static void accept_socket_cb(struct evconnlistener *listener, evutil_socket_t nfd, struct sockaddr *peer_sa, int peer_socklen, void *arg)
{
MultiEventHttp *pThis = (MultiEventHttp *)arg;
evhttp_get_request(pThis->next_http(), nfd, peer_sa, peer_socklen);
}


evhttp *next_http()
{
evhttp *result=_ev_https[_pos];
_pos++;
if(_pos>=_ev_https.size())
_pos=0;
return result;
}
int _pos;
};

static void dump_request_cb(struct evhttp_request *req, void *arg)
{
const char *cmdtype;
struct evkeyvalq *headers;
struct evkeyval *header;
struct evbuffer *buf;

switch (evhttp_request_get_command(req))
{
case EVHTTP_REQ_GET: cmdtype = "GET"; break;
case EVHTTP_REQ_POST: cmdtype = "POST"; break;
case EVHTTP_REQ_HEAD: cmdtype = "HEAD"; break;
case EVHTTP_REQ_PUT: cmdtype = "PUT"; break;
case EVHTTP_REQ_DELETE: cmdtype = "DELETE"; break;
case EVHTTP_REQ_OPTIONS: cmdtype = "OPTIONS"; break;
case EVHTTP_REQ_TRACE: cmdtype = "TRACE"; break;
case EVHTTP_REQ_CONNECT: cmdtype = "CONNECT"; break;
case EVHTTP_REQ_PATCH: cmdtype = "PATCH"; break;
default: cmdtype = "unknown"; break;
}

printf("Received a %s request for %s\nHeaders:\n",
cmdtype, evhttp_request_get_uri(req));

headers = evhttp_request_get_input_headers(req);
for (header = headers->tqh_first; header;
header = header->next.tqe_next)
{
printf(" %s: %s\n", header->key, header->value);
}

buf = evhttp_request_get_input_buffer(req);
puts("Input data: <<<");
while (evbuffer_get_length(buf))
{
int n;
char cbuf[128];
n = evbuffer_remove(buf, cbuf, sizeof(buf)-1);
fwrite(cbuf, 1, n, stdout);
}
puts(">>>");
evbuffer *evb = evbuffer_new();
evbuffer_add_printf(evb, "fuck you\n");
evhttp_send_reply(req, 200, "OK", evb);
}

int main(int argc, char **argv)
{
WSADATA wsa_data;
WSAStartup(0x0201, &wsa_data);



MultiEventHttp gHttps;
gHttps.create("0.0.0.0",9000,4,1);
gHttps.set_gen_cb(dump_request_cb, NULL);

gHttps.start();

printf("done\n");
return 0;
}

注:这个是多线程不安全的,要多线程安全,应该加入如下代码。

#pragma pack(push,4)
typedef void (*deferred_cb_fn)(struct deferred_cb *, void *);
#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
}
struct deferred_cb
{
/** Links to the adjacent active (pending) deferred_cb objects. */
TAILQ_ENTRY (deferred_cb) cb_next;
/** True iff this deferred_cb is pending in an event_base. */
unsigned queued : 1;
/** The function to execute when the callback runs. */
deferred_cb_fn cb;
/** The function's second argument. */
void *arg;
};

struct multi_accept_deferred_cb
{
deferred_cb cb;
evhttp *curhttp;
evutil_socket_t nfd;
struct sockaddr peer_sa;
char extbuf[32];//应该够放了吧。
int peer_socklen;
};
#pragma pack(pop)

extern "C"
{
void event_deferred_cb_init(struct deferred_cb *, deferred_cb_fn, void *);
void event_deferred_cb_schedule(struct deferred_cb_queue *, struct deferred_cb *);
struct deferred_cb_queue *event_base_get_deferred_cb_queue(struct event_base *);
};
void new_acceptdeferred_cb_fn(struct deferred_cb *, void *arg)
{
multi_accept_deferred_cb *multicb=(multi_accept_deferred_cb *)arg;

evutil_socket_t nfd=multicb->nfd;
struct sockaddr *peer_sa=&multicb->peer_sa;
int peer_socklen=multicb->peer_socklen;

evhttp_get_request(multicb->curhttp, nfd, peer_sa, peer_socklen);
delete multicb;
}

void MultiEventHttp::accept_socket_cb(struct evconnlistener *listener, evutil_socket_t nfd, struct sockaddr *peer_sa, int peer_socklen, void *arg)
{
MultiEventHttp *pThis = (MultiEventHttp *)arg;
int &pos=pThis->_pos;
evhttp *curhttp=pThis->_ev_https[pos];
event_base *curevent=pThis->_ev_bases[pos];
pos++;
if(pos>=pThis->_ev_https.size())
pos=0;

multi_accept_deferred_cb *newmulti_cb=new multi_accept_deferred_cb();
event_deferred_cb_init(&newmulti_cb->cb,new_acceptdeferred_cb_fn,newmulti_cb);
newmulti_cb->curhttp=curhttp;
newmulti_cb->nfd=nfd;
newmulti_cb->peer_socklen=peer_socklen;
memset(newmulti_cb->extbuf,0,32);
memcpy(&newmulti_cb->peer_sa,peer_sa,peer_socklen);
event_deferred_cb_schedule(event_base_get_deferred_cb_queue(curevent),&newmulti_cb->cb);
}

发表评论
切换编辑模式