rabbitmq 学习-4-初试2

RpcClient、RpcServer:同步发送、接收消息

Channel.basicPublish、Channel.basicGet:异步发送、接收消息

本例是一个简单的同步发送消息实例

1,发送端

publicclassPublish{

privatestaticConnectionconnection;

static{

ConnectionParametersparams=newConnectionParameters();

ConnectionFactoryfactory=newConnectionFactory(params);

try{

connection=factory.newConnection("localhost",AMQP.PROTOCOL.PORT);

}catch(IOExceptione){

e.printStackTrace();

}

}

publicstaticvoidmain(String[]args){

try{

Channelchannel=connection.createChannel();

RpcClientrpc=newRpcClient(channel,"exchangeName","routingKey");

byte[]primitiveCall=rpc.primitiveCall("helloworld".getBytes());

System.out.println(newString(primitiveCall));

primitiveCall=rpc.primitiveCall("helloworld2".getBytes());

System.out.println(newString(primitiveCall));

rpc=newRpcClient(channel,"exchangeName","routingKey2");

primitiveCall=rpc.primitiveCall("helloworld2".getBytes());

System.out.println(newString(primitiveCall));

System.out.println("publishsuccess.");

}catch(Exceptione){

e.printStackTrace();

}

}

}

2,接收端

publicclassReceive{

privatestaticConnectionconnection;

static{

ConnectionParametersparams=newConnectionParameters();

ConnectionFactoryfactory=newConnectionFactory(params);

try{

connection=factory.newConnection("localhost",AMQP.PROTOCOL.PORT);

}catch(IOExceptione){

e.printStackTrace();

}

}

publicstaticvoidmain(String[]args){

try{

Channelchannel=connection.createChannel();

System.out.println(channel.toString());

channel.exchangeDeclare("exchangeName","topic");

channel.exchangeDeclare("exchangeName2","topic");

channel.queueDeclare("queueName");

channel.queueBind("queueName","exchangeName","routingKey");

channel.queueBind("queueName","exchangeName","routingKey2");

channel.queueBind("queueName","exchangeName2","routingKey2");

channel.queueBind("queueName","exchangeName2","routingKey");

//queue与exchange是多对多的,可以把同一queue和exchange以多个不同的routing进行bind,这样就会有多个routing,而不是一个,虽然说这些rout是绑定相同的exchange,queue

finalRpcServerrpcServer=newRpcServer(channel,"queueName"){

@Override

publicbyte[]handleCall(byte[]requestBody,AMQP.BasicPropertiesreplyProperties){

System.out.println("receivemsg:"+newString(requestBody));

return"returnmessage".getBytes();

}

};

Runnablemain=newRunnable(){

@Override

publicvoidrun(){

try{

throwrpcServer.mainloop();

}catch(IOExceptione){

thrownewRuntimeException(e);

}

}

};

newThread(main).start();

System.out.println("receivesuccess.");

}catch(IOExceptione){

e.printStackTrace();

}

}

}

相关推荐