Azure: EventHub

(Last Updated On: )

In this tutorial I will show you how to connect to event hub from Python. Ensure you have first installed an IDE (Eclipse) and Python3.7.

Python Package Installation

pip3 install azure-eventhub

Create a Producer

This will publish events to event hub. The important part here is the “EndPoint”. You need to login to Azure Portal and get the get the endpoint from the “Shared Access Policies” from the event hub namespace.

from azure.eventhub import EventHubProducerClient, EventData, EventHubConsumerClient

connection_str = 'Endpoint=sb://testeventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=<<THE_ACCESS_KEY_NAME>>;SharedAccessKey=<<THE_ACCESS_KEY>>'
eventhub_name = '<<THE_EVENT_HUB_NAME>>'
producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

event_data_batch = producer.create_batch()

event_data_batch.add(EventData('My Test Data'))

with producer:
    producer.send_batch(event_data_batch)

Create a Consumer

This will monitor the event hub for new messages.

from azure.eventhub import EventHubProducerClient, EventData, EventHubConsumerClient

connection_str = 'Endpoint=sb://testeventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=<<THE_ACCESS_KEY_NAME>>;SharedAccessKey=<<THE_ACCESS_KEY>>'
eventhub_name = '<<THE_EVENT_HUB_NAME>>'
consumer_group = '<<THE_EVENT_HUB_CONSUMER_GROUP>>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

def on_event(partition_context, event):
    print("Received event from partition {} - {}".format(partition_context.partition_id, event))
    partition_context.update_checkpoint(event)

with client:
    #client.receive(
    #    on_event=on_event, 
    #    starting_position="-1",  # "-1" is from the beginning of the partition.
    #)
    client.receive(
        on_event=on_event
    )