In this article, we’ll explore how to implement a JSON event processing pipeline with several powerful Oracle Database features. The sample app uses Oracle Database 23ai Free, which you can download or run in a container to follow along. As you read through the article, it’s recommended to keep the sample open for reference.
We’ll first cover two great features of Oracle Database, Transactional Event Queues and JSON Relational Duality Views, and then jump into the event streaming application implementation. If you’d rather skip directly to the code, check out the Sample Application on GitHub.
Prerequisites:
- Maven, Java 21+, and basic familiarity with writing Java code.
- A Docker-compatible environment to run the sample’s containerized Oracle Database instance.
Oracle Database can fill the role of an event streaming architecture’s message bus with Transactional Event Queues: a robust, real-time message broker that runs inside the database, presenting topic-oriented pub/sub with multiple producers and consumers that naturally fits event processing applications. At a high level, you can think of Transactional Event Queues as a message broker similar to Apache Kafka or RabbitMQ.
Why is this important? Running your message broker inside the database allows you to converge your system to the database, simplifying production deployments and eliminating reliance on domain-specific SRE knowledge.
https://github.com/oracle/okafka
And what’s the best way for developers to interact with Transactional Event Queues? While you can use PL/SQL or other clients, I vastly prefer the Kafka Java Client for Oracle Transactional Event Queues, which lets you use familiar Kafka APIs to send and receive messages.
Learn more about the Kafka Java Client for Oracle Transactional Event Queues here and here.
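To make that concrete, here’s a minimal, hypothetical producer snippet (the connection values are placeholders and the topic name is illustrative). Note that the only okafka-specific pieces are the KafkaProducer import and the connection properties, which point at a database listener rather than a Kafka cluster:
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
// The only okafka-specific import; everything else is the standard Kafka client API.
import org.oracle.okafka.clients.producer.KafkaProducer;

public class QuickStartProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection details: a database listener, not a Kafka cluster.
        props.put("bootstrap.servers", "localhost:1521");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", "/path/to/dir-containing-ojdbc.properties");
        props.put("security.protocol", "PLAINTEXT");
        try (Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // The topic is backed by a Transactional Event Queue inside the database.
            producer.send(new ProducerRecord<>("example_topic", "key-1", "hello from TxEventQ"));
        }
    }
}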
JSON Relational Duality Views combine the best of document databases and relational data, layering a JSON document structure over relational data structures. We’ll use JSON Relational Duality Views to express our application’s JSON event schema, defining two related tables and creating corresponding views.
Learn more about JSON Relational Duality Views here.
The sample event streaming application processes events representing weather station sensor readings in a series of steps:
- Receive raw sensor data via REST.
- Parse raw sensor data to the event schema.
- Produce those parsed events to the message broker (Transactional Event Queue).
- Consume sensor events from the message bus and enrich them with relational weather station metadata.
- Save the enriched events using a JSON Relational Duality View.
- Query enriched events from the Database using REST.

Spring Starter dependencies
To build the event streaming application, we’ll pull in the Spring starters for JSON Relational Duality Views and the Kafka Java Client for Oracle Transactional Event Queues from Spring Cloud Oracle. We’ll also grab a few Spring starters to create a web app with a JDBC interface.
<dependency>
    <groupId>com.oracle.database.spring</groupId>
    <artifactId>oracle-spring-boot-starter-json-collections</artifactId>
    <version>24.4.0</version>
</dependency>
<dependency>
    <groupId>com.oracle.database.spring</groupId>
    <artifactId>oracle-spring-boot-starter-okafka</artifactId>
    <version>24.4.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
You can find the full sample pom here.
Configuring the Spring Starter for the Kafka Java Client for Oracle Transactional Event Queues
For a user to access Transactional Event Queues, specific grants are required:
grant resource, connect, unlimited tablespace to TESTUSER;
grant aq_user_role to TESTUSER;
grant execute on dbms_aq to TESTUSER;
grant execute on dbms_aqadm to TESTUSER;
grant select on gv_$session to TESTUSER;
grant select on v_$session to TESTUSER;
grant select on gv_$instance to TESTUSER;
grant select on gv_$listener_network to TESTUSER;
grant select on SYS.DBA_RSRC_PLAN_DIRECTIVES to TESTUSER;
grant select on gv_$pdbs to TESTUSER;
grant select on user_queue_partition_assignment_table to TESTUSER;
exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER');
commit;
You should also have a file named “ojdbc.properties” containing the database user and password used to connect. Note that if you are using Oracle Database Wallet, this will be included in your wallet ZIP file.
user = testuser
password = testpwd
Let’s now create Spring beans for a Kafka-style Producer and Consumer, and define JSONB serializers for our JSON Relational Duality Views. Note that the “JSONB” bean used here is included from the Oracle JSON Collections Spring Starter, simplifying JSONB operations in Java code.
import java.util.Properties;
import com.oracle.database.spring.jsonevents.model.Sensor;
import com.oracle.database.spring.jsonevents.serde.JSONBDeserializer;
import com.oracle.database.spring.jsonevents.serde.JSONBSerializer;
import com.oracle.spring.json.jsonb.JSONB;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
import org.oracle.okafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OKafkaConfiguration {
private final JSONB jsonb;
@Value("${app.ojdbcPath}")
private String ojdbcPath;
@Value("${app.bootstrapServers}")
private String bootstrapServers;
@Value("${app.serviceName:freepdb1}")
private String serviceName;
@Value("${app.securityProtocol:PLAINTEXT}")
private String securityProtocol;
@Value("${app.consumerGroup:SensorEvents}")
private String consumerGroup;
public OKafkaConfiguration(JSONB jsonb) {
this.jsonb = jsonb;
}
@Bean
@Qualifier("okafkaProperties")
public Properties okafkaProperties() {
Properties props = new Properties();
props.put("oracle.service.name", serviceName);
props.put("security.protocol", securityProtocol);
props.put("bootstrap.servers", bootstrapServers);
props.put("oracle.net.tns_admin", ojdbcPath);
return props;
}
@Bean
@Qualifier("okafkaConsumer")
public Consumer<String, Sensor> okafkaConsumer() {
Properties props = okafkaProperties();
props.put("group.id", consumerGroup);
props.put("enable.auto.commit", "false");
props.put("max.poll.records", 2000);
props.put("auto.offset.reset", "earliest");
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<Sensor> valueDeserializer = new JSONBDeserializer<>(jsonb, Sensor.class);
return new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
}
@Bean
@Qualifier("okafkaProducer")
public Producer<String, Sensor> okafkaProducer() {
Properties props = okafkaProperties();
props.put("enable.idempotence", "true");
Serializer<String> keySerializer = new StringSerializer();
Serializer<Sensor> valueSerializer = new JSONBSerializer<>(jsonb);
return new KafkaProducer<>(props, keySerializer, valueSerializer);
}
}
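The app.* values consumed by the @Value annotations above come from standard Spring configuration. As a rough sketch, with placeholder values for a local Oracle Database Free instance (the sample’s actual configuration may differ), an application.properties could look like this:
# Directory containing ojdbc.properties (and wallet files, if used)
app.ojdbcPath=/path/to/ojdbc-dir
# Database listener address; okafka connects here instead of a Kafka cluster
app.bootstrapServers=localhost:1521
# Optional overrides; the defaults are declared in the @Value annotations above
app.serviceName=freepdb1
app.securityProtocol=PLAINTEXT
app.consumerGroup=SensorEvents
app.topic=weathersensor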
Next, let’s take a look at the deserializer and serializer classes for JSONB objects. These classes implement the Kafka serialization interfaces, allowing us to convert a POJO to and from JSONB. The sample application uses them for data transfer on the message bus.
import java.nio.ByteBuffer;
import com.oracle.spring.json.jsonb.JSONB;
import org.apache.kafka.common.serialization.Deserializer;
public class JSONBDeserializer<T> implements Deserializer<T> {
private final JSONB jsonb;
private final Class<T> clazz;
public JSONBDeserializer(JSONB jsonb, Class<T> clazz) {
this.jsonb = jsonb;
this.clazz = clazz;
}
@Override
public T deserialize(String s, byte[] bytes) {
return jsonb.fromOSON(ByteBuffer.wrap(bytes), clazz);
}
}
import com.oracle.spring.json.jsonb.JSONB;
import org.apache.kafka.common.serialization.Serializer;
public class JSONBSerializer<T> implements Serializer<T> {
private final JSONB jsonb;
public JSONBSerializer(JSONB jsonb) {
this.jsonb = jsonb;
}
@Override
public byte[] serialize(String s, T obj) {
return jsonb.toOSON(obj);
}
}
The Producer and Consumer can now be autowired into our Spring services! We define a SensorEventProducer and a SensorConsumer to produce and consume weather station events, respectively. These classes will be almost identical to Kafka producer and consumer code you might have seen previously.
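The full implementations live in the sample; the following is a condensed, hypothetical sketch of the two services to show their shape. The SensorParser helper is illustrative, enrichment details are elided, and the real classes may differ:
import com.oracle.database.spring.jsonevents.model.Sensor;
import com.oracle.database.spring.jsonevents.model.SensorEvent;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class SensorEventProducer {
    private final Producer<String, Sensor> producer;

    @Value("${app.topic:weathersensor}")
    private String topic;

    public SensorEventProducer(@Qualifier("okafkaProducer") Producer<String, Sensor> producer) {
        this.producer = producer;
    }

    public void send(SensorEvent event) {
        // Parse the raw payload into Sensor records (SensorParser is hypothetical)
        // and produce each one to the Transactional Event Queue topic.
        for (Sensor sensor : SensorParser.parse(event)) {
            producer.send(new ProducerRecord<>(topic, sensor));
        }
    }
}
The consumer is a Runnable so that the OKafkaComponent shown below can start it on an async executor:
import java.time.Duration;
import java.util.List;
import com.oracle.database.spring.jsonevents.model.Sensor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class SensorConsumer implements Runnable {
    private final Consumer<String, Sensor> consumer;
    private final SensorService sensorService;

    @Value("${app.topic:weathersensor}")
    private String topic;

    public SensorConsumer(@Qualifier("okafkaConsumer") Consumer<String, Sensor> consumer,
            SensorService sensorService) {
        this.consumer = consumer;
        this.sensorService = sensorService;
    }

    @Override
    public void run() {
        consumer.subscribe(List.of(topic));
        while (true) {
            // The JSONBDeserializer converts each OSON payload back into a Sensor.
            ConsumerRecords<String, Sensor> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Sensor> record : records) {
                // Enrichment with weather station metadata happens here in the
                // sample; saving through the duality view persists the document.
                sensorService.save(record.value());
            }
            // Commit manually, since enable.auto.commit is false.
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }
    }
}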
Lastly, let’s ensure our topic and consumer are created and started during application startup. The following component creates a topic if it doesn’t exist, and starts the SensorConsumer asynchronously:
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.oracle.okafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Component;
@Component
public class OKafkaComponent {
private final AsyncTaskExecutor asyncTaskExecutor;
private final SensorConsumer sensorConsumer;
private final Properties okafkaProperties;
@Value("${app.topic:weathersensor}")
private String topic;
private Future<?> consumer;
public OKafkaComponent(@Qualifier("applicationTaskExecutor") AsyncTaskExecutor asyncTaskExecutor,
SensorConsumer sensorConsumer,
@Qualifier("okafkaProperties") Properties okafkaProperties) {
this.asyncTaskExecutor = asyncTaskExecutor;
this.sensorConsumer = sensorConsumer;
this.okafkaProperties = okafkaProperties;
}
@PostConstruct
void init() {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
try (Admin admin = AdminClient.create(okafkaProperties)) {
admin.createTopics(Collections.singletonList(newTopic))
.all()
.get();
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof TopicExistsException) {
System.out.println("Topic already exists, skipping creation");
} else {
throw new RuntimeException(e);
}
}
consumer = asyncTaskExecutor.submit(sensorConsumer);
}
public void await() {
try {
consumer.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
Using JSON Relational Duality Views to store and view JSON events
As our application is handling JSON, we also want to store data as JSON (technically JSONB). We can easily do this with JSON Relational Duality Views, layering a powerful read-write JSON view over a relational schema.
In this example, we create two tables for sensor events and weather stations, where each row in the sensor events table is related to a specific weather station. Following the relational definitions, we create JSON Relational Duality Views over the two tables, defining the JSON schema our application will interact with.
create table station (
id varchar2(36) default sys_guid() primary key,
station_name varchar2(50) not null,
latitude number(10,8) not null,
longitude number(11,8) not null,
elevation number(16,2),
constraint chk_latitude check (latitude between -90 and 90),
constraint chk_longitude check (longitude between -180 and 180)
);
create table weather_sensor (
id varchar2(36) default sys_guid() primary key,
station_id varchar2(36) not null,
relative_humidity number(5,2),
temperature number(5,2),
uv_index number(4,1),
timestamp timestamp default CURRENT_TIMESTAMP,
constraint sensor_station_fk foreign key (station_id)
references station(id)
);
create or replace json relational duality view
weather_sensor_dv as
weather_sensor @insert @update @delete {
_id : id,
relativeHumidity : relative_humidity,
temperature,
uvIndex : uv_index,
station: station {
_id : id,
stationName : station_name,
latitude,
longitude,
elevation
}
};
create or replace json relational duality view
station_dv as
station @insert @update @delete {
_id : id,
stationName : station_name,
latitude,
longitude,
elevation
};
insert into station (id, station_name, latitude, longitude, elevation)
values ('ST001', 'Mount Hood Observatory', 45.3424092, -121.7824754, 11249);
insert into station (id, station_name, latitude, longitude, elevation)
values ('ST002', 'Astoria Research Center', 46.187580, -123.834114, 15.2);
Reading and writing JSON Events
We’ll implement a SensorService and a StationService to interact with the JSON Relational Duality Views that represent our relational data.
First, let’s take a look at the SensorService. We use plain JDBC with the JSONB and JSONBRowMapper classes from the JSON Collections Spring Starter to insert and query records:
import java.sql.PreparedStatement;
import java.util.List;
import com.oracle.database.spring.jsonevents.model.Sensor;
import com.oracle.spring.json.jsonb.JSONB;
import com.oracle.spring.json.jsonb.JSONBRowMapper;
import oracle.jdbc.OracleTypes;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service;
@Service
public class SensorService {
private static final String insertSensorData = """
insert into weather_sensor_dv (data) values (?)
""";
private static final String byStationId = """
select * from weather_sensor_dv v
where v.data.station."_id" = ?
""";
private final JdbcTemplate jdbcTemplate;
private final JSONB jsonb;
private final RowMapper<Sensor> rowMapper;
public SensorService(JdbcTemplate jdbcTemplate, JSONB jsonb) {
this.jdbcTemplate = jdbcTemplate;
this.jsonb = jsonb;
this.rowMapper = new JSONBRowMapper<>(jsonb, Sensor.class);
}
public void save(Sensor sensor) {
jdbcTemplate.update(con -> {
PreparedStatement ps = con.prepareStatement(insertSensorData);
byte[] oson = jsonb.toOSON(sensor);
ps.setObject(1, oson, OracleTypes.JSON);
return ps;
});
}
public List<Sensor> byStationId(String stationId) {
return jdbcTemplate.query(con -> {
PreparedStatement ps = con.prepareStatement(byStationId);
ps.setString(1, stationId);
return ps;
}, rowMapper);
}
}
The StationService is similar, but for the weather station view:
import java.sql.PreparedStatement;
import java.util.List;
import com.oracle.database.spring.jsonevents.model.Station;
import com.oracle.spring.json.jsonb.JSONB;
import com.oracle.spring.json.jsonb.JSONBRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service;
@Service
public class StationService {
private static final String stationById = """
select * from station_dv v
where v.data."_id" = ?
""";
private final JdbcTemplate jdbcTemplate;
private final RowMapper<Station> rowMapper;
public StationService(JdbcTemplate jdbcTemplate, JSONB jsonb) {
this.jdbcTemplate = jdbcTemplate;
this.rowMapper = new JSONBRowMapper<>(jsonb, Station.class);
}
public List<Station> byId(String stationId) {
return jdbcTemplate.query(con -> {
PreparedStatement ps = con.prepareStatement(stationById);
ps.setString(1, stationId);
return ps;
}, rowMapper);
}
}
Finally, let’s define a simple RestController, the SensorController to interact externally with our application. This enables us to send batches of events to the SensorEventProducer, and query events from the SensorService:
import java.util.List;
import com.oracle.database.spring.jsonevents.model.Sensor;
import com.oracle.database.spring.jsonevents.model.SensorEvent;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1/events")
public class SensorController {
private final SensorEventProducer sensorEventProducer;
private final SensorService sensorService;
public SensorController(SensorEventProducer sensorEventProducer, SensorService sensorService) {
this.sensorEventProducer = sensorEventProducer;
this.sensorService = sensorService;
}
@PostMapping
public ResponseEntity<Void> produce(@RequestBody SensorEvent event) {
sensorEventProducer.send(event);
return ResponseEntity.noContent().build();
}
@GetMapping("/station/{stationId}")
ResponseEntity<List<Sensor>> getEvents(@PathVariable String stationId) {
List<Sensor> sensors = sensorService.byStationId(stationId);
return ResponseEntity.ok(sensors);
}
}
The sample application is tied together by an integration test using Testcontainers. Note that you’ll need a Docker-compatible runtime on your system to run the test.
In the test, batches of events are sent to the SensorController, pass through the processing pipeline, and land in the database. The events are then queried back through the SensorController to validate that the pipeline succeeded.
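For orientation, here’s a condensed, hypothetical sketch of how such a test can be structured; the container image, the property wiring, and the payload construction are assumptions, and the sample’s JSONEventsSampleTest is the authoritative version:
import com.oracle.database.spring.jsonevents.model.Sensor;
import com.oracle.database.spring.jsonevents.model.SensorEvent;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.oracle.OracleContainer;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class PipelineSketchTest {
    // Containerized Oracle Database Free; the image tag is an assumption.
    @Container
    static OracleContainer oracle = new OracleContainer("gvenzl/oracle-free:23-slim")
            .withUsername("testuser")
            .withPassword("testpwd");

    // Point the datasource at the container; the okafka connection properties
    // would need similar wiring (elided here).
    @DynamicPropertySource
    static void datasourceProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", oracle::getJdbcUrl);
        registry.add("spring.datasource.username", oracle::getUsername);
        registry.add("spring.datasource.password", oracle::getPassword);
    }

    @Autowired
    TestRestTemplate restTemplate;

    @Test
    void processesSensorEvents() throws InterruptedException {
        // Send a batch of raw readings for station ST001 (payload fields elided).
        SensorEvent event = new SensorEvent();
        assertThat(restTemplate.postForEntity("/api/v1/events", event, Void.class)
                .getStatusCode().is2xxSuccessful()).isTrue();

        // Give the asynchronous consumer time to enrich and persist the events.
        Thread.sleep(5_000);

        // Query the enriched events back out through the duality view.
        Sensor[] sensors = restTemplate.getForObject("/api/v1/events/station/ST001", Sensor[].class);
        assertThat(sensors).isNotEmpty();
    }
}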
To see the sample in action, run mvn test -Dtest=JSONEventsSampleTest from the sample’s root directory. You should see output similar to the following, indicating events were received by the application, processed, and then queried from the REST endpoint:
Produced events for ST001
Produced events for ST002
Waiting for consumer to process all events
Received 5 events for ST001
Received 10 events for ST002
You can view the full test code here.
I hope this article helps you better understand how to use Oracle Database as not just a relational database, but also a powerful document database and message broker. Feel free to reach out directly or in the comments if you have questions!
Need more samples for Oracle Database? Check out the samples below for more resources.