Gegevens streamen als invoer in Stream Analytics

Stream Analytics heeft eersteklas integratie met Azure-gegevensstromen als invoer van drie soorten resources:

Deze invoerbronnen kunnen zich in hetzelfde Azure-abonnement bevinden als uw Stream Analytics-taak of een ander abonnement.

Compressie

Stream Analytics ondersteunt compressie voor alle invoerbronnen. Ondersteunde compressietypen zijn: Geen, Gzip en Deflate. De ondersteuning voor compressie is niet beschikbaar voor referentiegegevens. Als de invoergegevens Avro-gegevens gecomprimeerd zijn, verwerkt Stream Analytics deze transparant. U hoeft het compressietype niet op te geven met Avro-serialisatie.

Invoer maken, bewerken of testen

U kunt Azure Portal, Visual Studio en Visual Studio Code gebruiken om bestaande invoer toe te voegen en weer te geven of te bewerken in uw streamingtaak. U kunt ook invoerverbindingen testen en query's testen op basis van voorbeeldgegevens vanuit Azure Portal, Visual Studio en Visual Studio Code. Wanneer u een query schrijft, geeft u de invoer in de FROM-component weer. U kunt de lijst met beschikbare invoer ophalen op de pagina Query in de portal. Als u meerdere invoergegevens wilt gebruiken, JOIN of meerdere SELECT query's wilt schrijven.

Notitie

We raden u ten zeerste aan Stream Analytics-hulpprogramma's voor Visual Studio Code te gebruiken voor de beste lokale ontwikkelervaring. Er zijn bekende functie-hiaten in Stream Analytics-hulpprogramma's voor Visual Studio 2019 (versie 2.6.3000.0) en dit wordt in de toekomst niet verbeterd.

Gegevens streamen vanuit Event Hubs

Azure Event Hubs is een zeer schaalbare ingestor voor het publiceren en abonneren van gebeurtenissen. Een Event Hub kan miljoenen gebeurtenissen per seconde verzamelen, zodat u de enorme hoeveelheden gegevens kunt verwerken en analyseren die worden geproduceerd door uw verbonden apparaten en toepassingen. Event Hubs en Stream Analytics kunnen samen een end-to-end oplossing bieden voor realtime analyse. Met Event Hubs kunt u gebeurtenissen in realtime in Azure invoeren en met Stream Analytics-taken kunt u deze gebeurtenissen in realtime verwerken. U kunt bijvoorbeeld webklikken, sensormetingen of onlinelogboeken naar Event Hubs verzenden. Vervolgens kunt u Stream Analytics-taken maken om Event Hubs te gebruiken voor de invoergegevens voor realtime filteren, samenvoegen en correlatie.

EventEnqueuedUtcTime is de tijdstempel van de aankomst van een gebeurtenis in een Event Hub en is de standaardtijdstempel van gebeurtenissen die afkomstig zijn van Event Hubs naar Stream Analytics. Als u de gegevens wilt verwerken als een stroom met behulp van een tijdstempel in de nettolading van de gebeurtenis, moet u het trefwoord TIMESTAMP BY gebruiken.

Event Hubs-consumentengroepen

U moet elke Event Hub-invoer configureren voor een eigen consumentengroep. Wanneer een taak een self-join bevat of meerdere invoerwaarden heeft, kunnen sommige invoerwaarden worden gelezen door meer dan één lezer downstream. Deze situatie is van invloed op het aantal lezers in één consumentengroep. Om te voorkomen dat de Event Hubs-limiet van vijf lezers per consumentengroep per partitie wordt overschreden, is het een best practice om een consumentengroep aan te wijzen voor elke Stream Analytics-taak. Er is ook een limiet van 20 consumentengroepen voor een Event Hub van de Standard-laag. Zie Problemen met Azure Stream Analytics-invoer oplossen voor meer informatie.

Een invoer maken vanuit Event Hubs

In de volgende tabel wordt elke eigenschap op de pagina Nieuwe invoer in Azure Portal uitgelegd om gegevensinvoer van een Event Hub te streamen:

Eigenschappen Beschrijving
Invoeralias Een beschrijvende naam die u in de query van de taak gebruikt om naar deze invoer te verwijzen.
Abonnement Kies het Azure-abonnement waarin de Event Hub-resource bestaat.
Event Hub-naamruimte De Event Hubs-naamruimte is een container voor Event Hubs. Wanneer u een Event Hub maakt, maakt u ook de naamruimte.
Event Hub-naam De naam van de Event Hub die moet worden gebruikt als invoer.
Event Hub-consumentengroep (aanbevolen) U wordt aangeraden een afzonderlijke consumentengroep te gebruiken voor elke Stream Analytics-taak. Deze tekenreeks identificeert de consumentengroep die moet worden gebruikt voor het opnemen van gegevens uit de Event Hub. Als er geen consumentengroep is opgegeven, gebruikt de Stream Analytics-taak de $Default consumentengroep.
Verificatiemodus Geef het type verificatie op dat u wilt gebruiken om verbinding te maken met de Event Hub. U kunt een verbindingsreeks of een beheerde identiteit gebruiken om te verifiëren met de Event Hub. Voor de optie beheerde identiteit kunt u een door het systeem toegewezen beheerde identiteit maken aan de Stream Analytics-taak of een door de gebruiker toegewezen beheerde identiteit om te verifiëren met de Event Hub. Wanneer u een beheerde identiteit gebruikt, moet de beheerde identiteit lid zijn van de rollen Azure Event Hubs-gegevensontvanger of Azure Event Hubs-gegevenseigenaar.
Naam van Event Hub-beleid Het beleid voor gedeelde toegang dat toegang biedt tot de Event Hubs. Elk beleid voor gedeelde toegang heeft een naam, machtigingen die u instelt en toegangssleutels. Deze optie wordt automatisch ingevuld, tenzij u de optie selecteert om de Event Hubs-instellingen handmatig op te geven.
Partitiesleutel Het is een optioneel veld dat alleen beschikbaar is als uw taak is geconfigureerd voor het gebruik van compatibiliteitsniveau 1.2 of hoger. Als uw invoer is gepartitioneerd door een eigenschap, kunt u hier de naam van deze eigenschap toevoegen. Deze wordt gebruikt voor het verbeteren van de prestaties van uw query als deze een PARTITION BY of GROUP BY component voor deze eigenschap bevat. Als voor deze taak compatibiliteitsniveau 1.2 of hoger wordt gebruikt, wordt dit veld standaard ingesteld op PartitionId.
Serialisatie-indeling voor gebeurtenissen De serialisatie-indeling (JSON, CSV, Avro, Parquet of Other (Protobuf, XML, eigen...)) van de binnenkomende gegevensstroom. Zorg ervoor dat de JSON-indeling overeenkomt met de specificatie en geen voorlooptekens bevat voor decimale getallen.
Codering UTF-8 is momenteel de enige ondersteunde coderingsindeling.
Gebeurteniscompressietype Het compressietype dat wordt gebruikt voor het lezen van de binnenkomende gegevensstroom, zoals None (standaard), Gzip of Deflate.
Schemaregister (preview) U kunt het schemaregister selecteren met schema's voor gebeurtenisgegevens die worden ontvangen van de Event Hub.

Wanneer uw gegevens afkomstig zijn van een Event Hubs-stroominvoer, hebt u toegang tot de volgende metagegevensvelden in uw Stream Analytics-query:

Eigenschappen Beschrijving
EventProcessedUtcTime De datum en tijd waarop Stream Analytics de gebeurtenis verwerkt.
EventEnqueuedUtcTime De datum en tijd waarop Event Hubs de gebeurtenissen ontvangt.
PartitionId De partitie-id op basis van nul voor de invoeradapter.

Als u deze velden gebruikt, kunt u bijvoorbeeld een query schrijven zoals in het volgende voorbeeld:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Notitie

Wanneer u Event Hubs gebruikt als eindpunt voor IoT Hub-routes, hebt u toegang tot de metagegevens van IoT Hub met behulp van de functie GetMetadataPropertyValue.

Gegevens streamen vanuit IoT Hub

Azure IoT Hub is een zeer schaalbaar gebeurtenisgebeurtenis ingestor dat is geoptimaliseerd voor IoT-scenario's.

De standaardtijdstempel van gebeurtenissen die afkomstig zijn van een IoT Hub in Stream Analytics is het tijdstempel dat de gebeurtenis is aangekomen in de IoT Hub.EventEnqueuedUtcTime Als u de gegevens wilt verwerken als een stroom met behulp van een tijdstempel in de nettolading van de gebeurtenis, moet u het trefwoord TIMESTAMP BY gebruiken.

IoT Hub-consumentengroepen

U moet elke Stream Analytics IoT Hub-invoer configureren voor een eigen consumentengroep. Wanneer een taak een self-join bevat of wanneer deze meerdere invoer heeft, kan sommige invoer worden gelezen door meer dan één lezer downstream. Deze situatie is van invloed op het aantal lezers in één consumentengroep. Om te voorkomen dat de Azure IoT Hub-limiet van vijf lezers per consumentengroep per partitie wordt overschreden, is het een best practice om een consumentengroep aan te wijzen voor elke Stream Analytics-taak.

Een IoT Hub configureren als invoer voor een gegevensstroom

In de volgende tabel wordt elke eigenschap op de pagina Nieuwe invoer in Azure Portal uitgelegd wanneer u een IoT Hub configureert als stroominvoer.

Eigenschappen Beschrijving
Invoeralias Een beschrijvende naam die u in de query van de taak gebruikt om naar deze invoer te verwijzen.
Abonnement Kies het abonnement waarin de IoT Hub-resource bestaat.
IoT Hub De naam van de IoT Hub die moet worden gebruikt als invoer.
Consumentengroep U wordt aangeraden voor elke Stream Analytics-taak een andere consumentengroep te gebruiken. De consumentengroep wordt gebruikt voor het opnemen van gegevens uit de IoT Hub. Stream Analytics maakt gebruik van de $Default consumentengroep, tenzij u anders opgeeft.
Naam van beleid voor gedeelde toegang Het beleid voor gedeelde toegang dat toegang biedt tot de IoT Hub. Elk beleid voor gedeelde toegang heeft een naam, machtigingen die u instelt en toegangssleutels.
Sleutel voor gedeeld toegangsbeleid De gedeelde toegangssleutel die wordt gebruikt om toegang tot de IoT Hub te autoriseren. Deze optie wordt automatisch ingevuld, tenzij u de optie selecteert om de Iot Hub-instellingen handmatig op te geven.
Eindpunt Het eindpunt voor de IoT Hub.
Partitiesleutel Het is een optioneel veld dat alleen beschikbaar is als uw taak is geconfigureerd voor het gebruik van compatibiliteitsniveau 1.2 of hoger. Als uw invoer is gepartitioneerd door een eigenschap, kunt u hier de naam van deze eigenschap toevoegen. Deze wordt gebruikt voor het verbeteren van de prestaties van uw query als deze een PARTITION BY- of GROUP BY-component voor deze eigenschap bevat. Als voor deze taak compatibiliteitsniveau 1.2 of hoger wordt gebruikt, wordt dit veld standaard ingesteld op PartitionId.
Serialisatie-indeling voor gebeurtenissen De serialisatie-indeling (JSON, CSV, Avro, Parquet of Other (Protobuf, XML, eigen...)) van de binnenkomende gegevensstroom. Zorg ervoor dat de JSON-indeling overeenkomt met de specificatie en geen voorlooptekens bevat voor decimale getallen.
Codering UTF-8 is momenteel de enige ondersteunde coderingsindeling.
Gebeurteniscompressietype Het compressietype dat wordt gebruikt voor het lezen van de binnenkomende gegevensstroom, zoals None (standaard), Gzip of Deflate.

Wanneer u streamgegevens van een IoT Hub gebruikt, hebt u toegang tot de volgende metagegevensvelden in uw Stream Analytics-query:

Eigenschappen Beschrijving
EventProcessedUtcTime De datum en tijd waarop de gebeurtenis is verwerkt.
EventEnqueuedUtcTime De datum en tijd waarop de IoT Hub de gebeurtenis ontvangt.
PartitionId De partitie-id op basis van nul voor de invoeradapter.
IoTHub.MessageId Een id die wordt gebruikt om communicatie in twee richtingen in IoT Hub te correleren.
IoTHub.CorrelationId Een id die wordt gebruikt in berichtantwoorden en feedback in IoT Hub.
IoTHub. Verbinding maken ionDeviceId De verificatie-id die wordt gebruikt om dit bericht te verzenden. Deze waarde wordt gestempeld op servicegebonden berichten door de IoT Hub.
IoTHub. Verbinding maken ionDeviceGenerationId De generatie-id van het geverifieerde apparaat dat is gebruikt om dit bericht te verzenden. Deze waarde wordt gestempeld op serviceberichten door de IoT Hub.
IoTHub.EnqueuedTime Het tijdstip waarop de IoT Hub het bericht ontvangt.

Gegevens streamen vanuit Blob Storage of Data Lake Storage Gen2

Voor scenario's met grote hoeveelheden ongestructureerde gegevens die in de cloud moeten worden opgeslagen, biedt Azure Blob Storage of Azure Data Lake Storage Gen2 een rendabele en schaalbare oplossing. Gegevens in Blob Storage of Azure Data Lake Storage Gen2 worden beschouwd als data-at-rest. Deze gegevens kunnen echter worden verwerkt als een gegevensstroom door Stream Analytics.

Logboekverwerking is een veelgebruikt scenario voor het gebruik van dergelijke invoer met Stream Analytics. In dit scenario worden telemetriegegevensbestanden vastgelegd vanuit een systeem en moeten ze worden geparseerd en verwerkt om zinvolle gegevens te extraheren.

De standaardtijdstempel van een Blob Storage- of Azure Data Lake Storage Gen2-gebeurtenis in Stream Analytics is de tijdstempel dat het voor het laatst is gewijzigd.BlobLastModifiedUtcTime Als een blob wordt geüpload naar een opslagaccount om 13:00 uur en de Azure Stream Analytics-taak wordt gestart met de optie Nu om 13:01, wordt deze niet opgehaald omdat de gewijzigde tijd buiten de taakuitvoeringsperiode valt.

Als een blob wordt geüpload naar een opslagaccountcontainer om 13:00 uur en de Azure Stream Analytics-taak wordt gestart met aangepaste tijd om 13:00 of eerder, wordt de blob opgehaald wanneer de gewijzigde tijd binnen de taakuitvoeringsperiode valt.

Als een Azure Stream Analytics-taak nu om 13:00 uur wordt gebruikt en een blob wordt geüpload naar de opslagaccountcontainer om 13:01, haalt Azure Stream Analytics de blob op. De tijdstempel die aan elke blob is toegewezen, is alleen gebaseerd op BlobLastModifiedTime. De map waarin de blob zich bevindt, heeft geen relatie met de tijdstempel die is toegewezen. Als er bijvoorbeeld een blob 2019/10-01/00/b1.txt met een BlobLastModifiedTime van 2019-11-11is, is de tijdstempel die aan deze blob is 2019-11-11toegewezen.

Als u de gegevens wilt verwerken als een stroom met behulp van een tijdstempel in de nettolading van de gebeurtenis, moet u het trefwoord TIMESTAMP BY gebruiken. Een Stream Analytics-taak haalt elke seconde gegevens op uit Azure Blob Storage of Azure Data Lake Storage Gen2 als het blobbestand beschikbaar is. Als het blobbestand niet beschikbaar is, is er een exponentieel uitstel met een maximale tijdsvertraging van 90 seconden.

Notitie

Stream Analytics biedt geen ondersteuning voor het toevoegen van inhoud aan een bestaand blobbestand. Stream Analytics bekijkt elk bestand slechts één keer en eventuele wijzigingen in het bestand nadat de taak de gegevens heeft gelezen, worden niet verwerkt. Het is raadzaam om alle gegevens voor een blobbestand tegelijk te uploaden en vervolgens extra nieuwere gebeurtenissen toe te voegen aan een ander, nieuw blobbestand.

In scenario's waarin veel blobs continu worden toegevoegd en Stream Analytics de blobs verwerkt terwijl ze worden toegevoegd, is het mogelijk dat sommige blobs in zeldzame gevallen worden overgeslagen vanwege de granulariteit van de BlobLastModifiedTime. U kunt dit geval beperken door blobs ten minste twee seconden uit elkaar te uploaden. Als deze optie niet haalbaar is, kunt u Event Hubs gebruiken om grote hoeveelheden gebeurtenissen te streamen.

Blob-opslag configureren als stroominvoer

In de volgende tabel wordt elke eigenschap in de pagina Nieuwe invoer in Azure Portal uitgelegd wanneer u Blob Storage configureert als stroominvoer.

Eigenschappen Beschrijving
Invoeralias Een beschrijvende naam die u in de query van de taak gebruikt om naar deze invoer te verwijzen.
Abonnement Kies het abonnement waarin de opslagresource bestaat.
Opslagaccount De naam van het opslagaccount waarin de blobbestanden zich bevinden.
Sleutel van opslagaccount De geheime sleutel die is gekoppeld aan het opslagaccount. Deze optie wordt automatisch ingevuld, tenzij u de optie selecteert om de instellingen handmatig op te geven.
Container Containers bieden een logische groepering voor blobs. U kunt bestaande container gebruiken of Nieuwe maken kiezen om een nieuwe container te maken.
Verificatiemodus Geef het type verificatie op dat u wilt gebruiken om verbinding te maken met het opslagaccount. U kunt een verbindingsreeks of een beheerde identiteit gebruiken om te verifiëren met het opslagaccount. Voor de optie beheerde identiteit kunt u een door het systeem toegewezen beheerde identiteit maken aan de Stream Analytics-taak of een door de gebruiker toegewezen beheerde identiteit om te verifiëren met het opslagaccount. Wanneer u een beheerde identiteit gebruikt, moet de beheerde identiteit lid zijn van een geschikte rol in het opslagaccount.
Padpatroon (optioneel) Het bestandspad dat wordt gebruikt om de blobs in de opgegeven container te vinden. Als u blobs wilt lezen uit de hoofdmap van de container, moet u geen padpatroon instellen. In het pad kunt u een of meer exemplaren van de volgende drie variabelen opgeven: {date}, {time}of {partition}

Voorbeeld 1: cluster1/logs/{date}/{time}/{partition}

Voorbeeld 2: cluster1/logs/{date}

Het * teken is geen toegestane waarde voor het padvoorvoegsel. Alleen geldige Azure-blobtekens zijn toegestaan. Neem geen containernamen of bestandsnamen op.
Datumnotatie (optioneel) Als u de datumvariabele in het pad gebruikt, wordt de datumnotatie waarin de bestanden zijn geordend. Voorbeeld: YYYY/MM/DD

Wanneer de blobinvoer het pad heeft {date} of {time} heeft, worden de mappen in oplopende tijdsvolgorde bekeken.
Tijdnotatie (optioneel) Als u de tijdvariabele in het pad gebruikt, wordt de tijdnotatie waarin de bestanden zijn ingedeeld. Momenteel is HH de enige ondersteunde waarde uren.
Partitiesleutel Het is een optioneel veld dat alleen beschikbaar is als uw taak is geconfigureerd voor het gebruik van compatibiliteitsniveau 1.2 of hoger. Als uw invoer is gepartitioneerd door een eigenschap, kunt u hier de naam van deze eigenschap toevoegen. Deze wordt gebruikt voor het verbeteren van de prestaties van uw query als deze een PARTITION BY- of GROUP BY-component voor deze eigenschap bevat. Als voor deze taak compatibiliteitsniveau 1.2 of hoger wordt gebruikt, wordt dit veld standaard ingesteld op PartitionId.
Aantal invoerpartities Dit veld is alleen aanwezig wanneer {partition} aanwezig is in padpatroon. De waarde van deze eigenschap is een geheel getal >=1. Waar {partition} ook wordt weergegeven in pathPattern, wordt een getal tussen 0 en de waarde van dit veld -1 gebruikt.
Serialisatie-indeling voor gebeurtenissen De serialisatie-indeling (JSON, CSV, Avro, Parquet of Other (Protobuf, XML, eigen...)) van de binnenkomende gegevensstroom. Zorg ervoor dat de JSON-indeling overeenkomt met de specificatie en geen voorlooptekens bevat voor decimale getallen.
Codering Voor CSV en JSON is UTF-8 momenteel de enige ondersteunde coderingsindeling.
Compressie Het compressietype dat wordt gebruikt voor het lezen van de binnenkomende gegevensstroom, zoals None (standaard), Gzip of Deflate.

Wanneer uw gegevens afkomstig zijn van een Blob Storage-bron, hebt u toegang tot de volgende metagegevensvelden in uw Stream Analytics-query:

Eigenschappen Beschrijving
BlobName De naam van de invoerblob waaruit de gebeurtenis afkomstig is.
EventProcessedUtcTime De datum en tijd waarop Stream Analytics de gebeurtenis verwerkt.
BlobLastModifiedUtcTime De datum en tijd waarop de blob het laatst is gewijzigd.
PartitionId De partitie-id op basis van nul voor de invoeradapter.

Als u deze velden gebruikt, kunt u bijvoorbeeld een query schrijven zoals in het volgende voorbeeld:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Gegevens streamen vanuit Apache Kafka

Met Azure Stream Analytics kunt u rechtstreeks verbinding maken met Apache Kafka-clusters om gegevens op te nemen. De oplossing is weinig code en wordt volledig beheerd door het Azure Stream Analytics-team van Microsoft, zodat deze kan voldoen aan bedrijfsnalevingsstandaarden. De Kafka-invoer is achterwaarts compatibel en ondersteunt alle versies met de nieuwste clientrelease vanaf versie 0.10. Gebruikers kunnen verbinding maken met Kafka-clusters in een virtueel netwerk en Kafka-clusters met een openbaar eindpunt, afhankelijk van de configuraties. De configuratie is afhankelijk van bestaande Kafka-configuratieconventies. Ondersteunde compressietypen zijn None, Gzip, Snappy, LZ4 en Zstd.

Zie Stream-gegevens van Kafka naar Azure Stream Analytics (preview) voor meer informatie.

Volgende stappen