Match users for online dating using Confluent
When it comes to online dating, matching users based on mutual interests and personal preferences, while enabling real-time communication, is key to finding the right counterpart. This tutorial shows developers how to dynamically determine which pairs of people have connected and are ready to get the ball rolling.
To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.
- Run it
- 1. Set up your environment
- 2. Execute ksqlDB code
- 3. Test with real data
- 4. Explanation
- 5. Cleanup
Run it
Set up your environment
Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL syntax for extracting, transforming, and loading events within your Kafka cluster.
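As a quick taste of that syntax, here is a minimal, hypothetical persistent query (not part of this recipe; the stream and column names are placeholders) that filters one stream of events into another:

CREATE STREAM long_messages AS
    SELECT send_id, recv_id, message
    FROM messages
    WHERE LEN(message) > 20
    EMIT CHANGES;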
Execute ksqlDB code
ksqlDB processes data in real time, and you can also import and export data straight from ksqlDB to popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB's INSERT INTO functionality to mock the data.
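For the first option, connectors can be created directly from ksqlDB. The sketch below uses ksqlDB's CREATE SOURCE CONNECTOR syntax with Confluent Cloud's fully managed PostgreSQL source as an example; the connector class, connection settings, and credentials are all placeholders to adapt to wherever your message events actually live:

-- A hypothetical source connector; every value below is a placeholder.
CREATE SOURCE CONNECTOR messages_source WITH (
    'connector.class'     = 'PostgresSource',
    'connection.host'     = '<database-endpoint>',
    'connection.port'     = '5432',
    'connection.user'     = '<database-user>',
    'connection.password' = '<database-password>',
    'db.name'             = '<database-name>',
    'table.whitelist'     = 'messages',
    'output.data.format'  = 'JSON',
    'kafka.api.key'       = '<my-kafka-api-key>',
    'kafka.api.secret'    = '<my-kafka-api-secret>',
    'tasks.max'           = '1'
);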
The application groups the stream of messages into individual conversations, keying each one by the sorted pair of participant IDs so that messages sent in either direction land in the same conversation. It then walks each conversation's messages in time order, keeping track of the last sender, by reducing over them with a function (old_state, element) => … that steps through a small state machine: 'opened' when the first message arrives, 'replied' when the other person answers, and 'connected' when the senders alternate once more. Only conversations that reach 'connected' are kept, as the trace below illustrates.
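For example, tracing users 4 and 5 through the mock data used later in this tutorial:

- 5 → 4 'Hi': the state moves from 'start' to 'opened', last sender 5.
- 4 → 5 'Well hi to you too.': a different sender, so the state moves to 'replied', last sender 4.
- 5 → 4 'Would you like to discuss event streaming with me?': the senders alternate again, so the state moves to 'connected'.

By contrast, users 3 and 4 never leave 'opened', because every message in that conversation comes from user 3.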
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM messages (
    send_id BIGINT,
    recv_id BIGINT,
    message VARCHAR
) WITH (
    KAFKA_TOPIC = 'messages',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

CREATE TABLE conversations AS
    SELECT
        ARRAY_JOIN(ARRAY_SORT(ARRAY[send_id, recv_id]), '<>') AS conversation_id,
        AS_VALUE(ARRAY_JOIN(ARRAY_SORT(ARRAY[send_id, recv_id]), '<>')) AS conversation_value
    FROM messages
    GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY[send_id, recv_id]), '<>')
    HAVING REDUCE(
        ENTRIES(
            AS_MAP(
                COLLECT_LIST(CAST(rowtime AS VARCHAR)),
                COLLECT_LIST(send_id)
            ),
            true
        ),
        STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT)),
        (old_state, element) => CASE
            WHEN old_state->step = 'start'
                THEN STRUCT(step := 'opened', last_sender := element->v)
            WHEN old_state->step = 'opened' AND old_state->last_sender != element->v
                THEN STRUCT(step := 'replied', last_sender := element->v)
            WHEN old_state->step = 'replied' AND old_state->last_sender != element->v
                THEN STRUCT(step := 'connected', last_sender := element->v)
            ELSE old_state
        END
    )->step = 'connected'
    EMIT CHANGES;

-- Add the INSERT INTO commands to insert mock data
INSERT INTO messages (send_id, recv_id, message) VALUES (1, 2, 'Hello');
INSERT INTO messages (send_id, recv_id, message) VALUES (1, 3, 'Hello');
INSERT INTO messages (send_id, recv_id, message) VALUES (2, 1, 'Hey there');
INSERT INTO messages (send_id, recv_id, message) VALUES (1, 2, 'What''s going on?');
INSERT INTO messages (send_id, recv_id, message) VALUES (3, 4, 'Hi');
INSERT INTO messages (send_id, recv_id, message) VALUES (3, 4, 'Hello');
INSERT INTO messages (send_id, recv_id, message) VALUES (5, 4, 'Hi');
INSERT INTO messages (send_id, recv_id, message) VALUES (4, 5, 'Well hi to you too.');
INSERT INTO messages (send_id, recv_id, message) VALUES (3, 4, 'I''d like to chat.');
INSERT INTO messages (send_id, recv_id, message) VALUES (5, 4, 'Would you like to discuss event streaming with me?');
To validate that this recipe is working, run the following query:
SELECT * FROM conversations EMIT CHANGES LIMIT 2;
Your output should resemble:
+--------------------------------------+--------------------------------------+
|CONVERSATION_ID                       |CONVERSATION_VALUE                    |
+--------------------------------------+--------------------------------------+
|4<>5                                  |4<>5                                  |
|1<>2                                  |1<>2                                  |
Limit Reached
Query terminated
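Because conversations is an aggregate table, ksqlDB materializes its state, so as a final sketch (assuming the default ksqlDB configuration) you can also look up a single conversation by its key with a pull query instead of a streaming query:

SELECT * FROM conversations WHERE conversation_id = '4<>5';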