
Auditing Media Assets Lifecycle – Part 2

Published August 6, 2014

Principal Program Manager, Azure Media Services
Part 1 of this blog focused on enabling you to create an asset audit report that shows when Media Assets were created and deleted. In Part 2, I will cover how you can track assets as they are copied from your Storage account to VMs for media processing. I will build upon the sample code that I provided in Part 1.

Tracking Assets

When you submit a job, the tasks in the job get scheduled in a queue, where they sit until the scheduler in Media Services assigns a VM to execute each task. The assets specified as input assets to a task are not copied over to a VM until the VM is allocated to the task. Given this, we can use the task processing time as the value for making an entry in the AssetAudit table.

Code Changes

I am adding a string member, OperationData, to the AssetAuditEntity class to capture the ID of the task that results in the copying of the asset from Storage:
    public class AssetAuditEntity : TableEntity
    {
        public string OperationType { get; set; }
        // New in Part 2: holds the Id of the task that used the asset as input
        public string OperationData { get; set; }
    }
There are two new functions, ProcessJobs and ProcessTasks. The ProcessJobs function loops through all the jobs, and the ProcessTasks function loops through all the tasks for a given job. For each task, it looks at all the historical events associated with the task, and if it finds an event with the code “Processing”, it makes an entry in the AssetAudit table (with OperationType=”InputToTask” and OperationData=TaskId) using the event timestamp as the RowKey.
        static void ProcessJobs()
        {
            try
            {
                int skipSize = 0;
                int batchSize = 1000;
                int currentSkipSize = 0;

                // The Jobs collection is returned by the service in pages, so
                // fetch it in batches of 1000 using Skip/Take until a partial
                // batch comes back.
                while (true)
                {
                    foreach (IJob job in _context.Jobs.Skip(skipSize).Take(batchSize))
                    {
                        currentSkipSize++;
                        Console.WriteLine("Processing Job " + job.Id);

                        ProcessTasks(job);
                    }

                    // A full batch means there may be more jobs; a partial
                    // batch means we have seen them all.
                    if (currentSkipSize == batchSize)
                    {
                        skipSize += batchSize;
                        currentSkipSize = 0;
                    }
                    else
                    {
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Scans the historical events of each task in the job and records an
        /// "InputToTask" entry for every input asset when the task started processing.
        /// </summary>
        /// <param name="job">The job whose tasks should be audited.</param>
        static void ProcessTasks(IJob job)
        {
            try
            {
                foreach (ITask task in job.Tasks)
                {
                    Console.WriteLine("Processing Task Id:" + task.Id);
                    for (int i = 0; i < task.HistoricalEvents.Count; i++)
                    {
                        if (task.HistoricalEvents[i].Code == "Processing")
                        {
                            for (int j = 0; j < task.InputAssets.Count; j++)
                            {
                                InsertAssetData(task.InputAssets[j].Id, task.HistoricalEvents[i].TimeStamp.ToString("o"), "InputToTask", task.Id);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
The InsertAssetData function is updated to accept an optional parameter for OperationData:
        static void InsertAssetData(string _assetId, string _timeStamp, string _operationType, string _operationData = null)
        {
            try
            {
                bool _insert = true;
                if (_operationType == "Create")
                {
                    // If operationtype is Create, then check if an entry already exists for the given asset id

                    TableQuery<AssetAuditEntity> query = new TableQuery<AssetAuditEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, _assetId));
                    query.Take(1);

                    TableQuerySegment<AssetAuditEntity> tqs = _assetAuditTable.ExecuteQuerySegmented(query, null);
                    if (tqs != null && tqs.Results != null && tqs.Results.Count > 0
                        && tqs.Results[0].OperationType == "Create")
                    {
                        _insert = false;
                    }
                }

                if (_insert)
                {
                    AssetAuditEntity _asset = new AssetAuditEntity();
                    _asset.PartitionKey = _assetId;
                    _asset.RowKey = _timeStamp;
                    _asset.OperationType = _operationType;
                    _asset.OperationData = _operationData;

                    TableOperation op = TableOperation.Insert(_asset);
                    _assetAuditTable.Execute(op);
                }
            }
            catch (Exception ex)
            {                
                Console.WriteLine(ex.Message);                
            }
        }
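To illustrate the optional parameter, here are the two call shapes the function now supports: recording asset creation (as in Part 1, where OperationData defaults to null) versus recording an asset being used as a task input. The asset, processingEvent, and task variables below are placeholders for objects already in scope in the surrounding code:

```csharp
// Part 1 style: record asset creation; OperationData is omitted and defaults to null.
InsertAssetData(asset.Id, asset.Created.ToString("o"), "Create");

// Part 2 style: record the asset being consumed by a task, tagging the entry with the task Id.
InsertAssetData(asset.Id, processingEvent.TimeStamp.ToString("o"), "InputToTask", task.Id);
```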
Lastly, the Main function is updated to call the ProcessJobs function:
        static void Main(string[] args)
        {
            try
            {
                // Create and cache the Media Services credentials in a static class variable.
                _cachedCredentials = new MediaServicesCredentials(_mediaServicesAccountName, _mediaServicesAccountKey);

                // Use the cached credentials to create the CloudMediaContext.
                _context = new CloudMediaContext(_cachedCredentials);

                _cloudStorage = CloudStorageAccount.Parse(_storageConnectionString);

                _blobClient = _cloudStorage.CreateCloudBlobClient();
                _tableClient = _cloudStorage.CreateCloudTableClient();

                _assetAuditTable = _tableClient.GetTableReference("AssetAudit");
                _assetAuditTable.CreateIfNotExists();

                ProcessAssetData();
                ParseStorageLogs();

                ProcessJobs();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message + (ex.InnerException != null ? ex.InnerException.StackTrace : string.Empty));
            }
        }

Asset Audit Data

Once you merge the code above with that provided in Part 1 of this blog and execute it, the AssetAudit table will contain entries recording when assets were copied from Storage to media processing VMs. Below is a screenshot of the contents of the updated table from a test account, with an example of the newer entries highlighted. The highlighted rows show when the asset was created and which tasks used it as input (with the timestamp as the RowKey).

[Screenshot: contents of the updated AssetAudit table]

You can also use Excel Power Query (as covered in Part 1 of this blog) to load this data into Excel for further analysis.
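As an alternative to Power Query, the audit trail for a single asset can also be read back directly with the Table storage client used elsewhere in this post. A minimal sketch, assuming the same _assetAuditTable reference initialized in Main (PrintAssetHistory is an illustrative helper, not part of the sample):

```csharp
// Print every audit entry for one asset; entries come back sorted by
// RowKey (the timestamp), so they form a chronological lifecycle.
static void PrintAssetHistory(string assetId)
{
    TableQuery<AssetAuditEntity> query = new TableQuery<AssetAuditEntity>()
        .Where(TableQuery.GenerateFilterCondition(
            "PartitionKey", QueryComparisons.Equal, assetId));

    // ExecuteQuery transparently follows continuation tokens across pages.
    foreach (AssetAuditEntity entry in _assetAuditTable.ExecuteQuery(query))
    {
        Console.WriteLine("{0}  {1}  {2}",
            entry.RowKey, entry.OperationType, entry.OperationData);
    }
}
```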

Considerations

Finally, please note the following as you consider using this sample code for your application:
  • The code relies on the Jobs collection to capture the events related to when an asset was copied from Storage to media processing VMs. If any jobs were deleted before the code above was run, the corresponding events will not be captured. If your application deletes jobs after they are executed, you should consider calling the ProcessTasks function as part of handling the job notification.
  • The sample code provided in this post is designed to work with a Media Services account that has all of its assets in a single storage account, but it can easily be adapted to work with multiple storage accounts.
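On the first point, one way to audit a job before your cleanup code deletes it is to hook the StateChanged event that the Media Services .NET SDK exposes on IJob. This is only a sketch of the idea, not a substitute for a proper queue-based notification endpoint (SubmitWithAudit is an illustrative helper name):

```csharp
// Record "InputToTask" entries as soon as the job reaches a terminal state,
// while the job and its historical events still exist.
static void SubmitWithAudit(IJob job)
{
    job.StateChanged += (sender, e) =>
    {
        if (e.CurrentState == JobState.Finished || e.CurrentState == JobState.Error)
        {
            ProcessTasks(job);
            // Any job deletion logic in your application should run after this point.
        }
    };

    job.Submit();
}
```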