Wednesday, July 20, 2016

How to Create a Simple Kafka Producer (Java)

package packagename;

import java.io.IOException;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

public static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);

TestProducer(long events, String topic, String fileID_gen, String a_broker) throws IOException {

//sfileID_gen="random";

Properties props = new Properties();
//props.put("metadata.broker.list", "localhost:9092");
//props.put("metadata.broker.list", "localhost:9092");
LOG.info("setting metadata.broker.list ="+a_broker);
props.put("metadata.broker.list", a_broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "packagename.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);




String msg="";
int index=0;

for (long nEvents = 0; nEvents < events; nEvents++) {


if (fileID_gen.equals("random"))
{
msg = "\""+randomChar()+"\""+ randomLorT()+"\",\"numRecords\":165848,\"hasErrors\":" + randomBoolean() + "}";
}

KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg.trim());

LOG.info("Generated data is " + msg);

LOG.info("waiting to generate file details");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
LOG.error("thread sleep error",e);
}

producer.send(data);

}// for-close


producer.close();
}// constructor-close

public boolean randomBoolean() {
return Math.random() < 0.5;
}

public String randomChar() {
Random r = new Random();
char random_char = (char) (r.nextInt(26) + 'a');
//Character.toString(random_char);
return Character.toString(random_char);
}

public String randomLorT()
{
final String alphabet = "T";
   final int N = alphabet.length();
   char random_char = 0;
   Random r = new Random();

   for (int i = 0; i <= N ; i++) {
       random_char=alphabet.charAt(r.nextInt(N));
   }
 
   return Character.toString(random_char);

}



}// TestProducer-class-close




SimplePartitioner Class



package packagename;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
public SimplePartitioner(VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1))
% a_numPartitions;
}
return partition;
}

}

Dependency:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>

No comments: