如何通过 C++ 使用队列存储

提示

试用 Microsoft Azure 存储资源管理器

Microsoft Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。

概述

本指南将展示如何使用 Azure 队列存储服务执行常见方案。 示例采用 C++ 编写,并使用了适用于 C++ 的 Azure 存储客户端库。 介绍的方案包括插入扫视获取删除队列消息以及创建和删除队列

注意

本指南面向适用于 C++ 的 Azure 存储客户端库 v1.0.0 及更高版本。 建议使用的版本是 Azure 存储客户端库 v2.2.0(可通过 NuGetGitHub 获得)。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式:使用以下 URL 格式对队列进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

创建 C++ 应用程序

在本指南中,将使用存储功能,这些功能可以在 C++ 应用程序中运行。

为此,需要安装适用于 C++ 的 Azure 存储客户端库,并在 Azure 订阅中创建 Azure 存储帐户。

若要安装适用于 C++ 的 Azure 存储客户端库,可使用以下方法:

.\vcpkg.exe install azure-storage-cpp

可以在自述文件中找到有关如何生成源代码和导出到 NuGet 的指南。

配置应用程序以访问队列存储

将以下 include 语句添加到要在其中使用 Azure 存储 API 来访问队列的 C++ 文件的顶部:

#include <was/storage_account.h>
#include <was/queue.h>

设置 Azure 存储连接字符串

Azure 存储客户端使用存储连接字符串来存储用于访问数据管理服务的终结点和凭据。 在客户端应用程序中运行时,必须提供以下格式的存储连接字符串,并使用 Azure 门户中列出的存储帐户的名称和存储帐户的存储访问密钥作为 AccountNameAccountKey 值。 有关存储帐户和访问密钥的信息,请参阅关于 Azure 存储帐户。 此示例演示如何声明一个静态字段以保存连接字符串:

// Define the connection-string with your values.
const utility::string_t storage_connection_string(U("DefaultEndpointsProtocol=https;AccountName=your_storage_account;AccountKey=your_storage_account_key"));

若要在本地 Windows 计算机中测试应用程序,可以使用 Azurite 存储模拟器。 Azurite 是一种实用工具,用于在本地开发计算机上模拟 Azure Blob 存储和和队列存储。 以下示例演示如何声明一个静态字段以将连接字符串保存到本地存储模拟器:

// Define the connection-string with Azurite.
const utility::string_t storage_connection_string(U("UseDevelopmentStorage=true;"));  

若要启动 Azurite,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

下面的示例假定使用了这两个方法之一来获取存储连接字符串。

检索连接字符串

可使用 cloud_storage_account 类来表示存储帐户信息。 若要从存储连接字符串中检索存储帐户信息,可以使用 parse 方法。

// Retrieve storage account from connection string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

如何:创建队列

利用 cloud_queue_client 对象,可以获取队列的引用对象。 以下代码创建 cloud_queue_client 对象。

// Retrieve storage account from connection string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create a queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

使用 cloud_queue_client 对象获取对要使用的队列的引用。 如果队列不存在,可以创建它。

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Create the queue if it doesn't already exist.
queue.create_if_not_exists();  

如何:在队列中插入消息

要将消息插入到现有队列中,请先创建新的 cloud_queue_message。 接下来,调用 add_message 方法。 可以通过字符串(采用 UTF-8 格式)或字节数组创建 cloud_queue_message。 下面是创建队列(如果不存在)并插入消息“Hello, World”的代码:

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Create the queue if it doesn't already exist.
queue.create_if_not_exists();

// Create a message and add it to the queue.
azure::storage::cloud_queue_message message1(U("Hello, World"));
queue.add_message(message1);  

如何:扫视下一条消息

通过调用 peek_message 方法,可以速览队列前面的消息,且不会将其从队列中删除。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Peek at the next message.
azure::storage::cloud_queue_message peeked_message = queue.peek_message();

// Output the message content.
std::wcout << U("Peeked message content: ") << peeked_message.content_as_string() << std::endl;

如何:更改已排队消息的内容

可以更改队列中现有消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作的状态,并额外为客户端提供一分钟的时间来继续处理消息。 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。 通常,还可以保留重试计数,如果某条消息的重试次数超过 n,将删除此消息。 这可避免每次处理某条消息时都触发应用程序错误。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Get the message from the queue and update the message contents.
// The visibility timeout "0" means make it visible immediately.
// The visibility timeout "60" means the client can get another minute to continue
// working on the message.
azure::storage::cloud_queue_message changed_message = queue.get_message();

changed_message.set_content(U("Changed message"));
queue.update_message(changed_message, std::chrono::seconds(60), true);

// Output the message content.
std::wcout << U("Changed message content: ") << changed_message.content_as_string() << std::endl;  

如何:取消对下一条消息的排队

代码通过两个步骤来取消对队列中某条消息的排队。 调用 get_message 时,会获得队列中的下一条消息。 从 get_message 返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 若要完成从队列中删除消息,还必须调用 delete_message。 此删除消息的两步过程可确保,如果代码因硬件或软件故障而无法处理消息,则代码的其他实例可以获取相同消息并重试。 代码在处理消息后会立即调用 delete_message

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Get the next message.
azure::storage::cloud_queue_message dequeued_message = queue.get_message();
std::wcout << U("Dequeued message: ") << dequeued_message.content_as_string() << std::endl;

// Delete the message.
queue.delete_message(dequeued_message);

如何:使用用于取消消息排队的其他选项

可通过两种方式自定义队列中消息的检索。 首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。 以下代码示例使用 get_messages 方法在一个调用中获取 20 条消息。 然后,使用 for 循环处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。 请注意,5 分钟超时时间对于所有消息都是同时开始的,因此在调用 get_messages 5 分钟后,尚未删除的任何消息都会再次变得可见。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Dequeue some queue messages (maximum 32 at a time) and set their visibility timeout to
// 5 minutes (300 seconds).
azure::storage::queue_request_options options;
azure::storage::operation_context context;

// Retrieve 20 messages from the queue with a visibility timeout of 300 seconds.
std::vector<azure::storage::cloud_queue_message> messages = queue.get_messages(20, std::chrono::seconds(300), options, context);

for (auto it = messages.cbegin(); it != messages.cend(); ++it)
{
    // Display the contents of the message.
    std::wcout << U("Get: ") << it->content_as_string() << std::endl;
}

如何:获取队列长度

可以获取队列中消息的估计数。 download_attributes 方法返回包括消息计数在内的队列属性。 approximate_message_count 方法可获取队列中的大致消息数。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Fetch the queue attributes.
queue.download_attributes();

// Retrieve the cached approximate message count.
int cachedMessageCount = queue.approximate_message_count();

// Display number of messages.
std::wcout << U("Number of messages in queue: ") << cachedMessageCount << std::endl;  

如何:删除队列

若要删除队列及其包含的所有消息,请对队列对象调用 delete_queue_if_exists 方法。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// If the queue exists and delete it.
queue.delete_queue_if_exists();  

后续步骤

现在,你已了解了队列存储的基本知识,可以通过单击以下链接了解有关 Azure 存储的详细信息。