Class TopologyTestDriver
- All Implemented Interfaces:
- Closeable, AutoCloseable
This class makes it easier to write tests to verify the behavior of topologies created with Topology or
 StreamsBuilder.
 You can test simple topologies that have a single processor, or very complex topologies that have multiple sources,
 processors, sinks, or sub-topologies.
 Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.
 
 Using the TopologyTestDriver in tests is easy: instantiate the driver with a Topology
 (cf. StreamsBuilder.build()) and a config, create
 and use a TestInputTopic to supply input records to the topology,
 and then create and use a TestOutputTopic to read and
 verify any output records produced by the topology.
 
 Although the driver doesn't use a real Kafka broker, it does simulate Kafka consumers and
 producers that read and write raw byte[] messages.
 You can let TestInputTopic and TestOutputTopic handle the conversion
 from regular Java objects to raw bytes.
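
 The workflow described above can be sketched end to end. This is a minimal illustration rather than part of the original reference: the class name UppercaseTopologySketch, the topic names, and the upper-casing topology are assumptions made for the example, and it presumes the kafka-streams and kafka-streams-test-utils artifacts are on the classpath.

```java
import java.util.Locale;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseTopologySketch {

    // Hypothetical topology: upper-cases every value read from "input-topic".
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase(Locale.ROOT))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes one record through the topology and returns the single output record.
    static KeyValue<String, String> pipeOne(String key, String value) {
        // try-with-resources closes the driver even if an assertion or read fails.
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput(key, value);     // processed synchronously
            return output.readKeyValue();    // deserialized back to Java objects
        }
    }
}
```

 In a real test, the returned KeyValue would typically be compared against an expected pair with the assertion library of your choice.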
 
Driver setup
To create a TopologyTestDriver instance, you need a Topology and a config.
 The configuration needs to be representative of what you'd supply to the real topology, so that means including
 several key properties (cf. StreamsConfig).
 For example, the following code fragment creates a configuration that specifies a timestamp extractor
 and default serializers and deserializers for string keys and values:
 
 Properties props = new Properties();
 props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
 props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 Topology topology = ...
 TopologyTestDriver driver = new TopologyTestDriver(topology, props);
 Note that the TopologyTestDriver processes input records synchronously.
 This implies that the commit.interval.ms and
 cache.max.bytes.buffering configurations have no effect.
 The driver behaves as if both configs were set to zero, i.e., as if a "commit" (and thus a "flush") happened
 after each input record.
 
Processing messages
 Your test can supply new input records on any of the topics that the topology's sources consume.
 This test driver simulates single-partitioned input topics.
 Here's an example of an input message on the topic named input-topic:
 
 TestInputTopic<String, String> inputTopic = driver.createInputTopic("input-topic", stringSerializer, stringSerializer);
 inputTopic.pipeInput("key1", "value1");
 When TestInputTopic.pipeInput(Object, Object) is called, the driver passes the input message through to the appropriate source that
 consumes the named topic, and invokes the processor(s) downstream of the source.
 If your topology's processors forward messages to sinks, your test can then consume these output messages to verify
 they match the expected outcome.
 For example, if our topology should have generated 2 messages on output-topic-1 and 1 message on
 output-topic-2, then our test can obtain these messages using the
 TestOutputTopic.readKeyValue()  method:
 
 TestOutputTopic<String, String> outputTopic1 = driver.createOutputTopic("output-topic-1", stringDeserializer, stringDeserializer);
 TestOutputTopic<String, String> outputTopic2 = driver.createOutputTopic("output-topic-2", stringDeserializer, stringDeserializer);
 KeyValue<String, String> record1 = outputTopic1.readKeyValue();
 KeyValue<String, String> record2 = outputTopic2.readKeyValue();
 KeyValue<String, String> record3 = outputTopic1.readKeyValue();
 
 Note that calling pipeInput() will also trigger event-time based
 punctuation callbacks.
 However, it won't trigger wall-clock type punctuations; you must
 trigger those manually via advanceWallClockTime(Duration).
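
 The difference between the two punctuation types can be illustrated with a small sketch. Everything named here (WallClockPunctuationSketch, TickProcessor, the topic names, and the 10-second interval) is hypothetical and chosen only for the example. The processor schedules a wall-clock punctuation that forwards a record, and the helper shows that output appears only once the mocked wall-clock time has been advanced far enough.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class WallClockPunctuationSketch {

    // Hypothetical processor: ignores input and forwards a "tick" record
    // every 10 seconds of wall-clock time.
    static class TickProcessor implements Processor<String, String, String, String> {
        @Override
        public void init(ProcessorContext<String, String> context) {
            context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward(new Record<>("tick", "fired", timestamp)));
        }

        @Override
        public void process(Record<String, String> record) {
            // Input records are ignored in this sketch.
        }
    }

    static Topology buildTopology() {
        // Explicitly typed supplier so the intended addProcessor overload is selected.
        ProcessorSupplier<String, String, String, String> supplier = TickProcessor::new;
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("ticker", supplier, "source");
        topology.addSink("sink", "tick-topic", new StringSerializer(), new StringSerializer(), "ticker");
        return topology;
    }

    // Advances the mocked wall clock and returns whatever values reached "tick-topic".
    static List<String> ticksAfterSeconds(long seconds) {
        try (TopologyTestDriver driver =
                 new TopologyTestDriver(buildTopology(), Instant.ofEpochMilli(0L))) {
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "tick-topic", new StringDeserializer(), new StringDeserializer());
            driver.advanceWallClockTime(Duration.ofSeconds(seconds));
            return output.readValuesToList();
        }
    }
}
```

 Passing a fixed initial Instant to the constructor keeps the mocked clock deterministic, which makes such tests independent of the machine's real time.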
 
 Finally, when completed, make sure your tests close() the driver to release all resources and
 processors.
 
Processor state
 Some processors use Kafka state storage, so this driver class provides the generic
 getStateStore(String) as well as store-type specific methods so that your tests can check the underlying
 state store(s) used by your topology's processors.
 In our previous example, after we supplied a single input message and checked the three output messages, our test
 could also check the key value store to verify the processor correctly added, removed, or updated internal state.
 Or, our test might have pre-populated some state before submitting the input message, and verified afterward
 that the processor(s) correctly updated the state.
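
 As an illustration of pre-populating and then checking state, the following sketch uses a hypothetical counting processor backed by an in-memory store named "counts" (the class, processor, store, and topic names are all assumptions made for this example). Logging is disabled on the store builder here, on the assumption that writing to a change-logged store from outside record processing may not be supported by the driver.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StateStoreCheckSketch {

    // Hypothetical processor: counts how many records were seen per key.
    static class CountProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, Long> counts;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            counts = context.getStateStore("counts");
        }

        @Override
        public void process(Record<String, String> record) {
            Long previous = counts.get(record.key());
            counts.put(record.key(), previous == null ? 1L : previous + 1L);
        }
    }

    static Topology buildTopology() {
        ProcessorSupplier<String, String, Void, Void> supplier = CountProcessor::new;
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("counter", supplier, "source");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("counts"), Serdes.String(), Serdes.Long())
                .withLoggingDisabled(),   // assumption: needed so the test can write to the store directly
            "counter");
        return topology;
    }

    // Pre-populates the store, pipes one record for the key, and returns the updated count.
    static long countAfterOneRecord(String key, long prePopulated) {
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology())) {
            KeyValueStore<String, Long> store = driver.getKeyValueStore("counts");
            store.put(key, prePopulated);                // state before the input arrives

            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            input.pipeInput(key, "any value");

            return store.get(key);                       // state after processing
        }
    }
}
```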
- 
Constructor Summary
- TopologyTestDriver(Topology topology): Create a new test driver instance.
- TopologyTestDriver(Topology topology, Instant initialWallClockTimeMs): Create a new test driver instance.
- TopologyTestDriver(Topology topology, Properties config): Create a new test driver instance.
- TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime): Create a new test driver instance.
- 
Method Summary
- void advanceWallClockTime(Duration advance): Advances the internally mocked wall-clock time.
- void close(): Close the driver, its topology, and all processors.
- final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer): Create a TestInputTopic to be used for piping records to a topic. Uses the current system time as the start timestamp for records.
- final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant startTimestamp, Duration autoAdvance): Create a TestInputTopic to be used for piping records to a topic. Uses the provided start timestamp and autoAdvance parameter for records.
- final <K,V> TestOutputTopic<K,V> createOutputTopic(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer): Create a TestOutputTopic to be used for reading records from a topic.
- Map<String, StateStore> getAllStateStores(): Get all StateStores from the topology.
- <K,V> KeyValueStore<K,V> getKeyValueStore(String name): Get the KeyValueStore or TimestampedKeyValueStore with the given name.
- <K,V> SessionStore<K,V> getSessionStore(String name): Get the SessionStore with the given name.
- StateStore getStateStore(String name): Get the StateStore with the given name.
- <K,V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(String name): Get the TimestampedKeyValueStore with the given name.
- <K,V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(String name): Get the TimestampedWindowStore with the given name.
- <K,V> VersionedKeyValueStore<K,V> getVersionedKeyValueStore(String name): Get the VersionedKeyValueStore with the given name.
- <K,V> WindowStore<K,V> getWindowStore(String name): Get the WindowStore or TimestampedWindowStore with the given name.
- Map<MetricName, ? extends Metric> metrics(): Get read-only handle on global metrics registry.
- Set<String> producedTopicNames(): Get all the names of all the topics to which records have been produced during the test run.
- 
Constructor Details
TopologyTestDriver(Topology topology)
Create a new test driver instance. Default test properties are used to initialize the driver instance.
- Parameters:
- topology- the topology to be tested
 
- 
TopologyTestDriver(Topology topology, Properties config)
Create a new test driver instance. Initializes the internally mocked wall-clock time with the current system time.
- Parameters:
- topology- the topology to be tested
- config- the configuration for the topology
 
- 
TopologyTestDriver(Topology topology, Instant initialWallClockTimeMs)
Create a new test driver instance.
- Parameters:
- topology- the topology to be tested
- initialWallClockTimeMs- the initial value of internally mocked wall-clock time
 
- 
TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime)
Create a new test driver instance.
- Parameters:
- topology- the topology to be tested
- config- the configuration for the topology
- initialWallClockTime- the initial value of internally mocked wall-clock time
 
 
- 
- 
Method Details
metrics
Get read-only handle on global metrics registry.
- Returns:
- Map of all metrics.
 
- 
advanceWallClockTime
Advances the internally mocked wall-clock time. This might trigger wall-clock type punctuations.
- Parameters:
- advance- the amount of time to advance wall-clock time
 
- 
createInputTopic
public final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer)
Create a TestInputTopic to be used for piping records to a topic. Uses the current system time as the start timestamp for records. Auto-advance is disabled.
- Type Parameters:
- K- the key type
- V- the value type
- Parameters:
- topicName- the name of the topic
- keySerializer- the Serializer for the key type
- valueSerializer- the Serializer for the value type
- Returns:
- TestInputTopic object
 
- 
createInputTopic
public final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant startTimestamp, Duration autoAdvance)
Create a TestInputTopic to be used for piping records to a topic. Uses the provided start timestamp and autoAdvance parameter for records.
- Type Parameters:
- K- the key type
- V- the value type
- Parameters:
- topicName- the name of the topic
- keySerializer- the Serializer for the key type
- valueSerializer- the Serializer for the value type
- startTimestamp- Start timestamp for auto-generated record time
- autoAdvance- autoAdvance duration for auto-generated record time
- Returns:
- TestInputTopic object
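
To make the effect of startTimestamp and autoAdvance concrete, here is a small sketch under stated assumptions: the class name AutoAdvanceSketch, the topic names, and the pass-through topology are hypothetical. It pipes three records through an input topic that starts at epoch millisecond 1000 and advances by one second per record, then collects the timestamps observed on the output topic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;

public class AutoAdvanceSketch {

    // Returns the timestamps of three records piped with auto-generated record times.
    static List<Long> outputTimestamps() {
        // Hypothetical pass-through topology: records keep their input timestamps.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer(),
                Instant.ofEpochMilli(1_000L),   // timestamp of the first record
                Duration.ofSeconds(1));         // added after each piped record
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k1", "v1");
            input.pipeInput("k2", "v2");
            input.pipeInput("k3", "v3");

            return output.readRecordsToList().stream()
                         .map(TestRecord::timestamp)
                         .collect(Collectors.toList());
        }
    }
}
```

Deterministic record times like these are mainly useful when testing windowed operations or event-time punctuations.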
 
- 
createOutputTopic
public final <K,V> TestOutputTopic<K,V> createOutputTopic(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Create a TestOutputTopic to be used for reading records from a topic.
- Type Parameters:
- K- the key type
- V- the value type
- Parameters:
- topicName- the name of the topic
- keyDeserializer- the Deserializer for the key type
- valueDeserializer- the Deserializer for the value type
- Returns:
- TestOutputTopic object
 
- 
producedTopicNames
Get all the names of all the topics to which records have been produced during the test run.
Call this method after piping the input into the test driver to retrieve the full set of topic names the topology produced records to. The returned set of topic names may include user (e.g., output) and internal (e.g., changelog, repartition) topic names.
- Returns:
- the set of topic names the topology has produced to
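
A brief sketch of how this might be used in a test follows. The class name ProducedTopicNamesSketch, the topic names, and the pass-through topology are assumptions for illustration only. The set is copied before the driver is closed so the result does not depend on the driver's internal state.

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class ProducedTopicNamesSketch {

    // Pipes one record and returns the names of the topics the topology wrote to.
    static Set<String> topicsAfterOneRecord() {
        // Hypothetical pass-through topology from "input-topic" to "output-topic".
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build())) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("key1", "value1");

            // Copy the set so it remains usable after the driver is closed.
            return new HashSet<>(driver.producedTopicNames());
        }
    }
}
```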
 
- 
getAllStateStores
Get all StateStores from the topology. The stores can be "regular" or global stores.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message (cf. TestInputTopic.pipeInput(TestRecord)), and/or to check the store afterward.
Note that a StateStore might be null if a store is added but not connected to any processor.
Caution: Using this method to access stores that are added by the DSL is unsafe as the store types may change. Stores added by the DSL should only be accessed via the corresponding typed methods like getKeyValueStore(String) etc.
- Returns:
- all stores by name
 
- 
getStateStore
Get the StateStore with the given name. The store can be a "regular" or global store.
Should be used for custom stores only. For built-in stores, the corresponding typed methods like getKeyValueStore(String) should be used.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the state store, or null if no store has been registered with the given name
- Throws:
- IllegalArgumentException- if the store is a built-in store like KeyValueStore, WindowStore, or SessionStore
 
- 
getKeyValueStore
Get the KeyValueStore or TimestampedKeyValueStore with the given name. The store can be a "regular" or global store.
If the registered store is a TimestampedKeyValueStore this method will return a value-only query interface. It is highly recommended to update the code for this case to avoid bugs and to use getTimestampedKeyValueStore(String) for full store access instead.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the key value store, or null if no KeyValueStore or TimestampedKeyValueStore has been registered with the given name
 
- 
getTimestampedKeyValueStore
Get the TimestampedKeyValueStore with the given name. The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the key value store, or null if no TimestampedKeyValueStore has been registered with the given name
 
- 
getVersionedKeyValueStore
Get the VersionedKeyValueStore with the given name. The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the key value store, or null if no VersionedKeyValueStore has been registered with the given name
 
- 
getWindowStore
Get the WindowStore or TimestampedWindowStore with the given name. The store can be a "regular" or global store.
If the registered store is a TimestampedWindowStore this method will return a value-only query interface. It is highly recommended to update the code for this case to avoid bugs and to use getTimestampedWindowStore(String) for full store access instead.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the window store, or null if no WindowStore or TimestampedWindowStore has been registered with the given name
 
- 
getTimestampedWindowStore
Get the TimestampedWindowStore with the given name. The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the window store, or null if no TimestampedWindowStore has been registered with the given name
 
- 
getSessionStore
Get the SessionStore with the given name. The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to process an input message, and/or to check the store afterward.
- Parameters:
- name- the name of the store
- Returns:
- the session store, or null if no SessionStore has been registered with the given name
 
- 
close
public void close()
Close the driver, its topology, and all processors.
- Specified by:
- close in interface AutoCloseable
- Specified by:
- close in interface Closeable
 
 
-