Skip to main content
Notice removed Canonical answer required by CommunityBot
Bounty Ended with no winning answer by CommunityBot
Notice added Canonical answer required by user1950349
Bounty Started worth 50 reputation by user1950349
added 167 characters in body
Source Link

I have bunch of keys and values that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // senderSendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // senderSendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // senderSendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

Now I need to extend my design so that I can send data in three different ways. It is upto user to decide which way he wants to send data to either "sync" or "async".

  • sendToQueueAsync by passing two parameters
  • sendToQueueSync by passing two parameters
  • sendToQueueAsyncsendToQueueSync by passing three parameters

How should I design my Packet and SendRecord class so that I can tell Packet class that this data needs to be send in either of above three ways to my messaging queue. It is upto user to decide which way he wants to send data to messaging queue. As of now the way my Packet class is structured, it can send data only in one way.

I have bunch of keys and values that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to messaging queue.

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // sender.sendToQueueAsync(address, buffer.array());
    // sender.sendToQueueSync(address, buffer.array());
    // sender.sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

Now I need to extend my design so that I can send data in three different ways. It is upto user to decide which way he wants to send data to.

  • sendToQueueAsync by passing two parameters
  • sendToQueueSync by passing two parameters
  • sendToQueueAsync by passing three parameters

How should I design my Packet and SendRecord class so that I can tell Packet class that this data needs to be send in either of above three ways to my messaging queue. It is upto user to decide which way he wants to send data to messaging queue.

I have bunch of keys and values that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

Now I need to extend my design so that I can send data in three different ways. It is upto user to decide which way he wants to send data to either "sync" or "async".

  • sendToQueueAsync by passing two parameters
  • sendToQueueSync by passing two parameters
  • sendToQueueSync by passing three parameters

How should I design my Packet and SendRecord class so that I can tell Packet class that this data needs to be send in either of above three ways to my messaging queue. It is upto user to decide which way he wants to send data to messaging queue. As of now the way my Packet class is structured, it can send data only in one way.

Send records using aysncasync or sync way

deleted 10 characters in body
Source Link

Below is my SendRecord classSendRecord class:

Below is my SendRecord class:

SendRecord class:

added 3851 characters in body
Source Link
Loading
Source Link
Loading