C:\tools\kafka\bin\windows>kafka-console-producer.bat --broker-list 192.168.99.100:9092 --topic test
>123 testing
>Deseja finalizar o arquivo em lotes (S/N)? S
C:\tools\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server 192.168.99.100:9092 --topic test --from-beginning
test
a
123 testing
我调试单元测试时也取得了成功。我已经检查过了,运行和调试配置都是相同的。
然而,当我运行junit测试时,我并没有看到任何尝试性的连接到同一个kafka代理。这是配置。你可能会注意到我试着在所有我能想象的地方添加地址
“单元测试”(讨论是单元测试还是集成测试不在这个问题范围之内)
import org.junit.BeforeClass;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import com.arquiteturareferencia.kafkaassyncsend.producer.Producer;
@RunWith(SpringRunner.class)
@DirtiesContext
@SpringBootTest
class KafkaAssyncSendApplicationTests {
@Autowired
private Producer p;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
System.setProperty("spring.kafka.bootstrap-servers", "192.168.99.100:9092");
}
@Test
public void testReceive() throws Exception {
p.send("test", "Messagem demonstrativa assincrona");
}
}
kafka.brokers= 192.168.99.100:9092
spring.kafka.bootstrap-servers= 192.168.99.100:9092
kafka producer config(用于运行应用程序的配置相同,即无需测试)
src/main/java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("KafkaMsgExecutor-");
executor.initialize();
return executor;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
}
生产商(与生产案例相同)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Async
public void send(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
//Call Back de sucesso de postagem da mensagem no tópico
@Override
public void onSuccess(final SendResult<String, String> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
//Call Back de fracasso de postagem da mensagem no tópico
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
18:56:24.681 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating CacheAwareContextLoaderDelegate from class [org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate]
18:56:24.696 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating BootstrapContext using constructor [public org.springframework.test.context.support.DefaultBootstrapContext(java.lang.Class,org.springframework.test.context.CacheAwareContextLoaderDelegate)]
18:56:24.719 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating TestContextBootstrapper for test class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests] from class [org.springframework.boot.test.context.SpringBootTestContextBootstrapper]
18:56:24.731 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Neither @ContextConfiguration nor @ContextHierarchy found for test class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests], using SpringBootContextLoader
18:56:24.735 [main] DEBUG org.springframework.test.context.support.AbstractContextLoader - Did not detect default resource location for test class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests]: class path resource [com/arquiteturareferencia/kafkaassyncsend/KafkaAssyncSendApplicationTests-context.xml] does not exist
18:56:24.736 [main] DEBUG org.springframework.test.context.support.AbstractContextLoader - Did not detect default resource location for test class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests]: class path resource [com/arquiteturareferencia/kafkaassyncsend/KafkaAssyncSendApplicationTestsContext.groovy] does not exist
18:56:24.736 [main] INFO org.springframework.test.context.support.AbstractContextLoader - Could not detect default resource locations for test class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests]: no resource found for suffixes {-context.xml, Context.groovy}.
18:56:24.737 [main] INFO org.springframework.test.context.support.AnnotationConfigContextLoaderUtils - Could not detect default configuration classes for test class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests]: KafkaAssyncSendApplicationTests does not declare any static, non-private, non-final, nested classes annotated with @Configuration.
18:56:24.770 [main] DEBUG org.springframework.test.context.support.ActiveProfilesUtils - Could not find an 'annotation declaring class' for annotation type [org.springframework.test.context.ActiveProfiles] and class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests]
18:56:24.827 [main] DEBUG org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider - Identified candidate component class: file [C:\WSs\pockafka\kafka-assync-send\target\classes\com\arquiteturareferencia\kafkaassyncsend\KafkaAssyncSendApplication.class]
18:56:24.828 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Found @SpringBootConfiguration com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplication for test class com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests
18:56:24.925 [main] DEBUG org.springframework.boot.test.context.SpringBootTestContextBootstrapper - @TestExecutionListeners is not present for class [com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests]: using defaults.
18:56:24.926 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Loaded default TestExecutionListener class names from location [META-INF/spring.factories]: [org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener, org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener, org.springframework.boot.test.autoconfigure.restdocs.RestDocsTestExecutionListener, org.springframework.boot.test.autoconfigure.web.client.MockRestServiceServerResetTestExecutionListener, org.springframework.boot.test.autoconfigure.web.servlet.MockMvcPrintOnlyOnFailureTestExecutionListener, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverTestExecutionListener, org.springframework.test.context.web.ServletTestExecutionListener, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener, org.springframework.test.context.support.DependencyInjectionTestExecutionListener, org.springframework.test.context.support.DirtiesContextTestExecutionListener, org.springframework.test.context.transaction.TransactionalTestExecutionListener, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener, org.springframework.test.context.event.EventPublishingTestExecutionListener]
18:56:24.934 [main] DEBUG org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Skipping candidate TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener] due to a missing dependency. Specify custom listener classes or make the default listener classes and their required dependencies available. Offending class: [javax/servlet/ServletContext]
18:56:24.940 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Using TestExecutionListeners: [org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener@495fac5f, org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener@76012793, org.springframework.boot.test.autoconfigure.SpringBootDependencyInjectionTestExecutionListener@635572a7, org.springframework.test.context.support.DirtiesContextTestExecutionListener@79d94571, org.springframework.test.context.transaction.TransactionalTestExecutionListener@4dd02341, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener@3212a8d7, org.springframework.test.context.event.EventPublishingTestExecutionListener@7a1a3478, org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener@495b0487, org.springframework.boot.test.autoconfigure.restdocs.RestDocsTestExecutionListener@55dfcc6, org.springframework.boot.test.autoconfigure.web.client.MockRestServiceServerResetTestExecutionListener@222eb8aa, org.springframework.boot.test.autoconfigure.web.servlet.MockMvcPrintOnlyOnFailureTestExecutionListener@93cf163, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverTestExecutionListener@1852a3ff]
18:56:24.944 [main] DEBUG org.springframework.test.context.support.AbstractDirtiesContextTestExecutionListener - Before test class: context [DefaultTestContext@79351f41 testClass = KafkaAssyncSendApplicationTests, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [MergedContextConfiguration@1c32386d testClass = KafkaAssyncSendApplicationTests, locations = '{}', classes = '{class com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplication}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true}', contextCustomizers = set[org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@790da477, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@5fb759d6, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@1a4927d6, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@6a4f1a55], contextLoader = 'org.springframework.boot.test.context.SpringBootContextLoader', parent = [null]], attributes = map[[empty]]], class annotated with @DirtiesContext [true] with mode [AFTER_CLASS].
18:56:24.955 [main] DEBUG org.springframework.test.context.support.DependencyInjectionTestExecutionListener - Performing dependency injection for test context [[DefaultTestContext@79351f41 testClass = KafkaAssyncSendApplicationTests, testInstance = com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplicationTests@1477089c, testMethod = [null], testException = [null], mergedContextConfiguration = [MergedContextConfiguration@1c32386d testClass = KafkaAssyncSendApplicationTests, locations = '{}', classes = '{class com.arquiteturareferencia.kafkaassyncsend.KafkaAssyncSendApplication}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true}', contextCustomizers = set[org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@790da477, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@5fb759d6, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@1a4927d6, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@6a4f1a55], contextLoader = 'org.springframework.boot.test.context.SpringBootContextLoader', parent = [null]], attributes = map[[empty]]]].
18:56:24.981 [main] DEBUG org.springframework.test.context.support.TestPropertySourceUtils - Adding inlined properties to environment: {spring.jmx.enabled=false, org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true, server.port=-1}
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.5.RELEASE)
2020-03-02 18:56:25.258 INFO 18524 --- [ main] c.a.k.KafkaAssyncSendApplicationTests : Starting KafkaAssyncSendApplicationTests on SPANOT149 with PID 18524 (started by Cast in C:\WSs\pockafka\kafka-assync-send)
2020-03-02 18:56:25.260 INFO 18524 --- [ main] c.a.k.KafkaAssyncSendApplicationTests : No active profile set, falling back to default profiles: default
2020-03-02 18:56:26.101 INFO 18524 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
2020-03-02 18:56:26.112 INFO 18524 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutor'
2020-03-02 18:56:26.268 INFO 18524 --- [ main] c.a.k.KafkaAssyncSendApplicationTests : Started KafkaAssyncSendApplicationTests in 1.284 seconds (JVM running for 2.242)
2020-03-02 18:56:26.307 INFO 18524 --- [kaMsgExecutor-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.99.100:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-03-02 18:56:26.666 INFO 18524 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'taskExecutor'
调试单元测试时的日志:
...
2020-03-02 19:19:56.049 INFO 16092 --- [kaMsgExecutor-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-03-02 19:19:56.052 INFO 16092 --- [kaMsgExecutor-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-03-02 19:19:56.052 INFO 16092 --- [kaMsgExecutor-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1583187596046
2020-03-02 19:19:56.299 INFO 16092 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: f95SHyheSG-VrUrOY7_R1w
2020-03-02 19:19:56.338 INFO 16092 --- [ad | producer-1] c.a.kafkaassyncsend.producer.Producer : sent message= SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=Messagem demonstrativa assincrona, timestamp=null), recordMetadata=test-0@5] with offset= 5
2020-03-02 19:20:17.569 INFO 16092 --- [ad | producer-1] c.a.kafkaassyncsend.producer.Producer : sent message= SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=Messagem demonstrativa assincrona, timestamp=null), recordMetadata=test-0@6] with offset= 6
2020-03-02 19:20:17.572 INFO 16092 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'taskExecutor'
2020-03-02 19:20:17.573 INFO 16092 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
***添加
实际上,只有当我指向单元测试的断点时,才会调用它