summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_unit.c
blob: 7a4124fbea23309d416b95db03be29fb6c3a3a59 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
4253
4254
4255
4256
4257
4258
4259
4260
4261
4262
4263
4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288
4289
4290
4291
4292
4293
4294
4295
4296
4297
4298
4299
4300
4301
4302
4303
4304
4305
4306
4307
4308
4309
4310
4311
4312
4313
4314
4315
4316
4317
4318
4319
4320
4321
4322
4323
4324
4325
4326
4327
4328
4329
4330
4331
4332
4333
4334
4335
4336
4337
4338
4339
4340
4341
4342
4343
4344
4345
4346
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
4369
4370
4371
4372
4373
4374
4375
4376
4377
4378
4379
4380
4381
4382
4383
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393
4394
4395
4396
4397
4398
4399
4400
4401
4402
4403
4404
4405
4406
4407
4408
4409
4410
4411
4412
4413
4414
4415
4416
4417
4418
4419
4420
4421
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431
4432
4433
4434
4435
4436
4437
4438
4439
4440
4441
4442
4443
4444
4445
4446
4447
4448
4449
4450
4451
4452
4453
4454
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464
4465
4466
4467
4468
4469
4470
4471
4472
4473
4474
4475
4476
4477
4478
4479
4480
4481
4482
4483
4484
4485
4486
4487
4488
4489
4490
4491
4492
4493
4494
4495
4496
4497
4498
4499
4500
4501
4502
4503
4504
4505
4506
4507
4508
4509
4510
4511
4512
4513
4514
4515
4516
4517
4518
4519
4520
4521
4522
4523
4524
4525
4526
4527
4528
4529
4530
4531
4532
4533
4534
4535
4536
4537
4538
4539
4540
4541
4542
4543
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
4558
4559
4560
4561
4562
4563
4564
4565
4566
4567
4568
4569
4570
4571
4572
4573
4574
4575
4576
4577
4578
4579
4580
4581
4582
4583
4584
4585
4586
4587
4588
4589
4590
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601
4602
4603
4604
4605
4606
4607
4608
4609
4610
4611
4612
4613
4614
4615
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
4635
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720
4721
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741












                                


                               




                           



                                                      





                                                                    
                                                                    



                                                                                

                                                               
                                                  
                                               



                                                                         
                                                                        
                                                         

                                                              

                                                                           





                                                            
                                                         



                                                                          



                                                                               





                                                                        
                                                                  
                                                                      




                                                                       

                                                     

                                                                              

                                                                     


                                                                                






                                                                               
                                                                       










                                                                              
                                   



                                                                 





                                                                            

                                                  























                                                                             




                                                                





                                                                                


                                    
                                  


                                       
                                      

                                        



                            





                                                   



                                    
                                
                                      

                                           







                                     
                         





                                     





                                           

                                   
                                       






                                          







                                          


  







                                             


                                      

                                        




                                               
                                            



                                           


                                             


                                             


                                           



                                                      
                                             
                                               









                                               
                                            


































                                                                            
                                              






























                                                                
                                             





















                                                                
 

                                                                     
                                                          


                                               






                                                                   

     
                                




























































                                                                    

                                                                     



                               




                                   



                                                                










































                                                        
          


                                                                      

            




                                                           






                                                                    
                                        
                                       

                                          



                                                                         

                                                                    




                                                               



                                           
                                

                       






























                                                         
                                                       











                                    




                                                                          
                                                       

































                                                                    
                             


                                                 
                                   
 

                                                                 






















































                                                                             






                                    



                                                             
                     








                                                
                                                         

     
                                 
 




                                                                           





                                               






















                                                                            
                                                                















                                                                  

                                                       
 



                                                             
 



                                                                     



                      

                                                                     
 


                                                          
 


                                                        
 




                                                                              



                      
                                                  
 

                                                         
 
                                 
 
                         

              



                                           


                                                                  
 

                  
 
     
 


                            
 


                                                      
 


                                                        
 

              
 
 






                                                                             
 



                                                                           
 

                              
 


                                                                     
 

                              
 
                                   
 


                                                             
 
           
 

                                                                          

                                                                               
 

                              
 

                                                          
 


                                   
 
                      
 
                                                             
 

                                                   
 
 







                                                                                
 


                                                                      
 

                              
 



                                                                            
 

                              
 



                                                                        
 

                              
 
                         
 

                                                             
 
                                   
 
                               
 














                                                                    

     
                             
 





                                                              

     




                                                           


                                   


                                        
 






























                                                                                

     
















                                                                               















                                                                      


                                                                         

                                                               


                                      































                                                                    

     













                                                       


 
















                                                             











                                                               

                                         
                                                  

                                               


                                                              





                                       

                                             








                                                                             

                                           








                                                                              
                                            







                                                                        

                                                                             
 

                                
 


                                                       
 


                                                       
 





                                









                                                                        

                                         


                                                                
 

                                           
                                           







                                                                  


















                                                                             

                                         
                                                 

                                               

                                                                  






                                                  

                                               





















                                                                               

                                                  
                                                                       

                                                    

 
 



                                                                       
 
                  
























                                                                 













                                                               































































                                                                         



                                                         
                                          
                                                                  








































































                                                                             



                                                         
                                          
                                                                  

                                 

                                                           

                                                     
                                                                     













                                                                 
                                        




                                            
                                                                  

                                                       



                                                                        




                                                                           
                    


                                                                             
                    













                                                                 



                                                                           


                      

                                                                               































































                                                                               
                                                    















                                                                      
                        


                                                           
                        













































































                                                                            



                                                                            






                                                                             
                                                                         




                                                    
                                         




















                                                                        













                                                                        

                                               

                                                                              




                        
                                                                     
 
                                                               

                                                                             






















                                                                            
                                                                     




                                                              
                                                             








                                          




                                                               

                                         
                                     

                                               

                                                       
                        


            
                                      
 
                                           

                                               



                                  


                              






                                                        
                                       
 

                                                   
                                                                      

                                                     













































































                                                                           
 


                                                                                































                                                                         
                                                                             




                                               
                                     

















                                                                        
                                                                         
                                           
                                         

















                                                            
                                
                                                     








                                                             
                        







                                             
                                                    



                       

                        
                     
                                     

                                                                            
 




                                                                
 


                                                                              
                          
         
 

                                                                      
 
                                                           
                                                                          
 





                                             
 







                                 






                                                                                






                                                                               
                          












                                                                                
                          
         

     






                                         





                                      

                                                                            
 
 


                                                     









                                                         


                                                                 
                                                                       
 





                                     
                                 

                                  
     


 



















































                                                               


                                      
                                   

                                                               
 
                                 


                    
                                




















                                                                        











                                                                           
                                     

                                                                     
                                              

                                            
                                                                     



                                                                        
             
 


                                                                      
                               

     
                                            
                                                   




                                                                       
                       



                                               
                       



                                


                                                 

     

                                                       

                                                                     
 

                                                                      
                                                                            
                                               





                                                        
         
                                                 



                                                              
                                                                              
                                               
                       



                                


                                                 

     
                






                                                        

                                     
                                           





                                                                        











































                                                                            


                                                            

                                                                     

                                                                   
                                                          


                                                             

         

                            




                                                                            
                                                      






                                      
                                                                              













                                                                           





























                                                                        


 


                                                                             



                                                 
























                                          












                                                                   






                                     























































                                                                          


                                                                            

                                     
                                    



                
             
                   

                         

                       


















                                             
                  


                       
                 



















































                                                                           
                                  




















                                                                    













                                                                      

                                          
                                                                    







                                                                        







                                      
                                                        
 

                                                               
                                                          


                                                         

     

                        


                      

                                      


















                                                          


                                                                           
 


                                                           

                 

                                                                    

                                                                           
                                                                      


                                                                     
                 

                                                  




                                 

                                                               








































                                                                        
                                                                      















                                                           
                               











                                                       

                                                                   
                                                                      

                                            
                                          
                                         
                                 

                                 

                                                             

                                                 




                                                    












                                                                         
                                  



                                                                     





                                           











                                                                            


                                   


                            











































                                                                            


           
                                                       


       





                                                                  





                                                   















































































                                                                             













                                                     
                          

                
                                


















































































































































































                                                                                




                                                                             
                                   






























                                                                           
                                                                  
                                                   
 
                                                 


                                 


















                                                                           
                                    






                                                                         
                                                                       
                                                                               
 
                                                                              
                                     









                                                              







                                                                             
                                
                              
                                                                         





































































































                                                                               
                                



































































                                                                               
                                                              





















                                                                          

                                                           




                                  
                                                                    


                                                             
                                         










                                                                
                                                                      



                                     
                                                




                                                                           
                                                              











                                                          

                                            















                                                                               








                                                                          

                                                               







                                                                   



                                    



                                         
                             
 

                    
                                                                               
                                        


                                                               
                                             







                                                   



                                                             
 



                                  





                                       
                     





                                                       


















































                                                                              




























































































































































                                                                               
   


                                      
                                   
                               
 

                                                               










                                                                       
 
            





                                                    

     



                                                                

               
                                            

      



                            

                                         



              
























                                                                            















                                                                         

                                        

                                                  

                                              





                                              

                                       








































                                                                           







                                                      









                                        




                                               












                                                                        

                                                    
 

                                        
                                           
                       
     







                                                      






                                                         

                                            


























































































































































                                                                               
                                   








































































































































































































































































































































































































































































































                                                                              












































































                                                                           







































































                                                                               
                                                                    




















































































                                                                           

/*
 * Copyright (C) NGINX, Inc.
 */

#include <stdlib.h>

#include "nxt_main.h"
#include "nxt_port_memory_int.h"

#include "nxt_unit.h"
#include "nxt_unit_request.h"
#include "nxt_unit_response.h"
#include "nxt_unit_websocket.h"

#include "nxt_websocket.h"

#if (NXT_HAVE_MEMFD_CREATE)
#include <linux/memfd.h>
#endif

/*
 * NOTE(review): presumably the largest payload that is sent "plain"
 * (inline in the port message) rather than through shared memory --
 * confirm against the users of this constant.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
/* Room for one port message header followed by a maximum plain payload. */
#define NXT_UNIT_LOCAL_BUF_SIZE  \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))

/* Forward typedefs for the private structure types declared in this file. */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;

/*
 * Forward declarations of the file-local (static / nxt_inline) helpers.
 */

/* Library and context setup, intrusive buffer list primitives. */
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
    nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream);
/* Handlers for individual incoming port message types. */
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
/* Request info, websocket frame and buffer object management. */
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
    nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf);
static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
    nxt_unit_request_info_t *req, size_t size);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
    size_t size);
/* Shared memory segment handling. */
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
    nxt_chunk_id_t *c, int *n, int min_n);
static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
    uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);

static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, int i);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process,
    nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);

/* Process and port bookkeeping. */
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
    pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
    pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, int *fd);

static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd);

static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
    nxt_unit_process_t **process);
static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process);

/* Default port_send / port_recv callbacks (installed by nxt_unit_create()). */
static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
    void *oob, size_t oob_size);

/* Hash table helpers for ports and active requests. */
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
    nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);

static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);


/*
 * Private wrapper around the public nxt_unit_buf_t.  Instances are chained
 * into intrusive lists through "next" / "prev" (see
 * nxt_unit_mmap_buf_insert() and nxt_unit_mmap_buf_unlink()).
 */
struct nxt_unit_mmap_buf_s {
    /* Public buffer part; pointers to it are handed out to callers. */
    nxt_unit_buf_t           buf;

    /* Next buffer in the list, or NULL for the last one. */
    nxt_unit_mmap_buf_t      *next;
    /*
     * Address of the pointer that refers to this buffer: either the list
     * head or the "next" field of the preceding buffer.
     */
    nxt_unit_mmap_buf_t      **prev;

    /* NOTE(review): presumably the shared memory segment header -- confirm. */
    nxt_port_mmap_header_t   *hdr;
    nxt_unit_port_id_t       port_id;
    /* Request that owns the buffer, if any. */
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
    nxt_unit_process_t       *process;
    /* NOTE(review): roles of the two pointers below are not visible here. */
    char                     *free_ptr;
    char                     *plain_ptr;
};


/*
 * Decoded form of one received port message; filled in and cleaned up by
 * nxt_unit_process_msg() and passed to the per-type handlers.
 */
struct nxt_unit_recv_msg_s {
    /* Copied from the corresponding nxt_port_msg_t header fields. */
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Payload that follows the message header, and its length in bytes. */
    void                     *start;
    uint32_t                 size;

    /*
     * Descriptor received via SCM_RIGHTS, or -1.  A handler that takes
     * ownership resets it to -1; otherwise nxt_unit_process_msg() closes it.
     */
    int                      fd;
    /* Process reference; dropped by nxt_unit_process_msg() unless NULL. */
    nxt_unit_process_t       *process;

    /* List of incoming buffers; freed by nxt_unit_process_msg() if left. */
    nxt_unit_mmap_buf_t      *incoming_buf;
};


/*
 * Request state kept in nxt_unit_request_info_impl_t.state.
 * NOTE(review): judging by the names the states are entered in declaration
 * order as the response is built, sent and the request released -- confirm
 * against the code that updates "state".
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/* Private extension of the public nxt_unit_request_info_t. */
struct nxt_unit_request_info_impl_s {
    /* Public part handed to the application. */
    nxt_unit_request_info_t  req;

    /* Stream id of the message that carried the request headers. */
    uint32_t                 stream;

    /* Process reference taken over from the received message. */
    nxt_unit_process_t       *process;

    /* Heads of the intrusive nxt_unit_mmap_buf_t lists of this request. */
    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;

    /* Links the request into the per-context request queues. */
    nxt_queue_link_t         link;

    /*
     * Flexible array member, must stay last.
     * NOTE(review): presumably backs the per-request application data of
     * request_data_size bytes -- confirm.
     */
    char                     extra_data[];
};


/* Private extension of the public nxt_unit_websocket_frame_t. */
struct nxt_unit_websocket_frame_impl_s {
    /* Public part handed to the application. */
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    /* Queue link (the context keeps a free_ws queue of these objects). */
    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};


/*
 * Storage for one raw port message: "buf" holds the message data and "oob"
 * the ancillary (control) data.  Buffers are chained through "next" into
 * the per-context pending and free lists.
 */
struct nxt_unit_read_buf_s {
    nxt_unit_read_buf_t           *next;
    /* NOTE(review): presumably the result of the receive call -- confirm. */
    ssize_t                       size;
    char                          buf[16384];
    char                          oob[256];
};


/*
 * Private extension of the public nxt_unit_ctx_t; initialized by
 * nxt_unit_ctx_init().
 */
struct nxt_unit_ctx_impl_s {
    /* Public part handed to the application. */
    nxt_unit_ctx_t                ctx;

    pthread_mutex_t               mutex;

    nxt_unit_port_id_t            read_port_id;
    /* Starts out as -1. */
    int                           read_port_fd;

    /* Links the context into nxt_unit_impl_t.contexts. */
    nxt_queue_link_t              link;

    /* Free list of buffer descriptors, seeded with ctx_buf[]. */
    nxt_unit_mmap_buf_t           *free_buf;

    /* Queue of free nxt_unit_request_info_impl_t, seeded with "req". */
    nxt_queue_t                   free_req;

    /* Queue of free nxt_unit_websocket_frame_impl_t. */
    nxt_queue_t                   free_ws;

    /* Queue of active nxt_unit_request_info_impl_t. */
    nxt_queue_t                   active_req;

    /* Hash of nxt_unit_request_info_impl_t. */
    nxt_lvlhsh_t                  requests;

    /*
     * Singly linked list of pending read buffers; pending_read_tail points
     * at the pointer where the next buffer is to be appended (initially
     * at pending_read_head itself).
     */
    nxt_unit_read_buf_t           *pending_read_head;
    nxt_unit_read_buf_t           **pending_read_tail;
    /* Free list of read buffers, seeded with ctx_read_buf. */
    nxt_unit_read_buf_t           *free_read_buf;

    /* Objects embedded in the context that pre-populate the free lists. */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    /*
     * Embedded request object.  It ends with a flexible array member, so
     * this field has to remain the last one.
     */
    nxt_unit_request_info_impl_t  req;
};


/*
 * Private library instance behind the public nxt_unit_t; allocated and
 * initialized by nxt_unit_create().
 */
struct nxt_unit_impl_s {
    /* Public part; nxt_container_of() is used to get back to the impl. */
    nxt_unit_t               unit;
    /* Copy of the user callbacks with NULL entries replaced by defaults. */
    nxt_unit_callbacks_t     callbacks;

    uint32_t                 request_data_size;
    /* Shared memory limit in units of PORT_MMAP_DATA_SIZE, at least 1. */
    uint32_t                 shm_mmap_limit;

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       ready_port_id;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    /* Taken from the pid of the read port in nxt_unit_init(). */
    pid_t                    pid;
    /* Defaults to STDERR_FILENO. */
    int                      log_fd;
    /* Set to 1 on creation. */
    int                      online;

    /*
     * Main context, embedded in the library object.  The object is
     * allocated with request_data_size extra bytes past its end.
     * NOTE(review): presumably those bytes back main_ctx.req.extra_data,
     * which would require main_ctx to stay the last member -- confirm.
     */
    nxt_unit_ctx_impl_t      main_ctx;
};


/* Private extension of the public nxt_unit_port_t. */
struct nxt_unit_port_impl_s {
    /* Public part. */
    nxt_unit_port_t          port;

    /* NOTE(review): presumably links into nxt_unit_process_t.ports. */
    nxt_queue_link_t         link;
    nxt_unit_process_t       *process;
};


/* One element of an nxt_unit_mmaps_t array. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};


/*
 * Array of nxt_unit_mmap_t elements together with its own mutex.
 * NOTE(review): "size" / "cap" look like the used and allocated element
 * counts of "elts" -- confirm against nxt_unit_mmap_at().
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_atomic_t             allocated_chunks;
    nxt_unit_mmap_t          *elts;
};


/* Per-process record: its ports and shared memory segments. */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;

    /* Segment arrays, one per direction. */
    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    nxt_unit_impl_t          *lib;

    /* Reference counter; adjusted through nxt_unit_process_use(). */
    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};


/*
 * Explicitly using 32-bit types so that the structure contains no
 * alignment padding.
 * NOTE(review): presumably the raw bytes serve as the port hash key, which
 * is why padding must be avoided -- confirm in the port hash helpers.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Creates the library instance with its main context, registers the
 * "ready" and "read" ports through the add_port callback and sends the
 * PROCESS_READY message to the ready port.
 *
 * The ports, the ready stream and the log descriptor are taken from "init"
 * when init->ready_port.id.pid, init->ready_stream and
 * init->read_port.id.pid are all non-zero.  Otherwise they are parsed from
 * the environment by nxt_unit_read_env(), which also supplies the shared
 * memory limit.
 *
 * Returns the main context, or NULL on failure.
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc;
    uint32_t         ready_stream, shm_limit;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        /* Re-initialize the copied ids from their pid/id pairs. */
        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);

    } else {
        rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
                               &ready_stream, &shm_limit);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }

        /* Round the byte limit up to whole shared memory segments. */
        lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
                                / PORT_MMAP_DATA_SIZE;
    }

    if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
        lib->shm_mmap_limit = 1;
    }

    lib->pid = read_port.id.pid;
    ctx = &lib->main_ctx.ctx;

    rc = lib->callbacks.add_port(ctx, &ready_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to add ready_port");

        goto fail;
    }

    rc = lib->callbacks.add_port(ctx, &read_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    lib->main_ctx.read_port_id = read_port.id;
    lib->ready_port_id = ready_port.id;

    rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    return ctx;

fail:

    /*
     * nxt_unit_create() returns only after both mutexes have been
     * initialized, so they have to be destroyed before the memory is
     * released.
     *
     * NOTE(review): ports already registered through add_port are not
     * removed here -- confirm whether that needs separate cleanup.
     */
    pthread_mutex_destroy(&lib->main_ctx.mutex);
    pthread_mutex_destroy(&lib->mutex);

    free(lib);

    return NULL;
}


/*
 * Allocates the library object (with init->request_data_size extra bytes
 * past its end), copies the settings from "init", initializes the embedded
 * main context and installs default implementations for every optional
 * callback that was left NULL.
 *
 * request_handler is the only mandatory callback.
 *
 * Returns the new object or NULL on failure; nothing stays allocated or
 * initialized in the failure case.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    /* Validate the configuration before acquiring any resources. */
    if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        return NULL;
    }

    lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail_free;
    }

    lib->unit.data = init->data;
    lib->callbacks = init->callbacks;

    lib->request_data_size = init->request_data_size;
    /* Round the byte limit up to whole shared memory segments. */
    lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
                            / PORT_MMAP_DATA_SIZE;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->log_fd = STDERR_FILENO;
    lib->online = 1;

    nxt_queue_init(&lib->contexts);

    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        /* The context mutex was not initialized; only ours needs undoing. */
        goto fail_mutex;
    }

    cb = &lib->callbacks;

    if (cb->add_port == NULL) {
        cb->add_port = nxt_unit_add_port;
    }

    if (cb->remove_port == NULL) {
        cb->remove_port = nxt_unit_remove_port;
    }

    if (cb->remove_pid == NULL) {
        cb->remove_pid = nxt_unit_remove_pid;
    }

    if (cb->quit == NULL) {
        cb->quit = nxt_unit_quit;
    }

    if (cb->port_send == NULL) {
        cb->port_send = nxt_unit_port_send_default;
    }

    if (cb->port_recv == NULL) {
        cb->port_recv = nxt_unit_port_recv_default;
    }

    return lib;

fail_mutex:

    pthread_mutex_destroy(&lib->mutex);

fail_free:

    free(lib);

    return NULL;
}


/*
 * Initializes a context that belongs to "lib": sets up its mutex, queues,
 * free lists (pre-populated with the objects embedded in the context) and
 * finally links it into lib->contexts.
 *
 * "data" is stored as the application data pointer of the public context.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR when the mutex cannot be
 * initialized; in that case the context is left unlinked, so the caller
 * may simply discard it.
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    /*
     * Link the context only after the fallible step has succeeded, so a
     * failed initialization never leaves a stale entry in lib->contexts.
     */
    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);

    /* Seed the buffer free list with the two embedded descriptors. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    /* The embedded request object is the first free request. */
    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);

    /* Empty pending list; the embedded read buffer is the only free one. */
    ctx_impl->pending_read_head = NULL;
    ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
    ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
    ctx_impl->ctx_read_buf.next = NULL;

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port_fd = -1;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/*
 * Pushes "mmap_buf" in front of the list whose first-element pointer is
 * "*head".  Every node records in "prev" the address of the pointer that
 * refers to it, so the former first node is re-pointed at the new node's
 * "next" field.
 */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  *first;

    first = *head;

    mmap_buf->next = first;

    if (first != NULL) {
        first->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/*
 * Appends "mmap_buf" to the end of the list that starts at "*prev": walks
 * to the terminating NULL pointer and inserts the buffer at that position.
 */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **tail;

    for (tail = prev; *tail != NULL; tail = &(*tail)->next) {
        /* void */
    }

    nxt_unit_mmap_buf_insert(tail, mmap_buf);
}


/*
 * Removes "mmap_buf" from the list it is linked into by connecting its
 * neighbours to each other.  The buffer's own "next" and "prev" fields are
 * left untouched.
 */
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  *next, **prev;

    next = mmap_buf->next;
    prev = mmap_buf->prev;

    if (next != NULL) {
        next->prev = prev;
    }

    if (prev != NULL) {
        *prev = next;
    }
}


/*
 * Parses the NXT_UNIT_INIT_ENV environment variable, which has the form
 *
 *   <version>;<ready_stream>;<ready_pid>,<ready_id>,<ready_fd>;
 *   <read_pid>,<read_id>,<read_fd>;<log_fd>,<shm_limit>
 *
 * (on a single line).  <version> must be exactly NXT_VERSION.
 *
 * On success fills in "ready_port" (outgoing descriptor only), "read_port"
 * (incoming descriptor only), "*log_fd", "*stream" and "*shm_limit" and
 * returns NXT_UNIT_OK.  Returns NXT_UNIT_ERROR when the variable is
 * missing, the version differs or the value cannot be parsed; "*log_fd"
 * and "*shm_limit" may already have been overwritten in the last case.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
    int *log_fd, uint32_t *stream, uint32_t *shm_limit)
{
    int       rc;
    int       ready_fd, read_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, read_pid;
    uint32_t  ready_stream, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    version_length = nxt_length(NXT_VERSION);

    /* The text before the first ';' has to match NXT_VERSION exactly. */
    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    /*
     * The SCN* macros are the scanf counterparts of PRI*; the two families
     * are not guaranteed to expand to the same conversion specifiers.
     */
    rc = sscanf(version_end + 1,
                "%"SCNu32";"
                "%"SCNd64",%"SCNu32",%d;"
                "%"SCNd64",%"SCNu32",%d;"
                "%d,%"SCNu32,
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &read_pid, &read_id, &read_fd,
                log_fd, shm_limit);

    if (nxt_slow_path(rc != 9)) {
        nxt_unit_alert(NULL, "failed to scan variables: %d", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_fd;
    read_port->out_fd = -1;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Sends a PROCESS_READY message for "stream" to the port "port_id" through
 * the port_send callback.  The message consists of the header only.
 *
 * Returns NXT_UNIT_OK when the whole header was sent, NXT_UNIT_ERROR
 * otherwise.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream)
{
    ssize_t          sent;
    nxt_port_msg_t   ready_msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Addressing. */
    ready_msg.stream = stream;
    ready_msg.pid = lib->pid;
    ready_msg.reply_port = 0;

    /* A single, final, non-fragmented message without shared memory. */
    ready_msg.type = _NXT_PORT_MSG_PROCESS_READY;
    ready_msg.last = 1;
    ready_msg.mmap = 0;
    ready_msg.nf = 0;
    ready_msg.mf = 0;
    ready_msg.tracking = 0;

    sent = lib->callbacks.port_send(ctx, port_id, &ready_msg,
                                    sizeof(ready_msg), NULL, 0);

    return (sent == sizeof(ready_msg)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}


/*
 * Decodes one raw port message and dispatches it by type.
 *
 * "buf" / "buf_size" hold the message: an nxt_port_msg_t header followed
 * by the payload.  "oob" / "oob_size" hold the ancillary data, from which
 * a single SCM_RIGHTS file descriptor is extracted when present.
 *
 * Returns the handler's result, NXT_UNIT_OK for messages handled inline,
 * or NXT_UNIT_ERROR for malformed, unsupported or ignored messages.
 *
 * Whatever is still attached to recv_msg at the end (descriptor, incoming
 * buffers, process reference) is released here; a handler that takes
 * ownership of one of them must detach it from recv_msg.
 */
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   rc;
    pid_t                 pid;
    struct cmsghdr        *cm;
    nxt_port_msg_t        *port_msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_recv_msg_t   recv_msg;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Pessimistic default; only the successful paths overwrite it. */
    rc = NXT_UNIT_ERROR;
    recv_msg.fd = -1;
    recv_msg.process = NULL;
    port_msg = buf;
    cm = oob;

    /* Accept exactly one descriptor passed via SCM_RIGHTS. */
    if (oob_size >= CMSG_SPACE(sizeof(int))
        && cm->cmsg_len == CMSG_LEN(sizeof(int))
        && cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
        nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
        goto fail;
    }

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* The payload immediately follows the header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = buf_size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    /*
     * A tracked message for which nxt_unit_tracking_read() returns 0 is
     * dropped without being treated as an error.
     * NOTE(review): presumably 0 means the sender has cancelled the
     * message -- confirm in nxt_unit_tracking_read().
     */
    if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
        rc = NXT_UNIT_OK;

        goto fail;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    /* Payload passed through shared memory rather than inline. */
    if (port_msg->mmap) {
        if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
            goto fail;
        }
    }

    cb = &lib->callbacks;

    switch (port_msg->type) {

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        cb->quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /*
         * NOTE(review): only logged; rc keeps its initial NXT_UNIT_ERROR
         * value and the descriptor is closed below -- confirm that
         * reporting an error here is intended.
         */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd);

            goto fail;
        }

        /* recv_msg.fd is not detached, so it is closed below afterwards. */
        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        /* The payload must be exactly one pid_t. */
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
                          "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                          (int) sizeof(pid));

            goto fail;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        cb->remove_pid(ctx, pid);

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        goto fail;
    }

/* Common exit path, also reached on success. */
fail:

    /* Close the descriptor unless a handler has taken it over. */
    if (recv_msg.fd != -1) {
        close(recv_msg.fd);
    }

    /*
     * NOTE(review): this loop terminates only if nxt_unit_mmap_buf_free()
     * unlinks the buffer from the list it is the head of -- confirm.
     */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    /* Drop the process reference unless a handler has taken it over. */
    if (recv_msg.process != NULL) {
        nxt_unit_process_use(ctx, recv_msg.process, -1);
    }

    return rc;
}


/*
 * Handles a NEW_PORT message: the payload must be exactly one
 * nxt_port_msg_new_port_t and a descriptor must have been received along
 * with it.  Non-blocking mode is switched off on the descriptor, which
 * then becomes the outgoing descriptor of a port registered through the
 * add_port callback.
 *
 * Ownership of the descriptor moves to the new port: recv_msg->fd is reset
 * to -1 before add_port is invoked.
 *
 * Returns the result of add_port, or NXT_UNIT_ERROR on validation or
 * ioctl() failure.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                      nonblock;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          port;
    nxt_port_msg_new_port_t  *msg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd);

        return NXT_UNIT_ERROR;
    }

    msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
                   recv_msg->stream, (int) msg->pid,
                   (int) msg->id, recv_msg->fd);

    /* FIONBIO with a zero argument clears the non-blocking flag. */
    nonblock = 0;

    if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nonblock) == -1)) {
        nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
                       "failed: %s (%d)",
                       recv_msg->stream, recv_msg->fd, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&port.id, msg->pid, msg->id);

    port.data = NULL;
    port.in_fd = -1;
    port.out_fd = recv_msg->fd;

    /* The descriptor now belongs to the port; the caller must not close it. */
    recv_msg->fd = -1;

    return lib->callbacks.add_port(ctx, &port);
}


/*
 * Handles a request headers message: builds a request info object on top of
 * the received data and passes it to the request_handler callback.
 *
 * The message must reference shared memory (recv_msg->mmap) and hold at
 * least sizeof(nxt_unit_request_t) bytes.  On success the process reference,
 * the incoming buffer list and the descriptor are moved from recv_msg to the
 * request, and the corresponding recv_msg fields are reset.
 *
 * Returns NXT_UNIT_OK after the handler has been called, NXT_UNIT_ERROR if
 * the message is malformed, the sender process cannot be found or the
 * request info object cannot be allocated.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t               *lib;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    /*
     * Resolve the sender process before taking a request info object.
     * nxt_unit_request_info_get() links the object into the active list,
     * so bailing out afterwards would strand a half-initialized request
     * there.  The reference stays in recv_msg->process until it is moved
     * to req_impl below.
     */
    if (nxt_slow_path(nxt_unit_msg_get_process(ctx, recv_msg) == NULL)) {
        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
                          recv_msg->reply_port);

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Preread body shares the request buffer; start reading at its offset. */
    req->content_buf = req->request_buf;
    req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);

    /* "Move" process reference to req_impl. */
    req_impl->process = recv_msg->process;
    recv_msg->process = NULL;

    req_impl->stream = recv_msg->stream;

    req_impl->outgoing_buf = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* "Move" the descriptor as well. */
    req->content_fd = recv_msg->fd;
    recv_msg->fd = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length, nxt_unit_sptr_get(&r->method),
                   (int) r->target_length, nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    lib->callbacks.request_handler(req);

    return NXT_UNIT_OK;
}


/*
 * Handles a websocket message for an already upgraded request.
 *
 * The request is looked up by stream id in the context's request hash; a
 * message for an unknown stream is ignored and NXT_UNIT_OK is returned.
 * When a websocket_handler is registered and the message holds at least
 * 2 bytes, a frame object is prepared and passed to the handler.  If the
 * message is marked as last, the close_handler is called or, when there is
 * none, the request is finished with NXT_UNIT_ERROR.
 *
 * Returns NXT_UNIT_ERROR only when the frame or buffer object cannot be
 * allocated.
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /*
     * NOTE(review): the third argument presumably asks the lookup to also
     * remove the entry (the release path passes 1) - confirm against
     * nxt_unit_request_hash_find().
     */
    req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
                                          recv_msg->last);
    if (req_impl == NULL) {
        return NXT_UNIT_OK;
    }

    req = &req_impl->req;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            /*
             * Plain message: wrap the received bytes in a buffer object.
             * The buffer points at recv_msg->start, nothing is copied here.
             */
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        /* The frame header sits at the very beginning of the first buffer. */
        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        /*
         * NOTE(review): hsize is not compared with the amount of data
         * actually received; only "size >= 2" was checked above.
         */
        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        if (ws_impl->ws.header->mask) {
            /* The masking key occupies the last 4 bytes of the header. */
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the header so that content reading starts at the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                            ws_impl->ws.header->opcode,
                            ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        /* Cleared first so the release path skips its own hash lookup. */
        req_impl->websocket = 0;

        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Forwards a shared memory acknowledgement to the optional shm_ack_handler
 * callback.  Always returns NXT_UNIT_OK.
 */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    /* The handler is optional. */
    if (cb->shm_ack_handler == NULL) {
        return NXT_UNIT_OK;
    }

    cb->shm_ack_handler(ctx);

    return NXT_UNIT_OK;
}


/*
 * Returns a request info object linked into the context's active list.
 *
 * An object is taken from the context's free list when one is available;
 * otherwise a new one is allocated together with request_data_size bytes of
 * per-request data.  Returns NULL if the allocation fails.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *first;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Nothing to reuse: allocate with the mutex dropped. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
                          + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        /* Re-acquire for the active list insertion below. */
        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        first = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(first);

        req_impl = nxt_container_of(first, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    /* Expose the trailing data area only if the application asked for one. */
    if (lib->request_data_size) {
        req_impl->req.data = req_impl->extra_data;

    } else {
        req_impl->req.data = NULL;
    }

    return req_impl;
}


/*
 * Releases everything a request holds and moves its info object from the
 * active list to the context's free list for reuse.
 *
 * Released in order: the websocket hash entry (if the request is still
 * marked as websocket), outgoing buffers, incoming buffers, the content
 * descriptor and finally the process reference.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->websocket) {
        /*
         * The result is ignored; the call is made for its side effect.
         * NOTE(review): presumably the last argument (1) removes the entry
         * from the hash - confirm against nxt_unit_request_hash_find().
         */
        nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);

        req_impl->websocket = 0;
    }

    /*
     * These loops rely on nxt_unit_mmap_buf_free() unlinking the buffer,
     * which advances the list head.
     */
    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        close(req->content_fd);

        req->content_fd = -1;
    }

    /*
     * Process release should go after buffers release to guarantee mmap
     * existence.
     */
    if (req_impl->process != NULL) {
        nxt_unit_process_use(req->ctx, req_impl->process, -1);

        req_impl->process = NULL;
    }

    pthread_mutex_lock(&ctx_impl->mutex);

    /* Move from the active list to the free list. */
    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->state = NXT_UNIT_RS_RELEASED;
}


/*
 * Unlinks a request info object from the queue it is on and frees its
 * memory, unless it is the object embedded in the context (ctx_impl->req).
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_queue_remove(&req_impl->link);

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    if (req_impl == &ctx_impl->req) {
        /* Part of the context structure, not a separate allocation. */
        return;
    }

    free(req_impl);
}


/*
 * Returns a websocket frame object bound to the given context: one from the
 * context's free list when available, otherwise a newly allocated one.
 * Returns NULL if the allocation fails.  Only ctx_impl is initialized here.
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *first;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        /* Nothing cached: allocate with the mutex dropped. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

        ws_impl->ctx_impl = ctx_impl;

        return ws_impl;
    }

    first = nxt_queue_first(&ctx_impl->free_ws);
    nxt_queue_remove(first);

    pthread_mutex_unlock(&ctx_impl->mutex);

    ws_impl = nxt_container_of(first, nxt_unit_websocket_frame_impl_t, link);
    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/*
 * Frees all buffers attached to a websocket frame, detaches the frame from
 * its request and puts the frame object on its context's free list.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    pthread_mutex_t                  *mutex;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    /* Each free is expected to unlink the head, so the list drains. */
    for ( ;; ) {
        if (ws_impl->buf == NULL) {
            break;
        }

        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    mutex = &ws_impl->ctx_impl->mutex;

    pthread_mutex_lock(mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(mutex);
}


/*
 * Unlinks a websocket frame object from the queue it is on and frees its
 * memory.  Buffers attached to the frame are not touched here.
 */
static void
nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    free(ws_impl);
}


/*
 * Computes the 16-bit hash of a header field name.
 *
 * Every byte is passed through nxt_lowcase() and folded in with a
 * "multiply by 17 and add" step; the upper half of the 32-bit result is then
 * XORed into the lower half and the low 16 bits are returned.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char    c;
    size_t    i;
    uint32_t  h;

    h = 159406; /* Magic value copied from nxt_http_parse.c */

    for (i = 0; i < name_length; i++) {
        c = name[i];
        h = h * 17 + nxt_lowcase(c);
    }

    /* The return type truncates the folded value to 16 bits. */
    return (h >> 16) ^ h;
}


/*
 * Reorders request fields in place so that fields with equal hashes become
 * adjacent, keeping their original relative order, and records the indexes
 * of the Content-Length, Content-Type and Cookie fields in the request.
 *
 * Fields are matched by hash only.  Field name and value are stored as
 * offsets relative to the field structure itself, so every time a structure
 * is moved within the array its offsets are adjusted by the distance moved.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {

        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            r->content_length_field = i;
            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            r->content_type_field = i;
            break;

        case NXT_UNIT_HASH_COOKIE:
            r->cookie_field = i;
            break;
        }

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash) {
                continue;
            }

            /*
             * Move the duplicate down to slot i + 1, right after the current
             * end of the group.  An already adjacent duplicate (j == i + 1)
             * goes through the same path as a no-op move: the group end must
             * still advance, otherwise the next duplicate found would be
             * inserted in front of this one and the order would change.
             */
            f = fields[j];
            f.name.offset += (j - (i + 1)) * sizeof(f);
            f.value.offset += (j - (i + 1)) * sizeof(f);

            /* Shift the fields in between one slot up. */
            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* The group now ends at the slot just filled. */
            i++;
        }
    }
}


/*
 * Prepares the response buffer of a request.
 *
 * The buffer is sized for the response header, max_fields_count field
 * structures and max_fields_size bytes of names and values (plus two
 * terminators per field).  An existing response buffer is reused if it is
 * large enough, otherwise it is freed and a new one is allocated.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR if the response has already
 * been sent, the requested size does not fit into 32 bits or the buffer
 * cannot be allocated.
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    uint64_t                      need;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     *
     * The sum is computed in 64 bits: truncating it to 32 bits would
     * allocate too small a buffer and place buf->free beyond its end.
     */
    need = (uint64_t) sizeof(nxt_unit_response_t)
           + (uint64_t) max_fields_count * (sizeof(nxt_unit_field_t) + 2)
           + max_fields_size;

    if (nxt_slow_path(need > UINT32_MAX)) {
        nxt_unit_req_warn(req, "init: requested response buffer too big");

        return NXT_UNIT_ERROR;
    }

    buf_size = (uint32_t) need;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        /* The existing buffer is too small: drop it and start over. */
        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Names and values are stored right after the field table. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);

    req->response_max_fields = max_fields_count;
    req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;

    return NXT_UNIT_OK;
}


/*
 * Moves an initialized, not yet sent response into a newly allocated buffer
 * sized for max_fields_count fields and max_fields_size bytes of names and
 * values.
 *
 * Status, content length, all fields not marked as skipped and the piggyback
 * content are copied; the old buffer is freed on success only.
 *
 * Returns NXT_UNIT_OK on success.  Returns NXT_UNIT_ERROR, leaving the
 * current response untouched, if the response is not initialized or already
 * sent, max_fields_count is less than the current number of fields, the
 * requested size does not fit into 32 bits, the allocation fails or the new
 * buffer cannot hold the existing data.
 */
int
nxt_unit_response_realloc(nxt_unit_request_info_t *req,
    uint32_t max_fields_count, uint32_t max_fields_size)
{
    char                          *p;
    uint32_t                      i, buf_size;
    uint64_t                      need;
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f, *src;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "realloc: response not init");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "realloc: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
        nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");

        return NXT_UNIT_ERROR;
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     *
     * The sum is computed in 64 bits so that it cannot be silently
     * truncated into a too small allocation.
     */
    need = (uint64_t) sizeof(nxt_unit_response_t)
           + (uint64_t) max_fields_count * (sizeof(nxt_unit_field_t) + 2)
           + max_fields_size;

    if (nxt_slow_path(need > UINT32_MAX)) {
        nxt_unit_req_warn(req, "realloc: requested response buffer too big");

        return NXT_UNIT_ERROR;
    }

    buf_size = (uint32_t) need;

    nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        nxt_unit_req_warn(req, "realloc: new buf allocation failed");
        return NXT_UNIT_ERROR;
    }

    resp = (nxt_unit_response_t *) buf->start;

    memset(resp, 0, sizeof(nxt_unit_response_t));

    resp->status = req->response->status;
    resp->content_length = req->response->content_length;

    /*
     * Names and values go after the response header AND the field table,
     * the same layout nxt_unit_response_init() sets up.  Leaving the header
     * size out would make the copied strings overlap the header and the
     * field structures.
     */
    p = buf->start + sizeof(nxt_unit_response_t)
        + max_fields_count * sizeof(nxt_unit_field_t);
    f = resp->fields;

    for (i = 0; i < req->response->fields_count; i++) {
        src = req->response->fields + i;

        /* Skipped fields are dropped from the new response. */
        if (nxt_slow_path(src->skip != 0)) {
            continue;
        }

        if (nxt_slow_path(src->name_length + src->value_length + 2
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enough space for field"
                  " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
                  i, src, src->name_length, src->value_length);

            goto fail;
        }

        nxt_unit_sptr_set(&f->name, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
        *p++ = '\0';

        nxt_unit_sptr_set(&f->value, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
        *p++ = '\0';

        f->hash = src->hash;
        f->skip = 0;
        f->name_length = src->name_length;
        f->value_length = src->value_length;

        resp->fields_count++;
        f++;
    }

    if (req->response->piggyback_content_length > 0) {
        if (nxt_slow_path(req->response->piggyback_content_length
                          > (uint32_t) (buf->end - p)))
        {
            nxt_unit_req_warn(req, "realloc: not enought space for content"
                  " #%"PRIu32", %"PRIu32" required",
                  i, req->response->piggyback_content_length);

            goto fail;
        }

        resp->piggyback_content_length =
                                       req->response->piggyback_content_length;

        nxt_unit_sptr_set(&resp->piggyback_content, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
                       req->response->piggyback_content_length);
    }

    buf->free = p;

    /* Everything has been copied: switch the request to the new buffer. */
    nxt_unit_buf_free(req->response_buf);

    req->response = resp;
    req->response_buf = buf;
    req->response_max_fields = max_fields_count;

    return NXT_UNIT_OK;

fail:

    nxt_unit_buf_free(buf);

    return NXT_UNIT_ERROR;
}


/*
 * Returns 1 if the response of the request has been initialized (the request
 * state is NXT_UNIT_RS_RESPONSE_INIT or a later one), 0 otherwise.
 */
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (req_impl->state < NXT_UNIT_RS_RESPONSE_INIT) {
        return 0;
    }

    return 1;
}


/*
 * Appends a header field to an initialized response.
 *
 * The name and the value are copied into the response buffer, each followed
 * by a terminating zero byte, and the next free field structure is filled
 * in, including the hash of the name.
 *
 * Returns NXT_UNIT_OK on success.  Returns NXT_UNIT_ERROR if the request is
 * not in the NXT_UNIT_RS_RESPONSE_INIT state, all response_max_fields slots
 * are used or the buffer has not enough free space.
 */
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
    const char *name, uint8_t name_length,
    const char *value, uint32_t value_length)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_field: response not initialized or "
                          "already sent");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
        nxt_unit_req_warn(req, "add_field: too many response fields");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    /*
     * The required size is computed in 64 bits: in 32-bit arithmetic a huge
     * value_length would wrap around and slip through this check.
     */
    if (nxt_slow_path((uint64_t) name_length + value_length + 2
                      > (uint32_t) (buf->end - buf->free)))
    {
        nxt_unit_req_warn(req, "add_field: response buffer overflow");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Appends body data to the response buffer as piggyback content.
 *
 * The first chunk records where the content starts and switches the request
 * to NXT_UNIT_RS_RESPONSE_HAS_CONTENT; later chunks are appended after it.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR if the response is not
 * initialized yet, has already been sent, or the data does not fit into the
 * free space of the buffer.
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    uint32_t                      avail;
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;
    avail = (uint32_t) (buf->end - buf->free);

    if (nxt_slow_path(size > avail)) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        /* No content so far: it starts at the current write position. */
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    buf->free = nxt_cpymem(buf->free, src, size);

    resp->piggyback_content_length += size;

    return NXT_UNIT_OK;
}


/*
 * Sends the prepared response buffer (header, fields and piggyback content).
 *
 * If the request is a websocket handshake and the response status is 101,
 * the request is upgraded first; the result of the upgrade is not checked.
 * After a successful send the response buffer is freed and the request
 * moves to NXT_UNIT_RS_RESPONSE_SENT.
 *
 * Returns NXT_UNIT_ERROR if the response is not initialized or already sent,
 * otherwise the result of the send operation.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *out;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    out = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, out, 0);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        /* The response pointers and the state are left as they were. */
        return rc;
    }

    req->response = NULL;
    req->response_buf = NULL;
    req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

    nxt_unit_mmap_buf_free(out);

    return rc;
}


/*
 * Returns 1 if the response has been sent (the request state is
 * NXT_UNIT_RS_RESPONSE_SENT or a later one), 0 otherwise.
 */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (req_impl->state < NXT_UNIT_RS_RESPONSE_SENT) {
        return 0;
    }

    return 1;
}


/*
 * Allocates an outgoing buffer of the given size for a request.
 *
 * The buffer object is appended to the request's outgoing buffer list before
 * memory is obtained for it.  Returns NULL if size is greater than
 * PORT_MMAP_DATA_SIZE, or if the buffer object or its memory cannot be
 * obtained.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *out;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);

    out = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(out == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    out->req = req;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, out);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                   &req->response_port, size, size, out,
                                   NULL);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        return &out->buf;
    }

    /* No memory for the buffer: unlink the object and cache it for reuse. */
    nxt_unit_mmap_buf_release(out);

    return NULL;
}


/*
 * Returns the process the message came from, looking it up by
 * recv_msg->pid on first use and caching the result in recv_msg->process.
 * A warning is logged and NULL is returned if the process is not found.
 */
static nxt_unit_process_t *
nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t  *lib;

    if (recv_msg->process == NULL) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        /* The lookup is done under the library mutex. */
        pthread_mutex_lock(&lib->mutex);

        recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);

        pthread_mutex_unlock(&lib->mutex);

        if (recv_msg->process == NULL) {
            nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
                          recv_msg->stream, (int) recv_msg->pid);
        }
    }

    return recv_msg->process;
}


/*
 * Returns a buffer object bound to the given context: one from the
 * context's free list when available, otherwise a newly allocated one.
 * Only ctx_impl, hdr and free_ptr are initialized here.  Returns NULL if the
 * allocation fails.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *b;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    b = ctx_impl->free_buf;

    if (b != NULL) {
        /* Reuse the head of the free list. */
        nxt_unit_mmap_buf_unlink(b);

        pthread_mutex_unlock(&ctx_impl->mutex);

    } else {
        /* Free list is empty: allocate with the mutex dropped. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        b = malloc(sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(b == NULL)) {
            return NULL;
        }
    }

    b->ctx_impl = ctx_impl;

    b->hdr = NULL;
    b->free_ptr = NULL;

    return b;
}


/*
 * Unlinks a buffer object from the list it is on and puts it on the free
 * list of its context for reuse.  The memory the buffer refers to is not
 * released here.
 */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    /* Unlinking from the current list is done without the context mutex. */
    nxt_unit_mmap_buf_unlink(mmap_buf);

    pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}


/* A string given as a length and a pointer to its first character. */
typedef struct {
    size_t      len;
    const char  *str;
} nxt_unit_str_t;


/*
 * Initializer for nxt_unit_str_t.
 * NOTE(review): presumably meant for string literals, with nxt_length()
 * giving the length without the terminating zero - confirm.
 */
#define nxt_unit_str(str)  { nxt_length(str), str }


/* Returns the websocket_handshake flag of the request. */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    return req->request->websocket_handshake;
}


/*
 * Marks the request as upgraded to websocket: adds it to the context's
 * request hash and sets the response status to 101.
 *
 * Returns NXT_UNIT_OK on success or if the request is already upgraded.
 * Returns NXT_UNIT_ERROR if the response is not initialized yet, has
 * already been sent, or the request cannot be added to the hash.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);

    /* Register the request so that websocket messages can find it. */
    if (nxt_slow_path(nxt_unit_request_hash_add(&ctx_impl->requests, req_impl)
                      != NXT_UNIT_OK))
    {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req->response->status = 101;

    req_impl->websocket = 1;

    return NXT_UNIT_OK;
}


/*
 * Returns the websocket flag of the request; it is set by
 * nxt_unit_response_upgrade().
 */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/*
 * Maps a per-request data pointer back to its request info.
 *
 * The pointer is treated as the address of the extra_data member of a
 * request info object, i.e. the value nxt_unit_request_info_get() stores in
 * req->data when per-request data is configured.
 */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Sends the content of a buffer as a part of the response body and frees
 * the buffer.  An empty buffer is freed without sending anything.
 *
 * Returns NXT_UNIT_ERROR if the response is not initialized or its headers
 * have not been sent yet.  If the send fails, its result is returned and the
 * buffer object is not freed here.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *out;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    out = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = out->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    rc = NXT_UNIT_OK;

    if (nxt_fast_path(buf->free > buf->start)) {
        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, out, 0);
    }

    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_free(out);
    }

    return rc;
}


/*
 * Sends a buffer as the last message of the response stream and finishes
 * the request.
 *
 * On success the buffer is freed and the request info is released.  If the
 * send fails, the request is passed to nxt_unit_request_done() with the
 * error code instead.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);

    /* Failure is the unlikely outcome; success is the expected path. */
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_request_done(req, rc);

        return;
    }

    nxt_unit_mmap_buf_free(mmap_buf);

    nxt_unit_request_info_release(req);
}


/*
 * Sends the used part of a buffer (buf->start .. buf->free) to the buffer's
 * port as a _NXT_PORT_MSG_DATA message of the given stream.
 *
 * A non-empty buffer backed by shared memory (hdr != NULL) is sent as a
 * small descriptor message referring to the mmap id, the first chunk and the
 * size.  Any other buffer is sent as a plain message: the message header is
 * written directly in front of the data, which requires space reserved
 * there (see plain_ptr).
 *
 * "last" marks the message as the final one of the stream.
 *
 * Whatever remains attached to the buffer is released before returning, on
 * success and on failure.  Returns NXT_UNIT_OK if the whole message has been
 * written, NXT_UNIT_ERROR otherwise.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    /* Wire layout of the shared memory descriptor message. */
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                      rc;
    u_char                   *last_used, *first_free;
    ssize_t                  res;
    nxt_chunk_id_t           first_free_chunk;
    nxt_unit_buf_t           *buf;
    nxt_unit_impl_t          *lib;
    nxt_port_mmap_header_t   *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* An empty buffer is always sent as a plain (header only) message. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
                                       NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* First chunk after the one holding the last byte sent. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * At least one chunk worth of space is left unused: shrink the
             * buffer to the area starting at the first unused chunk, so
             * that only this remainder is released below.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /*
             * Nothing worth releasing is left: detach the buffer from the
             * shared memory, so the release below does not touch the mmap.
             */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /* The added value is negative: the count drops by the chunks sent. */
        nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(ctx, "process %d allocated_chunks %d",
                       mmap_buf->process->pid,
                       mmap_buf->process->outgoing.allocated_chunks);

    } else {
        /*
         * The header is written right before buf->start, so the area
         * starting at plain_ptr must have room for it.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
                          ": no space reserved for message header", stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
                       stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
                                       buf->start - sizeof(m.msg),
                                       m.mmap_msg.size + sizeof(m.msg),
                                       NULL, 0);
        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    /* Reached on success as well as on failure. */
    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}


/*
 * Public counterpart of nxt_unit_mmap_buf_free(): releases the memory of
 * the buffer and the buffer object that contains it.
 */
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
}


/*
 * Frees an mmap buffer in two steps: first the storage behind it
 * (nxt_unit_free_outgoing_buf()), then the buffer descriptor itself
 * (nxt_unit_mmap_buf_release()).
 */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}


/*
 * Releases the storage backing an outgoing buffer; the nxt_unit_mmap_buf_t
 * descriptor itself is left untouched.
 *
 * When "hdr" is set, the [start, end) range is handed to
 * nxt_unit_mmap_release().  Otherwise a non-NULL "free_ptr" is free()d.
 * The field that was released is reset to NULL, so calling the function
 * a second time on the same descriptor does nothing.
 */
static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_buf_t  *b;

    if (mmap_buf->hdr == NULL) {

        if (mmap_buf->free_ptr != NULL) {
            free(mmap_buf->free_ptr);

            mmap_buf->free_ptr = NULL;
        }

        return;
    }

    b = &mmap_buf->buf;

    nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, mmap_buf->process,
                          mmap_buf->hdr, b->start, b->end - b->start);

    mmap_buf->hdr = NULL;
}


/*
 * Obtains a read buffer for the given context.
 *
 * Takes ctx_impl->mutex and delegates to nxt_unit_read_buf_get_impl(),
 * which is responsible for unlocking it.  Returns whatever the
 * implementation returns (NULL when it could not allocate a buffer).
 */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    rbuf = nxt_unit_read_buf_get_impl(ctx_impl);

    return rbuf;
}


/*
 * Pops a read buffer from the context's free list, or malloc()s a new one
 * when the list is empty.
 *
 * Must be entered with ctx_impl->mutex locked; the mutex is unlocked here
 * before any allocation takes place.  Returns NULL if malloc() fails.
 */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_read_buf_t  *rbuf;

    rbuf = ctx_impl->free_read_buf;

    if (rbuf != NULL) {
        ctx_impl->free_read_buf = rbuf->next;
    }

    pthread_mutex_unlock(&ctx_impl->mutex);

    if (rbuf == NULL) {
        /* Free list was empty: allocate outside of the lock. */
        rbuf = malloc(sizeof(nxt_unit_read_buf_t));
    }

    return rbuf;
}


/*
 * Returns a read buffer to the context's free list.  The buffer is pushed
 * onto the head of the list under ctx_impl->mutex; it is not free()d.
 */
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  **head;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    head = &ctx_impl->free_read_buf;

    pthread_mutex_lock(&ctx_impl->mutex);

    rbuf->next = *head;
    *head = rbuf;

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/*
 * Returns the buffer that follows "buf" in its chain, or NULL when "buf"
 * is the last one.
 */
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *cur, *next;

    cur = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
    next = cur->next;

    return (next != NULL) ? &next->buf : NULL;
}


/*
 * Returns PORT_MMAP_DATA_SIZE, the size the write functions in this file
 * use as the upper bound for a single outgoing buffer allocation.
 */
uint32_t
nxt_unit_buf_max(void)
{
    return PORT_MMAP_DATA_SIZE;
}


/*
 * Returns PORT_MMAP_CHUNK_SIZE, the size of one shared memory chunk
 * (the allocation granularity used when sizing outgoing mmap buffers).
 */
uint32_t
nxt_unit_buf_min(void)
{
    return PORT_MMAP_CHUNK_SIZE;
}


/*
 * Writes "size" bytes of response body, requiring all of it to be sent
 * (the minimum size passed to nxt_unit_response_write_nb() equals "size").
 *
 * Returns NXT_UNIT_OK on success; on failure returns the positive
 * NXT_UNIT_* code that nxt_unit_response_write_nb() reported negated.
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    ssize_t  res;

    res = nxt_unit_response_write_nb(req, start, size, size);

    if (res < 0) {
        return -res;
    }

    return NXT_UNIT_OK;
}


/*
 * Writes up to "size" bytes of response body starting at "start".
 *
 * If the response headers buffer is still pending, as much data as fits is
 * appended to it and the response is sent.  The remainder is sent in
 * separately allocated outgoing buffers of at most PORT_MMAP_DATA_SIZE
 * bytes each.  "min_size" is forwarded (capped per part) as the minimum
 * allocation size; when an allocation yields an empty buffer the function
 * stops early and reports what has been sent so far.
 *
 * Returns the number of bytes sent, or a negated NXT_UNIT_* code on error.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* The response has not been sent yet: piggyback data onto its buffer. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Take only as much as the response buffer still has room for. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        /* Reduce the outstanding minimum without underflowing. */
        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                       &req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        /* An empty buffer means nothing could be allocated: stop here. */
        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            return sent;
        }
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}


/*
 * Streams response body data obtained from the read_info->read() callback.
 *
 * If the response headers buffer is still pending, its free space is filled
 * from the callback first and the response is sent.  Afterwards, until
 * read_info->eof is set, outgoing buffers of
 * min(read_info->buf_size, PORT_MMAP_DATA_SIZE) bytes are allocated, filled
 * from the callback and sent.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR when the callback reports
 * a negative result, or the error code of the failing helper.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    /* The response has not been sent yet. */
    if (nxt_slow_path(req->response_buf)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        /*
         * Fill the remaining space of the headers buffer.  The loop ends
         * when the buffer is full, the callback sets eof, or it fails.
         */
        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);

        /* Size and minimum size are equal: the full buffer is required. */
        rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                       &req->response_port,
                                       buf_size, buf_size,
                                       &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        buf = &mmap_buf.buf;

        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                /* The buffer was never sent, so release it here. */
                nxt_unit_free_outgoing_buf(&mmap_buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Reads up to "size" bytes of request body into "dst".
 *
 * Data is taken from the in-memory buffer chain (req->content_buf) first.
 * If that yields fewer than "size" bytes and a content file descriptor is
 * open, the remainder is read from the descriptor and placed right after
 * the bytes copied from the buffers, so the total never exceeds "size".
 * The descriptor is closed and reset to -1 once a short read is seen.
 *
 * Returns the total number of bytes stored in "dst", or the negative
 * result of read() when reading from the descriptor fails.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    size_t   rest;
    ssize_t  buf_res, res;

    buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
                                dst, size);

    if (buf_res >= (ssize_t) size || req->content_fd == -1) {
        return buf_res;
    }

    /* Continue after the bytes already copied from the buffer chain. */
    dst = nxt_pointer_to(dst, buf_res);
    rest = size - buf_res;

    res = read(req->content_fd, dst, rest);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        return res;
    }

    if (res < (ssize_t) rest) {
        close(req->content_fd);

        req->content_fd = -1;
    }

    req->content_length -= res;

    return buf_res + res;
}


/*
 * Computes the length of the next line of request body, including the
 * terminating '\n', without consuming any data.
 *
 * The buffer chain is scanned for a newline.  When the end of the chain is
 * reached while a content file descriptor is still open and fewer than
 * content_length bytes have been seen, another 16384-byte block is pre-read
 * from the descriptor and linked after the current buffer.
 *
 * Returns the line length capped at "max_size", 0 when content_length is 0,
 * or -1 when pre-reading fails.
 */
ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
{
    char                 *nl;
    size_t               line_len, avail;
    nxt_unit_buf_t       *b;
    nxt_unit_mmap_buf_t  *cur, *extra;

    if (req->content_length == 0) {
        return 0;
    }

    line_len = 0;

    for (b = req->content_buf; b != NULL; b = nxt_unit_buf_next(b)) {
        avail = b->end - b->free;

        nl = memchr(b->free, '\n', avail);
        if (nl != NULL) {
            /* Count the newline itself as part of the line. */
            line_len += (nl + 1) - b->free;
            break;
        }

        line_len += avail;

        if (line_len >= max_size) {
            break;
        }

        cur = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);

        if (cur->next != NULL
            || req->content_fd == -1
            || line_len >= req->content_length)
        {
            continue;
        }

        /* End of chain, but more body is available from the descriptor. */
        extra = nxt_unit_request_preread(req, 16384);
        if (nxt_slow_path(extra == NULL)) {
            return -1;
        }

        nxt_unit_mmap_buf_insert(&cur->next, extra);
    }

    return nxt_min(max_size, line_len);
}


/*
 * Reads up to "size" bytes from req->content_fd into a freshly malloc()ed
 * buffer and returns it wrapped in a new nxt_unit_mmap_buf_t.
 *
 * On return the buffer's [free, end) range covers exactly the bytes read.
 * A short read closes the descriptor and resets req->content_fd to -1.
 * Returns NULL if the descriptor is not open, an allocation fails, or
 * read() fails.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
{
    ssize_t              n;
    nxt_unit_buf_t       *b;
    nxt_unit_mmap_buf_t  *mmap_buf;

    if (req->content_fd == -1) {
        nxt_unit_req_alert(req, "preread: content_fd == -1");
        return NULL;
    }

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf");
        return NULL;
    }

    mmap_buf->free_ptr = malloc(size);
    if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
        nxt_unit_mmap_buf_release(mmap_buf);
        return NULL;
    }

    mmap_buf->plain_ptr = mmap_buf->free_ptr;
    mmap_buf->hdr = NULL;
    mmap_buf->process = NULL;

    b = &mmap_buf->buf;

    b->start = mmap_buf->free_ptr;
    b->free = b->start;
    b->end = b->start + size;

    n = read(req->content_fd, b->start, size);
    if (n < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    if (n < (ssize_t) size) {
        close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) n);

    /* Shrink the readable range to what was actually read. */
    b->end = b->free + n;

    return mmap_buf;
}


/*
 * Copies up to "size" bytes from the buffer chain starting at *b into "dst",
 * advancing each visited buffer's "free" pointer past the consumed bytes.
 *
 * On return *b points at the last buffer that was visited (unchanged if the
 * chain was empty) and *len is decreased by the number of bytes copied.
 * Returns the number of bytes copied.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *out;
    size_t          remaining, avail, chunk, done;
    nxt_unit_buf_t  *cur, *last;

    out = dst;
    remaining = size;
    last = *b;

    for (cur = *b; cur != NULL; cur = nxt_unit_buf_next(cur)) {
        last = cur;

        avail = cur->end - cur->free;
        chunk = nxt_min(remaining, avail);

        out = nxt_cpymem(out, cur->free, chunk);

        cur->free += chunk;
        remaining -= chunk;

        if (remaining == 0) {
            break;
        }
    }

    *b = last;

    done = size - remaining;

    *len -= done;

    return done;
}


/*
 * Finishes processing of a request with result code "rc".
 *
 * With rc == NXT_UNIT_OK:
 *   - if no response was initialized, a 200 response carrying a
 *     "Content-Type: text/plain" field is prepared;
 *   - if the response has not been sent yet, the state is switched to
 *     NXT_UNIT_RS_RESPONSE_SENT, the response buffer is passed to
 *     nxt_unit_buf_send_done() and the function returns.
 *
 * In every other case (non-OK rc, a failure while preparing the default
 * response, or a response that was already sent) a final message with
 * "last" set is sent on the response port, typed as data or RPC error
 * depending on rc, and the request info is released.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    ssize_t                       res;
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /* Space needed for the single default header field. */
        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                   nxt_length("Content-Type"),
                                   "text/plain", nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    /* Build the terminating message; rc selects data vs. error type. */
    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(req->ctx, &req->response_port,
                                   &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_req_alert(req, "last message send failed: %s (%d)",
                           strerror(errno), errno);
    }

    /* The request is released even if the final send failed. */
    nxt_unit_request_info_release(req);
}


/*
 * Sends one WebSocket frame whose payload is a single contiguous region.
 * Wraps the region in a one-element iovec and forwards the call to
 * nxt_unit_websocket_sendv(), returning its result.
 */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    struct iovec  iov;

    iov.iov_base = (void *) start;
    iov.iov_len = size;

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Sends one WebSocket frame whose payload is gathered from "iovcnt"
 * iovec elements.
 *
 * The frame header is written at the start of the first outgoing buffer,
 * followed by the payload.  Whenever the current buffer fills up while
 * payload remains, it is sent and a new buffer is allocated for the rest.
 * "opcode" and "last" are stored in the header's opcode and fin fields.
 *
 * Returns NXT_UNIT_OK on success or the error code of the failing helper.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                           i, rc;
    size_t                        l, copy;
    uint32_t                      payload_len, buf_size, alloc_size;
    const uint8_t                 *b;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_websocket_header_t        *wh;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    payload_len = 0;

    for (i = 0; i < iovcnt; i++) {
        payload_len += iov[i].iov_len;
    }

    /* 10 extra bytes are reserved in front of the payload for the header. */
    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                   &req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    /* "buf" aliases mmap_buf.buf, so it also tracks later reallocations. */
    buf = &mmap_buf.buf;

    /* Clear the first two header bytes before the header is filled in. */
    buf->start[0] = 0;
    buf->start[1] = 0;

    /* buf_size now tracks how much still has to be allocated. */
    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                /* Buffer is full: flush it and get room for the rest. */
                if (nxt_fast_path(buf->free > buf->start)) {
                    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
                                                &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                               &req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    /* Send whatever is left in the last buffer. */
    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
                                    &mmap_buf, 0);
    }

    return rc;
}


/*
 * Reads up to "size" bytes of frame payload into "dst".
 *
 * When the frame carries a mask (ws->mask != NULL) the copied bytes are
 * XORed in place with the 4-byte mask, starting at the mask position that
 * corresponds to the payload offset at which this read began.
 *
 * Returns the number of bytes stored in "dst".
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   n;
    uint8_t   *p, *end;
    uint64_t  k;

    n = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, dst, size);

    if (ws->mask != NULL) {
        /*
         * content_length has already been reduced by n, so subtracting n
         * again gives the payload offset before this read.
         */
        k = (ws->payload_len - ws->content_length - n) % 4;

        p = dst;
        end = p + n;

        while (p < end) {
            *p++ ^= ws->mask[k % 4];
            k++;
        }
    }

    return n;
}


/*
 * Makes the frame's data safe to keep by moving it into memory owned by
 * the frame's buffer.
 *
 * Nothing is done when the buffer already owns heap memory (free_ptr set)
 * or is backed by an mmap header (hdr set).  Otherwise the whole
 * [start, end) range is copied into a malloc()ed block, which is recorded
 * in free_ptr.  The read position ("free") keeps its offset relative to
 * "start", so bytes already consumed are not exposed again.
 *
 * Returns NXT_UNIT_OK on success or NXT_UNIT_ERROR if malloc() fails.
 *
 * NOTE(review): other pointers that may refer into the old memory (for
 * example ws->mask) are not relocated here -- confirm whether callers
 * rely on them after retaining a frame.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size, consumed;
    nxt_unit_buf_t                   *buf;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    buf = &ws_impl->buf->buf;

    size = buf->end - buf->start;
    consumed = buf->free - buf->start;

    b = malloc(size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, buf->start, size);

    buf->start = b;
    buf->free = b + consumed;
    buf->end = b + size;

    ws_impl->buf->free_ptr = b;

    return NXT_UNIT_OK;
}


/*
 * Signals that the application has finished with a WebSocket frame by
 * passing it to nxt_unit_websocket_frame_release().
 */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Allocates a run of consecutive chunks in one of the process's outgoing
 * shared memory segments.
 *
 *   c      out: id of the first allocated chunk;
 *   n      in:  desired number of chunks,
 *          out: number of chunks actually allocated;
 *   min_n  minimum acceptable number of chunks; 0 selects the non-blocking
 *          mode in which an out-of-shared-memory condition is reported to
 *          the caller (NULL result with *n set to 0) instead of waited on.
 *
 * Existing segments usable for "port_id" are searched first.  If none has
 * room and the segment limit is not reached, a new segment is created.
 * Otherwise an OOSM notification is sent and, unless min_n is 0, the
 * function waits for an acknowledgement and retries.
 *
 * Returns the header of the segment holding the chunks, or NULL on failure.
 * outgoing.allocated_chunks is increased only when chunks were handed out.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&process->outgoing.mutex);

retry:

    outgoing_size = process->outgoing.size;

    mm_end = process->outgoing.elts + outgoing_size;

    for (mm = process->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* Skip segments that are bound to a different port. */
        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to extend the run starting at *c up to *n chunks. */
            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Run is too short: give the chunks back, move on. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&process->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by application, but not sent to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port_id);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition. Non-blocking mode. */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&process->outgoing.mutex);

        goto retry;
    }

    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);

unlock:

    /*
     * hdr is NULL only when creating a new segment failed; no chunks were
     * handed out in that case, so the counter must not be increased.
     */
    if (nxt_fast_path(hdr != NULL)) {
        nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
    }

    nxt_unit_debug(ctx, "process %d allocated_chunks %d",
                   process->pid,
                   process->outgoing.allocated_chunks);

    pthread_mutex_unlock(&process->outgoing.mutex);

    return hdr;
}


/*
 * Sends an out-of-shared-memory (OOSM) notification message to "port_id".
 *
 * Returns NXT_UNIT_OK when the whole message was sent, NXT_UNIT_ERROR
 * otherwise (a warning is logged).
 */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    ssize_t          sent;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.type = _NXT_PORT_MSG_OOSM;
    msg.pid = lib->pid;
    msg.stream = 0;
    msg.reply_port = 0;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    sent = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);

    if (nxt_fast_path(sent == sizeof(msg))) {
        return NXT_UNIT_OK;
    }

    nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
                  (int) port_id->pid, strerror(errno), errno);

    return NXT_UNIT_ERROR;
}


/*
 * Blocks until a shared memory acknowledgement message is received.
 *
 * Messages are read one at a time.  An SHM_ACK message ends the wait; any
 * other message is appended to the context's pending read list so it can
 * be processed later.
 *
 * Returns NXT_UNIT_OK once the acknowledgement arrives.  Returns
 * NXT_UNIT_ERROR if no read buffer can be obtained, a message shorter than
 * a port message header is read, or a QUIT message is received (the QUIT
 * message itself is still queued on the pending list).
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    int                  type;
    nxt_port_msg_t       *port_msg;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    while (1) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_read_buf(ctx, rbuf);
        if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
            nxt_unit_read_buf_release(ctx, rbuf);

            return NXT_UNIT_ERROR;
        }

        port_msg = (nxt_port_msg_t *) rbuf->buf;

        /*
         * Remember the type now: once the buffer is on the pending list
         * and the mutex is released, it must not be touched here anymore.
         */
        type = port_msg->type;

        if (type == _NXT_PORT_MSG_SHM_ACK) {
            nxt_unit_read_buf_release(ctx, rbuf);

            break;
        }

        pthread_mutex_lock(&ctx_impl->mutex);

        *ctx_impl->pending_read_tail = rbuf;
        ctx_impl->pending_read_tail = &rbuf->next;
        rbuf->next = NULL;

        pthread_mutex_unlock(&ctx_impl->mutex);

        if (type == _NXT_PORT_MSG_QUIT) {
            nxt_unit_debug(ctx, "oosm: quit received");

            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Returns a pointer to element "i" of the mmaps array, growing the array
 * when necessary.
 *
 * Capacity starts at i + 1 for an empty array and otherwise grows by
 * doubling below 16 elements and by half above that, until index "i" fits.
 * Newly added elements are zero-filled, and "size" is raised to cover "i".
 *
 * Returns NULL if the array cannot be grown; in that case the existing
 * array, its capacity and its size are left unchanged.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    void      *elts;
    uint32_t  cap;

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != mmaps->cap) {

        /*
         * Keep the old pointer until realloc() succeeds: on failure the
         * original block is still valid and must not be lost.
         */
        elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
        if (nxt_slow_path(elts == NULL)) {
            return NULL;
        }

        mmaps->elts = elts;

        memset(mmaps->elts + mmaps->cap, 0,
               sizeof(*mmaps->elts) * (cap - mmaps->cap));

        mmaps->cap = cap;
    }

    if (i + 1 > mmaps->size) {
        mmaps->size = i + 1;
    }

    return mmaps->elts + i;
}


/*
 * Creates a new outgoing shared memory segment for "process", marks its
 * first "n" chunks busy and sends the segment's descriptor to "port_id".
 *
 * The function is entered and left with process->outgoing.mutex held; the
 * mutex is released while the descriptor is being sent.
 *
 * The descriptor is closed on every path once it has been opened: the
 * mapping itself does not need it after mmap(), and the peer receives its
 * own copy through nxt_unit_send_mmap().
 *
 * Returns the header of the new segment, or NULL on failure, in which case
 * the slot reserved in the outgoing array is dropped again.
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    char                    name[64];
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = process->lib;

    /* Reserve the next slot; this also increases outgoing.size. */
    mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        goto remove_fail;
    }

    nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
                       strerror(errno), errno);

        goto remove_fail;
    }

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink(name);

    fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        goto remove_fail;
    }

    if (nxt_slow_path(shm_unlink(name) == -1)) {
        nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
                      strerror(errno), errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
        nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        /* The descriptor is open at this point and must not leak. */
        close(fd);

        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All bits set: every chunk starts out free. */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = process->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = process->pid;
    hdr->sent_over = port_id->id;

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /* Do not hold the mutex while sending the descriptor. */
    pthread_mutex_unlock(&process->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port_id, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) process->pid);
    }

    close(fd);

    pthread_mutex_lock(&process->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    /* Drop the slot reserved at the top of the function. */
    process->outgoing.size--;

    return NULL;
}


/*
 * Sends the file descriptor of a shared memory segment to "port_id".
 * The descriptor travels as SCM_RIGHTS ancillary data attached to an
 * MMAP-type port message.
 *
 * Returns NXT_UNIT_OK when the whole message was sent, NXT_UNIT_ERROR
 * otherwise (a warning is logged).
 */
static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
{
    ssize_t          sent;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.type = _NXT_PORT_MSG_MMAP;
    msg.pid = lib->pid;
    msg.stream = 0;
    msg.reply_port = 0;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    /*
     * Zero the whole control buffer, padding included: Go 1.11 validates
     * cmsghdr treating the padding field as part of the length (see its
     * Cmsghdr definition and socketControlMessageHeaderAndData function).
     */
    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;
    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));

    /*
     * The descriptor is stored with memcpy() rather than through
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s may warn that dereferencing a
     * type-punned pointer breaks strict-aliasing rules.  With -O1 GCC
     * compiles the memcpy() into the same simple assignment anyway.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    sent = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
                                    &cmsg, sizeof(cmsg));

    if (nxt_fast_path(sent == sizeof(msg))) {
        return NXT_UNIT_OK;
    }

    nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
                  (int) port_id->pid, strerror(errno), errno);

    return NXT_UNIT_ERROR;
}


/*
 * Prepares "mmap_buf" as an outgoing buffer of "size" bytes.
 *
 * Requests of at most NXT_UNIT_MAX_PLAIN_SIZE bytes get a plain memory
 * buffer: "local_buf" when the caller supplied one, a malloc()ed block
 * otherwise.  Room for a port message header is reserved in front of the
 * data area in both cases.
 *
 * Larger requests are served from shared memory in whole chunks; at least
 * "min_size" bytes (rounded up to chunks) must be available.  When
 * min_size is 0 and no chunk could be allocated, the buffer is returned
 * empty (all pointers NULL) and the call still succeeds.
 *
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
    nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
    int                     nchunks, min_nchunks;
    nxt_chunk_id_t          c;
    nxt_unit_buf_t          *b;
    nxt_port_mmap_header_t  *hdr;

    b = &mmap_buf->buf;

    if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {

        if (local_buf == NULL) {
            mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
            if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
                return NXT_UNIT_ERROR;
            }

            mmap_buf->plain_ptr = mmap_buf->free_ptr;

        } else {
            /* Caller-provided storage: nothing to free() later. */
            mmap_buf->free_ptr = NULL;
            mmap_buf->plain_ptr = local_buf;
        }

        mmap_buf->hdr = NULL;

        /* Data starts right after the space kept for the message header. */
        b->start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
        b->free = b->start;
        b->end = b->start + size;

        mmap_buf->port_id = *port_id;
        mmap_buf->process = process;

        nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
                       b->start, (int) size);

        return NXT_UNIT_OK;
    }

    /* Round both sizes up to whole chunks. */
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
    min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);

    if (nxt_fast_path(hdr != NULL)) {
        mmap_buf->hdr = hdr;

        b->start = (char *) nxt_port_mmap_chunk_start(hdr, c);
        b->free = b->start;
        b->end = b->start + nchunks * PORT_MMAP_CHUNK_SIZE;

        mmap_buf->port_id = *port_id;
        mmap_buf->process = process;
        mmap_buf->free_ptr = NULL;
        mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

        nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
                      (int) hdr->id, (int) c,
                      (int) (nchunks * PORT_MMAP_CHUNK_SIZE));

        return NXT_UNIT_OK;
    }

    if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
        /* Non-blocking request and no memory available: empty buffer. */
        mmap_buf->hdr = NULL;

        b->start = NULL;
        b->free = NULL;
        b->end = NULL;

        mmap_buf->free_ptr = NULL;

        return NXT_UNIT_OK;
    }

    return NXT_UNIT_ERROR;
}


/*
 * Maps the shared memory segment referred to by descriptor "fd", received
 * from process "pid", and stores its header in that process' incoming
 * mmaps array at the index given by the header's id.
 *
 * The header must name "pid" as the source and this library's pid as the
 * destination; otherwise the mapping is dropped.  The descriptor itself is
 * not closed here.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise.
 */
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
    int                      rc;
    void                     *mem;
    size_t                   mem_size;
    struct stat              mmap_stat;
    nxt_unit_mmap_t          *mm;
    nxt_unit_impl_t          *lib;
    nxt_unit_process_t       *process;
    nxt_port_mmap_header_t   *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);

    pthread_mutex_lock(&lib->mutex);

    /* A successful lookup takes a reference; it is dropped at "fail". */
    process = nxt_unit_process_find(ctx, pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
                      (int) pid, fd);

        return NXT_UNIT_ERROR;
    }

    rc = NXT_UNIT_ERROR;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
                      strerror(errno), errno);

        goto fail;
    }

    /*
     * Remember the exact length handed to mmap() so that the error paths
     * below unmap precisely the region that was mapped.
     */
    mem_size = mmap_stat.st_size;

    mem = mmap(NULL, mem_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
                      strerror(errno), errno);

        goto fail;
    }

    hdr = mem;

    if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {

        nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
                      "detected: %d != %d or %d != %d", (int) hdr->src_pid,
                      (int) pid, (int) hdr->dst_pid, (int) lib->pid);

        munmap(mem, mem_size);

        goto fail;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");

        munmap(mem, mem_size);

    } else {
        mm->hdr = hdr;

        hdr->sent_over = 0xFFFFu;

        rc = NXT_UNIT_OK;
    }

    pthread_mutex_unlock(&process->incoming.mutex);

fail:

    /* Reached on success as well: release the lookup reference. */
    nxt_unit_process_use(ctx, process, -1);

    return rc;
}


/* Resets an mmaps array to the empty state and creates its mutex. */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    mmaps->elts = NULL;
    mmaps->cap = 0;
    mmaps->size = 0;
    mmaps->allocated_chunks = 0;

    pthread_mutex_init(&mmaps->mutex, NULL);
}


/*
 * Adjusts the use counter of "process" by "i".  When a negative adjustment
 * is applied to a counter whose value before the update equals -i, both
 * mmaps arrays are destroyed and the process structure is freed.
 */
static void
nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
{
    long  before;

    before = nxt_atomic_fetch_add(&process->use_count, i);

    /* Increments, and decrements that leave references behind, end here. */
    if (i >= 0 || before != -i) {
        return;
    }

    nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);

    nxt_unit_mmaps_destroy(&process->incoming);
    nxt_unit_mmaps_destroy(&process->outgoing);

    free(process);
}


/*
 * Unmaps every segment recorded in the array, frees the array storage
 * and destroys the array mutex.
 */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *cur, *last;

    if (mmaps->elts != NULL) {
        cur = mmaps->elts;
        last = cur + mmaps->size;

        while (cur < last) {
            munmap(cur->hdr, PORT_MMAP_SIZE);
            cur++;
        }

        free(mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Returns the header stored at index "id" of the process' incoming mmaps
 * array, or NULL when "id" is not below the array size.  No locking is
 * done here; callers in this file hold process->incoming.mutex.
 */
static nxt_port_mmap_header_t *
nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    uint32_t id)
{
    if (nxt_slow_path(id >= process->incoming.size)) {
        return NULL;
    }

    return process->incoming.elts[id].hdr;
}


/*
 * Consumes the tracking header at the front of "recv_msg" and tries to
 * reset the corresponding tracking slot from the message stream id to 0.
 *
 * Returns 0 when the message is too short, the sending process or the
 * mmap id cannot be resolved, or the compare-and-set does not succeed
 * (logged as "tracking cancelled"; the slot is then marked free).
 * Otherwise returns the nxt_atomic_cmp_set() result.
 */
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           swapped;
    nxt_chunk_id_t                slot;
    nxt_unit_process_t            *process;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_tracking_msg_t  *tmsg;

    if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return 0;
    }

    tmsg = recv_msg->start;

    /* Step over the tracking header; the payload follows it. */
    recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
    recv_msg->start = tmsg + 1;

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return 0;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    hdr = nxt_unit_get_incoming_mmap(ctx, process, tmsg->mmap_id);

    if (nxt_fast_path(hdr != NULL)) {
        slot = tmsg->tracking_id;
        swapped = nxt_atomic_cmp_set(hdr->tracking + slot, recv_msg->stream, 0);

        if (swapped == 0) {
            nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
                           recv_msg->stream);

            nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, slot);
        }

        pthread_mutex_unlock(&process->incoming.mutex);

        return swapped;
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
                  "invalid mmap id %d,%"PRIu32,
                  recv_msg->stream, (int) process->pid,
                  tmsg->mmap_id);

    return 0;
}


/*
 * Interprets the body of "recv_msg" as an array of nxt_port_mmap_msg_t
 * descriptors and builds a chain of mmap buffers in recv_msg->incoming_buf,
 * one per descriptor, each pointing into the shared memory segment the
 * descriptor refers to.  On success recv_msg->start and recv_msg->size are
 * redirected to the data described by the first descriptor.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR when the message is too small,
 * the sending process cannot be resolved, a buffer cannot be obtained or
 * a descriptor names an unknown mmap id.
 *
 * NOTE(review): on the error returns, buffers already linked into
 * recv_msg->incoming_buf stay linked; presumably the caller releases
 * them - confirm.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                    *start;
    uint32_t                size;
    nxt_unit_process_t      *process;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /*
     * First pass: obtain one buffer per descriptor and append it to the
     * chain, before the incoming mutex is taken.
     */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    /* Second pass: walk descriptors and buffers in lockstep. */
    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    pthread_mutex_lock(&process->incoming.mutex);

    for (; mmap_msg < end; mmap_msg++) {
        hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
        if (nxt_slow_path(hdr == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "invalid mmap id %d,%"PRIu32,
                          recv_msg->stream, (int) process->pid,
                          mmap_msg->mmap_id);

            return NXT_UNIT_ERROR;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /*
         * True only for the first descriptor: recv_msg->start is replaced
         * here, so the comparison cannot match again.
         */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;
        b->process = process;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return NXT_UNIT_OK;
}


/*
 * Returns the chunks covering [start, start + size) of shared segment
 * "hdr" to its free map.  The region is first filled with 0xA5.
 *
 * If this library's pid is the segment source, the outgoing allocated
 * chunk counter of "process" is decreased.  If it is the destination and
 * the header's "oosm" flag could be switched from 1 to 0, a shared memory
 * ack is sent to the source process.
 */
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
    void *start, uint32_t size)
{
    int              released;
    u_char           *pos, *last;
    nxt_chunk_id_t   chunk;
    nxt_unit_impl_t  *lib;

    memset(start, 0xA5, size);

    released = 0;
    pos = start;
    last = pos + size;
    chunk = nxt_port_mmap_chunk_id(hdr, pos);

    for ( /* void */ ; pos < last; pos += PORT_MMAP_CHUNK_SIZE) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, chunk);

        chunk++;
        released++;
    }

    /* Nothing was freed: no accounting and no ack are needed. */
    if (released == 0) {
        return;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (hdr->src_pid == lib->pid) {
        nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
                             -released);

        nxt_unit_debug(ctx, "process %d allocated_chunks %d",
                       process->pid,
                       process->outgoing.allocated_chunks);
    }

    if (hdr->dst_pid == lib->pid
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        nxt_unit_send_shm_ack(ctx, hdr->src_pid);
    }
}


/*
 * Sends a _NXT_PORT_MSG_SHM_ACK message to port 0 of process "pid" through
 * the port_send callback.
 *
 * Returns NXT_UNIT_OK when the whole message was sent, NXT_UNIT_ERROR
 * otherwise.
 */
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
    ssize_t             res;
    nxt_port_msg_t      msg;
    nxt_unit_impl_t     *lib;
    nxt_unit_port_id_t  port_id;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_port_id_init(&port_id, pid, 0);

    /*
     * The structure is sent to another process as raw bytes, so zero it
     * as a whole: no padding or unassigned field may carry stack garbage.
     * All fields other than those set below are intentionally 0.
     */
    memset(&msg, 0, sizeof(msg));

    msg.pid = lib->pid;
    msg.type = _NXT_PORT_MSG_SHM_ACK;

    res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
                      (int) port_id.pid, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Key comparison callback of the processes hash: "data" is a stored
 * nxt_unit_process_t and the key is a pid_t.
 *
 * Returns NXT_OK when the key matches the process pid, NXT_DECLINED
 * otherwise.
 */
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    /*
     * Values inserted into this hash are nxt_unit_process_t objects (see
     * nxt_unit_process_get()), so they must be read through that type.
     */
    nxt_unit_process_t  *process;

    process = data;

    if (lhq->key.length == sizeof(pid_t)
        && *(pid_t *) lhq->key.start == process->pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


/*
 * Hash prototype for lib->processes: default level hash parameters, pid
 * comparison via nxt_unit_lvlhsh_pid_test() and the generic level hash
 * allocator callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Prepares "lhq" for a processes hash operation keyed by "*pid".  The
 * query keeps a pointer to "*pid", which therefore has to stay valid
 * for as long as the query is used.
 */
static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
{
    lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
    lhq->key.length = sizeof(*pid);
    lhq->key.start = (u_char *) pid;
    lhq->proto = &lvlhsh_processes_proto;

    /*
     * Callers pass an uninitialized stack query; set the pool explicitly,
     * as nxt_unit_port_hash_lhq() does, so no indeterminate pointer is
     * handed to the hash code.
     */
    lhq->pool = NULL;
}


/*
 * Looks up the process entry for "pid" in lib->processes and creates one
 * if it does not exist yet.  The returned entry carries an extra reference
 * for the caller (to be dropped with nxt_unit_process_use(..., -1)); a
 * newly created entry additionally keeps the initial reference of 1.
 *
 * Callers in this file hold lib->mutex around the call.
 *
 * Returns NULL when allocation or hash insertion fails.
 */
static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
        process = lhq.value;
        nxt_unit_process_use(ctx, process, 1);

        return process;
    }

    process = malloc(sizeof(nxt_unit_process_t));
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);

        return NULL;
    }

    process->pid = pid;
    process->use_count = 1;
    process->next_port_id = 0;
    process->lib = lib;

    nxt_queue_init(&process->ports);

    nxt_unit_mmaps_init(&process->incoming);
    nxt_unit_mmaps_init(&process->outgoing);

    lhq.replace = 0;
    lhq.value = process;

    if (nxt_slow_path(nxt_lvlhsh_insert(&lib->processes, &lhq) != NXT_OK)) {
        nxt_unit_warn(ctx, "process %d insert failed", (int) pid);

        pthread_mutex_destroy(&process->outgoing.mutex);
        pthread_mutex_destroy(&process->incoming.mutex);
        free(process);

        /*
         * Return right away: the entry is gone, so the reference bump
         * below must not be reached with a NULL pointer.
         */
        return NULL;
    }

    nxt_unit_process_use(ctx, process, 1);

    return process;
}


/*
 * Finds the process entry for "pid" in lib->processes.
 *
 * With "remove" set, the entry is deleted from the hash and returned
 * without touching its use counter.  Otherwise the entry stays in the
 * hash and its use counter is increased by one for the caller.
 *
 * Returns NULL when there is no entry for "pid".
 */
static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
{
    int                 rc;
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *found;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    rc = remove ? nxt_lvlhsh_delete(&lib->processes, &lhq)
                : nxt_lvlhsh_find(&lib->processes, &lhq);

    if (rc != NXT_OK) {
        return NULL;
    }

    found = lhq.value;

    if (!remove) {
        nxt_unit_process_use(ctx, found, 1);
    }

    return found;
}


/*
 * Hands back whatever nxt_lvlhsh_retrieve() yields for lib->processes.
 * nxt_unit_done() uses it to drain the hash, stopping on NULL.
 */
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
    nxt_unit_process_t  *first;

    first = nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto,
                                NULL);

    return first;
}


/*
 * Runs nxt_unit_run_once() repeatedly for as long as the library is
 * marked online.  Returns the result of the last iteration, or
 * NXT_UNIT_OK if no iteration was executed.
 */
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
    int              last_rc;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    for (last_rc = NXT_UNIT_OK; nxt_fast_path(lib->online); /* void */ ) {
        last_rc = nxt_unit_run_once(ctx);
    }

    return last_rc;
}


/*
 * Handles a single message: either the oldest pending read buffer of the
 * context or, if none is queued, a message freshly read from the port.
 *
 * Returns the nxt_unit_process_msg() result, or NXT_UNIT_ERROR when no
 * read buffer could be obtained or the buffer holds no data.
 */
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    rbuf = ctx_impl->pending_read_head;

    if (rbuf == NULL) {
        /*
         * NOTE(review): ctx_impl->mutex is still locked at this point and
         * is not unlocked in this branch; presumably
         * nxt_unit_read_buf_get_impl() releases it - confirm.
         */
        rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_read_buf(ctx, rbuf);

    } else {
        /* Detach the head; an emptied list points its tail at the head. */
        ctx_impl->pending_read_head = rbuf->next;

        if (ctx_impl->pending_read_tail == &rbuf->next) {
            ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
        }

        pthread_mutex_unlock(&ctx_impl->mutex);
    }

    if (nxt_fast_path(rbuf->size > 0)) {
        rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
                                  rbuf->buf, rbuf->size,
                                  rbuf->oob, sizeof(rbuf->oob));

#if (NXT_DEBUG)
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif

    } else {
        rc = NXT_UNIT_ERROR;
    }

    nxt_unit_read_buf_release(ctx, rbuf);

    return rc;
}


/*
 * Reads one message into "rbuf" and stores the receive result in
 * rbuf->size.  When the context has a read descriptor it is read directly;
 * otherwise the port_recv callback is asked to read from the context's
 * read port.
 */
static void
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Clear the leading control message header of the oob area. */
    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    if (ctx_impl->read_port_fd == -1) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
                                              rbuf->buf, sizeof(rbuf->buf),
                                              rbuf->oob, sizeof(rbuf->oob));
        return;
    }

    rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
                                    rbuf->buf, sizeof(rbuf->buf),
                                    rbuf->oob, sizeof(rbuf->oob));
}


/*
 * Tears the library instance down: frees every context in lib->contexts,
 * removes every process left in lib->processes, destroys the library
 * mutex and frees the library structure.
 */
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_process_t   *process;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {

        nxt_unit_ctx_free(&ctx_impl->ctx);

    } nxt_queue_loop;

    pthread_mutex_lock(&lib->mutex);

    while ((process = nxt_unit_process_pop_first(lib)) != NULL) {
        /* nxt_unit_remove_process() unlocks lib->mutex before returning. */
        nxt_unit_remove_process(ctx, process);

        pthread_mutex_lock(&lib->mutex);
    }

    pthread_mutex_unlock(&lib->mutex);

    pthread_mutex_destroy(&lib->mutex);

    free(lib);
}


/*
 * Allocates an additional context: creates a new port, announces it to
 * the ready port, initializes the context with "data" and makes the new
 * port the context's read port.
 *
 * Returns the new context, or NULL on any failure (the new port is then
 * removed again through the remove_port callback if it had been created).
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                  rc, fd;
    nxt_unit_impl_t      *lib;
    nxt_unit_port_id_t   new_port_id;
    nxt_unit_ctx_impl_t  *new_ctx;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_warn(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail_port;
    }

    /* The peer end has been passed on; the local copy is not needed. */
    close(fd);
    fd = -1;

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail_port;
    }

    new_ctx->read_port_id = new_port_id;

    return &new_ctx->ctx;

fail_port:

    lib->callbacks.remove_port(ctx, &new_port_id);

    if (fd != -1) {
        close(fd);
    }

fail:

    free(new_ctx);

    return NULL;
}


/*
 * Releases a context: finishes still active requests with an error,
 * frees cached mmap buffers, request infos and websocket frames, destroys
 * the context mutex and unlinks the context from the library list.  The
 * context structure itself is freed unless it is the library's embedded
 * main context.
 */
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Requests that are still in flight are completed with an error. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    /*
     * ctx_buf[] elements are members of ctx_impl: they are only unlinked
     * here, never passed to free().
     */
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    /* Whatever remains on the free list is unlinked and freed. */
    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        free(mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(ws_impl);

    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    nxt_queue_remove(&ctx_impl->link);

    /* The main context is embedded in the library structure. */
    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }
}


/*
 * Socket type used for port socket pairs: SOCK_SEQPACKET when
 * NXT_HAVE_AF_UNIX_SOCK_SEQPACKET is set, SOCK_DGRAM otherwise.
 * The literal 0 in the condition has no effect as written; the whole
 * condition has to be made false to test SOCK_DGRAM on all platforms.
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/*
 * Fills "port_id" with the given pid and port number and with the murmur
 * hash computed over the (pid, id) hash key structure.
 */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  key;

    key.pid = pid;
    key.id = id;

    port_id->hash = nxt_murmur_hash2(&key, sizeof(key));
    port_id->pid = pid;
    port_id->id = id;
}


/*
 * Creates a new port and sends it to port "dst".  On success the id of the
 * new port is stored in "*port_id"; if sending fails the port is removed
 * again through the remove_port callback.  The descriptor obtained from
 * port creation is closed in both cases.
 *
 * Returns the result of port creation if that failed, otherwise the
 * result of sending.
 */
int
nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *port_id)
{
    int                 rc, peer_fd;
    nxt_unit_impl_t     *lib;
    nxt_unit_port_id_t  created;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = nxt_unit_create_port(ctx, &created, &peer_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    rc = nxt_unit_send_port(ctx, dst, &created, peer_fd);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &created);

    } else {
        *port_id = created;
    }

    close(peer_fd);

    return rc;
}


/*
 * Creates a socket pair and registers its first end, through the add_port
 * callback, as the input descriptor of a new port of this process.  The
 * port number is taken from the process' next_port_id counter.
 *
 * On success "*port_id" receives the new port id and "*fd" the second end
 * of the pair.  On failure both sockets are closed.
 *
 * Returns NXT_UNIT_OK, NXT_UNIT_ERROR, or the failing add_port result.
 */
static int
nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
{
    int                 rc, pair[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     port;
    nxt_unit_process_t  *self;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, pair);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   pair[0], pair[1]);

    port.in_fd = pair[0];
    port.out_fd = -1;
    port.data = NULL;

    pthread_mutex_lock(&lib->mutex);

    self = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(self == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        goto fail;
    }

    /* The port number is allocated under lib->mutex. */
    nxt_unit_port_id_init(&port.id, lib->pid, self->next_port_id++);

    pthread_mutex_unlock(&lib->mutex);

    /* Drop the reference taken by nxt_unit_process_get(). */
    nxt_unit_process_use(ctx, self, -1);

    rc = lib->callbacks.add_port(ctx, &port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_warn(ctx, "create_port: add_port() failed");

        close(pair[0]);
        close(pair[1]);

        return rc;
    }

    *port_id = port.id;
    *fd = pair[1];

    return rc;

fail:

    close(pair[0]);
    close(pair[1]);

    return NXT_UNIT_ERROR;
}


/*
 * Sends a _NXT_PORT_MSG_NEW_PORT message describing "new_port" to port
 * "dst" through the port_send callback, passing descriptor "fd" along as
 * SCM_RIGHTS ancillary data.
 *
 * Returns NXT_UNIT_OK when the whole message was sent, NXT_UNIT_ERROR
 * otherwise.  "fd" is not closed here.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd)
{
    ssize_t          res;
    nxt_unit_impl_t  *lib;

    struct {
        nxt_port_msg_t            msg;
        nxt_port_msg_new_port_t   new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /*
     * The structure is sent to another process as raw bytes, so zero it
     * as a whole: no padding or unassigned field may carry stack garbage.
     * All message header fields other than those set below are
     * intentionally 0.
     */
    memset(&m, 0, sizeof(m));

    m.msg.pid = lib->pid;
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;

    m.new_port.id = new_port->id;
    m.new_port.pid = new_port->pid;
    m.new_port.type = NXT_PROCESS_WORKER;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
                                   &cmsg, sizeof(cmsg));

    return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}


/*
 * Registers "port" with the library: a copy of it is added to the ports
 * hash and linked into the port list of its owning process, which is
 * created on demand.  The process' next_port_id is advanced past the
 * port number if necessary.
 *
 * On success the process reference obtained here stays with the port
 * entry; on failure it is released.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise.
 */
int
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_process_t    *process;
    nxt_unit_port_impl_t  *new_port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
                   (int) port->id.pid, (int) port->id.id,
                   port->in_fd, port->out_fd);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_get(ctx, port->id.pid);
    if (nxt_slow_path(process == NULL)) {
        rc = NXT_UNIT_ERROR;
        goto unlock;
    }

    if (port->id.id >= process->next_port_id) {
        process->next_port_id = port->id.id + 1;
    }

    new_port = malloc(sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(new_port == NULL)) {
        rc = NXT_UNIT_ERROR;
        goto unlock;
    }

    new_port->port = *port;

    rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        /* The entry never became reachable: free it instead of leaking. */
        free(new_port);

        goto unlock;
    }

    nxt_queue_insert_tail(&process->ports, &new_port->link);

    rc = NXT_UNIT_OK;

    new_port->process = process;

unlock:

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
        nxt_unit_process_use(ctx, process, -1);
    }

    return rc;
}


/*
 * Removes the port identified by "port_id".  Equivalent to
 * nxt_unit_find_remove_port() without asking for a copy of the port.
 */
void
nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    nxt_unit_find_remove_port(ctx, port_id, NULL);
}


/*
 * Removes the port identified by "port_id" under lib->mutex.  If "r_port"
 * is not NULL and the port exists, a copy of the port is stored there.
 * The process reference held by the removed port, if any, is released
 * after the mutex has been dropped.
 */
void
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *owner;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Stays NULL when the port is unknown or has no owning process. */
    owner = NULL;

    pthread_mutex_lock(&lib->mutex);

    nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &owner);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(owner != NULL)) {
        nxt_unit_process_use(ctx, owner, -1);
    }
}


/*
 * Deletes the port identified by "port_id" from the ports hash, closes
 * its descriptors, unlinks it from its process' port list and frees the
 * entry.  No locking is done here; callers in this file hold lib->mutex.
 *
 * If the port is found, "*process" (when "process" is not NULL) receives
 * the port's owning process pointer, and "*r_port" (when "r_port" is not
 * NULL) receives a copy of the port, taken before the descriptors are
 * closed.  Neither is written when the port is not found.
 */
static void
nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port, nxt_unit_process_t **process)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *entry;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    entry = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
    if (nxt_slow_path(entry == NULL)) {
        nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);

        return;
    }

    nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   entry->port.in_fd, entry->port.out_fd, entry->port.data);

    /* Report to the caller first, while the entry is still intact. */
    if (process != NULL) {
        *process = entry->process;
    }

    if (r_port != NULL) {
        *r_port = entry->port;
    }

    /* An entry without a process is not on any process port list. */
    if (entry->process != NULL) {
        nxt_queue_remove(&entry->link);
    }

    if (entry->port.in_fd != -1) {
        close(entry->port.in_fd);
    }

    if (entry->port.out_fd != -1) {
        close(entry->port.out_fd);
    }

    free(entry);
}


/*
 * Removes the process entry for "pid" from lib->processes together with
 * its ports.  Does nothing beyond a debug message if the process is not
 * known.
 */
void
nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *victim;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    victim = nxt_unit_process_find(ctx, pid, 1);

    if (nxt_fast_path(victim != NULL)) {
        /* nxt_unit_remove_process() unlocks lib->mutex itself. */
        nxt_unit_remove_process(ctx, victim);

        return;
    }

    nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);

    pthread_mutex_unlock(&lib->mutex);
}


/*
 * Detaches all ports of "process", removes them and drops one more
 * reference of the process.
 *
 * Must be entered with lib->mutex locked (the callers in this file lock
 * it first); the mutex is unlocked inside this function and is not held
 * on return.
 */
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_queue_init(&ports);

    /* Work on a local queue that takes over the process' ports. */
    nxt_queue_add(&ports, &process->ports);

    /* Phase 1, under lib->mutex: drop the reference held by each port. */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_unit_process_use(ctx, process, -1);
        port->process = NULL;

        /* Shortcut for default callback. */
        if (lib->callbacks.remove_port == nxt_unit_remove_port) {
            nxt_queue_remove(&port->link);

            nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
        }

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    /*
     * Phase 2, without the mutex: ports still queued here (those not
     * handled by the shortcut above) are removed through the callback.
     */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        lib->callbacks.remove_port(ctx, &port->port.id);

    } nxt_queue_loop;

    nxt_unit_process_use(ctx, process, -1);
}


/*
 * Marks the library as offline by clearing lib->online, the flag that
 * nxt_unit_run() tests before each iteration.
 */
void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    lib->online = 0;
}


/*
 * Default port_send callback: looks up the output descriptor of the port
 * identified by "port_id" and sends the message over it.
 *
 * Returns the nxt_unit_port_send() result, or -1 when the port is unknown
 * or has no output descriptor.
 */
static ssize_t
nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int                   out_fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *entry;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    out_fd = -1;

    pthread_mutex_lock(&lib->mutex);

    entry = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (nxt_slow_path(entry == NULL)) {
        nxt_unit_warn(ctx, "port_send: port %d,%d not found",
                      (int) port_id->pid, (int) port_id->id);

    } else {
        out_fd = entry->port.out_fd;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_fast_path(out_fd != -1)) {
        nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
                       (int) port_id->pid, (int) port_id->id, out_fd);

        return nxt_unit_port_send(ctx, out_fd, buf, buf_size, oob, oob_size);
    }

    /* Only the pointer value is tested; the entry is not dereferenced. */
    if (entry != NULL) {
        nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
                      (int) port_id->pid, (int) port_id->id);
    }

    return -1;
}


/*
 * Sends "buf" as a single message over descriptor "fd" with sendmsg(),
 * attaching "oob" as ancillary data.  The outcome is logged.
 *
 * Returns the sendmsg() result.
 */
ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    ssize_t        sent;
    struct iovec   vec[1];
    struct msghdr  hdr;

    vec[0].iov_base = (void *) buf;
    vec[0].iov_len = buf_size;

    hdr.msg_iov = vec;
    hdr.msg_iovlen = 1;
    hdr.msg_control = (void *) oob;
    hdr.msg_controllen = oob_size;
    hdr.msg_name = NULL;
    hdr.msg_namelen = 0;
    hdr.msg_flags = 0;

    sent = sendmsg(fd, &hdr, 0);

    if (nxt_fast_path(sent != -1)) {
        nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
                       (int) sent);

    } else {
        nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(errno), errno);
    }

    return sent;
}


/*
 * Default port_recv callback: looks up the input descriptor of the port
 * identified by "port_id" and receives one message from it.  When
 * "port_id" is the context's own read port id object, the descriptor is
 * cached in ctx_impl->read_port_fd.
 *
 * Returns the nxt_unit_port_recv() result, or -1 when the port is unknown
 * or has no input descriptor.
 */
static ssize_t
nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   in_fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *entry;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    in_fd = -1;

    pthread_mutex_lock(&lib->mutex);

    entry = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (nxt_slow_path(entry == NULL)) {
        nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);

    } else {
        in_fd = entry->port.in_fd;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(in_fd == -1)) {
        return -1;
    }

    nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
                   (int) port_id->pid, (int) port_id->id, in_fd);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Pointer identity check, not a comparison of port id values. */
    if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
        ctx_impl->read_port_fd = in_fd;
    }

    return nxt_unit_port_recv(ctx, in_fd, buf, buf_size, oob, oob_size);
}


/*
 * Receives a single message from descriptor "fd" with recvmsg() into
 * "buf", storing ancillary data in "oob".  The outcome is logged.
 *
 * Returns the recvmsg() result.
 */
ssize_t
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
    void *oob, size_t oob_size)
{
    ssize_t        got;
    struct iovec   vec[1];
    struct msghdr  hdr;

    vec[0].iov_base = buf;
    vec[0].iov_len = buf_size;

    hdr.msg_iov = vec;
    hdr.msg_iovlen = 1;
    hdr.msg_control = oob;
    hdr.msg_controllen = oob_size;
    hdr.msg_name = NULL;
    hdr.msg_namelen = 0;
    hdr.msg_flags = 0;

    got = recvmsg(fd, &hdr, 0);

    if (nxt_fast_path(got != -1)) {
        nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) got);

    } else {
        nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
                      fd, strerror(errno), errno);
    }

    return got;
}


/*
 * Key comparison callback of the ports hash: "data" is a stored port and
 * the key is a (pid, id) hash key structure.
 *
 * Returns NXT_OK when the key has the expected length and both pid and
 * id match the port, NXT_DECLINED otherwise.
 */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *stored;
    nxt_unit_port_hash_id_t  *key;

    if (lhq->key.length != sizeof(nxt_unit_port_hash_id_t)) {
        return NXT_DECLINED;
    }

    stored = data;
    key = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (key->pid != stored->id.pid || key->id != stored->id.id) {
        return NXT_DECLINED;
    }

    return NXT_OK;
}


/*
 * Hash prototype for the ports hash: default level hash parameters,
 * (pid, id) comparison via nxt_unit_port_hash_test() and the generic
 * level hash allocator callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Prepares a ports hash query for "port_id".
 *
 * The pid and id of "port_id" are copied into the caller-provided
 * "port_hash_id", which becomes the query key.  The key hash is taken from
 * port_id->hash; when that is still 0 it is computed with
 * nxt_murmur_hash2() over the key and cached back into port_id->hash.
 *
 * lhq->value and lhq->replace are not touched; callers set them as needed.
 *
 * NOTE(review): the hash covers sizeof(*port_hash_id) bytes while only the
 * pid and id members are assigned here; if the structure contains padding,
 * confirm those bytes are deterministic.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    /* The key must be filled in before it can be hashed below. */
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
    lhq->key.start = (u_char *) port_hash_id;
    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);

    if (nxt_slow_path(port_id->hash == 0)) {
        /* First use of this port id: compute the hash and remember it. */
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);

    } else {
        lhq->key_hash = port_id->hash;
    }
}


/*
 * Inserts "port" into "port_hash" under its (pid, id) key.
 *
 * The insertion is done with replace disabled.  Returns NXT_UNIT_OK when
 * nxt_lvlhsh_insert() reports NXT_OK, and NXT_UNIT_ERROR for any other
 * result.
 */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_lvlhsh_query_t       query;
    nxt_unit_port_hash_id_t  key;

    nxt_unit_port_hash_lhq(&query, &key, &port->id);

    query.value = port;
    query.replace = 0;

    if (nxt_lvlhsh_insert(port_hash, &query) != NXT_OK) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Looks up the port identified by "port_id" in "port_hash".
 *
 * With a non-zero "remove" the lookup is performed with
 * nxt_lvlhsh_delete(), otherwise with nxt_lvlhsh_find().
 *
 * Returns the value stored in the hash when the operation reports NXT_OK,
 * or NULL for any other result.
 */
static nxt_unit_port_impl_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                rc;
    nxt_lvlhsh_query_t       query;
    nxt_unit_port_hash_id_t  key;

    nxt_unit_port_hash_lhq(&query, &key, port_id);

    rc = remove ? nxt_lvlhsh_delete(port_hash, &query)
                : nxt_lvlhsh_find(port_hash, &query);

    return (rc == NXT_OK) ? query.value : NULL;
}


/*
 * Comparison callback of the requests hash.
 *
 * Unconditionally returns NXT_OK: neither the query key nor the stored
 * entry ("data") is examined, so any entry reached through the key hash is
 * accepted as a match.
 *
 * NOTE(review): presumably this relies on the 32-bit hash of the 4-byte
 * stream id being collision-free -- confirm against nxt_murmur_hash2().
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


/*
 * Level hash prototype used for the requests hash; it is installed into
 * queries by nxt_unit_request_hash_add() and nxt_unit_request_hash_find().
 * The initializer is positional: NXT_LVLHSH_DEFAULT, then
 * nxt_unit_request_hash_test() (the comparison callback), then
 * nxt_lvlhsh_alloc() and nxt_lvlhsh_free() (presumably the memory callbacks
 * of the hash levels -- see nxt_lvlhsh_proto_t).
 */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Inserts "req_impl" into "request_hash", keyed by its stream id.
 *
 * The key points at req_impl->stream itself and the insertion is done with
 * replace disabled.  Returns NXT_UNIT_OK when nxt_lvlhsh_insert() reports
 * NXT_OK, and NXT_UNIT_ERROR for any other result.
 */
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl)
{
    nxt_lvlhsh_query_t  query;

    query.key.start = (u_char *) &req_impl->stream;
    query.key.length = sizeof(req_impl->stream);
    query.key_hash = nxt_murmur_hash2(&req_impl->stream,
                                      sizeof(req_impl->stream));
    query.proto = &lvlhsh_requests_proto;
    query.pool = NULL;
    query.replace = 0;
    query.value = req_impl;

    if (nxt_lvlhsh_insert(request_hash, &query) != NXT_OK) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Looks up the request with stream id "stream" in "request_hash".
 *
 * With a non-zero "remove" the lookup is performed with
 * nxt_lvlhsh_delete(), otherwise with nxt_lvlhsh_find().
 *
 * Returns the value stored in the hash when the operation reports NXT_OK,
 * or NULL for any other result.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
    int remove)
{
    nxt_int_t           rc;
    nxt_lvlhsh_query_t  query;

    /* The key refers to the local copy of the stream id. */
    query.key.start = (u_char *) &stream;
    query.key.length = sizeof(stream);
    query.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    query.proto = &lvlhsh_requests_proto;
    query.pool = NULL;

    rc = remove ? nxt_lvlhsh_delete(request_hash, &query)
                : nxt_lvlhsh_find(request_hash, &query);

    return (rc == NXT_OK) ? query.value : NULL;
}


/*
 * Formats a log line and writes it to the log descriptor.
 *
 * ctx    - unit context; when not NULL the pid and log descriptor are taken
 *          from the library instance it belongs to, otherwise getpid() and
 *          STDERR_FILENO are used.
 * level  - log level passed to nxt_unit_snprint_prefix() for the line
 *          prefix.
 * fmt    - printf-style format followed by its arguments.
 *
 * The line is built in a NXT_MAX_ERROR_STR byte buffer.  A message that
 * does not fit completely is cut and ends with "[...]".  A newline is
 * always appended.  If write() fails, the line is reported on stderr.
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    ssize_t          written;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;

    /* The last byte of the buffer is reserved for the trailing newline. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    n = vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* A negative result signals a formatting error: append nothing. */
    if (nxt_fast_path(n > 0)) {
        p += n;
    }

    /*
     * vsnprintf() stores at most (size - 1) characters plus the NUL, so
     * a result equal to the size already means the message was cut and
     * the NUL occupies the last byte before "end".  Use ">=" so that case
     * is marked as truncated too instead of leaking the NUL into the log.
     */
    if (nxt_slow_path(p >= end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    written = write(log_fd, msg, p - msg);
    if (nxt_slow_path(written < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Formats a request-related log line and writes it to the log descriptor.
 *
 * req    - request info; when not NULL the pid and log descriptor are taken
 *          from the library instance of its context and the message is
 *          preceded by "#<stream>: ", otherwise getpid() and STDERR_FILENO
 *          are used and no stream id is printed.
 * level  - log level passed to nxt_unit_snprint_prefix() for the line
 *          prefix.
 * fmt    - printf-style format followed by its arguments.
 *
 * The line is built in a NXT_MAX_ERROR_STR byte buffer.  A message that
 * does not fit completely is cut and ends with "[...]".  A newline is
 * always appended.  If write() fails, the line is reported on stderr.
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    ssize_t                       written;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;

    /* The last byte of the buffer is reserved for the trailing newline. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    n = vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* A negative result signals a formatting error: append nothing. */
    if (nxt_fast_path(n > 0)) {
        p += n;
    }

    /*
     * vsnprintf() stores at most (size - 1) characters plus the NUL, so
     * a result equal to the size already means the message was cut and
     * the NUL occupies the last byte before "end".  Use ">=" so that case
     * is marked as truncated too instead of leaking the NUL into the log.
     */
    if (nxt_slow_path(p >= end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    written = write(log_fd, msg, p - msg);
    if (nxt_slow_path(written < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Log level names printed in the line prefix.  The table is indexed by the
 * numeric "level" argument in nxt_unit_snprint_prefix(), so the order of
 * the entries must match the numeric values of the log levels.
 */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};


/*
 * Writes the common log line prefix into the buffer [p, end):
 *
 *     YYYY/MM/DD hh:mm:ss.mmm [<level>] <pid>#<tid> [unit]
 *
 * followed by a space.  The timestamp is the current CLOCK_REALTIME value
 * converted to local time.
 *
 * "level" selects the name from nxt_unit_log_levels; a value outside the
 * table is clamped to the nearest valid entry so that a bad level cannot
 * cause an out-of-bounds read.
 *
 * Returns "p" advanced by the snprintf() results, i.e. the position where
 * the message text should start.
 */
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
    int              max_level;
    struct tm        tm;
    struct timespec  ts;

    max_level = (int) (sizeof(nxt_unit_log_levels)
                       / sizeof(nxt_unit_log_levels[0])) - 1;

    if (nxt_slow_path(level < 0)) {
        level = 0;

    } else if (nxt_slow_path(level > max_level)) {
        level = max_level;
    }

    (void) clock_gettime(CLOCK_REALTIME, &ts);

#if (NXT_HAVE_LOCALTIME_R)
    (void) localtime_r(&ts.tv_sec, &tm);
#else
    tm = *localtime(&ts.tv_sec);
#endif

    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec,
                  (int) ts.tv_nsec / 1000000);

    p += snprintf(p, end - p,
                  "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
                  (int) pid,
                  (uint64_t) (uintptr_t) nxt_thread_get_tid());

    return p;
}


/* The function required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free(). */

/*
 * Allocates "size" bytes aligned to "alignment" with posix_memalign().
 *
 * Returns the allocated block, or NULL when posix_memalign() reports an
 * error.
 */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void        *block;
    nxt_err_t   rc;

    rc = posix_memalign(&block, alignment, size);

    if (nxt_slow_path(rc != 0)) {
        return NULL;
    }

    return block;
}

#if (NXT_DEBUG)

/*
 * Releases "p" with free().  This definition is compiled only when
 * NXT_DEBUG is set (see the enclosing #if).
 */
void
nxt_free(void *p)
{
    free(p);
}

#endif