package packagename;
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);
TestProducer(long events, String topic, String fileID_gen, String a_broker) throws IOException {
//sfileID_gen="random";
Properties props = new Properties();
//props.put("metadata.broker.list", "localhost:9092");
//props.put("metadata.broker.list", "localhost:9092");
LOG.info("setting metadata.broker.list ="+a_broker);
props.put("metadata.broker.list", a_broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "packagename.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String msg="";
int index=0;
for (long nEvents = 0; nEvents < events; nEvents++) {
if (fileID_gen.equals("random"))
{
msg = "\""+randomChar()+"\""+ randomLorT()+"\",\"numRecords\":165848,\"hasErrors\":" + randomBoolean() + "}";
}
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg.trim());
LOG.info("Generated data is " + msg);
LOG.info("waiting to generate file details");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
LOG.error("thread sleep error",e);
}
producer.send(data);
}// for-close
producer.close();
}// constructor-close
public boolean randomBoolean() {
return Math.random() < 0.5;
}
public String randomChar() {
Random r = new Random();
char random_char = (char) (r.nextInt(26) + 'a');
//Character.toString(random_char);
return Character.toString(random_char);
}
public String randomLorT()
{
final String alphabet = "T";
final int N = alphabet.length();
char random_char = 0;
Random r = new Random();
for (int i = 0; i <= N ; i++) {
random_char=alphabet.charAt(r.nextInt(N));
}
return Character.toString(random_char);
}
}// TestProducer-class-close
package packagename;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner(VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1))
% a_numPartitions;
}
return partition;
}
}
Dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);
TestProducer(long events, String topic, String fileID_gen, String a_broker) throws IOException {
//sfileID_gen="random";
Properties props = new Properties();
//props.put("metadata.broker.list", "localhost:9092");
//props.put("metadata.broker.list", "localhost:9092");
LOG.info("setting metadata.broker.list ="+a_broker);
props.put("metadata.broker.list", a_broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "packagename.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String msg="";
int index=0;
for (long nEvents = 0; nEvents < events; nEvents++) {
if (fileID_gen.equals("random"))
{
msg = "\""+randomChar()+"\""+ randomLorT()+"\",\"numRecords\":165848,\"hasErrors\":" + randomBoolean() + "}";
}
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg.trim());
LOG.info("Generated data is " + msg);
LOG.info("waiting to generate file details");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
LOG.error("thread sleep error",e);
}
producer.send(data);
}// for-close
producer.close();
}// constructor-close
public boolean randomBoolean() {
return Math.random() < 0.5;
}
public String randomChar() {
Random r = new Random();
char random_char = (char) (r.nextInt(26) + 'a');
//Character.toString(random_char);
return Character.toString(random_char);
}
public String randomLorT()
{
final String alphabet = "T";
final int N = alphabet.length();
char random_char = 0;
Random r = new Random();
for (int i = 0; i <= N ; i++) {
random_char=alphabet.charAt(r.nextInt(N));
}
return Character.toString(random_char);
}
}// TestProducer-class-close
SimplePartitioner Class
package packagename;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner(VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1))
% a_numPartitions;
}
return partition;
}
}
Dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>