1. import io.netty.buffer.ByteBuf;
    2. import io.netty.channel.ChannelHandlerContext;
    3. import io.netty.channel.SimpleChannelInboundHandler;
    4. import io.netty.util.CharsetUtil;
    5. import java.util.concurrent.atomic.AtomicInteger;
    6. public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    7. private final int sendNumber;
    8. public MsgPackClientHandler(int sendNumber) {
    9. this.sendNumber = sendNumber;
    10. }
    11. private AtomicInteger counter = new AtomicInteger(0);
    12. /*** 客户端读取到网络数据后的处理*/
    13. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    14. System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
    15. +"] and the counter is:"+counter.incrementAndGet());
    16. }
    17. /*** 客户端被通知channel活跃后,做事*/
    18. @Override
    19. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    20. User[] users = makeUsers();
    21. //发送数据
    22. for(User user:users){
    23. System.out.println("Send user:"+user);
    24. ctx.write(user);
    25. }
    26. ctx.flush();
    27. }
    28. /*** 发生异常后的处理*/
    29. @Override
    30. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    31. cause.printStackTrace();
    32. ctx.close();
    33. }
    34. /*生成用户实体类的数组,以供发送*/
    35. private User[] makeUsers(){
    36. User[] users=new User[sendNumber];
    37. User user =null;
    38. for(int i=0;i<sendNumber;i++){
    39. user=new User();
    40. user.setAge(i);
    41. String userName = "ABCDEFG --->"+i;
    42. user.setUserName(userName);
    43. user.setId("No:"+(sendNumber-i));
    44. user.setUserContact(
    45. new UserContact(userName+"@xiangxue.com","133"));
    46. users[i]=user;
    47. }
    48. return users;
    49. }
    50. }